Clean up code

This commit is contained in:
clovervidia 2022-10-06 18:26:14 -04:00
parent 0cd65d41e9
commit 96dc1b406c
No known key found for this signature in database
GPG Key ID: E8DBF85CF7D599FE
3 changed files with 56 additions and 58 deletions

29
iksm.py
View File

@ -4,7 +4,6 @@
import requests, json, re, sys
import os, base64, hashlib
import uuid, time, random, string
from bs4 import BeautifulSoup
session = requests.Session()
@ -32,7 +31,7 @@ def get_nsoapp_version():
page = requests.get("https://apps.apple.com/us/app/nintendo-switch-online/id1234806557")
soup = BeautifulSoup(page.text, 'html.parser')
elt = soup.find("p", {"class": "whats-new__latest__version"})
ver = elt.get_text().replace("Version ","").strip()
ver = elt.get_text().replace("Version ", "").strip()
return ver
except:
return NSOAPP_VERSION
@ -106,7 +105,7 @@ def get_session_token(session_token_code, auth_code_verifier):
nsoapp_version = get_nsoapp_version()
app_head = {
'User-Agent': 'OnlineLounge/' + nsoapp_version + ' NASDKAPI Android',
'User-Agent': f'OnlineLounge/{nsoapp_version} NASDKAPI Android',
'Accept-Language': 'en-US',
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded',
@ -184,15 +183,15 @@ def get_gtoken(f_gen_url, session_token, ver):
# get access token
body = {}
try:
idToken = id_response["id_token"]
f, uuid, timestamp = call_imink_api(idToken, 1, f_gen_url)
id_token = id_response["id_token"]
f, uuid, timestamp = call_imink_api(id_token, 1, f_gen_url)
parameter = {
'f': f,
'language': user_lang,
'naBirthday': user_info["birthday"],
'naCountry': user_country,
'naIdToken': idToken,
'naIdToken': id_token,
'requestId': uuid,
'timestamp': timestamp
}
@ -212,7 +211,7 @@ def get_gtoken(f_gen_url, session_token, ver):
'Content-Length': str(990 + len(f)),
'Connection': 'Keep-Alive',
'Accept-Encoding': 'gzip',
'User-Agent': 'com.nintendo.znca/' + nsoapp_version + '(Android/7.1.2)',
'User-Agent': f'com.nintendo.znca/{nsoapp_version}(Android/7.1.2)',
}
url = "https://api-lp1.znc.srv.nintendo.net/v3/Account/Login"
@ -220,11 +219,11 @@ def get_gtoken(f_gen_url, session_token, ver):
splatoon_token = json.loads(r.text)
try:
idToken = splatoon_token["result"]["webApiServerCredential"]["accessToken"]
id_token = splatoon_token["result"]["webApiServerCredential"]["accessToken"]
except:
# retry once if 9403/9599 error from nintendo
try:
f, uuid, timestamp = call_imink_api(idToken, 1, f_gen_url)
f, uuid, timestamp = call_imink_api(id_token, 1, f_gen_url)
body["parameter"]["f"] = f
body["parameter"]["requestId"] = uuid
body["parameter"]["timestamp"] = timestamp
@ -232,31 +231,31 @@ def get_gtoken(f_gen_url, session_token, ver):
url = "https://api-lp1.znc.srv.nintendo.net/v3/Account/Login"
r = requests.post(url, headers=app_head, json=body)
splatoon_token = json.loads(r.text)
idToken = splatoon_token["result"]["webApiServerCredential"]["accessToken"]
id_token = splatoon_token["result"]["webApiServerCredential"]["accessToken"]
except:
print("Error from Nintendo (in Account/Login step):")
print(json.dumps(splatoon_token, indent=2))
print("Re-running the script usually fixes this.")
sys.exit(1)
f, uuid, timestamp = call_imink_api(idToken, 2, f_gen_url)
f, uuid, timestamp = call_imink_api(id_token, 2, f_gen_url)
# get web service token
app_head = {
'X-Platform': 'Android',
'X-ProductVersion': nsoapp_version,
'Authorization': f'Bearer {idToken}',
'Authorization': f'Bearer {id_token}',
'Content-Type': 'application/json; charset=utf-8',
'Content-Length': '391',
'Accept-Encoding': 'gzip',
'User-Agent': 'com.nintendo.znca/' + nsoapp_version + '(Android/7.1.2)'
'User-Agent': f'com.nintendo.znca/{nsoapp_version}(Android/7.1.2)'
}
body = {}
parameter = {
'f': f,
'id': 4834290508791808,
'registrationToken': idToken,
'registrationToken': id_token,
'requestId': uuid,
'timestamp': timestamp
}
@ -271,7 +270,7 @@ def get_gtoken(f_gen_url, session_token, ver):
except:
# retry once if 9403/9599 error from nintendo
try:
f, uuid, timestamp = call_imink_api(idToken, 2, f_gen_url)
f, uuid, timestamp = call_imink_api(id_token, 2, f_gen_url)
body["parameter"]["f"] = f
body["parameter"]["requestId"] = uuid
body["parameter"]["timestamp"] = timestamp

82
s3s.py
View File

@ -6,10 +6,7 @@
import argparse, datetime, json, os, shutil, re, requests, sys, time, uuid
import msgpack
from packaging import version
from PIL import Image
from io import BytesIO
import iksm, utils#, utils_ss
import iksm, utils
A_VERSION = "0.1.5"
@ -98,7 +95,7 @@ def headbutt():
'Origin': 'https://api.lp1.av5ja.srv.nintendo.net',
'X-Requested-With': 'com.nintendo.znca',
'Referer': f'https://api.lp1.av5ja.srv.nintendo.net/?lang={USER_LANG}&na_country={USER_COUNTRY}&na_lang={USER_LANG}',
'Accept-Encoding': 'gzip, deflate'
'Accept-Encoding': 'gzip, deflate'
}
return graphql_head
@ -138,7 +135,7 @@ def gen_new_tokens(reason, force=False):
if SESSION_TOKEN == "":
print("Please log in to your Nintendo Account to obtain your session_token.")
new_token = iksm.log_in(A_VERSION)
if new_token == None:
if new_token is None:
print("There was a problem logging you in. Please try again later.")
elif new_token == "skip":
manual_entry = True
@ -206,7 +203,7 @@ def fetch_json(which, separate=False, exportall=False, specific=False, numbers_o
queries.append("LatestBattleHistoriesQuery")
else:
queries.append(None)
if which == "both" or which == "salmon":
if which in ("both", "salmon"):
queries.append("CoopHistoryQuery")
else:
queries.append(None)
@ -214,7 +211,7 @@ def fetch_json(which, separate=False, exportall=False, specific=False, numbers_o
needs_sorted = False # https://ygdp.yale.edu/phenomena/needs-washed :D
for sha in queries:
if sha != None:
if sha is not None:
if DEBUG:
print(f"* making query1 to {sha}")
sha = utils.translate_rid[sha]
@ -279,12 +276,12 @@ def fetch_json(which, separate=False, exportall=False, specific=False, numbers_o
if needs_sorted: # put regular, bankara, and private in order, since they were exported in sequential chunks
try:
ink_list = [x for x in ink_list if x['data']['vsHistoryDetail'] != None] # just in case
ink_list = [x for x in ink_list if x['data']['vsHistoryDetail'] is not None] # just in case
ink_list = sorted(ink_list, key=lambda d: d['data']['vsHistoryDetail']['playedTime'])
except:
print("(!) Exporting without sorting results.json")
try:
salmon_list = [x for x in salmon_list if x['data']['coopHistoryDetail'] != None]
salmon_list = [x for x in salmon_list if x['data']['coopHistoryDetail'] is not None]
salmon_list = sorted(salmon_list, key=lambda d: d['data']['coopHistoryDetail']['playedTime'])
except:
print("(!) Exporting without sorting coop_results.json")
@ -341,7 +338,7 @@ def set_scoreboard(battle):
# https://github.com/fetus-hina/stat.ink/wiki/Spl3-API:-Post-v3-battle#player-structure
our_team_players, their_team_players = [], []
for i, player in list(enumerate(battle["myTeam"]["players"])):
for i, player in enumerate(battle["myTeam"]["players"]):
p_dict = {}
p_dict["me"] = "yes" if player["isMyself"] else "no"
p_dict["name"] = player["name"]
@ -353,7 +350,7 @@ def set_scoreboard(battle):
p_dict["weapon"] = utils.b64d(player["weapon"]["id"])
p_dict["inked"] = player["paint"]
p_dict["rank_in_team"] = i+1
if "result" in player and player["result"] != None:
if "result" in player and player["result"] is not None:
p_dict["kill_or_assist"] = player["result"]["kill"]
p_dict["assist"] = player["result"]["assist"]
p_dict["kill"] = p_dict["kill_or_assist"] - p_dict["assist"]
@ -364,7 +361,7 @@ def set_scoreboard(battle):
p_dict["disconnected"] = "yes"
our_team_players.append(p_dict)
for i, player in list(enumerate(battle["otherTeams"][0]["players"])): # no support for tricolor TW yet
for i, player in enumerate(battle["otherTeams"][0]["players"]): # no support for tricolor TW yet
p_dict = {}
p_dict["me"] = "no"
p_dict["name"] = player["name"]
@ -376,7 +373,7 @@ def set_scoreboard(battle):
p_dict["weapon"] = utils.b64d(player["weapon"]["id"])
p_dict["inked"] = player["paint"]
p_dict["rank_in_team"] = i+1
if "result" in player and player["result"] != None:
if "result" in player and player["result"] is not None:
p_dict["kill_or_assist"] = player["result"]["kill"]
p_dict["assist"] = player["result"]["assist"]
p_dict["kill"] = p_dict["kill_or_assist"] - p_dict["assist"]
@ -469,14 +466,14 @@ def prepare_battle_result(battle, ismonitoring, overview_data=None):
## WEAPON, K/D/A/S, TURF INKED ##
#################################
for i, player in list(enumerate(battle["myTeam"]["players"])): # specified again in set_scoreboard()
for i, player in enumerate(battle["myTeam"]["players"]): # specified again in set_scoreboard()
if player["isMyself"] == True:
payload["weapon"] = utils.b64d(player["weapon"]["id"])
payload["inked"] = player["paint"]
payload["species"] = player["species"].lower() # not supported for now
payload["rank_in_team"] = i+1
# ... = player["result"]["festDragonCert"] NONE, DRAGON, or DOUBLE_DRAGON - splatfest
if player["result"] != None: # null if player disconnect
if player["result"] is not None: # null if player disconnect
payload["kill_or_assist"] = player["result"]["kill"]
payload["assist"] = player["result"]["assist"]
payload["kill"] = payload["kill_or_assist"] - payload["assist"]
@ -490,7 +487,7 @@ def prepare_battle_result(battle, ismonitoring, overview_data=None):
result = battle["judgement"]
if result == "WIN":
payload["result"] = "win"
elif result == "LOSE" or result == "DEEMED_LOSE":
elif result in ("LOSE", "DEEMED_LOSE"):
payload["result"] = "lose"
elif result == "EXEMPTED_LOSE":
payload["result"] = "exempted_lose" # doesn't count toward stats
@ -541,11 +538,11 @@ def prepare_battle_result(battle, ismonitoring, overview_data=None):
except TypeError: # draw - 'result' is null
pass
payload["knockout"] = "no" if battle["knockout"] == None or battle["knockout"] == "NEITHER" else "yes"
payload["knockout"] = "no" if battle["knockout"] is None or battle["knockout"] == "NEITHER" else "yes"
payload["rank_exp_change"] = battle["bankaraMatch"]["earnedUdemaePoint"]
if overview_data or ismonitoring: # if we're passing in the overview.json file with -i, or monitoring mode
if overview_data == None:
if overview_data is None:
overview_post = requests.post(utils.GRAPHQL_URL,
data=utils.gen_graphql_body(utils.translate_rid["BankaraBattleHistoriesQuery"]),
headers=headbutt(),
@ -559,7 +556,7 @@ def prepare_battle_result(battle, ismonitoring, overview_data=None):
ranked_list = screen["data"]["latestBattleHistories"]["historyGroups"]["nodes"]
break
for parent in ranked_list: # groups in overview (ranked) JSON/screen
for idx, child in list(enumerate(parent["historyDetails"]["nodes"])):
for idx, child in enumerate(parent["historyDetails"]["nodes"]):
if child["id"] == battle["id"]: # found the battle ID in the other file
@ -569,7 +566,7 @@ def prepare_battle_result(battle, ismonitoring, overview_data=None):
payload["rank_before_s_plus"] = int(full_rank[1])
# anarchy battle (series) - not open
if "bankaraMatchChallenge" in parent and parent["bankaraMatchChallenge"] != None:
if "bankaraMatchChallenge" in parent and parent["bankaraMatchChallenge"] is not None:
# rankedup = parent["bankaraMatchChallenge"]["isUdemaeUp"]
ranks = ["c-", "c", "c+", "b-", "b", "b+", "a-", "a", "a+", "s"] # s+ handled separately
@ -580,7 +577,7 @@ def prepare_battle_result(battle, ismonitoring, overview_data=None):
else:
payload["rank_up_battle"] = "no"
if parent["bankaraMatchChallenge"]["udemaeAfter"] == None:
if parent["bankaraMatchChallenge"]["udemaeAfter"] is None:
payload["rank_after"] = payload["rank_before"]
else:
if idx != 0:
@ -597,7 +594,7 @@ def prepare_battle_result(battle, ismonitoring, overview_data=None):
payload["challenge_lose"] = parent["bankaraMatchChallenge"]["loseCount"]
# send exp change (gain)
if payload["rank_exp_change"] == None:
if payload["rank_exp_change"] is None:
payload["rank_exp_change"] = parent["bankaraMatchChallenge"]["earnedUdemaePoint"]
if DEBUG:
@ -652,11 +649,11 @@ def post_result(data, ismonitoring, isblackout, istestrun, overview_data=None):
if isinstance(data, list): # -o export format
try:
data = [x for x in data if x['data']['vsHistoryDetail'] != None] # avoid {'data': {'vsHistoryDetail': None}} error
data = [x for x in data if x['data']['vsHistoryDetail'] is not None] # avoid {'data': {'vsHistoryDetail': None}} error
results = sorted(data, key=lambda d: d['data']['vsHistoryDetail']['playedTime'])
except KeyError:
try:
data = [x for x in data if x['coopHistoryDetail'] != None]
data = [x for x in data if x['coopHistoryDetail'] is not None]
results = sorted(data, key=lambda d: d['data']['coopHistoryDetail']['playedTime'])
except KeyError: # unsorted - shouldn't happen
print("(!) Uploading without chronologically sorting results")
@ -727,7 +724,7 @@ def post_result(data, ismonitoring, isblackout, istestrun, overview_data=None):
print("Error uploading battle. Message from server:")
print(postbattle.content.decode('utf-8'))
elif time_uploaded <= time_now - 5: # give some leeway
print(f"Battle already uploaded - {headerloc}")
print(f"Battle already uploaded - {headerloc}")
else: # 200 OK
print(f"Battle uploaded to {headerloc}")
@ -810,7 +807,7 @@ def get_num_results(which):
def fetch_and_upload_single_result(hash, noun, ismonitoring, isblackout, istestrun):
'''Perform a GraphQL request for a single vsResultId/coopResultId and call post_result().'''
if noun == "battles" or noun == "battle":
if noun in ("battles", "battle"):
dict_key = "VsHistoryDetailQuery"
dict_key2 = "vsResultId"
else: # noun == "jobs" or "job"
@ -833,7 +830,7 @@ def check_if_missing(which, ismonitoring, isblackout, istestrun):
urls = []
# https://github.com/fetus-hina/stat.ink/wiki/Spl3-API:-Get-UUID-List-(for-s3s)
if which == "both" or which == "ink":
if which in ("both", "ink"):
urls.append("https://stat.ink/api/v3/s3s/uuid-list") # max 200 entries
else:
urls.append(None)
@ -845,7 +842,7 @@ def check_if_missing(which, ismonitoring, isblackout, istestrun):
noun = "battles" # first (and maybe only)
which = "ink"
for url in urls:
if url != None:
if url is not None:
printed = False
auth = {'Authorization': f'Bearer {API_KEY}'}
resp = requests.get(url, headers=auth) # no params = all: regular, bankara, private
@ -916,8 +913,8 @@ def monitor_battles(which, secs, isblackout, istestrun):
# ! fetch from online
ink_results, salmon_results = fetch_json(which, separate=True, numbers_only=True) # only numbers or it'd take a long time
if which == "both" or which == "ink":
for i, num in reversed(list(enumerate(ink_results))):
if which in ("both", "ink"):
for num in reversed(ink_results):
if num not in cached_battles:
# get the full battle data
result_post = requests.post(utils.GRAPHQL_URL,
@ -966,8 +963,8 @@ def monitor_battles(which, secs, isblackout, istestrun):
cached_battles.append(num)
post_result(result, True, isblackout, istestrun) # True = is monitoring mode
if which == "both" or which == "salmon":
for i, num in reversed(list(enumerate(salmon_results))):
if which in ("both", "salmon"):
for num in reversed(salmon_results):
if num not in cached_jobs:
# get the full job data
result_post = requests.post(utils.GRAPHQL_URL,
@ -1003,6 +1000,7 @@ def monitor_battles(which, secs, isblackout, istestrun):
print("Please run s3s again with " + '\033[91m' + "-r" + '\033[0m' + " to get these battles.")
print("Bye!")
class SquidProgress:
'''Display animation while waiting.'''
@ -1079,18 +1077,18 @@ def main():
sys.exit(1)
secs = -1
if n_value != None:
if n_value is not None:
try:
secs = int(parser_result.N)
except ValueError:
print("Number provided must be an integer. Exiting.")
sys.exit(1)
if secs < 0:
print("No.")
sys.exit(1)
print("No.")
sys.exit(1)
elif secs < 60:
print("Minimum number of seconds in monitoring mode is 60. Exiting.")
sys.exit(1)
print("Minimum number of seconds in monitoring mode is 60. Exiting.")
sys.exit(1)
# export results to file: -o
############################
@ -1106,17 +1104,17 @@ def main():
os.makedirs(export_dir)
print()
if parents != None:
if parents is not None:
with open(os.path.join(cwd, export_dir, "overview.json"), "x") as fout:
json.dump(parents, fout)
print("Created overview.json with general info about your battle and job stats.")
if results != None:
if results is not None:
with open(os.path.join(cwd, export_dir, "results.json"), "x") as fout:
json.dump(results, fout)
print("Created results.json with detailed recent battle stats (up to 50 of each type).")
if coop_results != None:
if coop_results is not None:
with open(os.path.join(cwd, export_dir, "coop_results.json"), "x") as fout:
json.dump(coop_results, fout)
print("Created coop_results.json with detailed recent Salmon Run job stats (up to 50).")
@ -1160,7 +1158,7 @@ def main():
to_upload = []
for battle in data:
if battle["data"]["vsHistoryDetail"] != None:
if battle["data"]["vsHistoryDetail"] is not None:
full_id = utils.b64d(battle["data"]["vsHistoryDetail"]["id"])
old_uuid = full_id[-36:] # not unique because nintendo hates us
new_uuid = str(uuid.uuid5(utils.S3S_NAMESPACE, full_id[-52:]))

View File

@ -1,6 +1,7 @@
# (ↄ) 2017-2022 eli fessler (frozenpandaman), clovervidia
# https://github.com/frozenpandaman/s3s
# License: GPLv3
import base64, datetime, json, re, requests, uuid
from bs4 import BeautifulSoup
@ -89,7 +90,7 @@ def gen_graphql_body(sha256hash, varname=None, varvalue=None):
"variables": {}
}
if varname != None and varvalue != None:
if varname is not None and varvalue is not None:
great_passage["variables"][varname] = varvalue
return json.dumps(great_passage)