Last active
October 9, 2024 10:35
-
-
Save pepoluan/ad9d9f9f818361ccaf07a4af541603ba to your computer and use it in GitHub Desktop.
sponge utility -- in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# SPDX-License-Identifier: MPL-2.0 | |
# This Source Code Form is subject to the terms of the Mozilla Public | |
# License, v. 2.0. If a copy of the MPL was not distributed with this | |
# file, You can obtain one at https://mozilla.org/MPL/2.0/. | |
from __future__ import annotations | |
import argparse | |
import sys | |
import tempfile | |
from io import BytesIO | |
from pathlib import Path | |
from typing import TYPE_CHECKING, BinaryIO, Protocol, cast | |
if TYPE_CHECKING: | |
from contextlib import AbstractContextManager | |
# Default value of the --chunksize option, in bytes (1 MiB).
DEFA_CHUNK_SIZE: int = 2**20
class _Options(Protocol):
    """Structural type of the namespace that ``_get_options`` returns."""

    # Read size, in bytes, used when copying a temp-file sponge to the output.
    chunksize: int
    # When true, refuse to overwrite a target file that already exists.
    no_clobber: bool
    # Three-state value:
    #   Ellipsis -> --tmpfile was not given (absorb STDIN in memory)
    #   None     -> --tmpfile was given without a file name (auto-generated temp file)
    #   str      -> --tmpfile was given with this file name
    # ``Ellipsis`` is the singleton value rather than a real type, hence the suppression below.
    # noinspection PyUnresolvedReferences
    tmpfile: Ellipsis | None | str
    # None when no positional argument is given; that case (and "-") means "write to STDOUT".
    target: Path | None
def _positive_int(text: str) -> int:
    """Convert a command-line string to an integer that must be at least 1."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
    return value


def _get_options(argv: list[str] | None = None) -> _Options:
    """Parse the command line into an ``_Options`` namespace.

    Args:
        argv: Arguments to parse, without the program name. ``None`` (the
            default) makes argparse read ``sys.argv[1:]``.

    Returns:
        The parsed namespace. ``tmpfile`` is ``Ellipsis`` when ``--tmpfile``
        is absent, ``None`` when it is given without a value, and the file
        name otherwise. ``target`` is ``None`` when no target is given.

    Raises:
        SystemExit: Raised by argparse on invalid arguments (including a
            ``--chunksize`` below 1) and for ``--help``.
    """
    parser = argparse.ArgumentParser(
        description="Soaks up all input (from STDIN) before saving to file (or re-emitting to STDOUT)",
        epilog=(
            "WARNING: If size of input is particularly large, consider using --tmpfile or you might run out of memory"
        ),
    )
    parser.add_argument(
        "--chunksize",
        "-c",
        metavar="BYTES",
        # A chunk size of 0 would make every read() return b"" immediately,
        # so the output would silently come out empty; reject it up front.
        type=_positive_int,
        default=DEFA_CHUNK_SIZE,
        help=f"Chunk size when using a temp file. Default = {DEFA_CHUNK_SIZE:_}",
    )
    parser.add_argument(
        "--no-clobber",
        "-n",
        action="store_true",
        help="Do not clobber existing file. No effect if target is STDOUT",
    )
    parser.add_argument(
        "--tmpfile",
        "-t",
        metavar="FILE",
        nargs="?",
        # Ellipsis is a sentinel the user cannot type on the command line. It
        # distinguishes "option absent" from "option present without a value",
        # for which argparse stores None.
        default=Ellipsis,
        help=(
            "Use a tempfile to absorb stdin. Can optionally specify name of temp file. NOTE: If this option is "
            "specified without a filename, you MUST specify '--' before specifying the target file. WARNING: If "
            "tempfile exists, it will clobber the contents of the file!"
        ),
    )
    parser.add_argument("target", type=Path, nargs="?", help="Target file. If not specified, or is '-', dump to stdout")
    return cast("_Options", parser.parse_args(argv))
def _absorb(sponge: BinaryIO) -> None:
    """Copy the raw bytes of STDIN into *sponge*, 8192 bytes per read, until a read comes back empty."""
    source = sys.stdin.buffer
    while piece := source.read(8192):
        sponge.write(piece)
def _main(opts: _Options) -> None:
    """Absorb all of STDIN, then write it to the target file or to STDOUT.

    The buffer ("sponge") that holds the input is chosen from ``opts.tmpfile``:

    * ``Ellipsis`` (option not given): an in-memory ``BytesIO``.
    * ``None`` (option given without a name): an auto-named temp file, which
      is deleted afterwards even if writing the output fails.
    * a string: that file, which is clobbered and left in place.

    Output goes to ``opts.target``, or to STDOUT when the target is ``None``
    or is named ``-``.

    Args:
        opts: Parsed command-line options.

    Raises:
        SystemExit: With status 1 when ``opts.no_clobber`` is set and the
            target file already exists.
    """
    target = opts.target
    to_file = target is not None and target.name != "-"

    def _refuse_to_clobber() -> None:
        print(f"ERROR: Target file exists: '{target}'", file=sys.stderr, flush=True)
        sys.exit(1)

    # Fail fast, before blocking on STDIN, when the target is already there.
    if to_file and opts.no_clobber and target.exists():
        _refuse_to_clobber()

    # Ellipsis is not the same as the user specifying "..." on the command line.
    # It is an object users cannot pass as an option value, so it marks
    # "--tmpfile was not used at all".
    in_memory = opts.tmpfile is Ellipsis
    auto_tmp: Path | None = None
    if in_memory:
        sponge_context = BytesIO()
    elif opts.tmpfile is None:
        # Auto-generated file; removed in the ``finally`` below.
        sponge_context = tempfile.NamedTemporaryFile(delete=False)  # noqa: SIM115
        auto_tmp = Path(sponge_context.name)
    else:
        sponge_context = Path(opts.tmpfile).open("w+b")  # noqa: SIM115

    # read(0) returns b"" straight away, which would leave the output empty,
    # so a non-positive chunk size falls back to the default.
    chunksize = opts.chunksize if opts.chunksize > 0 else DEFA_CHUNK_SIZE

    def _emit(sponge: BinaryIO, fout: BinaryIO) -> None:
        if in_memory:
            # Use fast .getvalue() method if using in-memory buffer
            fout.write(sponge.getvalue())
            return
        sponge.seek(0)
        while outgoing := sponge.read(chunksize):
            fout.write(outgoing)

    try:
        with sponge_context as sponge:
            _absorb(sponge)
            if to_file:
                # We cannot open the target any earlier than this point, because opening it truncates it,
                # thus defeating the purpose of sponge when both stdin and stdout are backed by the same file.
                # E.g.:
                #     awk '{ some_awk_program }' file1 | sponge file1
                #
                # Mode "x" makes --no-clobber hold even if the file appeared while STDIN was being absorbed.
                mode = "xb" if opts.no_clobber else "wb"
                try:
                    with target.open(mode) as fout:
                        _emit(sponge, fout)
                except FileExistsError:
                    _refuse_to_clobber()
            else:
                # STDOUT belongs to the process: flush it, but do not close it.
                fout = sys.stdout.buffer
                _emit(sponge, fout)
                fout.flush()
    finally:
        # Runs after the ``with`` has closed the sponge, and on failure too,
        # so the auto-generated temp file is never left behind.
        if auto_tmp is not None:
            auto_tmp.unlink(missing_ok=True)
if __name__ == "__main__":
    # Script entry point: parse the command line, then run the sponge with the result.
    _main(_get_options())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
A minor change would be to assign a function to `sponge_context`, so we can change the first `with` block to `with sponge_context()`, making it more uniform with the inner `with` block.