# MonkeyBusiness/modules/sdvx/game.py
# 2025-12-24 14:30:02 +00:00
#
# 880 lines
# 28 KiB
# Python

import xml.etree.ElementTree as ET
from os import path
from tinydb import Query, where
import config
import random
import time
from fastapi import APIRouter, Request, Response
from core_common import core_process_request, core_prepare_response, E
from core_database import get_db
# Router that carries every SDVX endpoint defined in this file; all routes
# registered on it are served under the "/local2" prefix.
router = APIRouter(prefix="/local2", tags=["local2"])
# Extra attribute attached to the router object. How it is consumed is not
# visible in this file — presumably the application uses it to match
# requests by game model code ("KFC"); confirm against the loader.
router.model_whitelist = ["KFC"]
def get_profile(cid):
    """Look up the stored SDVX profile document for a card.

    Queries the ``sdvx_profile`` table for a document whose ``card`` field
    equals *cid* and returns whatever the table's ``get`` yields for it.
    """
    profiles = get_db().table("sdvx_profile")
    card_matches = where("card") == cid
    return profiles.get(card_matches)
def get_game_profile(cid, game_version):
    """Return the per-version profile stored for a card, or ``None``.

    Args:
        cid: Card identifier used as the ``card`` field of the profile
            document.
        game_version: Game version; it is converted with ``str()`` because
            the ``version`` mapping is keyed by strings.

    Returns:
        The dict stored under ``profile["version"][str(game_version)]``, or
        ``None`` when the card has no profile document at all or has no
        entry for this version.
    """
    profile = get_profile(cid)
    if profile is None:
        # No document for this card: report "no profile" instead of failing
        # with a TypeError on ``None["version"]``, so callers that test the
        # result for truthiness can answer normally.
        return None
    return profile["version"].get(str(game_version), None)
def get_id_from_profile(cid):
    """Return a card's numeric SDVX id together with its dashed text form.

    The id is read from the ``sdvx_id`` field of the card's document in the
    ``sdvx_profile`` table. The text form is the id zero-padded to eight
    digits with a dash after the fourth digit (``XXXX-XXXX``).

    Returns:
        A ``(sdvx_id, dashed_id)`` tuple.
    """
    record = get_db().table("sdvx_profile").get(where("card") == cid)
    sdvx_id = record["sdvx_id"]
    padded = "%08d" % sdvx_id
    dashed = padded[:4] + "-" + padded[4:]
    return sdvx_id, dashed
def _collect_limited_music():
    """List ``[music_id, music_type]`` pairs read from the first music_db.xml found.

    The candidate paths are tried in order and only the first one that exists
    is read. For every entry and every known difficulty tag, the chart is
    listed when its ``limited`` value is anything other than 3. Entries or
    charts that lack the ``difficulty`` / ``limited`` nodes are skipped
    rather than aborting the whole request.

    Returns:
        A list of ``[music_id, music_type]`` lists; ``music_id`` is the raw
        ``id`` attribute string and ``music_type`` the difficulty index.
        Empty when no file exists or nothing qualifies.
    """
    # Index in this tuple is the music_type number sent to the client.
    difficulty_tags = (
        "novice",
        "advanced",
        "exhaust",
        "infinite",
        "maximum",
        "ultimate",
    )
    candidates = (
        path.join("modules", "sdvx", "music_db.xml"),
        path.join("music_db.xml"),
    )

    charts = []
    for db_path in candidates:
        if not path.exists(db_path):
            continue
        with open(db_path, "r", encoding="shift_jisx0213") as fp:
            mdb = ET.parse(fp, ET.XMLParser()).getroot()
        for entry in mdb:
            difficulty = entry.find("difficulty")
            if difficulty is None:
                continue
            mid = entry.get("id")
            for music_type, tag in enumerate(difficulty_tags):
                chart = difficulty.find(tag)
                if chart is None:
                    continue
                limited = chart.find("limited")
                if limited is None or limited.text is None:
                    continue
                if int(limited.text) != 3:
                    charts.append([mid, music_type])
        # Only the first existing file is consulted.
        break
    return charts


@router.post("/{gameinfo}/game/sv{ver}_common")
async def game_sv_common(ver: str, request: Request):
    """Answer ``sv{ver}_common`` with the event list and the music_limited list.

    The reply contains one ``event/info`` node per entry of the hard-coded
    event list and one ``music_limited/info`` node (always with
    ``limited = 3``) per chart returned by ``_collect_limited_music``. When
    that helper yields nothing, every id in ``0..2399`` with music types
    ``0..4`` is listed instead.

    Args:
        ver: Version fragment captured from the URL (not used here).
        request: Incoming FastAPI request.

    Returns:
        A ``Response`` carrying the body and headers produced by
        ``core_prepare_response``.
    """
    # The parsed request is not needed for this reply.
    await core_process_request(request)

    event_flags = [
        "DEMOGAME_PLAY",
        "MATCHING_MODE",
        "MATCHING_MODE_FREE_IP",
        "LEVEL_LIMIT_EASING",
        "ACHIEVEMENT_ENABLE",
        "APICAGACHADRAW\t30",
        "VOLFORCE_ENABLE",
        "AKANAME_ENABLE",
        "PAUSE_ONLINEUPDATE",
        "CONTINUATION",
        "TENKAICHI_MODE",
        "QC_MODE",
        "KAC_MODE",
        # "APPEAL_CARD_GEN_PRICE\t100",
        # "APPEAL_CARD_GEN_NEW_PRICE\t200",
        # "APPEAL_CARD_UNLOCK\t0,20170914,0,20171014,0,20171116,0,20180201,0,20180607,0,20181206,0,20200326,0,20200611,4,10140732,6,10150431",
        "FAVORITE_APPEALCARD_MAX\t200",
        "FAVORITE_MUSIC_MAX\t200",
        "EVENTDATE_APRILFOOL",
        "KONAMI_50TH_LOGO",
        "OMEGA_ARS_ENABLE",
        "DISABLE_MONITOR_ID_CHECK",
        "SKILL_ANALYZER_ABLE",
        "BLASTER_ABLE",
        "STANDARD_UNLOCK_ENABLE",
        "PLAYERJUDGEADJ_ENABLE",
        "MIXID_INPUT_ENABLE",
        "EVENTDATE_ONIGO",
        "EVENTDATE_GOTT",
        "GENERATOR_ABLE",
        "CREW_SELECT_ABLE",
        "PREMIUM_TIME_ENABLE",
        "OMEGA_ENABLE\t1,2,3,4,5,6,7,8,9",
        "HEXA_ENABLE\t1,2,3,4,5,6,7,8,9,10,11",
        "HEXA_OVERDRIVE_ENABLE\t8",
        "MEGAMIX_ENABLE",
        "VALGENE_ENABLE",
        "ARENA_ENABLE",
        "ARENA_LOCAL_TO_ONLINE_ENABLE",
        "ARENA_ALTER_MODE_WINDOW_ENABLE",
        "ARENA_PASS_MATCH_WINDOW_ENABLE",
        "DEMOLOOP_PASELI_FESTIVAL_2022",
        "DISABLED_MUSIC_IN_ARENA_ONLINE",
        "ARENA_VOTE_MODE_ENABLE",
        "DISP_PASELI_BANNER",
        "S_PUC_EFFECT_ENABLE",
        "SUPER_RANDOM_ACTIVE",
        "PLAYER_RADAR_ENABLE",
        "APRIL_RAINBOW_LINE_ACTIVE",
        "USE_CUDA_VIDEO_PRESENTER",
        "CHARACTER_IGNORE_DISABLE\t122,123,131,139,140,143,149,160,162,163,164,167,170,174",
        "STAMP_IGNORE_DISABLE\t273~312,773~820,993~1032,1245~1284,1469~1508,1585~1632,1633~1672,1737~1776,1777~1816,1897~1936",
        "SUBBG_IGNORE_DISABLE\t166~185,281~346,369~381,419~438,464~482,515~552,595~616,660~673,714~727",
    ]

    limited_charts = _collect_limited_music()
    if not limited_charts:
        # No usable music_db.xml: fall back to a fixed id/type grid.
        limited_charts = [[i, j] for i in range(2400) for j in range(0, 5)]

    event_nodes = [
        E.info(E.event_id(flag, __type="str")) for flag in event_flags
    ]
    limited_nodes = [
        E.info(
            E.music_id(chart[0], __type="s32"),
            E.music_type(chart[1], __type="u8"),
            E.limited(3, __type="u8"),
        )
        for chart in limited_charts
    ]

    response = E.response(
        E.game(
            E.event(*event_nodes),
            E.music_limited(*limited_nodes),
        )
    )
    response_body, response_headers = await core_prepare_response(request, response)
    return Response(content=response_body, headers=response_headers)
@router.post("/{gameinfo}/game/sv{ver}_new")
async def game_sv_new(ver: str, request: Request):
    """Create (or reset) the profile of the requesting card for this game version.

    Reads ``dataid`` and ``name`` from the request, loads or creates the
    card's document in ``sdvx_profile``, assigns an ``sdvx_id`` if the
    document has none, stores a fresh default profile under
    ``version[str(game_version)]`` and upserts the document.

    Args:
        ver: Version fragment captured from the URL (not used here).
        request: Incoming FastAPI request.

    Returns:
        A ``Response`` whose payload is ``game/result = 0``.
    """
    request_info = await core_process_request(request)
    game_version = request_info["game_version"]
    root = request_info["root"][0]

    dataid = root.find("dataid").text
    name = root.find("name").text

    table = get_db().table("sdvx_profile")
    card_doc = table.get(where("card") == dataid)
    if card_doc is None:
        card_doc = {"card": dataid, "version": {}}

    if "sdvx_id" not in card_doc:
        # Scores are keyed by sdvx_id elsewhere in this file, so two cards
        # must never share one: redraw until the id is unused.
        sdvx_id = random.randint(10000000, 99999999)
        while table.contains(where("sdvx_id") == sdvx_id):
            sdvx_id = random.randint(10000000, 99999999)
        card_doc["sdvx_id"] = sdvx_id

    card_doc["version"][str(game_version)] = {
        "game_version": game_version,
        "name": name,
        "appeal_id": 0,
        "skill_level": 0,
        "skill_base_id": 0,
        "skill_name_id": 0,
        "earned_gamecoin_packet": 0,
        "earned_gamecoin_block": 0,
        "earned_blaster_energy": 0,
        "earned_extrack_energy": 0,
        "used_packet_booster": 0,
        "used_block_booster": 0,
        "hispeed": 0,
        "lanespeed": 0,
        "gauge_option": 0,
        "ars_option": 0,
        "notes_option": 0,
        "early_late_disp": 0,
        "draw_adjust": 0,
        "eff_c_left": 0,
        "eff_c_right": 1,
        "music_id": 0,
        "music_type": 0,
        "sort_type": 0,
        "narrow_down": 0,
        "headphone": 1,
        "print_count": 0,
        "start_option": 0,
        "bgm": 0,
        "submonitor": 0,
        "nemsys": 0,
        "stampA": 0,
        "stampB": 0,
        "stampC": 0,
        "stampD": 0,
        "items": [],
        "params": [],
    }

    table.upsert(card_doc, where("card") == dataid)

    response = E.response(
        E.game(
            E.result(0, __type="u8"),
        ),
    )
    response_body, response_headers = await core_prepare_response(request, response)
    return Response(content=response_body, headers=response_headers)
@router.post("/{gameinfo}/game/sv{ver}_load")
async def game_sv_load(ver: str, request: Request):
    """Answer ``sv{ver}_load`` with the stored profile of the requesting card.

    When ``get_game_profile`` yields a truthy profile, the reply carries
    ``result = 0`` followed by the stored values, a number of fixed
    placeholder values, a generated item list (plus the profile's own
    ``items``) and the parameter rows (one built from the bgm/submonitor/
    nemsys/stamp fields plus the profile's own ``params``). Otherwise the
    reply is just ``result = 1``.

    Args:
        ver: Version fragment captured from the URL (not used here).
        request: Incoming FastAPI request.

    Returns:
        A ``Response`` carrying the body and headers produced by
        ``core_prepare_response``.
    """
    request_info = await core_process_request(request)
    game_version = request_info["game_version"]
    dataid = request_info["root"][0].find("dataid").text

    profile = get_game_profile(dataid, game_version)

    if not profile:
        response = E.response(
            E.game(
                E.result(1, __type="u8"),
            )
        )
    else:
        _, djid_split = get_id_from_profile(dataid)

        # Item rows are [id, type, param]: two generated ranges, one fixed
        # row, then whatever the profile has stored.
        item_rows = [[i, 11, 15] for i in range(301)]
        item_rows += [[i, 1, 1] for i in range(6001)]
        item_rows.append([599, 4, 10])
        item_rows.extend(profile["items"])

        # Parameter rows are [type, id, values]; the first one is rebuilt
        # from the individual customisation fields of the profile.
        customize_keys = (
            "bgm",
            "submonitor",
            "nemsys",
            "stampA",
            "stampB",
            "stampC",
            "stampD",
        )
        param_rows = [[2, 2, [profile[key] for key in customize_keys]]]
        param_rows.extend(profile["params"])

        # (element tag, profile key, wire type) for the stored option values.
        option_fields = (
            ("hispeed", "hispeed", "s32"),
            ("lanespeed", "lanespeed", "u32"),
            ("gauge_option", "gauge_option", "u8"),
            ("ars_option", "ars_option", "u8"),
            ("notes_option", "notes_option", "u8"),
            ("early_late_disp", "early_late_disp", "u8"),
            ("draw_adjust", "draw_adjust", "s32"),
            ("eff_c_left", "eff_c_left", "u8"),
            ("eff_c_right", "eff_c_right", "u8"),
            ("last_music_id", "music_id", "s32"),
            ("last_music_type", "music_type", "u8"),
            ("sort_type", "sort_type", "u8"),
            ("narrow_down", "narrow_down", "u8"),
            ("headphone", "headphone", "u8"),
        )

        children = [
            E.result(0, __type="u8"),
            E.name(profile["name"], __type="str"),
            E.code(djid_split, __type="str"),
            E.sdvx_id(djid_split, __type="str"),
            E.appeal_id(profile["appeal_id"], __type="u16"),
            E.skill_level(profile["skill_level"], __type="s16"),
            E.skill_base_id(profile["skill_base_id"], __type="s16"),
            E.skill_name_id(profile["skill_name_id"], __type="s16"),
            E.gamecoin_packet(profile["earned_gamecoin_packet"], __type="u32"),
            E.gamecoin_block(profile["earned_gamecoin_block"], __type="u32"),
            E.blaster_energy(profile["earned_blaster_energy"], __type="u32"),
            E.blaster_count(9999, __type="u32"),
            E.extrack_energy(profile["earned_extrack_energy"], __type="u16"),
            E.play_count(1001, __type="u32"),
            E.day_count(301, __type="u32"),
            E.today_count(21, __type="u32"),
            E.play_chain(31, __type="u32"),
            E.max_play_chain(31, __type="u32"),
            E.week_count(9, __type="u32"),
            E.week_play_count(101, __type="u32"),
            E.week_chain(31, __type="u32"),
            E.max_week_chain(1001, __type="u32"),
            E.creator_id(1, __type="u32"),
            E.eaappli(E.relation(1, __type="s8")),
            E.ea_shop(
                E.blaster_pass_enable(1, __type="bool"),
                E.blaster_pass_limit_date(1605871200, __type="u64"),
            ),
            E.kac_id(profile["name"], __type="str"),
            E.block_no(0, __type="s32"),
        ]

        children.append(
            E.volte_factory(
                *[
                    E.info(
                        E.goods_id(goods_id, __type="s32"),
                        E.status(1, __type="s32"),
                    )
                    for goods_id in range(1, 999)
                ],
            )
        )

        for campaign_id in range(99):
            children.append(
                E.campaign(
                    E.campaign_id(campaign_id, __type="s32"),
                    E.jackpot_flg(1, __type="bool"),
                )
            )

        children.append(E.cloud(E.relation(1, __type="s8")))
        children.append(
            E.something(
                E.info(
                    E.ranking_id(1402, __type="s32"),
                    E.value(20000, __type="s64"),
                ),
            )
        )
        children.append(
            E.festival(
                E.fes_id(1, __type="s32"),
                E.live_energy(1000000, __type="s32"),
                *[
                    E.bonus(
                        E.energy_type(energy_type, __type="s32"),
                        E.live_energy(1000000, __type="s32"),
                    )
                    for energy_type in range(1, 6)
                ],
            )
        )
        children.append(
            E.valgene_ticket(
                E.ticket_num(0, __type="s32"),
                E.limit_date(1605871200, __type="u64"),
            )
        )
        children.append(
            E.arena(
                E.last_play_season(0, __type="s32"),
                E.rank_point(0, __type="s32"),
                E.shop_point(0, __type="s32"),
                E.ultimate_rate(0, __type="s32"),
                E.ultimate_rank_num(0, __type="s32"),
                E.rank_play_cnt(0, __type="s32"),
                E.ultimate_play_cnt(0, __type="s32"),
            )
        )

        for tag, key, wire_type in option_fields:
            children.append(getattr(E, tag)(profile[key], __type=wire_type))

        children.append(
            E.item(
                *[
                    E.info(
                        E.id(row[0], __type="u32"),
                        E.type(row[1], __type="u8"),
                        E.param(row[2], __type="u32"),
                    )
                    for row in item_rows
                ],
            )
        )
        children.append(
            E.param(
                *[
                    E.info(
                        E.type(row[0], __type="s32"),
                        E.id(row[1], __type="s32"),
                        E.param(row[2], __type="s32", __count=len(row[2])),
                    )
                    for row in param_rows
                ],
            )
        )

        response = E.response(E.game(*children))

    response_body, response_headers = await core_prepare_response(request, response)
    return Response(content=response_body, headers=response_headers)
@router.post("/{gameinfo}/game/sv{ver}_load_m")
async def game_sv_load_m(ver: str, request: Request):
    """Answer ``sv{ver}_load_m`` with the card's best scores for this game version.

    Every matching record of ``sdvx_scores_best`` becomes one ``music/info``
    node whose ``param`` is a list of 21 values (stored score fields
    interleaved with zero placeholders). When ``ver`` is 7, five more zeros
    are appended to every row so all rows have the same length.

    Args:
        ver: Version fragment captured from the URL; must be numeric.
        request: Incoming FastAPI request.

    Returns:
        A ``Response`` carrying the body and headers produced by
        ``core_prepare_response``.
    """
    request_info = await core_process_request(request)
    game_version = request_info["game_version"]
    dataid = request_info["root"][0].find("refid").text

    # Only the numeric id is needed; best scores are keyed by it.
    djid, _ = get_id_from_profile(dataid)

    # Decided once: the same padding applies to every row of the reply.
    extra_padding = [0, 0, 0, 0, 0] if int(ver) == 7 else []

    records = get_db().table("sdvx_scores_best").search(
        (where("game_version") == game_version) & (where("sdvx_id") == djid)
    )

    best_scores = [
        [
            record["music_id"],
            record["music_type"],
            record["score"],
            record["exscore"],
            record["clear_type"],
            record["score_grade"],
            0,
            0,
            record["btn_rate"],
            record["long_rate"],
            record["vol_rate"],
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            *extra_padding,
        ]
        for record in records
    ]

    response = E.response(
        E.game(
            E.music(
                *[
                    E.info(
                        E.param(row, __type="u32"),
                    )
                    for row in best_scores
                ],
            ),
        ),
    )
    response_body, response_headers = await core_prepare_response(request, response)
    return Response(content=response_body, headers=response_headers)
# Integer-valued request nodes copied into the version profile when present.
# Nodes whose name starts with "earned_" are added to the stored total; all
# others overwrite the stored value.
_SAVE_INT_NODES = (
    "appeal_id",
    "skill_level",
    "skill_base_id",
    "skill_name_id",
    "skill_type",
    "earned_gamecoin_packet",
    "earned_gamecoin_block",
    "earned_blaster_energy",
    "earned_extrack_energy",
    "hispeed",
    "lanespeed",
    "gauge_option",
    "ars_option",
    "notes_option",
    "early_late_disp",
    "draw_adjust",
    "eff_c_left",
    "eff_c_right",
    "music_id",
    "music_type",
    "sort_type",
    "narrow_down",
    "headphone",
    "start_option",
)


def _merge_keyed_rows(stored_rows, fresh_rows):
    """Merge ``[key1, key2, value]`` rows so that later rows replace earlier ones.

    Rows are grouped by ``int(key1)`` and then ``int(key2)``, each in order
    of first appearance; ``fresh_rows`` are applied after ``stored_rows``.

    Returns:
        A new list of ``[key1, key2, value]`` lists with integer keys.
    """
    grouped = {}
    for row in (*stored_rows, *fresh_rows):
        grouped.setdefault(int(row[0]), {})[int(row[1])] = row[2]
    return [
        [outer, inner, value]
        for outer, bucket in grouped.items()
        for inner, value in bucket.items()
    ]


@router.post("/{gameinfo}/game/sv{ver}_save")
async def game_sv_save(ver: str, request: Request):
    """Persist the profile data sent with ``sv{ver}_save``.

    Updates the card's version profile with the integer nodes listed in
    ``_SAVE_INT_NODES`` (``earned_*`` values accumulate), the two ``ea_shop``
    children, the first ``print`` child, and merges the request's ``item``
    and ``param`` rows into the stored ones before upserting the document.
    A version profile that does not exist yet starts empty.

    Args:
        ver: Version fragment captured from the URL (not used here).
        request: Incoming FastAPI request.

    Returns:
        A ``Response`` whose payload is an empty ``E.game()`` element.

    Raises:
        TypeError: If the card has no profile document (``get_profile``
            returned ``None``), as before.
    """
    request_info = await core_process_request(request)
    game_version = request_info["game_version"]
    root = request_info["root"][0]
    dataid = root.find("refid").text

    profile = get_profile(dataid)
    game_profile = profile["version"].get(str(game_version), {})

    for node in _SAVE_INT_NODES:
        element = root.find(node)
        if element is None:
            continue
        value = int(element.text)
        if node.startswith("earned_"):
            # Tolerate a profile that has no running total for this node yet.
            game_profile[node] = game_profile.get(node, 0) + value
        else:
            game_profile[node] = value

    # The booster and print counters are read by child position, not by name.
    ea_shop = root.find("ea_shop")
    game_profile["used_packet_booster"] = int(ea_shop[0].text)
    game_profile["used_block_booster"] = int(ea_shop[1].text)
    game_profile["print_count"] = int(root.find("print")[0].text)

    # Item rows are stored as [id, type, param].
    item_node = root.find("item")
    fresh_items = []
    if item_node is not None:
        fresh_items = [
            [
                info.find("id").text,
                info.find("type").text,
                int(info.find("param").text),
            ]
            for info in item_node
        ]
    game_profile["items"] = _merge_keyed_rows(
        game_profile.get("items", []), fresh_items
    )

    # Param rows are stored as [type, id, list of ints].
    param_node = root.find("param")
    fresh_params = []
    if param_node is not None:
        fresh_params = [
            [
                info.find("type").text,
                info.find("id").text,
                # split() without an argument copes with empty text and
                # repeated whitespace between the values.
                [int(x) for x in (info.find("param").text or "").split()],
            ]
            for info in param_node
        ]
    game_profile["params"] = _merge_keyed_rows(
        game_profile.get("params", []), fresh_params
    )

    profile["version"][str(game_version)] = game_profile
    get_db().table("sdvx_profile").upsert(profile, where("card") == dataid)

    response = E.response(
        E.game(),
    )
    response_body, response_headers = await core_prepare_response(request, response)
    return Response(content=response_body, headers=response_headers)
@router.post("/{gameinfo}/game/sv{ver}_save_m")
async def game_sv_save_m(ver: str, request: Request):
    """Record one played track sent with ``sv{ver}_save_m``.

    Inserts the full play into ``sdvx_scores`` and upserts the matching row
    of ``sdvx_scores_best`` (same sdvx_id, game version, music id and music
    type), keeping for each tracked field the larger of the new value and
    the previously stored one.

    Args:
        ver: Version fragment captured from the URL (not used here).
        request: Incoming FastAPI request.

    Returns:
        A ``Response`` whose payload is an empty ``E.game()`` element.
    """
    request_info = await core_process_request(request)
    game_version = request_info["game_version"]
    timestamp = time.time()

    root = request_info["root"][0]
    dataid = root.find("dataid").text
    profile = get_game_profile(dataid, game_version)
    djid, _ = get_id_from_profile(dataid)

    # Integer children of <track>, in the order they are stored.
    int_fields = (
        "play_id",
        "music_id",
        "music_type",
        "score",
        "exscore",
        "clear_type",
        "score_grade",
        "max_chain",
        "just",
        "critical",
        "near",
        "error",
        "effective_rate",
        "btn_rate",
        "long_rate",
        "vol_rate",
        "mode",
        "gauge_type",
        "notes_option",
        "online_num",
        "local_num",
        "challenge_type",
        "retry_cnt",
    )
    track = root.find("track")
    played = {field: int(track.find(field).text) for field in int_fields}
    judge = [int(x) for x in track.find("judge").text.split(" ")]

    db = get_db()
    db.table("sdvx_scores").insert(
        {
            "timestamp": timestamp,
            "game_version": game_version,
            "sdvx_id": djid,
            **played,
            "judge": judge,
        },
    )

    best_table = db.table("sdvx_scores_best")
    same_chart = (
        (where("sdvx_id") == djid)
        & (where("game_version") == game_version)
        & (where("music_id") == played["music_id"])
        & (where("music_type") == played["music_type"])
    )

    previous = best_table.get(same_chart)
    if previous is None:
        previous = {}

    best_score_data = {
        "game_version": game_version,
        "sdvx_id": djid,
        "name": profile["name"],
        "music_id": played["music_id"],
        "music_type": played["music_type"],
    }
    # With no earlier value the new one is compared against itself.
    for field in (
        "score",
        "exscore",
        "clear_type",
        "score_grade",
        "btn_rate",
        "long_rate",
        "vol_rate",
    ):
        current = played[field]
        best_score_data[field] = max(current, previous.get(field, current))

    best_table.upsert(best_score_data, same_chart)

    response = E.response(
        E.game(),
    )
    response_body, response_headers = await core_prepare_response(request, response)
    return Response(content=response_body, headers=response_headers)
@router.post("/{gameinfo}/game/sv{ver}_hiscore")
async def game_sv_hiscore(ver: str, request: Request):
    """Answer ``sv{ver}_hiscore`` with every best-score row of this game version.

    Each record of ``sdvx_scores_best`` for the request's game version
    becomes one ``sc/d`` node. The same sdvx_id, name and score fill both
    the ``a_*`` and the ``l_*`` fields.

    Args:
        ver: Version fragment captured from the URL (not used here).
        request: Incoming FastAPI request.

    Returns:
        A ``Response`` carrying the body and headers produced by
        ``core_prepare_response``.
    """
    request_info = await core_process_request(request)
    game_version = request_info["game_version"]

    records = get_db().table("sdvx_scores_best").search(
        where("game_version") == game_version
    )
    rows = [
        (
            record["music_id"],
            record["music_type"],
            record["sdvx_id"],
            record["name"],
            record["score"],
        )
        for record in records
    ]

    score_nodes = []
    for music_id, music_type, sdvx_id, player_name, score in rows:
        score_nodes.append(
            E.d(
                E.id(music_id, __type="u32"),
                E.ty(music_type, __type="u32"),
                E.a_sq(sdvx_id, __type="str"),
                E.a_nm(player_name, __type="str"),
                E.a_sc(score, __type="u32"),
                E.l_sq(sdvx_id, __type="str"),
                E.l_nm(player_name, __type="str"),
                E.l_sc(score, __type="u32"),
            )
        )

    response = E.response(E.game(E.sc(*score_nodes)))
    response_body, response_headers = await core_prepare_response(request, response)
    return Response(content=response_body, headers=response_headers)
@router.post("/{gameinfo}/game/sv{ver}_lounge")
async def game_sv_lounge(ver: str, request: Request):
    """Answer ``sv{ver}_lounge`` with a fixed ``interval`` value of 30."""
    # The parsed request is not used by this handler.
    await core_process_request(request)
    interval = E.interval(30, __type="u32")
    reply = E.response(E.game(interval))
    body, headers = await core_prepare_response(request, reply)
    return Response(content=body, headers=headers)
@router.post("/{gameinfo}/game/sv{ver}_shop")
async def game_sv_shop(ver: str, request: Request):
    """Answer ``sv{ver}_shop`` with a fixed ``nxt_time`` value."""
    # The parsed request is not used by this handler.
    await core_process_request(request)
    # 1000 * 5 * 60 = 300000; presumably five minutes in milliseconds —
    # TODO confirm the unit the client expects.
    next_time = 1000 * 5 * 60
    reply = E.response(E.game(E.nxt_time(next_time, __type="u32")))
    body, headers = await core_prepare_response(request, reply)
    return Response(content=body, headers=headers)
@router.post("/{gameinfo}/game/sv{ver}_load_r")
async def game_sv_load_r(ver: str, request: Request):
    """Answer ``sv{ver}_load_r`` with an empty ``E.game()`` element."""
    # The parsed request is not used by this handler.
    await core_process_request(request)
    reply = E.response(E.game())
    body, headers = await core_prepare_response(request, reply)
    return Response(content=body, headers=headers)
@router.post("/{gameinfo}/game/sv{ver}_frozen")
async def game_sv_frozen(ver: str, request: Request):
    """Answer ``sv{ver}_frozen`` with an empty ``E.game()`` element."""
    # The parsed request is not used by this handler.
    await core_process_request(request)
    reply = E.response(E.game())
    body, headers = await core_prepare_response(request, reply)
    return Response(content=body, headers=headers)
@router.post("/{gameinfo}/game/sv{ver}_save_e")
async def game_sv_save_e(ver: str, request: Request):
    """Answer ``sv{ver}_save_e`` with an empty ``E.game()`` element; nothing is stored."""
    # The parsed request is not used by this handler.
    await core_process_request(request)
    reply = E.response(E.game())
    body, headers = await core_prepare_response(request, reply)
    return Response(content=body, headers=headers)
@router.post("/{gameinfo}/game/sv{ver}_save_mega")
async def game_sv_save_mega(ver: str, request: Request):
    """Answer ``sv{ver}_save_mega`` with an empty ``E.game()`` element; nothing is stored."""
    # The parsed request is not used by this handler.
    await core_process_request(request)
    reply = E.response(E.game())
    body, headers = await core_prepare_response(request, reply)
    return Response(content=body, headers=headers)
@router.post("/{gameinfo}/game/sv{ver}_play_e")
async def game_sv_play_e(ver: str, request: Request):
    """Answer ``sv{ver}_play_e`` with an empty ``E.game()`` element."""
    # The parsed request is not used by this handler.
    await core_process_request(request)
    reply = E.response(E.game())
    body, headers = await core_prepare_response(request, reply)
    return Response(content=body, headers=headers)
@router.post("/{gameinfo}/game/sv{ver}_play_s")
async def game_sv_play_s(ver: str, request: Request):
    """Answer ``sv{ver}_play_s`` with an empty ``E.game()`` element."""
    # The parsed request is not used by this handler.
    await core_process_request(request)
    reply = E.response(E.game())
    body, headers = await core_prepare_response(request, reply)
    return Response(content=body, headers=headers)
@router.post("/{gameinfo}/game/sv{ver}_entry_s")
async def game_sv_entry_s(ver: str, request: Request):
    """Answer ``sv{ver}_entry_s`` with an empty ``E.game()`` element."""
    # The parsed request is not used by this handler.
    await core_process_request(request)
    reply = E.response(E.game())
    body, headers = await core_prepare_response(request, reply)
    return Response(content=body, headers=headers)
@router.post("/{gameinfo}/game/sv{ver}_entry_e")
async def game_sv_entry_e(ver: str, request: Request):
    """Answer ``sv{ver}_entry_e`` with an empty ``E.game()`` element."""
    # The parsed request is not used by this handler.
    await core_process_request(request)
    reply = E.response(E.game())
    body, headers = await core_prepare_response(request, reply)
    return Response(content=body, headers=headers)
@router.post("/{gameinfo}/game/sv{ver}_log")
async def game_sv_log(ver: str, request: Request):
    """Answer ``sv{ver}_log`` with an empty ``E.game()`` element; nothing is stored."""
    # The parsed request is not used by this handler.
    await core_process_request(request)
    reply = E.response(E.game())
    body, headers = await core_prepare_response(request, reply)
    return Response(content=body, headers=headers)