gfdm-server/v8_server/utils/xml.py
2021-01-15 11:21:34 -06:00

154 lines
4.7 KiB
Python

import logging
from binascii import unhexlify
from datetime import datetime
from pathlib import Path
from random import randint
from time import time
from typing import Dict, Tuple, Union
import lxml
from flask import Request
from kbinxml import KBinXML
from lxml import etree as ET # noqa: N812
from v8_server.utils.arc4 import EamuseARC4
from v8_server.utils.lz77 import Lz77
# We want a general logger, and a special logger to log requests separately.
# `logger` is named after this module; `rlogger` uses the fixed name "requests".
logger = logging.getLogger(__name__)
rlogger = logging.getLogger("requests")

# eAmuse Header Tags
# Header whose presence marks a request as encrypted; its value is also what
# the encryption key is decoded from (see get_encryption_key).
X_EAMUSE_INFO = "x-eamuse-info"
# Header naming the compression applied to the body; the value "none" is
# treated as "not compressed" (see is_compressed).
X_COMPRESS = "x-compress"
def is_encrypted(request: Request) -> bool:
    """Report whether *request* carries the ``x-eamuse-info`` header.

    Only the presence of the header is checked; its value is not inspected.
    """
    request_headers = request.headers
    return X_EAMUSE_INFO in request_headers
def get_encryption_key(request: Request) -> Tuple[str, bytes]:
    """Decode the encryption key carried in the ``x-eamuse-info`` header.

    The header lookup is unguarded, so callers are expected to confirm the
    header is present (see ``is_encrypted``) before calling this.

    Returns:
        A ``(header_value, key)`` pair: the header string exactly as received
        and the key bytes decoded from its hex digits.
    """
    header_value = request.headers[X_EAMUSE_INFO]
    # Skip the two leading characters, then strip the dashes so that only the
    # hex digits are handed to unhexlify.
    hex_digits = header_value[2:].replace("-", "")
    return header_value, unhexlify(hex_digits)
def make_encryption_key() -> Tuple[str, bytes]:
    """Create a fresh ``x-eamuse-info`` value and the key it encodes.

    The value has the form ``1-<time>-<random>``, where ``<time>`` is the
    current Unix time as at least 8 hex digits and ``<random>`` is a random
    16-bit number as 4 hex digits.

    Returns:
        A ``(header_value, key)`` pair; ``key`` is the bytes obtained by
        decoding the hex digits that follow the leading ``1-``.
    """
    timestamp = int(time())
    nonce = randint(0x0000, 0xFFFF)
    header_value = "1-{:08x}-{:04x}".format(timestamp, nonce)
    # Same derivation as for incoming headers: drop the two leading
    # characters and the dashes, then decode the remaining hex digits.
    hex_digits = header_value[2:].replace("-", "")
    return header_value, unhexlify(hex_digits)
def is_compressed(request: Request) -> bool:
    """Report whether *request* declares a compressed body.

    A request counts as compressed when it has an ``x-compress`` header whose
    value is anything other than ``"none"``.
    """
    request_headers = request.headers
    if X_COMPRESS not in request_headers:
        return False
    return request_headers[X_COMPRESS] != "none"
def get_compression_type(request: Request) -> str:
    """Return the raw value of the request's ``x-compress`` header.

    The lookup is unguarded; callers in this module check ``is_compressed``
    first.
    """
    compression = request.headers[X_COMPRESS]
    return compression
def save_xml(data: bytes, request: Request, kind: str, _type: str = "xml") -> None:
    """Write *data* to a file under ``./logs/requests`` for later inspection.

    The file name combines a minute-resolution timestamp, a hash of the
    request body, the request's ``x-eamuse-info`` value (when present) and
    *kind*, so that a request and the response built for it share the same
    identifying parts.

    Args:
        data: Bytes to write to the file.
        request: Request being handled; supplies the identifiers in the name.
        kind: Label placed just before the extension (this module passes
            ``"req"`` and ``"resp"``).
        _type: File extension, without the leading dot.
    """
    # Always make sure the dir exists. parents=True so that a missing ./logs
    # directory is created too instead of raising FileNotFoundError.
    dirpath = Path("./logs/requests")
    dirpath.mkdir(parents=True, exist_ok=True)

    # We want some unique identifiers to match requests and responses, so lets
    # use the x-eamuse-info header, as well as a hash of the request data
    info = ""
    if is_encrypted(request):
        x_eamuse_info, _ = get_encryption_key(request)
        info = f"_{x_eamuse_info.replace('-', '_')}"
    # NOTE: hash() of bytes is salted per interpreter process, so this value
    # only lines up between files written by the same server process.
    data_hash = str(hash(request.data))

    # Write out the data. The extension comes from the _type parameter (the
    # builtin `type` was interpolated here before, producing a bogus suffix).
    date = datetime.now().strftime("%Y_%m_%d_%H_%M")
    filepath = dirpath / f"eamuse_{date}_{data_hash}{info}_{kind}.{_type}"
    with filepath.open("wb") as f:
        logger.debug("Writing File: %s", filepath)
        f.write(data)
def eamuse_read_xml(request: Request) -> Tuple[bytes, str, str, str, str]:
    """Decode an incoming eAmuse request into XML text plus routing fields.

    The request body is decrypted when the ``x-eamuse-info`` header is
    present, LZ77-decompressed when ``x-compress`` is ``lz77``, converted from
    binary XML to UTF-8 text via ``KBinXML``, saved with ``save_xml`` and then
    parsed.

    Returns:
        ``(xml_bytes, model, module, method, command)`` where ``xml_bytes`` is
        the UTF-8 encoded XML text, ``model`` is the root element's ``model``
        attribute, ``module`` is the tag of the root's first child, and
        ``method`` / ``command`` are that child's attributes of the same name.
        A missing ``method`` or ``command`` is returned as the string
        ``"None"``.

    Raises:
        Exception: Whatever the binary-XML conversion raises; it is logged
            together with the offending payload and re-raised unchanged.
        KeyError: If the root element has no ``model`` attribute.
        IndexError: If the root element has no child element.
    """
    # Get the raw xml data from the request
    xml_bin = request.data

    # Decrypt the data if necessary
    if is_encrypted(request):
        _, key = get_encryption_key(request)
        xml_bin = EamuseARC4(key).decrypt(xml_bin)

    # Decompress the data if necessary
    # Right now we only de-compress lz77
    if is_compressed(request) and get_compression_type(request) == "lz77":
        logger.debug("Decompressing lz77 request body")
        xml_bin = Lz77().decompress(xml_bin)
    else:
        logger.debug("Request body is not lz77-compressed; using it as-is")

    # Convert the binary xml data to text bytes and save a copy
    try:
        xml_bytes = KBinXML(xml_bin).to_text().encode("UTF-8")
    except Exception:
        # Keep the payload in the log so a failing request can be reproduced.
        logger.exception("Could not convert request body to XML text: %r", xml_bin)
        raise
    save_xml(xml_bytes, request, "req")

    # Convert the xml text to an eTree
    root = ET.fromstring(xml_bytes)

    # Grab the xml information we care about. str() around a missing attribute
    # deliberately yields "None", matching what callers have always received.
    call = root[0]
    model = str(root.attrib["model"])
    module = str(call.tag)
    method = str(call.attrib.get("method"))
    command = str(call.attrib.get("command"))

    rlogger.debug(
        "---- Request ----\n"
        f"[ {'Model':^20} | {'Module':^15} | {'Method':^15} | {'Command':^20} ]\n"
        f"[ {model:^20} | {module:^15} | {method:^15} | {command:^20} ]\n"
        f"{xml_bytes.decode('UTF-8')}\n"
    )

    # Return raw XML
    return xml_bytes, model, module, method, command
def eamuse_prepare_xml(
    xml_bytes: Union[bytes, lxml.etree._Element], request: Request
) -> Tuple[bytes, Dict[str, str]]:
    """Turn a response document into the binary body and headers to send back.

    The XML is converted to binary XML via ``KBinXML``, then compressed and/or
    encrypted to mirror what the incoming *request* used. Both the text and
    the binary form of the response are saved with ``save_xml``.

    Args:
        xml_bytes: Response XML, either already serialized to bytes or as an
            lxml element (which is serialized here).
        request: The request being answered; its headers decide whether the
            response is compressed and encrypted.

    Returns:
        ``(body, headers)``: the encoded response body and the HTTP headers
        that describe it.
    """
    # Make sure xml_bytes is a bytes object. isinstance (rather than an exact
    # type comparison) so element subclasses are serialized as well.
    if isinstance(xml_bytes, lxml.etree._Element):
        xml_bytes = ET.tostring(xml_bytes, pretty_print=True)

    # Lets save our response
    save_xml(xml_bytes, request, "resp")

    # Common headers
    # NOTE(review): the Content-Type value is not the standard spelling
    # ("application/octet-stream"). It is left byte-for-byte unchanged in case
    # clients expect exactly this string - confirm before "fixing" it.
    headers = {
        "Content-Type": "applicaton/octet_stream",
        "Server": "Microsoft-HTTPAPI/2.0",
    }

    # Convert XML to binary and save
    xml_bin = KBinXML(xml_bytes).to_binary()
    save_xml(xml_bin, request, "resp", _type="bin")

    # Compress if necessary
    # Right now we only compress lz77
    if is_compressed(request) and get_compression_type(request) == "lz77":
        headers[X_COMPRESS] = "lz77"
        xml_bin = Lz77().compress(xml_bin)

    # Encrypt if necessary, with a key of our own that is announced to the
    # client through the x-eamuse-info response header. The key is only
    # generated when it is actually going to be used.
    if is_encrypted(request):
        x_eamuse_info, key = make_encryption_key()
        headers[X_EAMUSE_INFO] = x_eamuse_info
        xml_bin = EamuseARC4(key).encrypt(xml_bin)

    rlogger.debug(f"---- Response ----\n{xml_bytes.decode('UTF-8')}\n")
    return xml_bin, headers