Last active
October 22, 2020 07:52
-
-
Save alxbl/5e66dd069c11841fff13ff77c7ec047e to your computer and use it in GitHub Desktop.
Encoder for TES Arena's MIF file format (type08) records
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This script provides a rudimentary data encoding algorithm for | |
# type08 records in MIF files. | |
# | |
# It's not exactly Huffman coding or adaptive Huffman coding, but it
# behaves very similarly. The developers call it "Delta Mode"
# | |
# @author alxbl ([email protected]) | |
# | |
# https://en.wikipedia.org/wiki/File:Huffman_coding_visualisation.svg | |
from sys import exit | |
from io import BytesIO | |
from struct import unpack, pack | |
# Valid chunk headers.
HEADERS = ["MAP1", "MAP2", "FLOR"]

# fmt: off
# Lookup tables for history buffer offsets.
#
# Both tables are indexed by the 8-bit value that follows a history-lookup
# symbol in the bitstream (see Type08.decompress):
#   - offset_hi[i] holds the high part of the 12-bit history offset
#     (shifted left by 6 by the decoder).
#   - offset_lo[i] holds the number of bits consumed by the high part;
#     the decoder reads `offset_lo[i] - 2` additional bits to complete
#     the low 6 bits of the offset.
# Type08._write_offset performs the inverse mapping when encoding.
offset_hi = [
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01,
    0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
    0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03,
    0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05,
    0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07,
    0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x09, 0x09, 0x09, 0x09, 0x09, 0x09, 0x09, 0x09,
    0x0A, 0x0A, 0x0A, 0x0A, 0x0A, 0x0A, 0x0A, 0x0A, 0x0B, 0x0B, 0x0B, 0x0B, 0x0B, 0x0B, 0x0B, 0x0B,
    0x0C, 0x0C, 0x0C, 0x0C, 0x0D, 0x0D, 0x0D, 0x0D, 0x0E, 0x0E, 0x0E, 0x0E, 0x0F, 0x0F, 0x0F, 0x0F,
    0x10, 0x10, 0x10, 0x10, 0x11, 0x11, 0x11, 0x11, 0x12, 0x12, 0x12, 0x12, 0x13, 0x13, 0x13, 0x13,
    0x14, 0x14, 0x14, 0x14, 0x15, 0x15, 0x15, 0x15, 0x16, 0x16, 0x16, 0x16, 0x17, 0x17, 0x17, 0x17,
    0x18, 0x18, 0x19, 0x19, 0x1A, 0x1A, 0x1B, 0x1B, 0x1C, 0x1C, 0x1D, 0x1D, 0x1E, 0x1E, 0x1F, 0x1F,
    0x20, 0x20, 0x21, 0x21, 0x22, 0x22, 0x23, 0x23, 0x24, 0x24, 0x25, 0x25, 0x26, 0x26, 0x27, 0x27,
    0x28, 0x28, 0x29, 0x29, 0x2A, 0x2A, 0x2B, 0x2B, 0x2C, 0x2C, 0x2D, 0x2D, 0x2E, 0x2E, 0x2F, 0x2F,
    0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x3A, 0x3B, 0x3C, 0x3D, 0x3E, 0x3F
]
offset_lo = [
    0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03,
    0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03, 0x03,
    0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04,
    0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04,
    0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04,
    0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05,
    0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05,
    0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05,
    0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05,
    0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06,
    0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06,
    0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06,
    0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07,
    0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07,
    0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07, 0x07,
    0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08, 0x08
]
# fmt: on
class Type08:
    """
    Encapsulate the codec state for TES Arena "type08" (Delta Mode) records.

    The codec combines LZ-style history lookups (a 4096-byte ring buffer)
    with an adaptive prefix tree over a 314-symbol alphabet: symbols
    0..255 are literal bytes, 256..313 encode runs of 3..60 bytes copied
    from the history buffer.
    """

    def __init__(self):
        # Initialize the lookup tree
        self.freqs = self._gen_freq()       # occurrence counts, kept sorted by _update
        self.tree = self._gen_tree()        # flattened prefix tree (see _gen_tree)
        self.indices = self._gen_indices()  # reverse lookup into tree/freqs slots
        self.nfreqs = len(self.freqs)
        self.nnodes = len(self.tree)
        self.nindices = len(self.indices)
        # Initialize blank history.
        # The ring buffer starts filled with spaces (0x20), so early history
        # offsets can legally reference bytes that were never written.
        self.history = bytearray(b"\x20" * 0x1000)
        self.hpos = 0  # Next write position; masked with 0x0FFF on every access.
        # Bitstream
        self.bits = 0   # 16-bit work area, bits packed left to right.
        self.avail = 0  # Number of valid bits currently held in `bits`.

    def decompress(self, data: BytesIO, rlen: int) -> bytes:
        """
        Decompress exactly `rlen` bytes of output from the bitstream `data`.

        Ported to Python from
        https://github.com/afritz1/OpenTESArena/blob/master/OpenTESArena/src/Assets/Compression.h
        """
        pos = 0
        dst = []
        while pos < rlen:
            # STEP 1: Read a prefix-code from the bitstream.
            node = self.tree[626]  # The root of the tree.
            # Traverse the prefix tree according to the code bits until a leaf is found.
            while node < 627:  # 0-626 are internal nodes, anything above 626 is a leaf.
                branch = self._read(data, 1)
                node = self.tree[node + branch]
            code = node - 627
            # STEP 2: Process the code retrieved from the prefix tree.
            if code <= 0xFF:
                # 2.1. Code represents one byte of data.
                dst.append(code)
                self.history[self.hpos & 0x0FFF] = code
                self.hpos += 1
                pos += 1
            else:
                # 2.2. This code encodes a history lookup.
                # The next 8 bits encode the history offset and the number of bytes to output.
                offset = self._read(data, 8)
                hi = offset_hi[offset] << 6
                # Read the required number of bits for the low part of the offset.
                lo = offset
                nbits = offset_lo[offset] - 2
                # Only keep 6 lowest bits.
                lo = ((lo << nbits) | self._read(data, nbits)) & 0x3F
                start = self.hpos - (hi | lo) - 1
                nbytes = code - 256 + 3  # Codes 256..313 map to run lengths 3..60.
                # Copy byte-by-byte so runs may overlap the write position.
                for i in range(nbytes):
                    b = self.history[(start + i) & 0x0FFF]
                    dst.append(b)
                    self.history[self.hpos & 0x0FFF] = b
                    self.hpos += 1
                    pos += 1
            # STEP 3: Update the occurrence count of the code and rebalance the tree.
            self._update(node)
        return bytes(dst)

    def compress(self, data: bytes) -> bytes:
        """
        Compress a raw stream using Type08.

        The algorithm is a hybrid between LZ and some form of adaptive
        coding. Here's how it works, generally speaking:

        The alphabet has 314 symbols; symbols 0..255 are for direct byte
        mapping. There is a 4096-byte history buffer where raw bytes are
        written to. If a symbol above 255 is encountered, the decoder must
        process the symbol along with 8 additional bits of data that
        encode an offset in the history table along with a number of bytes
        to copy from the history table, similar to LZ runs.

        For the adaptive coding, the prefix-tree is updated after each
        symbol is encountered, including history-lookup symbols that
        the encoder outputs.
        """
        dst = BytesIO()
        rlen = len(data)
        pos = 0
        i = 0  # NOTE(review): symbol counter; incremented but otherwise unused.
        while pos < rlen:
            byte = data[pos]
            # STEP 1: Find the longest matching run in the history buffer.
            start, size = self._scan(data, pos)
            if size < 3:  # Not enough data for a run.
                symbol = byte + 627
                size = 1
            else:
                # Encode the run length in the symbol (256 - 313)
                symbol = size - 3 + 256 + 627
            # STEP 2: Lookup the prefix code for the symbol and output it.
            code, codesz = self._lookup(symbol)
            self._write(dst, code, codesz)
            # STEP 3. Encode history run offset.
            if size > 1:
                offset = (self.hpos & 0xFFF) - start - 1
                self._write_offset(dst, offset)
            # STEP 4: Update history buffer.
            for _ in range(size):
                self.history[self.hpos & 0x0FFF] = data[pos]
                self.hpos += 1
                pos += 1
            i += 1
        # Write any leftover bits that don't make up a byte.
        self._flush(dst)
        return dst.getvalue()

    def _read(self, src: BytesIO, n: int) -> int:
        """
        Read `n` bits from a compressed bitstream and return them as an int.

        The bitstream is packed left to right. Past end of stream, `avail`
        is still advanced but no bits are OR'd in, so missing bits read
        as zeros.
        """
        # Ensure we have enough bits in the work area.
        result = 0
        while n > 0:
            # Ensure that the bit buffer is filled.
            while self.avail < 9:
                b = src.read(1)
                if b:
                    self.bits |= b[0] << (8 - self.avail)
                self.avail += 8
            # Consume the next bit and append it to the result.
            result <<= 1
            result |= (self.bits >> 15) & 1
            self.bits = (self.bits << 1) & 0xFFFF
            self.avail -= 1
            n -= 1
        return result

    def _write(self, s: BytesIO, byte: int, n: int, ltr=False):
        """
        Write `n` bits of `byte` to the stream.

        The bits are taken left to right if `ltr` is True, otherwise they
        are consumed from right to left (LSB first), as this is the natural
        way that `_lookup` will return them.
        """
        while n > 0:
            # Flush if needed.
            if self.avail > 8:
                s.write(bytes([self.bits >> 8]))
                self.bits = (self.bits << 8) & 0xFFFF
                self.avail -= 8
            # Append bit to the stream.
            if ltr:
                bit = (byte >> (n - 1)) & 1
            else:
                bit = byte & 1
                byte >>= 1
            self.bits |= bit << (15 - self.avail)
            self.avail += 1
            n -= 1

    def _flush(self, s: BytesIO):
        """Flush any buffered bits to `s`, zero-padding the final byte."""
        # FIXME: Throw on write after flushing.
        while self.avail > 0:
            s.write(bytes([self.bits >> 8]))
            self.bits = (self.bits << 8) & 0xFFFF
            self.avail -= 8

    def _write_offset(self, dst: BytesIO, offset: int):
        """
        Encode a 12-bit history offset, mirroring the decode in `decompress`.

        The high 6 bits of the offset select a base index into `offset_hi`;
        `offset_lo` tells how many of the 8 index bits encode the high part,
        and the leftover low bits are written separately.
        """
        hi = offset & 0xFC0
        lo = offset & 0x3F
        idx = offset_hi.index(hi >> 6)
        used = offset_lo[idx]  # Number of bits used to encode the high bit index.
        left = 8 - used  # Number of bits left in `idx` to encode low bits.
        encodable = lo >> (6 - left)
        idx += encodable
        self._write(dst, idx, 8, ltr=True)
        self._write(dst, lo, 6 - left, ltr=True)

    def _scan(self, data: bytes, pos: int) -> "tuple[int, int]":
        """
        Find the longest run in the history buffer matching `data[pos:]`.

        Runs are capped at 60 bytes (the longest length a symbol encodes).

        :return: (start, size) — match start in the history buffer and its
                 length; size is 1 when no usable run exists.
        """
        mstart = 0
        msize = 1  # Keep track of optimal result.
        dlen = len(data)
        # Optimized version: Start from hpos-1 and look for the first byte,
        # and try to move as far as possible.
        for h in range(self.hpos & 0xFFF, 0, -1):
            h -= 1
            # Find the latest occurrence of the target byte in the history buffer.
            if h < 0 or self.history[h] != data[pos]:
                continue
            # Now move forward, matching as many bytes as possible.
            start = h
            size = 0
            delta = self.hpos - start
            i = 0
            # `i % delta` lets a short run near hpos repeat, matching the
            # decoder's byte-by-byte overlapping copy.
            while (
                pos + i < dlen
                and data[pos + i] == self.history[(start + (i % delta)) & 0xFFF]
                and size < 60
                and start + size < (self.hpos & 0xFFF)  # Don't go past hpos.
            ):
                i += 1
                size += 1
            if size > msize:
                msize = size
                mstart = start
        return mstart, msize

    def _update(self, node: int):
        """
        Increment the frequency for the given node and rebalance.

        This keeps the tree sorted by frequency and must be done after each
        symbol is processed to ensure that the prefix-codes are adjusted.
        """
        freq_idx = self.indices[node]  # Look up node index in the frequency map
        while True:
            self.freqs[freq_idx] += 1
            freq = self.freqs[freq_idx]
            next_idx = freq_idx + 1
            if next_idx < self.nfreqs and self.freqs[next_idx] < freq:
                # Find the first frequency that is less or equal to freq
                while next_idx < self.nfreqs and self.freqs[next_idx] < freq:
                    next_idx += 1
                next_idx -= 1
                # Swap the current frequency with the one immediately before.
                self.freqs[freq_idx] = self.freqs[next_idx]
                self.freqs[next_idx] = freq
                # Swap the tree's nodes.
                tmp = self.tree[next_idx]
                self.tree[next_idx] = self.tree[freq_idx]
                self.tree[freq_idx] = tmp
                # Update index lookup table
                idx = self.tree[next_idx]
                self.indices[idx] = next_idx
                if idx < 627:
                    self.indices[idx + 1] = next_idx
                idx = self.tree[freq_idx]
                self.indices[idx] = freq_idx
                if idx < 627:
                    self.indices[idx + 1] = freq_idx
                freq_idx = next_idx
            # Recurse: move up to the parent's slot and repeat until the root.
            freq_idx = self.indices[freq_idx]
            if freq_idx == 0:
                break

    def _lookup(self, symbol: int) -> "tuple[int, int]":
        """
        Lookup the prefix code for a symbol and update its frequency.

        The code bits are in reverse order (`87654321`).
        :return: (code, code_size)
        """
        node = self.tree.index(symbol)
        code = 0  # Bits that make up the code.
        codesz = 0  # Number of bits that make up the prefix code.
        # Traverse the tree from the leaf to the root node,
        # building a prefix code along the way.
        #
        # There is probably a clever way to lookup the parent
        # in O(1) that I'm too lazy to figure out.
        while node != 626:
            # The code size cannot be more than log2(n) = 10 bits (n == 314)
            code <<= 1
            code |= node & 1
            codesz += 1
            # Locate the parent node.
            node = self.tree.index(node & ~1)
        # Update the occurrence count for the symbol.
        self._update(symbol)
        return code, codesz

    def _gen_indices(self):
        """Build the initial reverse-lookup table (941 entries): parent slot
        for each internal position, then slots for the 314 leaf symbols."""
        nodes = [(x >> 1) + 314 for x in range(626)]
        nodes.append(0)  # idx[626]
        nodes += [x for x in range(941 - len(nodes))]
        return nodes

    def _gen_tree(self):
        """Build the initial flattened tree: 314 leaf symbols (627..940)
        followed by child pointers for the internal nodes."""
        tree = [x + 627 for x in range(314)]
        tree += [x * 2 for x in range(627 - len(tree))]
        return tree

    def _gen_freq(self):
        """Build initial frequencies: 1 per leaf symbol, then each internal
        node's count as the sum of its two children."""
        freqs = [1 for _ in range(314)]
        n = 0
        for i in range(314, 627):
            val = freqs[n] + freqs[n + 1]
            n += 2
            freqs.append(val)
        return freqs
def main():
    """Command-line entry point: compress or decompress a Type08 chunk file.

    Chunk layout: 4-byte type tag, little-endian u16 compressed length
    (covering the next field plus the compressed payload), little-endian
    u16 decompressed length, then the payload.

    Exits with status 2 on invalid arguments or an invalid chunk header.
    """
    import argparse

    p = argparse.ArgumentParser()
    p.add_argument("-c", "--compress", type=str, help="The file to compress")
    # FIX: help text used to reference a nonexistent "-s" option.
    p.add_argument("-d", "--decompress", type=str, help="The file to decompress")
    p.add_argument(
        "-t",
        "--type",
        type=str,
        choices=HEADERS,
        default=HEADERS[0],
        help="The type of chunk. Must be specified when compressing.",
    )
    p.add_argument("-o", "--output", help="The destination file", required=True)
    args = p.parse_args()

    # Exactly one of --compress/--decompress must be given.
    if (args.compress and args.decompress) or (
        not args.compress and not args.decompress
    ):
        print("Error: Specify exactly one of --decompress or --compress.")
        exit(2)

    codec = Type08()
    if args.compress:
        # NOTE: --type always has a value (it defaults to HEADERS[0]), so the
        # old `if not args.type` guard was unreachable and has been removed.
        with open(args.compress, "rb") as infile:
            rdata = infile.read()
            data = codec.compress(rdata)
            print(f"Decompressed Size = {len(rdata):04X} ({len(rdata)})")
            print(f"Compressed Size = {len(data):04X} ({len(data)})")
            # Prepend header: tag, compressed length (+2 for the rlen field),
            # decompressed length.
            data = (
                args.type.encode()
                + pack("<H", len(data) + 2)
                + pack("<H", len(rdata))
                + data
            )
    elif args.decompress:
        with open(args.decompress, "rb") as infile:
            zdata = infile.read()
            chunk_type = zdata[:4].decode()
            if chunk_type not in HEADERS:
                print("Invalid Chunk: Must start with one of: " + " ".join(HEADERS))
                exit(2)
            (clen,) = unpack("<H", zdata[4:6])
            (rlen,) = unpack("<H", zdata[6:8])
            # fmt: off
            zdata = zdata[8:8 + clen - 2]  # Payload only: clen includes the rlen field.
            # fmt: on
            data = codec.decompress(BytesIO(zdata), rlen)
            print(f"Compressed Size = {len(zdata):04X} ({clen - 2})")
            print(f"Decompressed Size = {len(data):04X} ({len(data)})")

    print(f"Writing {args.output}")
    with open(args.output, "wb") as o:
        o.write(data)
    print("OK")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment