import xml.etree.ElementTree as ET from os import path from tinydb import Query, where import config import random import time from fastapi import APIRouter, Request, Response from core_common import core_process_request, core_prepare_response, E from core_database import get_db router = APIRouter(prefix="/local2", tags=["local2"]) router.model_whitelist = ["KFC"] def get_profile(cid): return get_db().table("sdvx_profile").get(where("card") == cid) def get_game_profile(cid, game_version): profile = get_profile(cid) return profile["version"].get(str(game_version), None) def get_id_from_profile(cid): profile = get_db().table("sdvx_profile").get(where("card") == cid) djid = "%08d" % profile["sdvx_id"] djid_split = "-".join([djid[:4], djid[4:]]) return profile["sdvx_id"], djid_split @router.post("/{gameinfo}/game/sv{ver}_common") async def game_sv_common(ver: str, request: Request): request_info = await core_process_request(request) event = [ "DEMOGAME_PLAY", "MATCHING_MODE", "MATCHING_MODE_FREE_IP", "LEVEL_LIMIT_EASING", "ACHIEVEMENT_ENABLE", "APICAGACHADRAW\t30", "VOLFORCE_ENABLE", "AKANAME_ENABLE", "PAUSE_ONLINEUPDATE", "CONTINUATION", "TENKAICHI_MODE", "QC_MODE", "KAC_MODE", # "APPEAL_CARD_GEN_PRICE\t100", # "APPEAL_CARD_GEN_NEW_PRICE\t200", # "APPEAL_CARD_UNLOCK\t0,20170914,0,20171014,0,20171116,0,20180201,0,20180607,0,20181206,0,20200326,0,20200611,4,10140732,6,10150431", "FAVORITE_APPEALCARD_MAX\t200", "FAVORITE_MUSIC_MAX\t200", "EVENTDATE_APRILFOOL", "KONAMI_50TH_LOGO", "OMEGA_ARS_ENABLE", "DISABLE_MONITOR_ID_CHECK", "SKILL_ANALYZER_ABLE", "BLASTER_ABLE", "STANDARD_UNLOCK_ENABLE", "PLAYERJUDGEADJ_ENABLE", "MIXID_INPUT_ENABLE", "EVENTDATE_ONIGO", "EVENTDATE_GOTT", "GENERATOR_ABLE", "CREW_SELECT_ABLE", "PREMIUM_TIME_ENABLE", "OMEGA_ENABLE\t1,2,3,4,5,6,7,8,9", "HEXA_ENABLE\t1,2,3,4,5,6,7,8,9,10,11", "HEXA_OVERDRIVE_ENABLE\t8", "MEGAMIX_ENABLE", "VALGENE_ENABLE", "ARENA_ENABLE", "ARENA_LOCAL_TO_ONLINE_ENABLE", "ARENA_ALTER_MODE_WINDOW_ENABLE", "ARENA_PASS_MATCH_WINDOW_ENABLE", "DEMOLOOP_PASELI_FESTIVAL_2022", "DISABLED_MUSIC_IN_ARENA_ONLINE", "ARENA_VOTE_MODE_ENABLE", "DISP_PASELI_BANNER", "S_PUC_EFFECT_ENABLE", "SUPER_RANDOM_ACTIVE", "PLAYER_RADAR_ENABLE", "APRIL_RAINBOW_LINE_ACTIVE", "USE_CUDA_VIDEO_PRESENTER", "CHARACTER_IGNORE_DISABLE\t122,123,131,139,140,143,149,160,162,163,164,167,170,174", "STAMP_IGNORE_DISABLE\t273~312,773~820,993~1032,1245~1284,1469~1508,1585~1632,1633~1672,1737~1776,1777~1816,1897~1936", "SUBBG_IGNORE_DISABLE\t166~185,281~346,369~381,419~438,464~482,515~552,595~616,660~673,714~727", ] unlock = [] for f in ( path.join("modules", "sdvx", "music_db.xml"), path.join("music_db.xml"), ): if path.exists(f): with open(f, "r", encoding="shift_jisx0213") as fp: tree = ET.parse(fp, ET.XMLParser()) mdb = tree.getroot() for entry in mdb: mid = entry.get("id") # print(mid) difficulties = { 0: "novice", 1: "advanced", 2: "exhaust", 3: "infinite", 4: "maximum", 5: "ultimate", } for k in difficulties: d = entry.find("difficulty").find(difficulties[k]) if d is not None: limit = int(d.find("limited").text) if limit != 3: # print(mid, difficulties[k], limit) unlock.append([mid, k]) break if unlock == []: for i in range(2400): for j in range(0, 5): unlock.append([i, j]) response = E.response( E.game( E.event( *[ E.info( E.event_id(s, __type="str"), ) for s in event ], ), E.music_limited( *[ E.info( E.music_id(s[0], __type="s32"), E.music_type(s[1], __type="u8"), E.limited(3, __type="u8"), ) for s in unlock ], ), ) ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post("/{gameinfo}/game/sv{ver}_new") async def game_sv_new(ver: str, request: Request): request_info = await core_process_request(request) game_version = request_info["game_version"] root = request_info["root"][0] dataid = root.find("dataid").text cardno = root.find("cardno").text name = root.find("name").text db = get_db().table("sdvx_profile") all_profiles_for_card = db.get(Query().card == dataid) if all_profiles_for_card is None: all_profiles_for_card = {"card": dataid, "version": {}} if "sdvx_id" not in all_profiles_for_card: sdvx_id = random.randint(10000000, 99999999) all_profiles_for_card["sdvx_id"] = sdvx_id all_profiles_for_card["version"][str(game_version)] = { "game_version": game_version, "name": name, "appeal_id": 0, "skill_level": 0, "skill_base_id": 0, "skill_name_id": 0, "earned_gamecoin_packet": 0, "earned_gamecoin_block": 0, "earned_blaster_energy": 0, "earned_extrack_energy": 0, "used_packet_booster": 0, "used_block_booster": 0, "hispeed": 0, "lanespeed": 0, "gauge_option": 0, "ars_option": 0, "notes_option": 0, "early_late_disp": 0, "draw_adjust": 0, "eff_c_left": 0, "eff_c_right": 1, "music_id": 0, "music_type": 0, "sort_type": 0, "narrow_down": 0, "headphone": 1, "print_count": 0, "start_option": 0, "bgm": 0, "submonitor": 0, "nemsys": 0, "stampA": 0, "stampB": 0, "stampC": 0, "stampD": 0, "items": [], "params": [], } db.upsert(all_profiles_for_card, where("card") == dataid) response = E.response( E.game( E.result(0, __type="u8"), ), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post("/{gameinfo}/game/sv{ver}_load") async def game_sv_load(ver: str, request: Request): request_info = await core_process_request(request) game_version = request_info["game_version"] dataid = request_info["root"][0].find("dataid").text profile = get_game_profile(dataid, game_version) if profile: djid, djid_split = get_id_from_profile(dataid) unlock = [] for i in range(301): unlock.append([i, 11, 15]) for i in range(6001): unlock.append([i, 1, 1]) unlock.append([599, 4, 10]) for item in profile["items"]: unlock.append(item) customize = [ [ 2, 2, [ profile["bgm"], profile["submonitor"], profile["nemsys"], profile["stampA"], profile["stampB"], profile["stampC"], profile["stampD"], ], ] ] for item in profile["params"]: customize.append(item) response = E.response( E.game( E.result(0, __type="u8"), E.name(profile["name"], __type="str"), E.code(djid_split, __type="str"), E.sdvx_id(djid_split, __type="str"), E.appeal_id(profile["appeal_id"], __type="u16"), E.skill_level(profile["skill_level"], __type="s16"), E.skill_base_id(profile["skill_base_id"], __type="s16"), E.skill_name_id(profile["skill_name_id"], __type="s16"), E.gamecoin_packet(profile["earned_gamecoin_packet"], __type="u32"), E.gamecoin_block(profile["earned_gamecoin_block"], __type="u32"), E.blaster_energy(profile["earned_blaster_energy"], __type="u32"), E.blaster_count(9999, __type="u32"), E.extrack_energy(profile["earned_extrack_energy"], __type="u16"), E.play_count(1001, __type="u32"), E.day_count(301, __type="u32"), E.today_count(21, __type="u32"), E.play_chain(31, __type="u32"), E.max_play_chain(31, __type="u32"), E.week_count(9, __type="u32"), E.week_play_count(101, __type="u32"), E.week_chain(31, __type="u32"), E.max_week_chain(1001, __type="u32"), E.creator_id(1, __type="u32"), E.eaappli(E.relation(1, __type="s8")), E.ea_shop( E.blaster_pass_enable(1, __type="bool"), E.blaster_pass_limit_date(1605871200, __type="u64"), ), E.kac_id(profile["name"], __type="str"), E.block_no(0, __type="s32"), E.volte_factory( *[ E.info( E.goods_id(s, __type="s32"), E.status(1, __type="s32"), ) for s in range(1, 999) ], ), *[ E.campaign( E.campaign_id(s, __type="s32"), E.jackpot_flg(1, __type="bool"), ) for s in range(99) ], E.cloud(E.relation(1, __type="s8")), E.something( *[ E.info( E.ranking_id(s[0], __type="s32"), E.value(s[1], __type="s64"), ) for s in [[1402, 20000]] ], ), E.festival( E.fes_id(1, __type="s32"), E.live_energy(1000000, __type="s32"), *[ E.bonus( E.energy_type(s, __type="s32"), E.live_energy(1000000, __type="s32"), ) for s in range(1, 6) ], ), E.valgene_ticket( E.ticket_num(0, __type="s32"), E.limit_date(1605871200, __type="u64"), ), E.arena( E.last_play_season(0, __type="s32"), E.rank_point(0, __type="s32"), E.shop_point(0, __type="s32"), E.ultimate_rate(0, __type="s32"), E.ultimate_rank_num(0, __type="s32"), E.rank_play_cnt(0, __type="s32"), E.ultimate_play_cnt(0, __type="s32"), ), E.hispeed(profile["hispeed"], __type="s32"), E.lanespeed(profile["lanespeed"], __type="u32"), E.gauge_option(profile["gauge_option"], __type="u8"), E.ars_option(profile["ars_option"], __type="u8"), E.notes_option(profile["notes_option"], __type="u8"), E.early_late_disp(profile["early_late_disp"], __type="u8"), E.draw_adjust(profile["draw_adjust"], __type="s32"), E.eff_c_left(profile["eff_c_left"], __type="u8"), E.eff_c_right(profile["eff_c_right"], __type="u8"), E.last_music_id(profile["music_id"], __type="s32"), E.last_music_type(profile["music_type"], __type="u8"), E.sort_type(profile["sort_type"], __type="u8"), E.narrow_down(profile["narrow_down"], __type="u8"), E.headphone(profile["headphone"], __type="u8"), E.item( *[ E.info( E.id(s[0], __type="u32"), E.type(s[1], __type="u8"), E.param(s[2], __type="u32"), ) for s in unlock ], ), E.param( *[ E.info( E.type(s[0], __type="s32"), E.id(s[1], __type="s32"), E.param(s[2], __type="s32", __count=len(s[2])), ) for s in customize ], ), ), ) else: response = E.response( E.game( E.result(1, __type="u8"), ) ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post("/{gameinfo}/game/sv{ver}_load_m") async def game_sv_load_m(ver: str, request: Request): request_info = await core_process_request(request) game_version = request_info["game_version"] dataid = request_info["root"][0].find("refid").text profile = get_game_profile(dataid, game_version) djid, djid_split = get_id_from_profile(dataid) best_scores = [] db = get_db() for record in db.table("sdvx_scores_best").search( (where("game_version") == game_version) & (where("sdvx_id") == djid) ): best_scores.append( [ record["music_id"], record["music_type"], record["score"], record["exscore"], record["clear_type"], record["score_grade"], 0, 0, record["btn_rate"], record["long_rate"], record["vol_rate"], 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ] ) if int(ver) == 7: best_scores[-1].extend([0, 0, 0, 0, 0]) response = E.response( E.game( E.music( *[ E.info( E.param(x, __type="u32"), ) for x in best_scores ], ), ), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post("/{gameinfo}/game/sv{ver}_save") async def game_sv_save(ver: str, request: Request): request_info = await core_process_request(request) game_version = request_info["game_version"] dataid = request_info["root"][0].find("refid").text profile = get_profile(dataid) game_profile = profile["version"].get(str(game_version), {}) root = request_info["root"][0] game_profile["appeal_id"] = int(root.find("appeal_id").text) nodes = [ "appeal_id", "skill_level", "skill_base_id", "skill_name_id", "skill_type", "earned_gamecoin_packet", "earned_gamecoin_block", "earned_blaster_energy", "earned_extrack_energy", # "hispeed", "lanespeed", "gauge_option", "ars_option", "notes_option", "early_late_disp", "draw_adjust", "eff_c_left", "eff_c_right", "music_id", "music_type", "sort_type", "narrow_down", "headphone", "start_option", ] for node in nodes: n = root.find(node) if n is not None: if node.startswith("earned_"): game_profile[node] += int(n.text) else: game_profile[node] = int(n.text) game_profile["used_packet_booster"] = int(root.find("ea_shop")[0].text) game_profile["used_block_booster"] = int(root.find("ea_shop")[1].text) game_profile["print_count"] = int(root.find("print")[0].text) # item fix (copied from drs, this is regarded) old_items = game_profile["items"] items = {} for old in old_items: t = str(old[0]) i = str(old[1]) p = str(old[2]) if t not in items: items[t] = {} if i not in items[t]: items[t][i] = {} items[t][i] = p for info in root.find("item"): t = info.find("id").text i = info.find("type").text p = info.find("param").text if t not in items: items[t] = {} if i not in items[t]: items[t][i] = {} items[t][i] = p items_list = [] for t in items: for i in items[t]: items_list.append([int(t), int(i), int(items[t][i])]) game_profile["items"] = items_list # param fix (copied from drs, this is regarded) old_params = game_profile["params"] params = {} for old in old_params: t = str(old[0]) i = str(old[1]) p = old[2] if t not in params: params[t] = {} if i not in params[t]: params[t][i] = {} params[t][i] = p for info in root.find("param"): t = info.find("type").text i = info.find("id").text p = info.find("param") if t not in params: params[t] = {} if i not in params[t]: params[t][i] = {} params[t][i] = [int(x) for x in p.text.split(" ")] params_list = [] for t in params: for i in params[t]: params_list.append([int(t), int(i), params[t][i]]) game_profile["params"] = params_list profile["version"][str(game_version)] = game_profile get_db().table("sdvx_profile").upsert(profile, where("card") == dataid) response = E.response( E.game(), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post("/{gameinfo}/game/sv{ver}_save_m") async def game_sv_save_m(ver: str, request: Request): request_info = await core_process_request(request) game_version = request_info["game_version"] timestamp = time.time() root = request_info["root"][0] dataid = root.find("dataid").text profile = get_game_profile(dataid, game_version) djid, djid_split = get_id_from_profile(dataid) track = root.find("track") play_id = int(track.find("play_id").text) music_id = int(track.find("music_id").text) music_type = int(track.find("music_type").text) score = int(track.find("score").text) exscore = int(track.find("exscore").text) clear_type = int(track.find("clear_type").text) score_grade = int(track.find("score_grade").text) max_chain = int(track.find("max_chain").text) just = int(track.find("just").text) critical = int(track.find("critical").text) near = int(track.find("near").text) error = int(track.find("error").text) effective_rate = int(track.find("effective_rate").text) btn_rate = int(track.find("btn_rate").text) long_rate = int(track.find("long_rate").text) vol_rate = int(track.find("vol_rate").text) mode = int(track.find("mode").text) gauge_type = int(track.find("gauge_type").text) notes_option = int(track.find("notes_option").text) online_num = int(track.find("online_num").text) local_num = int(track.find("local_num").text) challenge_type = int(track.find("challenge_type").text) retry_cnt = int(track.find("retry_cnt").text) judge = [int(x) for x in track.find("judge").text.split(" ")] db = get_db() db.table("sdvx_scores").insert( { "timestamp": timestamp, "game_version": game_version, "sdvx_id": djid, "play_id": play_id, "music_id": music_id, "music_type": music_type, "score": score, "exscore": exscore, "clear_type": clear_type, "score_grade": score_grade, "max_chain": max_chain, "just": just, "critical": critical, "near": near, "error": error, "effective_rate": effective_rate, "btn_rate": btn_rate, "long_rate": long_rate, "vol_rate": vol_rate, "mode": mode, "gauge_type": gauge_type, "notes_option": notes_option, "online_num": online_num, "local_num": local_num, "challenge_type": challenge_type, "retry_cnt": retry_cnt, "judge": judge, }, ) best = db.table("sdvx_scores_best").get( (where("sdvx_id") == djid) & (where("game_version") == game_version) & (where("music_id") == music_id) & (where("music_type") == music_type) ) best = {} if best is None else best best_score_data = { "game_version": game_version, "sdvx_id": djid, "name": profile["name"], "music_id": music_id, "music_type": music_type, "score": max(score, best.get("score", score)), "exscore": max(exscore, best.get("exscore", exscore)), "clear_type": max(clear_type, best.get("clear_type", clear_type)), "score_grade": max(score_grade, best.get("score_grade", score_grade)), "btn_rate": max(btn_rate, best.get("btn_rate", btn_rate)), "long_rate": max(long_rate, best.get("long_rate", long_rate)), "vol_rate": max(vol_rate, best.get("vol_rate", vol_rate)), } db.table("sdvx_scores_best").upsert( best_score_data, (where("sdvx_id") == djid) & (where("game_version") == game_version) & (where("music_id") == music_id) & (where("music_type") == music_type), ) response = E.response( E.game(), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post("/{gameinfo}/game/sv{ver}_hiscore") async def game_sv_hiscore(ver: str, request: Request): request_info = await core_process_request(request) game_version = request_info["game_version"] best_scores = [] db = get_db() for record in db.table("sdvx_scores_best").search( (where("game_version") == game_version) ): best_scores.append( [ record["music_id"], record["music_type"], record["sdvx_id"], record["name"], record["score"], ] ) response = E.response( E.game( E.sc( *[ E.d( E.id(s[0], __type="u32"), E.ty(s[1], __type="u32"), E.a_sq(s[2], __type="str"), E.a_nm(s[3], __type="str"), E.a_sc(s[4], __type="u32"), E.l_sq(s[2], __type="str"), E.l_nm(s[3], __type="str"), E.l_sc(s[4], __type="u32"), ) for s in best_scores ], ), ), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post("/{gameinfo}/game/sv{ver}_lounge") async def game_sv_lounge(ver: str, request: Request): request_info = await core_process_request(request) response = E.response( E.game(E.interval(30, __type="u32")), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post("/{gameinfo}/game/sv{ver}_shop") async def game_sv_shop(ver: str, request: Request): request_info = await core_process_request(request) response = E.response( E.game(E.nxt_time(1000 * 5 * 60, __type="u32")), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post("/{gameinfo}/game/sv{ver}_load_r") async def game_sv_load_r(ver: str, request: Request): request_info = await core_process_request(request) response = E.response( E.game(), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post("/{gameinfo}/game/sv{ver}_frozen") async def game_sv_frozen(ver: str, request: Request): request_info = await core_process_request(request) response = E.response( E.game(), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post("/{gameinfo}/game/sv{ver}_save_e") async def game_sv_save_e(ver: str, request: Request): request_info = await core_process_request(request) response = E.response( E.game(), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post("/{gameinfo}/game/sv{ver}_save_mega") async def game_sv_save_mega(ver: str, request: Request): request_info = await core_process_request(request) response = E.response( E.game(), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post("/{gameinfo}/game/sv{ver}_play_e") async def game_sv_play_e(ver: str, request: Request): request_info = await core_process_request(request) response = E.response( E.game(), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post("/{gameinfo}/game/sv{ver}_play_s") async def game_sv_play_s(ver: str, request: Request): request_info = await core_process_request(request) response = E.response( E.game(), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post("/{gameinfo}/game/sv{ver}_entry_s") async def game_sv_entry_s(ver: str, request: Request): request_info = await core_process_request(request) response = E.response( E.game(), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post("/{gameinfo}/game/sv{ver}_entry_e") async def game_sv_entry_e(ver: str, request: Request): request_info = await core_process_request(request) response = E.response( E.game(), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers) @router.post("/{gameinfo}/game/sv{ver}_log") async def game_sv_log(ver: str, request: Request): request_info = await core_process_request(request) response = E.response( E.game(), ) response_body, response_headers = await core_prepare_response(request, response) return Response(content=response_body, headers=response_headers)