bemaniutils/bemani/format/afp/decompile.py

556 lines
24 KiB
Python

import os
from typing import Any, Dict, List, Tuple, Set, Union, Optional, cast
from .types import AP2Action, JumpAction, IfAction
from .util import VerboseOutput
class ByteCode:
# A list of bytecodes to execute.
def __init__(self, actions: List[AP2Action], end_offset: int) -> None:
self.actions = actions
self.end_offset = end_offset
def as_dict(self, *args: Any, **kwargs: Any) -> Dict[str, Any]:
if kwargs.get('decompile_bytecode', False):
decompiler = ByteCodeDecompiler(self)
code = decompiler.decompile(verbose=True)
return {
'code': code,
}
else:
return {
'actions': [a.as_dict(*args, **kwargs) for a in self.actions],
'end_offset': self.end_offset,
}
def __repr__(self) -> str:
entries: List[str] = []
for action in self.actions:
entries.extend([f" {s}" for s in str(action).split(os.linesep)])
return f"ByteCode({os.linesep}{os.linesep.join(entries)}{os.linesep} {self.end_offset}: END{os.linesep})"
class ControlFlow:
def __init__(self, beginning: int, end: int, next_flow: List[int]) -> None:
self.beginning = beginning
self.end = end
self.next_flow = next_flow
def contains(self, offset: int) -> bool:
return (self.beginning <= offset) and (offset < self.end)
def is_first(self, offset: int) -> bool:
return self.beginning == offset
def is_last(self, offset: int) -> bool:
return self.end == (offset + 1)
def split(self, offset: int, link: bool = False) -> Tuple["ControlFlow", "ControlFlow"]:
if not self.contains(offset):
raise Exception(f"This ControlFlow does not contain offset {offset}")
# First, make the second half that the first half will point to.
second = ControlFlow(
offset,
self.end,
self.next_flow,
)
# Now, make the first half that we can point to.
first = ControlFlow(
self.beginning,
offset,
[second.beginning] if link else [],
)
return (first, second)
def __repr__(self) -> str:
return f"ControlFlow(beginning={self.beginning}, end={self.end}, next={(', '.join(str(n) for n in self.next_flow)) or 'N/A'}"
class ByteCodeChunk:
def __init__(self, id: int, actions: List[AP2Action], next_chunks: List[int], previous_chunks: List[int] = []) -> None:
self.id = id
self.actions = actions
self.next_chunks = next_chunks
self.previous_chunks = previous_chunks or []
@property
def offset(self) -> Optional[int]:
if self.actions:
return self.actions[0].offset
return None
def __repr__(self) -> str:
entries: List[str] = []
for action in self.actions:
entries.extend([f" {s}" for s in str(action).split(os.linesep)])
return (
f"ByteCodeChunk({os.linesep}" +
f" ID: {self.id}{os.linesep}" +
(f" Previous Chunks: {', '.join(str(n) for n in self.previous_chunks)}{os.linesep}" if self.previous_chunks else f" Start Chunk{os.linesep}") +
f"{os.linesep.join(entries)}{os.linesep}" +
(f" Next Chunks: {', '.join(str(n) for n in self.next_chunks)}{os.linesep}" if self.next_chunks else f" End Chunk{os.linesep}") +
")"
)
ArbitraryCodeChunk = Union[ByteCodeChunk, "Loop"]
class Loop:
def __init__(self, id: int, chunks: List[ArbitraryCodeChunk]) -> None:
# The ID is usually the chunk that other chunks point into.
self.id = id
# Calculate predecessors (who points into it) and successors (who we point out of).
ided_chunks: Dict[int, ArbitraryCodeChunk] = {chunk.id: chunk for chunk in chunks}
self.previous_chunks: List[int] = []
self.next_chunks: List[int] = []
self.chunks = chunks
for chunk in chunks:
for nextid in chunk.next_chunks:
if nextid not in ided_chunks:
self.next_chunks.append(nextid)
for previd in chunk.previous_chunks:
if previd not in ided_chunks:
self.previous_chunks.append(previd)
@property
def offset(self) -> Optional[int]:
for chunk in self.chunks:
if chunk.id == self.id:
return chunk.offset
# We're guaranteed to have a haeder (the ID), so this is a problem.
raise Exception("Logic error!")
def __repr__(self) -> str:
entries: List[str] = []
for chunk in self.chunks:
entries.extend([f" {s}" for s in str(chunk).split(os.linesep)])
return (
f"Loop({os.linesep}" +
f" ID: {self.id}{os.linesep}" +
(f" Previous Chunks: {', '.join(str(n) for n in self.previous_chunks)}{os.linesep}" if self.previous_chunks else f" Start Chunk{os.linesep}") +
f"{os.linesep.join(entries)}{os.linesep}" +
(f" Next Chunks: {', '.join(str(n) for n in self.next_chunks)}{os.linesep}" if self.next_chunks else f" End Chunk{os.linesep}") +
")"
)
class BitVector:
def __init__(self, length: int, init: bool = False) -> None:
self.__bits: Dict[int, bool] = {i: init for i in range(length)}
def clone(self) -> "BitVector":
new = BitVector(len(self.__bits))
new.__bits = {i: self.__bits[i] for i in self.__bits}
return new
def setAllBitsTo(self, val: bool) -> "BitVector":
self.__bits = {i: val for i in self.__bits}
return self
def setBit(self, bit: int) -> "BitVector":
self.__bits[bit] = True
return self
def clearBit(self, bit: int) -> "BitVector":
self.__bits[bit] = False
return self
def orVector(self, other: "BitVector") -> "BitVector":
if len(self.__bits) != len(other.__bits):
raise Exception("Cannot or different-sized bitvectors!")
self.__bits = {i: (self.__bits[i] or other.__bits[i]) for i in self.__bits}
return self
def andVector(self, other: "BitVector") -> "BitVector":
if len(self.__bits) != len(other.__bits):
raise Exception("Cannot and different-sized bitvectors!")
self.__bits = {i: (self.__bits[i] and other.__bits[i]) for i in self.__bits}
return self
def __eq__(self, other: object) -> bool:
if not isinstance(other, BitVector):
return NotImplemented
if len(self.__bits) != len(other.__bits):
raise Exception("Cannot compare different-sized bitvectors!")
for i in self.__bits:
if self.__bits[i] != other.__bits[i]:
return False
return True
def __ne__(self, other: object) -> bool:
return not self.__eq__(other)
@property
def bitsSet(self) -> Set[int]:
return {i for i in self.__bits if self.__bits[i]}
class ByteCodeDecompiler(VerboseOutput):
def __init__(self, bytecode: ByteCode) -> None:
super().__init__()
self.bytecode = bytecode
def __graph_control_flow(self) -> List[ByteCodeChunk]:
# Start by assuming that the whole bytecode never directs flow. This is, confusingly,
# indexed by AP2Action offset, not by actual bytecode offset, so we can avoid the
# prickly problem of opcodes that take more than one byte in the data.
flows: Dict[int, ControlFlow] = {}
end = len(self.bytecode.actions)
beginning = 0
# The end of the program.
flows[end] = ControlFlow(end, end + 1, [])
# The rest of the program.
flows[beginning] = ControlFlow(beginning, end, [end])
# Function that helps us find a flow by position.
def find(opcodeno: int) -> int:
for start, cf in flows.items():
if cf.contains(opcodeno):
return start
raise Exception(f"Offset {opcodeno} somehow not in our control flow graph!")
# Now, walk the entire bytecode, and every control flow point split the graph at that point.
for i, action in enumerate(self.bytecode.actions):
current_action = i
next_action = i + 1
if action.opcode in [AP2Action.THROW, AP2Action.RETURN]:
# This should end execution, so we should cap off the current execution
# and send it to the end.
current_action_flow = find(current_action)
next_action_flow = find(next_action)
if current_action_flow == next_action_flow:
# We need to split this on the next_action boundary.
first, second = flows[current_action_flow].split(next_action)
first.next_flow = [end]
self.vprint(f"{action} action split {flows[current_action_flow]} into {first}, {second}")
flows[current_action_flow] = first
flows[next_action] = second
else:
# This already was split in two, presumably by something
# earlier in the chain jumping to the opcode after this.
# We need to unlink the current flow from the second and
# link it to the end.
flows[current_action_flow].next_flow = [end]
self.vprint(f"{action} action repointed {flows[current_action_flow]} to end")
elif action.opcode == AP2Action.JUMP:
# Unconditional control flow redirection after this, we should split the
# section if necessary and point this section at the new offset.
# First, we need to find the jump point and make sure that its the start
# of a section.
action = cast(JumpAction, action)
for j, dest in enumerate(self.bytecode.actions):
if dest.offset == action.jump_offset:
dest_action = j
break
else:
raise Exception(f"{action} jumps to an opcode that doesn't exist!")
# If the destination action flow already starts with the jump offset,
# then we're good, we just need to point our current split at this new
# offset. If it doesn't start with the jump offset, then we need to split
# that flow so we can point to the opcode directly.
dest_action_flow = find(dest_action)
if not flows[dest_action_flow].is_first(dest_action):
first, second = flows[dest_action_flow].split(dest_action, link=True)
self.vprint(f"{action} action required split of {flows[dest_action_flow]} into {first, second}")
flows[dest_action_flow] = first
flows[dest_action] = second
# Now, the second is what we want to point at in the next section.
dest_action_flow = dest_action
# Now, we must split the current flow at the point of this jump.
current_action_flow = find(current_action)
next_action_flow = find(next_action)
if current_action_flow == next_action_flow:
# We need to split this on the next_action boundary.
first, second = flows[current_action_flow].split(next_action)
first.next_flow = [dest_action_flow]
self.vprint(f"{action} action split {flows[current_action_flow]} into {first}, {second}")
flows[current_action_flow] = first
flows[next_action] = second
else:
# This already was split in two, presumably by something
# earlier in the chain jumping to the opcode after this.
# We need to unlink the current flow from the second and
# link it to the end.
flows[current_action_flow].next_flow = [dest_action_flow]
self.vprint(f"{action} action repointed {flows[current_action_flow]} to new chunk")
elif action.opcode in [AP2Action.IF, AP2Action.IF2]:
# Conditional control flow redirection after this, we should split the
# section if necessary and point this section at the new offset as well
# as the second half of the split section.
# First, we need to find the jump point and make sure that its the start
# of a section.
action = cast(IfAction, action)
for j, dest in enumerate(self.bytecode.actions):
if dest.offset == action.jump_if_true_offset:
dest_action = j
break
else:
raise Exception(f"{action} conditional jumps to an opcode that doesn't exist!")
# If the destination action flow already starts with the jump offset,
# then we're good, we just need to point our current split at this new
# offset. If it doesn't start with the jump offset, then we need to split
# that flow so we can point to the opcode directly.
dest_action_flow = find(dest_action)
if not flows[dest_action_flow].is_first(dest_action):
first, second = flows[dest_action_flow].split(dest_action, link=True)
self.vprint(f"{action} action required split of {flows[dest_action_flow]} into {first, second}")
flows[dest_action_flow] = first
flows[dest_action] = second
# Now, the second is what we want to point at in the next section.
dest_action_flow = dest_action
# Now, we must split the current flow at the point of this jump.
current_action_flow = find(current_action)
next_action_flow = find(next_action)
if current_action_flow == next_action_flow:
# We need to split this on the next_action boundary.
first, second = flows[current_action_flow].split(next_action)
first.next_flow = [next_action, dest_action_flow]
self.vprint(f"{action} action split {flows[current_action_flow]} into {first}, {second}")
flows[current_action_flow] = first
flows[next_action] = second
else:
# This already was split in two, presumably by something
# earlier in the chain jumping to the opcode after this.
# We need to unlink the current flow from the second and
# link it to the end.
flows[current_action_flow].next_flow = [next_action, dest_action_flow]
self.vprint(f"{action} action repointed {flows[current_action_flow]} to new chunk")
# Finally, return chunks of contiguous execution.
chunks: List[ByteCodeChunk] = []
chunkid: int = 0
for start, flow in flows.items():
if start == end:
# We don't want to render out the end of the graph, it was only there to make
# the above algorithm easier.
continue
if len(flow.next_flow) == 1 and flow.next_flow[0] == end:
# This flow is a termination state.
chunks.append(ByteCodeChunk(chunkid, self.bytecode.actions[flow.beginning:flow.end], []))
chunkid += 1
else:
next_chunks: List[int] = []
for ano in flow.next_flow:
if ano == end:
raise Exception("Logic error!")
next_chunks.append(self.bytecode.actions[ano].offset)
chunks.append(ByteCodeChunk(chunkid, self.bytecode.actions[flow.beginning:flow.end], next_chunks))
chunkid += 1
# Calculate who points to us as well, for posterity.
entries: Dict[int, List[int]] = {}
offset_to_id: Dict[int, int] = {}
for chunk in chunks:
offset_to_id[chunk.offset] = chunk.id
for next_chunk in chunk.next_chunks:
entries[next_chunk] = entries.get(next_chunk, []) + [chunk.offset]
for chunk in chunks:
chunk.previous_chunks = entries.get(chunk.offset, [])
# Now, convert the offsets to chunk ID pointers.
end_previous_chunks: List[int] = []
for chunk in chunks:
if chunk.next_chunks:
# Normal chunk.
chunk.next_chunks = [offset_to_id[c] for c in chunk.next_chunks]
else:
# Point this chunk at the end of bytecode sentinel.
chunk.next_chunks = [chunkid]
end_previous_chunks.append(chunk.id)
chunk.previous_chunks = [offset_to_id[c] for c in chunk.previous_chunks]
chunks.append(ByteCodeChunk(chunkid, [], [], previous_chunks=end_previous_chunks))
return sorted(chunks, key=lambda c: c.id)
def __get_entry_block(self, chunks: List[ByteCodeChunk]) -> int:
start_id: int = -1
for chunk in chunks:
if not chunk.previous_chunks:
if start_id != -1:
# This should never happen, we have one entrypoint. If we run into
# this we might need to do dead code analysis and discarding.
raise Exception("Logic error!")
start_id = chunk.id
if start_id == -1:
# We should never get to this as we always have at least one entrypoint.
raise Exception("Logic error!")
return start_id
def __compute_dominators(self, chunks: List[ByteCodeChunk]) -> Dict[int, Set[int]]:
# Find the start of the graph (the node with no previous entries).
start_id = self.__get_entry_block(chunks)
# Compute dominators recursively
chunklen = len(chunks)
dominators: Dict[int, BitVector] = {chunk.id: BitVector(chunklen, init=True) for chunk in chunks}
dominators[start_id].setAllBitsTo(False).setBit(start_id)
changed = True
while changed:
changed = False
for chunk in chunks:
if chunk.id == start_id:
continue
for previd in chunk.previous_chunks:
comparison = dominators[chunk.id].clone()
dominators[chunk.id].andVector(dominators[previd]).setBit(chunk.id)
if dominators[chunk.id] != comparison:
changed = True
return {chunk.id: dominators[chunk.id].bitsSet for chunk in chunks}
def __separate_loops(self, chunks: List[ByteCodeChunk], dominators: Dict[int, Set[int]]) -> List[Union[ByteCodeChunk, Loop]]:
# Find the start of the graph (the node with no previous entries).
start_id = self.__get_entry_block(chunks)
chunks_by_id: Dict[int, Union[ByteCodeChunk, Loop]] = {chunk.id: chunk for chunk in chunks}
# Go through and gather up all loops in the chunks.
loops: Dict[int, Set[int]] = {}
for chunk in chunks:
if chunk.id == start_id:
continue
for nextid in chunk.next_chunks:
# If this next chunk dominates us, then that means we found a loop.
if nextid in dominators[chunk.id]:
# Calculate the blocks that are in this loop.
header = nextid
tail = chunk.id
blocks = {header}
# If we don't already have a loop of one block,
# we need to walk backwards to find all blocks in this
# loop.
if header != tail:
blocks.add(tail)
blocks_to_examine = [tail]
while blocks_to_examine:
block = blocks_to_examine.pop()
for predecessor in chunks_by_id[block].previous_chunks:
if predecessor not in blocks:
blocks.add(predecessor)
blocks_to_examine.append(predecessor)
self.vprint(f"Found loop with header {header} and blocks {', '.join(str(b) for b in blocks)}.")
# We found a loop!
if header in loops:
raise Exception("Logic error!")
loops[header] = blocks
# Now, we need to reduce our list of chunks down to non-loops only. We do this
# by recursively trying to find inner loops until we find a loop that has no
# inner loops, and converting that. Once we do that, we remove the chunks from
# our list, add it to that new loop, and convert all other loops that might
# reference it to point at the loop instead.
while loops:
delete_header: Optional[int] = None
delete_blocks: Set[int] = set()
for header, blocks in loops.items():
# See if any of the blocks in this loop are the header of any other loop.
for block in blocks:
if block in loops and loops[block] is not blocks:
# This particular block of code is the header of another loop,
# so we shouldn't convert this loop until we handle the inner
# loop.
break
else:
# This loop does not contain any loops of its own. It is safe to
# convert.
self.vprint(f"Converting loop with header {header} and blocks {', '.join(str(b) for b in blocks)}.")
chunks_by_id[header] = Loop(header, [chunks_by_id[i] for i in blocks])
# These blocks are now part of the loop, so we need to remove them
# from the IDed chunks as well as from existing loops.
delete_blocks = {block for block in blocks if block != header}
delete_header = header
break
if delete_header is None:
# We must find at LEAST one loop that has no inner loops of its own.
raise Exception("Logic error!")
# Remove this loop from the processing list
del loops[delete_header]
# Go through and remove the rest of the chunks from the rest of the loops
loops = {header: {block for block in blocks if block not in delete_blocks} for (header, blocks) in loops.items()}
# Also remove the rest of the chunks from our IDed chunks as they are part of this loop now.
for block in delete_blocks:
del chunks_by_id[block]
# Verify that we don't have any existing chunks that point at the non-header portion of the loop.
for _, chunk_or_loop in chunks_by_id.items():
for nextid in chunk_or_loop.next_chunks:
if nextid in delete_blocks:
# Woah, we point at a chunk inside this loop that isn't the header!
raise Exception("Logic error!")
return [chunks_by_id[i] for i in chunks_by_id]
def __decompile(self) -> str:
# First, we need to construct a control flow graph.
chunks = self.__graph_control_flow()
# Now, compute dominators so we can locate back-refs.
dominators = self.__compute_dominators(chunks)
# Now, separate chunks out into chunks and loops.
chunks_and_loops = self.__separate_loops(chunks, dominators)
self.vprint(chunks_and_loops)
return "TODO"
def decompile(self, verbose: bool = False) -> str:
with self.debugging(verbose):
return self.__decompile()