From 6e34d2647ea6f8079faa1bfaf08d5aa511f93e04 Mon Sep 17 00:00:00 2001 From: Jennifer Taylor Date: Sat, 24 Apr 2021 18:01:25 +0000 Subject: [PATCH] Implement loop break/continue/goto processing. --- bemani/format/afp/decompile.py | 280 ++++++++++++++++++++++++++++----- bemani/format/afp/types/ap2.py | 2 +- 2 files changed, 245 insertions(+), 37 deletions(-) diff --git a/bemani/format/afp/decompile.py b/bemani/format/afp/decompile.py index c3247d5..292611b 100644 --- a/bemani/format/afp/decompile.py +++ b/bemani/format/afp/decompile.py @@ -1,14 +1,14 @@ import os -from typing import Any, Dict, List, Tuple, Set, Union, Optional, cast +from typing import Any, Dict, List, Sequence, Tuple, Set, Union, Optional, cast -from .types import AP2Action, JumpAction, IfAction +from .types import AP2Action, JumpAction, IfAction, DefineFunction2Action from .util import VerboseOutput class ByteCode: # A list of bytecodes to execute. - def __init__(self, actions: List[AP2Action], end_offset: int) -> None: - self.actions = actions + def __init__(self, actions: Sequence[AP2Action], end_offset: int) -> None: + self.actions = list(actions) self.end_offset = end_offset def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: @@ -72,23 +72,82 @@ class ControlFlow: return f"ControlFlow(beginning={self.beginning}, end={self.end}, next={(', '.join(str(n) for n in self.next_flow)) or 'N/A'}" +class ConvertedAction: + # An action that has been analyzed and converted to an intermediate representation. + pass + + +ArbitraryOpcode = Union[AP2Action, ConvertedAction] + + +class BreakStatement(ConvertedAction): + # A break from a loop (forces execution to the next line after the loop). + def __repr__(self) -> str: + return "break;" + + +class ContinueStatement(ConvertedAction): + # A continue in a loop (forces execution to the top of the loop). + def __repr__(self) -> str: + return "continue;" + + +class GotoStatement(ConvertedAction): + # A goto, including the ID of the chunk we want to jump to. + def __init__(self, location: int) -> None: + self.location = location + + def __repr__(self) -> str: + return f"goto label_{self.location};" + + +class IntermediateIfStatement(ConvertedAction): + def __init__(self, parent_action: IfAction, true_actions: Sequence[ArbitraryOpcode], false_actions: Sequence[ArbitraryOpcode], negate: bool) -> None: + self.parent_action = parent_action + self.true_actions = list(true_actions) + self.false_actions = list(false_actions) + self.negate = negate + + def __repr__(self) -> str: + true_entries: List[str] = [] + for action in self.true_actions: + true_entries.extend([f" {s}" for s in str(action).split(os.linesep)]) + + false_entries: List[str] = [] + for action in self.false_actions: + false_entries.extend([f" {s}" for s in str(action).split(os.linesep)]) + + if self.false_actions: + return os.linesep.join([ + f"if <{'!' if self.negate else ''}{self.parent_action}> {{", + os.linesep.join(true_entries), + "} else {", + os.linesep.join(false_entries), + "}" + ]) + else: + return os.linesep.join([ + f"if <{'!' if self.negate else ''}{self.parent_action}> {{", + os.linesep.join(true_entries), + "}" + ]) + + class ByteCodeChunk: - def __init__(self, id: int, actions: List[AP2Action], next_chunks: List[int], previous_chunks: List[int] = []) -> None: + def __init__(self, id: int, actions: Sequence[ArbitraryOpcode], next_chunks: List[int], previous_chunks: List[int] = []) -> None: self.id = id - self.actions = actions + self.actions = list(actions) self.next_chunks = next_chunks self.previous_chunks = previous_chunks or [] - @property - def offset(self) -> Optional[int]: - if self.actions: - return self.actions[0].offset - return None - def __repr__(self) -> str: entries: List[str] = [] for action in self.actions: - entries.extend([f" {s}" for s in str(action).split(os.linesep)]) + if isinstance(action, DefineFunction2Action): + # Special case, since we will decompile this later, we don't want to print it now. + entries.append(f" {action.offset}: {AP2Action.action_to_name(action.opcode)}, Name: {action.name or ''}, Flags: {hex(action.flags)}") + else: + entries.extend([f" {s}" for s in str(action).split(os.linesep)]) return ( f"ByteCodeChunk({os.linesep}" + @@ -104,7 +163,7 @@ ArbitraryCodeChunk = Union[ByteCodeChunk, "Loop"] class Loop: - def __init__(self, id: int, chunks: List[ArbitraryCodeChunk]) -> None: + def __init__(self, id: int, chunks: Sequence[ArbitraryCodeChunk]) -> None: # The ID is usually the chunk that other chunks point into. self.id = id @@ -113,7 +172,7 @@ class Loop: self.previous_chunks: List[int] = [] self.next_chunks: List[int] = [] - self.chunks = chunks + self.chunks = list(chunks) for chunk in chunks: for nextid in chunk.next_chunks: @@ -123,14 +182,6 @@ class Loop: if previd not in ided_chunks: self.previous_chunks.append(previd) - @property - def offset(self) -> Optional[int]: - for chunk in self.chunks: - if chunk.id == self.id: - return chunk.offset - # We're guaranteed to have a haeder (the ID), so this is a problem. - raise Exception("Logic error!") - def __repr__(self) -> str: entries: List[str] = [] for chunk in self.chunks: @@ -204,7 +255,7 @@ class ByteCodeDecompiler(VerboseOutput): self.bytecode = bytecode - def __graph_control_flow(self) -> List[ByteCodeChunk]: + def __graph_control_flow(self) -> Tuple[List[ByteCodeChunk], Dict[int, int]]: # Start by assuming that the whole bytecode never directs flow. This is, confusingly, # indexed by AP2Action offset, not by actual bytecode offset, so we can avoid the # prickly problem of opcodes that take more than one byte in the data. @@ -305,7 +356,7 @@ class ByteCodeDecompiler(VerboseOutput): flows[current_action_flow].next_flow = [dest_action_flow] self.vprint(f"{action} action repointed {flows[current_action_flow]} to new chunk") - elif action.opcode in [AP2Action.IF, AP2Action.IF2]: + elif action.opcode == AP2Action.IF: # Conditional control flow redirection after this, we should split the # section if necessary and point this section at the new offset as well # as the second half of the split section. @@ -356,6 +407,9 @@ class ByteCodeDecompiler(VerboseOutput): flows[current_action_flow].next_flow = [next_action, dest_action_flow] self.vprint(f"{action} action repointed {flows[current_action_flow]} to new chunk") + elif action.opcode == AP2Action.IF2: + # We don't emit this anymore, so this is a problem. + raise Exception("Logic error!") # Finally, return chunks of contiguous execution. chunks: List[ByteCodeChunk] = [] @@ -383,12 +437,16 @@ class ByteCodeDecompiler(VerboseOutput): entries: Dict[int, List[int]] = {} offset_to_id: Dict[int, int] = {} for chunk in chunks: - offset_to_id[chunk.offset] = chunk.id + # We haven't emitted any non-AP2Actions yet, so we are safe in casting here. + chunk_offset = cast(AP2Action, chunk.actions[0]).offset + offset_to_id[chunk_offset] = chunk.id for next_chunk in chunk.next_chunks: - entries[next_chunk] = entries.get(next_chunk, []) + [chunk.offset] + entries[next_chunk] = entries.get(next_chunk, []) + [chunk_offset] for chunk in chunks: - chunk.previous_chunks = entries.get(chunk.offset, []) + # We haven't emitted any non-AP2Actions yet, so we are safe in casting here. + chunk_offset = cast(AP2Action, chunk.actions[0]).offset + chunk.previous_chunks = entries.get(chunk_offset, []) # Now, convert the offsets to chunk ID pointers. end_previous_chunks: List[int] = [] @@ -402,10 +460,13 @@ class ByteCodeDecompiler(VerboseOutput): end_previous_chunks.append(chunk.id) chunk.previous_chunks = [offset_to_id[c] for c in chunk.previous_chunks] + # Add the "return" chunk now that we've converted everything. chunks.append(ByteCodeChunk(chunkid, [], [], previous_chunks=end_previous_chunks)) - return sorted(chunks, key=lambda c: c.id) + offset_to_id[self.bytecode.end_offset] = chunkid - def __get_entry_block(self, chunks: List[ByteCodeChunk]) -> int: + return (sorted(chunks, key=lambda c: c.id), offset_to_id) + + def __get_entry_block(self, chunks: Sequence[ByteCodeChunk]) -> int: start_id: int = -1 for chunk in chunks: if not chunk.previous_chunks: @@ -420,7 +481,7 @@ class ByteCodeDecompiler(VerboseOutput): raise Exception("Logic error!") return start_id - def __compute_dominators(self, chunks: List[ByteCodeChunk]) -> Dict[int, Set[int]]: + def __compute_dominators(self, chunks: Sequence[ByteCodeChunk]) -> Dict[int, Set[int]]: # Find the start of the graph (the node with no previous entries). start_id = self.__get_entry_block(chunks) @@ -445,7 +506,136 @@ class ByteCodeDecompiler(VerboseOutput): return {chunk.id: dominators[chunk.id].bitsSet for chunk in chunks} - def __separate_loops(self, chunks: List[ByteCodeChunk], dominators: Dict[int, Set[int]]) -> List[Union[ByteCodeChunk, Loop]]: + def __analyze_loop_jumps(self, loop: Loop, offset_map: Dict[int, int]) -> Loop: + # Go through and try to determine which jumps are "break" and "continue" statements based on + # where they point (to the header or to the exit point). First, let's try to identify all + # exits, and which one is the break point and which ones are possibly goto statements + # (break out of multiple loop depths). + internal_jump_points = {c.id for c in loop.chunks} + + header_chunks = [c for c in loop.chunks if c.id == loop.id] + if len(header_chunks) != 1: + # Should never happen, only one should match ID. + raise Exception("Logic error!") + header_chunk = header_chunks[0] + + # Identify external jumps from the header. + break_points = [i for i in header_chunk.next_chunks if i not in internal_jump_points] + if len(break_points) > 1: + # We should not have two exits here, if so this isn't a loop! + raise Exception("Logic error!") + + # Identify the break and continue jump points. + if not break_points: + # This might be possible, but I don't know how to deal with it. + raise Exception("Logic error!") + break_point = break_points[0] + continue_point = header_chunk.id + + self.vprint(f"Loop breaks to {break_point} and continues to {continue_point}") + + # Now, go through each chunk, identify whether it has an if, and fix up the + # if statements. + for chunk in loop.chunks: + if not chunk.next_chunks: + # All chunks need a next chunk of some type, the only one that doesn't + # is the end chunk which should never be part of a loop. + raise Exception("Logic error!") + if not isinstance(chunk, ByteCodeChunk): + # We don't need to fix up loops, we already did this in a previous + # fixup. + continue + + last_action = chunk.actions[-1] + if isinstance(last_action, AP2Action): + if last_action.opcode in [AP2Action.THROW, AP2Action.RETURN]: + # Ignore these for now, we'll fix these up in a later stage. + continue + + if last_action.opcode == AP2Action.JUMP: + # This is either an unconditional break/continue or an + # internal jump. + if len(chunk.next_chunks) != 1: + raise Exception("Logic error!") + next_chunk = chunk.next_chunks[0] + + if next_chunk == break_point: + self.vprint("Converting jump to loop break into break statement.") + chunk.actions[-1] = BreakStatement() + chunk.next_chunks = [] + elif next_chunk == continue_point: + self.vprint("Converting jump to loop continue into continue statement.") + chunk.actions[-1] = ContinueStatement() + chunk.next_chunks = [] + elif next_chunk not in internal_jump_points: + self.vprint("Converting jump to external point into goto statement.") + chunk.actions[-1] = GotoStatement(next_chunk) + chunk.next_chunks = [] + continue + + if last_action.opcode == AP2Action.IF: + # Calculate true and false jump points. + true_jump_point = offset_map[cast(IfAction, last_action).jump_if_true_offset] + false_jump_points = [n for n in chunk.next_chunks if n != true_jump_point] + if len(false_jump_points) != 1: + raise Exception("Logic error!") + false_jump_point = false_jump_points[0] + + # Calculate true and false jump points, see if they are break/continue/goto. + true_action: Optional[ConvertedAction] = None + if true_jump_point == break_point: + self.vprint("Converting jump if true to loop break into break statement.") + true_action = BreakStatement() + chunk.next_chunks = [n for n in chunk.next_chunks if n != true_jump_point] + elif true_jump_point == continue_point: + self.vprint("Converting jump if true to loop continue into continue statement.") + true_action = ContinueStatement() + chunk.next_chunks = [n for n in chunk.next_chunks if n != true_jump_point] + elif true_jump_point not in internal_jump_points: + self.vprint("Converting jump if true to external point into goto statement.") + true_action = GotoStatement(true_jump_point) + chunk.next_chunks = [n for n in chunk.next_chunks if n != true_jump_point] + + false_action: Optional[ConvertedAction] = None + if false_jump_point == break_point: + self.vprint("Converting jump if false to loop break into break statement.") + false_action = BreakStatement() + chunk.next_chunks = [n for n in chunk.next_chunks if n != false_jump_point] + elif false_jump_point == continue_point: + self.vprint("Converting jump if false to loop continue into continue statement.") + false_action = ContinueStatement() + chunk.next_chunks = [n for n in chunk.next_chunks if n != false_jump_point] + elif false_jump_point not in internal_jump_points: + self.vprint("Converting jump if false to external point into goto statement.") + false_action = GotoStatement(false_jump_point) + chunk.next_chunks = [n for n in chunk.next_chunks if n != false_jump_point] + + if true_action is None and false_action is not None: + true_action = false_action + false_action = None + negate = True + else: + negate = False + + if true_action is None and false_action is None: + # This is an internal-only if statement, we don't care. + continue + + chunk.actions[-1] = IntermediateIfStatement( + cast(IfAction, last_action), + [true_action], + [false_action] if false_action else [], + negate=negate, + ) + continue + + # Now, we have converted all external jumps to either break or goto, so we don't + # need to keep track of the next chunk aside from the break location. + loop.next_chunks = [break_point] + + return loop + + def __separate_loops(self, chunks: Sequence[ByteCodeChunk], dominators: Dict[int, Set[int]], offset_map: Dict[int, int]) -> List[Union[ByteCodeChunk, Loop]]: # Find the start of the graph (the node with no previous entries). start_id = self.__get_entry_block(chunks) chunks_by_id: Dict[int, Union[ByteCodeChunk, Loop]] = {chunk.id: chunk for chunk in chunks} @@ -505,7 +695,12 @@ class ByteCodeDecompiler(VerboseOutput): # This loop does not contain any loops of its own. It is safe to # convert. self.vprint(f"Converting loop with header {header} and blocks {', '.join(str(b) for b in blocks)}.") - chunks_by_id[header] = Loop(header, [chunks_by_id[i] for i in blocks]) + new_loop = Loop(header, [chunks_by_id[i] for i in blocks]) + + # Eliminate jumps that are to the beginning/end of the loop to + # make if statement detection later on easier. + new_loop = self.__analyze_loop_jumps(new_loop, offset_map) + chunks_by_id[header] = new_loop # These blocks are now part of the loop, so we need to remove them # from the IDed chunks as well as from existing loops. @@ -536,17 +731,30 @@ class ByteCodeDecompiler(VerboseOutput): return [chunks_by_id[i] for i in chunks_by_id] + def __separate_ifs(self, chunks: Sequence[Union[ByteCodeChunk, Loop]], offset_map: Dict[int, int]) -> List[ArbitraryCodeChunk]: + return [c for c in chunks] + def __decompile(self) -> str: # First, we need to construct a control flow graph. - chunks = self.__graph_control_flow() + self.vprint("Generating control flow graph...") + chunks, offset_map = self.__graph_control_flow() # Now, compute dominators so we can locate back-refs. + self.vprint("Generating dominator list...") dominators = self.__compute_dominators(chunks) # Now, separate chunks out into chunks and loops. - chunks_and_loops = self.__separate_loops(chunks, dominators) + self.vprint("Identifying and separating loops...") + chunks_and_loops = self.__separate_loops(chunks, dominators, offset_map) - self.vprint(chunks_and_loops) + # Now, identify any remaining control flow logic. + self.vprint("Identifying and separating ifs...") + chunks_loops_and_ifs = self.__separate_ifs(chunks_and_loops, offset_map) + + # At this point, we *should* have a directed graph where there are no + # backwards refs and every fork has been identified as an if. This means + # we can now walk and recursively generate pseudocode in one pass. + self.vprint(chunks_loops_and_ifs) return "TODO" diff --git a/bemani/format/afp/types/ap2.py b/bemani/format/afp/types/ap2.py index f3a4093..89fcc22 100644 --- a/bemani/format/afp/types/ap2.py +++ b/bemani/format/afp/types/ap2.py @@ -707,7 +707,7 @@ class StoreRegisterAction(AP2Action): class IfAction(AP2Action): def __init__(self, offset: int, comparison: str, jump_if_true_offset: int) -> None: - super().__init__(offset, AP2Action.IF2) + super().__init__(offset, AP2Action.IF) self.comparison = comparison self.jump_if_true_offset = jump_if_true_offset