Merge pull request #270 from sepalani/nas

NAS - Refactoring
This commit is contained in:
polaris- 2016-04-06 21:48:56 -04:00
commit cf12519350
4 changed files with 551 additions and 462 deletions

View File

@ -48,6 +48,7 @@
<Compile Include="gamespy\gs_query.py" />
<Compile Include="gamespy\gs_utility.py" />
<Compile Include="gamespy\__init__.py" />
<Compile Include="other\dlc.py" />
<Compile Include="other\utils.py" />
<Compile Include="other\__init__.py" />
<Compile Include="tools\import_wiimm_data.py" />

View File

@ -20,75 +20,281 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import base64
import logging
import time
import urlparse
import BaseHTTPServer
import SocketServer
import os
import random
import traceback
import gamespy.gs_database as gs_database
import other.utils as utils
from gamespy import gs_database
from other import dlc, utils
import dwc_config
logger = dwc_config.get_logger('NasServer')
# If a game from this list requests a file listing, the server will return
# that only one exists and return a random one.
# This is used for Mystery Gift distribution on Generation 4 Pokemon games
gamecodes_return_random_file = [
'ADAD',
'ADAE',
'ADAF',
'ADAI',
'ADAJ',
'ADAK',
'ADAS',
'CPUD',
'CPUE',
'CPUF',
'CPUI',
'CPUJ',
'CPUK',
'CPUS',
'IPGD',
'IPGE',
'IPGF',
'IPGI',
'IPGJ',
'IPGK',
'IPGS'
]
address = dwc_config.get_ip_port('NasServer')
def handle_post(handler, addr, post):
"""Handle unknown path."""
logger.log(logging.WARNING, "Unknown path request %s from %s:%d!",
handler.path, *addr)
handler.send_response(404)
return None
class NasServer(object):
def start(self):
httpd = NasHTTPServer(address, NasHTTPServerHandler)
logger.log(logging.INFO, "Now listening for connections on %s:%d...",
*address)
httpd.serve_forever()
def handle_ac_action(handler, db, addr, post):
"""Handle unknown ac action request."""
logger.log(logging.WARNING, "Unknown ac action: %s", handler.path)
return {}
class NasHTTPServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
def __init__(self, server_address, RequestHandlerClass):
# self.db = gs_database.GamespyDatabase()
BaseHTTPServer.HTTPServer.__init__(self, server_address,
RequestHandlerClass)
def handle_ac_acctcreate(handler, db, addr, post):
"""Handle ac acctcreate request.
TODO: Test for duplicate accounts.
"""
if db.is_banned(post):
ret = {
"retry": "1",
"returncd": "3913",
"locator": "gamespy.com",
"reason": "User banned."
}
logger.log(logging.DEBUG, "Acctcreate denied for banned user %s",
str(post))
else:
ret = {
"retry": "0",
"returncd": "002",
"userid": db.get_next_available_userid()
}
logger.log(logging.DEBUG, "Acctcreate response to %s:%d", *addr)
logger.log(logging.DEBUG, "%s", ret)
return ret
def handle_ac_login(handler, db, addr, post):
"""Handle ac login request."""
if db.is_banned(post):
ret = {
"retry": "1",
"returncd": "3914",
"locator": "gamespy.com",
"reason": "User banned."
}
logger.log(logging.DEBUG, "Login denied for banned user %s", str(post))
# Un-comment these lines to enable console registration feature
# elif not db.pending(post):
# logger.log(logging.DEBUG, "Login denied - Unknown console %s", post)
# ret = {
# "retry": "1",
# "returncd": "3921",
# "locator": "gamespy.com",
# }
# elif not db.registered(post):
# logger.log(logging.DEBUG, "Login denied - console pending %s", post)
# ret = {
# "retry": "1",
# "returncd": "3888",
# "locator": "gamespy.com",
# }
else:
challenge = utils.generate_random_str(8)
post["challenge"] = challenge
authtoken = db.generate_authtoken(post["userid"], post)
ret = {
"retry": "0",
"returncd": "001",
"locator": "gamespy.com",
"challenge": challenge,
"token": authtoken,
}
logger.log(logging.DEBUG, "Login response to %s:%d", *addr)
logger.log(logging.DEBUG, "%s", ret)
return ret
def handle_ac_svcloc(handler, db, addr, post):
"""Handle ac svcloc request."""
# Get service based on service id number
ret = {
"retry": "0",
"returncd": "007",
"statusdata": "Y"
}
authtoken = db.generate_authtoken(post["userid"], post)
if 'svc' in post:
if post["svc"] in ("9000", "9001"):
# DLC host = 9000
# In case the client's DNS isn't redirecting to
# dls1.nintendowifi.net
ret["svchost"] = handler.headers['host']
# Brawl has 2 host headers which Apache chokes
# on, so only return the first one or else it
# won't work
ret["svchost"] = ret["svchost"].split(',')[0]
if post["svc"] == 9000:
ret["token"] = authtoken
else:
ret["servicetoken"] = authtoken
elif post["svc"] == "0000":
# Pokemon requests this for some things
ret["servicetoken"] = authtoken
ret["svchost"] = "n/a"
else:
# Empty svc - Fix Error Code 24101 (Boom Street)
ret["svchost"] = "n/a"
ret["servicetoken"] = authtoken
logger.log(logging.DEBUG, "Svcloc response to %s:%d", *addr)
logger.log(logging.DEBUG, "%s", ret)
return ret
def handle_ac(handler, addr, post):
"""Handle ac POST request."""
logger.log(logging.DEBUG, "Ac request to %s from %s:%d",
handler.path, *addr)
logger.log(logging.DEBUG, "%s", post)
action = str(post["action"]).lower()
command = handler.ac_actions.get(action, handle_ac_action)
ret = command(handler, gs_database.GamespyDatabase(), addr, post)
ret.update({"datetime": time.strftime("%Y%m%d%H%M%S")})
handler.send_response(200)
handler.send_header("Content-type", "text/plain")
handler.send_header("NODE", "wifiappe1")
return utils.dict_to_qs(ret)
def handle_pr(handler, addr, post):
"""Handle pr POST request."""
logger.log(logging.DEBUG, "Pr request to %s from %s:%d",
handler.path, *addr)
logger.log(logging.DEBUG, "%s", post)
words = len(post["words"].split('\t'))
wordsret = "0" * words
ret = {
"prwords": wordsret,
"returncd": "000",
"datetime": time.strftime("%Y%m%d%H%M%S")
}
for l in "ACEJKP":
ret["prwords" + l] = wordsret
handler.send_response(200)
handler.send_header("Content-type", "text/plain")
handler.send_header("NODE", "wifiappe1")
logger.log(logging.DEBUG, "Pr response to %s:%d", *addr)
logger.log(logging.DEBUG, "%s", ret)
return utils.dict_to_qs(ret)
def handle_download_action(handler, dlc_path, post):
"""Handle unknown download action request."""
logger.log(logging.WARNING, "Unknown download action: %s", handler.path)
handler.send_response(200)
return None
def handle_download_count(handler, dlc_path, post):
"""Handle download count request."""
ret = dlc.download_count(dlc_path, post)
handler.send_response(200)
handler.send_header("Content-type", "text/plain")
handler.send_header("X-DLS-Host", "http://127.0.0.1/")
return ret
def handle_download_list(handler, dlc_path, post):
"""Handle download list request."""
ret = dlc.download_list(dlc_path, post)
handler.send_response(200)
handler.send_header("Content-type", "text/plain")
handler.send_header("X-DLS-Host", "http://127.0.0.1/")
return ret
def handle_download_contents(handler, dlc_path, post):
"""Handle download contents request."""
ret = dlc.download_contents(dlc_path, post)
if ret is None:
handler.send_response(404)
else:
handler.send_response(200)
handler.send_header("Content-type", "application/x-dsdl")
handler.send_header("Content-Disposition",
'attachment; filename="%s"' % post["contents"])
handler.send_header("X-DLS-Host", "http://127.0.0.1/")
return ret
def handle_download(handler, addr, post):
"""Handle download POST request."""
logger.log(logging.DEBUG, "Download request to %s from %s:%d",
handler.path, *addr)
logger.log(logging.DEBUG, "%s", post)
action = str(post["action"]).lower()
dlc_dir = os.path.abspath("dlc")
dlc_path = os.path.abspath(os.path.join("dlc", post["gamecd"]))
if os.path.commonprefix([dlc_dir, dlc_path]) != dlc_dir:
logging.log(logging.WARNING,
'Attempted directory traversal attack "%s",'
' cancelling.', dlc_path)
handler.send_response(403)
return
command = handler.download_actions.get(action, handle_download_action)
ret = command(handler, dlc_path, post)
logger.log(logging.DEBUG, "Download response to %s:%d", *addr)
return ret
class NasHTTPServerHandler(BaseHTTPServer.BaseHTTPRequestHandler):
"""Nintendo NAS server handler."""
post_paths = {
"/ac": handle_ac,
"/pr": handle_pr,
"/download": handle_download
}
ac_actions = {
"acctcreate": handle_ac_acctcreate,
"login": handle_ac_login,
"svcloc": handle_ac_svcloc,
}
download_actions = {
"count": handle_download_count,
"list": handle_download_list,
"contents": handle_download_contents,
}
def version_string(self):
return "Nintendo Wii (http)"
def do_GET(self):
self.server = lambda: None
self.server.db = gs_database.GamespyDatabase()
"""Handle GET request."""
try:
# conntest server
self.send_response(200)
@ -98,435 +304,43 @@ class NasHTTPServerHandler(BaseHTTPServer.BaseHTTPRequestHandler):
self.end_headers()
self.wfile.write("ok")
except:
logger.log(logging.ERROR, "Unknown exception: %s",
traceback.format_exc())
logger.log(logging.ERROR, "Exception occurred on GET request!")
logger.log(logging.ERROR, "%s", traceback.format_exc())
def do_POST(self):
self.server = lambda: None
self.server.db = gs_database.GamespyDatabase()
try:
length = int(self.headers['content-length'])
post = self.str_to_dict(self.rfile.read(length))
if self.client_address[0] == '127.0.0.1':
client_address = (
self.headers.get('x-forwarded-for',
self.client_address[0]),
self.client_address[1]
)
else:
client_address = self.client_address
post = utils.qs_to_dict(self.rfile.read(length))
client_address = (
self.headers.get('x-forwarded-for', self.client_address[0]),
self.client_address[1]
)
post['ipaddr'] = client_address[0]
if self.path == "/ac":
logger.log(logging.DEBUG, "Request to %s from %s",
self.path, client_address)
logger.log(logging.DEBUG, "%s", post)
ret = {
"datetime": time.strftime("%Y%m%d%H%M%S"),
"retry": "0"
}
action = post["action"]
self.send_response(200)
self.send_header("Content-type", "text/plain")
self.send_header("NODE", "wifiappe1")
command = self.post_paths.get(self.path, handle_post)
ret = command(self, client_address, post)
if action == "acctcreate":
# TODO: test for duplicate accounts
if self.server.db.is_banned(post):
logger.log(logging.DEBUG,
"acctcreate denied for banned user %s",
str(post))
ret = {
"datetime": time.strftime("%Y%m%d%H%M%S"),
"returncd": "3913",
"locator": "gamespy.com",
"retry": "1",
"reason": "User banned."
}
else:
ret["returncd"] = "002"
ret['userid'] = \
self.server.db.get_next_available_userid()
logger.log(logging.DEBUG,
"acctcreate response to %s",
client_address)
logger.log(logging.DEBUG, "%s", ret)
ret = self.dict_to_str(ret)
elif action == "login":
if self.server.db.is_banned(post):
logger.log(logging.DEBUG,
"login denied for banned user %s",
str(post))
ret = {
"datetime": time.strftime("%Y%m%d%H%M%S"),
"returncd": "3914",
"locator": "gamespy.com",
"retry": "1",
"reason": "User banned."
}
# Un-comment these lines to enable console registration
# feature
# elif not self.server.db.pending(post):
# logger.log(logging.DEBUG,
# "Login denied - Unknown console %s",
# post)
# ret = {
# "datetime": time.strftime("%Y%m%d%H%M%S"),
# "returncd": "3921",
# "locator": "gamespy.com",
# "retry": "1",
# }
# elif not self.server.db.registered(post):
# logger.log(logging.DEBUG,
# "Login denied - console pending %s",
# post)
# ret = {
# "datetime": time.strftime("%Y%m%d%H%M%S"),
# "returncd": "3888",
# "locator": "gamespy.com",
# "retry": "1",
# }
else:
challenge = utils.generate_random_str(8)
post["challenge"] = challenge
authtoken = self.server.db.generate_authtoken(
post["userid"],
post
)
ret.update({
"returncd": "001",
"locator": "gamespy.com",
"challenge": challenge,
"token": authtoken,
})
logger.log(logging.DEBUG, "login response to %s",
client_address)
logger.log(logging.DEBUG, "%s", ret)
ret = self.dict_to_str(ret)
elif action == "SVCLOC" or action == "svcloc":
# Get service based on service id number
ret["returncd"] = "007"
ret["statusdata"] = "Y"
authtoken = self.server.db.generate_authtoken(
post["userid"],
post
)
if 'svc' in post:
if post["svc"] in ("9000", "9001"):
# DLC host = 9000
# In case the client's DNS isn't redirecting to
# dls1.nintendowifi.net
ret["svchost"] = self.headers['host']
# Brawl has 2 host headers which Apache chokes
# on, so only return the first one or else it
# won't work
ret["svchost"] = ret["svchost"].split(',')[0]
if post["svc"] == 9000:
ret["token"] = authtoken
else:
ret["servicetoken"] = authtoken
elif post["svc"] == "0000":
# Pokemon requests this for some things
ret["servicetoken"] = authtoken
ret["svchost"] = "n/a"
else:
# Empty svc - Fix Error Code 24101 (Boom Street)
ret["svchost"] = "n/a"
ret["servicetoken"] = authtoken
logger.log(logging.DEBUG, "svcloc response to %s",
client_address)
logger.log(logging.DEBUG, "%s", ret)
ret = self.dict_to_str(ret)
else:
logger.log(logging.WARNING,
"Unknown action request %s from %s!",
self.path, client_address)
elif self.path == "/pr":
logger.log(logging.DEBUG, "Request to %s from %s",
self.path, client_address)
logger.log(logging.DEBUG, "%s", post)
words = len(post["words"].split('\t'))
wordsret = "0" * words
ret = {
"prwords": wordsret,
"returncd": "000",
"datetime": time.strftime("%Y%m%d%H%M%S")
}
for l in "ACEJKP":
ret["prwords" + l] = wordsret
self.send_response(200)
self.send_header("Content-type", "text/plain")
self.send_header("NODE", "wifiappe1")
logger.log(logging.DEBUG, "pr response to %s", client_address)
logger.log(logging.DEBUG, "%s", ret)
ret = self.dict_to_str(ret)
elif self.path == "/download":
logger.log(logging.DEBUG, "Request to %s from %s",
self.path, client_address)
logger.log(logging.DEBUG, "%s", post)
action = post["action"]
ret = ""
dlcdir = os.path.abspath('dlc')
dlcpath = os.path.abspath("dlc/" + post["gamecd"])
dlc_contenttype = False
if os.path.commonprefix([dlcdir, dlcpath]) != dlcdir:
logging.log(logging.WARNING,
'Attempted directory traversal attack "%s",'
' cancelling.', dlcpath)
self.send_response(403)
return
def safeloadfi(fn, mode='rb'):
"""safeloadfi : string -> string
Safely load contents of a file, given a filename,
and closing the file afterward.
"""
with open(os.path.join(dlcpath, fn), mode) as fi:
return fi.read()
if action == "count":
if post["gamecd"] in gamecodes_return_random_file:
ret = "1"
else:
count = 0
if os.path.exists(dlcpath):
count = len(os.listdir(dlcpath))
if os.path.isfile(dlcpath + "/_list.txt"):
attr1 = post.get("attr1", None)
attr2 = post.get("attr2", None)
attr3 = post.get("attr3", None)
dlcfi = safeloadfi("_list.txt")
lst = self.filter_list(dlcfi,
attr1, attr2, attr3)
count = self.get_file_count(lst)
ret = "%d" % count
if action == "list":
num = post.get("num", None)
offset = post.get("offset", None)
if num is not None:
num = int(num)
if offset is not None:
offset = int(offset)
attr1 = post.get("attr1", None)
attr2 = post.get("attr2", None)
attr3 = post.get("attr3", None)
if os.path.exists(dlcpath):
# Look for a list file first.
# If the list file exists, send the entire thing back
# to the client.
if os.path.isfile(os.path.join(dlcpath, "_list.txt")):
if post["gamecd"].startswith("IRA") and \
attr1.startswith("MYSTERY"):
# Pokemon BW Mystery Gifts, until we have a
# better solution for that
ret = self.filter_list(
safeloadfi("_list.txt"),
attr1, attr2, attr3
)
ret = self.filter_list_g5_mystery_gift(
ret,
post["rhgamecd"]
)
ret = self.filter_list_by_date(
ret,
post["token"]
)
elif post["gamecd"] in \
gamecodes_return_random_file:
# Pokemon Gen 4 Mystery Gifts, same here
ret = self.filter_list(
safeloadfi("_list.txt"),
attr1, attr2, attr3
)
ret = self.filter_list_by_date(
ret,
post["token"]
)
else:
# default case for most games
ret = self.filter_list(
safeloadfi("_list.txt"),
attr1, attr2, attr3,
num, offset
)
if action == "contents":
# Get only the base filename just in case there is a path
# involved somewhere in the filename string.
dlc_contenttype = True
contents = os.path.basename(post["contents"])
ret = safeloadfi(contents)
self.send_response(200)
if dlc_contenttype is True:
self.send_header("Content-type", "application/x-dsdl")
self.send_header("Content-Disposition",
"attachment; filename=\"" +
post["contents"] + "\"")
else:
self.send_header("Content-type", "text/plain")
self.send_header("X-DLS-Host", "http://127.0.0.1/")
logger.log(logging.DEBUG, "download response to %s",
client_address)
# if dlc_contenttype is False:
# logger.log(logging.DEBUG, "%s", ret)
else:
self.send_response(404)
logger.log(logging.WARNING,
"Unknown path request %s from %s!",
self.path, client_address)
return
self.send_header("Content-Length", str(len(ret)))
self.end_headers()
self.wfile.write(ret)
if ret is not None:
self.send_header("Content-Length", str(len(ret)))
self.end_headers()
self.wfile.write(ret)
except:
logger.log(logging.ERROR, "Unknown exception: %s",
traceback.format_exc())
logger.log(logging.ERROR, "Exception occurred on POST request!")
logger.log(logging.ERROR, "%s", traceback.format_exc())
def str_to_dict(self, s):
# Enable keep_blank_values, skipped otherwise.
# TODO: Move this in utils module?
ret = urlparse.parse_qs(s, True)
for k, v in ret.iteritems():
try:
# I'm not sure about the replacement for '-', but it'll at
# least let it be decoded.
# For the most part it's not important since it's mostly
# used for the devname/ingamesn fields.
ret[k] = base64.b64decode(urlparse.unquote(v[0])
.replace("*", "=")
.replace("?", "/")
.replace(">", "+")
.replace("-", "/"))
except TypeError:
print "Could not decode following string: ret[%s] = %s" \
% (k, v[0])
print "url: %s" % s
# If you don't assign it like this it'll be a list, which
# breaks other code.
ret[k] = v[0]
class NasHTTPServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
"""Threading HTTP server."""
pass
return ret
def dict_to_str(self, dict):
"""Convert dict to str.
nas(wii).nintendowifi.net has a URL query-like format but does not
use encoding for special characters.
"""
for k, v in dict.iteritems():
dict[k] = base64.b64encode(v).replace("=", "*")
return "&".join("{!s}={!s}".format(k, v) for k, v in dict.items()) + \
"\r\n"
def filter_list_g5_mystery_gift(self, data, rhgamecd):
"""Custom selection for generation 5 mystery gifts, so that the random
or data-based selection still works properly."""
if rhgamecd[2] == 'A':
filterBit = 0x100000
elif rhgamecd[2] == 'B':
filterBit = 0x200000
elif rhgamecd[2] == 'D':
filterBit = 0x400000
elif rhgamecd[2] == 'E':
filterBit = 0x800000
else:
# unknown game, can't filter
return data
output = []
for line in data.splitlines():
lineBits = int(line.split('\t')[3], 16)
if lineBits & filterBit == filterBit:
output.append(line)
return '\r\n'.join(output) + '\r\n'
def filter_list_by_date(self, data, token):
"""Allow user to control which file to receive by setting
the local date selected file will be the one at
index (day of year) mod (file count)."""
try:
userData = self.server.db.get_nas_login(token)
date = time.strptime(userData['devtime'], '%y%m%d%H%M%S')
files = data.splitlines()
ret = files[(int(date.tm_yday) - 1) % len(files)] + '\r\n'
except:
ret = self.filter_list_random_files(data, 1)
return ret
def filter_list_random_files(self, data, count):
"""Get [count] random files from the filelist."""
samples = random.sample(data.splitlines(), count)
return '\r\n'.join(samples) + '\r\n'
def filter_list(self, data, attr1=None, attr2=None, attr3=None,
num=None, offset=None):
"""Filter the list based on the attribute fields.
If nothing matches, at least return a newline.
Pokemon BW at least expects this and will error without it.
"""
if attr1 is None and attr2 is None and attr3 is None and \
num is None and offset is None:
# Nothing to filter, just return the input data
return data
nc = lambda a, b: (a is None or a == b)
attrs = lambda data: (len(data) == 6 and nc(attr1, data[2]) and
nc(attr2, data[3]) and nc(attr3, data[4]))
output = filter(lambda line: attrs(line.split("\t")),
data.splitlines())
if offset is not None:
output = output[offset:]
if num is not None:
output = output[:num]
return '\r\n'.join(output) + '\r\n'
def get_file_count(self, data):
return sum(1 for line in data.splitlines() if line)
class NasServer(object):
def start(self):
address = dwc_config.get_ip_port('NasServer')
httpd = NasHTTPServer(address, NasHTTPServerHandler)
logger.log(logging.INFO, "Now listening for connections on %s:%d...",
*address)
httpd.serve_forever()
if __name__ == "__main__":

232
other/dlc.py Normal file
View File

@ -0,0 +1,232 @@
"""DWC Network Server Emulator
Copyright (C) 2014 polaris-
Copyright (C) 2014 ToadKing
Copyright (C) 2016 Sepalani
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import os
import random
import time
from gamespy.gs_database import GamespyDatabase
# If a game from this list requests a file listing, the server will return
# that only one exists and return a random one.
# This is used for Mystery Gift distribution on Generation 4 Pokemon games
gamecodes_return_random_file = [
'ADAD',
'ADAE',
'ADAF',
'ADAI',
'ADAJ',
'ADAK',
'ADAS',
'CPUD',
'CPUE',
'CPUF',
'CPUI',
'CPUJ',
'CPUK',
'CPUS',
'IPGD',
'IPGE',
'IPGF',
'IPGI',
'IPGJ',
'IPGK',
'IPGS'
]
def get_file_count(data):
return sum(1 for line in data.splitlines() if line)
def filter_list(data, attr1=None, attr2=None, attr3=None,
num=None, offset=None):
"""Filter the list based on the attribute fields.
If nothing matches, at least return a newline.
Pokemon BW at least expects this and will error without it.
"""
if attr1 is None and attr2 is None and attr3 is None and \
num is None and offset is None:
# Nothing to filter, just return the input data
return data
def attrs(data):
"""Filter attrs."""
def nc(a, b):
"""Filter nc."""
return a is None or a == b
return \
len(data) == 6 and \
nc(attr1, data[2]) and \
nc(attr2, data[3]) and \
nc(attr3, data[4])
output = filter(lambda line: attrs(line.split("\t")), data.splitlines())
if offset is not None:
output = output[offset:]
if num is not None:
output = output[:num]
return '\r\n'.join(output) + '\r\n'
def filter_list_random_files(data, count):
"""Get [count] random files from the filelist."""
samples = random.sample(data.splitlines(), count)
return '\r\n'.join(samples) + '\r\n'
def filter_list_by_date(data, token):
"""Allow user to control which file to receive by setting
the local date selected file will be the one at
index (day of year) mod (file count)."""
try:
userData = GamespyDatabase().get_nas_login(token)
date = time.strptime(userData['devtime'], '%y%m%d%H%M%S')
files = data.splitlines()
ret = files[(int(date.tm_yday) - 1) % len(files)] + '\r\n'
except:
ret = filter_list_random_files(data, 1)
return ret
def filter_list_g5_mystery_gift(data, rhgamecd):
"""Custom selection for generation 5 mystery gifts, so that the random
or data-based selection still works properly."""
if rhgamecd[2] == 'A':
filterBit = 0x100000
elif rhgamecd[2] == 'B':
filterBit = 0x200000
elif rhgamecd[2] == 'D':
filterBit = 0x400000
elif rhgamecd[2] == 'E':
filterBit = 0x800000
else:
# unknown game, can't filter
return data
output = []
for line in data.splitlines():
lineBits = int(line.split('\t')[3], 16)
if lineBits & filterBit == filterBit:
output.append(line)
return '\r\n'.join(output) + '\r\n'
def safeloadfi(dlc_path, name, mode='rb'):
"""safeloadfi : string -> string
Safely load contents of a file, given a filename,
and closing the file afterward.
"""
try:
with open(os.path.join(dlc_path, name), mode) as f:
return f.read()
except:
return None
def download_count(dlc_path, post):
"""Handle download count request."""
if post["gamecd"] in gamecodes_return_random_file:
return "1"
elif os.path.exists(dlc_path):
if os.path.isfile(os.path.join(dlc_path, "_list.txt")):
attr1 = post.get("attr1", None)
attr2 = post.get("attr2", None)
attr3 = post.get("attr3", None)
dlc_file = safeloadfi(dlc_path, "_list.txt")
ls = filter_list(dlc_file, attr1, attr2, attr3)
count = get_file_count(ls)
else:
count = len(os.listdir(dlc_path))
return "%d" % count
else:
return "0"
def download_size(dlc_path, name):
"""Return download filename and size.
Used in download list.
"""
return (name, str(os.path.getsize(os.path.join(dlc_path, name))))
def download_list(dlc_path, post):
"""Handle download list request.
Look for a list file first. If the list file exists, send the
entire thing back to the client.
"""
# Get list file
if not os.path.exists(dlc_path):
return "\r\n"
elif os.path.isfile(os.path.join(dlc_path, "_list.txt")):
list_data = safeloadfi(dlc_path, "_list.txt") or "\r\n"
else:
# Doesn't have _list.txt file
try:
ls = [
download_size(dlc_path, name)
for name in sorted(os.listdir(dlc_path))
]
list_data = "\r\n".join("\t\t\t\t\t".join(f) for f in ls) + "\r\n"
except:
return "\r\n"
attr1 = post.get("attr1", None)
attr2 = post.get("attr2", None)
attr3 = post.get("attr3", None)
if post["gamecd"].startswith("IRA") and attr1.startswith("MYSTERY"):
# Pokemon BW Mystery Gifts, until we have a better solution for that
ret = filter_list(list_data, attr1, attr2, attr3)
ret = filter_list_g5_mystery_gift(ret, post["rhgamecd"])
return filter_list_by_date(ret, post["token"])
elif post["gamecd"] in gamecodes_return_random_file:
# Pokemon Gen 4 Mystery Gifts, same here
ret = filter_list(list_data, attr1, attr2, attr3)
return filter_list_by_date(ret, post["token"])
else:
# Default case for most games
num = post.get("num", None)
if num is not None:
num = int(num)
offset = post.get("offset", None)
if offset is not None:
offset = int(offset)
return filter_list(list_data, attr1, attr2, attr3, num, offset)
def download_contents(dlc_path, post):
"""Handle download contents request.
Get only the base filename just in case there is a path involved
somewhere in the filename string.
"""
contents = os.path.basename(post["contents"])
return safeloadfi(dlc_path, contents)

View File

@ -18,11 +18,13 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import base64
import logging
import logging.handlers
import random
import string
import struct
import urlparse
import ctypes
import os
@ -327,3 +329,43 @@ def pretty_print_hex(orig_data, cols=16, sep=' '):
# output += "\n"
#
# return output
def qs_to_dict(s):
"""Convert query string to dict."""
ret = urlparse.parse_qs(s, True)
for k, v in ret.items():
try:
# I'm not sure about the replacement for '-', but it'll at
# least let it be decoded.
# For the most part it's not important since it's mostly
# used for the devname/ingamesn fields.
ret[k] = base64.b64decode(urlparse.unquote(v[0])
.replace("*", "=")
.replace("?", "/")
.replace(">", "+")
.replace("-", "/"))
except TypeError:
"""
print("Could not decode following string: ret[%s] = %s"
% (k, v[0]))
print("url: %s" % s)
"""
# If you don't assign it like this it'll be a list, which
# breaks other code.
ret[k] = v[0]
return ret
def dict_to_qs(d):
"""Convert dict to query string.
nas(wii).nintendowifi.net has a URL query-like format but does not
use encoding for special characters.
"""
# Dictionary comprehension is used to not modify the original
ret = {k: base64.b64encode(v).replace("=", "*") for k, v in d.items()}
return "&".join("{!s}={!s}".format(k, v) for k, v in ret.items()) + "\r\n"