Created
March 10, 2022 20:55
-
-
Save ozomer/7d215e44036eb13f8027043c130be007 to your computer and use it in GitHub Desktop.
Embla Time Fixer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Inspired from https://github.com/bwrc/embla-r | |
from datetime import datetime | |
from glob import glob | |
import os.path | |
import re | |
import struct | |
try: | |
from gooey import Gooey as main_wrapper, GooeyParser as ArgumentParser | |
gooey_installed = True | |
except ImportError: | |
from argparse import ArgumentParser | |
main_wrapper = lambda f: f | |
gooey_installed = False | |
SUFFIXES = [".ebm", ".esrc"] | |
EBM_END_OF_SIGNATURE = b"\x1a" | |
EBM_R_TIME = 0x84 | |
DATE_STRUCT_FORMAT = "HBBBBBB" | |
ESRC_DATETIME_PREFIX = b"d\x00a\x00t\x00e\x00t\x00i\x00m\x00e\x00\"\x00>" | |
ESRC_DATETIME_PATTERN = re.compile( | |
"^" + | |
"\x00".join([*"datetime\">"] + | |
[r"\d"] * 4 + [r"-"] + [r"\d"] * 2 + [r"-"] + [r"\d"] * 2 + [r"T"] + | |
[r"\d"] * 2 + [r":"] + [r"\d"] * 2 + [r":"] + [r"\d"] * 2 + [r"."] + | |
[r"\d"] * 6 + [r"<"]) + "$" | |
) | |
class ParsedEsrcFile: | |
def __init__(self, filename): | |
self.filename = filename | |
with open(filename, "rb") as stream: | |
self.buffer = stream.read() | |
@staticmethod | |
def parse_time(record_payload): | |
digits = re.findall(r"\d", record_payload) | |
return datetime(*( | |
int("".join(sub_digits)) for sub_digits in [ | |
digits[0:4], digits[4:6], digits[6:8], | |
digits[8:10], digits[10:12], digits[12:14], digits[14:20] | |
] | |
)) | |
def get_time_records(self): | |
records = [] | |
offset = -1 | |
while True: | |
offset = self.buffer.find(ESRC_DATETIME_PREFIX, offset + 1) | |
if offset < 0: | |
break | |
record_bytes = self.buffer[offset:][:len("datetime\">yyyy-mm-ddTHH:MM:SS.ZZZZZZ<") * 2 - 1] | |
record_payload = "".join(chr(c) for c in record_bytes) | |
if not ESRC_DATETIME_PATTERN.match(record_payload): | |
continue | |
records.append(self.parse_time(record_payload)) | |
return records | |
def add_time(self, time_delta): | |
offset = -1 | |
while True: | |
offset = self.buffer.find(ESRC_DATETIME_PREFIX, offset + 1) | |
if offset < 0: | |
break | |
record_bytes = self.buffer[offset:][:len("datetime\">yyyy-mm-ddTHH:MM:SS.ZZZZZZ<") * 2 - 1] | |
record_payload = "".join(chr(c) for c in record_bytes) | |
if not ESRC_DATETIME_PATTERN.match(record_payload): | |
continue | |
new_time = self.parse_time(record_payload) + time_delta | |
digits = [ord(digit) for digit in new_time.strftime("%Y%m%d%H%M%S%f")] | |
new_record_bytes = bytes( | |
digits.pop(0) if ord("0") <= c <= ord("9") else c | |
for c in record_bytes | |
) | |
self.buffer = self.buffer[:offset] + new_record_bytes + self.buffer[offset + len(new_record_bytes):] | |
def write(self): | |
with open(self.filename, "wb") as stream: | |
stream.write(self.buffer) | |
class ParsedEbmFile: | |
def __init__(self, filename): | |
self.filename = filename | |
with open(filename, "rb") as stream: | |
file_buffer = stream.read() | |
signature_end_offset = file_buffer.find(EBM_END_OF_SIGNATURE) | |
if signature_end_offset < 0: | |
raise Exception("Could not find end of signature") | |
self.signature = file_buffer[0:signature_end_offset] | |
self.endian_buffer = file_buffer[signature_end_offset + 1:][:32] | |
if len(self.endian_buffer) < 32: | |
raise Exception("Endian buffer is too short") | |
if self.endian_buffer[0] == 0xff: | |
# Big Endian | |
self.endian_format = ">" | |
elif self.endian_buffer[0] == 0x00: | |
# Little Endian | |
self.endian_format = "<" | |
else: | |
raise Exception(f"Could not parse endian {self.endian_buffer[0:1]}") | |
self.ebmvertmp = self.endian_buffer[1:][:5] | |
self.id_record_format = "L" if self.ebmvertmp == b"\xff" * 5 else "B" | |
record_header_struct = struct.Struct(f"{self.endian_format}{self.id_record_format}L") | |
self.records = [] | |
offset = signature_end_offset + 1 + 32 | |
while offset < len(file_buffer): | |
record_header_buffer = file_buffer[offset:][:record_header_struct.size] | |
if len(record_header_buffer) < record_header_struct.size: | |
raise Exception("Not enough bytes for record header") | |
(record_id, record_size) = record_header_struct.unpack(record_header_buffer) | |
record_payload = file_buffer[offset + record_header_struct.size:][:record_size] | |
if len(record_payload) < record_size: | |
raise Exception("Not enough bytes for record payload") | |
self.records.append((record_id, record_payload)) | |
offset += record_header_struct.size + record_size | |
def write(self): | |
with open(self.filename, "wb") as stream: | |
stream.write(self.signature) | |
stream.write(EBM_END_OF_SIGNATURE) | |
stream.write(self.endian_buffer) | |
record_header_struct = struct.Struct(f"{self.endian_format}{self.id_record_format}L") | |
for (record_id, record_payload) in self.records: | |
stream.write(record_header_struct.pack(record_id, len(record_payload))) | |
stream.write(record_payload) | |
def parse_time(self, record_payload): | |
(year, month, day, hour, minute, second, sec100) = struct.unpack(f"{self.endian_format}{DATE_STRUCT_FORMAT}", record_payload) | |
return datetime(year, month, day, hour, minute, second, sec100 * 10000) | |
def add_time_to_record_payload(self, record_payload, delta): | |
new_time = self.parse_time(record_payload) + delta | |
return struct.pack(f"{self.endian_format}{DATE_STRUCT_FORMAT}", new_time.year, new_time.month, new_time.day, new_time.hour, new_time.minute, new_time.second, new_time.microsecond // 10000) | |
def get_time_records(self): | |
return [self.parse_time(record_payload) for (record_id, record_payload) in self.records if record_id == EBM_R_TIME] | |
def add_time(self, delta): | |
self.records = [ | |
(record_id, self.add_time_to_record_payload(record_payload, delta) if record_id == EBM_R_TIME else record_payload) | |
for (record_id, record_payload) in self.records | |
] | |
@main_wrapper | |
def main(): | |
parser = ArgumentParser(description="Embla Time Fixer") | |
parser.add_argument_group("Input").add_argument( | |
"-i", "--input", help="path of filename or dirname with embla files", dest="input", required=True, | |
**{"widget": "DirChooser"} if gooey_installed else {} | |
) | |
target_datetime_group = parser.add_argument_group("Change Date and Time (optional)") | |
target_datetime_group.add_argument( | |
"-d", "--date", help="target start date", dest="date", | |
**{"widget": "DateChooser"} if gooey_installed else {} | |
) | |
target_datetime_group.add_argument( | |
"-t", "--time", help="target start time", dest="time", | |
**{"widget": "TimeChooser"} if gooey_installed else {} | |
) | |
args = parser.parse_args() | |
if any(args.input.lower().endswith(suffix) for suffix in SUFFIXES): | |
filenames = [args.input] | |
else: | |
filenames = sum([glob(os.path.join(args.input, f"*{suffix}")) for suffix in SUFFIXES], []) | |
target_date = None | |
if args.time and not args.date: | |
raise Exception("Time can be defined only when date is defined") | |
if args.date: | |
print(f"Parsing target date: {args.date}") | |
date_parts = [int(part) for part in re.split(r"\D", args.date) if part != ""] | |
if len(date_parts) != 3: | |
raise Exception("Date must specify year, month, day") | |
if args.time: | |
print(f"Parsing target time: {args.time}") | |
time_parts = [int(part) for part in re.split(r"\D", args.time) if part != ""] | |
if len(time_parts) > 3: | |
raise Exception("Time must must have up to 3 numbers: hour, minute, second") | |
else: | |
time_parts = [] | |
target_date = datetime(*date_parts, *time_parts) | |
print(f"Parsed date: {target_date}") | |
if len(filenames) == 0: | |
raise Exception("Could not find any .ebm or .esrc files") | |
parsed_files = [] | |
min_time_total = None | |
for filename in filenames: | |
print(f"Parsing {filename}") | |
if filename.lower().endswith(".ebm"): | |
parsed_file = ParsedEbmFile(filename) | |
elif filename.lower().endswith(".esrc"): | |
parsed_file = ParsedEsrcFile(filename) | |
else: | |
print(" Skipping file, unknown filename extension") | |
time_records = parsed_file.get_time_records() | |
min_time_record = min(time_records) if len(time_records) > 0 else None | |
print(f" {len(time_records)} time-records, min: {min_time_record} ") | |
if min_time_record is not None: | |
if min_time_total is None: | |
min_time_total = min_time_record | |
min_time_total = min([min_time_total, min_time_record]) | |
parsed_files.append(parsed_file) | |
print(f"min_time_total: {min_time_total}") | |
if target_date: | |
time_delta = target_date - min_time_total | |
print(f"time delta: {time_delta}") | |
for parsed_file in parsed_files: | |
print(f"adding time to {parsed_file.filename}") | |
parsed_file.add_time(time_delta) | |
for parsed_file in parsed_files: | |
print(f"writing {parsed_file.filename}") | |
parsed_file.write() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment