Last active
March 15, 2025 14:18
-
-
Save UserUnknownFactor/0bf00b99c8b70110995dd85a6b7abf70 to your computer and use it in GitHub Desktop.
SRPG extractor tool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Kaitai Struct specification for SRPG Studio's data.dts archive.
# NOTE(review): indentation reconstructed from a flattened paste -- run it
# through ksc to confirm it still compiles before relying on it.
meta:
  id: srpg_studio_dts
  file-extension: dts
  endian: le
  title: SRPG Studio Data Format
  application: SRPG Studio
  license: MIT
  encoding: UTF-16LE
doc: |
  SRPG Studio is a game engine for creating Strategy RPGs.
  This format describes the data.dts file structure which contains
  game assets, scripts, and project data.
seq:
  - id: header
    type: header
  - id: entry_offsets
    type: u4
    repeat: expr
    repeat-expr: 36 # Based on the number of known entry names
types:
  header:
    seq:
      - id: signature
        contents: "SDTS"
        doc: File signature
      - id: is_encrypted
        type: u4
        doc: Flag indicating if the data is encrypted (1) or not (0)
      - id: version
        type: u4
        doc: Format version
      - id: unknown1
        type: u4
        doc: Unknown value
      - id: archive_flag
        type: u4
        doc: Archive flag
      - id: project_offset
        type: u4
        doc: Offset to the Project.srpgs data (relative to position 168)
  entry:
    doc: Entry containing resources for a specific asset category
    params:
      - id: idx
        type: u4
    instances:
      entry_offset:
        value: _root.entry_offsets[idx]
      next_entry_offset:
        # Last entry is bounded by the project data instead of a next entry.
        value: idx < 35 ? _root.entry_offsets[idx + 1] : _root.header.project_offset
      entry_pos:
        # All offsets are relative to the end of the 168-byte header.
        value: entry_offset + 168
      body:
        if: entry_offset != next_entry_offset and entry_offset < next_entry_offset
        io: _root._io
        pos: entry_pos
        size: next_entry_offset - entry_offset
        type: entry_body
  entry_body:
    seq:
      - id: resource_group_count
        type: u4
        doc: Number of resource groups in this entry
      - id: resource_group_offsets
        type: u4
        repeat: expr
        repeat-expr: resource_group_count
        doc: Offsets to each resource group (relative to entry position)
    instances:
      resource_groups:
        # NOTE(review): `_index` inside `if`/`pos` of a repeated instance is
        # not standard ksy expression usage -- confirm this compiles.
        if: resource_group_count > 0 and _index < resource_group_count and resource_group_offsets[_index] < _io.size
        repeat: expr
        repeat-expr: resource_group_count
        io: _root._io
        pos: _io.pos - (4 + 4 * resource_group_count) + resource_group_offsets[_index]
        type: resource_group
  resource_group:
    seq:
      - id: name_len
        type: u4
        doc: Length of the resource group name string in bytes
      - id: name
        type: str
        size: name_len - 2 # Subtract 2 for the null terminator
        encoding: utf-16le
        doc: Name of the resource group
      - id: null_terminator
        size: 2
        doc: UTF-16LE null terminator
      - id: resource_group_id
        type: u8
        doc: Unique identifier for the resource group
      - id: resource_count
        type: u4
        doc: Number of resources in this group
      - id: resource_offsets
        type: u4
        repeat: expr
        repeat-expr: resource_count
        doc: Offsets to each resource (relative to current position)
      - id: resource_lengths
        type: u4
        repeat: expr
        repeat-expr: resource_count
        doc: Lengths of each resource
    instances:
      resources:
        if: resource_count > 0 and _index < resource_count and resource_lengths[_index] < 10000000 # Sanity check
        io: _root._io
        pos: _io.pos + resource_offsets[_index]
        size: resource_lengths[_index]
        type: resource
        repeat: expr
        repeat-expr: resource_count
  resource:
    seq:
      - id: data
        size-eos: true
        doc: Resource data (may be encrypted)
  script_entry_body:
    seq:
      - id: script_count
        type: u4
        doc: Number of scripts
      - id: scripts
        type: script
        repeat: expr
        repeat-expr: script_count
        if: script_count < 10000 # Sanity check
  script:
    seq:
      - id: path_len
        type: u4
        doc: Length of the script path string in bytes
      - id: path
        type: str
        size: path_len - 2 # Subtract 2 for the null terminator
        encoding: utf-16le
        doc: Path to the script
      - id: null_terminator
        size: 2
      - id: data_len
        type: u4
        doc: Length of the script data in bytes
      - id: data
        size: data_len
        doc: Script data (UTF-16LE encoded text)
instances:
  entries:
    doc: All entries in the file
    repeat: expr
    repeat-expr: 36
    type: entry(_index)
  script_entry:
    doc: Special script entry (the last entry)
    io: _root._io
    pos: _root.entry_offsets[35] + 168
    type: script_entry_body
    if: _root.entry_offsets[35] != _root.header.project_offset and _root.entry_offsets[35] < _root.header.project_offset
  project_data:
    io: _root._io
    pos: header.project_offset + 168
    size-eos: true
    doc: Project.srpgs file data (may be encrypted)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os, struct, hashlib, argparse, re | |
from enum import Enum | |
from math import log10 | |
from Crypto.Cipher import ARC4, ARC2 # NOTE: pip install pycryptodome | |
from Crypto.Util.Padding import pad, unpad | |
#from rc2 import RC2 | |
class SRPGCrypto:
    """RC4/RC2 wrapper reproducing SRPG Studio's encryption scheme.

    crypt_mode == -1 (default) selects RC4, used for data.dts resources;
    any other value selects RC2-CBC, used for language.dat files (see
    SRPGFiles.extract_language).
    """

    SECRET_DUMMY = "_dummy"    # secret used for language.dat (extract_language)
    SECRET_KEYSET = "keyset"   # default secret for data.dts resources
    # XOR pre-pass key applied to language.dat before RC2 decryption.
    XOR_KEY = bytearray(b"T\x94\xc1X\xf4L\x92\x1b\xad\xe0\x9e:I\xd1\xc9\x92")

    def __init__(self, secret: str, crypt_mode: int = -1):
        self.secret = secret
        self.crypt_mode = crypt_mode
        # Two key variants differing in a single byte each; presumably for
        # alternate engine versions -- TODO confirm which versions use which.
        self.XOR_KEY2 = bytearray(self.XOR_KEY)
        self.XOR_KEY1 = bytearray(self.XOR_KEY)
        self.XOR_KEY1[10] = 0x8E
        self.XOR_KEY2[12] = 73
        secret_bytes = secret.encode("utf-16le")
        # NOTE(review): only the first len(secret) bytes of the UTF-16LE
        # encoding are hashed (half of it for ASCII secrets) -- presumably
        # matches an engine quirk; confirm before changing.
        hash_bytes = hashlib.md5(secret_bytes[:len(secret)]).digest()
        self.handler = self._RC4(hash_bytes) if crypt_mode == -1 else self._RC2(hash_bytes)

    class _RC4:
        """Thin stateful wrapper over pycryptodome's ARC4 stream cipher."""

        def __init__(self, key: bytes):
            self.cipher = ARC4.new(key)

        def decrypt(self, data: bytes) -> bytes:
            return self.cipher.decrypt(data)

        def encrypt(self, data: bytes) -> bytes:
            return self.cipher.encrypt(data)

    class _RC2:
        """RC2 in CBC mode with a zero IV."""

        def __init__(self, key: bytes):
            # this config matches both C# and Windows CryptoAPI for RC2
            self.cipher = ARC2.new(key, ARC2.MODE_CBC, iv=b'\0'*8, effective_keylen=128)

        def decrypt(self, data: bytes) -> bytes:
            return self.cipher.decrypt(data)

        def encrypt(self, data: bytes) -> bytes:
            return self.cipher.encrypt(data)

    def decrypt(self, data: bytes) -> bytes:
        """Decrypt with the selected cipher (stateful: RC4 keystream / CBC)."""
        return self.handler.decrypt(data)

    def encrypt(self, data: bytes) -> bytes:
        """Encrypt with the selected cipher (stateful: RC4 keystream / CBC)."""
        return self.handler.encrypt(data)
# Data Structures | |
class ObfuscationMode(Enum):
    """Known obfuscation variants.

    NOTE(review): defined but not referenced elsewhere in this file --
    presumably used by external callers or kept for documentation.
    """

    NONE = 0
    TYPE1 = 1
    TYPE2 = 2
class DataStruct:
    """Everything the analyzer learns about an archive's on-disk layout:
    fragment table, project blob position, and the magic-byte lookup table."""

    def __init__(self):
        self.project_begin = 0
        self.project_size = 0
        self.is_encrypted = False
        self.scripts_start = None
        self.fragments = self._init_fragments()
        self.file_types = self._init_file_types()
        self.suffix = None

    @property
    def project_resource(self):
        """Expose the trailing Project.srpgs blob as a pseudo-resource."""
        return Resource(
            path=".",
            name="project",
            suffix="no-srpgs",
            begin=self.project_begin,
            size=self.project_size,
        )

    @staticmethod
    def _init_fragments():
        """Build the fixed, ordered list of asset-category fragments."""
        graphics = ("mapchip", "charchip", "face", "icon", "motion", "effect",
                    "weapon", "bow", "thumbnail", "battleback", "eventback",
                    "screenback", "worldmap", "eventstill", "charillust",
                    "picture")
        ui = ("menuwindow", "textwindow", "title", "number", "bignumber",
              "gauge", "line", "risecursor", "mapcursor", "pagecursor",
              "selectcursor", "scrollcursor", "panel", "faceframe",
              "screenframe")
        audio = ("music", "sound")
        # Order matters: it must match the offset table in the dts header.
        category_paths = (["Graphics/" + name for name in graphics]
                          + ["UI/" + name for name in ui]
                          + ["Audio/" + name for name in audio]
                          + ["Fonts", "Video"])
        return [Fragment(path) for path in category_paths]

    @staticmethod
    def _init_file_types():
        """Magic-byte table used to guess a decrypted resource's suffix."""
        magic_table = (
            ("jpg", (0xFF, 0xD8, 0xFF)),
            ("png", (0x89, 0x50, 0x4E, 0x47)),
            ("bmp", (0x42, 0x4D)),
            ("mp3", (0x49, 0x44, 0x33)),
            ("wav", (0x52, 0x49, 0x46, 0x46)),
            ("ogg", (0x4F, 0x67, 0x67)),
            ("mid", (0x4D, 0x54, 0x68, 0x64)),
            ("woff", (0x77, 0x4F, 0x46, 0x46)),
            ("woff2", (0x77, 0x4F, 0x46, 0x32)),
            ("ttf", (0x00, 0x01, 0x00, 0x00, 0x00)),
            ("otf", (0x4F, 0x54, 0x54, 0x4F, 0x00)),
        )
        return [FileType(suffix, list(magic)) for suffix, magic in magic_table]
class FileType:
    """Pairs a file-name suffix with the leading magic bytes that identify it."""

    def __init__(self, suffix, header):
        self.suffix = suffix    # e.g. "png"
        self.header = header    # list of leading magic byte values
class Fragment:
    """One asset category (e.g. "Graphics/mapchip") spanning an inclusive
    byte range [begin, end] inside the archive."""

    def __init__(self, path):
        self.path = path                   # category path, e.g. "UI/panel"
        self.begin = 0                     # absolute start offset (inclusive)
        self.end = 0                       # absolute end offset (inclusive)
        self.resource_group_count = 0      # u4 read from the fragment header
        self.resource_groups = []          # filled in by the analyzer

    @property
    def size(self):
        """Byte length of the fragment's inclusive [begin, end] span."""
        return self.end - self.begin + 1

    @size.setter
    def size(self, value):
        # The packer (_recalculate_fragments) assigns `fragment.size = ...`;
        # without a setter that raised AttributeError on the read-only
        # property. Keep `begin` fixed and derive `end`.
        self.end = self.begin + value - 1
class ResourceGroup:
    """A named group of resources inside a fragment (one source folder)."""

    def __init__(self, path, name, begin, end, resource_group_id=0):
        self.path = path                            # owning fragment's path
        self.name = name                            # group name read from the archive
        self.resource_group_id = resource_group_id  # u8 id read from the archive
        self.begin = begin                          # absolute start offset (inclusive)
        self.end = end                              # absolute end offset (inclusive)
        self.resources = []                         # filled by _analysis_resource_group
class Resource:
    """A single payload inside the archive, addressed by absolute offset."""

    def __init__(self, path, name, suffix, begin, size):
        self.path = path      # directory (fragment path) the file belongs to
        self.name = name      # base file name without suffix
        self.suffix = suffix  # extension; may be None until sniffed
        self.begin = begin    # absolute offset of the first byte
        self.size = size      # payload length in bytes

    @property
    def end(self):
        """Absolute offset of the last byte (inclusive)."""
        return self.begin + self.size - 1

    @end.setter
    def end(self, value):
        # The packer (_recalculate_fragments) assigns `resource.end = ...`;
        # without a setter that raised AttributeError on the read-only
        # property. Keep `begin` fixed and derive `size`.
        self.size = value - self.begin + 1

    @property
    def save_path(self):
        """Relative "<path>/<name>.<suffix>" path used when writing to disk."""
        return os.path.join(f"{self.path}", f"{self.name}.{self.suffix}")
# Analyzer class | |
class SRPGAnalyzer:
    """Parses the layout (header, fragments, resource groups) of a data.dts
    (SDTS) or runtime.rts (SRTS) archive.

    Only offsets, sizes and names are read here; payload decryption is left
    to SRPGFiles. The opened file handle is shared with SRPGFiles and closed
    there after extraction/packing.
    """

    def __init__(self, target):
        self.dstruct = DataStruct()
        if not os.path.isfile(target):
            print(f"File {target} not found...")
            import sys
            sys.exit(2)
        # Kept open for the lifetime of the analyzer.
        self.source = open(target, "rb")

    def analysis(self):
        """Analyze the header and every fragment; returns the DataStruct."""
        print("Analysis started...")
        self._analyze_header()
        print(f"Analyzing {len(self.dstruct.fragments)} fragments: ")
        for i, fragment in enumerate(self.dstruct.fragments):
            self._analyze_fragment(fragment)
        print("Analysis finished")
        return self.dstruct

    def _analyze_header(self):
        # The first 168 bytes hold the signature, flag fields and offset table.
        print(" Analyzing header...")
        self.source.seek(0)
        buffer = self.source.read(168)
        file_type = buffer[:4]
        if file_type == b"SDTS":
            self._analyze_sdts_header(buffer)
        elif file_type == b"SRTS":
            self._analyze_srts_header(buffer)
        else:
            raise RuntimeError("This is not a SRPG Studio game!")
        # NOTE(review): the SRTS branch does not set format/unknown, so this
        # print raises AttributeError for .rts files -- confirm.
        print(f"Header analyzed SRPG ({'encrypted' if self.dstruct.is_encrypted else 'clear'}/{self.dstruct.version}/{self.dstruct.format}/{self.dstruct.unknown})")

    def _analyze_sdts_header(self, buffer):
        """Parse the SDTS fixed header: "SDTS", five u4 fields, offset table."""
        self.dstruct.is_encrypted = struct.unpack("<I", buffer[4:8])[0] == 1
        self.dstruct.version = struct.unpack("<I", buffer[8:12])[0]
        self.dstruct.format = struct.unpack("<I", buffer[12:16])[0]
        self.dstruct.unknown = struct.unpack("<I", buffer[16:20])[0]
        # All table offsets are relative to the end of the 168-byte header.
        self.dstruct.project_begin = struct.unpack("<I", buffer[20:24])[0] + 168
        # NOTE(review): the "+ 1" makes this one byte larger than the actual
        # remainder; harmless for read() but confirm before reuse.
        self.dstruct.project_size = os.path.getsize(self.source.name) - self.dstruct.project_begin + 1
        # One offset per fragment plus a terminating offset bounding the last.
        fragm_n = len(self.dstruct.fragments) + 1
        split = struct.unpack(f"<{fragm_n}i", buffer[24:24+fragm_n*4])
        for i, fragment in enumerate(self.dstruct.fragments):
            fragment.begin = split[i] + 168
            fragment.end = split[i + 1] + 168 - 1

    def _analyze_srts_header(self, buffer):
        """Parse the SRTS header: a list of folders, each listing its files."""
        self.dstruct.version = struct.unpack("<I", buffer[4:8])[0]
        self.dstruct.total_size = struct.unpack("<I", buffer[8:12])[0]
        self.dstruct.first_folder_offset = struct.unpack("<I", buffer[12:16])[0]
        folder_offset_list = [struct.unpack("<I", buffer[offset:offset+4])[0]
                              for offset in range(16, self.dstruct.first_folder_offset, 4)]
        self.folder_offset_list = folder_offset_list
        # NOTE(review): only 168 bytes were read into `buffer`, yet folder
        # offsets typically point past that -- presumably only works when the
        # caller re-reads (extract_rts re-parses with full file data); confirm.
        self.all_folder_file_offset_list = [
            self._get_folder_file_offset_list(folder_offset, buffer)
            for folder_offset in folder_offset_list
        ]
        self.all_folder_file_info_list = [
            [self._get_file_content(file_offset, buffer) for file_offset in folder_file_offset_list]
            for folder_file_offset_list in self.all_folder_file_offset_list
        ]

    def _get_folder_file_offset_list(self, folder_offset, buffer):
        """Return absolute offsets of each file record inside a SRTS folder."""
        folder_file_num = struct.unpack("<I", buffer[folder_offset:folder_offset+4])[0]
        # File offsets are stored relative to the folder's own offset.
        return [folder_offset + struct.unpack("<I", buffer[folder_offset + 4 + i * 4:folder_offset + 8 + i * 4])[0]
                for i in range(folder_file_num)]

    def _get_file_content(self, file_offset, buffer):
        """Parse one SRTS file record (name, meta triple, content blobs).

        Returns a list of dicts, one per content blob; meta[2] is the blob
        count, and blobs are laid out back-to-back after the size list.
        """
        name_len = struct.unpack("<I", buffer[file_offset:file_offset+4])[0]
        # Name length is in UTF-16 code units, hence the * 2.
        base_name = buffer[file_offset + 4:file_offset + 4 + name_len * 2].decode("utf-16le").rstrip('\0')
        meta = struct.unpack("<3I", buffer[file_offset + 4 + name_len * 2:file_offset + 4 + name_len * 2 + 12])
        num_file = meta[2]
        content_size_list = struct.unpack(f"<{num_file}I", buffer[file_offset + 4 + name_len * 2 + 12:file_offset + 4 + name_len * 2 + 12 + num_file * 4])
        content_offset = file_offset + 4 + name_len * 2 + 12 + num_file * 4
        file_list = []
        for i in range(num_file):
            name = self._generate_resource_name(base_name, i, num_file)
            file_info = {
                "start_offset": file_offset,
                "name": name,
                "meta": meta,
                "content_size": content_size_list[i],
                "content_offset": content_offset
            }
            content_offset += content_size_list[i]
            file_list.append(file_info)
        return file_list

    def _analyze_fragment(self, fragment):
        """Read a fragment's group count and its group offset table."""
        print(f" Analyzing fragment[{fragment.path}]... ", end='')
        if fragment.size == 0:
            print("EMPTY")
            return
        buffer = bytearray(1024)
        positions = []
        f = self.source
        f.seek(fragment.begin)
        count = struct.unpack("<I", f.read(4))[0]
        fragment.resource_group_count = count
        #info_length = count * 4 + 4
        print(count)
        # Read the u4 offset table in 1 KiB chunks until `count` entries have
        # been collected; offsets are relative to the fragment start.
        while len(positions) < count:
            read = f.readinto(buffer)
            for i in range(0, read, 4):
                position = struct.unpack("<I", buffer[i:i+4])[0] + fragment.begin
                positions.append(position)
                if len(positions) == count:
                    break
        # Sentinel so group i ends exactly where group i+1 begins.
        positions.append(fragment.end + 1)
        resource_groups = []
        for i in range(count):
            group = ResourceGroup(fragment.path, "", positions[i], positions[i + 1] - 1)
            # NOTE(review): a freshly constructed object is always truthy, so
            # this `if` never filters anything -- kept as-is.
            if group:
                resource_groups.append(group)
        fragment.resource_groups = resource_groups
        self._analysis_resource_group(resource_groups)

    def _analysis_resource_group(self, resource_groups):
        """Parse each group's name, id and per-resource size table, and assign
        absolute begin offsets to every resource."""
        print(" Starting resource group analysis... ")
        f = self.source
        for i, group in enumerate(resource_groups):
            f.seek(group.begin)
            # Name: u4 byte length, then UTF-16LE text incl. the terminator.
            name_length = struct.unpack("<I", f.read(4))[0]
            group.name = f.read(name_length)
            group.name = group.name.decode("utf-16le").rstrip('\0')
            group.resource_group_id = struct.unpack("<Q", f.read(8))[0]
            resource_count = struct.unpack("<I", f.read(4))[0]
            resources = []
            # NOTE: this inner `i` shadows the outer loop variable.
            for i in range(resource_count):
                resource_name = self._generate_resource_name(group.name, i, resource_count)
                resource = Resource(group.path, resource_name, None, 0, 0)
                resource.size = struct.unpack("<I", f.read(4))[0]
                resources.append(resource)
            # Payloads follow the size table back to back.
            position = f.tell()
            for resource in resources:
                resource.begin = position
                position += resource.size
            group.resources = resources
            # NOTE(review): adds only the LAST resource's size to the cursor
            # (not the sum), and raises IndexError for an empty group --
            # presumably correct only in the cases the author hit; confirm.
            self.dstruct.scripts_start = f.tell() + resources[-1].size

    def _generate_resource_name(self, base_name, index, max_index=3):
        """Return base_name, or "base_name-N" (1-based) when the group holds
        several resources; N is space-padded to the digit count of max_index."""
        if max_index > 1:
            return f"{base_name}-{index+1:{int(log10(max_index))+1 if max_index > 10 else 1}}"
        else:
            return base_name
# Extractor class | |
class SRPGFiles:
    """Extracts from / repacks into SRPG Studio archives, using the layout
    discovered by a SRPGAnalyzer. Also hosts the text/language extractors."""

    def __init__(self, output, analyzer, key):
        self.output = output                  # output directory root
        self.analyzer = analyzer              # SRPGAnalyzer with parsed layout
        self.source_file = analyzer.source    # shared handle, closed here
        self.key = key                        # user secret; falls back to SECRET_KEYSET

    def extract(self, target_type="dts"):
        """Dispatch to the dts/rts extractor, then close the shared handle."""
        if target_type == "dts":
            self.extract_dts()
        elif target_type == "rts":
            self.extract_rts()
        self.source_file.close()

    def extract_dts(self):
        """Extract scripts, materials, all resources and the project blob."""
        print("Starting extraction...")
        resources = [resource for fragment in self.analyzer.dstruct.fragments
                     if fragment.resource_groups is not None
                     for group in fragment.resource_groups
                     for resource in group.resources]
        # Scripts/materials sit between the last resource group and the
        # project blob; extracting them advances the shared file cursor.
        # NOTE(review): scripts_start stays None when no group was parsed,
        # which would make this comparison raise TypeError -- confirm.
        if self.analyzer.dstruct.scripts_start < self.analyzer.dstruct.project_resource.begin:
            self._extract_scripts()
        if self.source_file.tell() < self.analyzer.dstruct.project_resource.begin:
            self._extract_materials()
        for resource in resources:
            self._extract_resource(resource)
        self._extract_resource(self.analyzer.dstruct.project_resource)
        print("Extraction finished")

    def extract_rts(self):
        """Extract every file of a runtime.rts archive."""
        print("Starting to extract rts...")
        # NOTE(review): reads from the current cursor position (after the
        # 168-byte header read), while the analyzer's offsets look absolute
        # -- confirm the alignment on a real .rts file.
        data = self.source_file.read()
        for i, folder_file_offset_list in enumerate(self.analyzer.all_folder_file_offset_list):
            sub_dir_path = os.path.join(self.output, self.analyzer.dstruct.fragments[i].path)
            os.makedirs(sub_dir_path, exist_ok=True)
            for file_offset in folder_file_offset_list:
                # Re-parse the record against the full file content.
                file_infos = self.analyzer._get_file_content(file_offset, data)
                for file_info in file_infos:
                    name = file_info["name"] if file_info["name"] else "this_file_dont_have_name_in_rts_but_why"
                    file_content = data[file_info["content_offset"]:file_info["content_offset"] + file_info["content_size"]]
                    # rts payloads are stored in the clear; sniff the type.
                    suffix = self._determine_file_suffix(file_content[:8])
                    if not suffix:
                        suffix = "idk"
                    file_path = os.path.join(sub_dir_path, f"{name}.{suffix}")
                    with open(file_path, "wb") as f:
                        f.write(file_content)
                    print(f" Extracted {file_path}")
        print("Extraction finished")

    def _extract_resource(self, resource):
        """Decrypt one resource and write it under self.output (skip if present)."""
        print(f"Extracting {resource.name}... ", end='')
        if not resource.suffix:
            # Sniff the file type from the first 8 decrypted bytes.
            decryptor = SRPGCrypto(self.key or SRPGCrypto.SECRET_KEYSET)
            self.source_file.seek(resource.begin)
            decrypted_data = decryptor.decrypt(self.source_file.read(8))
            resource.suffix = self._determine_file_suffix(decrypted_data)
        save_path = os.path.join(self.output, resource.save_path)
        if not os.path.isfile(save_path):
            # Fresh cipher object: the RC4 keystream must restart from byte 0
            # for the full payload.
            decryptor = SRPGCrypto(self.key or SRPGCrypto.SECRET_KEYSET)
            self.source_file.seek(resource.begin)
            encrypted_data = self.source_file.read(resource.size)
            decrypted_data = decryptor.decrypt(encrypted_data)
            # NOTE(review): directories are created from the UNsanitized path
            # while the file is written to the sanitized one -- they can
            # diverge; confirm the intended order.
            os.makedirs(os.path.dirname(save_path), exist_ok=True)
            save_path = os.path.normpath(save_path)
            # NOTE(review): mapping '*' back to '..' can reintroduce path
            # traversal -- presumably a repack-escape reversal; confirm.
            save_path = save_path.replace('\\', '/').replace('*', '..')
            save_path = re.sub(r"[^{}\w\-_\.;\[\],\+=!$\\\/]", '_', save_path)
            with open(save_path, "wb") as out:
                out.write(decrypted_data)
            print("OK")
        else:
            print("SKIPPED")

    def _extract_scripts(self):
        """Extract the embedded script files (stored as UTF-16LE text)."""
        self.source_file.seek(self.analyzer.dstruct.scripts_start)
        scripts_count = struct.unpack("<I", self.source_file.read(4))[0]
        # Heuristic: an implausible count means we are not actually at a
        # script table; rewind the 4 bytes and bail out.
        if scripts_count > 16000:
            self.source_file.seek(self.source_file.tell()-4)
            return
        print(f"Extracting {scripts_count} scripts... ")
        extracted = 0
        for _ in range(0, scripts_count):
            # Record: u4 path length, UTF-16LE path + terminator,
            #         u4 data length, UTF-16LE script body.
            path_length = struct.unpack("<I", self.source_file.read(4))[0]
            script_path = self.source_file.read(path_length)[:-2].decode("utf-16le")
            script_length = struct.unpack("<I", self.source_file.read(4))[0]
            script_data = self.source_file.read(script_length)[:-2]
            # Ensure a UTF-16LE BOM so editors detect the encoding.
            if script_data[:2] != b"\xff\xfe":
                script_data = b"\xff\xfe" + script_data
            save_path = os.path.join(self.output, "Script", script_path)
            save_path = os.path.normpath(save_path)
            save_path = save_path.replace('\\', '/').replace('*', '..')
            save_path = re.sub(r"[^{}\w\-_\.;\[\],\+=!$\\\/]", '_', save_path)
            print(f" Extracting {script_path} ...", end='')
            if not os.path.isfile(save_path):
                os.makedirs(os.path.dirname(save_path), exist_ok=True)
                with open(save_path, "wb") as out:
                    out.write(script_data)
                extracted += 1
                print(f"OK ({extracted}/{scripts_count})")
            else:
                print("SKIPPED")

    def _extract_materials(self):
        """Extract the Material section; same record shape as scripts but the
        payload is raw bytes (no BOM handling)."""
        material_count = struct.unpack("<I", self.source_file.read(4))[0]
        extracted = 0
        skipped = 0
        # Implausible count: rewind and bail (same heuristic as scripts).
        if material_count > 16000:
            self.source_file.seek(self.source_file.tell()-4)
            return
        print(f"Extracting {material_count} materials... ")
        for _ in range(0, material_count):
            path_length = struct.unpack("<I", self.source_file.read(4))[0]
            material_path = self.source_file.read(path_length)[:-2].decode("utf-16le")
            length = struct.unpack("<I", self.source_file.read(4))[0]
            data = self.source_file.read(length)
            save_path = os.path.join(self.output, "Material", material_path)
            save_path = os.path.normpath(save_path)
            save_path = save_path.replace('\\', '/').replace('*', '..')
            save_path = re.sub(r"[^{}\w\-_\.;\[\],\+=!$\\\/]", '_', save_path)
            print(f" Extracting {save_path} ...", end='')
            if not os.path.isfile(save_path):
                os.makedirs(os.path.dirname(save_path), exist_ok=True)
                with open(save_path, "wb") as out:
                    out.write(data)
                extracted += 1
                print(f"OK ({extracted}/{material_count})")
            else:
                print("SKIPPED")

    def _determine_file_suffix(self, header):
        """Match `header` bytes against the magic table; "idk" when nothing
        matches (unknown or still-encrypted data).

        NOTE(review): assumes `header` is at least as long as each magic it
        is compared to; a shorter slice raises IndexError -- confirm callers
        always pass >= 8 bytes.
        """
        for file_type in self.analyzer.dstruct.file_types:
            if all(header[i] == file_type.header[i] for i in range(len(file_type.header))):
                return file_type.suffix
        return "idk"

    def pack(self, target_type="dts"):
        """Dispatch to the dts/rts packer, then close the shared handle."""
        if target_type == "dts":
            self.pack_dts()
        elif target_type == "rts":
            self.pack_rts()
        self.source_file.close()

    def pack_dts(self, where: str = "data.dts_new"):
        """
        Packs the DTS file dynamically, recalculating offsets, sizes, and metadata based on the output directory.
        """
        print("Starting to pack DTS...")
        with open(where, "wb") as target_file:
            self._recalculate_fragments()
            self._write_dts_header(target_file)
            self._write_fragments(target_file)
            self._write_project(target_file)
        print("DTS packing finished successfully!")

    def _recalculate_fragments(self):
        """Rebuild fragment/group/resource layout from files under self.output.

        NOTE(review): assigns the `size`/`end` properties of Fragment and
        Resource -- those must have property setters for this to run.
        """
        current_offset = 168 # DTS header size in bytes
        for fragment in self.analyzer.dstruct.fragments:
            fragment_path = os.path.join(self.output, fragment.path)
            if not os.path.exists(fragment_path) or not os.listdir(fragment_path):
                # Empty fragment: no files
                fragment.begin = current_offset
                fragment.end = current_offset - 1
                fragment.size = 0
                fragment.resource_group_count = 0
                fragment.resource_groups = []
                continue
            # Process each resource group in the fragment
            fragment.resource_groups = []
            fragment.resource_group_count = 0
            for group_name in os.listdir(fragment_path):
                group_path = os.path.join(fragment_path, group_name)
                if not os.path.isdir(group_path):
                    continue
                # Create a new resource group
                group = ResourceGroup(fragment.path, group_name, current_offset, 0)
                group.resources = []
                for file_name in os.listdir(group_path):
                    file_path = os.path.join(group_path, file_name)
                    # Extract resource name and suffix
                    name, suffix = os.path.splitext(file_name)
                    suffix = suffix[1:] # Remove the leading dot
                    # Create a new resource
                    resource = Resource(
                        path=fragment.path,
                        name=name,
                        suffix=suffix,
                        begin=current_offset,
                        size=os.path.getsize(file_path)
                    )
                    # Update offsets
                    resource.end = resource.begin + resource.size - 1
                    current_offset += resource.size
                    # Add resource to the group
                    group.resources.append(resource)
                # Finalize group offsets
                group.end = current_offset - 1
                fragment.resource_groups.append(group)
                fragment.resource_group_count += 1
            # Finalize fragment offsets and size
            fragment.begin = fragment.resource_groups[0].begin if fragment.resource_groups else current_offset
            fragment.end = fragment.resource_groups[-1].end if fragment.resource_groups else current_offset - 1
            fragment.size = fragment.end - fragment.begin + 1

    def _write_dts_header(self, target_file):
        """Write the 168-byte SDTS header with recalculated offsets."""
        header = bytearray()
        header.extend(b"SDTS")
        header.extend(struct.pack("<I", 1 if self.analyzer.dstruct.is_encrypted else 0))
        # NOTE(review): hard-coded version/format/flag values (1285/1/63) --
        # presumably copied from one engine build; confirm.
        header.extend(struct.pack("<I", 1285))
        header.extend(struct.pack("<I", 1))
        header.extend(struct.pack("<I", 63))
        self.analyzer.dstruct.project_begin = 168 + sum(
            fragment.size for fragment in self.analyzer.dstruct.fragments
        )
        header.extend(struct.pack("<I", self.analyzer.dstruct.project_begin - 168))
        # Reserved space for fragment offsets
        # NOTE(review): "<36I" expects 36 values but _init_fragments defines
        # 35 paths -- struct.error at runtime unless the list actually has 36
        # entries; confirm the fragment count.
        fragment_offsets = [fragment.begin - 168 if fragment.size > 0 else 0 for fragment in self.analyzer.dstruct.fragments]
        header.extend(struct.pack("<36I", *fragment_offsets))
        target_file.write(header)

    def _write_fragments(self, target_file):
        """Write each non-empty fragment: group metadata, size table, payloads.

        NOTE(review): does not emit the per-fragment group count/offset table
        that _analyze_fragment reads on extraction -- confirm round-trip.
        """
        for fragment in self.analyzer.dstruct.fragments:
            if fragment.size == 0:
                continue
            # Write the resource groups in the fragment
            for group in fragment.resource_groups:
                # Write group metadata
                group_name_bytes = group.name.encode("utf-16le") + b"\0\0"
                target_file.write(struct.pack("<I", len(group_name_bytes))) # Name length
                target_file.write(group_name_bytes)
                target_file.write(b"\0" * 8) # Reserved (resource_group_id slot)
                target_file.write(struct.pack("<I", len(group.resources))) # Resource count
                # Write resource sizes
                for resource in group.resources:
                    target_file.write(struct.pack("<I", resource.size))
                # Write resource data, re-encrypted with the default secret
                for resource in group.resources:
                    file_path = os.path.join(self.output, resource.path, f"{resource.name}.{resource.suffix}")
                    with open(file_path, "rb") as f:
                        file_content = f.read()
                    encryptor = SRPGCrypto(SRPGCrypto.SECRET_KEYSET)
                    encrypted_data = encryptor.encrypt(file_content)
                    target_file.write(encrypted_data)

    def _write_project(self, target_file):
        """Append the re-encrypted project.no-srpgs blob to the archive."""
        project_path = os.path.join(self.output, "project.no-srpgs")
        with open(project_path, "rb") as f:
            project_data = f.read()
        encryptor = SRPGCrypto(SRPGCrypto.SECRET_KEYSET)
        encrypted_project_data = encryptor.encrypt(project_data)
        target_file.write(encrypted_project_data)

    def pack_rts(self, where: str = "runtime.rts_new"):
        """Repack a runtime.rts archive from the output directory.

        NOTE(review): `data` starts empty yet _get_file_content(file_offset,
        data) parses metadata out of it -- this path looks unfinished; verify
        against a real repack before relying on it.
        """
        print("Starting to pack rts...")
        with open(where, "wb") as target_file:
            # Placeholder for the RTS header
            header_offset = 16 # Initial header size (e.g., version, size, folder offsets)
            folder_offsets = []
            data = bytearray()
            # Iterate through all folders and their associated files
            for i, folder_file_offset_list in enumerate(self.analyzer.all_folder_file_offset_list):
                sub_dir_path = os.path.join(self.output, self.analyzer.dstruct.fragments[i].path)
                folder_start_offset = len(data) + header_offset
                folder_offsets.append(folder_start_offset)
                # Write the folder metadata
                folder_data = bytearray()
                num_files_in_folder = len(folder_file_offset_list)
                folder_data.extend(struct.pack("<I", num_files_in_folder))
                # Track file offsets within this folder
                file_offsets = []
                temp_file_data = bytearray()
                # Process each file in the folder
                for file_offset in folder_file_offset_list:
                    file_infos = self.analyzer._get_file_content(file_offset, data)
                    for file_info in file_infos:
                        file_path = os.path.join(sub_dir_path, f"{file_info['name']}.idk")
                        if not os.path.isfile(file_path):
                            print(f"Warning: Missing file {file_path}, skipping...")
                            continue
                        # Read the file content
                        with open(file_path, "rb") as f:
                            file_content = f.read()
                        # Update the file's metadata
                        file_size = len(file_content)
                        file_info["content_size"] = file_size
                        file_info["content_offset"] = len(temp_file_data) # Offset within the folder
                        temp_file_data.extend(file_content)
                        file_offsets.append(len(folder_data)) # Offset in the folder metadata
                        # Write the file metadata (name length, name, meta, size)
                        name_bytes = file_info["name"].encode("utf-16le")
                        folder_data.extend(struct.pack("<I", len(name_bytes) // 2)) # Name length
                        folder_data.extend(name_bytes) # Name
                        folder_data.extend(struct.pack("<3I", *file_info["meta"])) # Meta
                        folder_data.extend(struct.pack("<I", file_size)) # File size
                data.extend(folder_data)
                data.extend(temp_file_data)
            # Write the final header
            header = bytearray()
            header.extend(struct.pack("<4s", b"SRTS")) # File type
            header.extend(struct.pack("<I", 1)) # Version or other metadata
            header.extend(struct.pack("<I", len(data) + header_offset)) # Total size
            header.extend(struct.pack("<I", len(header) + len(folder_offsets) * 4)) # First folder offset
            for offset in folder_offsets:
                header.extend(struct.pack("<I", offset))
            target_file.write(header)
            target_file.write(data)
        print("RTS packing finished successfully!")

    def extract_text(self):
        """
        Extracts <dw_len><\0-terminated utf-16le string> from the binary project file.
        """
        print("Extracting text strings from binary...")
        self.source_file.seek(self.analyzer.dstruct.project_begin)
        binary_data = self.source_file.read(self.analyzer.dstruct.project_size)
        # NOTE(review): always uses SECRET_KEYSET here, ignoring self.key --
        # confirm whether -k should apply to text extraction too.
        decryptor = SRPGCrypto(SRPGCrypto.SECRET_KEYSET)
        decrypted_data = decryptor.decrypt(binary_data)
        text_strings = self.extract_text_from_binary(decrypted_data)
        if text_strings:
            output_path = os.path.join(self.output, "extracted_strings.txt")
            with open(output_path, "w", encoding="utf-8-sig") as f:
                for string in text_strings:
                    f.write(string[0] + ";;" + str(string[1]) + "\n")
            print(f"Extracted {len(text_strings)} strings to {output_path}")
        else:
            print("No strings found in the binary.")

    @staticmethod
    def extract_text_from_binary(data):
        """Heuristically scan decrypted project data for length-prefixed
        UTF-16LE strings; returns [text, offset] pairs."""
        strings = []
        offset = 0
        data_len = len(data)
        while offset < data_len:
            if offset + 4 > data_len:
                break
            dw_len = struct.unpack_from("<I", data, offset)[0]
            offset += 4
            string_byte_len = dw_len
            # Implausible length: resync one byte forward (offset already
            # advanced by 4, so -3 nets +1) and keep scanning.
            if dw_len < 2 or dw_len > 2048 or offset + string_byte_len > data_len:
                offset -= 3
                continue
            string_data = data[offset:offset + string_byte_len]
            try:
                string = string_data.decode("utf-16le").rstrip("\0")
                # Embedded NULs mean this was not a real string record.
                if '\0' in string:
                    offset -= 3
                    continue
                strings.append([string.replace("\n", "\\n"), offset])
            except UnicodeDecodeError:
                offset -= 3
                continue
            offset += string_byte_len
        return strings

    @staticmethod
    def extract_language(file_name: str):
        """Decrypt a language.dat file (XOR pre-pass, then RC2 with the
        "_dummy" secret) and dump its strings to a CSV next to the input."""
        decryptor = SRPGCrypto(SRPGCrypto.SECRET_DUMMY, crypt_mode=0)
        with open(file_name, "rb") as f:
            lang_data = bytearray(f.read())
        # XOR only the first key-length bytes, then RC2-decrypt everything.
        l_key = len(decryptor.XOR_KEY)
        for i in range(min(len(lang_data), l_key)):
            lang_data[i] ^= decryptor.XOR_KEY[i % l_key]
        lang_data = bytearray(decryptor.decrypt(lang_data))
        #with open(f"{file_name}_dec.dat", "wb") as f:
        #f.write(lang_data)
        count = struct.unpack("<I", lang_data[0:4])[0]
        lang_id = struct.unpack("<I", lang_data[4:8])[0]
        strings = []
        print(f"Extracting {count} strings if language {lang_id}...")
        pos = 8
        for i in range(count):
            str_len = struct.unpack("<I", lang_data[pos:pos+4])[0]
            # Bail out on obviously corrupt counts/lengths.
            if count > len(lang_data) or str_len > len(lang_data):
                break
            # The "→" suffix marks each entry's end in the CSV output.
            str_data = lang_data[pos+4: pos+4+str_len].decode("utf-16le").rstrip("\0") + "→"
            #print(str_data)
            strings.append(str_data)
            pos += 4 + str_len
        if strings:
            out_name = f"{os.path.splitext(file_name)[0]}_strings.csv"
            with open(out_name, "wb") as f:
                f.write('\n'.join(strings).encode("utf-8-sig"))
            print(f"{len(strings) if len(strings) != count else 'All'} strings written to {out_name}...")
if __name__ == "__main__":
    # Command-line front end: unpack / repack / text dump / language dump.
    parser = argparse.ArgumentParser(description="SRPG Studio data extractor and packer")
    parser.add_argument("-u", "--unpack", action="store_true", help="unpack data")
    parser.add_argument("-r", "--repack", action="store_true", help="repack data")
    parser.add_argument("-o", "--output", type=str, default="output", help="output directory")
    parser.add_argument("-m", "--mode", type=str, choices=["dts", "rts"], default="dts", help="mode to operate on (dts or rts)")
    parser.add_argument("-t", "--text", action="store_true", help="extract text strings from the project")
    parser.add_argument("-l", "--lang", type=str, nargs='?', const="language.dat", help="extract text strings; language.dat", metavar="LANG_FILE")
    parser.add_argument("-k", "--key", type=str, default='', nargs='?', help="extraction key")
    args = parser.parse_args()
    # The mode also selects the default input file name.
    target = "data.dts" if args.mode == "dts" or not args.mode else "runtime.rts"
    if args.unpack:
        analyzer = SRPGAnalyzer(target)
        dstruct = analyzer.analysis()
        extractor = SRPGFiles(args.output, analyzer, args.key)
        extractor.extract(target_type=args.mode)
    elif args.text:
        analyzer = SRPGAnalyzer(target)
        dstruct = analyzer.analysis()
        # Fixed: was SRPGFiles("data.dts", args.output, analyzer), which does
        # not match SRPGFiles(output, analyzer, key) and misassigned every
        # constructor argument.
        extractor = SRPGFiles(args.output, analyzer, args.key)
        extractor.extract_text()
    elif args.repack:
        analyzer = SRPGAnalyzer(target)
        dstruct = analyzer.analysis()
        extractor = SRPGFiles(args.output, analyzer, args.key)
        extractor.pack(target_type=args.mode)
    elif args.lang:
        SRPGFiles.extract_language(args.lang)
    else:
        parser.print_help()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment