mirror of
https://gitea.tendokyu.moe/Hay1tsme/artemis.git
synced 2026-03-21 18:04:18 -05:00
mai2: fixup for pre-dx table reading
This commit is contained in:
parent
2d2b24739d
commit
d3c978de57
|
|
@ -67,58 +67,34 @@ class Mai2Reader(BaseReader):
|
|||
else:
|
||||
key = None
|
||||
|
||||
evt_table = self.load_table_raw(f"{self.bin_dir}/tables", "mmEvent.bin", key, f"{self.opt_dir}/tables" if self.opt_dir else None)
|
||||
jp_table = self.load_table_raw(f"{self.bin_dir}/tables", "mmtextout_jp.bin", key, f"{self.opt_dir}/tables" if self.opt_dir else None)
|
||||
#en_table = self.load_table_raw(f"{self.bin_dir}/tables", "mmtextout_ex.bin", key, f"{self.opt_dir}/tables" if self.opt_dir else None)
|
||||
en_table = []
|
||||
score_table = self.load_table_raw(f"{self.bin_dir}/tables", "mmScore.bin", key, f"{self.opt_dir}/tables" if self.opt_dir else None)
|
||||
music_table = self.load_table_raw(f"{self.bin_dir}/tables", "mmMusic.bin", key, f"{self.opt_dir}/tables" if self.opt_dir else None)
|
||||
genre_table = self.load_table_raw(f"{self.bin_dir}/tables", "mmGenre.bin", key, f"{self.opt_dir}/tables" if self.opt_dir else None)
|
||||
jp_table = self.parse_textout(f"{self.bin_dir}/tables", "mmtextout_jp.bin", key, f"{self.opt_dir}/tables" if self.opt_dir else None)
|
||||
en_table = self.parse_textout(f"{self.bin_dir}/tables", "mmtextout_ex.bin", key, f"{self.opt_dir}/tables" if self.opt_dir else None)
|
||||
evt_table = self.parse_table(f"{self.bin_dir}/tables", "mmEvent.bin", key, f"{self.opt_dir}/tables" if self.opt_dir else None)
|
||||
score_table = self.parse_table(f"{self.bin_dir}/tables", "mmScore.bin", key, f"{self.opt_dir}/tables" if self.opt_dir else None)
|
||||
music_table = self.parse_table(f"{self.bin_dir}/tables", "mmMusic.bin", key, f"{self.opt_dir}/tables" if self.opt_dir else None)
|
||||
genre_table = self.parse_table(f"{self.bin_dir}/tables", "mmGenre.bin", key, f"{self.opt_dir}/tables" if self.opt_dir else None)
|
||||
|
||||
txt_table = {} # Build our translation table
|
||||
for entry in jp_table:
|
||||
vals = list(entry.values())
|
||||
txt_table[vals[0]] = ", ".join(vals[1:])
|
||||
for entry in en_table:
|
||||
vals = list(entry.values())
|
||||
txt_table[vals[0]] = ", ".join(vals[1:])
|
||||
|
||||
genre_lookup = {}
|
||||
for entry in genre_table:
|
||||
genre_lookup[entry['ID']] = txt_table[entry['名前テキスト']]
|
||||
genre_lookup[entry['ID']] = jp_table[entry['名前テキスト']]
|
||||
|
||||
self.logger.info(f"Insert {len(evt_table)} events")
|
||||
await self.read_old_events(evt_table)
|
||||
self.logger.info(f"Insert {len(score_table)} charts")
|
||||
await self.read_old_music(music_table, score_table, txt_table, genre_lookup)
|
||||
await self.read_old_music(music_table, score_table, jp_table, genre_lookup)
|
||||
|
||||
def load_table_raw(self, dir: str, file: str, key: Optional[bytes], opt_dir: Optional[str] = None) -> Optional[List[Dict[str, str]]]:
|
||||
if opt_dir is not None and os.path.exists(f"{opt_dir}/{file}"):
|
||||
fpath = f"{opt_dir}/{file}"
|
||||
else:
|
||||
fpath = f"{dir}/{file}"
|
||||
def parse_textout(self, dir: str, file: str, key: Optional[bytes], opt_dir: Optional[str] = None) -> Dict[str, str]:
|
||||
f_decoded = self.load_table_raw(dir, file, key, opt_dir)
|
||||
out = {}
|
||||
for line in f_decoded.splitlines():
|
||||
matcher = re.match(r"^[A-Z]+\( L\"(.+)\" ,L\"(.*)\" \)$", line)
|
||||
if not matcher: continue
|
||||
out[matcher.group(1)] = matcher.group(2)
|
||||
|
||||
if not os.path.exists(fpath):
|
||||
self.logger.warning(f"file {file} does not exist in directory {dir}, skipping")
|
||||
return
|
||||
|
||||
self.logger.info(f"Load table {fpath}")
|
||||
if key is not None:
|
||||
cipher = AES.new(key, AES.MODE_CBC)
|
||||
with open(fpath, "rb") as f:
|
||||
f_encrypted = f.read()
|
||||
f_data = cipher.decrypt(f_encrypted)[0x10:]
|
||||
|
||||
else:
|
||||
with open(fpath, "rb") as f:
|
||||
f_data = f.read()[0x10:]
|
||||
|
||||
if f_data is None or not f_data:
|
||||
self.logger.warning(f"file {fpath} could not be read, skipping")
|
||||
return
|
||||
|
||||
f_data_deflate = zlib.decompress(f_data, wbits = zlib.MAX_WBITS | 16)[0x12:] # lop off the junk at the beginning
|
||||
f_decoded = codecs.utf_16_le_decode(f_data_deflate)[0]
|
||||
return out
|
||||
|
||||
def parse_table(self, dir: str, file: str, key: Optional[bytes], opt_dir: Optional[str] = None) -> Optional[List[Dict[str, str]]]:
|
||||
f_decoded = self.load_table_raw(dir, file, key, opt_dir)
|
||||
f_split = f_decoded.splitlines()
|
||||
|
||||
has_struct_def = "struct " in f_decoded
|
||||
|
|
@ -193,6 +169,34 @@ class Mai2Reader(BaseReader):
|
|||
self.logger.warning("Failed load table content, skipping")
|
||||
return
|
||||
|
||||
def load_table_raw(self, dir: str, file: str, key: Optional[bytes], opt_dir: Optional[str] = None) -> str:
|
||||
if opt_dir is not None and os.path.exists(f"{opt_dir}/{file}"):
|
||||
fpath = f"{opt_dir}/{file}"
|
||||
else:
|
||||
fpath = f"{dir}/{file}"
|
||||
|
||||
if not os.path.exists(fpath):
|
||||
self.logger.warning(f"file {file} does not exist in directory {dir}, skipping")
|
||||
return
|
||||
|
||||
self.logger.info(f"Load table {fpath}")
|
||||
if key is not None:
|
||||
cipher = AES.new(key, AES.MODE_CBC)
|
||||
with open(fpath, "rb") as f:
|
||||
f_encrypted = f.read()
|
||||
f_data = cipher.decrypt(f_encrypted)[0x10:]
|
||||
|
||||
else:
|
||||
with open(fpath, "rb") as f:
|
||||
f_data = f.read()[0x10:]
|
||||
|
||||
if f_data is None or not f_data:
|
||||
self.logger.warning(f"file {fpath} could not be read, skipping")
|
||||
return
|
||||
|
||||
f_data_deflate = zlib.decompress(f_data, wbits = zlib.MAX_WBITS | 16)[0x12:] # lop off the junk at the beginning
|
||||
return codecs.utf_16_le_decode(f_data_deflate)[0]
|
||||
|
||||
async def get_events(self, base_dir: str, opt_id: int = None) -> None:
|
||||
self.logger.info(f"Reading events from {base_dir}...")
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user