diff --git a/main.py b/main.py index 2349d233..05d7cab9 100644 --- a/main.py +++ b/main.py @@ -1,195 +1,455 @@ -# -*- coding: utf-8 -*- -import spoilers -import mtgs_scraper -import scryfall_scraper -import mythic_scraper -import wizards_scraper -import os -import json -import io +import datetime +import pathlib import sys -import verify_files + +import contextvars +from typing import Dict, Any, List, Union, Tuple + import requests +import requests_cache import yaml -from lxml import etree -presets = { - "isfullspoil": False, # when full spoil comes around, we only want to use WOTC images - "includeMasterpieces": True, # if the set has masterpieces, let's get those too - "oldRSS": False, # maybe MTGS hasn't updated their spoiler.rss but new cards have leaked - "dumpXML": False, # let travis print (XML for testing) - # only use Scryfall data (no mtgs for ANY sets) - "scryfallOnly": False, - "dumpErrors": True # print (the error log from out/errors.json) -} +# Scryfall API for downloading spoiler sets +SCRYFALL_SET_URL: str = "https://api.scryfall.com/sets/{}" -setinfos = verify_files.load_file('set_info.yml','yaml_multi') -manual_sets = verify_files.load_file('cards_manual.yml','yaml') -card_corrections = verify_files.load_file('cards_corrections.yml','yaml') -delete_cards = verify_files.load_file('cards_delete.yml','yaml') - -errorlog = [] - -# TODO insert configparser to add config.ini file +# Downloader sessions for header consistency +SESSION: contextvars.ContextVar = contextvars.ContextVar("SESSION_SCRYFALL") -def parseargs(): - for argument in sys.argv: - for preset in presets: - if argument.split('=')[0].lower().replace('-', '') == preset.lower(): - argvalue = argument.split('=')[1] - if argvalue in ['true', 'True', 'T', 't']: - argvalue = True - elif argvalue in ['false', 'False', 'F', 'f']: - argvalue = False - presets[preset] = argvalue - print("Setting preset " + preset + " to value " + str(argvalue)) - - -def save_allsets(AllSets): - with io.open('out/AllSets.json', 'w', encoding='utf8') as json_file: - data = json.dumps(AllSets, ensure_ascii=False, indent=2, sort_keys=True, separators=(',', ':')) - json_file.write(data) - - -def save_masterpieces(masterpieces, setinfo): - with open('out/' + setinfo['masterpieces']['code'] + '.json', 'w') as outfile: - json.dump(masterpieces, outfile, sort_keys=True, indent=2, separators=(',', ': ')) - - -def save_setjson(mtgs, filename): - with io.open('out/' + filename + '.json', 'w', encoding='utf8') as json_file: - data = json.dumps(mtgs, ensure_ascii=False, indent=2, sort_keys=True, separators=(',', ':')) - json_file.write(data) - - -def save_errorlog(errorlog): - with open('out/errors.yml', 'w') as outfile: - yaml.safe_dump(errorlog, outfile, default_flow_style=False) - - -def save_xml(xmlstring, outfile): - if os.path.exists(outfile): - append_or_write = 'w' - else: - append_or_write = 'w' - with open(outfile, append_or_write) as xmlfile: - xmlfile.write(xmlstring) - - -def verify_xml(file, schema): +def load_yaml_file( + input_file: str, lib_to_use: str = "yaml" +) -> Union[Dict[str, Any], List[Dict[str, Any]]]: + """ + Load a yaml file from system + :param input_file: File to open + :param lib_to_use: Open format + :return: Loaded file + """ try: - schema_doc = etree.fromstring(schema) - except Exception as e: - print ("XSD for " + file + " is invalid") - print (schema) - print (e) - return False - xml_schema = etree.XMLSchema(schema_doc) - try: - xml_doc = etree.parse(file) - except Exception as e: - print ("XML file " + file + " is invalid") - print (e) - return False - try: - xml_schema.assert_(xml_doc) - except: - xsd_errors = xml_schema.error_log - print ("Errors validating XML file " + file + " against XSD:") - for error in xsd_errors: - print (error) - sys.exit("Error: " + file + " does not pass Cockatrice XSD validation.") - return False - return True + with pathlib.Path(input_file).open("r") as f: + if lib_to_use == "yaml": + return yaml.safe_load(f) + else: + return [of for of in yaml.safe_load_all(f)] + except Exception as ex: + print("Unable to load {}: {}".format(input_file, ex.args)) + sys.exit(2) -if __name__ == '__main__': - parseargs() - AllSets = spoilers.get_allsets() # get AllSets from mtgjson - combinedjson = {} - noCards = [] - del AllSets['RNA'] - for setinfo in setinfos: - if setinfo['code'] in AllSets: - print ("Found " +setinfo['code']+ " set from set_info.yml in MTGJSON, not adding it") +# File containing all spoiler set details +SET_INFO_FILE: List[Dict[str, Any]] = load_yaml_file("set_info.yml", "yaml_multi") + + +def __get_session() -> requests.Session: + """ + Get the session for downloading content + :return: Session + """ + requests_cache.install_cache( + cache_name="scryfall_cache", backend="sqlite", expire_after=604800 # 1 week + ) + + if not SESSION.get(None): + SESSION.set(requests.Session()) + return SESSION.get() + + +def __download(scryfall_url: str) -> Dict[str, Any]: + """ + Get the data from Scryfall in JSON format using our secret keys + :param scryfall_url: URL to __download JSON data from + :return: JSON object of the Scryfall data + """ + session = __get_session() + response: Any = session.get(url=scryfall_url, timeout=5.0) + request_api_json: Dict[str, Any] = response.json() + print("Downloaded: {} (Cache = {})".format(scryfall_url, response.from_cache)) + return request_api_json + + +def download_scryfall_set(set_code: str) -> List[Dict[str, Any]]: + """ + Download a set from scryfall in entirety + :param set_code: Set code + :return: Card list + """ + set_content: Dict[str, Any] = __download(SCRYFALL_SET_URL.format(set_code)) + if set_content["object"] == "error": + print("API download failed for {}: {}".format(set_code, set_content)) + return [] + + spoiler_cards = [] + download_url = set_content["search_uri"] + + page_downloaded: int = 1 + while download_url: + page_downloaded += 1 + + cards = __download(download_url) + if cards["object"] == "error": + print("Error downloading {0}: {1}".format(set_code, cards)) + break + + for card in cards["data"]: + spoiler_cards.append(card) + + if not cards.get("has_more"): + break + + download_url = cards["next_page"] + + return sorted(spoiler_cards, key=lambda c: (c["name"], c["collector_number"])) + + +def build_types(sf_card: Dict[str, Any]) -> Tuple[List[str], List[str], List[str]]: + """ + Build the super, type, and sub-types of a given card + :param sf_card: Scryfall card + :return: Tuple of types + """ + all_super_types = ["Legendary", "Snow", "Elite", "Basic", "World", "Ongoing"] + + # return values + super_types, types, sub_types = [], [], [] + + type_line = sf_card["type_line"] + + if u"—" in type_line: + card_subs = type_line.split(u"—")[1].strip() + sub_types = card_subs.split(" ") if " " in card_subs else [card_subs] + + for card_type in all_super_types: + if card_type in type_line: + super_types.append(card_type) + + types = type_line.split(u"—")[0] + for card_type in all_super_types: + types = types.replace(card_type, "") + + return super_types, types, sub_types + + +def convert_scryfall(scryfall_cards: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Convert SF cards to MTGJSON format for dispatching + :param scryfall_cards: List of Scryfall cards + :return: MTGJSON card list + """ + trice_cards = [] + + composed_sf_cards = [] + + # Handle split/transform cards + for sf_card in scryfall_cards: + if "layout" in sf_card.keys(): + if sf_card["layout"] in ["transform", "split"]: + # Make a copy for zoning + combined_sides = sf_card.copy() + del combined_sides["card_faces"] + + # Quick pointers + face_0 = sf_card["card_faces"][0] + face_1 = sf_card["card_faces"][1] + + # Update data for the combined + combined_sides["layout"] = "double-faced" + combined_sides["names"] = [face_0["name"], face_1["name"]] + + # Re-structure two cards into singletons + front_side = {**combined_sides, **face_0} + back_side = {**combined_sides, **face_1} + + # Uniquify them + front_side["collector_number"] += "a" + back_side["collector_number"] += "b" + + # And continue on our journey + composed_sf_cards.extend([front_side, back_side]) + else: + composed_sf_cards.append(sf_card) + + # Build trice cards from SF cards + for sf_card in composed_sf_cards: + super_types, types, sub_types = build_types(sf_card) + + trice_card = { + "cmc": sf_card["cmc"], + "names": sf_card.get("names", None), + "mana_cost": sf_card.get("mana_cost", ""), + "name": sf_card["name"], + "number": sf_card["collector_number"], + "rarity": sf_card["rarity"].replace("mythic", "mythic rare").title(), + "text": sf_card.get("oracle_text", ""), + "url": sf_card["image_uris"].get("normal", None), + "type": sf_card.get("type_line", "Unknown").replace(u"—", "-"), + "colorIdentity": sf_card.get("color_identity", None), + "colors": sf_card["colors"], + "power": sf_card.get("power", None), + "toughness": sf_card.get("toughness", None), + "layout": sf_card["layout"].replace("normal", ""), + "loyalty": sf_card.get("loyalty", None), + "artist": sf_card.get("artist", ""), + "flavor": sf_card.get("flavor_text", None), + "multiverseId": sf_card.get("multiverse_id", None), + "superTypes": super_types, + "types": types, + "subTypes": sub_types, + } + trice_cards.append(trice_card) + + return trice_cards + + +def open_header(card_xml_file) -> None: + """ + Add the header data to the XML file + :param card_xml_file: Card file path + """ + card_xml_file.write( + "\n" + "\n" + "\n" + "\n" + ) + + +def fill_header_sets(card_xml_file, set_code, set_name, release_date) -> None: + """ + Add header data for set files + :param card_xml_file: Card file path + :param set_code: Set code + :param set_name: Set name + :param release_date: Release Date + """ + card_xml_file.write( + "\n" + set_code + "\n" + "" + set_name + "\n" + "Expansion\n" + "" + release_date + "\n" + "\n" + ) + + +def close_header(card_xml_file) -> None: + """ + Add closing data to files + :param card_xml_file: Card file path + """ + card_xml_file.write("\n\n") + + +def close_xml_file(card_xml_file) -> None: + """ + Add final touch to files to validate them + :param card_xml_file: Card file path + """ + card_xml_file.write("\n\n") + + +def write_cards( + card_xml_file: Any, trice_dict: List[Dict[str, Any]], set_code: str +) -> None: + """ + Given a list of cards, write the cards to an output file + :param card_xml_file: Output file to write to + :param trice_dict: List of cards + :param set_code: Set code + """ + count = 0 + related = 0 + + for card in trice_dict: + if "names" in card.keys() and card["names"]: + if "layout" in card and card["layout"] != "double-faced": + if card["name"] == card["names"][1]: + continue + + count += 1 + set_name = card["name"] + + if "mana_cost" in card.keys(): + mana_cost = card["mana_cost"].replace("{", "").replace("}", "") + else: + mana_cost = "" + + if "power" in card.keys() or "toughness" in card.keys(): + if card["power"]: + pt = str(card["power"]) + "/" + str(card["toughness"]) + else: + pt = 0 + else: + pt = 0 + + if "text" in card.keys(): + text = card["text"] + else: + text = "" + + card_cmc = str(card["cmc"]) + card_type = card["type"] + if "names" in card.keys(): + if "layout" in card: + if card["layout"] == "split" or card["layout"] == "aftermath": + if "names" in card: + if card["name"] == card["names"][0]: + for json_card in trice_dict: + if json_card["name"] == card["names"][1]: + card_type += " // " + json_card["type"] + new_mc = "" + if "mana_cost" in json_card: + new_mc = json_card["mana_cost"] + mana_cost += " // " + new_mc.replace( + "{", "" + ).replace("}", "") + card_cmc += " // " + str(json_card["cmc"]) + text += "\n---\n" + json_card["text"] + set_name += " // " + json_card["name"] + elif card["layout"] == "double-faced": + if "names" not in card.keys(): + print(card["name"] + ' is double-faced but no "names" key') + else: + for dfc_name in card["names"]: + if dfc_name != card["name"]: + related = dfc_name + else: + print( + card["name"] + + " has names, but layout != split, aftermath, or double-faced" + ) + else: + print(card["name"] + " has multiple names and no 'layout' key") + + table_row = "1" + if "Land" in card_type: + table_row = "0" + elif "Sorcery" in card_type: + table_row = "3" + elif "Instant" in card_type: + table_row = "3" + elif "Creature" in card_type: + table_row = "2" + + if "number" in card: + if "b" in str(card["number"]): + if "layout" in card: + if card["layout"] == "split" or card["layout"] == "aftermath": + continue + + card_xml_file.write("\n") + card_xml_file.write("" + set_name + "\n") + card_xml_file.write( + '' + + str(set_code) + + "\n" + ) + card_xml_file.write("" + mana_cost + "\n") + card_xml_file.write("" + card_cmc + "\n") + + if "colors" in card.keys(): + for color in card["colors"]: + card_xml_file.write("" + str(color) + "\n") + + if set_name + " enters the battlefield tapped" in text: + card_xml_file.write("1\n") + + card_xml_file.write("" + card_type + "\n") + + if pt: + card_xml_file.write("" + pt + "\n") + + if "loyalty" in card.keys(): + card_xml_file.write("" + str(card["loyalty"]) + "\n") + card_xml_file.write("" + table_row + "\n") + card_xml_file.write("" + text + "\n") + + if related: + card_xml_file.write("" + related + "\n") + related = "" + + card_xml_file.write("\n") + + +def write_spoilers_xml(trice_dicts) -> None: + """ + Write the spoiler.xml file + :param trice_dicts: Dict of entries + """ + pathlib.Path("out").mkdir(exist_ok=True) + card_xml_file = pathlib.Path("out/spoiler.xml").open("w") + + # Fill in set headers + open_header(card_xml_file) + for value in SET_INFO_FILE: + fill_header_sets( + card_xml_file, value["code"], value["name"], value["releaseDate"] + ) + close_header(card_xml_file) + + # Write in all the cards + for value in SET_INFO_FILE: + try: + write_cards(card_xml_file, trice_dicts[value["code"]], value["code"]) + except KeyError: + print("Skipping " + value["code"]) + + close_xml_file(card_xml_file) + + +def write_set_xml( + trice_dict: List[Dict[str, Any]], set_code: str, set_name: str, release_date: str +) -> None: + """ + Write out a single magic set to XML format + :param trice_dict: Cards to print + :param set_code: Set code + :param set_name: Set name + :param release_date: Set release date + """ + if not trice_dict: + return + + pathlib.Path("out").mkdir(exist_ok=True) + card_xml_file = pathlib.Path("out/{}.xml".format(set_code)).open("w") + + open_header(card_xml_file) + fill_header_sets(card_xml_file, set_code, set_name, release_date) + close_header(card_xml_file) + write_cards(card_xml_file, trice_dict, set_code) + close_xml_file(card_xml_file) + + +def main() -> None: + """ + Main dispatch thread + """ + spoiler_xml = {} + for set_info in SET_INFO_FILE: + print("Handling {}".format(set_info["code"])) + + if not set_info["scryfallOnly"]: continue - print("Handling {}".format(setinfo['code'])) - if presets['oldRSS'] or 'noRSS' in setinfo and setinfo['noRSS']: - mtgs = {"cards": []} - else: - mtgs = mtgs_scraper.scrape_mtgs( - 'http://www.mtgsalvation.com/spoilers.rss') # scrape mtgs rss feed - mtgs = mtgs_scraper.parse_mtgs(mtgs, setinfo=setinfo) # parse spoilers into mtgjson format - if manual_sets and manual_sets != '' and setinfo['code'] in manual_sets: - manual_cards = manual_sets[setinfo['code']] - else: - manual_cards = [] - mtgs = spoilers.correct_cards( - mtgs, manual_cards, card_corrections, delete_cards['delete']) # fix using the fixfiles - mtgjson = spoilers.get_image_urls(mtgs, presets['isfullspoil'], setinfo) # get images - if presets['scryfallOnly'] or 'scryfallOnly' in setinfo and setinfo['scryfallOnly']: - scryfall = scryfall_scraper.get_scryfall( - 'https://api.scryfall.com/cards/search?q=++e:' + setinfo['code'].lower()) - mtgjson = scryfall #_scraper.smash_mtgs_scryfall(mtgs, scryfall) - if 'fullSpoil' in setinfo and setinfo['fullSpoil']: - wotc = wizards_scraper.scrape_fullspoil('', setinfo) - wizards_scraper.smash_fullspoil(mtgjson, wotc) - [mtgjson, errors] = spoilers.error_check( - mtgjson, card_corrections) # check for errors where possible - errorlog += errors - if not 'cards' in mtgjson or mtgjson['cards'] == [] or not mtgjson['cards']: - noCards.append(setinfo['code']) - continue - spoilers.write_xml( - mtgjson, setinfo['code'], setinfo['name'], setinfo['releaseDate']) - #save_xml(spoilers.pretty_xml(setinfo['code']), 'out/spoiler.xml') - mtgjson = spoilers.add_headers(mtgjson, setinfo) - AllSets = spoilers.make_allsets(AllSets, mtgjson, setinfo['code']) - if 'masterpieces' in setinfo: # repeat all of the above for masterpieces - # masterpieces aren't in the rss feed, so for the new cards, we'll go to their individual pages on mtgs - # old cards will get their infos copied from mtgjson (including fields that may not apply like 'artist') - # the images will still come from mtgs - masterpieces = spoilers.make_masterpieces( - setinfo['masterpieces'], AllSets, mtgjson) - [masterpieces, errors] = spoilers.error_check(masterpieces) - errorlog += errors - spoilers.write_xml(masterpieces, setinfo['masterpieces']['code'], - setinfo['masterpieces']['name'], setinfo['masterpieces']['releaseDate']) - AllSets = spoilers.make_allsets( - AllSets, masterpieces, setinfo['masterpieces']['code']) - save_masterpieces(masterpieces, setinfo) - save_xml(spoilers.pretty_xml('out/' + setinfo['masterpieces']['code'] + '.xml'), 'out/' + setinfo['masterpieces']['code'] + '.xml') - combinedjson[setinfo['masterpieces']['code']] = masterpieces - if 'cards' in mtgjson and mtgjson['cards'] and not mtgjson['cards'] == []: - save_setjson(mtgjson, setinfo['code']) - combinedjson[setinfo['code']] = mtgjson - if os.path.isfile('out/' + setinfo['code'] + '.xml'): - save_xml(spoilers.pretty_xml('out/' + setinfo['code'] + '.xml'), 'out/' + setinfo['code'] + '.xml') - if noCards != []: - print("Not processing set(s) with no cards: {}".format(noCards)) - save_setjson(combinedjson, 'spoiler') - spoilers.write_combined_xml(combinedjson, setinfos) - save_xml(spoilers.pretty_xml('out/spoiler.xml'), 'out/spoiler.xml') - cockatrice_xsd = requests.get('https://raw.githubusercontent.com/Cockatrice/Cockatrice/master/doc/cards.xsd').text - if verify_xml('out/spoiler.xml', cockatrice_xsd): # check if our XML passes Cockatrice's XSD - print ('spoiler.xml passes Cockatrice XSD verification') - else: - print ('spoiler.xml fails Cockatrice XSD verification') - errorlog = spoilers.remove_corrected_errors(errorlog, card_corrections) - save_errorlog(errorlog) - save_allsets(AllSets) - # save_setjson(mtgjson) - if presets['dumpXML']: - print ('') - with open('out/spoiler.xml', 'r') as xmlfile: - print (xmlfile.read()) - print ('') - if presets['dumpErrors']: - if errorlog != {}: - print ('//----- DUMPING ERROR LOG -----') - print (yaml.safe_dump(errorlog, default_flow_style=False)) - print ('//----- END ERROR LOG -----') - else: - print ("No Detected Errors!") + cards = download_scryfall_set(set_info["code"]) + trice_dict = convert_scryfall(cards) + + # Write SET.xml + write_set_xml( + trice_dict, set_info["code"], set_info["name"], set_info["releaseDate"] + ) + + # Save for spoiler.xml + spoiler_xml[set_info["code"]] = trice_dict + + # Write out the spoiler.xml file + write_spoilers_xml(spoiler_xml) + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt index 66cb7cbc..7af936f6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +contextvars requests feedparser lxml