mirror of
https://github.com/pret/pokemon-reverse-engineering-tools.git
synced 2026-04-24 23:27:03 -05:00
Refactor the compressor again.
This is a little closer to the target compressor than before.
This commit is contained in:
parent
78aa4f00be
commit
36f7ee513e
|
|
@ -216,82 +216,218 @@ class Compressed:
|
|||
c = Compressed()
|
||||
c.data = data
|
||||
lz = c.compress()
|
||||
|
||||
There are some issues with reproducing the target compressor.
|
||||
Some notes are listed here:
|
||||
- the criteria for detecting a lookback is inconsistent
|
||||
- sometimes lookbacks that are mostly 0s are pruned, sometimes not
|
||||
- target appears to skip ahead if it can use a lookback soon, stopping the current command short or in some cases truncating it with literals.
|
||||
- this has been implemented, but the specifics are unknown
|
||||
- self.min_scores: It's unknown if blank's minimum score should be 1 or 2. Most likely it's 1, with some other hack to account for edge cases.
|
||||
- may be related to the above
|
||||
- target does not appear to compress backwards
|
||||
"""
|
||||
|
||||
# The target compressor is not always as efficient as this implementation.
|
||||
# To ignore compatibility and spit out a smaller blob, pass in small=True.
|
||||
small = False
|
||||
|
||||
# BUG: literal [00] is a byte longer than blank 1.
|
||||
# In other words, blank's real minimum score is 1.
|
||||
# This bug exists in the target compressor as well,
|
||||
# so don't fix until we've given up on replicating it.
|
||||
min_scores = {
|
||||
'blank': 2,
|
||||
'iterate': 2,
|
||||
'alternate': 3,
|
||||
'repeat': 3,
|
||||
'reverse': 3,
|
||||
'flip': 3,
|
||||
}
|
||||
|
||||
preference = [
|
||||
'repeat',
|
||||
'blank',
|
||||
'reverse',
|
||||
'flip',
|
||||
'iterate',
|
||||
'alternate',
|
||||
#'literal',
|
||||
]
|
||||
|
||||
data = None
|
||||
commands = lz_commands
|
||||
debug = False
|
||||
literal_only = False
|
||||
|
||||
arg_names = 'data', 'commands', 'debug', 'literal_only'
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.__dict__.update(dict(zip(self.arg_names, args)))
|
||||
|
||||
self.min_scores = {
|
||||
'blank': 1,
|
||||
'iterate': 2,
|
||||
'alternate': 3,
|
||||
'repeat': 3,
|
||||
'reverse': 3,
|
||||
'flip': 3,
|
||||
}
|
||||
|
||||
self.preference = [
|
||||
'repeat',
|
||||
'blank',
|
||||
'flip',
|
||||
'reverse',
|
||||
'iterate',
|
||||
'alternate',
|
||||
#'literal',
|
||||
]
|
||||
|
||||
self.lookback_methods = 'repeat', 'reverse', 'flip'
|
||||
|
||||
self.__dict__.update({
|
||||
'data': None,
|
||||
'commands': lz_commands,
|
||||
'debug': False,
|
||||
'literal_only': False,
|
||||
})
|
||||
|
||||
self.arg_names = 'data', 'commands', 'debug', 'literal_only'
|
||||
|
||||
self.__dict__.update(kwargs)
|
||||
self.__dict__.update(dict(zip(self.arg_names, args)))
|
||||
|
||||
if self.data is not None:
|
||||
self.compress()
|
||||
|
||||
def read_byte(self, address=None):
|
||||
def compress(self, data=None):
|
||||
if data is not None:
|
||||
self.data = data
|
||||
|
||||
self.data = list(bytearray(self.data))
|
||||
|
||||
self.indexes = {}
|
||||
self.lookbacks = {}
|
||||
for method in self.lookback_methods:
|
||||
self.lookbacks[method] = {}
|
||||
|
||||
self.address = 0
|
||||
self.end = len(self.data)
|
||||
self.output = []
|
||||
self.literal = None
|
||||
|
||||
while self.address < self.end:
|
||||
|
||||
if self.score():
|
||||
self.do_literal()
|
||||
self.do_winner()
|
||||
|
||||
else:
|
||||
if self.literal == None:
|
||||
self.literal = self.address
|
||||
self.address += 1
|
||||
|
||||
self.do_literal()
|
||||
|
||||
self.output += [lz_end]
|
||||
return self.output
|
||||
|
||||
def reset_scores(self):
|
||||
self.scores = {}
|
||||
self.offsets = {}
|
||||
self.helpers = {}
|
||||
for method in self.min_scores.iterkeys():
|
||||
self.scores[method] = 0
|
||||
|
||||
def bit_flip(self, byte):
|
||||
return bit_flipped[byte]
|
||||
|
||||
def do_literal(self):
|
||||
if self.literal != None:
|
||||
length = abs(self.address - self.literal)
|
||||
start = min(self.literal, self.address + 1)
|
||||
self.helpers['literal'] = self.data[start:start+length]
|
||||
self.do_cmd('literal', length)
|
||||
self.literal = None
|
||||
|
||||
def score(self):
|
||||
self.reset_scores()
|
||||
|
||||
map(self.score_literal, ['iterate', 'alternate', 'blank'])
|
||||
|
||||
for method in self.lookback_methods:
|
||||
self.scores[method], self.offsets[method] = self.find_lookback(method, self.address)
|
||||
|
||||
# Compatibility:
|
||||
# If a lookback is close, reduce the scores of other commands
|
||||
best_method, best_score = max(
|
||||
self.scores.items(),
|
||||
key = lambda x: (
|
||||
x[1],
|
||||
-self.preference.index(x[0])
|
||||
)
|
||||
)
|
||||
for method in self.lookback_methods:
|
||||
for address in xrange(self.address+1, self.address+min(best_score, 6)):
|
||||
if self.find_lookback(method, address)[0] > max(self.min_scores[method], best_score):
|
||||
# BUG: lookbacks can reduce themselves. This appears to be a bug in the target also.
|
||||
for m, score in self.scores.items():
|
||||
self.scores[m] = min(score, address - self.address)
|
||||
|
||||
return any(
|
||||
score
|
||||
> self.min_scores[method] + int(score > lowmax)
|
||||
for method, score in self.scores.iteritems()
|
||||
)
|
||||
|
||||
def read(self, address=None):
|
||||
if address is None:
|
||||
address = self.address
|
||||
if 0 <= address < len(self.data):
|
||||
return self.data[address]
|
||||
return None
|
||||
|
||||
def reset_scores(self):
|
||||
self.scores = {}
|
||||
self.offsets = {}
|
||||
for method in self.min_scores.keys():
|
||||
self.scores[method] = 0
|
||||
def find_all_lookbacks(self):
|
||||
for method in self.lookback_methods:
|
||||
for address, byte in enumerate(self.data):
|
||||
self.find_lookback(method, address)
|
||||
|
||||
def score_literal(self, method):
|
||||
address = self.address
|
||||
compare = {
|
||||
'blank': [0],
|
||||
'iterate': [self.read_byte(address)],
|
||||
'alternate': [self.read_byte(address), self.read_byte(address + 1)],
|
||||
def find_lookback(self, method, address=None):
|
||||
if address is None:
|
||||
address = self.address
|
||||
|
||||
existing = self.lookbacks.get(method, {}).get(address)
|
||||
if existing != None:
|
||||
return existing
|
||||
|
||||
lookback = 0, None
|
||||
|
||||
# Better to not carelessly optimize at the moment.
|
||||
"""
|
||||
if address < 2:
|
||||
return lookback
|
||||
"""
|
||||
|
||||
byte = self.read(address)
|
||||
if byte is None:
|
||||
return lookback
|
||||
|
||||
direction, mutate = {
|
||||
'repeat': ( 1, int),
|
||||
'reverse': (-1, int),
|
||||
'flip': ( 1, self.bit_flip),
|
||||
}[method]
|
||||
length = 0
|
||||
while self.read_byte(address) == compare[length % len(compare)]:
|
||||
length += 1
|
||||
address += 1
|
||||
self.scores[method] = length
|
||||
return compare
|
||||
|
||||
def precompute_repeat_matches(self):
|
||||
# Doesn't seem to help
|
||||
"""
|
||||
This is faster than redundantly searching each time repeats are scored.
|
||||
if mutate == self.bit_flip:
|
||||
if byte == 0:
|
||||
self.lookbacks[method][address] = lookback
|
||||
return lookback
|
||||
"""
|
||||
self.indexes = {}
|
||||
for byte in xrange(0x100):
|
||||
|
||||
data_len = len(self.data)
|
||||
is_two_byte_index = lambda index: int(index < address - 0x7f)
|
||||
|
||||
for index in self.get_indexes(mutate(byte)):
|
||||
|
||||
if index >= address:
|
||||
break
|
||||
|
||||
old_length, old_index = lookback
|
||||
if direction == 1:
|
||||
if old_length > data_len - index: break
|
||||
else:
|
||||
if old_length > index: continue
|
||||
|
||||
if self.read(index) in [None]: continue
|
||||
|
||||
length = 1 # we know there's at least one match, or we wouldn't be checking this index
|
||||
while 1:
|
||||
this_byte = self.read(address + length)
|
||||
that_byte = self.read(index + length * direction)
|
||||
if that_byte == None or this_byte != mutate(that_byte):
|
||||
break
|
||||
length += 1
|
||||
"""
|
||||
if direction == 1:
|
||||
if not any(self.data[address+2:address+length]): continue
|
||||
"""
|
||||
if length - is_two_byte_index(index) >= old_length - is_two_byte_index(old_index): # XXX >?
|
||||
# XXX maybe avoid two-byte indexes when possible
|
||||
lookback = length, index
|
||||
|
||||
self.lookbacks[method][address] = lookback
|
||||
return lookback
|
||||
|
||||
def get_indexes(self, byte):
|
||||
if not self.indexes.has_key(byte):
|
||||
self.indexes[byte] = []
|
||||
index = -1
|
||||
while 1:
|
||||
|
|
@ -300,156 +436,91 @@ class Compressed:
|
|||
except ValueError:
|
||||
break
|
||||
self.indexes[byte].append(index)
|
||||
return self.indexes[byte]
|
||||
|
||||
def score_repeats(self, name, direction=1, mutate=int):
|
||||
|
||||
def score_literal(self, method):
|
||||
address = self.address
|
||||
byte = mutate(self.data[address])
|
||||
|
||||
for index in self.indexes[byte]:
|
||||
if index >= address: break
|
||||
compare = {
|
||||
'blank': [0],
|
||||
'iterate': [self.read(address)],
|
||||
'alternate': [self.read(address), self.read(address + 1)],
|
||||
}[method]
|
||||
|
||||
length = 1 # we already know the first byte matches
|
||||
while 1:
|
||||
byte = self.read_byte(index + length * direction)
|
||||
if byte == None or mutate(byte) != self.read_byte(address + length):
|
||||
break
|
||||
length += 1
|
||||
# XXX may or may not be correct
|
||||
if method == 'alternate' and compare[0] == 0:
|
||||
return
|
||||
|
||||
# If repeats are almost entirely zeroes, just keep going and use blank instead.
|
||||
if all(x == 0 for x in self.data[ address + 2 : address + length ]):
|
||||
if self.read_byte(address + length) == 0:
|
||||
# zeroes continue after this chunk
|
||||
continue
|
||||
length = 0
|
||||
while self.read(address + length) == compare[length % len(compare)]:
|
||||
length += 1
|
||||
|
||||
# Adjust the score for two-byte offsets.
|
||||
two_byte_index = index < address - 0x7f
|
||||
if self.scores[name] >= length - int(two_byte_index):
|
||||
continue
|
||||
self.scores[method] = length
|
||||
self.helpers[method] = compare
|
||||
|
||||
self.scores [name] = length
|
||||
self.offsets[name] = index
|
||||
|
||||
def compress(self, data=None):
|
||||
"""
|
||||
This algorithm is greedy.
|
||||
It aims to match the compressor it's based on as closely as possible.
|
||||
It doesn't, but in the meantime the output is smaller.
|
||||
"""
|
||||
|
||||
if data is not None:
|
||||
self.data = data
|
||||
|
||||
self.data = list(bytearray(self.data))
|
||||
|
||||
self.address = 0
|
||||
self.end = len(self.data)
|
||||
self.output = []
|
||||
self.literal = []
|
||||
self.precompute_repeat_matches()
|
||||
|
||||
while self.address < self.end:
|
||||
|
||||
# Tally up the number of bytes that can be compressed
|
||||
# by a single command from the current address.
|
||||
|
||||
self.reset_scores()
|
||||
|
||||
# Check for repetition. Alternating bytes are common since graphics data is planar.
|
||||
|
||||
_, self.iter, self.alts = map(self.score_literal, ['blank', 'iterate', 'alternate'])
|
||||
|
||||
# Check if we can repeat any data that the decompressor just output (here, the input data).
|
||||
# This includes the current command's output.
|
||||
|
||||
for args in [
|
||||
('repeat', 1, int),
|
||||
('reverse', -1, int),
|
||||
('flip', 1, self.bit_flip),
|
||||
]:
|
||||
self.score_repeats(*args)
|
||||
|
||||
# If the scores are too low, try again from the next byte.
|
||||
|
||||
if self.literal_only or not any(
|
||||
self.min_scores.get(name, score)
|
||||
+ int(score > lowmax)
|
||||
< score
|
||||
for name, score in self.scores.items()
|
||||
):
|
||||
self.literal += [self.read_byte()]
|
||||
self.address += 1
|
||||
|
||||
else:
|
||||
self.do_literal() # payload
|
||||
self.do_scored()
|
||||
|
||||
# unload any literals we're sitting on
|
||||
self.do_literal()
|
||||
|
||||
self.output += [lz_end]
|
||||
|
||||
return self.output
|
||||
|
||||
def bit_flip(self, byte):
|
||||
return bit_flipped[byte]
|
||||
|
||||
def do_literal(self):
|
||||
if self.literal:
|
||||
length = len(self.literal)
|
||||
self.do_cmd('literal', length)
|
||||
self.literal = []
|
||||
|
||||
def do_scored(self):
|
||||
# Which command will compress the longest chunk?
|
||||
winner, score = sorted(
|
||||
self.scores.items(),
|
||||
key = lambda (name, score): (
|
||||
-(score - self.min_scores[name] - int(score > lowmax)),
|
||||
self.preference.index(name)
|
||||
def do_winner(self):
|
||||
winners = filter(
|
||||
lambda (method, score):
|
||||
score
|
||||
> self.min_scores[method] + int(score > lowmax),
|
||||
self.scores.iteritems()
|
||||
)
|
||||
winners.sort(
|
||||
key = lambda (method, score): (
|
||||
-(score - self.min_scores[method] - int(score > lowmax)),
|
||||
self.preference.index(method)
|
||||
)
|
||||
)[0]
|
||||
length = self.do_cmd(winner, score)
|
||||
)
|
||||
winner, score = winners[0]
|
||||
|
||||
length = min(score, max_length)
|
||||
self.do_cmd(winner, length)
|
||||
self.address += length
|
||||
|
||||
def do_cmd(self, cmd, length):
|
||||
length = min(length, max_length)
|
||||
start_address = self.address
|
||||
|
||||
cmd_length = length - 1
|
||||
|
||||
output = []
|
||||
|
||||
if length > lowmax:
|
||||
output += [(self.commands['long'] << 5) + (self.commands[cmd] << 2) + (cmd_length >> 8)]
|
||||
output += [cmd_length & 0xff]
|
||||
output.append(
|
||||
(self.commands['long'] << 5)
|
||||
+ (self.commands[cmd] << 2)
|
||||
+ (cmd_length >> 8)
|
||||
)
|
||||
output.append(
|
||||
cmd_length & 0xff
|
||||
)
|
||||
else:
|
||||
output += [(self.commands[cmd] << 5) + cmd_length]
|
||||
output.append(
|
||||
(self.commands[cmd] << 5)
|
||||
+ cmd_length
|
||||
)
|
||||
|
||||
output += {
|
||||
'literal': self.literal,
|
||||
'iterate': self.iter,
|
||||
'alternate': self.alts,
|
||||
'blank': [],
|
||||
}.get(cmd, [])
|
||||
self.helpers['blank'] = [] # quick hack
|
||||
output += self.helpers.get(cmd, [])
|
||||
|
||||
if cmd in ['repeat', 'reverse', 'flip']:
|
||||
if cmd in self.lookback_methods:
|
||||
offset = self.offsets[cmd]
|
||||
# Negative offsets are one byte.
|
||||
# Positive offsets are two.
|
||||
if self.address - offset <= 0x7f:
|
||||
offset = self.address - offset + 0x80
|
||||
offset -= 1 # this is a hack, but it seems to work
|
||||
if start_address - offset <= 0x7f:
|
||||
offset = start_address - offset + 0x80
|
||||
offset -= 1 # this seems to work
|
||||
output += [offset]
|
||||
else:
|
||||
output += [offset / 0x100, offset % 0x100] # big endian
|
||||
|
||||
if self.debug:
|
||||
print (
|
||||
print ' '.join(map(str, [
|
||||
cmd, length, '\t',
|
||||
' '.join(map('{:02x}'.format, output))
|
||||
)
|
||||
' '.join(map('{:02x}'.format, output)),
|
||||
self.data[start_address:start_address+length] if cmd in self.lookback_methods else '',
|
||||
]))
|
||||
|
||||
self.output += output
|
||||
return length
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user