forked from Hay1tsme/artemis
allnet: add IP checking
This commit is contained in:
parent
4744e8cf5f
commit
9e3a51a57a
@ -24,6 +24,12 @@ class DLIMG_TYPE(Enum):
|
|||||||
app = 0
|
app = 0
|
||||||
opt = 1
|
opt = 1
|
||||||
|
|
||||||
|
class ALLNET_STAT(Enum):
|
||||||
|
ok = 0
|
||||||
|
bad_game = -1
|
||||||
|
bad_machine = -2
|
||||||
|
bad_shop = -3
|
||||||
|
|
||||||
class AllnetServlet:
|
class AllnetServlet:
|
||||||
def __init__(self, core_cfg: CoreConfig, cfg_folder: str):
|
def __init__(self, core_cfg: CoreConfig, cfg_folder: str):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
@ -102,33 +108,7 @@ class AllnetServlet:
|
|||||||
else:
|
else:
|
||||||
resp = AllnetPowerOnResponse()
|
resp = AllnetPowerOnResponse()
|
||||||
|
|
||||||
self.logger.debug(f"Allnet request: {vars(req)}")
|
self.logger.debug(f"Allnet request: {vars(req)}")
|
||||||
if req.game_id not in self.uri_registry:
|
|
||||||
if not self.config.server.is_develop:
|
|
||||||
msg = f"Unrecognised game {req.game_id} attempted allnet auth from {request_ip}."
|
|
||||||
self.data.base.log_event(
|
|
||||||
"allnet", "ALLNET_AUTH_UNKNOWN_GAME", logging.WARN, msg
|
|
||||||
)
|
|
||||||
self.logger.warning(msg)
|
|
||||||
|
|
||||||
resp.stat = -1
|
|
||||||
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
|
|
||||||
return (urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n").encode("utf-8")
|
|
||||||
|
|
||||||
else:
|
|
||||||
self.logger.info(
|
|
||||||
f"Allowed unknown game {req.game_id} v{req.ver} to authenticate from {request_ip} due to 'is_develop' being enabled. S/N: {req.serial}"
|
|
||||||
)
|
|
||||||
resp.uri = f"http://{self.config.title.hostname}:{self.config.title.port}/{req.game_id}/{req.ver.replace('.', '')}/"
|
|
||||||
resp.host = f"{self.config.title.hostname}:{self.config.title.port}"
|
|
||||||
|
|
||||||
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
|
|
||||||
resp_str = urllib.parse.unquote(urllib.parse.urlencode(resp_dict))
|
|
||||||
|
|
||||||
self.logger.debug(f"Allnet response: {resp_str}")
|
|
||||||
return (resp_str + "\n").encode("utf-8")
|
|
||||||
|
|
||||||
resp.uri, resp.host = self.uri_registry[req.game_id]
|
|
||||||
|
|
||||||
machine = self.data.arcade.get_machine(req.serial)
|
machine = self.data.arcade.get_machine(req.serial)
|
||||||
if machine is None and not self.config.server.allow_unregistered_serials:
|
if machine is None and not self.config.server.allow_unregistered_serials:
|
||||||
@ -138,12 +118,36 @@ class AllnetServlet:
|
|||||||
)
|
)
|
||||||
self.logger.warning(msg)
|
self.logger.warning(msg)
|
||||||
|
|
||||||
resp.stat = -2
|
resp.stat = ALLNET_STAT.bad_machine.value
|
||||||
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
|
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
|
||||||
return (urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n").encode("utf-8")
|
return (urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n").encode("utf-8")
|
||||||
|
|
||||||
if machine is not None:
|
if machine is not None:
|
||||||
arcade = self.data.arcade.get_arcade(machine["arcade"])
|
arcade = self.data.arcade.get_arcade(machine["arcade"])
|
||||||
|
if self.config.server.check_arcade_ip:
|
||||||
|
if arcade["ip"] and arcade["ip"] is not None and arcade["ip"] != req.ip:
|
||||||
|
msg = f"Serial {req.serial} attempted allnet auth from bad IP {req.ip} (expected {arcade['ip']})."
|
||||||
|
self.data.base.log_event(
|
||||||
|
"allnet", "ALLNET_AUTH_BAD_IP", logging.ERROR, msg
|
||||||
|
)
|
||||||
|
self.logger.warning(msg)
|
||||||
|
|
||||||
|
resp.stat = ALLNET_STAT.bad_shop.value
|
||||||
|
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
|
||||||
|
return (urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n").encode("utf-8")
|
||||||
|
|
||||||
|
elif not arcade["ip"] or arcade["ip"] is None and self.config.server.strict_ip_checking:
|
||||||
|
msg = f"Serial {req.serial} attempted allnet auth from bad IP {req.ip}, but arcade {arcade['id']} has no IP set! (strict checking enabled)."
|
||||||
|
self.data.base.log_event(
|
||||||
|
"allnet", "ALLNET_AUTH_NO_SHOP_IP", logging.ERROR, msg
|
||||||
|
)
|
||||||
|
self.logger.warning(msg)
|
||||||
|
|
||||||
|
resp.stat = ALLNET_STAT.bad_shop.value
|
||||||
|
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
|
||||||
|
return (urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n").encode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
country = (
|
country = (
|
||||||
arcade["country"] if machine["country"] is None else machine["country"]
|
arcade["country"] if machine["country"] is None else machine["country"]
|
||||||
)
|
)
|
||||||
@ -174,6 +178,33 @@ class AllnetServlet:
|
|||||||
resp.client_timezone = (
|
resp.client_timezone = (
|
||||||
arcade["timezone"] if arcade["timezone"] is not None else "+0900"
|
arcade["timezone"] if arcade["timezone"] is not None else "+0900"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if req.game_id not in self.uri_registry:
|
||||||
|
if not self.config.server.is_develop:
|
||||||
|
msg = f"Unrecognised game {req.game_id} attempted allnet auth from {request_ip}."
|
||||||
|
self.data.base.log_event(
|
||||||
|
"allnet", "ALLNET_AUTH_UNKNOWN_GAME", logging.WARN, msg
|
||||||
|
)
|
||||||
|
self.logger.warning(msg)
|
||||||
|
|
||||||
|
resp.stat = ALLNET_STAT.bad_game.value
|
||||||
|
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
|
||||||
|
return (urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n").encode("utf-8")
|
||||||
|
|
||||||
|
else:
|
||||||
|
self.logger.info(
|
||||||
|
f"Allowed unknown game {req.game_id} v{req.ver} to authenticate from {request_ip} due to 'is_develop' being enabled. S/N: {req.serial}"
|
||||||
|
)
|
||||||
|
resp.uri = f"http://{self.config.title.hostname}:{self.config.title.port}/{req.game_id}/{req.ver.replace('.', '')}/"
|
||||||
|
resp.host = f"{self.config.title.hostname}:{self.config.title.port}"
|
||||||
|
|
||||||
|
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
|
||||||
|
resp_str = urllib.parse.unquote(urllib.parse.urlencode(resp_dict))
|
||||||
|
|
||||||
|
self.logger.debug(f"Allnet response: {resp_str}")
|
||||||
|
return (resp_str + "\n").encode("utf-8")
|
||||||
|
|
||||||
|
resp.uri, resp.host = self.uri_registry[req.game_id]
|
||||||
|
|
||||||
int_ver = req.ver.replace(".", "")
|
int_ver = req.ver.replace(".", "")
|
||||||
resp.uri = resp.uri.replace("$v", int_ver)
|
resp.uri = resp.uri.replace("$v", int_ver)
|
||||||
|
@ -213,35 +213,9 @@ class ArcadeData(BaseData):
|
|||||||
return f"{platform_code}{platform_rev:02d}A{serial_num:04d}{append:04d}" # 0x41 = A, 0x52 = R
|
return f"{platform_code}{platform_rev:02d}A{serial_num:04d}{append:04d}" # 0x41 = A, 0x52 = R
|
||||||
|
|
||||||
def validate_keychip_format(self, serial: str) -> bool:
|
def validate_keychip_format(self, serial: str) -> bool:
|
||||||
serial = serial.replace("-", "")
|
if re.fullmatch(r"^A[0-9]{2}[E|X][-]?[0-9]{2}[A-HJ-NP-Z][0-9]{4}([0-9]{4})?$", serial) is None:
|
||||||
if len(serial) != 11 or len(serial) != 15:
|
|
||||||
self.logger.error(
|
|
||||||
f"Serial validate failed: Incorrect length for {serial} (len {len(serial)})"
|
|
||||||
)
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
platform_code = serial[:4]
|
|
||||||
platform_rev = serial[4:6]
|
|
||||||
const_a = serial[6]
|
|
||||||
num = serial[7:11]
|
|
||||||
append = serial[11:15]
|
|
||||||
|
|
||||||
if re.match("A[7|6]\d[E|X][0|1][0|1|2]A\d{4,8}", serial) is None:
|
|
||||||
self.logger.error(f"Serial validate failed: {serial} failed regex")
|
|
||||||
return False
|
|
||||||
|
|
||||||
if len(append) != 0 or len(append) != 4:
|
|
||||||
self.logger.error(
|
|
||||||
f"Serial validate failed: {serial} had malformed append {append}"
|
|
||||||
)
|
|
||||||
return False
|
|
||||||
|
|
||||||
if len(num) != 4:
|
|
||||||
self.logger.error(
|
|
||||||
f"Serial validate failed: {serial} had malformed number {num}"
|
|
||||||
)
|
|
||||||
return False
|
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def find_arcade_by_name(self, name: str) -> List[Row]:
|
def find_arcade_by_name(self, name: str) -> List[Row]:
|
||||||
|
Loading…
Reference in New Issue
Block a user