1
0
Fork 0

Merge branch 'develop' of https://gitea.tendokyu.moe/Hay1tsme/artemis into develop

This commit is contained in:
Hay1tsme 2023-08-30 11:19:14 -04:00
commit 7a6272dcc5
14 changed files with 433 additions and 105 deletions

View File

@ -3,4 +3,4 @@ from .base import CompanyCodes, ReaderFwVer, CMD_CODE_GOODBYE, HEADER_SIZE
from .lookup import ADBLookupRequest, ADBLookupResponse, ADBLookupExResponse
from .campaign import ADBCampaignClearRequest, ADBCampaignClearResponse, ADBCampaignResponse, ADBOldCampaignRequest, ADBOldCampaignResponse
from .felica import ADBFelicaLookupRequest, ADBFelicaLookupResponse, ADBFelicaLookup2Request, ADBFelicaLookup2Response
from .log import ADBLogExRequest, ADBLogRequest, ADBStatusLogRequest
from .log import ADBLogExRequest, ADBLogRequest, ADBStatusLogRequest, ADBLogExResponse

View File

@ -150,8 +150,12 @@ class ADBBaseRequest:
self.head = ADBHeader.from_data(data)
class ADBBaseResponse:
def __init__(self, code: int = 0, length: int = 0x20, status: int = 1, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888") -> None:
self.head = ADBHeader(0xa13e, 0x3087, code, length, status, game_id, store_id, keychip_id)
def __init__(self, code: int = 0, length: int = 0x20, status: int = 1, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", protocol_ver: int = 0x3087) -> None:
self.head = ADBHeader(0xa13e, protocol_ver, code, length, status, game_id, store_id, keychip_id)
@classmethod
def from_req(cls, req: ADBHeader, cmd: int, length: int = 0x20, status: int = 1) -> "ADBBaseResponse":
return cls(cmd, length, status, req.game_id, req.store_id, req.keychip_id, req.protocol_ver)
def append_padding(self, data: bytes):
"""Appends 0s to the end of the data until it's at the correct size"""

View File

@ -55,6 +55,12 @@ class ADBCampaignResponse(ADBBaseResponse):
def __init__(self, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x0C, length: int = 0x200, status: int = 1) -> None:
super().__init__(code, length, status, game_id, store_id, keychip_id)
self.campaigns = [Campaign(), Campaign(), Campaign()]
@classmethod
def from_req(cls, req: ADBHeader) -> "ADBCampaignResponse":
c = cls(req.game_id, req.store_id, req.keychip_id)
c.head.protocol_ver = req.protocol_ver
return c
def make(self) -> bytes:
body = b""
@ -78,6 +84,12 @@ class ADBOldCampaignResponse(ADBBaseResponse):
self.info2 = 0
self.info3 = 0
@classmethod
def from_req(cls, req: ADBHeader) -> "ADBCampaignResponse":
c = cls(req.game_id, req.store_id, req.keychip_id)
c.head.protocol_ver = req.protocol_ver
return c
def make(self) -> bytes:
resp_struct = Struct(
"info0" / Int32sl,
@ -103,7 +115,13 @@ class ADBCampaignClearResponse(ADBBaseResponse):
def __init__(self, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x0E, length: int = 0x50, status: int = 1) -> None:
super().__init__(code, length, status, game_id, store_id, keychip_id)
self.campaign_clear_status = [CampaignClear(), CampaignClear(), CampaignClear()]
@classmethod
def from_req(cls, req: ADBHeader) -> "ADBCampaignResponse":
c = cls(req.game_id, req.store_id, req.keychip_id)
c.head.protocol_ver = req.protocol_ver
return c
def make(self) -> bytes:
body = b""

View File

@ -13,7 +13,13 @@ class ADBFelicaLookupResponse(ADBBaseResponse):
def __init__(self, access_code: str = None, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x03, length: int = 0x30, status: int = 1) -> None:
super().__init__(code, length, status, game_id, store_id, keychip_id)
self.access_code = access_code if access_code is not None else "00000000000000000000"
@classmethod
def from_req(cls, req: ADBHeader, access_code: str = None) -> "ADBFelicaLookupResponse":
c = cls(access_code, req.game_id, req.store_id, req.keychip_id)
c.head.protocol_ver = req.protocol_ver
return c
def make(self) -> bytes:
resp_struct = Struct(
"felica_idx" / Int32ul,
@ -36,7 +42,7 @@ class ADBFelicaLookup2Request(ADBBaseRequest):
self.card_key_ver, self.write_ct, self.maca, company, fw_ver, self.dfc = struct.unpack_from("<16s16sQccH", data, 0x40)
self.idm = hex(idm)[2:].upper()
self.pmm = hex(pmm)[2:].upper()
self.company = CompanyCodes(company)
self.company = CompanyCodes(int.from_bytes(company, 'little'))
self.fw_ver = ReaderFwVer.from_byte(fw_ver)
class ADBFelicaLookup2Response(ADBBaseResponse):
@ -47,11 +53,17 @@ class ADBFelicaLookup2Response(ADBBaseResponse):
self.company = CompanyCodes.SEGA
self.portal_status = PortalRegStatus.NO_REG
@classmethod
def from_req(cls, req: ADBHeader, user_id: Union[int, None] = None, access_code: Union[str, None] = None) -> "ADBFelicaLookup2Response":
c = cls(user_id, access_code, req.game_id, req.store_id, req.keychip_id)
c.head.protocol_ver = req.protocol_ver
return c
def make(self) -> bytes:
resp_struct = Struct(
"user_id" / Int32sl,
"relation1" / Int16sl,
"relation2" / Int16sl,
"relation1" / Int32sl,
"relation2" / Int32sl,
"access_code" / Int8ub[10],
"portal_status" / Int8ub,
"company_code" / Int8ub,

View File

@ -1,7 +1,15 @@
from construct import Struct, Int32sl, Padding, Int8sl
from typing import Union
from construct import Struct, Padding, Int8sl
from typing import Final, List
from .base import *
NUM_LOGS: Final[int] = 20
NUM_LEN_LOG_EX: Final[int] = 48
class AmLogEx:
def __init__(self, data: bytes) -> None:
self.aime_id, status, self.user_id, self.credit_ct, self.bet_ct, self.won_ct, self.local_time, \
self.tseq, self.place_id = struct.unpack("<IIQiii4xQiI", data)
self.status = LogStatus(status)
class ADBStatusLogRequest(ADBBaseRequest):
def __init__(self, data: bytes) -> None:
@ -18,6 +26,31 @@ class ADBLogRequest(ADBBaseRequest):
class ADBLogExRequest(ADBBaseRequest):
def __init__(self, data: bytes) -> None:
super().__init__(data)
self.aime_id, status, self.user_id, self.credit_ct, self.bet_ct, self.won_ct, self.local_time, \
self.tseq, self.place_id, self.num_logs = struct.unpack_from("<IIQiii4xQiII", data, 0x20)
self.status = LogStatus(status)
self.logs: List[AmLogEx] = []
for x in range(NUM_LOGS):
self.logs.append(AmLogEx(data[0x20 + (NUM_LEN_LOG_EX * x): 0x50 + (NUM_LEN_LOG_EX * x)]))
self.num_logs = struct.unpack_from("<I", data, 0x03E0)[0]
class ADBLogExResponse(ADBBaseResponse):
def __init__(self, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", protocol_ver: int = 12423, code: int = 20, length: int = 64, status: int = 1) -> None:
super().__init__(code, length, status, game_id, store_id, keychip_id, protocol_ver)
@classmethod
def from_req(cls, req: ADBHeader) -> "ADBLogExResponse":
c = cls(req.game_id, req.store_id, req.keychip_id, req.protocol_ver)
return c
def make(self) -> bytes:
resp_struct = Struct(
"log_result" / Int8sl[NUM_LOGS],
Padding(12)
)
body = resp_struct.build(dict(
log_result = [1] * NUM_LOGS
))
self.head.length = HEADER_SIZE + len(body)
return self.head.make() + body

View File

@ -19,13 +19,18 @@ class ADBLookupRequest(ADBBaseRequest):
self.fw_version = ReaderFwVer.from_byte(fw_version)
class ADBLookupResponse(ADBBaseResponse):
def __init__(self, user_id: Union[int, None], game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x06, length: int = 0x30, status: int = 1) -> None:
super().__init__(code, length, status, game_id, store_id, keychip_id)
self.user_id = user_id if user_id is not None else -1
self.portal_reg = PortalRegStatus.NO_REG
@classmethod
def from_req(cls, req: ADBHeader, user_id: Union[int, None]) -> "ADBLookupResponse":
c = cls(user_id, req.game_id, req.store_id, req.keychip_id)
c.head.protocol_ver = req.protocol_ver
return c
def make(self):
resp_struct = Struct(
"user_id" / Int32sl,
@ -48,6 +53,12 @@ class ADBLookupExResponse(ADBBaseResponse):
self.user_id = user_id if user_id is not None else -1
self.portal_reg = PortalRegStatus.NO_REG
@classmethod
def from_req(cls, req: ADBHeader, user_id: Union[int, None]) -> "ADBLookupExResponse":
c = cls(user_id, req.game_id, req.store_id, req.keychip_id)
c.head.protocol_ver = req.protocol_ver
return c
def make(self):
resp_struct = Struct(
"user_id" / Int32sl,

View File

@ -85,6 +85,9 @@ class AimedbProtocol(Protocol):
return
if head.keychip_id == "ABCD1234567" or head.store_id == 0xfff0:
self.logger.warning(f"Request from uninitialized AMLib: {vars(head)}")
handler, resp_code, name = self.request_list.get(head.cmd, (self.handle_default, None, 'default'))
if resp_code is None:
@ -119,7 +122,7 @@ class AimedbProtocol(Protocol):
def handle_default(self, data: bytes, resp_code: int, length: int = 0x20) -> ADBBaseResponse:
req = ADBHeader.from_data(data)
return ADBBaseResponse(resp_code, length, 1, req.game_id, req.store_id, req.keychip_id)
return ADBBaseResponse(resp_code, length, 1, req.game_id, req.store_id, req.keychip_id, req.protocol_ver)
def handle_hello(self, data: bytes, resp_code: int) -> ADBBaseResponse:
return self.handle_default(data, resp_code)
@ -128,13 +131,13 @@ class AimedbProtocol(Protocol):
h = ADBHeader.from_data(data)
if h.protocol_ver >= 0x3030:
req = h
resp = ADBCampaignResponse(req.game_id, req.store_id, req.keychip_id)
resp = ADBCampaignResponse.from_req(req)
else:
req = ADBOldCampaignRequest(data)
self.logger.info(f"Legacy campaign request for campaign {req.campaign_id} (protocol version {hex(h.protocol_ver)})")
resp = ADBOldCampaignResponse(req.head.game_id, req.head.store_id, req.head.keychip_id)
resp = ADBOldCampaignResponse.from_req(req.head)
# We don't currently support campaigns
return resp
@ -143,7 +146,7 @@ class AimedbProtocol(Protocol):
req = ADBLookupRequest(data)
user_id = self.data.card.get_user_id_from_card(req.access_code)
ret = ADBLookupResponse(user_id, req.head.game_id, req.head.store_id, req.head.keychip_id)
ret = ADBLookupResponse.from_req(req.head, user_id)
self.logger.info(
f"access_code {req.access_code} -> user_id {ret.user_id}"
@ -154,7 +157,7 @@ class AimedbProtocol(Protocol):
req = ADBLookupRequest(data)
user_id = self.data.card.get_user_id_from_card(req.access_code)
ret = ADBLookupExResponse(user_id, req.head.game_id, req.head.store_id, req.head.keychip_id)
ret = ADBLookupExResponse.from_req(req.head, user_id)
self.logger.info(
f"access_code {req.access_code} -> user_id {ret.user_id}"
@ -175,7 +178,7 @@ class AimedbProtocol(Protocol):
self.logger.info(
f"idm {req.idm} ipm {req.pmm} -> access_code {ac}"
)
return ADBFelicaLookupResponse(ac, req.head.game_id, req.head.store_id, req.head.keychip_id)
return ADBFelicaLookupResponse.from_req(req.head, ac)
def handle_felica_register(self, data: bytes, resp_code: int) -> bytes:
"""
@ -207,7 +210,7 @@ class AimedbProtocol(Protocol):
f"Registration blocked!: access code {ac} (IDm: {req.idm} PMm: {req.pmm})"
)
return ADBFelicaLookupResponse(ac, req.head.game_id, req.head.store_id, req.head.keychip_id)
return ADBFelicaLookupResponse.from_req(req.head, ac)
def handle_felica_lookup_ex(self, data: bytes, resp_code: int) -> bytes:
req = ADBFelicaLookup2Request(data)
@ -221,12 +224,12 @@ class AimedbProtocol(Protocol):
f"idm {req.idm} ipm {req.pmm} -> access_code {access_code} user_id {user_id}"
)
return ADBFelicaLookup2Response(user_id, access_code, req.head.game_id, req.head.store_id, req.head.keychip_id)
return ADBFelicaLookup2Response.from_req(req.head, user_id, access_code)
def handle_campaign_clear(self, data: bytes, resp_code: int) -> ADBBaseResponse:
req = ADBCampaignClearRequest(data)
resp = ADBCampaignClearResponse(req.head.game_id, req.head.store_id, req.head.keychip_id)
resp = ADBCampaignClearResponse.from_req(req.head)
# We don't support campaign stuff
return resp
@ -258,7 +261,7 @@ class AimedbProtocol(Protocol):
f"Registration blocked!: access code {req.access_code}"
)
resp = ADBLookupResponse(user_id, req.head.game_id, req.head.store_id, req.head.keychip_id)
resp = ADBLookupResponse.from_req(req.head, user_id)
if resp.user_id <= 0:
resp.head.status = ADBStatus.BAN_SYS # Closest we can get to a "You cannot register"
@ -268,17 +271,21 @@ class AimedbProtocol(Protocol):
def handle_status_log(self, data: bytes, resp_code: int) -> bytes:
req = ADBStatusLogRequest(data)
self.logger.info(f"User {req.aime_id} logged {req.status.name} event")
return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id)
return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id, req.head.protocol_ver)
def handle_log(self, data: bytes, resp_code: int) -> bytes:
req = ADBLogRequest(data)
self.logger.info(f"User {req.aime_id} logged {req.status.name} event, credit_ct: {req.credit_ct} bet_ct: {req.bet_ct} won_ct: {req.won_ct}")
return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id)
return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id, req.head.protocol_ver)
def handle_log_ex(self, data: bytes, resp_code: int) -> bytes:
req = ADBLogExRequest(data)
self.logger.info(f"User {req.aime_id} logged {req.status.name} event, credit_ct: {req.credit_ct} bet_ct: {req.bet_ct} won_ct: {req.won_ct}")
return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id)
strs = []
self.logger.info(f"Recieved {req.num_logs} or {len(req.logs)} logs")
for x in range(req.num_logs):
self.logger.debug(f"User {req.logs[x].aime_id} logged {req.logs[x].status.name} event, credit_ct: {req.logs[x].credit_ct} bet_ct: {req.logs[x].bet_ct} won_ct: {req.logs[x].won_ct}")
return ADBLogExResponse.from_req(req.head)
def handle_goodbye(self, data: bytes, resp_code: int) -> None:
self.logger.info(f"goodbye from {self.transport.getPeer().host}")

View File

@ -1,4 +1,4 @@
from typing import Dict, List, Any, Optional, Tuple, Union
from typing import Dict, List, Any, Optional, Tuple, Union, Final
import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler
from twisted.web.http import Request
@ -14,12 +14,15 @@ from Crypto.Signature import PKCS1_v1_5
from time import strptime
from os import path
import urllib.parse
import math
from core.config import CoreConfig
from core.utils import Utils
from core.data import Data
from core.const import *
BILLING_DT_FORMAT: Final[str] = "%Y%m%d%H%M%S"
class DLIMG_TYPE(Enum):
app = 0
opt = 1
@ -83,8 +86,16 @@ class AllnetServlet:
def handle_poweron(self, request: Request, _: Dict):
request_ip = Utils.get_ip_addr(request)
pragma_header = request.getHeader('Pragma')
is_dfi = pragma_header is not None and pragma_header == "DFI"
try:
req_dict = self.allnet_req_to_dict(request.content.getvalue())
if is_dfi:
req_urlencode = self.from_dfi(request.content.getvalue())
else:
req_urlencode = request.content.getvalue().decode()
req_dict = self.allnet_req_to_dict(req_urlencode)
if req_dict is None:
raise AllnetRequestException()
@ -219,12 +230,24 @@ class AllnetServlet:
self.logger.debug(f"Allnet response: {resp_dict}")
resp_str += "\n"
"""if is_dfi:
request.responseHeaders.addRawHeader('Pragma', 'DFI')
return self.to_dfi(resp_str)"""
return resp_str.encode("utf-8")
def handle_dlorder(self, request: Request, _: Dict):
request_ip = Utils.get_ip_addr(request)
pragma_header = request.getHeader('Pragma')
is_dfi = pragma_header is not None and pragma_header == "DFI"
try:
req_dict = self.allnet_req_to_dict(request.content.getvalue())
if is_dfi:
req_urlencode = self.from_dfi(request.content.getvalue())
else:
req_urlencode = request.content.getvalue().decode()
req_dict = self.allnet_req_to_dict(req_urlencode)
if req_dict is None:
raise AllnetRequestException()
@ -266,7 +289,13 @@ class AllnetServlet:
self.logger.debug(f"Sending download uri {resp.uri}")
self.data.base.log_event("allnet", "DLORDER_REQ_SUCCESS", logging.INFO, f"{Utils.get_ip_addr(request)} requested DL Order for {req.serial} {req.game_id} v{req.ver}")
return urllib.parse.unquote(urllib.parse.urlencode(vars(resp))) + "\n"
res_str = urllib.parse.unquote(urllib.parse.urlencode(vars(resp))) + "\n"
"""if is_dfi:
request.responseHeaders.addRawHeader('Pragma', 'DFI')
return self.to_dfi(res_str)"""
return res_str
def handle_dlorder_ini(self, request: Request, match: Dict) -> bytes:
if "file" not in match:
@ -334,8 +363,16 @@ class AllnetServlet:
return "OK".encode()
def handle_billing_request(self, request: Request, _: Dict):
req_dict = self.billing_req_to_dict(request.content.getvalue())
req_raw = request.content.getvalue()
if request.getHeader('Content-Type') == "application/octet-stream":
req_unzip = zlib.decompressobj(-zlib.MAX_WBITS).decompress(req_raw)
else:
req_unzip = req_raw
req_dict = self.billing_req_to_dict(req_unzip)
request_ip = Utils.get_ip_addr(request)
if req_dict is None:
self.logger.error(f"Failed to parse request {request.content.getvalue()}")
return b""
@ -345,45 +382,60 @@ class AllnetServlet:
rsa = RSA.import_key(open(self.config.billing.signing_key, "rb").read())
signer = PKCS1_v1_5.new(rsa)
digest = SHA.new()
traces: List[TraceData] = []
try:
kc_playlimit = int(req_dict[0]["playlimit"])
kc_nearfull = int(req_dict[0]["nearfull"])
kc_billigtype = int(req_dict[0]["billingtype"])
kc_playcount = int(req_dict[0]["playcnt"])
kc_serial: str = req_dict[0]["keychipid"]
kc_game: str = req_dict[0]["gameid"]
kc_date = strptime(req_dict[0]["date"], "%Y%m%d%H%M%S")
kc_serial_bytes = kc_serial.encode()
for x in range(len(req_dict)):
if not req_dict[x]:
continue
if x == 0:
req = BillingInfo(req_dict[x])
continue
tmp = TraceData(req_dict[x])
if tmp.trace_type == TraceDataType.CHARGE:
tmp = TraceDataCharge(req_dict[x])
elif tmp.trace_type == TraceDataType.EVENT:
tmp = TraceDataEvent(req_dict[x])
elif tmp.trace_type == TraceDataType.CREDIT:
tmp = TraceDataCredit(req_dict[x])
traces.append(tmp)
kc_serial_bytes = req.keychipid.encode()
except KeyError as e:
return f"result=5&linelimit=&message={e} field is missing".encode()
self.logger.error(f"Billing request failed to parse: {e}")
return f"result=5&linelimit=&message=field is missing or formatting is incorrect\r\n".encode()
machine = self.data.arcade.get_machine(kc_serial)
machine = self.data.arcade.get_machine(req.keychipid)
if machine is None and not self.config.server.allow_unregistered_serials:
msg = f"Unrecognised serial {kc_serial} attempted billing checkin from {request_ip} for game {kc_game}."
msg = f"Unrecognised serial {req.keychipid} attempted billing checkin from {request_ip} for {req.gameid} v{req.gamever}."
self.data.base.log_event(
"allnet", "BILLING_CHECKIN_NG_SERIAL", logging.WARN, msg
)
self.logger.warning(msg)
resp = BillingResponse("", "", "", "")
resp.result = "1"
return self.dict_to_http_form_string([vars(resp)])
return f"result=1&requestno={req.requestno}&message=Keychip Serial bad\r\n".encode()
msg = (
f"Billing checkin from {request_ip}: game {kc_game} keychip {kc_serial} playcount "
f"{kc_playcount} billing_type {kc_billigtype} nearfull {kc_nearfull} playlimit {kc_playlimit}"
f"Billing checkin from {request_ip}: game {req.gameid} ver {req.gamever} keychip {req.keychipid} playcount "
f"{req.playcnt} billing_type {req.billingtype.name} nearfull {req.nearfull} playlimit {req.playlimit}"
)
self.logger.info(msg)
self.data.base.log_event("billing", "BILLING_CHECKIN_OK", logging.INFO, msg)
if req.traceleft > 0:
self.logger.warn(f"{req.traceleft} unsent tracelogs")
kc_playlimit = req.playlimit
kc_nearfull = req.nearfull
while kc_playcount > kc_playlimit:
while req.playcnt > req.playlimit:
kc_playlimit += 1024
kc_nearfull += 1024
playlimit = kc_playlimit
nearfull = kc_nearfull + (kc_billigtype * 0x00010000)
nearfull = kc_nearfull + (req.billingtype.value * 0x00010000)
digest.update(playlimit.to_bytes(4, "little") + kc_serial_bytes)
playlimit_sig = signer.sign(digest).hex()
@ -394,13 +446,16 @@ class AllnetServlet:
# TODO: playhistory
resp = BillingResponse(playlimit, playlimit_sig, nearfull, nearfull_sig)
#resp = BillingResponse(playlimit, playlimit_sig, nearfull, nearfull_sig)
resp = BillingResponse(playlimit, playlimit_sig, nearfull, nearfull_sig, req.requestno, req.protocolver)
resp_str = self.dict_to_http_form_string([vars(resp)])
if resp_str is None:
self.logger.error(f"Failed to parse response {vars(resp)}")
resp_str = urllib.parse.unquote(urllib.parse.urlencode(vars(resp))) + "\r\n"
self.logger.debug(f"response {vars(resp)}")
if req.traceleft > 0:
self.logger.info(f"Requesting 20 more of {req.traceleft} unsent tracelogs")
return f"result=6&waittime=0&linelimit=20\r\n".encode()
return resp_str.encode("utf-8")
def handle_naomitest(self, request: Request, _: Dict) -> bytes:
@ -412,9 +467,7 @@ class AllnetServlet:
Parses an billing request string into a python dictionary
"""
try:
decomp = zlib.decompressobj(-zlib.MAX_WBITS)
unzipped = decomp.decompress(data)
sections = unzipped.decode("ascii").split("\r\n")
sections = data.decode("ascii").split("\r\n")
ret = []
for x in sections:
@ -430,9 +483,7 @@ class AllnetServlet:
Parses an allnet request string into a python dictionary
"""
try:
zipped = base64.b64decode(data)
unzipped = zlib.decompress(zipped)
sections = unzipped.decode("utf-8").split("\r\n")
sections = data.split("\r\n")
ret = []
for x in sections:
@ -443,35 +494,15 @@ class AllnetServlet:
self.logger.error(f"allnet_req_to_dict: {e} while parsing {data}")
return None
def dict_to_http_form_string(
self,
data: List[Dict[str, Any]],
crlf: bool = True,
trailing_newline: bool = True,
) -> Optional[str]:
"""
Takes a python dictionary and parses it into an allnet response string
"""
try:
urlencode = ""
for item in data:
for k, v in item.items():
if k is None or v is None:
continue
urlencode += f"{k}={v}&"
if crlf:
urlencode = urlencode[:-1] + "\r\n"
else:
urlencode = urlencode[:-1] + "\n"
if not trailing_newline:
if crlf:
urlencode = urlencode[:-2]
else:
urlencode = urlencode[:-1]
return urlencode
except Exception as e:
self.logger.error(f"dict_to_http_form_string: {e} while parsing {data}")
return None
def from_dfi(self, data: bytes) -> str:
zipped = base64.b64decode(data)
unzipped = zlib.decompress(zipped)
return unzipped.decode("utf-8")
def to_dfi(self, data: str) -> bytes:
unzipped = data.encode('utf-8')
zipped = zlib.compress(unzipped)
return base64.b64encode(zipped)
class AllnetPowerOnRequest:
@ -557,6 +588,114 @@ class AllnetDownloadOrderResponse:
self.serial = serial
self.uri = uri
class TraceDataType(Enum):
CHARGE = 0
EVENT = 1
CREDIT = 2
class BillingType(Enum):
A = 1
B = 0
class float5:
def __init__(self, n: str = "0") -> None:
nf = float(n)
if nf > 999.9 or nf < 0:
raise ValueError('float5 must be between 0.000 and 999.9 inclusive')
return nf
@classmethod
def to_str(cls, f: float):
return f"%.{2 - int(math.log10(f))+1}f" % f
class BillingInfo:
def __init__(self, data: Dict) -> None:
try:
self.keychipid = str(data.get("keychipid", None))
self.functype = int(data.get("functype", None))
self.gameid = str(data.get("gameid", None))
self.gamever = float(data.get("gamever", None))
self.boardid = str(data.get("boardid", None))
self.tenpoip = str(data.get("tenpoip", None))
self.libalibver = float(data.get("libalibver", None))
self.datamax = int(data.get("datamax", None))
self.billingtype = BillingType(int(data.get("billingtype", None)))
self.protocolver = float(data.get("protocolver", None))
self.operatingfix = bool(data.get("operatingfix", None))
self.traceleft = int(data.get("traceleft", None))
self.requestno = int(data.get("requestno", None))
self.datesync = bool(data.get("datesync", None))
self.timezone = str(data.get("timezone", None))
self.date = datetime.strptime(data.get("date", None), BILLING_DT_FORMAT)
self.crcerrcnt = int(data.get("crcerrcnt", None))
self.memrepair = bool(data.get("memrepair", None))
self.playcnt = int(data.get("playcnt", None))
self.playlimit = int(data.get("playlimit", None))
self.nearfull = int(data.get("nearfull", None))
except Exception as e:
raise KeyError(e)
class TraceData:
def __init__(self, data: Dict) -> None:
try:
self.crc_err_flg = bool(data.get("cs", None))
self.record_number = int(data.get("rn", None))
self.seq_number = int(data.get("sn", None))
self.trace_type = TraceDataType(int(data.get("tt", None)))
self.date_sync_flg = bool(data.get("ds", None))
self.date = datetime.strptime(data.get("dt", None), BILLING_DT_FORMAT)
self.keychip = str(data.get("kn", None))
self.lib_ver = float(data.get("alib", None))
except Exception as e:
raise KeyError(e)
class TraceDataCharge(TraceData):
def __init__(self, data: Dict) -> None:
super().__init__(data)
try:
self.game_id = str(data.get("gi", None))
self.game_version = float(data.get("gv", None))
self.board_serial = str(data.get("bn", None))
self.shop_ip = str(data.get("ti", None))
self.play_count = int(data.get("pc", None))
self.play_limit = int(data.get("pl", None))
self.product_code = int(data.get("ic", None))
self.product_count = int(data.get("in", None))
self.func_type = int(data.get("kk", None))
self.player_number = int(data.get("playerno", None))
except Exception as e:
raise KeyError(e)
class TraceDataEvent(TraceData):
def __init__(self, data: Dict) -> None:
super().__init__(data)
try:
self.message = str(data.get("me", None))
except Exception as e:
raise KeyError(e)
class TraceDataCredit(TraceData):
def __init__(self, data: Dict) -> None:
super().__init__(data)
try:
self.chute_type = int(data.get("cct", None))
self.service_type = int(data.get("cst", None))
self.operation_type = int(data.get("cop", None))
self.coin_rate0 = int(data.get("cr0", None))
self.coin_rate1 = int(data.get("cr1", None))
self.bonus_addition = int(data.get("cba", None))
self.credit_rate = int(data.get("ccr", None))
self.credit0 = int(data.get("cc0", None))
self.credit1 = int(data.get("cc1", None))
self.credit2 = int(data.get("cc2", None))
self.credit3 = int(data.get("cc3", None))
self.credit4 = int(data.get("cc4", None))
self.credit5 = int(data.get("cc5", None))
self.credit6 = int(data.get("cc6", None))
self.credit7 = int(data.get("cc7", None))
except Exception as e:
raise KeyError(e)
class BillingResponse:
def __init__(
@ -565,20 +704,22 @@ class BillingResponse:
playlimit_sig: str = "",
nearfull: str = "",
nearfull_sig: str = "",
request_num: int = 1,
protocol_ver: float = 1.000,
playhistory: str = "000000/0:000000/0:000000/0",
) -> None:
self.result = "0"
self.waitime = "100"
self.linelimit = "1"
self.message = ""
self.result = 0
self.requestno = request_num
self.traceerase = 1
self.fixinterval = 120
self.fixlogcnt = 100
self.playlimit = playlimit
self.playlimitsig = playlimit_sig
self.protocolver = "1.000"
self.playhistory = playhistory
self.nearfull = nearfull
self.nearfullsig = nearfull_sig
self.fixlogincnt = "0"
self.fixinterval = "5"
self.playhistory = playhistory
self.linelimit = 100
self.protocolver = float5.to_str(protocol_ver)
# playhistory -> YYYYMM/C:...
# YYYY -> 4 digit year, MM -> 2 digit month, C -> Playcount during that period

View File

@ -200,6 +200,12 @@ class AllnetConfig:
self.__config, "core", "allnet", "port", default=80
)
@property
def ip_check(self) -> bool:
return CoreConfig.get_config_field(
self.__config, "core", "allnet", "ip_check", default=False
)
@property
def allow_online_updates(self) -> int:
return CoreConfig.get_config_field(

View File

@ -279,3 +279,67 @@ class MuchaDownloadStateRequest:
self.boardId = request.get("boardId", "")
self.placeId = request.get("placeId", "")
self.storeRouterIp = request.get("storeRouterIp", "")
class MuchaDownloadErrorRequest:
def __init__(self, request: Dict) -> None:
self.gameCd = request.get("gameCd", "")
self.updateVer = request.get("updateVer", "")
self.serialNum = request.get("serialNum", "")
self.downloadUrl = request.get("downloadUrl", "")
self.errCd = request.get("errCd", "")
self.errMessage = request.get("errMessage", "")
self.boardId = request.get("boardId", "")
self.placeId = request.get("placeId", "")
self.storeRouterIp = request.get("storeRouterIp", "")
class MuchaRegiAuthRequest:
def __init__(self, request: Dict) -> None:
self.gameCd = request.get("gameCd", "")
self.serialNum = request.get("serialNum", "") # Encrypted
self.countryCd = request.get("countryCd", "")
self.registrationCd = request.get("registrationCd", "")
self.sendDate = request.get("sendDate", "")
self.useToken = request.get("useToken", "")
self.allToken = request.get("allToken", "")
self.placeId = request.get("placeId", "")
self.storeRouterIp = request.get("storeRouterIp", "")
class MuchaRegiAuthResponse:
def __init__(self) -> None:
self.RESULTS = "001" # 001 = success, 099, 098, 097 = fail, others = fail
self.ALL_TOKEN = "0" # Encrypted
self.ADD_TOKEN = "0" # Encrypted
class MuchaTokenStateRequest:
def __init__(self, request: Dict) -> None:
self.gameCd = request.get("gameCd", "")
self.serialNum = request.get("serialNum", "")
self.countryCd = request.get("countryCd", "")
self.useToken = request.get("useToken", "")
self.allToken = request.get("allToken", "")
self.placeId = request.get("placeId", "")
self.storeRouterIp = request.get("storeRouterIp", "")
class MuchaTokenStateResponse:
def __init__(self) -> None:
self.RESULTS = "001"
class MuchaTokenMarginStateRequest:
def __init__(self, request: Dict) -> None:
self.gameCd = request.get("gameCd", "")
self.serialNum = request.get("serialNum", "")
self.countryCd = request.get("countryCd", "")
self.placeId = request.get("placeId", "")
self.limitLowerToken = request.get("limitLowerToken", 0)
self.limitUpperToken = request.get("limitUpperToken", 0)
self.settlementMonth = request.get("settlementMonth", 0)
class MuchaTokenMarginStateResponse:
def __init__(self) -> None:
self.RESULTS = "001"
self.LIMIT_LOWER_TOKEN = 0
self.LIMIT_UPPER_TOKEN = 0
self.LAST_SETTLEMENT_MONTH = 0
self.LAST_LIMIT_LOWER_TOKEN = 0
self.LAST_LIMIT_UPPER_TOKEN = 0
self.SETTLEMENT_MONTH = 0

View File

@ -34,6 +34,7 @@ frontend:
allnet:
loglevel: "info"
port: 80
ip_check: False
allow_online_updates: False
update_cfg_folder: ""

View File

@ -31,7 +31,18 @@ class CardMakerVersionConfig:
1: {"ongeki": 1.30.01, "chuni": 2.00.00, "maimai": 1.20.00}
"""
return CoreConfig.get_config_field(
self.__config, "cardmaker", "version", default={}
self.__config, "cardmaker", "version", default={
0: {
"ongeki": "1.30.01",
"chuni": "2.00.00",
"maimai": "1.20.00"
},
1: {
"ongeki": "1.35.03",
"chuni": "2.10.00",
"maimai": "1.30.00"
}
}
)[version]

View File

@ -85,8 +85,6 @@ class CardMakerServlet:
endpoint = url_split[len(url_split) - 1]
client_ip = Utils.get_ip_addr(request)
print(f"version: {version}")
if version >= 130 and version < 135: # Card Maker
internal_ver = CardMakerConstants.VER_CARD_MAKER
elif version >= 135 and version < 140: # Card Maker 1.35
@ -124,6 +122,7 @@ class CardMakerServlet:
except Exception as e:
self.logger.error(f"Error handling v{version} method {endpoint} - {e}")
raise
return zlib.compress(b'{"stat": "0"}')
if resp is None:

View File

@ -234,6 +234,10 @@ class Mai2Servlet:
if (url_split[0] == "api" and url_split[1] == "movie") or url_split[0] == "movie":
if url_split[2] == "moviestart":
return json.dumps({"moviestart":{"status":"OK"}}).encode()
else:
request.setResponseCode(404)
return b""
if url_split[0] == "old":
if url_split[1] == "ping":
@ -247,18 +251,35 @@ class Mai2Servlet:
elif url_split[1].startswith("friend"):
self.logger.info(f"v{version} old server friend inquire")
return zlib.compress(b"{}")
else:
request.setResponseCode(404)
return b""
elif url_split[0] == "usbdl":
if url_split[1] == "CONNECTIONTEST":
self.logger.info(f"v{version} usbdl server test")
return zlib.compress(b"ok")
return b""
elif self.game_cfg.deliver.udbdl_enable and path.exists(f"{self.game_cfg.deliver.content_folder}/usb/{url_split[-1]}"):
with open(f"{self.game_cfg.deliver.content_folder}/usb/{url_split[-1]}", 'rb') as f:
return f.read()
else:
request.setResponseCode(404)
return b""
elif url_split[0] == "deliver":
file = url_split[len(url_split) - 1]
self.logger.info(f"v{version} {file} deliver inquire")
if not self.game_cfg.deliver.enable or not path.exists(f"{self.game_cfg.deliver.content_folder}/{file}"):
return zlib.compress(b"")
if self.game_cfg.deliver.enable and path.exists(f"{self.game_cfg.deliver.content_folder}/{file}"):
with open(f"{self.game_cfg.deliver.content_folder}/deliver/{url_split[-1]}", 'rb') as f:
return f.read()
else:
request.setResponseCode(404)
return b""
else:
return zlib.compress(b"{}")