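"""Bulk downloader for DLC assets served from EA's "tstodlc" CDN (apparently
The Simpsons: Tapped Out, judging by the host name).

Pipeline: fetch the master DLCIndex.zip, unpack and parse each per-package
index it references, queue every package matching the LANGUAGE/TIER filters,
then download the queue concurrently with aiohttp.
"""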
import xml.etree.ElementTree as ET
from requests import get, exceptions
from colorama import Fore, init
import os, io, zipfile
import aiohttp
import aiofiles
import asyncio
from time import sleep

init()  # Wrap stdout so ANSI color codes also render on Windows consoles
OUT_DIR = "./dlc"
LANGUAGE = ["all", "en"]
TIER = ["all", "25", "50", "100", "retina", "iphone", "ipad", "ipad3", "mp3", "caf", "wav"]
ALL_LANGUAGES = True
ALL_TIERS = True
BASE_URL = "http://oct2018-4-35-0-uam5h44a.tstodlc.eamobile.com/netstorage/gameasset/direct/simpsons/"
DOWNLOAD_QUEUE = []  # [url, filename, folder, expected size]
CONCURRENCY = 10  # Number of simultaneous downloads
FORCE_DOWNLOAD = False  # Set to True to force re-downloads

def log(severity: int, message: str):
    """Print a colored log line; severity 0=info, 1=warning, 2=error, 3=plain."""
    colors = [Fore.BLUE, Fore.YELLOW, Fore.RED, Fore.WHITE]
    prefixes = ["[i] ", "[!] ", "[!] ", ""]
    print(colors[severity] + prefixes[severity] + Fore.WHITE + message)

def downloadFile(url: str, filename: str, expected_size: int = None):
    """Synchronously download url to OUT_DIR/filename with retries; return the bytes, or None."""
    full_path = os.path.join(OUT_DIR, filename)
    os.makedirs(os.path.dirname(full_path), exist_ok=True)
    # Skip if file exists and size matches (unless force download is enabled)
    if os.path.exists(full_path) and not FORCE_DOWNLOAD:
        actual_size = os.path.getsize(full_path)
        if expected_size is None or actual_size == expected_size:
            log(0, f"Skipping existing file {filename} (size: {actual_size} bytes)")
            with open(full_path, "rb") as f:
                return f.read()
        else:
            log(1, f"File {filename} exists but size mismatch (expected: {expected_size}, actual: {actual_size}). Re-downloading...")
    max_retries = 3
    for retry in range(max_retries):
        try:
            response = get(url, timeout=10)
            if response.status_code == 200:
                data = response.content
                log(0, f"Downloaded {filename} ({len(data)} bytes)")
                with open(full_path, "wb") as outFile:
                    outFile.write(data)
                return data
            else:
                log(1, f"Status {response.status_code}. Retry {retry+1}/{max_retries} for {url}")
        except (exceptions.ConnectionError, exceptions.Timeout) as e:
            log(1, f"Connection error: {e}. Retry {retry+1}/{max_retries} for {url}")
        sleep(2 ** retry)  # Exponential backoff before the next attempt
    log(2, f"Failed to download {url} after {max_retries} retries")
    return None

def getDLCIndexXml(url: str, filename: str):
    """Download a zipped index and return the XML bytes of its first entry."""
    zippedFileData = downloadFile(url, filename)
    if not zippedFileData:
        return None
    with zipfile.ZipFile(io.BytesIO(zippedFileData)) as z:
        return z.read(z.infolist()[0])  # Read the first file in the zip

def getDLCIndexes():
    """Fetch the master DLCIndex.zip and return the paths of the per-package index files."""
    log(0, "Getting DLC Indexes...")
    try:
        os.makedirs(os.path.join(OUT_DIR, "dlc"), exist_ok=True)
        masterIndex = getDLCIndexXml(BASE_URL + "dlc/DLCIndex.zip", "dlc/DLCIndex.zip")
        if not masterIndex:
            log(2, "Failed to retrieve or parse DLCIndex.zip")
            return []
        tree = ET.fromstring(masterIndex)
        return [item.get("index").replace(":", "/") for item in tree.findall("./IndexFile")]
    except ET.ParseError as e:
        log(2, f"XML Parse Error: {e}")
        return []
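# Note (assumption): the master index layout is inferred from the code above,
# i.e. <IndexFile index="dlc:SomeIndex.zip"/> entries, with ":" standing in for "/".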

class DLCIndexParser:
    """Parser target (for ET.XMLParser(target=...)) that collects matching packages.

    Each <Package> passing the LANGUAGE/TIER filters is appended to
    DOWNLOAD_QUEUE as [url, filename, folder, expected size].
    """
    def __init__(self):
        self.tier = ""
        self.Language = ""
        self.FileName = ""
        self.FileSize = 0

    def start(self, tag, attrs):
        if tag == "Package":
            self.tier = attrs.get("tier", "")
            self.FileName = attrs.get("FileName", "")
            self.Language = attrs.get("Language", "")
            self.FileSize = int(attrs.get("FileSize", 0))

    def end(self, tag):
        if tag == "Package" and self.tier and self.Language:
            if (self.Language in LANGUAGE or ALL_LANGUAGES) and (self.tier in TIER or ALL_TIERS):
                folder = self.FileName.split(":")[0]
                DOWNLOAD_QUEUE.append([
                    BASE_URL + self.FileName.replace(":", "/"),
                    self.FileName.split(":")[-1],
                    folder,
                    self.FileSize  # Expected size, used to skip already-complete files
                ])

    def close(self):
        pass  # Invoked by the parser when the document ends; nothing to finalize
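# Note (assumption): the per-package index layout is inferred from the handlers
# above, i.e. entries shaped like
#   <Package tier="..." Language="..." FileName="dlc:Foo.zip" FileSize="..."/>
# with ":" in FileName separating the remote folder from the file name.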

async def async_download(session, semaphore, url, folder, filename, expected_size):
    """Download one file with retries, concurrency-limited by the shared semaphore."""
    full_path = os.path.join(OUT_DIR, folder, filename)
    # Skip if file exists and size matches (unless force download is enabled)
    if os.path.exists(full_path) and not FORCE_DOWNLOAD:
        actual_size = os.path.getsize(full_path)
        if expected_size is None or actual_size == expected_size:
            log(0, f"Skipping existing {filename} (size: {actual_size} bytes)")
            return
        else:
            log(1, f"File {filename} exists but size mismatch (expected: {expected_size}, actual: {actual_size}). Re-downloading...")
    async with semaphore:
        for retry in range(3):
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status == 200:
                        data = await response.read()
                        os.makedirs(os.path.dirname(full_path), exist_ok=True)
                        async with aiofiles.open(full_path, "wb") as f:
                            await f.write(data)
                        log(0, f"Downloaded {filename} ({len(data)} bytes)")
                        return
                    else:
                        log(1, f"Status {response.status}. Retry {retry+1}/3 for {url}")
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                log(1, f"Error: {e}. Retry {retry+1}/3 for {url}")
            await asyncio.sleep(2 ** retry)  # Exponential backoff before the next attempt
        log(2, f"Failed to download {url}")

async def main():
    indexes = getDLCIndexes()
    if not indexes:
        log(2, "No DLC indexes found. Exiting.")
        return
    for index in indexes:
        try:
            dlc_data = getDLCIndexXml(BASE_URL + index, f"dlc/{index.split('/')[1]}")
            if dlc_data:
                # XMLParser only calls start/end on its target object, so the
                # collector must be passed as target; parsing fills DOWNLOAD_QUEUE.
                ET.fromstring(dlc_data, parser=ET.XMLParser(target=DLCIndexParser()))
                log(0, f"Processed {index}")
        except ET.ParseError as e:
            log(2, f"Parse error: {e}")
    if not DOWNLOAD_QUEUE:
        log(2, "No files to download. Exiting.")
        return
    semaphore = asyncio.Semaphore(CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        tasks = []
        for dl in DOWNLOAD_QUEUE:
            url, filename, folder, expected_size = dl
            tasks.append(async_download(session, semaphore, url, folder, filename, expected_size))
        await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())
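
# Usage sketch (dependency names assumed to match the imports above; needs
# Python 3.7+ for asyncio.run):
#   pip install requests colorama aiohttp aiofiles
#   python <this_script>.py
# Files land under ./dlc/; re-runs skip files whose size matches the index
# unless FORCE_DOWNLOAD is set to True.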