Added a lot of shit. time for another refactor

This commit is contained in:
573dev 2020-10-17 22:45:10 -05:00
parent b7271d570c
commit 31b4208fad
10 changed files with 381 additions and 6 deletions

3
.gitignore vendored
View File

@ -104,3 +104,6 @@ ENV/
# logs
logs/
# Decompiled v8 code
decompiled/

View File

@ -1,2 +1,3 @@
[mypy]
ignore_missing_imports = True
plugins = sqlmypy

View File

@ -14,6 +14,8 @@ REQUIREMENTS = [
"lxml",
"pycryptodome",
"kbinxml",
"sqlalchemy",
"sqlalchemy-stubs",
]
EXTRAS = {

View File

@ -7,11 +7,18 @@ from typing import Optional, Union
from flask import Flask, has_request_context, request
from v8_server.config import Development, Production
from v8_server.model.connection import Base, Database
from v8_server.model.user import User
from v8_server.utils.flask import generate_secret_key
from .version import __version__
# Make sure the database has been created
with Database() as db:
Base.metadata.create_all(db.engine)
class RequestFormatter(logging.Formatter):
def format(self, record):
encrypted = "None"

View File

View File

@ -0,0 +1,34 @@
from __future__ import annotations
from pathlib import Path
from types import TracebackType
from typing import Optional, Type
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import DeclarativeMeta, declarative_base
from sqlalchemy.orm import Session as AlchemySession
Base: DeclarativeMeta = declarative_base()
class Database(object):
def __init__(self, echo: bool = False) -> None:
self.uri = f"sqlite+pysqlite:///{Path(__file__).parent / 'v8.db'}"
self.engine = create_engine(self.uri, echo=echo)
self.session = AlchemySession(bind=self.engine)
def __enter__(self) -> Database:
return self
def __exit__(
self,
exception_type: Optional[Type[BaseException]],
exception_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> bool:
self.session.close()
return exception_type is None
def __del__(self) -> None:
self.engine.dispose()

97
v8_server/model/user.py Normal file
View File

@ -0,0 +1,97 @@
from sqlalchemy import JSON, Column, ForeignKey, UniqueConstraint
from sqlalchemy.orm import relationship
from sqlalchemy.types import Integer, String
from v8_server.model.connection import Base
class User(Base):
"""
Table representing a user. Each user has a Unique ID and a pin which is used
with all cards associated with the user's account.
"""
__tablename__ = "users"
uid = Column(Integer, nullable=False, primary_key=True)
pin = Column(String(4), nullable=False)
cards = relationship("Card", back_populates="user")
extids = relationship("ExtID", back_populates="user")
refids = relationship("RefID", back_populates="user")
def __repr__(self) -> str:
return f'User<uid: {self.uid}, pin: "{self.pin}">'
class Card(Base):
"""
Table representing a card associated with a user. Users may have zero or more cards
associated with them. When a new card is used in a game, a new user will be created
to asociate with a card.
"""
__tablename__ = "cards"
uid = Column(String(16), nullable=False, primary_key=True)
user_id = Column(Integer, ForeignKey("users.uid"), nullable=False)
user = relationship("User", back_populates="cards")
def __repr__(self) -> str:
return f'Card<uid: "{self.uid}", user_id: {self.user_id}>'
class ExtID(Base):
"""
Table representing and extid for a user across a game series. Each game series on
the network gets its own extid (8 digit number) for each user.
"""
__tablename__ = "extids"
__table_args_ = (UniqueConstraint("game", "user_id", name="game_user_id"),)
uid = Column(Integer, nullable=False, primary_key=True)
game = Column(String(32), nullable=False)
user_id = Column(Integer, ForeignKey("users.uid"), nullable=False)
user = relationship("User", back_populates="extids")
def __repr__(self) -> str:
return f'ExtID<uid: {self.uid}, game: "{self.game}", user_id: {self.user_id}>'
class RefID(Base):
"""
Table representing a refid for a user. Each unique game on the network will need
a refid for each user/game/version they have a profile for. If a user does not have
a profile for a particular game, a new and unique refid will be generated for the
user.
Note that a user might have an extid/refid for a game without a profile, but a user
can not have a profile without an extid/refid
"""
__tablename__ = "refids"
__table_args__ = (
UniqueConstraint("game", "version", "user_id", name="game_version_user_id"),
)
uid = Column(String(16), nullable=False, primary_key=True)
game = Column(String(32), nullable=False)
version = Column(Integer, nullable=False)
user_id = Column(Integer, ForeignKey("users.uid"), nullable=False)
user = relationship("User", back_populates="refids")
def __repr__(self) -> str:
return (
f'RefID<uid: {self.uid}, game: "{self.game}", version: {self.version} '
f"user_id: {self.user_id}>"
)
class Profile(Base):
"""
Table for storing JSON profile blobs, indexed by refid
"""
__tablename__ = "profiles"
ref_id = Column(
String(16), ForeignKey("refids.uid"), nullable=False, primary_key=True
)
data = Column(JSON, nullable=False)

BIN
v8_server/model/v8.db Normal file

Binary file not shown.

View File

@ -6,3 +6,54 @@ def e_type(_type, count: Optional[int] = None) -> Dict[str, str]:
if count is not None:
result["__count"] = str(count)
return result
class Model:
"""
Object representing a parsed Model String.
"""
def __init__(
self, game: str, dest: str, spec: str, rev: str, version: Optional[int]
) -> None:
"""
Initialize a Model object.
Parameters:
game - Game code (such as LDJ)
dest - Destination region for the game (such as J)
spec - Spec for the game (such as A)
rev - Revision of the game (such as A)
version - Integer representing version, usually in the form of YYYYMMDDXX
where YYYY is a year, MM is a month, DD is a day and XX is
sub-day versioning.
"""
self.game = game
self.dest = dest
self.spec = spec
self.rev = rev
self.version = version
@staticmethod
def from_modelstring(model: str) -> "Model":
"""
Parse a modelstring and return a Model
Parameters:
model - Modelstring in a form similar to "K39:J:B:A:2010122200". Note that
The last part (version number) may be left off.
Returns:
A Model object.
"""
parts = model.split(":")
if len(parts) == 5:
game, dest, spec, rev, version = parts
return Model(game, dest, spec, rev, int(version))
elif len(parts) == 4:
game, dest, spec, rev = parts
return Model(game, dest, spec, rev, None)
raise Exception("Couldn't parse model {}".format(model))
def __str__(self) -> str:
if self.version is None:
return f"{self.game}:{self.dest}:{self.spec}:{self.rev}"
else:
return f"{self.game}:{self.dest}:{self.spec}:{self.rev}:{self.version}"

View File

@ -1,12 +1,16 @@
import random
from typing import Dict, Tuple
from flask import request
from lxml import etree as ET # noqa: N812
from lxml.builder import E
from sqlalchemy.orm.exc import MultipleResultsFound
from v8_server import app
from v8_server.model.connection import Database
from v8_server.model.user import Card, ExtID, Profile, RefID, User
from v8_server.utils.eamuse import e_type
from v8_server.utils.xml import eamuse_prepare_xml, eamuse_read_xml
from v8_server.utils.xml import eamuse_prepare_xml, eamuse_read_xml, get_xml_attrib
@app.route("/", defaults={"path": ""}, methods=["GET", "POST"])
@ -141,18 +145,182 @@ def package() -> Tuple[bytes, Dict[str, str]]:
return eamuse_prepare_xml(response, request)
class CardStatus(object):
"""
List of statuses we return to the game for various reasons
"""
SUCCESS = 0
NO_PROFILE = 109
NOT_ALLOWED = 110
NOT_REGISTERED = 112
INVALID_PIN = 116
def create_refid(user_id: int) -> str:
with Database() as db:
# Create a new extid that is unique
while True:
e_id = random.randint(0, 89999999) + 10000000
if db.session.query(ExtID).filter(ExtID.uid == e_id).count() == 0:
break
# Use that ext_id
ext_id = ExtID(uid=e_id, game="GFDM", user_id=user_id)
try:
db.session.add(ext_id)
except Exception:
# Most likely a duplicate error as this user already has an ExtID for this
# game series
pass
# Create a new refid that is unique
while True:
r_id = "".join(random.choice("0123456789ABCDEF") for _ in range(16))
if db.session.query(RefID).filter(RefID.uid == r_id).count() == 0:
break
# Use that ref_id
ref_id = RefID(uid=r_id, game="GFDM", version=8, user_id=user_id)
db.session.add(ref_id)
db.session.commit()
uid = ref_id.uid
return uid
def has_profile(user_id: int) -> bool:
result = False
with Database() as db:
ref_id = (
db.session.query(RefID)
.filter(RefID.game == "GFDM", RefID.version == 8, RefID.user_id == user_id)
.one()
)
result = (
db.session.query(Profile).filter(Profile.ref_id == ref_id.uid).count() != 0
)
return result
@app.route("/cardmng/service", methods=["POST"])
def cardmng() -> Tuple[bytes, Dict[str, str]]:
"""
This is for dealing with the card management
"""
xml, model, module, method, command = eamuse_read_xml(request)
xml, model_str, module, method, command = eamuse_read_xml(request)
if method == "inquire":
card_id = get_xml_attrib(xml[0], "cardid")
# Check if the user has already been created
return ""
with Database() as db:
# Check if the user already exists
try:
card = db.session.query(Card).filter(Card.uid == card_id).one_or_none()
except MultipleResultsFound:
app.logger.error(f"Multiple Cards found for Card ID: {card_id}")
raise
if card is None:
# This user doesn't exist, force the system to create a new account
response = E.response(
E.cardmng({"status": str(CardStatus.NOT_REGISTERED)})
)
else:
# Special handing for looking up whether the previous game's profile
# existed
user = db.session.query(User).filter(User.uid == card.user_id).one()
bound = has_profile(user.uid)
expired = False
ref_id = (
db.session.query(RefID)
.filter(
RefID.game == "GFDM",
RefID.version == 8,
RefID.user_id == user.uid,
)
.one()
)
paseli_enabled = False
response = E.response(
E.cardmng(
{
"refid": ref_id.uid,
"dataid": ref_id.uid,
"newflag": "1", # A;ways seems to be set to 1
"binded": "1" if bound else "0",
"expired": "1" if expired else "0",
"ecflag": "1" if paseli_enabled else "0",
"useridflag": "1",
"extidflag": "1",
}
)
)
elif method == "getrefid":
# Given a card_id, and a pin, register the card with the system and generate a
# new data_id/ref_id + ext_id
card_id = get_xml_attrib(xml[0], "cardid")
pin = get_xml_attrib(xml[0], "passwd")
with Database() as db:
# Create the user object
user = User(pin=pin)
db.session.add(user)
# We must commit to assign a uid
db.session.commit()
user_id = user.uid
# Now insert the card, tying it to the account
card = Card(uid=card_id, user_id=user_id)
db.session.add(card)
db.session.commit()
ref_id = create_refid(user_id)
response = E.response(E.cardmng({"dataid": ref_id, "refid": ref_id}))
elif method == "authpass":
# Given a data_id/ref_id previously found via inquire, verify the pin
ref_id = get_xml_attrib(xml[0], "refid")
pin = get_xml_attrib(xml[0], "pass")
with Database() as db:
refid = db.session.query(RefID).filter(RefID.uid == ref_id).one()
user = (
db.session.query(User).filter(User.uid == refid.user_id).one_or_none()
)
if user is not None:
valid = pin == user.pin
else:
valid = False
response = E.response(
E.cardmng(
{
"status": str(
CardStatus.SUCCESS if valid else CardStatus.INVALID_PIN
)
}
)
)
elif method == "bindmodel":
# Given a refid, bind the user's card to the current version of the game
# TODO: Not implemented right now. do it later
ref_id = get_xml_attrib(xml[0], "refid")
response = E.response(E.cardmng({"dataid": ref_id}))
elif method == "getkeepspan":
# Unclear what this method does, return an arbitrary span
response = E.response(E.cardmng({"keepspan", "30"}))
elif method == "getdatalist":
# Unclear what this method does, return a dummy response
response = base_response(module)
else:
response = base_response(module)
return eamuse_prepare_xml(response, request)
@app.route("/local/service", methods=["POST"])
@ -176,8 +344,20 @@ def local() -> Tuple[bytes, Dict[str, str]]:
)
)
elif module == "demodata":
# TODO: Not really sure what to return here
if method == "get":
response = E.response(E.demodata())
response = base_response(module)
elif module == "cardutil":
# TODO: Not really sure what to return here
if method == "check":
# Return the users game information
# If the user doesn't have any game information yet:
response = E.response(
E.cardutil(E.card(E.kind("0", e_type("s8")), {"no": "1", "state": "0"}))
)
# Else get the user info and send that you idiot
else:
response = base_response(module)
@ -201,9 +381,9 @@ def services() -> Tuple[bytes, Dict[str, str]]:
"pcbevent",
"pcbtracker",
"sidmgr",
"traceroute",
"userdata",
"userid",
"eemall",
]
services = {