diff --git a/README.md b/README.md index eae0e18..a56cade 100644 --- a/README.md +++ b/README.md @@ -26,4 +26,6 @@ Run [start.bat (Windows)](start.bat) or [start.sh (Linux, MacOS)](start.sh) - **URL Slash 1 (On)** [may still be required in rare cases](modules/__init__.py#L46) +- **URL Slash 0 (Off)** may be required in other cases + - When initially creating a DDR profile, complete an entire credit without pfree hacks diff --git a/config.py b/config.py index b35c773..1c90daf 100644 --- a/config.py +++ b/config.py @@ -16,7 +16,7 @@ def get_ip(): ip = get_ip() port = 8000 -services_prefix = "/core" +response_compression = False verbose_log = True arcade = "M0NKYBUS1N3Z" diff --git a/core_common.py b/core_common.py index 4123ac2..41a1c00 100644 --- a/core_common.py +++ b/core_common.py @@ -1,6 +1,5 @@ import config -import random import time from lxml.builder import ElementMaker @@ -8,7 +7,7 @@ from lxml.builder import ElementMaker from kbinxml import KBinXML from utils.arc4 import EamuseARC4 -from utils.lz77 import EamuseLZ77 +from utils.lz77 import lz77_decode, lz77_encode def _add_val_as_str(elm, val): @@ -36,6 +35,17 @@ def _add_list_as_str(elm, vals): return new_val +def _prng(): + state = 0x41C64E6D + while True: + x = (state * 0x838C9CDA) + 0x6072 + # state = (state * 0x41C64E6D + 0x3039) + # state = (state * 0x41C64E6D + 0x3039) + state = (state * 0xC2A29A69 + 0xD3DC167E) & 0xFFFFFFFF + yield (x & 0x7FFF0000) | state >> 0xF & 0xFFFF +prng_init = _prng() + + E = ElementMaker( typemap={ int: _add_val_as_str, @@ -134,22 +144,19 @@ async def core_process_request(request): if not cl or not data: return {} - if "X-Compress" in request.headers: - request.compress = request.headers.get("X-Compress") - else: - request.compress = None + request.compress = request.headers.get("X-Compress", "none") # intentionally lowercase 'none' (NOT None) if "X-Eamuse-Info" in request.headers: xeamuseinfo = request.headers.get("X-Eamuse-Info") - key = bytes.fromhex(xeamuseinfo[2:].replace("-", "")) - xml_dec = EamuseARC4(key).decrypt(data[: int(cl)]) + version, unix_time, prng = xeamuseinfo.split("-") + xml_dec = EamuseARC4(bytes.fromhex(unix_time), bytes.fromhex(prng)).decrypt(data[: int(cl)]) request.is_encrypted = True else: xml_dec = data[: int(cl)] request.is_encrypted = False if request.compress == "lz77": - xml_dec = EamuseLZ77.decode(xml_dec) + xml_dec = lz77_decode(xml_dec) xml = KBinXML(xml_dec, convert_illegal_things=True) root = xml.xml_doc @@ -196,17 +203,24 @@ async def core_prepare_response(request, xml): response_headers = {"User-Agent": "EAMUSE.Httpac/1.0"} - if request.is_encrypted: - xeamuseinfo = "1-%08x-%04x" % (int(time.time()), random.randint(0x0000, 0xFFFF)) - response_headers["X-Eamuse-Info"] = xeamuseinfo - key = bytes.fromhex(xeamuseinfo[2:].replace("-", "")) - response = EamuseARC4(key).encrypt(xml_binary) + if config.response_compression: + response_headers["X-Compress"] = request.compress + if request.compress == "lz77": + response = lz77_encode(xml_binary) # very slow + else: + response = xml_binary else: - response = bytes(xml_binary) + response_headers["X-Compress"] = "none" # intentionally lowercase 'none' (NOT None) + response = xml_binary - request.compress = None - # if request.compress == "lz77": - # response_headers["X-Compress"] = request.compress - # response = EamuseLZ77.encode(response) + + if request.is_encrypted: + version = 1 + unix_time = int(time.time()) + prng = next(prng_init) & 0xFFFF + response_headers["X-Eamuse-Info"] = f"{version}-{unix_time:04x}-{prng:02x}" + response = EamuseARC4(unix_time.to_bytes(4), prng.to_bytes(2)).encrypt(response) + else: + response = bytes(response) return response, response_headers diff --git a/modules/ddr/api.py b/modules/ddr/api.py index 6194d24..3be10d8 100644 --- a/modules/ddr/api.py +++ b/modules/ddr/api.py @@ -8,10 +8,10 @@ from pydantic import BaseModel import config import utils.card as conv -from utils.lz77 import EamuseLZ77 +from utils.lz77 import lz77_decode import lxml.etree as ET -import ujson as json +import json import struct from typing import Dict, List, Tuple from os import path @@ -202,7 +202,7 @@ class ARC: return self.__data[fileoffset : (fileoffset + compressedsize)] else: # Compressed - return EamuseLZ77.decode( + return lz77_decode( self.__data[fileoffset : (fileoffset + compressedsize)] ) @@ -259,6 +259,6 @@ async def ddr_receive_mdb(file: UploadFile = File(...)) -> bytes: mdb[mcode] = mdb_old[mcode] with open(ddr_metadata, "w", encoding="utf-8") as fp: - json.dump(mdb, fp, indent=4, ensure_ascii=False, escape_forward_slashes=False) + json.dump(mdb, fp, indent=4, ensure_ascii=False) return Response(status_code=201) diff --git a/modules/iidx/api.py b/modules/iidx/api.py index e5d403e..30f3246 100644 --- a/modules/iidx/api.py +++ b/modules/iidx/api.py @@ -10,10 +10,9 @@ from typing import Optional import config import utils.card as conv import utils.musicdata_tool as mdt -from utils.lz77 import EamuseLZ77 import xml.etree.ElementTree as ET -import ujson as json +import json from os import path @@ -309,7 +308,6 @@ async def iidx_receive_mdb(file: UploadFile = File(...)) -> bytes: open(iidx_metadata, "w", encoding="utf8"), indent=4, ensure_ascii=False, - escape_forward_slashes=False, ) return Response(status_code=201) except Exception as e: diff --git a/pyeamu.py b/pyeamu.py index e47a1e1..9ec4e5f 100644 --- a/pyeamu.py +++ b/pyeamu.py @@ -2,7 +2,7 @@ from urllib.parse import urlparse, urlunparse, urlencode import uvicorn -import ujson as json +import json from os import name, path from typing import Optional @@ -33,14 +33,14 @@ for host in ("localhost", config.ip, socket.gethostname()): server_services_urls = [] for server_address in server_addresses: server_services_urls.append( - urlunparse(("http", server_address, config.services_prefix, None, None, None)) + urlunparse(("http", server_address, "/core", None, None, None)) ) settings = {} for s in ( "ip", "port", - "services_prefix", + "response_compression", "verbose_log", "arcade", "paseli", @@ -64,7 +64,7 @@ app.add_middleware( if path.exists("webui"): webui = True with open(path.join("webui", "monkey.json"), "w") as f: - json.dump(settings, f, indent=2, escape_forward_slashes=False) + json.dump(settings, f, indent=2) app.mount("/webui", StaticFiles(directory="webui", html=True), name="webui") else: webui = False @@ -112,8 +112,8 @@ if __name__ == "__main__": uvicorn.run("pyeamu:app", host="0.0.0.0", port=config.port, reload=True) -@app.post(urlpathjoin([config.services_prefix])) -@app.post(urlpathjoin([config.services_prefix, "/{gameinfo}/services/get"])) +@app.post("/core") +@app.post("/core/{gameinfo}/services/get") async def services_get( request: Request, model: Optional[str] = None, diff --git a/requirements.txt b/requirements.txt index 12a5b2c..382736d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,5 +3,4 @@ kbinxml>=2.0 pycryptodomex python-multipart tinydb -ujson uvicorn[standard] diff --git a/start.sh b/start.sh index d3c1fd0..aef882f 100644 --- a/start.sh +++ b/start.sh @@ -1,6 +1,6 @@ #!/bin/bash -ver="3.12" +ver="3.14" py="python$ver" if ! command -v $py &> /dev/null diff --git a/utils/arc4.py b/utils/arc4.py index d49f8ea..3ce2bfb 100644 --- a/utils/arc4.py +++ b/utils/arc4.py @@ -3,11 +3,9 @@ from Cryptodome.Hash import MD5 class EamuseARC4: - def __init__(self, eamuseKey): - self.internal_key = bytearray.fromhex( - "69D74627D985EE2187161570D08D93B12455035B6DF0D8205DF5" - ) - self.key = MD5.new(eamuseKey + self.internal_key).digest() + def __init__(self, seconds, prng): + self.internal_key = b"\x69\xD7\x46\x27\xD9\x85\xEE\x21\x87\x16\x15\x70\xD0\x8D\x93\xB1\x24\x55\x03\x5B\x6D\xF0\xD8\x20\x5D\xF5" + self.key = MD5.new(seconds + prng + self.internal_key).digest() def decrypt(self, data): return ARC4.new(self.key).decrypt(bytes(data)) diff --git a/utils/lz77.py b/utils/lz77.py index 36cea6d..edcad4e 100644 --- a/utils/lz77.py +++ b/utils/lz77.py @@ -1,33 +1,125 @@ -class EamuseLZ77: - @staticmethod - def decode(data): - data_length = len(data) - offset = 0 - output = [] - while offset < data_length: - flag = data[offset] - offset += 1 - for bit in range(8): - if flag & (1 << bit): - output.append(data[offset]) - offset += 1 - else: - if offset >= data_length: - break - lookback_flag = int.from_bytes(data[offset : offset + 2], "big") - lookback_length = (lookback_flag & 0x000F) + 3 - lookback_offset = lookback_flag >> 4 - offset += 2 - if lookback_flag == 0: - break - for _ in range(lookback_length): - loffset = len(output) - lookback_offset - if loffset <= 0 or loffset >= len(output): - output.append(0) - else: - output.append(output[loffset]) - return bytes(output) +WINDOW_SIZE = 0x1000 +WINDOW_MASK = WINDOW_SIZE - 1 +THRESHOLD = 3 +INPLACE_THRESHOLD = 0xA +LOOK_RANGE = 0x200 +MAX_LEN = 0xF + THRESHOLD +MAX_BUFFER = 0x10 + 1 - # @staticmethod - # def encode(data): - # return bytes(output) + +def match_current(window: bytes, pos: int, max_len: int, data: bytes, dpos: int) -> int: + length = 0 + while ( + dpos + length < len(data) + and length < max_len + and window[(pos + length) & WINDOW_MASK] == data[dpos + length] + and length < MAX_LEN + ): + length += 1 + return length + + +def match_window(window: bytes, pos: int, data: bytes, d_pos: int) -> None | tuple[int, int]: + max_pos = 0 + max_len = 0 + for i in range(THRESHOLD, LOOK_RANGE): + length = match_current(window, (pos - i) & WINDOW_MASK, i, data, d_pos) + if length >= INPLACE_THRESHOLD: + return (i, length) + if length >= THRESHOLD: + max_pos = i + max_len = length + if max_len >= THRESHOLD: + return (max_pos, max_len) + return None + + +def lz77_encode(data: bytes) -> bytes: + output = bytearray() + window = bytearray(WINDOW_SIZE) + current_pos = 0 + current_window = 0 + current_buffer = 0 + flag_byte = 0 + bit = 0 + buffer = [0] * MAX_BUFFER + pad = 3 + while current_pos < len(data): + flag_byte = 0 + current_buffer = 0 + for bit_pos in range(8): + if current_pos >= len(data): + pad = 0 + flag_byte = flag_byte >> (8 - bit_pos) + buffer[current_buffer] = 0 + buffer[current_buffer + 1] = 0 + current_buffer += 2 + break + else: + found = match_window(window, current_window, data, current_pos) + if found is not None and found[1] >= THRESHOLD: + pos, length = found + + byte1 = pos >> 4 + byte2 = (((pos & 0x0F) << 4) | ((length - THRESHOLD) & 0x0F)) + buffer[current_buffer] = byte1 + buffer[current_buffer + 1] = byte2 + current_buffer += 2 + bit = 0 + for _ in range(length): + window[current_window & WINDOW_MASK] = data[current_pos] + current_pos += 1 + current_window += 1 + else: + buffer[current_buffer] = data[current_pos] + window[current_window] = data[current_pos] + current_pos += 1 + current_window += 1 + current_buffer += 1 + bit = 1 + + flag_byte = (flag_byte >> 1) | ((bit & 1) << 7) + current_window = current_window & WINDOW_MASK + + output.append(flag_byte) + for i in range(current_buffer): + output.append(buffer[i]) + for _ in range(pad): + output.append(0) + + return bytes(output) + + +def lz77_decode(data: bytes) -> bytes: + output = bytearray() + cur_byte = 0 + window = bytearray(WINDOW_SIZE) + window_cursor = 0 + + while cur_byte < len(data): + flag = data[cur_byte] + cur_byte += 1 + + for i in range(8): + if (flag >> i) & 1 == 1: + output.append(data[cur_byte]) + window[window_cursor] = data[cur_byte] + window_cursor = (window_cursor + 1) & WINDOW_MASK + cur_byte += 1 + else: + w = ((data[cur_byte]) << 8) | (data[cur_byte + 1]) + if w == 0: + return bytes(output) + + cur_byte += 2 + position = ((window_cursor - (w >> 4)) & WINDOW_MASK) + length = (w & 0x0F) + THRESHOLD + + for _ in range(length): + b = window[position & WINDOW_MASK] + output.append(b) + window[window_cursor] = b + window_cursor = (window_cursor + 1) & WINDOW_MASK + position += 1 + + return bytes(output)