Initial Card Maker ONGEKI support

This commit is contained in:
2023-03-03 23:46:29 +01:00
parent 8fe0acae93
commit 3acc2dc197
22 changed files with 3582 additions and 13 deletions

View File

@ -1,5 +1,6 @@
from datetime import date, datetime, timedelta
from typing import Any, Dict
from random import randint
import pytz
import json
@ -8,8 +9,8 @@ from titles.ongeki.base import OngekiBase
from titles.ongeki.const import OngekiConstants
from titles.ongeki.config import OngekiConfig
class OngekiBright(OngekiBase):
class OngekiBright(OngekiBase):
def __init__(self, core_cfg: CoreConfig, game_cfg: OngekiConfig) -> None:
super().__init__(core_cfg, game_cfg)
self.version = OngekiConstants.VER_ONGEKI_BRIGHT
@ -19,3 +20,609 @@ class OngekiBright(OngekiBase):
ret["gameSetting"]["dataVersion"] = "1.30.00"
ret["gameSetting"]["onlineDataVersion"] = "1.30.00"
return ret
def handle_cm_get_user_data_api_request(self, data: Dict) -> Dict:
# first check for a bright memory profile after that check for a
# bright profile
p = (self.data.profile.get_profile_data(data["userId"], self.version+1)
or self.data.profile.get_profile_data(data["userId"], self.version))
if p is None:
return {}
cards = self.data.card.get_user_cards(data["userId"])
if cards is None or len(cards) == 0:
# This should never happen
self.logger.error(
f"handle_get_user_data_api_request: Internal error - No cards found for user id {data['userId']}")
return {}
# get the dict representation of the row so we can modify values
user_data = p._asdict()
# remove the values the game doesn't want
user_data.pop("id")
user_data.pop("user")
user_data.pop("version")
# TODO: replace datetime objects with strings
# add access code that we don't store
user_data["accessCode"] = cards[0]["access_code"]
# hardcode Card Maker version for now
user_data["compatibleCmVersion"] = "1.30.01"
return {"userId": data["userId"], "userData": user_data}
def handle_printer_login_api_request(self, data: Dict):
return {"returnCode": 1}
def handle_printer_logout_api_request(self, data: Dict):
return {"returnCode": 1}
def handle_cm_get_user_card_api_request(self, data: Dict) -> Dict:
user_cards = self.data.item.get_cards(data["userId"])
if user_cards is None:
return {}
max_ct = data["maxCount"]
next_idx = data["nextIndex"]
start_idx = next_idx
end_idx = max_ct + start_idx
if len(user_cards[start_idx:]) > max_ct:
next_idx += max_ct
else:
next_idx = -1
card_list = []
for card in user_cards:
tmp = card._asdict()
tmp.pop("id")
tmp.pop("user")
card_list.append(tmp)
return {
"userId": data["userId"],
"length": len(card_list[start_idx:end_idx]),
"nextIndex": next_idx,
"userCardList": card_list[start_idx:end_idx]
}
def handle_cm_get_user_character_api_request(self, data: Dict) -> Dict:
user_characters = self.data.item.get_characters(data["userId"])
if user_characters is None:
return {}
max_ct = data["maxCount"]
next_idx = data["nextIndex"]
start_idx = next_idx
end_idx = max_ct + start_idx
if len(user_characters[start_idx:]) > max_ct:
next_idx += max_ct
else:
next_idx = -1
character_list = []
for character in user_characters:
tmp = character._asdict()
tmp.pop("id")
tmp.pop("user")
character_list.append(tmp)
return {
"userId": data["userId"],
"length": len(character_list[start_idx:end_idx]),
"nextIndex": next_idx,
"userCharacterList": character_list[start_idx:end_idx]
}
def handle_get_user_gacha_api_request(self, data: Dict) -> Dict:
user_gachas = self.data.item.get_user_gachas(data["userId"])
if user_gachas is None:
return {
"userId": data["userId"],
"length": 0,
"userGachaList": []
}
user_gacha_list = []
for gacha in user_gachas:
tmp = gacha._asdict()
tmp.pop("id")
tmp.pop("user")
tmp["dailyGachaDate"] = datetime.strftime(
tmp["dailyGachaDate"], "%Y-%m-%d")
user_gacha_list.append(tmp)
return {
"userId": data["userId"],
"length": len(user_gacha_list),
"userGachaList": user_gacha_list
}
def handle_cm_get_user_item_api_request(self, data: Dict) -> Dict:
return self.handle_get_user_item_api_request(data)
def handle_cm_get_user_gacha_supply_api_request(self, data: Dict) -> Dict:
# not used for now? not sure what it even does
user_gacha_supplies = self.data.item.get_user_gacha_supplies(
data["userId"])
if user_gacha_supplies is None:
return {
"supplyId": 1,
"length": 0,
"supplyCardList": []
}
supply_list = [gacha["cardId"] for gacha in user_gacha_supplies]
return {
"supplyId": 1,
"length": len(supply_list),
"supplyCardList": supply_list
}
def handle_get_game_gacha_api_request(self, data: Dict) -> Dict:
"""
returns all current active banners (gachas)
"Select Gacha" requires maxSelectPoint set and isCeiling set to 1
"""
game_gachas = []
# for every gacha_id in the OngekiConfig, grab the banner from the db
for gacha_id in self.game_cfg.gachas.enabled_gachas:
game_gacha = self.data.static.get_gacha(self.version, gacha_id)
if game_gacha:
game_gachas.append(game_gacha)
# clean the database rows
game_gacha_list = []
for gacha in game_gachas:
tmp = gacha._asdict()
tmp.pop("id")
tmp.pop("version")
tmp["startDate"] = datetime.strftime(
tmp["startDate"], "%Y-%m-%d %H:%M:%S")
tmp["endDate"] = datetime.strftime(
tmp["endDate"], "%Y-%m-%d %H:%M:%S")
tmp["noticeStartDate"] = datetime.strftime(
tmp["noticeStartDate"], "%Y-%m-%d %H:%M:%S")
tmp["noticeEndDate"] = datetime.strftime(
tmp["noticeEndDate"], "%Y-%m-%d %H:%M:%S")
tmp["convertEndDate"] = datetime.strftime(
tmp["convertEndDate"], "%Y-%m-%d %H:%M:%S")
# make sure to only show gachas for the current version
# so only up to bright, 1140 is the first bright memory gacha
if tmp["gachaId"] < 1140:
game_gacha_list.append(tmp)
return {
"length": len(game_gacha_list),
"gameGachaList": game_gacha_list,
# no clue
"registIdList": []
}
def handle_roll_gacha_api_request(self, data: Dict) -> Dict:
"""
Handle a gacha roll API request
"""
gacha_id = data["gachaId"]
num_rolls = data["times"]
# change_rate is the 5 gacha rool SR gurantee once a week
change_rate = data["changeRate"]
# SSR book which guarantees a SSR card, itemKind=15, itemId=1
book_used = data["bookUseCount"]
if num_rolls not in {1, 5, 11}:
return {}
# https://gamerch.com/ongeki/entry/462978
# 77% chance of gett ing a R card
# 20% chance of getting a SR card
# 3% chance of getting a SSR card
rarity = [1 for _ in range(77)]
rarity += [2 for _ in range(20)]
rarity += [3 for _ in range(3)]
# gachaId 1011 is "無料ガチャ" (free gacha), which requires GatchaTickets
# itemKind=11, itemId=1 and appearenty sucks
# 94% chance of getting a R card
# 5% chance of getting a SR card
# 1% chance of getting a SSR card
if gacha_id == 1011:
rarity = [1 for _ in range(94)]
rarity += [2 for _ in range(5)]
rarity += [3 for _ in range(1)]
# gachaId 1012 is "無料ガチャSR確定" (SR confirmed! free gacha), which
# requires GatchaTickets itemKind=11, itemId=4 and always guarantees
# a SR card or higher
# 92% chance of getting a SR card
# 8% chance of getting a SSR card
elif gacha_id == 1012:
rarity = [2 for _ in range(92)]
rarity += [3 for _ in range(8)]
assert len(rarity) == 100
# uniform distribution to get the rarity of the card
rolls = [rarity[randint(0, len(rarity)-1)] for _ in range(num_rolls)]
# if SSR book used, make sure you always get one SSR
if book_used == 1:
if rolls.count(3) == 0:
# if there is no SSR, re-roll
return self.handle_roll_gacha_api_request(data)
# make sure that 11 rolls always have at least 1 SR or SSR
elif (num_rolls == 5 and change_rate is True) or num_rolls == 11:
if rolls.count(2) == 0 and rolls.count(3) == 0:
# if there is no SR or SSR, re-roll
return self.handle_roll_gacha_api_request(data)
# get a list of cards for each rarity
cards_r = self.data.static.get_cards_by_rarity(self.version, 1)
cards_sr, cards_ssr = [], []
# free gachas are only allowed to get their specific cards! (R irrelevant)
if gacha_id in {1011, 1012}:
gacha_cards = self.data.static.get_gacha_cards(gacha_id)
for card in gacha_cards:
if card["rarity"] == 3:
cards_sr.append({
"cardId": card["cardId"],
"rarity": 2
})
elif card["rarity"] == 4:
cards_ssr.append({
"cardId": card["cardId"],
"rarity": 3
})
else:
cards_sr = self.data.static.get_cards_by_rarity(self.version, 2)
cards_ssr = self.data.static.get_cards_by_rarity(self.version, 3)
# get the promoted cards for that gacha and add them multiple
# times to increase chances by factor chances
chances = 10
gacha_cards = self.data.static.get_gacha_cards(gacha_id)
for card in gacha_cards:
# make sure to add the cards to the corresponding rarity
if card["rarity"] == 2:
cards_r += [{
"cardId": card["cardId"],
"rarity": 1
}] * chances
if card["rarity"] == 3:
cards_sr += [{
"cardId": card["cardId"],
"rarity": 2
}] * chances
elif card["rarity"] == 4:
cards_ssr += [{
"cardId": card["cardId"],
"rarity": 3
}] * chances
# get the card id for each roll
rolled_cards = []
for i in range(len(rolls)):
if rolls[i] == 1:
rolled_cards.append(cards_r[randint(0, len(cards_r)-1)])
elif rolls[i] == 2:
rolled_cards.append(cards_sr[randint(0, len(cards_sr)-1)])
elif rolls[i] == 3:
rolled_cards.append(cards_ssr[randint(0, len(cards_ssr)-1)])
game_gacha_card_list = []
for card in rolled_cards:
game_gacha_card_list.append({
"gachaId": data["gachaId"],
"cardId": card["cardId"],
# +1 because Card Maker is weird
"rarity": card["rarity"] + 1,
"weight": 1,
"isPickup": False,
"isSelect": False
})
return {
"length": len(game_gacha_card_list),
"gameGachaCardList": game_gacha_card_list
}
def handle_cm_upsert_user_gacha_api_request(self, data: Dict):
upsert = data["cmUpsertUserGacha"]
user_id = data["userId"]
gacha_id = data["gachaId"]
gacha_count = data["gachaCnt"]
play_date = datetime.strptime(data["playDate"][:10], '%Y-%m-%d')
select_point = data["selectPoint"]
total_gacha_count, ceiling_gacha_count = 0, 0
daily_gacha_cnt, five_gacha_cnt, eleven_gacha_cnt = 0, 0, 0
daily_gacha_date = datetime.strptime('2000-01-01', '%Y-%m-%d')
# check if the user previously rolled the exact same gacha
user_gacha = self.data.item.get_user_gacha(user_id, gacha_id)
if user_gacha:
total_gacha_count = user_gacha["totalGachaCnt"]
ceiling_gacha_count = user_gacha["ceilingGachaCnt"]
daily_gacha_cnt = user_gacha["dailyGachaCnt"]
five_gacha_cnt = user_gacha["fiveGachaCnt"]
eleven_gacha_cnt = user_gacha["elevenGachaCnt"]
# parse just the year, month and date
daily_gacha_date = user_gacha["dailyGachaDate"]
# if the saved dailyGachaDate is different from the roll,
# reset dailyGachaCnt and change the date
if daily_gacha_date != play_date:
daily_gacha_date = play_date
daily_gacha_cnt = 0
self.data.item.put_user_gacha(
user_id,
gacha_id,
totalGachaCnt=total_gacha_count + gacha_count,
ceilingGachaCnt=ceiling_gacha_count + gacha_count,
selectPoint=select_point,
useSelectPoint=0,
dailyGachaCnt=daily_gacha_cnt + gacha_count,
fiveGachaCnt=five_gacha_cnt+1 if gacha_count == 5 else five_gacha_cnt,
elevenGachaCnt=eleven_gacha_cnt+1 if gacha_count == 11 else eleven_gacha_cnt,
dailyGachaDate=daily_gacha_date
)
if "userData" in upsert and len(upsert["userData"]) > 0:
# check if the profile is a bright memory profile
p = self.data.profile.get_profile_data(data["userId"], self.version+1)
if p is not None:
# save the bright memory profile
self.data.profile.put_profile_data(
user_id, self.version+1, upsert["userData"][0])
else:
# save the bright profile
self.data.profile.put_profile_data(
user_id, self.version, upsert["userData"][0])
if "userCharacterList" in upsert:
for x in upsert["userCharacterList"]:
self.data.item.put_character(user_id, x)
if "userItemList" in upsert:
for x in upsert["userItemList"]:
self.data.item.put_item(user_id, x)
if "userCardList" in upsert:
for x in upsert["userCardList"]:
self.data.item.put_card(user_id, x)
# TODO?
# if "gameGachaCardList" in upsert:
# for x in upsert["gameGachaCardList"]:
return {'returnCode': 1, 'apiName': 'CMUpsertUserGachaApi'}
def handle_cm_upsert_user_select_gacha_api_request(self, data: Dict) -> Dict:
upsert = data["cmUpsertUserSelectGacha"]
user_id = data["userId"]
if "userData" in upsert and len(upsert["userData"]) > 0:
# check if the profile is a bright memory profile
p = self.data.profile.get_profile_data(data["userId"], self.version+1)
if p is not None:
# save the bright memory profile
self.data.profile.put_profile_data(
user_id, self.version+1, upsert["userData"][0])
else:
# save the bright profile
self.data.profile.put_profile_data(
user_id, self.version, upsert["userData"][0])
if "userCharacterList" in upsert:
for x in upsert["userCharacterList"]:
self.data.item.put_character(user_id, x)
if "userCardList" in upsert:
for x in upsert["userCardList"]:
self.data.item.put_card(user_id, x)
if "selectGachaLogList" in data:
for x in data["selectGachaLogList"]:
self.data.item.put_user_gacha(
user_id,
x["gachaId"],
selectPoint=0,
useSelectPoint=x["useSelectPoint"]
)
return {'returnCode': 1, 'apiName': 'cmUpsertUserSelectGacha'}
def handle_get_game_gacha_card_by_id_api_request(self, data: Dict) -> Dict:
game_gacha_cards = self.data.static.get_gacha_cards(data["gachaId"])
if game_gacha_cards == []:
# fallback to be at least able to select that gacha
return {
"gachaId": data["gachaId"],
"length": 6,
"isPickup": False,
"gameGachaCardList": [
{
"gachaId": data["gachaId"],
"cardId": 100984,
"rarity": 4,
"weight": 1,
"isPickup": False,
"isSelect": True
},
{
"gachaId": data["gachaId"],
"cardId": 100997,
"rarity": 3,
"weight": 2,
"isPickup": False,
"isSelect": True
},
{
"gachaId": data["gachaId"],
"cardId": 100998,
"rarity": 3,
"weight": 2,
"isPickup": False,
"isSelect": True
},
{
"gachaId": data["gachaId"],
"cardId": 101020,
"rarity": 2,
"weight": 3,
"isPickup": False,
"isSelect": True
},
{
"gachaId": data["gachaId"],
"cardId": 101021,
"rarity": 2,
"weight": 3,
"isPickup": False,
"isSelect": True
},
{
"gachaId": data["gachaId"],
"cardId": 101022,
"rarity": 2,
"weight": 3,
"isPickup": False,
"isSelect": True
}
],
"emissionList": [],
"afterCalcList": [],
"ssrBookCalcList": []
}
game_gacha_card_list = []
for gacha_card in game_gacha_cards:
tmp = gacha_card._asdict()
tmp.pop("id")
game_gacha_card_list.append(tmp)
return {
"gachaId": data["gachaId"],
"length": len(game_gacha_card_list),
"isPickup": False,
"gameGachaCardList": game_gacha_card_list,
# again no clue
"emissionList": [],
"afterCalcList": [],
"ssrBookCalcList": []
}
def handle_get_game_theater_api_request(self, data: Dict) -> Dict:
"""
shows a banner after every print, not sure what its used for
"""
"""
return {
"length": 1,
"gameTheaterList": [{
"theaterId": 1,
"theaterName": "theaterName",
"startDate": "2018-01-01 00:00:00.0",
"endDate": "2038-01-01 00:00:00.0",
"gameSubTheaterList": [{
"theaterId": 1,
"id": 2,
"no": 4
}]
}
],
"registIdList": []
}
"""
return {
"length": 0,
"gameTheaterList": [],
"registIdList": []
}
def handle_cm_upsert_user_print_playlog_api_request(self, data: Dict) -> Dict:
return {
"returnCode": 1,
"orderId": 0,
"serialId": "11111111111111111111",
"apiName": "CMUpsertUserPrintPlaylogApi"
}
def handle_cm_upsert_user_printlog_api_request(self, data: Dict) -> Dict:
return {
"returnCode": 1,
"orderId": 0,
"serialId": "11111111111111111111",
"apiName": "CMUpsertUserPrintlogApi"
}
def handle_cm_upsert_user_print_api_request(self, data: Dict) -> Dict:
user_print_detail = data["userPrintDetail"]
# generate random serial id
serial_id = ''.join([str(randint(0, 9)) for _ in range(20)])
# not needed because are either zero or unset
user_print_detail.pop("orderId")
user_print_detail.pop("printNumber")
user_print_detail.pop("serialId")
user_print_detail["printDate"] = datetime.strptime(
user_print_detail["printDate"], '%Y-%m-%d'
)
# add the entry to the user print table with the random serialId
self.data.item.put_user_print_detail(
data["userId"],
serial_id,
user_print_detail
)
return {
"returnCode": 1,
"serialId": serial_id,
"apiName": "CMUpsertUserPrintApi"
}
def handle_cm_upsert_user_all_api_request(self, data: Dict) -> Dict:
upsert = data["cmUpsertUserAll"]
user_id = data["userId"]
if "userData" in upsert and len(upsert["userData"]) > 0:
# check if the profile is a bright memory profile
p = self.data.profile.get_profile_data(data["userId"], self.version+1)
if p is not None:
# save the bright memory profile
self.data.profile.put_profile_data(
user_id, self.version+1, upsert["userData"][0])
else:
# save the bright profile
self.data.profile.put_profile_data(
user_id, self.version, upsert["userData"][0])
if "userActivityList" in upsert:
for act in upsert["userActivityList"]:
self.data.profile.put_profile_activity(
user_id, act["kind"], act["id"], act["sortNumber"],
act["param1"], act["param2"], act["param3"], act["param4"])
if "userItemList" in upsert:
for x in upsert["userItemList"]:
self.data.item.put_item(user_id, x)
if "userCardList" in upsert:
for x in upsert["userCardList"]:
self.data.item.put_card(user_id, x)
return {'returnCode': 1, 'apiName': 'cmUpsertUserAll'}

View File

@ -1,17 +1,31 @@
from typing import List
from core.config import CoreConfig
class OngekiServerConfig():
def __init__(self, parent_config: "OngekiConfig") -> None:
self.__config = parent_config
@property
def enable(self) -> bool:
return CoreConfig.get_config_field(self.__config, 'ongeki', 'server', 'enable', default=True)
@property
def loglevel(self) -> int:
return CoreConfig.str_to_loglevel(CoreConfig.get_config_field(self.__config, 'ongeki', 'server', 'loglevel', default="info"))
class OngekiGachaConfig():
def __init__(self, parent_config: "OngekiConfig") -> None:
self.__config = parent_config
@property
def enabled_gachas(self) -> List[int]:
return CoreConfig.get_config_field(self.__config, 'ongeki', 'gachas', 'enabled_gachas', default=[])
class OngekiConfig(dict):
def __init__(self) -> None:
self.server = OngekiServerConfig(self)
self.gachas = OngekiGachaConfig(self)

View File

@ -1,5 +1,7 @@
from typing import Final, Dict
from enum import Enum
class OngekiConstants():
GAME_CODE = "SDDT"
@ -35,6 +37,20 @@ class OngekiConstants():
'SilverJewelEvent',
])
class CM_GACHA_KINDS(Enum):
Normal = 0
Pickup = 1
BonusRestored = 2
Free = 3
PickupBonusRestored = 4
class RARITY_TYPES(Enum):
N = 0
R = 1
SR = 2
SSR = 3
SRPlus = 12
# The game expects the server to give Lunatic an ID of 10, while the game uses 4 internally... except in Music.xml
class DIFF_NAME(Enum):
Basic = 0
@ -48,4 +64,4 @@ class OngekiConstants():
@classmethod
def game_ver_to_string(cls, ver: int):
return cls.VERSION_NAMES[ver]
return cls.VERSION_NAMES[ver]

View File

@ -2,6 +2,7 @@ from typing import Dict, Optional, List
from sqlalchemy import Table, Column, UniqueConstraint, PrimaryKeyConstraint, and_
from sqlalchemy.types import Integer, String, TIMESTAMP, Boolean, JSON
from sqlalchemy.schema import ForeignKey
from sqlalchemy.engine import Row
from sqlalchemy.sql import func, select
from sqlalchemy.dialects.mysql import insert
@ -242,6 +243,63 @@ tech_event = Table(
mysql_charset='utf8mb4'
)
gacha = Table(
"ongeki_user_gacha",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("gachaId", Integer, nullable=False),
Column("totalGachaCnt", Integer, server_default="0"),
Column("ceilingGachaCnt", Integer, server_default="0"),
Column("selectPoint", Integer, server_default="0"),
Column("useSelectPoint", Integer, server_default="0"),
Column("dailyGachaCnt", Integer, server_default="0"),
Column("fiveGachaCnt", Integer, server_default="0"),
Column("elevenGachaCnt", Integer, server_default="0"),
Column("dailyGachaDate", TIMESTAMP, nullable=False, server_default=func.now()),
UniqueConstraint("user", "gachaId", name="ongeki_user_gacha_uk"),
mysql_charset='utf8mb4'
)
gacha_supply = Table(
"ongeki_user_gacha_supply",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("cardId", Integer, nullable=False),
UniqueConstraint("user", "cardId", name="ongeki_user_gacha_supply_uk"),
mysql_charset='utf8mb4'
)
print_detail = Table(
"ongeki_user_print_detail",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("cardId", Integer, nullable=False),
Column("cardType", Integer, server_default="0"),
Column("printDate", TIMESTAMP, nullable=False),
Column("serialId", String(20), nullable=False),
Column("placeId", Integer, nullable=False),
Column("clientId", String(11), nullable=False),
Column("printerSerialId", String(20), nullable=False),
Column("isHolograph", Boolean, server_default="0"),
Column("isAutographed", Boolean, server_default="0"),
Column("printOption1", Boolean, server_default="1"),
Column("printOption2", Boolean, server_default="1"),
Column("printOption3", Boolean, server_default="1"),
Column("printOption4", Boolean, server_default="1"),
Column("printOption5", Boolean, server_default="1"),
Column("printOption6", Boolean, server_default="1"),
Column("printOption7", Boolean, server_default="1"),
Column("printOption8", Boolean, server_default="1"),
Column("printOption9", Boolean, server_default="1"),
Column("printOption10", Boolean, server_default="0"),
UniqueConstraint("serialId", name="ongeki_user_print_detail_uk"),
mysql_charset='utf8mb4'
)
class OngekiItemData(BaseData):
def put_card(self, aime_id: int, card_data: Dict) -> Optional[int]:
@ -545,7 +603,7 @@ class OngekiItemData(BaseData):
if result is None: return None
return result.fetchall()
def put_memorychapter(self, aime_id: int, memorychapter_data: Dict) -> Optional[int]:
memorychapter_data["user"] = aime_id
@ -557,10 +615,73 @@ class OngekiItemData(BaseData):
self.logger.warn(f"put_memorychapter: Failed to update! aime_id: {aime_id}")
return None
return result.lastrowid
def get_memorychapters(self, aime_id: int) -> Optional[List[Dict]]:
sql = select(memorychapter).where(memorychapter.c.user == aime_id)
result = self.execute(sql)
if result is None: return None
return result.fetchall()
if result is None:
return None
return result.fetchall()
def get_user_gacha(self, aime_id: int, gacha_id: int) -> Optional[Row]:
sql = gacha.select(and_(
gacha.c.user == aime_id,
gacha.c.gachaId == gacha_id
))
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def get_user_gachas(self, aime_id: int) -> Optional[List[Row]]:
sql = gacha.select(gacha.c.user == aime_id)
result = self.execute(sql)
if result is None:
return None
return result.fetchall()
def get_user_gacha_supplies(self, aime_id: int) -> Optional[List[Row]]:
sql = gacha_supply.select(gacha_supply.c.user == aime_id)
result = self.execute(sql)
if result is None:
return None
return result.fetchall()
def put_user_gacha(self, aime_id: int, gacha_id: int, **data) -> Optional[int]:
sql = insert(gacha).values(
user=aime_id,
gachaId=gacha_id,
**data)
conflict = sql.on_duplicate_key_update(
user=aime_id,
gachaId=gacha_id,
**data)
result = self.execute(conflict)
if result is None:
self.logger.warn(f"put_user_gacha: Failed to insert! aime_id: {aime_id}")
return None
return result.lastrowid
def put_user_print_detail(self, aime_id: int, serial_id: str,
user_print_data: Dict) -> Optional[int]:
sql = insert(print_detail).values(
user=aime_id,
serialId=serial_id,
**user_print_data)
conflict = sql.on_duplicate_key_update(
user=aime_id,
serialId=serial_id,
**user_print_data)
result = self.execute(conflict)
if result is None:
self.logger.warn(f"put_user_print_detail: Failed to insert! aime_id: {aime_id}")
return None
return result.lastrowid

View File

@ -78,7 +78,8 @@ profile = Table(
Column("overDamageBattlePoint", Integer, server_default="0"),
Column("bestBattlePoint", Integer, server_default="0"),
Column("lastEmoneyBrand", Integer, server_default="0"),
Column("isDialogWatchedSuggestMemory", Boolean),
Column("lastEmoneyCredit", Integer, server_default="0"),
Column("isDialogWatchedSuggestMemory", Boolean, server_default="0"),
UniqueConstraint("user", "version", name="ongeki_profile_profile_uk"),
mysql_charset='utf8mb4'
)

View File

@ -37,7 +37,210 @@ music = Table(
mysql_charset='utf8mb4'
)
gachas = Table(
"ongeki_static_gachas",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer, nullable=False),
Column("gachaId", Integer, nullable=False),
Column("gachaName", String(255), nullable=False),
Column("type", Integer, nullable=False, server_default="0"),
Column("kind", Integer, nullable=False, server_default="0"),
Column("isCeiling", Boolean, server_default="0"),
Column("maxSelectPoint", Integer, server_default="0"),
Column("ceilingCnt", Integer, server_default="10"),
Column("changeRateCnt1", Integer, server_default="0"),
Column("changeRateCnt2", Integer, server_default="0"),
Column("startDate", TIMESTAMP, server_default="2018-01-01 00:00:00.0"),
Column("endDate", TIMESTAMP, server_default="2038-01-01 00:00:00.0"),
Column("noticeStartDate", TIMESTAMP, server_default="2018-01-01 00:00:00.0"),
Column("noticeEndDate", TIMESTAMP, server_default="2038-01-01 00:00:00.0"),
Column("convertEndDate", TIMESTAMP, server_default="2038-01-01 00:00:00.0"),
UniqueConstraint("version", "gachaId", "gachaName", name="ongeki_static_gachas_uk"),
mysql_charset='utf8mb4'
)
gacha_cards = Table(
"ongeki_static_gacha_cards",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("gachaId", Integer, nullable=False),
Column("cardId", Integer, nullable=False),
Column("rarity", Integer, nullable=False),
Column("weight", Integer, server_default="1"),
Column("isPickup", Boolean, server_default="0"),
Column("isSelect", Boolean, server_default="0"),
UniqueConstraint("gachaId", "cardId", name="ongeki_static_gacha_cards_uk"),
mysql_charset='utf8mb4'
)
cards = Table(
"ongeki_static_cards",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer, nullable=False),
Column("cardId", Integer, nullable=False),
Column("name", String(255), nullable=False),
Column("charaId", Integer, nullable=False),
Column("nickName", String(255)),
Column("school", String(255), nullable=False),
Column("attribute", String(5), nullable=False),
Column("gakunen", String(255), nullable=False),
Column("rarity", Integer, nullable=False),
Column("levelParam", String(255), nullable=False),
Column("skillId", Integer, nullable=False),
Column("choKaikaSkillId", Integer, nullable=False),
Column("cardNumber", String(255)),
UniqueConstraint("version", "cardId", name="ongeki_static_cards_uk"),
mysql_charset='utf8mb4'
)
class OngekiStaticData(BaseData):
def put_card(self, version: int, card_id: int, **card_data) -> Optional[int]:
sql = insert(cards).values(
version=version,
cardId=card_id,
**card_data)
conflict = sql.on_duplicate_key_update(**card_data)
result = self.execute(conflict)
if result is None:
self.logger.warn(f"Failed to insert card! card_id {card_id}")
return None
return result.lastrowid
def get_card(self, version: int, card_id: int) -> Optional[Dict]:
sql = cards.select(and_(
cards.c.version <= version,
cards.c.cardId == card_id
))
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def get_card_by_card_number(self, version: int, card_number: str) -> Optional[Dict]:
if not card_number.startswith("[O.N.G.E.K.I.]"):
card_number = f"[O.N.G.E.K.I.]{card_number}"
sql = cards.select(and_(
cards.c.version <= version,
cards.c.cardNumber == card_number
))
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def get_card_by_name(self, version: int, name: str) -> Optional[Dict]:
sql = cards.select(and_(
cards.c.version <= version,
cards.c.name == name
))
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def get_cards(self, version: int) -> Optional[List[Dict]]:
sql = cards.select(cards.c.version <= version)
result = self.execute(sql)
if result is None:
return None
return result.fetchall()
def get_cards_by_rarity(self, version: int, rarity: int) -> Optional[List[Dict]]:
sql = cards.select(and_(
cards.c.version <= version,
cards.c.rarity == rarity
))
result = self.execute(sql)
if result is None:
return None
return result.fetchall()
def put_gacha(self, version: int, gacha_id: int, gacha_name: int,
gacha_kind: int, **gacha_data) -> Optional[int]:
sql = insert(gachas).values(
version=version,
gachaId=gacha_id,
gachaName=gacha_name,
kind=gacha_kind,
**gacha_data
)
conflict = sql.on_duplicate_key_update(
version=version,
gachaId=gacha_id,
gachaName=gacha_name,
kind=gacha_kind,
**gacha_data
)
result = self.execute(conflict)
if result is None:
self.logger.warn(f"Failed to insert gacha! gacha_id {gacha_id}")
return None
return result.lastrowid
def get_gacha(self, version: int, gacha_id: int) -> Optional[Dict]:
sql = gachas.select(and_(
gachas.c.version == version,
gachas.c.gachaId == gacha_id
))
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def get_gachas(self, version: int) -> Optional[List[Dict]]:
sql = gachas.select(
gachas.c.version == version).order_by(
gachas.c.gachaId.asc()
)
result = self.execute(sql)
if result is None:
return None
return result.fetchall()
def put_gacha_card(self, gacha_id: int, card_id: int, **gacha_card) -> Optional[int]:
sql = insert(gacha_cards).values(
gachaId=gacha_id,
cardId=card_id,
**gacha_card
)
conflict = sql.on_duplicate_key_update(
gachaId=gacha_id,
cardId=card_id,
**gacha_card
)
result = self.execute(conflict)
if result is None:
self.logger.warn(f"Failed to insert gacha card! gacha_id {gacha_id}")
return None
return result.lastrowid
def get_gacha_cards(self, gacha_id: int) -> Optional[List[Dict]]:
sql = gacha_cards.select(
gacha_cards.c.gachaId == gacha_id
)
result = self.execute(sql)
if result is None:
return None
return result.fetchall()
def put_event(self, version: int, event_id: int, event_type: int, event_name: str) -> Optional[int]:
sql = insert(events).values(
version = version,