From 6fae1448506e02c45aaa2e4206be9152bef8e0cd Mon Sep 17 00:00:00 2001 From: 573dev <> Date: Thu, 15 Oct 2020 09:54:58 -0500 Subject: [PATCH] refactored xml utils to be cleaner --- v8_server/__init__.py | 9 ++- v8_server/utils/xml.py | 167 ++++++++++++++++++++++++---------------- v8_server/view/index.py | 14 ++-- 3 files changed, 113 insertions(+), 77 deletions(-) diff --git a/v8_server/__init__.py b/v8_server/__init__.py index 2f39f81..852fa1d 100644 --- a/v8_server/__init__.py +++ b/v8_server/__init__.py @@ -35,6 +35,7 @@ class RequestFormatter(logging.Formatter): # Define the logger +logpath = Path(__file__).parent.parent dictConfig( { "version": 1, @@ -58,25 +59,25 @@ dictConfig( }, "debugfile": { "class": "logging.handlers.TimedRotatingFileHandler", - "filename": "logs/debug.log", + "filename": logpath / "debug.log", "formatter": "default", "when": "midnight", }, "requestsfile": { "class": "logging.handlers.TimedRotatingFileHandler", - "filename": "logs/requests.log", + "filename": logpath / "requests.log", "formatter": "detailed", "when": "midnight", }, "allfile": { "class": "logging.handlers.TimedRotatingFileHandler", - "filename": "logs/all.log", + "filename": logpath / "all.log", "formatter": "detailed", "when": "midnight", }, "werkzeugfile": { "class": "logging.handlers.TimedRotatingFileHandler", - "filename": "logs/werkzeug.log", + "filename": logpath / "werkzeug.log", "formatter": "detailed", "when": "midnight", }, diff --git a/v8_server/utils/xml.py b/v8_server/utils/xml.py index 11c4a67..71f5cd2 100644 --- a/v8_server/utils/xml.py +++ b/v8_server/utils/xml.py @@ -1,9 +1,10 @@ import logging from binascii import unhexlify +from datetime import datetime from pathlib import Path from random import randint from time import time -from typing import Dict, Tuple +from typing import Dict, Tuple, Union import lxml from flask import Request @@ -11,51 +12,92 @@ from kbinxml import KBinXML from lxml import etree as ET # noqa: N812 from v8_server.utils.arc4 import EamuseARC4 -from v8_server.utils.eamuse import get_timestamp from v8_server.utils.lz77 import Lz77 +# We want a general logger, and a special logger to log requests separately logger = logging.getLogger(__name__) rlogger = logging.getLogger("requests") -EAMUSE_CONFIG = {"encrypted": False, "compressed": False} -REQUESTS_PATH = Path("./requests") +# eAmuse Header Tags +X_EAMUSE_INFO = "x-eamuse-info" +X_COMPRESS = "x-compress" + + +def is_encrypted(request: Request) -> bool: + return X_EAMUSE_INFO in request.headers + + +def get_encryption_key(request: Request) -> Tuple[str, bytes]: + info = request.headers[X_EAMUSE_INFO] + key = unhexlify(info[2:].replace("-", "")) + return info, key + + +def make_encryption_key() -> Tuple[str, bytes]: + info = f"1-{int(time()):08x}-{randint(0x0000, 0xffff):04x}" + key = unhexlify(info[2:].replace("-", "")) + return info, key + + +def is_compressed(request: Request) -> bool: + return X_COMPRESS in request.headers and request.headers[X_COMPRESS] != "none" + + +def get_compression_type(request: Request) -> str: + return request.headers[X_COMPRESS] + + +def save_xml(data: bytes, request: Request, kind: str, _type: str = "xml") -> None: + # Always make sure the dir exists + dirpath = Path("./logs/requests") + dirpath.mkdir(exist_ok=True) + + # We want some unique identifiers to match requests and responses, so lets use the + # x-eamuse-info header, as well as a hash of the data + info = "" + if is_encrypted(request): + x_eamuse_info, _ = get_encryption_key(request) + info = f"_{x_eamuse_info.replace('-', '_')}" + data_hash = str(hash(request.data)) + + # Write out the data + date = datetime.now().strftime("%Y_%m_%d_%H_%M") + filepath = dirpath / f"eamuse_{date}_{data_hash}{info}_{kind}.{type}" + with filepath.open("wb") as f: + logging.debug(f"Writing File: {filepath}") + f.write(data) def eamuse_read_xml(request: Request) -> Tuple[str, str, str, str, str]: - # Get encrypted/compressed data from client - headers = request.headers - data = request.data + # Get the raw xml data from the request + xml_bin = request.data - if "x-eamuse-info" in headers: - # Decrypt/Compress data - x_eamuse_info = headers["x-eamuse-info"] - key = unhexlify(x_eamuse_info[2:].replace("-", "")) - arc4 = EamuseARC4(key) - xml_dec = arc4.decrypt(data) - EAMUSE_CONFIG["encrypted"] = True + # Decrypt the data if necessary + if is_encrypted(request): + _, key = get_encryption_key(request) + xml_bin = EamuseARC4(key).decrypt(xml_bin) + + # Decompress the data if necessary + # Right now we only de-compress lz77 + if is_compressed(request) and get_compression_type(request) == "lz77": + print("I DECOMPRESSED") + xml_bin = Lz77().decompress(xml_bin) else: - xml_dec = data - EAMUSE_CONFIG["encrypted"] = False + print("NOPE") - compress = headers["x-compress"] if "x-compress" in headers else None + # Convert the binary xml data to text bytes and save a copy + try: + xml_bytes = KBinXML(xml_bin).to_text().encode("UTF-8") + except Exception: + print(xml_bin) + raise + save_xml(xml_bytes, request, "req") - if compress == "lz77": - lz77 = Lz77() - xml_dec = lz77.decompress(xml_dec) - EAMUSE_CONFIG["compress"] = True - else: - EAMUSE_CONFIG["compress"] = False - - xml_text = KBinXML(xml_dec).to_text().encode("UTF-8") - - root = ET.fromstring(xml_text) - - REQUESTS_PATH.mkdir(exist_ok=True) - output_filename = REQUESTS_PATH / f"eamuse_{get_timestamp()}_req.xml" - with output_filename.open("w") as f: - f.write(xml_text.decode("UTF-8")) + # Convert the xml text to an eTree + root = ET.fromstring(xml_bytes) + # Grab the xml information we care about model = str(root.attrib["model"]) module = str(root[0].tag) method = str(root[0].attrib["method"] if "method" in root[0].attrib else None) @@ -65,54 +107,47 @@ def eamuse_read_xml(request: Request) -> Tuple[str, str, str, str, str]: "---- Request ----\n" f"[ {'Model':^20} | {'Module':^15} | {'Method':^15} | {'Command':^20} ]\n" f"[ {model:^20} | {module:^15} | {method:^15} | {command:^20} ]\n" - f"{xml_text.decode('UTF-8')[:-1]}\n" + f"{xml_bytes.decode('UTF-8')}\n" ) # Return raw XML - return xml_text, model, module, method, command + return xml_bytes, model, module, method, command -def eamuse_prepare_xml(xml: str) -> Tuple[bytes, Dict[str, str]]: - x_eamuse_info = f"1-{int(time()):08x}-{randint(0x0000, 0xffff):04x}" - key = unhexlify(x_eamuse_info[2:].replace("-", "")) +def eamuse_prepare_xml( + xml_bytes: Union[bytes, lxml.etree._Element], request: Request +) -> Tuple[bytes, Dict[str, str]]: + # Make sure xml_bytes is a bytes object + if type(xml_bytes) == lxml.etree._Element: + xml_bytes = ET.tostring(xml_bytes, pretty_print=True) - xml_root = xml - if type(xml) == lxml.etree._Element: - xml = ET.tostring(xml, encoding="UTF-8").decode("UTF-8") + # Lets save our response + save_xml(xml_bytes, request, "resp") - REQUESTS_PATH.mkdir(exist_ok=True) - timestamp = get_timestamp() - output_filename = REQUESTS_PATH / f"eamuse_{timestamp}_resp.xml" - with output_filename.open("wb") as f: - f.write(xml.encode("UTF-8")) + # Lets make our own encryption key + x_eamuse_info, key = make_encryption_key() + # Common headers headers = { "Content-Type": "applicaton/octet_stream", "Server": "Microsoft-HTTPAPI/2.0", } - # Convert XML to binary - xml_bin = KBinXML(xml.encode("UTF-8")).to_binary() - output_filename = REQUESTS_PATH / f"eamuse_{timestamp}_resp.bin" - with output_filename.open("wb") as f: - f.write(xml_bin) + # Convert XML to binary and save + xml_bin = KBinXML(xml_bytes).to_binary() + save_xml(xml_bin, request, "resp", _type="bin") - if EAMUSE_CONFIG["compress"]: - headers["X-Compress"] = "lz77" - lz77 = Lz77() - xml_bin = lz77.compress(xml_bin) + # Compress if necessary + # Right now we only compress lz77 + if is_compressed(request) and get_compression_type(request) == "lz77": + headers[X_COMPRESS] = "lz77" + xml_bin = Lz77().compress(xml_bin) - headers["X-Eamuse-Info"] = x_eamuse_info + # Encrypt if necessary + if is_encrypted(request): + headers[X_EAMUSE_INFO] = x_eamuse_info + xml_bin = EamuseARC4(key).encrypt(xml_bin) - if EAMUSE_CONFIG["encrypted"]: - arc4 = EamuseARC4(key) - data = arc4.encrypt(xml_bin) - else: - data = xml_bin + rlogger.debug(f"---- Response ----\n{xml_bytes.decode('UTF-8')}\n") - rlogger.debug( - "---- Response ----\n" - f"{ET.tostring(xml_root, pretty_print=True).decode('UTF-8')[:-1]}\n" - ) - - return data, headers + return xml_bin, headers diff --git a/v8_server/view/index.py b/v8_server/view/index.py index 8b20be6..5d22ff7 100644 --- a/v8_server/view/index.py +++ b/v8_server/view/index.py @@ -55,7 +55,7 @@ def pcbtracker() -> Tuple[bytes, Dict[str, str]]: # There shoulnd't really even be any other methods raise Exception("Not sure how to handle this PCBTracker Request") - return eamuse_prepare_xml(response) + return eamuse_prepare_xml(response, request) @app.route("/message/service", methods=["POST"]) @@ -65,7 +65,7 @@ def message() -> Tuple[bytes, Dict[str, str]]: """ _ = eamuse_read_xml(request) response = base_response("message") - return eamuse_prepare_xml(response) + return eamuse_prepare_xml(response, request) @app.route("/pcbevent/service", methods=["POST"]) @@ -78,7 +78,7 @@ def pcbevent() -> Tuple[bytes, Dict[str, str]]: # TODO: Log the data from `request_xml` response = base_response("pcbevent") - return eamuse_prepare_xml(response) + return eamuse_prepare_xml(response, request) @app.route("/facility/service", methods=["POST"]) @@ -129,7 +129,7 @@ def facility() -> Tuple[bytes, Dict[str, str]]: {"expire": "600"}, ) ) - return eamuse_prepare_xml(response) + return eamuse_prepare_xml(response, request) @app.route("/package/service", methods=["POST"]) @@ -140,7 +140,7 @@ def package() -> Tuple[bytes, Dict[str, str]]: _ = eamuse_read_xml(request) response = base_response("package") - return eamuse_prepare_xml(response) + return eamuse_prepare_xml(response, request) @app.route("/local/service", methods=["POST"]) @@ -169,7 +169,7 @@ def local() -> Tuple[bytes, Dict[str, str]]: else: response = base_response(module) - return eamuse_prepare_xml(response) + return eamuse_prepare_xml(response, request) @app.route("/service/services/services/", methods=["POST"]) @@ -213,4 +213,4 @@ def services() -> Tuple[bytes, Dict[str, str]]: ) ) - return eamuse_prepare_xml(response) + return eamuse_prepare_xml(response, request)