Created
August 24, 2025 22:33
-
-
Save ryancdotorg/05f7f1bf3f94cd11297fc73641c85473 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| from sys import argv, exit, stdin, stdout, stderr, version_info | |
| from functools import partial, lru_cache | |
| eprint = partial(print, file=stderr) | |
| import io | |
| import os | |
| import re | |
| import types | |
| import shutil | |
| import subprocess | |
| from gzip import GzipFile | |
| from subprocess import DEVNULL, STDOUT, PIPE | |
| from tempfile import NamedTemporaryFile | |
| from collections.abc import Iterable | |
# Sentinel meaning "leave this header field exactly as the compressor wrote it".
# (None, by contrast, means "remove the field" / "zero the timestamp".)
Unmodified = object()


class UnreachableError(RuntimeError):
    """Raised when control reaches a point that should never execute."""


def unreachable():
    """Raise UnreachableError to flag a code path that must never be taken."""
    raise UnreachableError()


# Bits of the FLG byte (offset 3) of a gzip member header.
FEXTRA = 1 << 2
FNAME = 1 << 3
FCOMMENT = 1 << 4

# Memoised PATH lookup for external tools.
which = lru_cache(shutil.which)

# 7z a -tgzip -mx=9 -mpass=15 -mfb=258 uhubctl.gz uhubctl


class GzipMetadata(io.BufferedWriter):
    """Write-only filter that rewrites gzip header metadata while streaming.

    Bytes of a gzip stream are fed in through write() in chunks of any size.
    The 10-byte fixed header is buffered so that the flag byte and the
    timestamp can be patched, the optional "extra" field is passed through
    untouched, and the original file name / comment fields are dropped,
    replaced, injected or passed through as requested.  Everything after the
    header fields is forwarded verbatim to the wrapped writer.

    Parameters
    ----------
    fileobj:
        Destination binary stream.  It is wrapped in an io.BufferedWriter
        (using ``buffer_size``) unless it already is one.
    filename, comment:
        ``Unmodified`` keeps whatever the stream contains, ``None`` removes
        the field, a ``str`` (encoded as UTF-8) or ``bytes`` value replaces
        or adds it.  A value containing a null byte raises ValueError.
    mtime:
        ``Unmodified`` keeps the original timestamp, ``None`` writes 0, any
        other value is converted with int() and stored little-endian in
        header bytes 4-7.
    buffer_size:
        Buffer size used when ``fileobj`` has to be wrapped.

    Only the first header seen is rewritten; the state is never reset for
    later members of a multi-member stream.  The header-CRC flag is not
    inspected, so a header CRC in the input is forwarded unchanged.

    io.BufferedWriter.__init__ is intentionally not called: the class only
    borrows the type (and its context-manager protocol) and delegates all
    supported operations to the wrapped writer.
    """

    def __init__(self, fileobj, filename=Unmodified, mtime=Unmodified, *,
                 comment=Unmodified, buffer_size=io.DEFAULT_BUFFER_SIZE):
        if not isinstance(fileobj, io.BufferedWriter):
            # honour the caller's buffer_size instead of always using the default
            fileobj = io.BufferedWriter(fileobj, buffer_size=buffer_size)
        self._fileobj = fileobj

        filename = self._encode_field(filename, 'filename')
        comment = self._encode_field(comment, 'comment')

        if mtime is None:
            mtime = 0
        elif mtime is not Unmodified:
            mtime = int(mtime)

        self._filename, self._mtime, self._comment = filename, mtime, comment
        self._gzip_header = bytearray(10)
        # True while the original name is being forwarded unchanged
        self._passing_filename = False
        # True while the original name / comment is being discarded
        self._skipping_filename = False
        self._skipping_comment = False
        self._header_written = 0
        # 0x20000: both XLEN bytes pending; 0x10000 + lo: high XLEN byte
        # pending; otherwise the number of extra-field bytes still to forward
        self._extra_remaining = 0
        # debug trace on stderr
        print('meta', self._filename, self._mtime, self._comment, file=stderr)

    @staticmethod
    def _encode_field(value, label):
        """Return ``value`` as null-terminated bytes; pass sentinels through."""
        if value is Unmodified or value is None:
            return value
        if isinstance(value, str):
            value = value.encode()
        if b'\0' in value:
            raise ValueError(f'{label} is not allowed to contain null bytes')
        return bytes(value) + b'\0'

    def _write_meta(self, count=None):
        """Emit any pending replacement name/comment and return ``count``."""
        # only real byte strings are pending data; the sentinels are not
        if isinstance(self._filename, bytes):
            self._fileobj.write(self._filename)
            self._filename = None
        if isinstance(self._comment, bytes):
            self._fileobj.write(self._comment)
            self._comment = None
        return count

    def _rewrite_flags(self, flags):
        """Record what follows the fixed header and return the patched FLG byte."""
        if flags & FEXTRA:
            self._extra_remaining = 0x20000
        if self._filename is Unmodified:
            # the original name (if any) must be forwarded before a comment
            # can be skipped or injected behind it
            self._passing_filename = bool(flags & FNAME)
        else:
            if flags & FNAME:
                self._skipping_filename = True
            if self._filename is not None:
                flags |= FNAME
            else:
                # the name is being removed, so the flag must go as well
                flags &= ~FNAME
        if self._comment is not Unmodified:
            if flags & FCOMMENT:
                self._skipping_comment = True
            if self._comment is not None:
                flags |= FCOMMENT
            else:
                flags &= ~FCOMMENT
        return flags

    def write(self, b):
        """Consume a chunk of the gzip stream and return how many bytes were taken.

        The return value counts input bytes (including bytes that were
        discarded), not the number of bytes forwarded to the wrapped writer.
        """
        if not isinstance(b, (bytes, bytearray)):
            # the field scanning below needs find(), which memoryview lacks
            b = memoryview(b).tobytes()
        count = 0

        # fixed 10-byte header: buffer it, patching flags and timestamp
        if self._header_written < 10:
            need = 10 - self._header_written
            head, b = b[:need], b[need:]
            for i, c in enumerate(head, start=self._header_written):
                if i == 3:
                    c = self._rewrite_flags(c)
                elif 4 <= i <= 7 and self._mtime is not Unmodified:
                    c = (self._mtime >> (i - 4) * 8) & 0xff
                self._gzip_header[i] = c
            self._header_written += len(head)
            count += len(head)
            if self._header_written == 10:
                self._fileobj.write(self._gzip_header)
        if not b:
            return count

        # optional extra field: two little-endian length bytes, then payload
        if self._extra_remaining:
            if self._extra_remaining == 0x20000:
                self._fileobj.write(b[0:1])
                self._extra_remaining = 0x10000 + b[0]
                b = b[1:]
                count += 1
                if not b:
                    return count
            if self._extra_remaining >= 0x10000:
                self._fileobj.write(b[0:1])
                self._extra_remaining = (self._extra_remaining & 0xffff) + (b[0] << 8)
                b = b[1:]
                count += 1
                if not b:
                    return count
            extra, b = b[:self._extra_remaining], b[self._extra_remaining:]
            self._extra_remaining -= len(extra)
            count += self._fileobj.write(extra)
            if not b:
                return count

        # original name kept as-is: forward it up to and including its null
        if self._passing_filename:
            end = b.find(0) + 1
            if not end:
                self._fileobj.write(b)
                return count + len(b)
            self._fileobj.write(b[:end])
            count += end
            b = b[end:]
            self._passing_filename = False

        # original name replaced or removed: discard it up to its null
        if self._skipping_filename:
            end = b.find(0) + 1
            if not end:
                return count + len(b)
            count += end
            b = b[end:]
            self._skipping_filename = False
        if isinstance(self._filename, bytes):
            self._fileobj.write(self._filename)
            self._filename = None

        # same for the comment, which follows the name
        if self._skipping_comment:
            end = b.find(0) + 1
            if not end:
                return count + len(b)
            count += end
            b = b[end:]
            self._skipping_comment = False
        if isinstance(self._comment, bytes):
            self._fileobj.write(self._comment)
            self._comment = None

        if b:
            count += self._fileobj.write(b)
        return count

    def close(self):
        """Close the wrapped writer."""
        return self._fileobj.close()

    def detach(self):
        """Detach and return the raw stream of the wrapped writer."""
        return self._fileobj.detach()

    # historical misspelling, kept so existing callers keep working
    detatch = detach

    def flush(self):
        """Flush the wrapped writer."""
        return self._fileobj.flush()

    def isatty(self):
        """Report whether the wrapped writer is a terminal."""
        return self._fileobj.isatty()

    def writable(self):
        """Report whether the wrapped writer accepts writes."""
        return self._fileobj.writable()

    # historical misspelling, kept so existing callers keep working
    writeable = writable

    def fileno(self):
        """Always fail, so callers never bypass write() via a descriptor."""
        raise OSError('fileno not supported')

    # read*/seek/tell/truncate/writelines are deliberately not overridden.

    def seekable(self):
        """The filter is strictly sequential."""
        return False

    def readable(self):
        """The filter is write-only."""
        return False

    @property
    def closed(self):
        """Whether the wrapped writer has been closed."""
        return self._fileobj.closed
def cat(fileobj, files=None):
    """Copy the contents of each named file, in order, into ``fileobj``.

    When ``files`` is empty or None, standard input (binary) is copied
    instead.
    """
    if not files:
        shutil.copyfileobj(stdin.buffer, fileobj)
        return
    for path in files:
        with open(path, 'rb') as src:
            shutil.copyfileobj(src, fileobj)
def isiterable(obj):
    """Return True for iterables, except that str and bytes count as scalars."""
    return isinstance(obj, Iterable) and not isinstance(obj, (str, bytes))
def run(*args, **kwargs):
    """Run a command via subprocess.run, supporting fileno-less stdout targets.

    The command can be given either as one iterable or as separate
    positional arguments.  ``check`` defaults to True.  All keyword arguments
    are forwarded to subprocess.run.

    subprocess writes to the underlying file descriptor of ``stdout``.  When
    the object passed as ``stdout`` cannot provide one (no ``fileno``
    attribute, or ``fileno()`` raises OSError), the child's stdout is piped
    and written to that object once the command has finished.  If the
    command fails with ``check`` enabled, whatever it produced is still
    delivered before CalledProcessError propagates.  The child's stderr is
    left alone, so diagnostics reach the caller's stderr (or whatever
    ``stderr=`` was passed) even on failure.

    Returns the CompletedProcess.
    """
    # command and arguments can be passed as an iterable or a series of arguments
    if len(args) == 1 and isiterable(args[0]):
        args = args[0]
    kwargs.setdefault('check', True)
    cmd = list(args)

    out = kwargs.get('stdout')
    # None and the subprocess constants (PIPE, DEVNULL) or raw descriptors
    # are handled by subprocess itself
    if out is None or isinstance(out, int):
        return subprocess.run(cmd, **kwargs)

    try:
        out.fileno()
    except (AttributeError, OSError):
        pass
    else:
        return subprocess.run(cmd, **kwargs)

    # no usable descriptor: collect the output and hand it over afterwards
    kwargs['stdout'] = subprocess.PIPE
    try:
        proc = subprocess.run(cmd, **kwargs)
    except subprocess.CalledProcessError as err:
        if err.stdout:
            out.write(err.stdout)
        raise
    out.write(proc.stdout)
    return proc
def args_update_level(orig_args, level=9):
    """Return a copy of ``orig_args`` with any numeric compression level replaced.

    An option such as ``-15``, ``-15n`` or ``-n15`` has its number swapped
    for ``level``.  Arguments that are not options are copied unchanged:
    everything from a ``--`` terminator onwards, and the value that follows
    a short-option cluster ending in ``S`` (the suffix), even if they happen
    to start with a dash and contain digits.
    """
    new_args = []
    pending = iter(orig_args)
    for arg in pending:
        if arg == '--':
            # end of options: the rest are operands, never levels
            new_args.append(arg)
            new_args.extend(pending)
            break
        # if a compression level was specified in an argument, replace it
        m = re.fullmatch(r'(-[^0-9]*)([1-9][0-9]*)(.*)$', arg)
        if m is not None:
            new_args.append(m.group(1) + str(level) + m.group(3))
        else:
            new_args.append(arg)
        if arg.startswith('-') and not arg.startswith('--') and arg.endswith('S'):
            # the next argument is the suffix value, copy it verbatim
            for value in pending:
                new_args.append(value)
                break
    return new_args
def exec_gzip(level=9):
    """Replace the current process with gzip, reusing the command line.

    The arguments from argv are forwarded with any numeric compression
    level rewritten to ``level``.  This function does not return.

    Raises FileNotFoundError when no gzip executable is found on PATH.
    """
    cmd = ['gzip'] + args_update_level(argv[1:], level)
    gzip_path = which('gzip')
    if gzip_path is None:
        # os.execv(None, ...) would fail with an unhelpful TypeError
        raise FileNotFoundError('gzip executable not found on PATH')
    eprint('exec_gzip:', 'gzip', cmd)
    os.execv(gzip_path, cmd)
    unreachable()
def run_gzip(level=9, *, execv=False, **kwargs):
    """Execute gzip based on the command line arguments.

    ``level`` replaces any numeric compression level found in argv.  With
    ``execv=True`` the current process is replaced by gzip and this call
    never returns; otherwise gzip runs as a child process and the remaining
    keyword arguments are forwarded to run().  ``-c`` is added when the
    caller asks for the output (``stdout`` or ``capture_output``).

    An exit status of 2 (gzip's "warning" status) is tolerated; any other
    failure propagates as subprocess.CalledProcessError.
    """
    if execv:
        # must not be forwarded to subprocess.run, which rejects the keyword
        exec_gzip(level)
    cmd = ['gzip']
    if 'stdout' in kwargs or 'capture_output' in kwargs:
        cmd.append('-c')
    cmd += args_update_level(argv[1:], level)
    eprint('run_gzip:', cmd)
    try:
        run(cmd, **kwargs)
    except subprocess.CalledProcessError as e:
        # gzip returns 2 to indicate a warning
        if e.returncode != 2:
            raise
def run_zopfli(level=15, params=None, files=None):
    """Compress with zopfli, mimicking the gzip command line behaviour.

    ``level`` is passed to zopfli as its iteration count (``--i<level>``).
    ``params`` maps the parsed single-letter flags to their values and
    ``files`` lists the input paths.

    * With the ``c`` flag, or without input files, the inputs (or stdin) are
      concatenated into a temporary file and the compressed result goes to
      stdout.
    * Otherwise each input is compressed to ``<name><suffix>`` (suffix from
      the ``S`` flag, default ``.gz``).  The header gets that input's own
      base name and modification time, or neither when the ``n`` flag is
      set.  Inputs are removed afterwards unless the ``k`` flag is set.
    """
    params = params if params is not None else {}
    files = files if files is not None else []

    # rather than a bunch of special cases, always output to stdout
    cmd = ['zopfli', f'--i{level}', '-c']

    if 'c' in params or not files:
        # zopfli can't read from stdin, so use a temp file
        with NamedTemporaryFile(prefix='.zopfli.', suffix='.cat') as tmp:
            cat(tmp, files)
            tmp.flush()
            cmd.append(tmp.name)
            eprint('run_zopfli:', cmd)
            run(cmd)
        return

    suffix = params.get('S', '.gz')
    strip_meta = 'n' in params
    for name in files:
        # metadata comes from the file being compressed; stat first so a
        # missing input fails before an empty output file is created
        mtime = None if strip_meta else os.stat(name).st_mtime
        filename = None if strip_meta else os.path.basename(name)
        with open(name + suffix, 'wb') as out:
            # easier to always inject
            with GzipMetadata(out, filename, mtime) as injector:
                eprint('run_zopfli:', cmd + [name])
                run(cmd + [name], stdout=injector)

    # zopfli doesn't delete input files, so that may need to be done
    if 'k' not in params:
        for name in files:
            eprint('unlink:', name)
            os.unlink(name)
def run_advdef(level=15, params=None, files=None):
    """Compress with gzip, then recompress the result with advdef.

    ``level`` is passed to advdef as the argument of its ``-i`` option.
    ``params`` maps the parsed single-letter flags to their values and
    ``files`` lists the input paths.

    * With the ``c`` flag, or without input files, the inputs (or stdin) are
      stored in a temporary gzip file, recompressed in place, and the result
      is copied to stdout.  Name and timestamp are recorded only for exactly
      one input file and only when the ``n`` flag is absent.
    * Otherwise gzip is run on the original command line (level 1) and every
      resulting ``<name><suffix>`` file is recompressed.  Unless the ``f``
      flag is set, outputs that already existed and were left untouched by
      gzip are skipped.
    """
    params = params if params is not None else {}
    files = files if files is not None else []
    suffix = params.get('S', '.gz')
    advdef = ['advdef', '-qz4i', str(level)]

    # advdef recompresses existing gzip files, so the file is gzipped first
    if 'c' in params or not files:
        # advdef can't read from stdin or write to stdout, so this is complicated
        with NamedTemporaryFile(prefix='.advdef.', suffix='.gz') as tmp:
            gzargs = {'fileobj': tmp, 'mode': 'wb', 'compresslevel': 0}
            if 'n' in params or len(files) != 1:
                gzargs['mtime'] = 0
                # an empty name stops GzipFile from recording the temp file's
                gzargs['filename'] = ''
            else:
                gzargs['mtime'] = os.stat(files[0]).st_mtime
                gzargs['filename'] = os.path.basename(files[0])
            with GzipFile(**gzargs) as gz:
                cat(gz, files)
            # tmp.close() would also delete the file
            tmp.flush()
            eprint('run:', advdef + [tmp.name])
            subprocess.run(advdef + [tmp.name], check=True, stdout=DEVNULL)
            # reopen by name to read what advdef left at that path
            with open(tmp.name, 'rb') as recompressed:
                shutil.copyfileobj(recompressed, stdout.buffer)
        return

    # remember pre-existing outputs so untouched ones can be recognised
    pre_existing = {}
    if 'f' not in params:
        for name in files:
            gz_name = name + suffix
            if os.path.exists(gz_name):
                pre_existing[gz_name] = os.stat(gz_name).st_mtime
            else:
                pre_existing[gz_name] = None

    run_gzip(level=1)

    for name in files:
        gz_name = name + suffix
        if not os.path.exists(gz_name):
            # gzip produced nothing for this input
            continue
        if 'f' not in params:
            if pre_existing[gz_name] == os.stat(gz_name).st_mtime:
                continue
        eprint('run:', advdef + [gz_name])
        run(advdef + [gz_name])
# the openwrt makefiles use:
# gzip -f -9n -c $@ $(1)
# gzip -9n
# gzip -n -f -S .gzip -9n
# gzip -c -9n
argl = list(argv[1:])
known_flags = 'Scfknv'
comp_level = None
params = {}
files = []

# ham-fistedly parse commandline arguments; anything this wrapper does not
# understand is handed to the real gzip, which replaces this process
while argl:
    arg = argl.pop(0)
    if arg == '--':
        # everything after the terminator is a file name
        files += argl
        argl = []
    elif arg.startswith('-'):
        m = re.fullmatch(r'-(-?[a-zA-Z]*)([1-9][0-9]*)?([a-zA-Z]*)', arg)
        if m is None:
            # e.g. --suffix=.gz or -S.gz: let gzip deal with it
            exec_gzip()
        flags = m.group(1) + m.group(3)
        num = m.group(2)
        if num:
            comp_level = int(num)
        for c in flags:
            # unknown flag, or -S without its value
            if c not in known_flags or (c == 'S' and not argl):
                exec_gzip()
            params[c] = argl.pop(0) if c == 'S' else True
    else:
        files.append(arg)

# levels above 9 select a stronger external compressor when one is installed
if comp_level is not None and comp_level > 9:
    if which('zopfli'):
        run_zopfli(comp_level, params, files)
        exit()
    elif which('advdef'):
        run_advdef(comp_level, params, files)
        exit()

exec_gzip()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment