Skip to content

Instantly share code, notes, and snippets.

@ryancdotorg
Created August 24, 2025 22:33
Show Gist options
  • Save ryancdotorg/05f7f1bf3f94cd11297fc73641c85473 to your computer and use it in GitHub Desktop.
Save ryancdotorg/05f7f1bf3f94cd11297fc73641c85473 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
from sys import argv, exit, stdin, stdout, stderr, version_info
from functools import partial, lru_cache
eprint = partial(print, file=stderr)
import io
import os
import re
import types
import shutil
import subprocess
from gzip import GzipFile
from subprocess import DEVNULL, STDOUT, PIPE
from tempfile import NamedTemporaryFile
from collections.abc import Iterable
# Sentinel default for GzipMetadata's filename/mtime/comment arguments: it
# means "keep whatever the input stream carries", which is distinct from None.
Unmodified = object()
class UnreachableError(RuntimeError):
    """Signals that control flow arrived somewhere it never should."""


def unreachable():
    """Raise UnreachableError unconditionally."""
    raise UnreachableError()
# Bit masks tested against byte 3 (the flag byte) of the gzip header.
FEXTRA = 0x04
FNAME = 0x08
FCOMMENT = 0x10
# Memoised executable lookup; 128 entries is lru_cache's default size.
which = lru_cache(maxsize=128)(shutil.which)
# 7z a -tgzip -mx=9 -mpass=15 -mfb=258 uhubctl.gz uhubctl
class GzipMetadata(io.BufferedWriter):
    """Pass-through writer that rewrites the metadata of a gzip header.

    A gzip stream is written to this object with write(); the bytes are
    forwarded to ``fileobj`` with the MTIME field and the optional FNAME /
    FCOMMENT fields of the first member header replaced, added or removed.
    Everything after that header is copied through untouched.

    For ``filename``, ``mtime`` and ``comment``:

    * ``Unmodified`` (the default) keeps whatever the input carries,
    * ``None`` removes the field (``mtime`` is written as 0),
    * any other value replaces the field.

    ``filename`` and ``comment`` may be ``str`` (encoded with the default
    UTF-8 codec) or bytes-like; ``mtime`` is converted with ``int()``.

    Raises ValueError when ``filename`` or ``comment`` contains a null byte.

    NOTE: a header checksum (FHCRC) is not recomputed, so an input stream
    that carries one is left with a stale CRC16 once the header is modified.
    NOTE: io.BufferedWriter.__init__ is never invoked; only the methods
    overridden here are meant to be used and they all delegate to the
    wrapped ``fileobj``.
    """

    def __init__(self, fileobj, filename=Unmodified, mtime=Unmodified, *,
                 comment=Unmodified, buffer_size=io.DEFAULT_BUFFER_SIZE):
        if not isinstance(fileobj, io.BufferedWriter):
            # honour the caller's buffer_size (it used to be ignored)
            fileobj = io.BufferedWriter(fileobj, buffer_size=buffer_size)
        self._fileobj = fileobj

        filename = self._encode_field(filename, 'filename')
        comment = self._encode_field(comment, 'comment')

        if mtime is None:
            mtime = 0
        elif mtime is not Unmodified:
            mtime = int(mtime)

        self._filename, self._mtime, self._comment = filename, mtime, comment
        # the fixed 10 byte header is collected here so it can be patched
        # before it is forwarded
        self._gzip_header = bytearray(10)
        self._skipping_filename = False
        self._skipping_comment = False
        self._header_written = 0
        # 0x20000: both XLEN bytes pending, 0x10000 + lo: high XLEN byte
        # pending, otherwise the number of extra-field bytes left to copy
        self._extra_remaining = 0
        eprint('meta', self._filename, self._mtime, self._comment)

    @staticmethod
    def _encode_field(value, label):
        """Return value as null-terminated bytes; pass None/Unmodified through."""
        if value is Unmodified or value is None:
            return value
        value = value.encode() if isinstance(value, str) else bytes(value)
        if b'\0' in value:
            raise ValueError(f'{label} is not allowed to contain null bytes')
        return value + b'\0'

    def _write_meta(self, count=None):
        """Emit any pending replacement filename/comment and return count."""
        # only real payloads are written; None and Unmodified are skipped
        if isinstance(self._filename, bytes):
            self._fileobj.write(self._filename)
            self._filename = None
        if isinstance(self._comment, bytes):
            self._fileobj.write(self._comment)
            self._comment = None
        return count

    def write(self, b):
        """Forward b to the wrapped stream, patching the gzip header.

        Returns the number of input bytes consumed; bytes inserted for a new
        filename or comment are not counted.
        """
        if not isinstance(b, (bytes, bytearray)):
            # other bytes-like objects (e.g. memoryview) lack the search
            # method used below
            b = bytes(b)
        count = 0

        if self._header_written < 10:
            rest_start = 10 - self._header_written
            head, b = b[:rest_start], b[rest_start:]
            for i, c in enumerate(head, start=self._header_written):
                if i == 3:
                    # flag byte
                    if c & FEXTRA:
                        self._extra_remaining = 0x20000
                    if self._filename is not Unmodified:
                        if c & FNAME:
                            self._skipping_filename = True
                        # the flag must mirror what is actually emitted:
                        # set when a name is written, cleared when removed
                        if self._filename is not None:
                            c |= FNAME
                        else:
                            c &= ~FNAME
                    if self._comment is not Unmodified:
                        if c & FCOMMENT:
                            self._skipping_comment = True
                        if self._comment is not None:
                            c |= FCOMMENT
                        else:
                            c &= ~FCOMMENT
                elif self._mtime is not Unmodified and 4 <= i <= 7:
                    # MTIME is stored as 4 little-endian bytes
                    c = (self._mtime >> (i - 4) * 8) & 0xff
                self._gzip_header[i] = c
            self._header_written += len(head)
            count += len(head)
            if self._header_written == 10:
                self._fileobj.write(self._gzip_header)
            if not b:
                return count

        if self._extra_remaining:
            if self._extra_remaining == 0x20000:
                # low byte of XLEN
                self._fileobj.write(b[0:1])
                c, b = b[0], b[1:]
                self._extra_remaining = 0x10000 + c
                count += 1
                if not b:
                    return count
            if self._extra_remaining >= 0x10000:
                # high byte of XLEN
                self._fileobj.write(b[0:1])
                c, b = b[0], b[1:]
                self._extra_remaining &= 0xffff
                self._extra_remaining += c << 8
                count += 1
                if not b:
                    return count
            extra, b = b[:self._extra_remaining], b[self._extra_remaining:]
            self._extra_remaining -= len(extra)
            count += self._fileobj.write(extra)
            if not b:
                return count

        if self._skipping_filename:
            # drop the original name up to and including its terminator
            end = b.find(0)
            if end < 0:
                return count + len(b)
            count += end + 1
            b = b[end + 1:]
            self._skipping_filename = False
        if isinstance(self._filename, bytes):
            self._fileobj.write(self._filename)
            self._filename = None

        if self._skipping_comment:
            # drop the original comment up to and including its terminator
            end = b.find(0)
            if end < 0:
                return count + len(b)
            count += end + 1
            b = b[end + 1:]
            self._skipping_comment = False
        if isinstance(self._comment, bytes):
            self._fileobj.write(self._comment)
            self._comment = None

        if b:
            count += self._fileobj.write(b)
        return count

    def close(self):
        return self._fileobj.close()

    def detach(self):
        return self._fileobj.detach()

    # original (misspelt) name kept so existing callers do not break
    detatch = detach

    def flush(self):
        return self._fileobj.flush()

    def isatty(self):
        return self._fileobj.isatty()

    def writable(self):
        return self._fileobj.writable()

    # original (misspelt) name kept so existing callers do not break
    writeable = writable

    def fileno(self):
        # run() relies on this OSError to know the output must be captured
        # and passed through write() instead of going straight to an fd
        raise OSError('fileno not supported')

    # def read(self, *a, **k): raise OSError('read not supported')
    # def read1(self, *a, **k): raise OSError('read1 not supported')
    # def readinto(self, *a, **k): raise OSError('readinto not supported')
    # def readinto1(self, *a, **k): raise OSError('readinto1 not supported')
    # def readline(self, *a, **k): raise OSError('readline not supported')
    # def readlines(self, *a, **k): raise OSError('readlines not supported')
    # def seek(self, *a, **k): raise OSError('seek not supported')
    # def tell(self): raise OSError('tell not supported')
    # def truncate(self, *a, **k): raise OSError('truncate not supported')
    # def writelines(self, *a, **k): raise OSError('writelines not supported')

    def seekable(self):
        return False

    def readable(self):
        return False

    @property
    def closed(self):
        return self._fileobj.closed
def cat(fileobj, files=None):
    """Copy the named files, in order, into fileobj.

    When files is empty or None, standard input is copied instead.
    """
    if not files:
        shutil.copyfileobj(stdin.buffer, fileobj)
        return
    for path in files:
        with open(path, 'rb') as src:
            shutil.copyfileobj(src, fileobj)
def isiterable(obj):
    """Return True for iterables other than str and bytes."""
    is_text = isinstance(obj, (str, bytes))
    return (not is_text) and isinstance(obj, Iterable)
def run(*args, **kwargs):
    """Run a command with subprocess.run, defaulting to check=True.

    The command and its arguments can be passed as one iterable or as a
    series of arguments. Keyword arguments go to subprocess.run.

    When ``stdout`` is a file object without a usable fileno(), the child's
    stdout is captured and handed to ``stdout.write()``. In that mode stderr
    is captured too, unless the caller supplied ``stderr``, and forwarded to
    this process's stderr.

    Returns the CompletedProcess. Raises subprocess.CalledProcessError on a
    non-zero exit status when ``check`` is true.
    """
    if len(args) == 1 and isiterable(args[0]):
        args = args[0]
    cmd = list(args)
    kwargs.setdefault('check', True)

    out = kwargs.get('stdout')
    # None and the integer specials (PIPE, DEVNULL, raw descriptors) are
    # understood by subprocess directly and have no fileno() to probe
    if out is None or isinstance(out, int):
        return subprocess.run(cmd, **kwargs)

    # subprocess.run writes to the underlying fileno for stdout/stderr, so
    # check whether the file object passed has one - if not, capture output
    try:
        out.fileno()
    except (AttributeError, OSError):
        pass
    else:
        return subprocess.run(cmd, **kwargs)

    check = kwargs.pop('check')
    kwargs['stdout'] = PIPE
    forward_stderr = 'stderr' not in kwargs
    if forward_stderr:
        kwargs['stderr'] = PIPE
    proc = subprocess.run(cmd, check=False, **kwargs)
    # forward everything before checking the exit status so nothing is lost
    # when the command fails or, like gzip, exits non-zero for a warning
    if forward_stderr and proc.stderr:
        stderr.buffer.write(proc.stderr)
        stderr.buffer.flush()
    out.write(proc.stdout)
    if check:
        proc.check_returncode()
    return proc
def args_update_level(orig_args, level=9):
    """Return a copy of orig_args with any compression level set to level.

    An option such as ``-15n`` or ``-n15`` has its number replaced. ``--``
    and everything after it are file names and are copied unchanged.
    """
    level_arg = re.compile(r'(-[^0-9]*)([1-9][0-9]*)(.*)$')
    new_args = []
    options_done = False
    for arg in orig_args:
        if options_done:
            new_args.append(arg)
            continue
        if arg == '--':
            # end of options: a file called e.g. "-12x" must not be touched
            options_done = True
            new_args.append(arg)
            continue
        # if a compression level was specified in an argument, replace it
        m = level_arg.fullmatch(arg)
        if m is None:
            new_args.append(arg)
        else:
            new_args.append(m.group(1) + str(level) + m.group(3))
    return new_args
def exec_gzip(level=9):
    """Replace this process with gzip, run on the script's own arguments.

    Any compression level in the arguments is rewritten to ``level``. This
    function does not return.

    Raises FileNotFoundError when no gzip executable is found.
    """
    cmd = ['gzip'] + args_update_level(argv[1:], level)
    eprint('exec_gzip:', 'gzip', cmd)
    # NOTE(review): if this script is itself installed as `gzip` on PATH,
    # which() may resolve to it and exec in a loop - confirm the deployment.
    gzip_path = which('gzip')
    if gzip_path is None:
        raise FileNotFoundError('gzip executable not found')
    # exec does not flush Python-level buffers, so do it explicitly
    stdout.flush()
    stderr.flush()
    os.execv(gzip_path, cmd)
    unreachable()
def run_gzip(level=9, *, execv=False, **kwargs):
    """Execute gzip based on the command line arguments.

    Any compression level in the arguments is rewritten to ``level``. With
    ``execv=True`` the current process is replaced by gzip via exec_gzip()
    and this function never returns. Otherwise gzip runs as a child through
    run(), which receives the remaining keyword arguments; ``-c`` is added
    when the output is being redirected or captured.

    Raises subprocess.CalledProcessError for any failing exit status other
    than 2.
    """
    if execv:
        exec_gzip(level)
    cmd = ['gzip']
    if 'stdout' in kwargs or 'capture_output' in kwargs:
        cmd.append('-c')
    cmd += args_update_level(argv[1:], level)
    eprint('run_gzip:', cmd)
    try:
        run(cmd, **kwargs)
    except subprocess.CalledProcessError as e:
        # gzip returns 2 to indicate a warning
        if e.returncode != 2:
            raise
def run_zopfli(level=15, params=None, files=None):
    """Compress with zopfli in place of gzip.

    level is passed to zopfli as its ``--i`` value. params maps the parsed
    single-letter flags to their values; the ones read here are ``c``
    (write to stdout), ``n`` (store no name or timestamp), ``S`` (output
    suffix, default ``.gz``) and ``k`` (keep the input files). files lists
    the inputs; when it is empty, standard input is compressed to stdout.
    """
    params = params if params is not None else {}
    files = files if files is not None else []

    # rather than a bunch of special cases, always output to stdout
    cmd = ['zopfli', f'--i{level}', '-c']

    if 'c' in params or not files:
        # zopfli can't read from stdin, so use a temp file
        with NamedTemporaryFile(prefix='.zopfli.', suffix='.cat') as tmp:
            cat(tmp, files)
            tmp.flush()
            cmd.append(tmp.name)
            eprint('run_zopfli:', cmd)
            run(cmd)
        # inputs are left alone when writing to stdout
        return

    suffix = params.get('S', '.gz')
    strip_meta = 'n' in params
    for name in files:
        with open(name + suffix, 'wb') as out:
            # easier to always inject; each output takes its metadata from
            # the file being compressed
            mtime = None if strip_meta else os.stat(name).st_mtime
            filename = None if strip_meta else os.path.basename(name)
            with GzipMetadata(out, filename, mtime) as injector:
                eprint(injector, injector.write, injector._fileobj)
                eprint('run_zopfli:', cmd + [name])
                run(cmd + [name], stdout=injector)

    # zopfli doesn't delete input files, so that may need to be done
    if 'k' not in params:
        for name in files:
            eprint('unlink:', name)
            os.unlink(name)
def run_advdef(level=15, params=None, files=None):
    """Compress with gzip, then recompress the result with advdef.

    level is passed to advdef as its ``-i`` value. params maps the parsed
    single-letter flags to their values; the ones read here are ``c``
    (write to stdout), ``n`` (store no name or timestamp), ``S`` (output
    suffix, default ``.gz``) and ``f`` (force overwrite). files lists the
    inputs; when it is empty, standard input is compressed to stdout.
    """
    params = params if params is not None else {}
    files = files if files is not None else []
    suffix = params.get('S', '.gz')
    advdef = ['advdef', '-qz4i', str(level)]

    def add_ext(s):
        return s + suffix

    # advdef recompresses existing gzip files, so the file is gzipped first
    if 'c' in params or not files:
        # advdef can't read from stdin or write to stdout, so this is complicated
        with NamedTemporaryFile(prefix='.advdef.', suffix='.gz') as tmp:
            gzargs = {'fileobj': tmp, 'mode': 'wb', 'compresslevel': 0}
            if 'n' in params or len(files) != 1:
                gzargs['mtime'] = 0
                # without an explicit name GzipFile would store the temp
                # file's own name in the header
                gzargs['filename'] = ''
            else:
                gzargs['mtime'] = os.stat(files[0]).st_mtime
                gzargs['filename'] = os.path.basename(files[0])
            with GzipFile(**gzargs) as gz:
                cat(gz, files)
            # tmp.close() would also delete the file
            tmp.flush()
            eprint('run:', advdef + [tmp.name])
            run(advdef + [tmp.name], stdout=DEVNULL)
            # reopen by name to read what advdef left at that path
            with open(tmp.name, 'rb') as result:
                shutil.copyfileobj(result, stdout.buffer)
        return

    # without -f, remember the timestamp of outputs that already exist so
    # files that gzip did not rewrite can be recognised afterwards
    pre_existing = {}
    if 'f' not in params:
        for gz_name in map(add_ext, files):
            if os.path.exists(gz_name):
                pre_existing[gz_name] = os.stat(gz_name).st_mtime
            else:
                pre_existing[gz_name] = None

    run_gzip(level=1)

    for gz_name in map(add_ext, files):
        if 'f' not in params:
            if pre_existing[gz_name] == os.stat(gz_name).st_mtime:
                # output untouched by gzip, leave it alone
                continue
        eprint('run:', advdef + [gz_name])
        run(advdef + [gz_name])
# the openwrt makefiles use:
# gzip -f -9n -c $@ $(1)
# gzip -9n
# gzip -n -f -S .gzip -9n
# gzip -c -9n
# Only act when run as a script. The names assigned below remain module
# globals.
if __name__ == '__main__':
    argl = list(argv[1:])
    known_flags = 'Scfknv'
    comp_level = None
    params = {}
    files = []

    # ham-fistedly parse commandline arguments
    while argl:
        arg = argl.pop(0)
        if arg == '--':
            # everything after "--" is a file name
            files += argl
            argl = []
        elif arg.startswith('-'):
            m = re.fullmatch(r'-(-?[a-zA-Z]*)([1-9][0-9]*)?([a-zA-Z]*)', arg)
            if m is None:
                # an option this parser cannot read: let real gzip decide
                exec_gzip()
            flags = m.group(1) + m.group(3)
            num = m.group(2)
            if num:
                comp_level = int(num)
            for c in flags:
                if c not in known_flags:
                    # unsupported flag, hand the whole command line to gzip
                    exec_gzip()
                if c == 'S':
                    if not argl:
                        # -S without its value, let gzip report the error
                        exec_gzip()
                    params[c] = argl.pop(0)
                else:
                    params[c] = True
        else:
            files.append(arg)

    # levels above 9 select a stronger compressor when one is installed
    if comp_level is not None and comp_level > 9:
        if which('zopfli'):
            run_zopfli(comp_level, params, files)
            exit()
        elif which('advdef'):
            run_advdef(comp_level, params, files)
            exit()

    exec_gzip()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment