Skip to content

Instantly share code, notes, and snippets.

@chapmanjacobd
Created September 13, 2025 21:30
Show Gist options
  • Save chapmanjacobd/e557c6df92ea9cb0886986f5f7c1ff44 to your computer and use it in GitHub Desktop.
Save chapmanjacobd/e557c6df92ea9cb0886986f5f7c1ff44 to your computer and use it in GitHub Desktop.
Kind of sad that I can't use this... there are too many complications: post-actions, the printer can mark rows as deleted, etc.
class ArgparseDBsOrPaths(argparse.Action):
    """Argparse action that separates leading database arguments from the remaining paths.

    Two input shapes are handled:

    * ``values == [STDIN_DASH]``: paths are read from standard input,
      ``namespace.databases`` is set to ``None`` and the stripped lines (or
      ``None`` when stdin is empty or a single blank line) are stored on
      ``self.dest``.
    * anything else: the leading run of values for which ``is_sqlite()`` is
      truthy is stored on ``namespace.databases`` (``None`` when empty),
      ``namespace.database`` is set to ``True`` and the rest of the values
      (``None`` when nothing is left) are stored on ``self.dest``.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Populate ``namespace`` from ``values``; does nothing when ``values`` is ``None``."""
        if values is None:
            return None

        if values == [STDIN_DASH]:
            print(f"{parser.prog}: Reading from stdin...", file=sys.stderr)
            raw_lines = sys.stdin.readlines()
            # A lone blank line is treated the same as no input at all.
            only_blank_line = len(raw_lines) == 1 and raw_lines[0].strip() == ""
            if raw_lines and not only_blank_line:
                paths = [line.strip() for line in raw_lines]
            else:
                paths = None
            setattr(namespace, "databases", None)
            setattr(namespace, self.dest, paths)
            return

        # Collect the leading run of database arguments; stop at the first
        # value that is_sqlite() rejects.
        leading_dbs = []
        for candidate in values:
            if not is_sqlite(candidate):
                break
            leading_dbs.append(candidate)

        db_count = len(leading_dbs)
        remaining = values[db_count:] if db_count < len(values) else None

        setattr(namespace, "database", True)
        setattr(namespace, "databases", leading_dbs if leading_dbs else None)
        setattr(namespace, self.dest, remaining)
## I guess instead do something like this:
# Run `lb-dev fsadd` twice for the given path: once with --filesystem into
# ~/<basename>.db and once with --video --audio into ~/<basename>.wt.db.
function scandb -a path
    # Compute the basename once and reuse it for both database file names.
    set -l name (path basename -- $path)
    lb-dev fsadd --filesystem ~/$name.db $path
    lb-dev fsadd --video --audio ~/$name.wt.db $path
end
# Run scandb once for every *.db match in the current directory (`:::` starts parallel's argument list).
# NOTE(review): scandb must be visible to the shell that parallel spawns - confirm.
parallel scandb ::: *.db
OR
# Run `lb du` once per database matching ~/lb/fs/d*.db; `{}` is replaced with each database path.
parallel lb du {} --parents -D=-7 --folder-size=+500G ::: ~/lb/fs/d*.db
@chapmanjacobd
Copy link
Author

chapmanjacobd commented Sep 13, 2025

This probably would work mostly fine if I had used ProcessPoolExecutor:

    class TableProxy:
        """Proxy for one table name across every database of a ``ThreadedDBManager``.

        Each call opens every path in ``manager.db_paths`` with ``DB(...)`` on the
        manager's thread pool and runs the requested table method there.

        ``DB``, ``tracer`` and ``kwargs`` are not defined in this snippet; they are
        looked up in the surrounding scope.
        NOTE(review): confirm they exist wherever this class is pasted.
        """

        def __init__(self, manager: "ThreadedDBManager", table_name: str):
            """Remember the owning manager and the table name to look up in each database."""
            self.manager = manager
            self.table_name = table_name

        def detect_fts(self) -> bool:
            """Return True only when ``detect_fts()`` is truthy for this table in every database.

            With no database paths the result is True (``all()`` of nothing).
            """

            def task(db_path):
                db = DB(db_path, tracer=tracer, **kwargs)
                return db[self.table_name].detect_fts()  # type: ignore

            futures = {db_path: self.manager.thread_pool.submit(task, db_path) for db_path in self.manager.db_paths}
            return all(fut.result() for fut in futures.values())

        def __getattr__(self, name):
            """Call the zero-argument table method ``name`` in every database.

            The call happens on attribute access itself; the value returned is a
            dict mapping each database path to that database's result.

            Raises:
                AttributeError: for dunder names and for the proxy's own
                    ``manager`` / ``table_name`` attributes when they are unset.
            """
            # __getattr__ only runs after normal lookup has failed. Without this
            # guard a missing ``manager`` (e.g. on the blank instance that
            # copy/pickle create) re-enters __getattr__ through ``self.manager``
            # until RecursionError, and protocol probes such as ``__deepcopy__``
            # or ``__setstate__`` would be dispatched to every database.
            if name in ("manager", "table_name") or (name.startswith("__") and name.endswith("__")):
                raise AttributeError(f"{type(self).__name__!r} object has no attribute {name!r}")

            def task(db_path):
                db = DB(db_path, tracer=tracer, **kwargs)
                return getattr(db[self.table_name], name)()

            futures = {db_path: self.manager.thread_pool.submit(task, db_path) for db_path in self.manager.db_paths}
            return {db_path: fut.result() for db_path, fut in futures.items()}

    class ThreadedDBManager:
        def __init__(self, db_paths, max_threads=8):
            self.db_paths = db_paths
            self.thread_pool = ThreadPoolExecutor(max_workers=max_threads)

        def __getattr__(self, name):
            def wrapper(*args, **kwargs):
                def task(db_path):
                    db = DB(db_path, tracer=tracer, **kwargs)
                    return getattr(db, name)(*args, **kwargs)

                futures = {db_path: self.thread_pool.submit(task, db_path) for db_path in self.db_paths}
                results = {}
                for db_path, fut in futures.items():
                    result = fut.result()
                    if name == "query":
                        result = list(result)
                    results[db_path] = result
                if name in ("execute", "query"):
                    flat = []
                    for value in results.values():
                        flat.extend(value)
                    return flat
                else:
                    return results

            return wrapper

        def __getitem__(self, table_name: str) -> TableProxy:
            return TableProxy(self, table_name)

        def execute(self, sql: str, params: Iterable | dict | None = None):
            return self.__getattr__("execute")(sql, params)

        def query(self, sql: str, params: Iterable | dict | None = None):
            return self.__getattr__("query")(sql, params)

        def __del__(self):
            self.thread_pool.shutdown(wait=True)

But it makes the code messy and hard to reason about.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment