Skip to content

Instantly share code, notes, and snippets.

@hclivess
Created January 26, 2025 19:14
Show Gist options
  • Save hclivess/1a0b7d436214f9d0b2c0cc633150e776 to your computer and use it in GitHub Desktop.
import xml.etree.ElementTree as ET
from requests import get, exceptions
from colorama import Fore
import os, io, zipfile
import aiohttp
import aiofiles
import asyncio
from time import sleep
# --- Configuration ---------------------------------------------------------
# Local directory that mirrors the remote DLC tree.
OUT_DIR = "./dlc"
# Language codes to keep when ALL_LANGUAGES is False.
LANGUAGE = ["all", "en"]
# Asset tiers/qualities to keep when ALL_TIERS is False.
TIER = ["all", "25", "50", "100", "retina", "iphone", "ipad", "ipad3", "mp3", "caf", "wav"]
ALL_LANGUAGES = True  # When True, ignore the LANGUAGE filter entirely
ALL_TIERS = True  # When True, ignore the TIER filter entirely
# Root of EA's asset server for The Simpsons: Tapped Out DLC.
BASE_URL = "http://oct2018-4-35-0-uam5h44a.tstodlc.eamobile.com/netstorage/gameasset/direct/simpsons/"
DOWNLOAD_QUEUE = [] # Each entry: [ Url, Filename, Folder, Expected Size ]
CONCURRENCY = 10 # Number of simultaneous downloads
FORCE_DOWNLOAD = False # Set to True to force re-downloads even when a file already exists
def log(severity: int, message: str):
    """Print *message* with a color and prefix chosen by *severity*.

    Severities: 0 = info (blue "[i] "), 1/2 = warning/error (yellow/red
    "[!] "), 3 = plain white with no prefix.
    """
    styles = (
        (Fore.BLUE, "[i] "),
        (Fore.YELLOW, "[!] "),
        (Fore.RED, "[!] "),
        (Fore.WHITE, ""),
    )
    color, prefix = styles[severity]
    print(f"{color}{prefix}{Fore.WHITE}{message}")
def downloadFile(url: str, filename: str, expected_size: int = None):
    """Synchronously download *url* into OUT_DIR/*filename* and return the bytes.

    If the file already exists with the expected size (and FORCE_DOWNLOAD is
    off), the cached bytes are returned without touching the network.
    Retries up to three times with exponential backoff and returns None when
    every attempt fails.
    """
    full_path = os.path.join(OUT_DIR, filename)
    # filename may contain subdirectories (e.g. "dlc/DLCIndex.zip"); create
    # the whole target directory instead of only OUT_DIR.
    os.makedirs(os.path.dirname(full_path) or OUT_DIR, exist_ok=True)
    # Serve from disk when the existing file matches the expected size.
    if os.path.exists(full_path) and not FORCE_DOWNLOAD:
        actual_size = os.path.getsize(full_path)
        if expected_size is None or actual_size == expected_size:
            log(0, f"Skipping existing file {filename} (size: {actual_size} bytes)")
            with open(full_path, "rb") as f:
                return f.read()
        else:
            log(1, f"File {filename} exists but size mismatch (expected: {expected_size}, actual: {actual_size}). Re-downloading...")
    max_retries = 3
    for retry in range(max_retries):
        try:
            response = get(url, timeout=10)
            if response.status_code == 200:
                data = response.content
                log(0, f"Downloaded {filename} ({len(data)} bytes)")
                with open(full_path, "wb") as outFile:
                    outFile.write(data)
                return data
            log(1, f"Status {response.status_code}. Retry {retry+1}/{max_retries} for {url}")
        except (exceptions.ConnectionError, exceptions.Timeout) as e:
            log(1, f"Connection error: {e}. Retry {retry+1}/{max_retries} for {url}")
        # Back off before the next attempt.  The original slept only after
        # connection errors, so HTTP-error retries hit the server instantly.
        if retry < max_retries - 1:
            sleep(2 ** retry)
    log(2, f"Failed to download {url} after {max_retries} retries")
    return None
def getDLCIndexXml(url: str, filename: str):
    """Download a zipped index from *url* and return the decompressed bytes
    of its first archive member, or None when the download fails or the
    archive is unusable.

    The original let zipfile.BadZipFile (and IndexError on an empty archive)
    escape to callers that only catch ET.ParseError; return None instead,
    matching the existing "falsy means failure" contract.
    """
    zippedFileData = downloadFile(url, filename)
    if not zippedFileData:
        return None
    try:
        with zipfile.ZipFile(io.BytesIO(zippedFileData)) as z:
            members = z.infolist()
            if not members:
                log(1, f"Archive {filename} contains no files")
                return None
            # The index archives hold exactly one XML file; read the first.
            return z.read(members[0])
    except zipfile.BadZipFile as e:
        log(2, f"Corrupt zip archive from {url}: {e}")
        return None
def getDLCIndexes():
    """Fetch the master DLCIndex.zip and return the per-package index paths
    (":" separators normalized to "/").  Returns [] on any failure.
    """
    log(0, "Getting DLC Indexes...")
    try:
        os.makedirs(os.path.join(OUT_DIR, "dlc"), exist_ok=True)
        masterIndex = getDLCIndexXml(BASE_URL + "dlc/DLCIndex.zip", "dlc/DLCIndex.zip")
        if not masterIndex:
            log(2, "Failed to retrieve or parse DLCIndex.zip")
            return []
        tree = ET.fromstring(masterIndex)
        # Skip malformed <IndexFile> entries lacking an "index" attribute;
        # the original crashed with AttributeError on None.replace().
        return [
            item.get("index").replace(":", "/")
            for item in tree.findall("./IndexFile")
            if item.get("index")
        ]
    except ET.ParseError as e:
        log(2, f"XML Parse Error: {e}")
        return []
class _ExpatTarget:
    """Minimal parse-event target that forwards start/end callbacks.

    Kept separate from DLCIndexParser: if the parser were its own target,
    XMLParser.close() would invoke target.close() and recurse into itself.
    A target without a close attribute is handled gracefully by XMLParser.
    """

    def __init__(self, start, end):
        self.start = start
        self.end = end


class DLCIndexParser(ET.XMLParser):
    """Streaming parser for a per-package DLC index.

    Every <Package tier=... Language=... FileName=... FileSize=...> element
    that passes the language/tier filters is appended to DOWNLOAD_QUEUE as
    [url, filename, folder, expected_size].

    Bug fix: the original called super().__init__() with the default
    TreeBuilder target, so these start/end callbacks were never invoked and
    DOWNLOAD_QUEUE always stayed empty.  XMLParser dispatches events to its
    *target*, not to methods on the parser subclass; route them explicitly.
    """

    def __init__(self):
        # Per-package state, overwritten by start() for each <Package>.
        self.tier = ""
        self.Language = ""
        self.FileName = ""
        self.FileSize = 0
        # Forward expat start/end events to this object's methods.
        super().__init__(target=_ExpatTarget(self.start, self.end))

    def start(self, tag, attrs):
        # Capture the attributes of the current <Package> element.
        if tag == "Package":
            self.tier = attrs.get("tier", "")
            self.FileName = attrs.get("FileName", "")
            self.Language = attrs.get("Language", "")
            self.FileSize = int(attrs.get("FileSize", 0))

    def end(self, tag):
        # Queue the package once its element closes, subject to the filters.
        if tag == "Package" and self.tier and self.Language:
            if (self.Language in LANGUAGE or ALL_LANGUAGES) and (self.tier in TIER or ALL_TIERS):
                folder = self.FileName.split(":")[0]
                DOWNLOAD_QUEUE.append([
                    BASE_URL + self.FileName.replace(":", "/"),
                    self.FileName.split(":")[-1],
                    folder,
                    self.FileSize  # expected size, checked before re-download
                ])
async def async_download(session, semaphore, url, folder, filename, expected_size):
    """Download *url* to OUT_DIR/folder/filename, limited by *semaphore*.

    Skips the download when the file already exists with the expected size
    (unless FORCE_DOWNLOAD).  Retries up to three times with exponential
    backoff; logs an error and gives up after the final attempt.
    """
    full_path = os.path.join(OUT_DIR, folder, filename)
    # Skip if the file exists and its size matches (unless forcing).
    if os.path.exists(full_path) and not FORCE_DOWNLOAD:
        actual_size = os.path.getsize(full_path)
        if expected_size is None or actual_size == expected_size:
            log(0, f"Skipping existing {filename} (size: {actual_size} bytes)")
            return
        log(1, f"File {filename} exists but size mismatch (expected: {expected_size}, actual: {actual_size}). Re-downloading...")
    max_retries = 3
    async with semaphore:  # caps concurrent downloads at CONCURRENCY
        for retry in range(max_retries):
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status == 200:
                        data = await response.read()
                        os.makedirs(os.path.dirname(full_path), exist_ok=True)
                        async with aiofiles.open(full_path, "wb") as f:
                            await f.write(data)
                        log(0, f"Downloaded {filename} ({len(data)} bytes)")
                        return
                    log(1, f"Status {response.status}. Retry {retry+1}/{max_retries} for {url}")
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                log(1, f"Error: {e}. Retry {retry+1}/{max_retries} for {url}")
            # Back off before the next attempt.  The original slept only
            # after exceptions, so HTTP-error retries fired back-to-back.
            if retry < max_retries - 1:
                await asyncio.sleep(2 ** retry)
    log(2, f"Failed to download {url}")
async def main():
    """Entry point: gather DLC indexes, parse them into DOWNLOAD_QUEUE,
    then fan out the downloads with bounded concurrency."""
    indexes = getDLCIndexes()
    if not indexes:
        log(2, "No DLC indexes found. Exiting.")
        return
    # Parse each per-package index; parsing appends entries to DOWNLOAD_QUEUE.
    for index in indexes:
        try:
            dlc_data = getDLCIndexXml(BASE_URL + index, f"dlc/{index.split('/')[1]}")
            if dlc_data:
                ET.fromstring(dlc_data, parser=DLCIndexParser())
                log(0, f"Processed {index}")
        except ET.ParseError as e:
            log(2, f"Parse error: {e}")
    if not DOWNLOAD_QUEUE:
        log(2, "No files to download. Exiting.")
        return
    # Launch all queued downloads; the semaphore bounds concurrency.
    gate = asyncio.Semaphore(CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        jobs = [
            async_download(session, gate, url, folder, filename, size)
            for url, filename, folder, size in DOWNLOAD_QUEUE
        ]
        await asyncio.gather(*jobs)
# Run the async entry point only when executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment