From b77ccdd5b952dd3ca2b44dfdfd4443fc074d0d83 Mon Sep 17 00:00:00 2001 From: Jennifer Taylor Date: Sat, 24 Apr 2021 17:59:36 +0000 Subject: [PATCH] Beginning of an AFP ByteCode decompiler, starting with a massive code reorg and a control flow graph analyzer. --- bemani/format/afp/container.py | 36 +-- bemani/format/afp/decompile.py | 282 ++++++++++++++++++++++++ bemani/format/afp/geo.py | 14 +- bemani/format/afp/swf.py | 325 +++++++++++----------------- bemani/format/afp/types/__init__.py | 48 +++- bemani/format/afp/types/ap2.py | 260 +++++++++++++++++++++- bemani/format/afp/types/generic.py | 16 +- bemani/utils/afputils.py | 16 +- 8 files changed, 761 insertions(+), 236 deletions(-) create mode 100644 bemani/format/afp/decompile.py diff --git a/bemani/format/afp/container.py b/bemani/format/afp/container.py index 5421ce6..044586e 100644 --- a/bemani/format/afp/container.py +++ b/bemani/format/afp/container.py @@ -29,7 +29,7 @@ class PMAN: self.flags2 = flags2 self.flags3 = flags3 - def as_dict(self) -> Dict[str, Any]: + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: return { 'flags': [self.flags1, self.flags2, self.flags3], 'entries': self.entries, @@ -64,7 +64,7 @@ class Texture: self.compressed = compressed self.img = imgdata - def as_dict(self) -> Dict[str, Any]: + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: return { 'name': self.name, 'width': self.width, @@ -85,7 +85,7 @@ class TextureRegion: self.right = right self.bottom = bottom - def as_dict(self) -> Dict[str, Any]: + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: return { 'texture': self.textureno, 'left': self.left, @@ -117,7 +117,7 @@ class Unknown1: if len(data) != 12: raise Exception("Unexpected length for Unknown1 structure!") - def as_dict(self) -> Dict[str, Any]: + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: return { 'name': self.name, 'data': "".join(_hex(x) for x in self.data), @@ -133,7 +133,7 @@ class Unknown2: if len(data) != 4: raise Exception("Unexpected length for Unknown2 structure!") - def as_dict(self) -> Dict[str, Any]: + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: return { 'data': "".join(_hex(x) for x in self.data), } @@ -215,7 +215,7 @@ class TXP2File(TrackedCoverage, VerboseOutput): with self.debugging(verbose): self.__parse(verbose) - def as_dict(self) -> Dict[str, Any]: + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: return { 'endian': self.endian, 'features': self.features, @@ -223,19 +223,19 @@ class TXP2File(TrackedCoverage, VerboseOutput): 'obfuscated': self.text_obfuscated, 'legacy_lz': self.legacy_lz, 'modern_lz': self.modern_lz, - 'textures': [tex.as_dict() for tex in self.textures], - 'texturemap': self.texturemap.as_dict(), - 'textureregion': [reg.as_dict() for reg in self.texture_to_region], - 'regionmap': self.regionmap.as_dict(), - 'swfdata': [data.as_dict() for data in self.swfdata], - 'swfmap': self.swfmap.as_dict(), + 'textures': [tex.as_dict(*args, **kwargs) for tex in self.textures], + 'texturemap': self.texturemap.as_dict(*args, **kwargs), + 'textureregion': [reg.as_dict(*args, **kwargs) for reg in self.texture_to_region], + 'regionmap': self.regionmap.as_dict(*args, **kwargs), + 'swfdata': [data.as_dict(*args, **kwargs) for data in self.swfdata], + 'swfmap': self.swfmap.as_dict(*args, **kwargs), 'fontdata': str(self.fontdata) if self.fontdata is not None else None, - 'shapes': [shape.as_dict() for shape in self.shapes], - 'shapemap': self.shapemap.as_dict(), - 'unknown1': [unk.as_dict() for unk in self.unknown1], - 'unknown1map': self.unk_pman1.as_dict(), - 'unknown2': [unk.as_dict() for unk in self.unknown2], - 'unknown2map': self.unk_pman2.as_dict(), + 'shapes': [shape.as_dict(*args, **kwargs) for shape in self.shapes], + 'shapemap': self.shapemap.as_dict(*args, **kwargs), + 'unknown1': [unk.as_dict(*args, **kwargs) for unk in self.unknown1], + 'unknown1map': self.unk_pman1.as_dict(*args, **kwargs), + 'unknown2': [unk.as_dict(*args, **kwargs) for unk in self.unknown2], + 'unknown2map': self.unk_pman2.as_dict(*args, **kwargs), } @staticmethod diff --git a/bemani/format/afp/decompile.py b/bemani/format/afp/decompile.py new file mode 100644 index 0000000..25c11a2 --- /dev/null +++ b/bemani/format/afp/decompile.py @@ -0,0 +1,282 @@ +import os +from typing import Any, Dict, List, Tuple, cast + +from .types import AP2Action, JumpAction, IfAction +from .util import VerboseOutput + + +class ByteCode: + # A list of bytecodes to execute. + def __init__(self, actions: List[AP2Action], end_offset: int) -> None: + self.actions = actions + self.end_offset = end_offset + + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + if kwargs.get('decompile_bytecode', False): + decompiler = ByteCodeDecompiler(self) + code = decompiler.decompile(verbose=True) + + return { + 'code': code, + } + else: + return { + 'actions': [a.as_dict(*args, **kwargs) for a in self.actions], + 'end_offset': self.end_offset, + } + + def __repr__(self) -> str: + entries: List[str] = [] + for action in self.actions: + entries.extend([f" {s}" for s in str(action).split(os.linesep)]) + + return f"ByteCode({os.linesep}{os.linesep.join(entries)}{os.linesep} {self.end_offset}: END{os.linesep})" + + +class ControlFlow: + def __init__(self, beginning: int, end: int, next_flow: List[int]) -> None: + self.beginning = beginning + self.end = end + self.next_flow = next_flow + + def contains(self, offset: int) -> bool: + return (self.beginning <= offset) and (offset < self.end) + + def is_first(self, offset: int) -> bool: + return self.beginning == offset + + def is_last(self, offset: int) -> bool: + return self.end == (offset + 1) + + def split(self, offset: int, link: bool = False) -> Tuple["ControlFlow", "ControlFlow"]: + if not self.contains(offset): + raise Exception(f"This ControlFlow does not contain offset {offset}") + + # First, make the second half that the first half will point to. + second = ControlFlow( + offset, + self.end, + self.next_flow, + ) + + # Now, make the first half that we can point to. + first = ControlFlow( + self.beginning, + offset, + [second.beginning] if link else [], + ) + + return (first, second) + + def __repr__(self) -> str: + return f"ControlFlow(beginning={self.beginning}, end={self.end}, next={(', '.join(str(n) for n in self.next_flow)) or 'N/A'}" + + +class ByteCodeChunk: + def __init__(self, actions: List[AP2Action], next_chunk: List[int]) -> None: + self.actions = actions + self.next_chunk = next_chunk + + @property + def offset(self) -> int: + return self.actions[0].offset + + def __repr__(self) -> str: + entries: List[str] = [] + for action in self.actions: + entries.extend([f" {s}" for s in str(action).split(os.linesep)]) + + return f"ByteCodeChunk({os.linesep}{os.linesep.join(entries)}{os.linesep} Next Offsets: {', '.join(str(n) for n in self.next_chunk) or 'None'}{os.linesep})" + + +class ByteCodeDecompiler(VerboseOutput): + def __init__(self, bytecode: ByteCode) -> None: + super().__init__() + + self.bytecode = bytecode + + def __graph_control_flow(self) -> List[ByteCodeChunk]: + # Start by assuming that the whole bytecode never directs flow. This is, confusingly, + # indexed by AP2Action offset, not by actual bytecode offset, so we can avoid the + # prickly problem of opcodes that take more than one byte in the data. + flows: Dict[int, ControlFlow] = {} + end = len(self.bytecode.actions) + beginning = 0 + + # The end of the program. + flows[end] = ControlFlow(end, end + 1, []) + + # The rest of the program. + flows[beginning] = ControlFlow(beginning, end, [end]) + + # Function that helps us find a flow by position. + def find(opcodeno: int) -> int: + for start, cf in flows.items(): + if cf.contains(opcodeno): + return start + + raise Exception(f"Offset {opcodeno} somehow not in our control flow graph!") + + # Now, walk the entire bytecode, and every control flow point split the graph at that point. + for i, action in enumerate(self.bytecode.actions): + current_action = i + next_action = i + 1 + + if action.opcode in [AP2Action.THROW, AP2Action.RETURN]: + # This should end execution, so we should cap off the current execution + # and send it to the end. + current_action_flow = find(current_action) + next_action_flow = find(next_action) + + if current_action_flow == next_action_flow: + # We need to split this on the next_action boundary. + first, second = flows[current_action_flow].split(next_action) + first.next_flow = [end] + + self.vprint(f"{action} action split {flows[current_action_flow]} into {first}, {second}") + + flows[current_action_flow] = first + flows[next_action] = second + + else: + # This already was split in two, presumably by something + # earlier in the chain jumping to the opcode after this. + # We need to unlink the current flow from the second and + # link it to the end. + flows[current_action_flow].next_flow = [end] + + self.vprint(f"{action} action repointed {flows[current_action_flow]} to end") + elif action.opcode == AP2Action.JUMP: + # Unconditional control flow redirection after this, we should split the + # section if necessary and point this section at the new offset. + # First, we need to find the jump point and make sure that its the start + # of a section. + action = cast(JumpAction, action) + for j, dest in enumerate(self.bytecode.actions): + if dest.offset == action.jump_offset: + dest_action = j + break + else: + raise Exception(f"{action} jumps to an opcode that doesn't exist!") + + # If the destination action flow already starts with the jump offset, + # then we're good, we just need to point our current split at this new + # offset. If it doesn't start with the jump offset, then we need to split + # that flow so we can point to the opcode directly. + dest_action_flow = find(dest_action) + if not flows[dest_action_flow].is_first(dest_action): + first, second = flows[dest_action_flow].split(dest_action, link=True) + + self.vprint(f"{action} action required split of {flows[dest_action_flow]} into {first, second}") + + flows[dest_action_flow] = first + flows[dest_action] = second + + # Now, the second is what we want to point at in the next section. + dest_action_flow = dest_action + + # Now, we must split the current flow at the point of this jump. + current_action_flow = find(current_action) + next_action_flow = find(next_action) + + if current_action_flow == next_action_flow: + # We need to split this on the next_action boundary. + first, second = flows[current_action_flow].split(next_action) + first.next_flow = [dest_action_flow] + + self.vprint(f"{action} action split {flows[current_action_flow]} into {first}, {second}") + + flows[current_action_flow] = first + flows[next_action] = second + else: + # This already was split in two, presumably by something + # earlier in the chain jumping to the opcode after this. + # We need to unlink the current flow from the second and + # link it to the end. + flows[current_action_flow].next_flow = [dest_action_flow] + + self.vprint(f"{action} action repointed {flows[current_action_flow]} to new chunk") + elif action.opcode in [AP2Action.IF, AP2Action.IF2]: + # Conditional control flow redirection after this, we should split the + # section if necessary and point this section at the new offset as well + # as the second half of the split section. + # First, we need to find the jump point and make sure that its the start + # of a section. + action = cast(IfAction, action) + for j, dest in enumerate(self.bytecode.actions): + if dest.offset == action.jump_if_true_offset: + dest_action = j + break + else: + raise Exception(f"{action} conditional jumps to an opcode that doesn't exist!") + + # If the destination action flow already starts with the jump offset, + # then we're good, we just need to point our current split at this new + # offset. If it doesn't start with the jump offset, then we need to split + # that flow so we can point to the opcode directly. + dest_action_flow = find(dest_action) + if not flows[dest_action_flow].is_first(dest_action): + first, second = flows[dest_action_flow].split(dest_action, link=True) + + self.vprint(f"{action} action required split of {flows[dest_action_flow]} into {first, second}") + + flows[dest_action_flow] = first + flows[dest_action] = second + + # Now, the second is what we want to point at in the next section. + dest_action_flow = dest_action + + # Now, we must split the current flow at the point of this jump. + current_action_flow = find(current_action) + next_action_flow = find(next_action) + + if current_action_flow == next_action_flow: + # We need to split this on the next_action boundary. + first, second = flows[current_action_flow].split(next_action) + first.next_flow = [next_action, dest_action_flow] + + self.vprint(f"{action} action split {flows[current_action_flow]} into {first}, {second}") + + flows[current_action_flow] = first + flows[next_action] = second + else: + # This already was split in two, presumably by something + # earlier in the chain jumping to the opcode after this. + # We need to unlink the current flow from the second and + # link it to the end. + flows[current_action_flow].next_flow = [next_action, dest_action_flow] + + self.vprint(f"{action} action repointed {flows[current_action_flow]} to new chunk") + + # Finally, return chunks of contiguous execution. + chunks: List[ByteCodeChunk] = [] + for start, flow in flows.items(): + if start == end: + # We don't want to render out the end of the graph, it was only there to make + # the above algorithm easier. + continue + + if len(flow.next_flow) == 1 and flow.next_flow[0] == end: + # This flow is a termination state. + chunks.append(ByteCodeChunk(self.bytecode.actions[flow.beginning:flow.end], [])) + else: + next_chunks: List[int] = [] + for ano in flow.next_flow: + if ano == end: + raise Exception("Logic error!") + next_chunks.append(self.bytecode.actions[ano].offset) + chunks.append(ByteCodeChunk(self.bytecode.actions[flow.beginning:flow.end], next_chunks)) + + return sorted(chunks, key=lambda c: c.offset) + + def decompile(self, verbose: bool = False) -> str: + with self.debugging(verbose): + return self.__decompile() + + def __decompile(self) -> str: + # First, we need to construct a control flow graph. + chunks = self.__graph_control_flow() + + self.vprint(chunks) + + return "TODO" diff --git a/bemani/format/afp/geo.py b/bemani/format/afp/geo.py index dafc5e5..b1af67f 100644 --- a/bemani/format/afp/geo.py +++ b/bemani/format/afp/geo.py @@ -34,13 +34,13 @@ class Shape: # Whether this is parsed. self.parsed = False - def as_dict(self) -> Dict[str, Any]: + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: return { 'name': self.name, - 'vertex_points': [p.as_dict() for p in self.vertex_points], - 'tex_points': [p.as_dict() for p in self.tex_points], - 'tex_colors': [c.as_dict() for c in self.tex_colors], - 'draw_params': [d.as_dict() for d in self.draw_params], + 'vertex_points': [p.as_dict(*args, **kwargs) for p in self.vertex_points], + 'tex_points': [p.as_dict(*args, **kwargs) for p in self.tex_points], + 'tex_colors': [c.as_dict(*args, **kwargs) for c in self.tex_colors], + 'draw_params': [d.as_dict(*args, **kwargs) for d in self.draw_params], } def __repr__(self) -> str: @@ -205,12 +205,12 @@ class DrawParams: self.vertexes = vertexes self.blend = blend - def as_dict(self) -> Dict[str, Any]: + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: return { 'flags': self.flags, 'region': self.region, 'vertexes': self.vertexes, - 'blend': self.blend.as_dict() if self.blend else None, + 'blend': self.blend.as_dict(*args, **kwargs) if self.blend else None, } def __repr__(self) -> str: diff --git a/bemani/format/afp/swf.py b/bemani/format/afp/swf.py index 6c4b7e1..b7ccceb 100644 --- a/bemani/format/afp/swf.py +++ b/bemani/format/afp/swf.py @@ -3,9 +3,34 @@ import struct import sys from typing import Any, Dict, List, Optional, Tuple +from .decompile import ByteCode from .types import Matrix, Color, Point, Rectangle -from .types import AP2Action, AP2Tag, AP2Property -from .util import TrackedCoverage, VerboseOutput, _hex +from .types import ( + AP2Action, + AP2Tag, + AP2Property, + DefineFunction2Action, + InitRegisterAction, + StoreRegisterAction, + JumpAction, + WithAction, + PushAction, + AddNumVariableAction, + AddNumRegisterAction, + IfAction, + GetURL2Action, + StartDragAction, + GotoFrame2Action, + Register, + NULL, + UNDEFINED, + THIS, + ROOT, + PARENT, + CLIP, + GLOBAL, +) +from .util import TrackedCoverage, VerboseOutput class NamedTagReference: @@ -13,7 +38,7 @@ class NamedTagReference: self.swf = swf_name self.tag = tag_name - def as_dict(self) -> Dict[str, Any]: + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: return { 'swf': self.swf, 'tag': self.tag, @@ -23,195 +48,6 @@ class NamedTagReference: return f"{self.swf}.{self.tag}" -class DefineFunction2Action(AP2Action): - def __init__(self, offset: int, name: Optional[str], flags: int, body: "ByteCode") -> None: - super().__init__(offset, AP2Action.DEFINE_FUNCTION2) - self.name = name - self.flags = flags - self.body = body - - def __repr__(self) -> str: - bytecode = [f" {line}" for line in str(self.body).split(os.linesep)] - action_name = AP2Action.action_to_name(self.opcode) - return os.linesep.join([ - f"{self.offset}: {action_name}, Name: {self.name or ''}, Flags: {hex(self.flags)}", - *bytecode, - f"END_{action_name}", - ]) - - -# A bunch of stuff for implementing PushAction -class GenericObject: - def __init__(self, name: str) -> None: - self.__name = name - - def __repr__(self) -> str: - return self.__name - - -NULL = GenericObject('NULL') -UNDEFINED = GenericObject('UNDEFINED') -THIS = GenericObject('THIS') -ROOT = GenericObject('ROOT') -PARENT = GenericObject('PARENT') -CLIP = GenericObject('CLIP') -GLOBAL = GenericObject('GLOBAL') - - -class Register: - def __init__(self, no: int) -> None: - self.no = no - - def __repr__(self) -> str: - return f"Register {self.no}" - - -class PushAction(AP2Action): - def __init__(self, offset: int, objects: List[Any]) -> None: - super().__init__(offset, AP2Action.PUSH) - self.objects = objects - - def __repr__(self) -> str: - objects = [f" {repr(obj)}" for obj in self.objects] - action_name = AP2Action.action_to_name(self.opcode) - return os.linesep.join([ - f"{self.offset}: {action_name}", - *objects, - f"END_{action_name}", - ]) - - -class InitRegisterAction(AP2Action): - def __init__(self, offset: int, registers: List[Register]) -> None: - super().__init__(offset, AP2Action.INIT_REGISTER) - self.registers = registers - - def __repr__(self) -> str: - registers = [f" {reg}" for reg in self.registers] - action_name = AP2Action.action_to_name(self.opcode) - return os.linesep.join([ - f"{self.offset}: {action_name}", - *registers, - f"END_{action_name}", - ]) - - -class StoreRegisterAction(AP2Action): - def __init__(self, offset: int, registers: List[Register]) -> None: - super().__init__(offset, AP2Action.STORE_REGISTER) - self.registers = registers - - def __repr__(self) -> str: - registers = [f" {reg}" for reg in self.registers] - action_name = AP2Action.action_to_name(self.opcode) - return os.linesep.join([ - f"{self.offset}: {action_name}", - *registers, - f"END_{action_name}", - ]) - - -class IfAction(AP2Action): - def __init__(self, offset: int, jump_if_true_offset: int) -> None: - super().__init__(offset, AP2Action.IF) - self.jump_if_true_offset = jump_if_true_offset - - def __repr__(self) -> str: - return f"{self.offset}: {AP2Action.action_to_name(self.opcode)}, Offset To Jump To If True: {self.jump_if_true_offset}" - - -class If2Action(AP2Action): - def __init__(self, offset: int, comparison: str, jump_if_true_offset: int) -> None: - super().__init__(offset, AP2Action.IF2) - self.comparison = comparison - self.jump_if_true_offset = jump_if_true_offset - - def __repr__(self) -> str: - return f"{self.offset}: {AP2Action.action_to_name(self.opcode)}, Comparison: {self.comparison}, Offset To Jump To If True: {self.jump_if_true_offset}" - - -class JumpAction(AP2Action): - def __init__(self, offset: int, jump_offset: int) -> None: - super().__init__(offset, AP2Action.JUMP) - self.jump_offset = jump_offset - - def __repr__(self) -> str: - return f"{self.offset}: {AP2Action.action_to_name(self.opcode)}, Offset To Jump To: {self.jump_offset}" - - -class WithAction(AP2Action): - def __init__(self, offset: int, unknown: bytes) -> None: - super().__init__(offset, AP2Action.WITH) - self.unknown = unknown - - def __repr__(self) -> str: - return f"{self.offset}: {AP2Action.action_to_name(self.opcode)}, Unknown: {self.unknown!r}" - - -class GotoFrame2Action(AP2Action): - def __init__(self, offset: int, additional_frames: int, stop: bool) -> None: - super().__init__(offset, AP2Action.GOTO_FRAME2) - self.additional_frames = additional_frames - self.stop = stop - - def __repr__(self) -> str: - return f"{self.offset}: {AP2Action.action_to_name(self.opcode)}, Additional Frames: {self.additional_frames}, Stop On Arrival: {'yes': if self.stop else 'no'}" - - -class AddNumVariableAction(AP2Action): - def __init__(self, offset: int, amount_to_add: int) -> None: - super().__init__(offset, AP2Action.ADD_NUM_VARIABLE) - self.amount_to_add = amount_to_add - - def __repr__(self) -> str: - return f"{self.offset}: {AP2Action.action_to_name(self.opcode)}, Amount To Add: {self.amount_to_add}" - - -class AddNumRegisterAction(AP2Action): - def __init__(self, offset: int, register: Register, amount_to_add: int) -> None: - super().__init__(offset, AP2Action.ADD_NUM_REGISTER) - self.register = register - self.amount_to_add = amount_to_add - - def __repr__(self) -> str: - return f"{self.offset}: {AP2Action.action_to_name(self.opcode)}, Register: {self.register}, Amount To Add: {self.amount_to_add}" - - -class GetURL2Action(AP2Action): - def __init__(self, offset: int, action: int) -> None: - super().__init__(offset, AP2Action.GET_URL2) - self.action = action - - def __repr__(self) -> str: - return f"{self.offset}: {AP2Action.action_to_name(self.opcode)}, Action: {self.action}" - - -class StartDragAction(AP2Action): - def __init__(self, offset: int, constrain: Optional[bool]) -> None: - super().__init__(offset, AP2Action.START_DRAG) - self.constrain = constrain - - def __repr__(self) -> str: - if self.constrain is None: - cstr = "check stack" - else: - cstr = "yes" if self.constrain else "no" - return f"{self.offset}: {AP2Action.action_to_name(self.opcode)}, Constrain Mouse: {cstr}" - - -class ByteCode: - # A list of bytecodes to execute. - def __init__(self, actions: List[AP2Action]) -> None: - self.actions = actions - - def __repr__(self) -> str: - entries: List[str] = [] - for action in self.actions: - entries.extend([f" {s}" for s in str(action).split(os.linesep)]) - - return f"ByteCode({os.linesep}{os.linesep.join(entries)}{os.linesep})" - - class TagPointer: # A pointer to a tag in this SWF by Tag ID and containing an optional initialization bytecode # to run for this tag when it is placed/executed. @@ -219,6 +55,12 @@ class TagPointer: self.id = id self.init_bytecode = init_bytecode + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + 'id': self.id, + 'init_bytecode': self.init_bytecode.as_dict(*args, **kwargs) if self.init_bytecode else None, + } + class Frame: def __init__(self, start_tag_offset: int, num_tags: int, imported_tags: List[TagPointer] = []) -> None: @@ -229,19 +71,29 @@ class Frame: self.num_tags = num_tags # A list of any imported tags that are to be placed this frame. - self.imported_tags = imported_tags + self.imported_tags = imported_tags or [] # The current tag we're processing, if any. self.current_tag = 0 + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + 'start_tag_offset': self.start_tag_offset, + 'num_tags': self.num_tags, + 'imported_tags': [i.as_dict(*args, **kwargs) for i in self.imported_tags], + } + class Tag: # Any tag that can appear in the SWF. All tags will subclass from this for their behavior. def __init__(self, id: Optional[int]) -> None: self.id = id - def children(self) -> List["Tag"]: - return [] + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + 'id': self.id, + 'type': self.__class__.__name__, + } class AP2ShapeTag(Tag): @@ -251,6 +103,12 @@ class AP2ShapeTag(Tag): # The reference is the name of a shape (geo structure) that defines this primitive or sprite. self.reference = reference + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + 'reference': self.reference, + } + class AP2DefineFontTag(Tag): def __init__(self, id: int, fontname: str, xml_prefix: str, heights: List[int]) -> None: @@ -268,6 +126,14 @@ class AP2DefineFontTag(Tag): # in the texture map. self.heights = heights + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + 'fontname': self.fontname, + 'xml_prefix': self.xml_prefix, + 'heights': self.heights, + } + class AP2DoActionTag(Tag): def __init__(self, bytecode: ByteCode) -> None: @@ -278,6 +144,12 @@ class AP2DoActionTag(Tag): # this tag is placed/executed. self.bytecode = bytecode + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + 'bytecode': self.bytecode.as_dict(*args, **kwargs), + } + class AP2PlaceObjectTag(Tag): def __init__( @@ -330,6 +202,22 @@ class AP2PlaceObjectTag(Tag): # fires. self.triggers = triggers + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + 'object_id': self.object_id, + 'depth': self.depth, + 'source_tag_id': self.source_tag_id, + 'name': self.name, + 'blend': self.blend, + 'update': self.update, + 'transform': self.transform.as_dict(*args, **kwargs) if self.transform is not None else None, + 'rotation_offset': self.rotation_offset.as_dict(*args, **kwargs) if self.rotation_offset is not None else None, + 'mult_color': self.mult_color.as_dict(*args, **kwargs) if self.mult_color is not None else None, + 'add_color': self.add_color.as_dict(*args, **kwargs) if self.add_color is not None else None, + 'triggers': {i: [b.as_dict(*args, **kwargs) for b in t] for (i, t) in self.triggers.items()} + } + def __repr__(self) -> str: return f"AP2PlaceObjectTag(object_id={self.object_id}, depth={self.depth})" @@ -345,6 +233,13 @@ class AP2RemoveObjectTag(Tag): # The depth (level) that we should remove objects from. self.depth = depth + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + 'object_id': self.object_id, + 'depth': self.depth, + } + class AP2DefineSpriteTag(Tag): def __init__(self, id: int, tags: List[Tag], frames: List[Frame]) -> None: @@ -357,8 +252,12 @@ class AP2DefineSpriteTag(Tag): # The list of frames this SWF occupies. self.frames = frames - def children(self) -> List["Tag"]: - return self.tags + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + 'tags': [t.as_dict(*args, **kwargs) for t in self.tags], + 'frames': [f.as_dict(*args, **kwargs) for f in self.frames], + } class AP2DefineEditTextTag(Tag): @@ -381,6 +280,16 @@ class AP2DefineEditTextTag(Tag): # The default text that should be present in the control when it is initially placed/executed. self.default_text = default_text + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + 'font_tag_id': self.font_tag_id, + 'font_height': self.font_height, + 'rect': self.rect.as_dict(*args, **kwargs), + 'color': self.color.as_dict(*args, **kwargs), + 'default_text': self.default_text, + } + class SWF(TrackedCoverage, VerboseOutput): def __init__( @@ -448,11 +357,19 @@ class SWF(TrackedCoverage, VerboseOutput): print(f"Uncovered string: {hex(offset)} - {string}", file=sys.stderr) - def as_dict(self) -> Dict[str, Any]: + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: return { 'name': self.name, - 'data': "".join(_hex(x) for x in self.data), - 'descramble_info': "".join(_hex(x) for x in self.descramble_info), + 'exported_name': self.exported_name, + 'data_version': self.data_version, + 'container_version': self.container_version, + 'fps': self.fps, + 'color': self.color.as_dict(*args, **kwargs) if self.color is not None else None, + 'location': self.location.as_dict(*args, **kwargs), + 'exported_tags': self.exported_tags, + 'imported_tags': {i: self.imported_tags[i].as_dict(*args, **kwargs) for i in self.imported_tags}, + 'tags': [t.as_dict(*args, **kwargs) for t in self.tags], + 'frames': [f.as_dict(*args, **kwargs) for f in self.frames], } def __parse_bytecode(self, datachunk: bytes, string_offsets: List[int] = [], prefix: str = "") -> ByteCode: @@ -851,7 +768,7 @@ class SWF(TrackedCoverage, VerboseOutput): offset_ptr += 3 self.vprint(f"{prefix} {lineno}: Offset If True: {jump_if_true_offset}") - actions.append(IfAction(lineno, jump_if_true_offset)) + actions.append(IfAction(lineno, "IS TRUE", jump_if_true_offset)) elif opcode == AP2Action.IF2: if2_type, jump_if_true_offset = struct.unpack(">Bh", datachunk[(offset_ptr + 1):(offset_ptr + 4)]) jump_if_true_offset += (lineno + 4) @@ -864,7 +781,7 @@ class SWF(TrackedCoverage, VerboseOutput): 3: ">", 4: "<=", 5: ">=", - 6: "!", + 6: "IS FALSE", 7: "BITAND", 8: "BITNOTAND", 9: "STRICT ==", @@ -874,7 +791,7 @@ class SWF(TrackedCoverage, VerboseOutput): }[if2_type] self.vprint(f"{prefix} {lineno}: {action_name} {if2_typestr}, Offset If True: {jump_if_true_offset}") - actions.append(If2Action(lineno, if2_typestr, jump_if_true_offset)) + actions.append(IfAction(lineno, if2_typestr, jump_if_true_offset)) elif opcode == AP2Action.JUMP: jump_offset = struct.unpack(">h", datachunk[(offset_ptr + 1):(offset_ptr + 3)])[0] jump_offset += (lineno + 3) @@ -937,7 +854,7 @@ class SWF(TrackedCoverage, VerboseOutput): else: raise Exception(f"Can't advance, no handler for opcode {opcode} ({hex(opcode)})!") - return ByteCode(actions) + return ByteCode(actions, offset_ptr) def __parse_tag(self, ap2_version: int, afp_version: int, ap2data: bytes, tagid: int, size: int, dataoffset: int, prefix: str = "") -> Tag: if tagid == AP2Tag.AP2_SHAPE: diff --git a/bemani/format/afp/types/__init__.py b/bemani/format/afp/types/__init__.py index bbf0977..7c428fc 100644 --- a/bemani/format/afp/types/__init__.py +++ b/bemani/format/afp/types/__init__.py @@ -1,5 +1,31 @@ from .generic import Matrix, Color, Point, Rectangle -from .ap2 import AP2Tag, AP2Action, AP2Object, AP2Pointer, AP2Property +from .ap2 import ( + AP2Tag, + AP2Action, + AP2Object, + AP2Pointer, + AP2Property, + DefineFunction2Action, + NULL, + UNDEFINED, + THIS, + ROOT, + PARENT, + CLIP, + GLOBAL, + Register, + PushAction, + InitRegisterAction, + StoreRegisterAction, + IfAction, + JumpAction, + WithAction, + GotoFrame2Action, + AddNumVariableAction, + AddNumRegisterAction, + GetURL2Action, + StartDragAction, +) __all__ = [ @@ -12,4 +38,24 @@ __all__ = [ 'AP2Object', 'AP2Pointer', 'AP2Property', + 'DefineFunction2Action', + 'NULL', + 'UNDEFINED', + 'THIS', + 'ROOT', + 'PARENT', + 'CLIP', + 'GLOBAL', + 'Register', + 'PushAction', + 'InitRegisterAction', + 'StoreRegisterAction', + 'IfAction', + 'JumpAction', + 'WithAction', + 'GotoFrame2Action', + 'AddNumVariableAction', + 'AddNumRegisterAction', + 'GetURL2Action', + 'StartDragAction', ] diff --git a/bemani/format/afp/types/ap2.py b/bemani/format/afp/types/ap2.py index 0e6f6df..f3a4093 100644 --- a/bemani/format/afp/types/ap2.py +++ b/bemani/format/afp/types/ap2.py @@ -1,4 +1,9 @@ -from typing import Dict, List, Set, Tuple +import os +from typing import TYPE_CHECKING, Any, Dict, List, Set, Tuple, Optional + +if TYPE_CHECKING: + # This is a circular dependency otherwise. + from ..decompile import ByteCode class AP2Tag: @@ -574,10 +579,263 @@ class AP2Action: self.offset = offset self.opcode = opcode + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + 'offset': self.offset, + 'action': AP2Action.action_to_name(self.opcode), + } + def __repr__(self) -> str: return f"{self.offset}: {AP2Action.action_to_name(self.opcode)}" +class DefineFunction2Action(AP2Action): + def __init__(self, offset: int, name: Optional[str], flags: int, body: "ByteCode") -> None: + super().__init__(offset, AP2Action.DEFINE_FUNCTION2) + self.name = name + self.flags = flags + self.body = body + + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + 'name': self.name, + 'flags': self.flags, + 'body': self.body.as_dict(*args, **kwargs), + } + + def __repr__(self) -> str: + bytecode = [f" {line}" for line in str(self.body).split(os.linesep)] + action_name = AP2Action.action_to_name(self.opcode) + return os.linesep.join([ + f"{self.offset}: {action_name}, Name: {self.name or ''}, Flags: {hex(self.flags)}", + *bytecode, + f"END_{action_name}", + ]) + + +# A bunch of stuff for implementing PushAction +class GenericObject: + def __init__(self, name: str) -> None: + self.__name = name + + def __repr__(self) -> str: + return self.__name + + +NULL = GenericObject('NULL') +UNDEFINED = GenericObject('UNDEFINED') +THIS = GenericObject('THIS') +ROOT = GenericObject('ROOT') +PARENT = GenericObject('PARENT') +CLIP = GenericObject('CLIP') +GLOBAL = GenericObject('GLOBAL') + + +class Register: + def __init__(self, no: int) -> None: + self.no = no + + def __repr__(self) -> str: + return f"Register {self.no}" + + +class PushAction(AP2Action): + def __init__(self, offset: int, objects: List[Any]) -> None: + super().__init__(offset, AP2Action.PUSH) + self.objects = objects + + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + # TODO: We need to do better than this when exporting objects, + # we should preserve their type. + 'objects': [repr(o) for o in self.objects], + } + + def __repr__(self) -> str: + objects = [f" {repr(obj)}" for obj in self.objects] + action_name = AP2Action.action_to_name(self.opcode) + return os.linesep.join([ + f"{self.offset}: {action_name}", + *objects, + f"END_{action_name}", + ]) + + +class InitRegisterAction(AP2Action): + def __init__(self, offset: int, registers: List[Register]) -> None: + super().__init__(offset, AP2Action.INIT_REGISTER) + self.registers = registers + + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + 'registers': [r.no for r in self.registers], + } + + def __repr__(self) -> str: + registers = [f" {reg}" for reg in self.registers] + action_name = AP2Action.action_to_name(self.opcode) + return os.linesep.join([ + f"{self.offset}: {action_name}", + *registers, + f"END_{action_name}", + ]) + + +class StoreRegisterAction(AP2Action): + def __init__(self, offset: int, registers: List[Register]) -> None: + super().__init__(offset, AP2Action.STORE_REGISTER) + self.registers = registers + + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + 'registers': [r.no for r in self.registers], + } + + def __repr__(self) -> str: + registers = [f" {reg}" for reg in self.registers] + action_name = AP2Action.action_to_name(self.opcode) + return os.linesep.join([ + f"{self.offset}: {action_name}", + *registers, + f"END_{action_name}", + ]) + + +class IfAction(AP2Action): + def __init__(self, offset: int, comparison: str, jump_if_true_offset: int) -> None: + super().__init__(offset, AP2Action.IF2) + self.comparison = comparison + self.jump_if_true_offset = jump_if_true_offset + + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + 'jump_if_true_offset': self.jump_if_true_offset, + } + + def __repr__(self) -> str: + return f"{self.offset}: {AP2Action.action_to_name(self.opcode)}, Comparison: {self.comparison}, Offset To Jump To If True: {self.jump_if_true_offset}" + + +class JumpAction(AP2Action): + def __init__(self, offset: int, jump_offset: int) -> None: + super().__init__(offset, AP2Action.JUMP) + self.jump_offset = jump_offset + + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + 'jump_offset': self.jump_offset, + } + + def __repr__(self) -> str: + return f"{self.offset}: {AP2Action.action_to_name(self.opcode)}, Offset To Jump To: {self.jump_offset}" + + +class WithAction(AP2Action): + def __init__(self, offset: int, unknown: bytes) -> None: + super().__init__(offset, AP2Action.WITH) + self.unknown = unknown + + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + # TODO: We need to do better than this, so I guess it comes down to having + # a better idea how WITH works. + 'unknown': str(self.unknown), + } + + def __repr__(self) -> str: + return f"{self.offset}: {AP2Action.action_to_name(self.opcode)}, Unknown: {self.unknown!r}" + + +class GotoFrame2Action(AP2Action): + def __init__(self, offset: int, additional_frames: int, stop: bool) -> None: + super().__init__(offset, AP2Action.GOTO_FRAME2) + self.additional_frames = additional_frames + self.stop = stop + + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + 'additiona_frames': self.additional_frames, + 'stop': self.stop, + } + + def __repr__(self) -> str: + return f"{self.offset}: {AP2Action.action_to_name(self.opcode)}, Additional Frames: {self.additional_frames}, Stop On Arrival: {'yes' if self.stop else 'no'}" + + +class AddNumVariableAction(AP2Action): + def __init__(self, offset: int, amount_to_add: int) -> None: + super().__init__(offset, AP2Action.ADD_NUM_VARIABLE) + self.amount_to_add = amount_to_add + + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + 'amount_to_add': self.amount_to_add, + } + + def __repr__(self) -> str: + return f"{self.offset}: {AP2Action.action_to_name(self.opcode)}, Amount To Add: {self.amount_to_add}" + + +class AddNumRegisterAction(AP2Action): + def __init__(self, offset: int, register: Register, amount_to_add: int) -> None: + super().__init__(offset, AP2Action.ADD_NUM_REGISTER) + self.register = register + self.amount_to_add = amount_to_add + + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + 'register': self.register.no, + 'amount_to_add': self.amount_to_add, + } + + def __repr__(self) -> str: + return f"{self.offset}: {AP2Action.action_to_name(self.opcode)}, Register: {self.register}, Amount To Add: {self.amount_to_add}" + + +class GetURL2Action(AP2Action): + def __init__(self, offset: int, action: int) -> None: + super().__init__(offset, AP2Action.GET_URL2) + self.action = action + + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + 'action': self.action, + } + + def __repr__(self) -> str: + return f"{self.offset}: {AP2Action.action_to_name(self.opcode)}, Action: {self.action}" + + +class StartDragAction(AP2Action): + def __init__(self, offset: int, constrain: Optional[bool]) -> None: + super().__init__(offset, AP2Action.START_DRAG) + self.constrain = constrain + + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + **super().as_dict(*args, **kwargs), + 'constrain': self.constrain, + } + + def __repr__(self) -> str: + if self.constrain is None: + cstr = "check stack" + else: + cstr = "yes" if self.constrain else "no" + return f"{self.offset}: {AP2Action.action_to_name(self.opcode)}, Constrain Mouse: {cstr}" + + class AP2Object: UNDEFINED = 0x0 NAN = 0x1 diff --git a/bemani/format/afp/types/generic.py b/bemani/format/afp/types/generic.py index 5eafff3..1c73f64 100644 --- a/bemani/format/afp/types/generic.py +++ b/bemani/format/afp/types/generic.py @@ -8,7 +8,7 @@ class Color: self.b = b self.a = a - def as_dict(self) -> Dict[str, Any]: + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: return { 'r': self.r, 'g': self.g, @@ -37,7 +37,7 @@ class Point: def identity() -> "Point": return Point(0.0, 0.0) - def as_dict(self) -> Dict[str, Any]: + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: return { 'x': self.x, 'y': self.y, @@ -67,7 +67,7 @@ class Rectangle: self.bottom = bottom self.right = right - def as_dict(self) -> Dict[str, Any]: + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: return { 'left': self.left, 'top': self.top, @@ -104,6 +104,16 @@ class Matrix: def identity() -> "Matrix": return Matrix(1.0, 0.0, 0.0, 1.0, 0.0, 0.0) + def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: + return { + 'a': self.a, + 'b': self.b, + 'c': self.c, + 'd': self.d, + 'tx': self.tx, + 'ty': self.ty, + } + def multiply_point(self, point: Point) -> Point: return Point( x=(self.a * point.x) + (self.c * point.y) + self.tx, diff --git a/bemani/utils/afputils.py b/bemani/utils/afputils.py index 7185811..db88e99 100644 --- a/bemani/utils/afputils.py +++ b/bemani/utils/afputils.py @@ -101,6 +101,12 @@ def main() -> int: metavar="FILE", help="The file to print", ) + print_parser.add_argument( + "-d", + "--decompile-bytecode", + action="store_true", + help="Attempt to decompile and print bytecode instead of printing the raw representation.", + ) print_parser.add_argument( "-v", "--verbose", @@ -119,6 +125,12 @@ def main() -> int: metavar="BSIFILE", help="The BSI file to parse", ) + parseafp_parser.add_argument( + "-d", + "--decompile-bytecode", + action="store_true", + help="Attempt to decompile and print bytecode instead of printing the raw representation.", + ) parseafp_parser.add_argument( "-v", "--verbose", @@ -427,7 +439,7 @@ def main() -> int: afpfile = TXP2File(bfp.read(), verbose=args.verbose) # Now, print it - print(json.dumps(afpfile.as_dict(), sort_keys=True, indent=4)) + print(json.dumps(afpfile.as_dict(decompile_bytecode=args.decompile_bytecode), sort_keys=True, indent=4)) if args.action == "parseafp": # First, load the AFP and BSI files @@ -437,7 +449,7 @@ def main() -> int: # Now, print it swf.parse(verbose=args.verbose) - print(json.dumps(swf.as_dict(), sort_keys=True, indent=4)) + print(json.dumps(swf.as_dict(decompile_bytecode=args.decompile_bytecode), sort_keys=True, indent=4)) if args.action == "parsegeo": # First, load the AFP and BSI files