diff --git a/core/allnet.py b/core/allnet.py index 1a9ae5b..e935359 100644 --- a/core/allnet.py +++ b/core/allnet.py @@ -24,6 +24,12 @@ class DLIMG_TYPE(Enum): app = 0 opt = 1 +class ALLNET_STAT(Enum): + ok = 0 + bad_game = -1 + bad_machine = -2 + bad_shop = -3 + class AllnetServlet: def __init__(self, core_cfg: CoreConfig, cfg_folder: str): super().__init__() @@ -102,33 +108,7 @@ class AllnetServlet: else: resp = AllnetPowerOnResponse() - self.logger.debug(f"Allnet request: {vars(req)}") - if req.game_id not in self.uri_registry: - if not self.config.server.is_develop: - msg = f"Unrecognised game {req.game_id} attempted allnet auth from {request_ip}." - self.data.base.log_event( - "allnet", "ALLNET_AUTH_UNKNOWN_GAME", logging.WARN, msg - ) - self.logger.warning(msg) - - resp.stat = -1 - resp_dict = {k: v for k, v in vars(resp).items() if v is not None} - return (urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n").encode("utf-8") - - else: - self.logger.info( - f"Allowed unknown game {req.game_id} v{req.ver} to authenticate from {request_ip} due to 'is_develop' being enabled. S/N: {req.serial}" - ) - resp.uri = f"http://{self.config.title.hostname}:{self.config.title.port}/{req.game_id}/{req.ver.replace('.', '')}/" - resp.host = f"{self.config.title.hostname}:{self.config.title.port}" - - resp_dict = {k: v for k, v in vars(resp).items() if v is not None} - resp_str = urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) - - self.logger.debug(f"Allnet response: {resp_str}") - return (resp_str + "\n").encode("utf-8") - - resp.uri, resp.host = self.uri_registry[req.game_id] + self.logger.debug(f"Allnet request: {vars(req)}") machine = self.data.arcade.get_machine(req.serial) if machine is None and not self.config.server.allow_unregistered_serials: @@ -138,12 +118,36 @@ class AllnetServlet: ) self.logger.warning(msg) - resp.stat = -2 + resp.stat = ALLNET_STAT.bad_machine.value resp_dict = {k: v for k, v in vars(resp).items() if v is not None} return (urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n").encode("utf-8") if machine is not None: arcade = self.data.arcade.get_arcade(machine["arcade"]) + if self.config.server.check_arcade_ip: + if arcade["ip"] and arcade["ip"] is not None and arcade["ip"] != req.ip: + msg = f"Serial {req.serial} attempted allnet auth from bad IP {req.ip} (expected {arcade['ip']})." + self.data.base.log_event( + "allnet", "ALLNET_AUTH_BAD_IP", logging.ERROR, msg + ) + self.logger.warning(msg) + + resp.stat = ALLNET_STAT.bad_shop.value + resp_dict = {k: v for k, v in vars(resp).items() if v is not None} + return (urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n").encode("utf-8") + + elif not arcade["ip"] or arcade["ip"] is None and self.config.server.strict_ip_checking: + msg = f"Serial {req.serial} attempted allnet auth from bad IP {req.ip}, but arcade {arcade['id']} has no IP set! (strict checking enabled)." + self.data.base.log_event( + "allnet", "ALLNET_AUTH_NO_SHOP_IP", logging.ERROR, msg + ) + self.logger.warning(msg) + + resp.stat = ALLNET_STAT.bad_shop.value + resp_dict = {k: v for k, v in vars(resp).items() if v is not None} + return (urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n").encode("utf-8") + + country = ( arcade["country"] if machine["country"] is None else machine["country"] ) @@ -174,6 +178,33 @@ class AllnetServlet: resp.client_timezone = ( arcade["timezone"] if arcade["timezone"] is not None else "+0900" ) + + if req.game_id not in self.uri_registry: + if not self.config.server.is_develop: + msg = f"Unrecognised game {req.game_id} attempted allnet auth from {request_ip}." + self.data.base.log_event( + "allnet", "ALLNET_AUTH_UNKNOWN_GAME", logging.WARN, msg + ) + self.logger.warning(msg) + + resp.stat = ALLNET_STAT.bad_game.value + resp_dict = {k: v for k, v in vars(resp).items() if v is not None} + return (urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n").encode("utf-8") + + else: + self.logger.info( + f"Allowed unknown game {req.game_id} v{req.ver} to authenticate from {request_ip} due to 'is_develop' being enabled. S/N: {req.serial}" + ) + resp.uri = f"http://{self.config.title.hostname}:{self.config.title.port}/{req.game_id}/{req.ver.replace('.', '')}/" + resp.host = f"{self.config.title.hostname}:{self.config.title.port}" + + resp_dict = {k: v for k, v in vars(resp).items() if v is not None} + resp_str = urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + + self.logger.debug(f"Allnet response: {resp_str}") + return (resp_str + "\n").encode("utf-8") + + resp.uri, resp.host = self.uri_registry[req.game_id] int_ver = req.ver.replace(".", "") resp.uri = resp.uri.replace("$v", int_ver) diff --git a/core/data/schema/arcade.py b/core/data/schema/arcade.py index cf244cc..31d48f6 100644 --- a/core/data/schema/arcade.py +++ b/core/data/schema/arcade.py @@ -213,35 +213,9 @@ class ArcadeData(BaseData): return f"{platform_code}{platform_rev:02d}A{serial_num:04d}{append:04d}" # 0x41 = A, 0x52 = R def validate_keychip_format(self, serial: str) -> bool: - serial = serial.replace("-", "") - if len(serial) != 11 or len(serial) != 15: - self.logger.error( - f"Serial validate failed: Incorrect length for {serial} (len {len(serial)})" - ) + if re.fullmatch(r"^A[0-9]{2}[E|X][-]?[0-9]{2}[A-HJ-NP-Z][0-9]{4}([0-9]{4})?$", serial) is None: return False - - platform_code = serial[:4] - platform_rev = serial[4:6] - const_a = serial[6] - num = serial[7:11] - append = serial[11:15] - - if re.match("A[7|6]\d[E|X][0|1][0|1|2]A\d{4,8}", serial) is None: - self.logger.error(f"Serial validate failed: {serial} failed regex") - return False - - if len(append) != 0 or len(append) != 4: - self.logger.error( - f"Serial validate failed: {serial} had malformed append {append}" - ) - return False - - if len(num) != 4: - self.logger.error( - f"Serial validate failed: {serial} had malformed number {num}" - ) - return False - + return True def find_arcade_by_name(self, name: str) -> List[Row]: