Merge branch 'develop' into fork_develop

This commit is contained in:
Dniel97 2023-04-10 18:35:43 +02:00
commit 7fdb3e8222
Signed by untrusted user: Dniel97
GPG Key ID: 6180B3C768FB2E08
21 changed files with 972 additions and 74 deletions

View File

@ -2,5 +2,19 @@ server:
enable: True
loglevel: "info"
team:
name: ARTEMiS
mods:
use_login_bonus: True
version:
11:
rom: 2.00.00
data: 2.00.00
12:
rom: 2.05.00
data: 2.05.00
crypto:
encrypted_only: False

View File

@ -6,3 +6,4 @@ server:
port_stun: 9001
port_turn: 9002
port_admission: 9003
auto_register: True

View File

@ -95,6 +95,7 @@ class HttpDispatcher(resource.Resource):
)
def render_GET(self, request: Request) -> bytes:
self.logger.debug(request.uri)
test = self.map_get.match(request.uri.decode())
client_ip = Utils.get_ip_addr(request)

View File

@ -7,4 +7,4 @@ index = ChuniServlet
database = ChuniData
reader = ChuniReader
game_codes = [ChuniConstants.GAME_CODE, ChuniConstants.GAME_CODE_NEW]
current_schema_version = 1
current_schema_version = 3

View File

@ -23,7 +23,98 @@ class ChuniBase:
self.version = ChuniConstants.VER_CHUNITHM
def handle_game_login_api_request(self, data: Dict) -> Dict:
# self.data.base.log_event("chuni", "login", logging.INFO, {"version": self.version, "user": data["userId"]})
"""
Handles the login bonus logic, required for the game because
getUserLoginBonus gets called after getUserItem and therefore the
items needs to be inserted in the database before they get requested.
Adds a bonusCount after a user logged in after 24 hours, makes sure
loginBonus 30 gets looped, only show the login banner every 24 hours,
adds the bonus to items (itemKind 6)
"""
# ignore the login bonus if disabled in config
if not self.game_cfg.mods.use_login_bonus:
return {"returnCode": 1}
user_id = data["userId"]
login_bonus_presets = self.data.static.get_login_bonus_presets(self.version)
for preset in login_bonus_presets:
# check if a user already has some pogress and if not add the
# login bonus entry
user_login_bonus = self.data.item.get_login_bonus(
user_id, self.version, preset["id"]
)
if user_login_bonus is None:
self.data.item.put_login_bonus(user_id, self.version, preset["id"])
# yeah i'm lazy
user_login_bonus = self.data.item.get_login_bonus(
user_id, self.version, preset["id"]
)
# skip the login bonus entirely if its already finished
if user_login_bonus["isFinished"]:
continue
# make sure the last login is more than 24 hours ago
if user_login_bonus["lastUpdateDate"] < datetime.now() - timedelta(
hours=24
):
# increase the login day counter and update the last login date
bonus_count = user_login_bonus["bonusCount"] + 1
last_update_date = datetime.now()
all_login_boni = self.data.static.get_login_bonus(
self.version, preset["id"]
)
# skip the current bonus preset if no boni were found
if all_login_boni is None or len(all_login_boni) < 1:
self.logger.warn(
f"No bonus entries found for bonus preset {preset['id']}"
)
continue
max_needed_days = all_login_boni[0]["needLoginDayCount"]
# make sure to not show login boni after all days got redeemed
is_finished = False
if bonus_count > max_needed_days:
# assume that all login preset ids under 3000 needs to be
# looped, like 30 and 40 are looped, 40 does not work?
if preset["id"] < 3000:
bonus_count = 1
else:
is_finished = True
# grab the item for the corresponding day
login_item = self.data.static.get_login_bonus_by_required_days(
self.version, preset["id"], bonus_count
)
if login_item is not None:
# now add the present to the database so the
# handle_get_user_item_api_request can grab them
self.data.item.put_item(
user_id,
{
"itemId": login_item["presentId"],
"itemKind": 6,
"stock": login_item["itemNum"],
"isValid": True,
},
)
self.data.item.put_login_bonus(
user_id,
self.version,
preset["id"],
bonusCount=bonus_count,
lastUpdateDate=last_update_date,
isWatched=False,
isFinished=is_finished,
)
return {"returnCode": 1}
def handle_game_logout_api_request(self, data: Dict) -> Dict:
@ -130,7 +221,7 @@ class ChuniBase:
return {
"userId": data["userId"],
"length": len(activity_list),
"kind": data["kind"],
"kind": int(data["kind"]),
"userActivityList": activity_list,
}
@ -309,26 +400,29 @@ class ChuniBase:
}
def handle_get_user_login_bonus_api_request(self, data: Dict) -> Dict:
"""
Unsure how to get this to trigger...
"""
user_id = data["userId"]
user_login_bonus = self.data.item.get_all_login_bonus(user_id, self.version)
# ignore the loginBonus request if its disabled in config
if user_login_bonus is None or not self.game_cfg.mods.use_login_bonus:
return {"userId": user_id, "length": 0, "userLoginBonusList": []}
user_login_list = []
for bonus in user_login_bonus:
user_login_list.append(
{
"presetId": bonus["presetId"],
"bonusCount": bonus["bonusCount"],
"lastUpdateDate": datetime.strftime(
bonus["lastUpdateDate"], "%Y-%m-%d %H:%M:%S"
),
"isWatched": bonus["isWatched"],
}
)
return {
"userId": data["userId"],
"length": 2,
"userLoginBonusList": [
{
"presetId": "10",
"bonusCount": "0",
"lastUpdateDate": "1970-01-01 09:00:00",
"isWatched": "true",
},
{
"presetId": "20",
"bonusCount": "0",
"lastUpdateDate": "1970-01-01 09:00:00",
"isWatched": "true",
},
],
"userId": user_id,
"length": len(user_login_list),
"userLoginBonusList": user_login_list,
}
def handle_get_user_map_api_request(self, data: Dict) -> Dict:
@ -451,13 +545,13 @@ class ChuniBase:
"playerLevel": profile["playerLevel"],
"rating": profile["rating"],
"headphone": profile["headphone"],
"chargeState": "1",
"chargeState": 1,
"userNameEx": profile["userName"],
}
def handle_get_user_recent_rating_api_request(self, data: Dict) -> Dict:
recet_rating_list = self.data.profile.get_profile_recent_rating(data["userId"])
if recet_rating_list is None:
recent_rating_list = self.data.profile.get_profile_recent_rating(data["userId"])
if recent_rating_list is None:
return {
"userId": data["userId"],
"length": 0,
@ -466,8 +560,8 @@ class ChuniBase:
return {
"userId": data["userId"],
"length": len(recet_rating_list["recentRating"]),
"userRecentRatingList": recet_rating_list["recentRating"],
"length": len(recent_rating_list["recentRating"]),
"userRecentRatingList": recent_rating_list["recentRating"],
}
def handle_get_user_region_api_request(self, data: Dict) -> Dict:
@ -479,9 +573,25 @@ class ChuniBase:
}
def handle_get_user_team_api_request(self, data: Dict) -> Dict:
# TODO: Team
# TODO: use the database "chuni_profile_team" with a GUI
team_name = self.game_cfg.team.team_name
if team_name == "":
return {"userId": data["userId"], "teamId": 0}
return {
"userId": data["userId"],
"teamId": 1,
"teamRank": 1,
"teamName": team_name,
"userTeamPoint": {
"userId": data["userId"],
"teamId": 1,
"orderId": 1,
"teamPoint": 1,
"aggrDate": data["playDate"],
},
}
def handle_get_team_course_setting_api_request(self, data: Dict) -> Dict:
return {
"userId": data["userId"],
@ -580,9 +690,18 @@ class ChuniBase:
for emoney in upsert["userEmoneyList"]:
self.data.profile.put_profile_emoney(user_id, emoney)
if "userLoginBonusList" in upsert:
for login in upsert["userLoginBonusList"]:
self.data.item.put_login_bonus(
user_id, self.version, login["presetId"], isWatched=True
)
return {"returnCode": "1"}
def handle_upsert_user_chargelog_api_request(self, data: Dict) -> Dict:
# add tickets after they got bought, this makes sure the tickets are
# still valid after an unsuccessful logout
self.data.profile.put_profile_charge(data["userId"], data["userCharge"])
return {"returnCode": "1"}
def handle_upsert_client_bookkeeping_api_request(self, data: Dict) -> Dict:
@ -603,7 +722,5 @@ class ChuniBase:
def handle_get_user_net_battle_data_api_request(self, data: Dict) -> Dict:
return {
"userId": data["userId"],
"userNetBattleData": {
"recentNBSelectMusicList": []
}
"userNetBattleData": {"recentNBSelectMusicList": []},
}

View File

@ -21,6 +21,42 @@ class ChuniServerConfig:
)
class ChuniTeamConfig:
def __init__(self, parent_config: "ChuniConfig") -> None:
self.__config = parent_config
@property
def team_name(self) -> str:
return CoreConfig.get_config_field(
self.__config, "chuni", "team", "name", default=""
)
class ChuniModsConfig:
def __init__(self, parent_config: "ChuniConfig") -> None:
self.__config = parent_config
@property
def use_login_bonus(self) -> bool:
return CoreConfig.get_config_field(
self.__config, "chuni", "mods", "use_login_bonus", default=True
)
class ChuniVersionConfig:
def __init__(self, parent_config: "ChuniConfig") -> None:
self.__config = parent_config
def version(self, version: int) -> Dict:
"""
in the form of:
11: {"rom": 2.00.00, "data": 2.00.00}
"""
return CoreConfig.get_config_field(
self.__config, "chuni", "version", default={}
)[version]
class ChuniCryptoConfig:
def __init__(self, parent_config: "ChuniConfig") -> None:
self.__config = parent_config
@ -46,4 +82,7 @@ class ChuniCryptoConfig:
class ChuniConfig(dict):
def __init__(self) -> None:
self.server = ChuniServerConfig(self)
self.team = ChuniTeamConfig(self)
self.mods = ChuniModsConfig(self)
self.version = ChuniVersionConfig(self)
self.crypto = ChuniCryptoConfig(self)

View File

@ -49,8 +49,8 @@ class ChuniNew(ChuniBase):
"matchEndTime": match_end,
"matchTimeLimit": 99,
"matchErrorLimit": 9999,
"romVersion": "2.00.00",
"dataVersion": "2.00.00",
"romVersion": self.game_cfg.version.version(self.version)["rom"],
"dataVersion": self.game_cfg.version.version(self.version)["data"],
"matchingUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
"matchingUriX": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
"udpHolePunchUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
@ -269,10 +269,10 @@ class ChuniNew(ChuniBase):
tmp = user_print_list[x]._asdict()
print_list.append(tmp["cardId"])
if len(user_print_list) >= max_ct:
if len(print_list) >= max_ct:
break
if len(user_print_list) >= max_ct:
if len(print_list) >= max_ct:
next_idx = next_idx + max_ct
else:
next_idx = -1
@ -454,9 +454,7 @@ class ChuniNew(ChuniBase):
# set the card print state to success and use the orderId as the key
self.data.item.put_user_print_state(
user_id,
id=upsert["orderId"],
hasCompleted=True
user_id, id=upsert["orderId"], hasCompleted=True
)
return {"returnCode": "1", "apiName": "CMUpsertUserPrintSubtractApi"}
@ -467,10 +465,6 @@ class ChuniNew(ChuniBase):
# set the card print state to success and use the orderId as the key
for order_id in order_ids:
self.data.item.put_user_print_state(
user_id,
id=order_id,
hasCompleted=True
)
self.data.item.put_user_print_state(user_id, id=order_id, hasCompleted=True)
return {"returnCode": "1", "apiName": "CMUpsertUserPrintCancelApi"}

View File

@ -1,6 +1,4 @@
from datetime import datetime, timedelta
from typing import Dict, Any
import pytz
from core.config import CoreConfig
from titles.chuni.new import ChuniNew
@ -15,8 +13,8 @@ class ChuniNewPlus(ChuniNew):
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
ret = super().handle_get_game_setting_api_request(data)
ret["gameSetting"]["romVersion"] = "2.05.00"
ret["gameSetting"]["dataVersion"] = "2.05.00"
ret["gameSetting"]["romVersion"] = self.game_cfg.version.version(self.version)["rom"]
ret["gameSetting"]["dataVersion"] = self.game_cfg.version.version(self.version)["data"]
ret["gameSetting"][
"matchingUri"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/205/ChuniServlet/"

View File

@ -42,6 +42,80 @@ class ChuniReader(BaseReader):
self.read_music(f"{dir}/music")
self.read_charges(f"{dir}/chargeItem")
self.read_avatar(f"{dir}/avatarAccessory")
self.read_login_bonus(f"{dir}/")
def read_login_bonus(self, root_dir: str) -> None:
for root, dirs, files in walk(f"{root_dir}loginBonusPreset"):
for dir in dirs:
if path.exists(f"{root}/{dir}/LoginBonusPreset.xml"):
with open(f"{root}/{dir}/LoginBonusPreset.xml", "rb") as fp:
bytedata = fp.read()
strdata = bytedata.decode("UTF-8")
xml_root = ET.fromstring(strdata)
for name in xml_root.findall("name"):
id = name.find("id").text
name = name.find("str").text
is_enabled = (
True if xml_root.find("disableFlag").text == "false" else False
)
result = self.data.static.put_login_bonus_preset(
self.version, id, name, is_enabled
)
if result is not None:
self.logger.info(f"Inserted login bonus preset {id}")
else:
self.logger.warn(f"Failed to insert login bonus preset {id}")
for bonus in xml_root.find("infos").findall("LoginBonusDataInfo"):
for name in bonus.findall("loginBonusName"):
bonus_id = name.find("id").text
bonus_name = name.find("str").text
if path.exists(
f"{root_dir}/loginBonus/loginBonus{bonus_id}/LoginBonus.xml"
):
with open(
f"{root_dir}/loginBonus/loginBonus{bonus_id}/LoginBonus.xml",
"rb",
) as fp:
bytedata = fp.read()
strdata = bytedata.decode("UTF-8")
bonus_root = ET.fromstring(strdata)
for present in bonus_root.findall("present"):
present_id = present.find("id").text
present_name = present.find("str").text
item_num = int(bonus_root.find("itemNum").text)
need_login_day_count = int(
bonus_root.find("needLoginDayCount").text
)
login_bonus_category_type = int(
bonus_root.find("loginBonusCategoryType").text
)
result = self.data.static.put_login_bonus(
self.version,
id,
bonus_id,
bonus_name,
present_id,
present_name,
item_num,
need_login_day_count,
login_bonus_category_type,
)
if result is not None:
self.logger.info(f"Inserted login bonus {bonus_id}")
else:
self.logger.warn(
f"Failed to insert login bonus {bonus_id}"
)
def read_events(self, evt_dir: str) -> None:
for root, dirs, files in walk(evt_dir):

View File

@ -184,8 +184,73 @@ print_detail = Table(
mysql_charset="utf8mb4",
)
login_bonus = Table(
"chuni_item_login_bonus",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column(
"user",
ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
nullable=False,
),
Column("version", Integer, nullable=False),
Column("presetId", Integer, nullable=False),
Column("bonusCount", Integer, nullable=False, server_default="0"),
Column("lastUpdateDate", TIMESTAMP, server_default="2018-01-01 00:00:00.0"),
Column("isWatched", Boolean, server_default="0"),
Column("isFinished", Boolean, server_default="0"),
UniqueConstraint("version", "user", "presetId", name="chuni_item_login_bonus_uk"),
mysql_charset="utf8mb4",
)
class ChuniItemData(BaseData):
def put_login_bonus(
self, user_id: int, version: int, preset_id: int, **login_bonus_data
) -> Optional[int]:
sql = insert(login_bonus).values(
version=version, user=user_id, presetId=preset_id, **login_bonus_data
)
conflict = sql.on_duplicate_key_update(presetId=preset_id, **login_bonus_data)
result = self.execute(conflict)
if result is None:
return None
return result.lastrowid
def get_all_login_bonus(
self, user_id: int, version: int, is_finished: bool = False
) -> Optional[List[Row]]:
sql = login_bonus.select(
and_(
login_bonus.c.version == version,
login_bonus.c.user == user_id,
login_bonus.c.isFinished == is_finished,
)
)
result = self.execute(sql)
if result is None:
return None
return result.fetchall()
def get_login_bonus(
self, user_id: int, version: int, preset_id: int
) -> Optional[Row]:
sql = login_bonus.select(
and_(
login_bonus.c.version == version,
login_bonus.c.user == user_id,
login_bonus.c.presetId == preset_id,
)
)
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def put_character(self, user_id: int, character_data: Dict) -> Optional[int]:
character_data["user"] = user_id
@ -335,7 +400,7 @@ class ChuniItemData(BaseData):
sql = print_state.select(
and_(
print_state.c.user == aime_id,
print_state.c.hasCompleted == has_completed
print_state.c.hasCompleted == has_completed,
)
)
@ -351,7 +416,7 @@ class ChuniItemData(BaseData):
and_(
print_state.c.user == aime_id,
print_state.c.gachaId == gacha_id,
print_state.c.hasCompleted == has_completed
print_state.c.hasCompleted == has_completed,
)
)
@ -380,9 +445,7 @@ class ChuniItemData(BaseData):
user=aime_id, serialId=serial_id, **user_print_data
)
conflict = sql.on_duplicate_key_update(
user=aime_id, **user_print_data
)
conflict = sql.on_duplicate_key_update(user=aime_id, **user_print_data)
result = self.execute(conflict)
if result is None:

View File

@ -558,8 +558,10 @@ class ChuniProfileData(BaseData):
return result.lastrowid
def get_profile_activity(self, aime_id: int, kind: int) -> Optional[List[Row]]:
sql = select(activity).where(
and_(activity.c.user == aime_id, activity.c.kind == kind)
sql = (
select(activity)
.where(and_(activity.c.user == aime_id, activity.c.kind == kind))
.order_by(activity.c.sortNumber.desc()) # to get the last played track
)
result = self.execute(sql)

View File

@ -122,8 +122,148 @@ gacha_cards = Table(
mysql_charset="utf8mb4",
)
login_bonus_preset = Table(
"chuni_static_login_bonus_preset",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer, nullable=False),
Column("presetName", String(255), nullable=False),
Column("isEnabled", Boolean, server_default="1"),
UniqueConstraint("version", "id", name="chuni_static_login_bonus_preset_uk"),
mysql_charset="utf8mb4",
)
login_bonus = Table(
"chuni_static_login_bonus",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer, nullable=False),
Column(
"presetId",
ForeignKey(
"chuni_static_login_bonus_preset.id",
ondelete="cascade",
onupdate="cascade",
),
nullable=False,
),
Column("loginBonusId", Integer, nullable=False),
Column("loginBonusName", String(255), nullable=False),
Column("presentId", Integer, nullable=False),
Column("presentName", String(255), nullable=False),
Column("itemNum", Integer, nullable=False),
Column("needLoginDayCount", Integer, nullable=False),
Column("loginBonusCategoryType", Integer, nullable=False),
UniqueConstraint(
"version", "presetId", "loginBonusId", name="chuni_static_login_bonus_uk"
),
mysql_charset="utf8mb4",
)
class ChuniStaticData(BaseData):
def put_login_bonus(
self,
version: int,
preset_id: int,
login_bonus_id: int,
login_bonus_name: str,
present_id: int,
present_ame: str,
item_num: int,
need_login_day_count: int,
login_bonus_category_type: int,
) -> Optional[int]:
sql = insert(login_bonus).values(
version=version,
presetId=preset_id,
loginBonusId=login_bonus_id,
loginBonusName=login_bonus_name,
presentId=present_id,
presentName=present_ame,
itemNum=item_num,
needLoginDayCount=need_login_day_count,
loginBonusCategoryType=login_bonus_category_type,
)
conflict = sql.on_duplicate_key_update(
loginBonusName=login_bonus_name,
presentName=present_ame,
itemNum=item_num,
needLoginDayCount=need_login_day_count,
loginBonusCategoryType=login_bonus_category_type,
)
result = self.execute(conflict)
if result is None:
return None
return result.lastrowid
def get_login_bonus(
self, version: int, preset_id: int,
) -> Optional[List[Row]]:
sql = login_bonus.select(
and_(
login_bonus.c.version == version,
login_bonus.c.presetId == preset_id,
)
).order_by(login_bonus.c.needLoginDayCount.desc())
result = self.execute(sql)
if result is None:
return None
return result.fetchall()
def get_login_bonus_by_required_days(
self, version: int, preset_id: int, need_login_day_count: int
) -> Optional[Row]:
sql = login_bonus.select(
and_(
login_bonus.c.version == version,
login_bonus.c.presetId == preset_id,
login_bonus.c.needLoginDayCount == need_login_day_count,
)
)
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def put_login_bonus_preset(
self, version: int, preset_id: int, preset_name: str, is_enabled: bool
) -> Optional[int]:
sql = insert(login_bonus_preset).values(
id=preset_id,
version=version,
presetName=preset_name,
isEnabled=is_enabled,
)
conflict = sql.on_duplicate_key_update(
presetName=preset_name, isEnabled=is_enabled
)
result = self.execute(conflict)
if result is None:
return None
return result.lastrowid
def get_login_bonus_presets(
self, version: int, is_enabled: bool = True
) -> Optional[List[Row]]:
sql = login_bonus_preset.select(
and_(
login_bonus_preset.c.version == version,
login_bonus_preset.c.isEnabled == is_enabled,
)
)
result = self.execute(sql)
if result is None:
return None
return result.fetchall()
def put_event(
self, version: int, event_id: int, type: int, name: str
) -> Optional[int]:
@ -390,20 +530,17 @@ class ChuniStaticData(BaseData):
return None
return result.fetchall()
def get_gacha_card_by_character(self, gacha_id: int, chara_id: int) -> Optional[Dict]:
def get_gacha_card_by_character(
self, gacha_id: int, chara_id: int
) -> Optional[Dict]:
sql_sub = (
select(cards.c.cardId)
.filter(
cards.c.charaId == chara_id
)
.scalar_subquery()
select(cards.c.cardId).filter(cards.c.charaId == chara_id).scalar_subquery()
)
# Perform the main query, also rename the resulting column to ranking
sql = gacha_cards.select(and_(
gacha_cards.c.gachaId == gacha_id,
gacha_cards.c.cardId == sql_sub
))
sql = gacha_cards.select(
and_(gacha_cards.c.gachaId == gacha_id, gacha_cards.c.cardId == sql_sub)
)
result = self.execute(sql)
if result is None:

View File

@ -1,10 +1,13 @@
from datetime import datetime, timedelta
import json, logging
from typing import Any, Dict
import random
from core.config import CoreConfig
from titles.pokken.config import PokkenConfig
from titles.pokken.proto import jackal_pb2
from core.data import Data
from core import CoreConfig
from .config import PokkenConfig
from .proto import jackal_pb2
from .database import PokkenData
class PokkenBase:
@ -13,6 +16,7 @@ class PokkenBase:
self.game_cfg = game_cfg
self.version = 0
self.logger = logging.getLogger("pokken")
self.data = PokkenData(core_cfg)
def handle_noop(self, request: Any) -> bytes:
res = jackal_pb2.Response()
@ -95,7 +99,7 @@ class PokkenBase:
res.type = jackal_pb2.MessageType.LOAD_CLIENT_SETTINGS
settings = jackal_pb2.LoadClientSettingsResponseData()
settings.money_magnification = 0
settings.money_magnification = 1
settings.continue_bonus_exp = 100
settings.continue_fight_money = 100
settings.event_bonus_exp = 100
@ -123,6 +127,159 @@ class PokkenBase:
ranking.event_end = True
ranking.modify_date = int(datetime.now().timestamp() / 1000)
res.load_ranking.CopyFrom(ranking)
return res.SerializeToString()
def handle_load_user(self, request: jackal_pb2.Request) -> bytes:
res = jackal_pb2.Response()
res.result = 1
res.type = jackal_pb2.MessageType.LOAD_USER
access_code = request.load_user.access_code
load_usr = jackal_pb2.LoadUserResponseData()
user_id = self.data.card.get_user_id_from_card(access_code)
if user_id is None and self.game_cfg.server.auto_register:
user_id = self.data.user.create_user()
card_id = self.data.card.create_card(user_id, access_code)
self.logger.info(f"Register new card {access_code} (UserId {user_id}, CardId {card_id})")
elif user_id is None:
self.logger.info(f"Registration of card {access_code} blocked!")
res.load_user.CopyFrom(load_usr)
return res.SerializeToString()
"""
TODO: Add repeated values
tutorial_progress_flag
rankmatch_progress
support_pokemon_list
support_set_1
support_set_2
support_set_3
aid_skill_list
achievement_flag
pokemon_data
event_achievement_flag
event_achievement_param
"""
profile = self.data.profile.get_profile(user_id)
load_usr.commidserv_result = 1
load_usr.load_hash = 1
load_usr.cardlock_status = False
load_usr.banapass_id = user_id
load_usr.access_code = access_code
load_usr.precedent_release_flag = 0xFFFFFFFF
if profile is None:
profile_id = self.data.profile.create_profile(user_id)
profile_dict = {'id': profile_id, 'user': user_id}
pokemon_data = []
tutorial_progress = []
rankmatch_progress = []
achievement_flag = []
event_achievement_flag = []
event_achievement_param = []
load_usr.new_card_flag = True
else:
profile_dict = { k: v for k, v in profile._asdict().items() if v is not None }
self.logger.info(f"Card-in user {user_id} (Trainer name {profile_dict.get('trainer_name', '')})")
pokemon_data = self.data.profile.get_all_pokemon_data(user_id)
tutorial_progress = []
rankmatch_progress = []
achievement_flag = []
event_achievement_flag = []
event_achievement_param = []
load_usr.new_card_flag = False
load_usr.navi_newbie_flag = profile_dict.get('navi_newbie_flag', True)
load_usr.navi_enable_flag = profile_dict.get('navi_enable_flag', True)
load_usr.pad_vibrate_flag = profile_dict.get('pad_vibrate_flag', True)
load_usr.home_region_code = profile_dict.get('home_region_code', 0)
load_usr.home_loc_name = profile_dict.get('home_loc_name', "")
load_usr.pref_code = profile_dict.get('pref_code', 0)
load_usr.trainer_name = profile_dict.get('trainer_name', "Newb" + str(random.randint(1111,999999)))
load_usr.trainer_rank_point = profile_dict.get('trainer_rank_point', 0)
load_usr.wallet = profile_dict.get('wallet', 0)
load_usr.fight_money = profile_dict.get('fight_money', 0)
load_usr.score_point = profile_dict.get('score_point', 0)
load_usr.grade_max_num = profile_dict.get('grade_max_num', 0)
load_usr.extra_counter = profile_dict.get('extra_counter', 0)
load_usr.total_play_days = profile_dict.get('total_play_days', 0)
load_usr.play_date_time = profile_dict.get('play_date_time', 0)
load_usr.lucky_box_fail_num = profile_dict.get('lucky_box_fail_num', 0)
load_usr.event_reward_get_flag = profile_dict.get('event_reward_get_flag', 0)
load_usr.rank_pvp_all = profile_dict.get('rank_pvp_all', 0)
load_usr.rank_pvp_loc = profile_dict.get('rank_pvp_loc', 0)
load_usr.rank_cpu_all = profile_dict.get('rank_cpu_all', 0)
load_usr.rank_cpu_loc = profile_dict.get('rank_cpu_loc', 0)
load_usr.rank_event = profile_dict.get('rank_event', 0)
load_usr.awake_num = profile_dict.get('awake_num', 0)
load_usr.use_support_num = profile_dict.get('use_support_num', 0)
load_usr.rankmatch_flag = profile_dict.get('rankmatch_flag', 0)
load_usr.rankmatch_max = profile_dict.get('rankmatch_max', 0)
load_usr.rankmatch_success = profile_dict.get('rankmatch_success', 0)
load_usr.beat_num = profile_dict.get('beat_num', 0)
load_usr.title_text_id = profile_dict.get('title_text_id', 0)
load_usr.title_plate_id = profile_dict.get('title_plate_id', 0)
load_usr.title_decoration_id = profile_dict.get('title_decoration_id', 0)
load_usr.navi_trainer = profile_dict.get('navi_trainer', 0)
load_usr.navi_version_id = profile_dict.get('navi_version_id', 0)
load_usr.aid_skill = profile_dict.get('aid_skill', 0)
load_usr.comment_text_id = profile_dict.get('comment_text_id', 0)
load_usr.comment_word_id = profile_dict.get('comment_word_id', 0)
load_usr.latest_use_pokemon = profile_dict.get('latest_use_pokemon', 0)
load_usr.ex_ko_num = profile_dict.get('ex_ko_num', 0)
load_usr.wko_num = profile_dict.get('wko_num', 0)
load_usr.timeup_win_num = profile_dict.get('timeup_win_num', 0)
load_usr.cool_ko_num = profile_dict.get('cool_ko_num', 0)
load_usr.perfect_ko_num = profile_dict.get('perfect_ko_num', 0)
load_usr.record_flag = profile_dict.get('record_flag', 0)
load_usr.site_register_status = profile_dict.get('site_register_status', 0)
load_usr.continue_num = profile_dict.get('continue_num', 0)
load_usr.avatar_body = profile_dict.get('avatar_body', 0)
load_usr.avatar_gender = profile_dict.get('avatar_gender', 0)
load_usr.avatar_background = profile_dict.get('avatar_background', 0)
load_usr.avatar_head = profile_dict.get('avatar_head', 0)
load_usr.avatar_battleglass = profile_dict.get('avatar_battleglass', 0)
load_usr.avatar_face0 = profile_dict.get('avatar_face0', 0)
load_usr.avatar_face1 = profile_dict.get('avatar_face1', 0)
load_usr.avatar_face2 = profile_dict.get('avatar_face2', 0)
load_usr.avatar_bodyall = profile_dict.get('avatar_bodyall', 0)
load_usr.avatar_wear = profile_dict.get('avatar_wear', 0)
load_usr.avatar_accessory = profile_dict.get('avatar_accessory', 0)
load_usr.avatar_stamp = profile_dict.get('avatar_stamp', 0)
load_usr.event_state = profile_dict.get('event_state', 0)
load_usr.event_id = profile_dict.get('event_id', 0)
load_usr.sp_bonus_category_id_1 = profile_dict.get('sp_bonus_category_id_1', 0)
load_usr.sp_bonus_key_value_1 = profile_dict.get('sp_bonus_key_value_1', 0)
load_usr.sp_bonus_category_id_2 = profile_dict.get('sp_bonus_category_id_2', 0)
load_usr.sp_bonus_key_value_2 = profile_dict.get('sp_bonus_key_value_2', 0)
load_usr.last_play_event_id = profile_dict.get('last_play_event_id', 0)
res.load_user.CopyFrom(load_usr)
return res.SerializeToString()
def handle_set_bnpassid_lock(self, data: jackal_pb2.Request) -> bytes:
res = jackal_pb2.Response()
res.result = 1
res.type = jackal_pb2.MessageType.SET_BNPASSID_LOCK
return res.SerializeToString()
def handle_save_user(self, request: jackal_pb2.Request) -> bytes:
res = jackal_pb2.Response()
res.result = 1
res.type = jackal_pb2.MessageType.SAVE_USER
return res.SerializeToString()
def handle_save_ingame_log(self, data: jackal_pb2.Request) -> bytes:
res = jackal_pb2.Response()
res.result = 1
res.type = jackal_pb2.MessageType.SAVE_INGAME_LOG
return res.SerializeToString()
def handle_matching_noop(self, data: Dict = {}, client_ip: str = "127.0.0.1") -> Dict:
return {}

View File

@ -49,6 +49,16 @@ class PokkenServerConfig:
self.__config, "pokken", "server", "port_admission", default=9003
)
@property
def auto_register(self) -> bool:
"""
Automatically register users in `aime_user` on first carding in with pokken
if they don't exist already. Set to false to display an error instead.
"""
return CoreConfig.get_config_field(
self.__config, "pokken", "server", "auto_register", default=True
)
class PokkenConfig(dict):
def __init__(self) -> None:
self.server = PokkenServerConfig(self)

View File

@ -1,7 +1,13 @@
from core.data import Data
from core.config import CoreConfig
from .schema import *
class PokkenData(Data):
def __init__(self, cfg: CoreConfig) -> None:
super().__init__(cfg)
self.profile = PokkenProfileData(cfg, self.session)
self.match = PokkenMatchData(cfg, self.session)
self.item = PokkenItemData(cfg, self.session)
self.static = PokkenStaticData(cfg, self.session)

View File

@ -124,7 +124,6 @@ class PokkenServlet(resource.Resource):
self.logger.debug(pokken_request)
ret = handler(pokken_request)
self.logger.debug(f"Response: {ret}")
return ret
def handle_matching(self, request: Request) -> bytes:

View File

@ -0,0 +1,4 @@
from .profile import PokkenProfileData
from .match import PokkenMatchData
from .item import PokkenItemData
from .static import PokkenStaticData

View File

@ -0,0 +1,27 @@
from typing import Optional, Dict, List
from sqlalchemy import Table, Column, UniqueConstraint, PrimaryKeyConstraint, and_, case
from sqlalchemy.types import Integer, String, TIMESTAMP, Boolean, JSON
from sqlalchemy.schema import ForeignKey
from sqlalchemy.sql import func, select, update, delete
from sqlalchemy.engine import Row
from sqlalchemy.dialects.mysql import insert
from core.data.schema import BaseData, metadata
item = Table(
'pokken_item',
metadata,
Column('id', Integer, primary_key=True, nullable=False),
Column('user', ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False, unique=True),
Column('category', Integer),
Column('content', Integer),
Column('type', Integer),
UniqueConstraint('user', 'category', 'content', 'type', name='pokken_item_uk'),
mysql_charset="utf8mb4",
)
class PokkenItemData(BaseData):
"""
Items obtained as rewards
"""
pass

View File

@ -0,0 +1,45 @@
from typing import Optional, Dict, List
from sqlalchemy import Table, Column, UniqueConstraint, PrimaryKeyConstraint, and_, case
from sqlalchemy.types import Integer, String, TIMESTAMP, Boolean, JSON
from sqlalchemy.schema import ForeignKey
from sqlalchemy.sql import func, select, update, delete
from sqlalchemy.engine import Row
from sqlalchemy.dialects.mysql import insert
from core.data.schema import BaseData, metadata
# Pokken sends depressingly little match data...
match_data = Table(
'pokken_match_data',
metadata,
Column('id', Integer, primary_key=True, nullable=False),
Column('user', ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column('play_mode', Integer),
Column('result', Integer),
Column('ex_ko_num', Integer),
Column('wko_num', Integer),
Column('timeup_win_num', Integer),
Column('cool_ko_num', Integer),
Column('perfect_ko_num', Integer),
Column('use_navi', Integer),
Column('use_navi_cloth', Integer),
Column('use_aid_skill', Integer),
Column('play_date', TIMESTAMP),
mysql_charset="utf8mb4",
)
class PokkenMatchData(BaseData):
"""
Match logs
"""
def save_match(self, user_id: int, match_data: Dict) -> Optional[int]:
pass
def get_match(self, match_id: int) -> Optional[Row]:
pass
def get_matches_by_user(self, user_id: int) -> Optional[List[Row]]:
pass
def get_matches(self, limit: int = 20) -> Optional[List[Row]]:
pass

View File

@ -0,0 +1,198 @@
from typing import Optional, Dict, List
from sqlalchemy import Table, Column, UniqueConstraint, PrimaryKeyConstraint, and_, case
from sqlalchemy.types import Integer, String, TIMESTAMP, Boolean, JSON
from sqlalchemy.schema import ForeignKey
from sqlalchemy.sql import func, select, update, delete
from sqlalchemy.engine import Row
from sqlalchemy.dialects.mysql import insert
from core.data.schema import BaseData, metadata
# Some more of the repeated fields could probably be their own tables, for now I just did the ones that made sense to me
# Having the profile table be this massive kinda blows for updates but w/e, **kwargs to the rescue
profile = Table(
'pokken_profile',
metadata,
Column('id', Integer, primary_key=True, nullable=False),
Column('user', ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False, unique=True),
Column('trainer_name', String(16)), # optional
Column('home_region_code', Integer),
Column('home_loc_name', String(255)),
Column('pref_code', Integer),
Column('navi_newbie_flag', Boolean),
Column('navi_enable_flag', Boolean),
Column('pad_vibrate_flag', Boolean),
Column('trainer_rank_point', Integer),
Column('wallet', Integer),
Column('fight_money', Integer),
Column('score_point', Integer),
Column('grade_max_num', Integer),
Column('extra_counter', Integer), # Optional
Column('total_play_days', Integer),
Column('play_date_time', Integer),
Column('lucky_box_fail_num', Integer),
Column('event_reward_get_flag', Integer),
Column('rank_pvp_all', Integer),
Column('rank_pvp_loc', Integer),
Column('rank_cpu_all', Integer),
Column('rank_cpu_loc', Integer),
Column('rank_event', Integer),
Column('awake_num', Integer),
Column('use_support_num', Integer),
Column('rankmatch_flag', Integer),
Column('rankmatch_max', Integer), # Optional
Column('rankmatch_success', Integer), # Optional
Column('beat_num', Integer), # Optional
Column('title_text_id', Integer),
Column('title_plate_id', Integer),
Column('title_decoration_id', Integer),
Column('support_pokemon_list', JSON), # Repeated, Integer
Column('support_set_1', JSON), # Repeated, Integer
Column('support_set_2', JSON), # Repeated, Integer
Column('support_set_3', JSON), # Repeated, Integer
Column('navi_trainer', Integer),
Column('navi_version_id', Integer),
Column('aid_skill_list', JSON), # Repeated, Integer
Column('aid_skill', Integer),
Column('comment_text_id', Integer),
Column('comment_word_id', Integer),
Column('latest_use_pokemon', Integer),
Column('ex_ko_num', Integer),
Column('wko_num', Integer),
Column('timeup_win_num', Integer),
Column('cool_ko_num', Integer),
Column('cool_ko_num', Integer),
Column('perfect_ko_num', Integer),
Column('record_flag', Integer),
Column('continue_num', Integer),
Column('avatar_body', Integer), # Optional
Column('avatar_gender', Integer), # Optional
Column('avatar_background', Integer), # Optional
Column('avatar_head', Integer), # Optional
Column('avatar_battleglass', Integer), # Optional
Column('avatar_face0', Integer), # Optional
Column('avatar_face1', Integer), # Optional
Column('avatar_face2', Integer), # Optional
Column('avatar_bodyall', Integer), # Optional
Column('avatar_wear', Integer), # Optional
Column('avatar_accessory', Integer), # Optional
Column('avatar_stamp', Integer), # Optional
Column('event_state', Integer),
Column('event_id', Integer),
Column('sp_bonus_category_id_1', Integer),
Column('sp_bonus_key_value_1', Integer),
Column('sp_bonus_category_id_2', Integer),
Column('sp_bonus_key_value_2', Integer),
Column('last_play_event_id', Integer), # Optional
mysql_charset="utf8mb4",
)
tutorial_progress = Table(
'pokken_tutorial_progress',
metadata,
Column('id', Integer, primary_key=True, nullable=False),
Column('user', ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column('flag', Integer),
UniqueConstraint('user', 'flag', name='pokken_tutorial_progress_uk'),
mysql_charset="utf8mb4",
)
rankmatch_progress = Table(
'pokken_rankmatch_progress',
metadata,
Column('id', Integer, primary_key=True, nullable=False),
Column('user', ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column('progress', Integer),
UniqueConstraint('user', 'progress', name='pokken_rankmatch_progress_uk'),
mysql_charset="utf8mb4",
)
achievement_flag = Table(
'pokken_achievement_flag',
metadata,
Column('id', Integer, primary_key=True, nullable=False),
Column('user', ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column('flag', Integer),
UniqueConstraint('user', 'flag', name='pokken_achievement_flag_uk'),
mysql_charset="utf8mb4",
)
event_achievement_flag = Table(
'pokken_event_achievement_flag',
metadata,
Column('id', Integer, primary_key=True, nullable=False),
Column('user', ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column('flag', Integer),
UniqueConstraint('user', 'flag', name='pokken_event_achievement_flag_uk'),
mysql_charset="utf8mb4",
)
event_achievement_param = Table(
'pokken_event_achievement_param',
metadata,
Column('id', Integer, primary_key=True, nullable=False),
Column('user', ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column('param', Integer),
UniqueConstraint('user', 'param', name='pokken_event_achievement_param_uk'),
mysql_charset="utf8mb4",
)
pokemon_data = Table(
'pokken_pokemon_data',
metadata,
Column('id', Integer, primary_key=True, nullable=False),
Column('user', ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column('char_id', Integer, nullable=False),
Column('illustration_book_no', Integer, nullable=False),
Column('pokemon_exp', Integer, nullable=False),
Column('battle_num_vs_wan', Integer, nullable=False),
Column('win_vs_wan', Integer, nullable=False),
Column('battle_num_vs_lan', Integer, nullable=False),
Column('win_vs_lan', Integer, nullable=False),
Column('battle_num_vs_cpu', Integer, nullable=False),
Column('win_cpu', Integer, nullable=False),
Column('battle_all_num_tutorial', Integer, nullable=False),
Column('battle_num_tutorial', Integer, nullable=False),
Column('bp_point_atk', Integer, nullable=False),
Column('bp_point_res', Integer, nullable=False),
Column('bp_point_def', Integer, nullable=False),
Column('bp_point_sp', Integer, nullable=False),
)
class PokkenProfileData(BaseData):
def create_profile(self, user_id: int) -> Optional[int]:
sql = insert(profile).values(user = user_id)
conflict = sql.on_duplicate_key_update(user = user_id)
result = self.execute(conflict)
if result is None:
self.logger.error(f"Failed to create pokken profile for user {user_id}!")
return None
return result.lastrowid
def set_profile_name(self, user_id: int, new_name: str) -> None:
sql = update(profile).where(profile.c.user == user_id).values(trainer_name = new_name)
result = self.execute(sql)
if result is None:
self.logger.error(f"Failed to update pokken profile name for user {user_id}!")
def update_profile(self, user_id: int, profile_data: Dict) -> None:
"""
TODO: Find out what comes in on the SaveUserRequestData protobuf and save it!
"""
pass
def get_profile(self, user_id: int) -> Optional[Row]:
sql = profile.select(profile.c.user == user_id)
result = self.execute(sql)
if result is None: return None
return result.fetchone()
def put_pokemon_data(self, user_id: int, pokemon_data: Dict) -> Optional[int]:
pass
def get_pokemon_data(self, user_id: int, pokemon_id: int) -> Optional[Row]:
pass
def get_all_pokemon_data(self, user_id: int) -> Optional[List[Row]]:
pass

View File

@ -0,0 +1,12 @@
from typing import Optional, Dict, List
from sqlalchemy import Table, Column, UniqueConstraint, PrimaryKeyConstraint, and_, case
from sqlalchemy.types import Integer, String, TIMESTAMP, Boolean, JSON
from sqlalchemy.schema import ForeignKey
from sqlalchemy.sql import func, select, update, delete
from sqlalchemy.engine import Row
from sqlalchemy.dialects.mysql import insert
from core.data.schema import BaseData, metadata
class PokkenStaticData(BaseData):
pass