Merge branch 'drmext:main' into main

This commit is contained in:
dog dawb 2026-04-10 16:48:14 +08:00 committed by GitHub
commit 1fa076c8cd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 967 additions and 118 deletions

View File

@ -10,7 +10,7 @@ Run [start.bat (Windows)](start.bat) or [start.sh (Linux, MacOS)](start.sh)
## Playable Games
- IIDX 18-20, 29-33 (Online Arena/BPL support)
- DDR A20P, A3 (OmniMIX/GF, BPL, and [Fake PFREE](https://github.com/drmext/BemaniPatcher/blob/nopr/ddra3.html#L133) support)
- DDR A20P, A3, WORLD (OmniMIX/GF, BPL, and [Fake PFREE](https://github.com/drmext/BemaniPatcher/blob/nopr/ddra3.html#L133) support)
- GD 6-10 DELTA (Battle Mode support)
- DRS
- NOST 3

5
activate-venv.bat Normal file
View File

@ -0,0 +1,5 @@
@echo off
cd /d %~dp0
start .venv\Scripts\activate.bat

View File

@ -119,6 +119,8 @@ async def core_get_game_version_from_software_version(software_version):
return 1
elif model == "MDX":
if ext >= 2024061200 and ext not in (2024042069, 2025042069): # GF
return 20
if ext >= 2019022600: # ???
return 19

View File

@ -44,7 +44,7 @@ async def forward_slashless(
try:
game_code = model.split(":")[0]
# TODO: check for more edge cases
if game_code == "MDX" and module == "eventlog" or module == "eventlog_2":
if game_code == "MDX" and module.startswith("eventlo"):
find_response = globals()[f"ddr_{module}_{method}"]
elif game_code == "REC":
find_response = globals()[f"drs_{module}_{method}"]

View File

@ -13,7 +13,7 @@ from utils.lz77 import lz77_decode
import lxml.etree as ET
import json
import struct
from typing import Dict, List, Tuple
from typing import Optional, Dict, List, Tuple
from os import path
@ -25,26 +25,35 @@ class DDR_Profile_Main_Items(BaseModel):
pin: str
class DDR_Profile_Version_Items(BaseModel):
game_version: int
calories_disp: bool
character: str
arrow_skin: str
filter: str
guideline: str
priority: str
timing_disp: bool
common: str
option: str
last: str
rival: str
rival_1_ddr_id: int
rival_2_ddr_id: int
rival_3_ddr_id: int
single_grade: int
double_grade: int
class DDR_Profile_19_Items(BaseModel):
game_version: Optional[int]
calories_disp: Optional[bool]
character: Optional[str]
arrow_skin: Optional[str]
filter: Optional[str]
guideline: Optional[str]
priority: Optional[str]
timing_disp: Optional[bool]
common: Optional[str]
option: Optional[str]
last: Optional[str]
rival: Optional[str]
rival_1_ddr_id: Optional[int]
rival_2_ddr_id: Optional[int]
rival_3_ddr_id: Optional[int]
single_grade: Optional[int]
double_grade: Optional[int]
class DDR_Profile_20_Items(BaseModel):
game_version: Optional[int]
common_dancername: Optional[str]
common_area: Optional[str]
rival_1_ddr_id: Optional[int]
rival_2_ddr_id: Optional[int]
rival_3_ddr_id: Optional[int]
customize: Optional[dict]
@router.get("/profiles")
async def ddr_profiles():
return get_db().table("ddr_profile").all()
@ -68,32 +77,48 @@ async def ddr_profile_id_patch(ddr_id: str, item: DDR_Profile_Main_Items):
return Response(status_code=204)
@router.patch("/profiles/{ddr_id}/{version}")
async def ddr_profile_id_version_patch(
ddr_id: str, version: int, item: DDR_Profile_Version_Items
):
@router.patch("/profiles/{ddr_id}/19")
async def ddr_profile_id_19_patch(ddr_id: str, item: DDR_Profile_19_Items):
ddr_id = int("".join([i for i in ddr_id if i.isnumeric()]))
profile = get_db().table("ddr_profile").get(where("ddr_id") == ddr_id)
game_profile = profile["version"].get(str(version), {})
game_profile = profile["version"].get("19", {})
if version >= 19:
game_profile["game_version"] = item.game_version
game_profile["calories_disp"] = "On" if item.calories_disp else "Off"
game_profile["character"] = item.character
game_profile["arrow_skin"] = item.arrow_skin
game_profile["filter"] = item.filter
game_profile["guideline"] = item.guideline
game_profile["priority"] = item.priority
game_profile["timing_disp"] = "On" if item.timing_disp else "Off"
game_profile["common"] = item.common
game_profile["option"] = item.option
game_profile["last"] = item.last
game_profile["rival"] = item.rival
game_profile["rival_1_ddr_id"] = item.rival_1_ddr_id
game_profile["rival_2_ddr_id"] = item.rival_2_ddr_id
game_profile["rival_3_ddr_id"] = item.rival_3_ddr_id
game_profile["game_version"] = item.game_version
game_profile["calories_disp"] = "On" if item.calories_disp else "Off"
game_profile["character"] = item.character
game_profile["arrow_skin"] = item.arrow_skin
game_profile["filter"] = item.filter
game_profile["guideline"] = item.guideline
game_profile["priority"] = item.priority
game_profile["timing_disp"] = "On" if item.timing_disp else "Off"
game_profile["common"] = item.common
game_profile["option"] = item.option
game_profile["last"] = item.last
game_profile["rival"] = item.rival
game_profile["rival_1_ddr_id"] = item.rival_1_ddr_id
game_profile["rival_2_ddr_id"] = item.rival_2_ddr_id
game_profile["rival_3_ddr_id"] = item.rival_3_ddr_id
profile["version"][str(version)] = game_profile
profile["version"]["19"] = game_profile
get_db().table("ddr_profile").upsert(profile, where("ddr_id") == ddr_id)
return Response(status_code=204)
@router.patch("/profiles/{ddr_id}/20")
async def ddr_profile_id_20_patch(ddr_id: str, item: DDR_Profile_20_Items):
ddr_id = int("".join([i for i in ddr_id if i.isnumeric()]))
profile = get_db().table("ddr_profile").get(where("ddr_id") == ddr_id)
game_profile = profile["version"].get("20", {})
game_profile["game_version"] = item.game_version
game_profile["common_dancername"] = item.common_dancername
game_profile["common_area"] = item.common_area
game_profile["rival_1_ddr_id"] = item.rival_1_ddr_id
game_profile["rival_2_ddr_id"] = item.rival_2_ddr_id
game_profile["rival_3_ddr_id"] = item.rival_3_ddr_id
game_profile["customize"] = item.customize
profile["version"]["20"] = game_profile
get_db().table("ddr_profile").upsert(profile, where("ddr_id") == ddr_id)
return Response(status_code=204)

25
modules/ddr/eventlog_3.py Normal file
View File

@ -0,0 +1,25 @@
import config
from fastapi import APIRouter, Request, Response
from core_common import core_process_request, core_prepare_response, E
router = APIRouter(prefix="/local2", tags=["local2"])
router.model_whitelist = ["MDX"]
@router.post("/{gameinfo}/eventlog_3/write")
async def ddr_eventlog_3_write(request: Request):
request_info = await core_process_request(request)
response = E.response(
E.eventlog_3(
E.gamesession(9999999, __type="s64"),
E.logsendflg(1, __type="s32"),
E.logerrlevel(0, __type="s32"),
E.evtidnosendflg(0, __type="s32"),
)
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)

712
modules/ddr/playdata_3.py Normal file
View File

@ -0,0 +1,712 @@
import random
import time
from tinydb import Query, where
import config
from fastapi import APIRouter, Request, Response
from core_common import core_process_request, core_prepare_response, E
from core_database import get_db
from os import path
import json
router = APIRouter(prefix="/local2", tags=["local2"])
router.model_whitelist = ["MDX"]
def get_profile(cid):
return get_db().table("ddr_profile").get(where("card") == cid)
def get_game_profile(cid, game_version):
profile = get_profile(cid)
return profile["version"].get(str(game_version), None)
mdb = {}
ddr_metadata = path.join("webui", "ddr.json")
if path.exists(ddr_metadata):
with open(ddr_metadata, "r", encoding="utf-8") as fp:
mdb = json.load(fp)
music_load = []
for i in sorted(list(mdb.keys()), reverse=True):
info = mdb[i]
for idx, lvl in enumerate(info["diffLv"]):
if int(lvl) != 0:
music_load.append(f"{i},{1 if idx > 4 else 0},{idx % 5},0,{lvl}")
load_settings = {
"common": {
"dancername": "str",
"area": "s32",
"extrastar": "s32",
"playcount": "s32",
"weight": "s32",
"today_cal": "u64",
"is_disp_weight": "bool",
"is_takeover": "bool",
"pre_playable_num": "s32",
"is_subscribed": "bool",
"popup_subscribe_enable": "bool",
"popup_subscribe_disable": "bool",
},
"option": {
"hispeed": "s32",
"gauge": "s32",
"fastslow": "s32",
"guideline": "s32",
"stepzone": "s32",
"timing_disp": "s32",
"visibility": "s32",
"visible_time": "s32",
"lane": "s32",
"lane_hiddenpos": "s32",
"lane_suddenpos": "s32",
"lane_hidsudpos": "s32",
"lane_filter": "s32",
"scroll_direction": "s32",
"scroll_moving": "s32",
"arrow_priority": "s32",
"arrow_placement": "s32",
"arrow_color": "s32",
"arrow_design": "s32",
"cut_timing": "s32",
"cut_freeze": "s32",
"cut_jump": "s32",
"speed_type": "s32",
"real_speed": "s32",
"lane_preview": "s32",
"combo_priority": "s32",
"judge_priority": "s32",
"judge_position": "s32",
"timing_music": "s32",
},
"lastplay": {
"mode": "s32",
"folder": "s32",
"mcode": "s32",
"style": "s32",
"difficulty": "s32",
"window_main": "s32",
"window_sub": "s32",
"target": "s32",
"tab_main": "s32",
"tab_sub": "s32",
"tab_main_graph_type": "s32",
"tab_main_graph_disp": "s32",
"tab_sub_graph_type": "s32",
"tab_sub_graph_disp": "s32",
},
"filtersort": {
"title": "u64",
"version": "u64",
"genre": "u64",
"bpm": "u64",
"event": "u64",
"level": "u64",
"flare_rank": "u64",
"clear_rank": "u64",
"flare_skill_target": "u64",
"rival_flare_skill": "u64",
"rival_score_rank": "u64",
"sort_type": "u64",
"order_type": "s32",
"is_quickmode": "bool",
"cleartype": "u64",
"difficulty": "u64",
},
"checkguide": {
"tips_basic": "u64",
"tips_option": "u64",
"tips_event": "u64",
"tips_gimmick": "u64",
"tips_advance": "u64",
"guide_scene": "u64",
},
"brave": {
"last_braveid": "s32",
"last_window_btn": "s32",
},
}
customize_settings = {
"1": {
"0": -1, #appeal_board
},
"2": {
"1": -1, #character_left
"2": -1, #character_right
},
"3": {
"1": -1, #game_bg_system
"2": -1, #game_bg_play
},
"4": {
"0": -1, #lane_bg_single
},
"5": {
"0": -1, #lane_bg_double
},
"6": {
"0": -1, #lane_cover_single
},
"7": {
"0": -1, #lane_cover_double
},
"8": {
"0": -1, #song_vid
},
}
flares = [
(995000, 10),
(990000, 9),
(980000, 8),
(970000, 7),
(960000, 6),
(955000, 5),
(930000, 4),
(900000, 3),
(850000, 2),
(800000, 1),
]
@router.post("/{gameinfo}/playdata_3/musicdata_load")
async def playdata_3_musicdata_load(request: Request):
request_info = await core_process_request(request)
if mdb:
response = E.response(
E.playdata_3(
E.result(0, __type="s32"),
E.servertime(round(time.time() * 1000), __type="u64"),
*[
E.music(
E.music_str(s, __type="str"),
)
for s in music_load
],
)
)
else:
response = E.response(
E.playdata_3(
E.result(0, __type="s32"),
E.servertime(round(time.time() * 1000), __type="u64"),
E.music(
E.music_str("", __type="str"),
),
)
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post("/{gameinfo}/playdata_3/playerdata_load")
async def playdata_3_playerdata_load(request: Request):
request_info = await core_process_request(request)
game_version = request_info["game_version"]
data = request_info["root"][0].find("data")
#mode = data.find("mode").text
#gamesession = data.find("gamesession").text
refid = data.find("refid").text
default = "X0000000000000000000000000000000"
all_scores = {}
if refid != default:
p = get_profile(refid)
if p is not None:
ddr_id = p["ddr_id"]
profile = get_game_profile(refid, game_version)
for record in get_db().table("ddr_scores_best").search(where("ddr_id") == ddr_id):
mcode = record["mcode"]
difficulty = int(record["difficulty"])
score = int(record["score"])
flare = int(record.get("flare_force", 0))
if flare in (-1, 0):
for k, v in flares:
if score >= k:
flare = v
break
if mcode not in all_scores:
all_scores[mcode] = {}
all_scores[mcode][difficulty] = (
f"{difficulty - 4 if difficulty > 4 else difficulty},"
f"1,{record['rank']},{record['lamp']},{score},"
f"{record['ghostid']},{flare},{flare},0"
)
else:
p = {}
profile = {}
response = E.response(
E.playdata_3(
E.result(0, __type="s32"),
E.refid(refid, __type="str"),
E.gamesession(1, __type="s64"),
E.servertime(round(time.time() * 1000), __type="u64"),
E.is_locked(0, __type="bool"),
E.common(
E.ddrcode(p.get("ddr_id", 0), __type="s32"),
E.dancername(profile.get("common_dancername", ""), __type="str"),
E.is_new(not profile, __type="bool"),
E.is_registering(0, __type="bool"),
E.area(profile.get("common_area", 13), __type="s32"),
E.extrastar(profile.get("common_extrastar", 0), __type="s32"),
E.playcount(profile.get("common_playcount", 0), __type="s32"),
E.weight(profile.get("common_weight", 0), __type="s32"),
E.today_cal(profile.get("common_today_cal", 0), __type="u64"),
E.is_disp_weight(profile.get("common_is_disp_weight", 0), __type="bool"),
E.is_takeover(0, __type="bool"),
E.pre_playable_num(1, __type="s32"),
E.is_subscribed(1, __type="bool"),
E.popup_subscribe_enable(0, __type="bool"),
E.popup_subscribe_disable(0, __type="bool"),
),
*[
E(k,
*[
E(v, profile.get(f"{k}_{v}", 0), __type=load_settings[k][v])
for v in load_settings[k]
],
)
for k in load_settings if k != "common"
],
*[
E.rival(
E.slot(i, __type="s32"),
E.rivalcode(profile.get(f"rival_{i}_ddr_id", 0), __type="s32"),
)
for i in range (1, 4)
],
*[
E.score(
E.mcode(int(mcode), __type="s32"),
*[
E.score_single(E.score_str(all_scores[mcode][difficulty], __type="str"))
for difficulty in all_scores[mcode] if difficulty < 5
],
*[
E.score_double(E.score_str(all_scores[mcode][difficulty], __type="str"))
for difficulty in all_scores[mcode] if difficulty > 4
],
)
for mcode in all_scores.keys()
],
E.event(
E.event_str("1,101,0,0,14,0,0", __type="str"),
),
*[
E.event(
E.event_str(f"{x},9999,0,0,0,0,0", __type="str"),
)
for x in [
e
for e in range(101, 1, -1)
if e not in [4, 6, 7, 8, 14, 47, 90]
]
],
#E.league(),
#E.current(),
*[
E.customize(
E.category(c, __type="s32"),
E.key(profile["customize"][c][p], __type="s32"),
E.pattern(p, __type="s32")
)
for c in profile.get("customize", {})
for p in profile.get("customize", {}).get(c, {})
],
)
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post("/{gameinfo}/playdata_3/rivaldata_load")
async def playdata_3_rivaldata_load(request: Request):
request_info = await core_process_request(request)
game_version = request_info["game_version"]
data = request_info["root"][0].find("data")
loadflag = int(data.find("loadkind").text)
country = data.find("country").text
region = data.find("region").text
customercode = data.find("customercode").text
companycode = data.find("companycode").text
locationid = data.find("locationid").text
pcbid = data.find("pcbid").text
targettime = data.find("targettime").text
ddrcode = int(data.find("ddrcode").text)
db = get_db()
if loadflag == 1:
all_scores = {}
for record in db.table("ddr_scores").search(where("ddr_id") != 0):
ddr_id = record["ddr_id"]
mcode = record["mcode"]
difficulty = record["difficulty"]
score = record["score"]
if (mcode, difficulty) not in all_scores or score > all_scores[
(mcode, difficulty)
].get("score"):
all_scores[mcode, difficulty] = {
"game_version": game_version,
"ddr_id": ddr_id,
"mcode": mcode,
"difficulty": difficulty,
"rank": record["rank"],
"lamp": record["lamp"],
"score": score,
"exscore": record["exscore"],
"ghostid": record.doc_id,
}
scores = list(all_scores.values())
elif loadflag == 2:
all_scores = {}
for record in db.table("ddr_scores").search(
(where("shoparea") == region)
& (where("ddr_id") != 0)
):
ddr_id = record["ddr_id"]
mcode = record["mcode"]
difficulty = record["difficulty"]
score = record["score"]
if (mcode, difficulty) not in all_scores or score > all_scores[
(mcode, difficulty)
].get("score"):
all_scores[mcode, difficulty] = {
"game_version": game_version,
"ddr_id": ddr_id,
"mcode": mcode,
"difficulty": difficulty,
"rank": record["rank"],
"lamp": record["lamp"],
"score": score,
"exscore": record["exscore"],
"ghostid": record.doc_id,
}
scores = list(all_scores.values())
elif loadflag == 3:
all_scores = {}
for record in db.table("ddr_scores").search(
(where("pcbid") == pcbid)
& (where("ddr_id") != 0)
):
ddr_id = record["ddr_id"]
mcode = record["mcode"]
difficulty = record["difficulty"]
score = record["score"]
if (mcode, difficulty) not in all_scores or score > all_scores[
(mcode, difficulty)
].get("score"):
all_scores[mcode, difficulty] = {
"game_version": game_version,
"ddr_id": ddr_id,
"mcode": mcode,
"difficulty": difficulty,
"rank": record["rank"],
"lamp": record["lamp"],
"score": score,
"exscore": record["exscore"],
"ghostid": record.doc_id,
}
scores = list(all_scores.values())
elif loadflag == 4:
scores = []
for s in db.table("ddr_scores_best").search(where("ddr_id") == ddrcode):
scores.append(s)
names = {}
rival_records = []
profiles = db.table("ddr_profile")
for p in profiles:
names[p["ddr_id"]] = {}
try:
names[p["ddr_id"]]["name"] = p["version"][str(20)]["common_dancername"]
names[p["ddr_id"]]["area"] = int(p["version"][str(20)]["common_area"])
except KeyError:
try:
names[p["ddr_id"]]["name"] = p["version"][str(19)]["common"].split(",")[27]
names[p["ddr_id"]]["area"] = int(str(p["version"][str(19)]["common"].split(",")[3]), 16)
except KeyError:
names[p["ddr_id"]]["name"] = "UNKNOWN"
names[p["ddr_id"]]["area"] = 13
for r in scores:
diffi = r["difficulty"]
if diffi > 4:
style = 1
diffi -= 4
else:
style = 0
rival_records.append(f"{r["mcode"]},{style},{diffi},0,{names[r["ddr_id"]]["name"] if r["ddr_id"] in names else "UNKNOWN"},0,0,1,{r["score"]},{r["ghostid"]}")
response = E.response(
E.playdata_3(
E.result(0, __type="s32"),
*[
E.record(
E.record_str(s, __type="str")
)
for s in rival_records
]
)
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post("/{gameinfo}/playdata_3/playerdata_new")
async def playdata_3_playerdata_new(request: Request):
request_info = await core_process_request(request)
game_version = request_info["game_version"]
data = request_info["root"][0].find("data")
refid = data.find("refid").text
db = get_db()
all_profiles_for_card = db.table("ddr_profile").get(Query().card == refid)
if "ddr_id" not in all_profiles_for_card:
ddr_id = random.randint(10000000, 99999999)
all_profiles_for_card["ddr_id"] = ddr_id
else:
ddr_id = all_profiles_for_card["ddr_id"]
tmp = {"game_version": game_version}
for k in load_settings:
for v in load_settings[k]:
tmp[f"{k}_" + v] = 0
tmp["rival_1_ddr_id"] = 0
tmp["rival_2_ddr_id"] = 0
tmp["rival_3_ddr_id"] = 0
tmp["customize"] = customize_settings
all_profiles_for_card["version"][str(game_version)] = tmp
db.table("ddr_profile").upsert(all_profiles_for_card, where("card") == refid)
response = E.response(
E.playdata_3(
E.result(0, __type="s32"),
E.refid(refid, __type="str"),
E.ddrcode(ddr_id, __type="s32"),
E.istakeover(0, __type="bool"),
)
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post("/{gameinfo}/playdata_3/playerdata_save")
async def playdata_3_playerdata_save(request: Request):
request_info = await core_process_request(request)
game_version = request_info["game_version"]
retrycnt = int(request_info["root"][0].find("retrycnt").text)
data = request_info["root"][0].find("data")
refid = data.find("refid").text
savekind = int(data.find("savekind").text)
profile = get_profile(refid)
game_profile = get_game_profile(refid, game_version)
db = get_db()
if not refid.startswith("X000"):
if savekind in (1, 3):
for k in load_settings:
for v in load_settings[k]:
profile_setting = data.find(k).find(v)
if v == "playcount":
game_profile["common_playcount"] += 1
elif v.startswith("popup_subscribe"):
game_profile["common_" + v] = "0"
elif profile_setting is not None:
game_profile[f"{k}_" + v] = profile_setting.text
if "customize" not in game_profile:
game_profile["customize"] = customize_settings
profile["version"][str(game_version)] = game_profile
get_db().table("ddr_profile").upsert(profile, where("card") == refid)
elif savekind == 2 and retrycnt == 0:
timestamp = time.time()
ddr_id = int(data.find("common").find("ddrcode").text)
pcbid = data.find("pcbid").text
shoparea = data.find("region").text
n = data.find("result")
playstyle = int(n.find("style").text)
mcode = int(n.find("mcode").text)
diffi = int(n.find("difficulty").text)
difficulty = diffi + 4 if playstyle == 1 else diffi
rank = int(n.find("rank").text)
lamp = int(n.find("clearkind").text)
score = int(n.find("score").text)
exscore = int(n.find("exscore").text)
maxcombo = int(n.find("maxcombo").text)
#life = int(n.find("life").text)
fastcount = int(n.find("fastcount").text)
slowcount = int(n.find("slowcount").text)
judge_marvelous = int(n.find("judge_marv").text)
judge_perfect = int(n.find("judge_perf").text)
judge_great = int(n.find("judge_great").text)
judge_good = int(n.find("judge_good").text)
#judge_boo = int(n.find("judge_boo").text)
judge_miss = int(n.find("judge_miss").text)
judge_ok = int(n.find("judge_ok").text)
judge_ng = int(n.find("judge_ng").text)
calorie = int(n.find("calorie").text)
ghostsize = int(n.find("ghostsize").text)
ghost = n.find("ghost").text
flare_force = int(n.find("flare_force").text)
db.table("ddr_scores").insert(
{
"timestamp": timestamp,
"pcbid": pcbid,
"shoparea": shoparea,
"game_version": game_version,
"ddr_id": ddr_id,
"playstyle": playstyle,
"mcode": mcode,
"difficulty": difficulty,
"rank": rank,
"lamp": lamp,
"score": score,
"exscore": exscore,
"maxcombo": maxcombo,
"life": -1,
"fastcount": fastcount,
"slowcount": slowcount,
"judge_marvelous": judge_marvelous,
"judge_perfect": judge_perfect,
"judge_great": judge_great,
"judge_good": judge_good,
"judge_boo": 0,
"judge_miss": judge_miss,
"judge_ok": judge_ok,
"judge_ng": judge_ng,
"calorie": calorie,
"ghostsize": ghostsize,
"ghost": ghost,
"flare_force": flare_force,
},
)
best = db.table("ddr_scores_best").get(
(where("ddr_id") == ddr_id)
& (where("mcode") == mcode)
& (where("difficulty") == difficulty)
)
best = {} if best is None else best
best_score_data = {
"game_version": game_version,
"ddr_id": ddr_id,
"playstyle": playstyle,
"mcode": mcode,
"difficulty": difficulty,
"rank": min(rank, best.get("rank", rank)),
"lamp": max(lamp, best.get("lamp", lamp)),
"score": max(score, best.get("score", score)),
"exscore": max(exscore, best.get("exscore", exscore)),
"flare_force": max(flare_force, best.get("flare_force", flare_force)),
}
ghostid = db.table("ddr_scores").get(
(where("ddr_id") == ddr_id)
& (where("mcode") == mcode)
& (where("difficulty") == difficulty)
& (where("score") == max(score, best.get("score", score)))
)
if ghostid != None:
best_score_data["ghostid"] = ghostid.doc_id
else:
best_score_data["ghostid"] = -1
db.table("ddr_scores_best").upsert(
best_score_data,
(where("ddr_id") == ddr_id)
& (where("mcode") == mcode)
& (where("difficulty") == difficulty),
)
response = E.response(
E.playdata_3(
E.result(0, __type="s32"),
)
)
else:
response = E.response(
E.playdata_3(
E.result(1, __type="s32"),
)
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post("/{gameinfo}/playdata_3/ghostdata_load")
async def playdata_3_ghostdata_load(request: Request):
request_info = await core_process_request(request)
data = request_info["root"][0].find("data")
ghostid = int(data.find("ghostid").text)
record = get_db().table("ddr_scores").get(doc_id=ghostid)
response = E.response(
E.playdata_3(
E.result(0, __type="s32"),
E.ghostsize(record["ghostsize"], __type="s32"),
E.ghost(record["ghost"], __type="str"),
)
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)
@router.post("/{gameinfo}/playdata_3/mergeddata_load")
async def playdata_3_mergeddata_load(request: Request):
request_info = await core_process_request(request)
response = E.response(
E.playdata_3(
E.result(0, __type="s32"),
E.league_class(0, __type="s32"),
E.is_advance_border_exceeded(0, __type="bool"),
E.is_exists_subscribed_user(0, __type="bool"),
)
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)

View File

@ -58,6 +58,8 @@ async def playerdata_usergamedata_advanced(request: Request):
if "ddr_id" not in all_profiles_for_card:
ddr_id = random.randint(10000000, 99999999)
all_profiles_for_card["ddr_id"] = ddr_id
else:
ddr_id = all_profiles_for_card["ddr_id"]
all_profiles_for_card["version"][str(game_version)] = {
"game_version": game_version,
@ -95,13 +97,11 @@ async def playerdata_usergamedata_advanced(request: Request):
single_grade = profile.get("single_grade", 0)
double_grade = profile.get("double_grade", 0)
for record in db.table("ddr_scores_best").search(
(where("game_version") == game_version) & (where("ddr_id") == ddr_id)
):
for record in db.table("ddr_scores_best").search(where("ddr_id") == ddr_id):
mcode = record["mcode"]
difficulty = record["difficulty"]
if mcode not in all_scores:
all_scores[mcode] = [[0, 0, 0, 0, 0] for x in range(10)]
all_scores[mcode] = [[0, 0, 0, 0, 0] for x in range(9)] # not 10, there is no dp beginner
all_scores[mcode][difficulty] = [
1,
record["rank"],
@ -110,6 +110,25 @@ async def playerdata_usergamedata_advanced(request: Request):
record["ghostid"],
]
f = {}
for mcode in all_scores.keys():
if mcode not in f:
f[mcode] = []
for s in [score for score in all_scores.get(mcode)]:
if s[0] == 0:
f[mcode].append(E.note())
else:
f[mcode].append(
E.note(
E.count(s[0], __type="u16"),
E.rank(s[1], __type="u8"),
E.clearkind(s[2], __type="u8"),
E.score(s[3], __type="s32"),
E.ghostid(s[4], __type="s32"),
)
)
response = E.response(
E.playerdata(
E.result(0, __type="s32"),
@ -120,17 +139,10 @@ async def playerdata_usergamedata_advanced(request: Request):
E.music(
E.mcode(int(mcode), __type="u32"),
*[
E.note(
E.count(s[0], __type="u16"),
E.rank(s[1], __type="u8"),
E.clearkind(s[2], __type="u8"),
E.score(s[3], __type="s32"),
E.ghostid(s[4], __type="s32"),
)
for s in [score for score in all_scores.get(mcode)]
s for s in f.get(mcode)
],
)
for mcode in all_scores.keys()
for mcode in f.keys()
],
*[
E.eventdata(
@ -306,7 +318,6 @@ async def playerdata_usergamedata_advanced(request: Request):
best = db.table("ddr_scores_best").get(
(where("ddr_id") == ddr_id)
& (where("game_version") == game_version)
& (where("mcode") == mcode)
& (where("difficulty") == difficulty)
)
@ -326,17 +337,18 @@ async def playerdata_usergamedata_advanced(request: Request):
ghostid = db.table("ddr_scores").get(
(where("ddr_id") == ddr_id)
& (where("game_version") == game_version)
& (where("mcode") == mcode)
& (where("difficulty") == difficulty)
& (where("score") == max(score, best.get("score", score)))
)
best_score_data["ghostid"] = ghostid.doc_id
if ghostid != None:
best_score_data["ghostid"] = ghostid.doc_id
else:
best_score_data["ghostid"] = -1
db.table("ddr_scores_best").upsert(
best_score_data,
(where("ddr_id") == ddr_id)
& (where("game_version") == game_version)
& (where("mcode") == mcode)
& (where("difficulty") == difficulty),
)
@ -386,8 +398,7 @@ async def playerdata_usergamedata_advanced(request: Request):
if loadflag == 1:
all_scores = {}
for record in db.table("ddr_scores").search(
(where("game_version") == game_version)
& (where("pcbid") == pcbid)
(where("pcbid") == pcbid)
& (where("ddr_id") != 0)
):
ddr_id = record["ddr_id"]
@ -414,8 +425,7 @@ async def playerdata_usergamedata_advanced(request: Request):
elif loadflag == 2:
all_scores = {}
for record in db.table("ddr_scores").search(
(where("game_version") == game_version)
& (where("shoparea") == shoparea)
(where("shoparea") == shoparea)
& (where("ddr_id") != 0)
):
ddr_id = record["ddr_id"]
@ -441,9 +451,7 @@ async def playerdata_usergamedata_advanced(request: Request):
elif loadflag == 4:
all_scores = {}
for record in db.table("ddr_scores").search(
(where("game_version") == game_version) & (where("ddr_id") != 0)
):
for record in db.table("ddr_scores").search(where("ddr_id") != 0):
ddr_id = record["ddr_id"]
mcode = record["mcode"]
difficulty = record["difficulty"]

View File

@ -58,6 +58,8 @@ async def playerdata_2_usergamedata_advanced(request: Request):
if "ddr_id" not in all_profiles_for_card:
ddr_id = random.randint(10000000, 99999999)
all_profiles_for_card["ddr_id"] = ddr_id
else:
ddr_id = all_profiles_for_card["ddr_id"]
all_profiles_for_card["version"][str(game_version)] = {
"game_version": game_version,
@ -93,9 +95,7 @@ async def playerdata_2_usergamedata_advanced(request: Request):
ddr_id = all_profiles_for_card["ddr_id"]
profile = get_game_profile(refid, game_version)
for record in db.table("ddr_scores_best").search(
(where("game_version") == game_version) & (where("ddr_id") == ddr_id)
):
for record in db.table("ddr_scores_best").search(where("ddr_id") == ddr_id):
mcode = record["mcode"]
difficulty = record["difficulty"]
if mcode not in all_scores:
@ -376,7 +376,6 @@ async def playerdata_2_usergamedata_advanced(request: Request):
best = db.table("ddr_scores_best").get(
(where("ddr_id") == ddr_id)
& (where("game_version") == game_version)
& (where("mcode") == mcode)
& (where("difficulty") == difficulty)
)
@ -396,17 +395,18 @@ async def playerdata_2_usergamedata_advanced(request: Request):
ghostid = db.table("ddr_scores").get(
(where("ddr_id") == ddr_id)
& (where("game_version") == game_version)
& (where("mcode") == mcode)
& (where("difficulty") == difficulty)
& (where("score") == max(score, best.get("score", score)))
)
best_score_data["ghostid"] = ghostid.doc_id
if ghostid != None:
best_score_data["ghostid"] = ghostid.doc_id
else:
best_score_data["ghostid"] = -1
db.table("ddr_scores_best").upsert(
best_score_data,
(where("ddr_id") == ddr_id)
& (where("game_version") == game_version)
& (where("mcode") == mcode)
& (where("difficulty") == difficulty),
)
@ -468,8 +468,7 @@ async def playerdata_2_usergamedata_advanced(request: Request):
if loadflag == 1:
all_scores = {}
for record in db.table("ddr_scores").search(
(where("game_version") == game_version)
& (where("pcbid") == pcbid)
(where("pcbid") == pcbid)
& (where("ddr_id") != 0)
):
ddr_id = record["ddr_id"]
@ -496,8 +495,7 @@ async def playerdata_2_usergamedata_advanced(request: Request):
elif loadflag == 2:
all_scores = {}
for record in db.table("ddr_scores").search(
(where("game_version") == game_version)
& (where("shoparea") == shoparea)
(where("shoparea") == shoparea)
& (where("ddr_id") != 0)
):
ddr_id = record["ddr_id"]
@ -523,9 +521,7 @@ async def playerdata_2_usergamedata_advanced(request: Request):
elif loadflag == 4:
all_scores = {}
for record in db.table("ddr_scores").search(
(where("game_version") == game_version) & (where("ddr_id") != 0)
):
for record in db.table("ddr_scores").search(where("ddr_id") != 0):
ddr_id = record["ddr_id"]
mcode = record["mcode"]
difficulty = record["difficulty"]

24
modules/ddr/system_3.py Normal file
View File

@ -0,0 +1,24 @@
from fastapi import APIRouter, Request, Response
from core_common import core_process_request, core_prepare_response, E
import utils.card as conv
router = APIRouter(prefix="/local2", tags=["local2"])
router.model_whitelist = ["MDX"]
@router.post("/{gameinfo}/system_3/convcardnumber")
async def system_3_convcardnumber(request: Request):
request_info = await core_process_request(request)
cid = request_info["root"][0].find("data/card_id").text
response = E.response(
E.system_3(
E.data(E.card_number(conv.to_konami_id(cid), __type="str")),
E.result(0, __type="s32"),
)
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)

View File

@ -0,0 +1,23 @@
import config
from fastapi import APIRouter, Request, Response
from core_common import core_process_request, core_prepare_response, E
router = APIRouter(prefix="/local2", tags=["local2"])
router.model_whitelist = ["MDX"]
@router.post("/{gameinfo}/wordcheck_3/tabooword_check")
async def wordcheck_3_tabooword_check(request: Request):
request_info = await core_process_request(request)
response = E.response(
E.wordcheck_3(
E.result(0, __type="s32"),
E.is_taboo(0, __type="bool"),
)
)
response_body, response_headers = await core_prepare_response(request, response)
return Response(content=response_body, headers=response_headers)

View File

@ -191,7 +191,8 @@ async def iidx33gamesystem_systeminfo(request: Request):
E.WorldTourismOpenList(val=-1),
E.OldBPLBattleOpenPhase(val=1),
E.BPLBattleOpenPhase(val=3),
E.beat(val=0),
E.VocaloidEvent(val=1),
# E.beat(val=5293),
)
)

View File

@ -29,9 +29,9 @@ Instructions:
### [import_ddr_spice_automap.py](import_ddr_spice_automap.py)
Example: `python utils\db\import_ddr_spice_automap.py --automap_xml automap_0.xml --version 19 --monkey_db db.json --ddr_id 12345678`
Example: `python utils\db\import_ddr_spice_automap.py --automap_xml automap_0.xml --version 3 --monkey_db db.json --ddr_id 12345678`
- `--version` 19 for A20P or 20 for A3
- `--version` 1 for A20P, 2 for A3, 3 for WORLD (automap source version, not destination)
- `--ddr_id` destination profile in db.json

View File

@ -24,43 +24,72 @@ def main(automap_xml, version, monkey_db, ddr_id):
if profile == None:
raise SystemExit(f"ERROR: DDR profile {ddr_id} not in {monkey_db}")
game_version = 19
if profile["version"].get(str(game_version), None) == None:
raise SystemExit(
f"ERROR: DDR profile {ddr_id} version {game_version} not in {monkey_db}"
)
scores = []
with open(automap_xml, "rb") as fp:
automap_0 = fp.read().split(b"\n\n")
if version == 19:
playerdata = "playerdata"
else:
if version == 3:
playerdata = "playdata_3"
game_version = 20
elif version == 2:
playerdata = "playerdata_2"
game_version = 19
elif version == 1:
playerdata = "playerdata"
game_version = 19
scores = []
scores_xml = False
for xml in automap_0:
tree = ET.ElementTree(ET.fromstring(xml.decode(encoding="shift-jis")))
root = tree.getroot()
if scores_xml:
for music in root.findall(f"{playerdata}/music"):
mcode = int(music.find("mcode").text)
for difficulty, chart in enumerate(music.findall("note")):
if int(chart.find("count").text) > 0:
rank = int(chart.find("rank").text)
clearkind = int(chart.find("clearkind").text)
score = int(chart.find("score").text)
try:
tree = ET.ElementTree(ET.fromstring(xml.decode(encoding="shift-jis")))
root = tree.getroot()
except:
continue
if version in (1, 2):
if scores_xml:
for music in root.findall(f"{playerdata}/music"):
mcode = int(music.find("mcode").text)
for difficulty, chart in enumerate(music.findall("note")):
c = chart.find("count")
if c == None:
continue
if int(c.text) > 0:
rank = int(chart.find("rank").text)
clearkind = int(chart.find("clearkind").text)
score = int(chart.find("score").text)
scores.append([mcode, difficulty, rank, clearkind, score, -1])
break
else:
try:
if root.find(f"{playerdata}/data/mode").text == "userload":
if len(root.find(f"{playerdata}/data/refid").text) == 16:
scores_xml = True
except AttributeError:
continue
elif version == 3:
if scores_xml:
for music in root.findall(f"{playerdata}/score"):
mcode = int(music.find("mcode").text)
for x in music.findall("score_single") + music.findall("score_double"):
s = x.find("score_str").text.split(",")
s = [int(val) for val in s]
difficulty = s[0] + 4 if x.tag == "score_double" else s[0]
rank = s[2]
clearkind = s[3]
score = s[4]
# flare = s[6]
scores.append([mcode, difficulty, rank, clearkind, score])
break
else:
try:
if root.find(f"{playerdata}/data/mode").text == "userload":
if len(root.find(f"{playerdata}/data/refid").text) == 16:
scores_xml = True
except AttributeError:
continue
break
else:
try:
a = root.find(f"{playerdata}")
if "method" in a.attrib:
if a.attrib["method"] == "playerdata_load":
if len(root.find(f"{playerdata}/data/refid").text) == 16:
scores_xml = True
except AttributeError:
continue
total_count = len(scores)
@ -73,6 +102,7 @@ def main(automap_xml, version, monkey_db, ddr_id):
rank = s[2]
lamp = s[3]
score = s[4]
# flare = s[5]
exscore = 0
print(
@ -81,7 +111,6 @@ def main(automap_xml, version, monkey_db, ddr_id):
best = db.table("ddr_scores_best").get(
(where("ddr_id") == ddr_id)
& (where("game_version") == game_version)
& (where("mcode") == mcode)
& (where("difficulty") == difficulty)
)
@ -97,14 +126,14 @@ def main(automap_xml, version, monkey_db, ddr_id):
"lamp": max(lamp, best.get("lamp", lamp)),
"score": max(score, best.get("score", score)),
"exscore": max(exscore, best.get("exscore", exscore)),
# "flare_force": max(flare, best.get("flare_force", flare)),
}
ghostid = db.table("ddr_scores").get(
(where("ddr_id") == ddr_id)
& (where("game_version") == game_version)
& (where("mcode") == mcode)
& (where("difficulty") == difficulty)
& (where("exscore") == best.get("exscore", exscore))
& (where("score") == max(score, best.get("score", score)))
)
if ghostid:
best_score_data["ghostid"] = ghostid.doc_id
@ -114,7 +143,6 @@ def main(automap_xml, version, monkey_db, ddr_id):
db.table("ddr_scores_best").upsert(
best_score_data,
(where("ddr_id") == ddr_id)
& (where("game_version") == game_version)
& (where("mcode") == mcode)
& (where("difficulty") == difficulty),
)
@ -129,8 +157,8 @@ if __name__ == "__main__":
parser.add_argument("--automap_xml", help="Input xml file", required=True)
parser.add_argument(
"--version",
help="19 is A20P, 20 is A3",
default=19,
help="1=A20P, 2=A3, 3=WORLD (automap_xml source version, not destination)",
default=3,
type=int,
)
parser.add_argument("--monkey_db", help="Output json file", required=True)