#!/usr/bin/env -S uv run -s
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "aiolimiter",
# "httpx",
# "ipython",
# "tqdm",
# ]
# ///
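"""Rate-limited async URL fetcher.

Sends each URL through httpx behind an aiolimiter rate limiter, retries
failures with exponential backoff, and can optionally save response bodies
to disk or drop into an IPython REPL with the prepared requests in scope.
"""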
import argparse
import asyncio
import logging
import mimetypes
import os
import re
import sys
from datetime import datetime

import httpx
from aiolimiter import AsyncLimiter
from tqdm.auto import tqdm


def sanitize(name: str) -> str:
    """Replace any filesystem-unfriendly character with an underscore."""
    return re.sub(r"[^\w.-]", "_", name)


def save_response(req, resp, save_path, timestamp):
    """Write the response body to disk, naming the file after the request."""
    # Pick an extension from the Content-Type header, falling back to .bin.
    ct = resp.headers.get("content-type", "").split(";")[0].strip()
    ext = mimetypes.guess_extension(ct) or ".bin"
    path = sanitize(req.url.path.replace("/", "_") or "root")
    filename = f"{req.url.host}_{path}_{timestamp}{ext}"
    filepath = os.path.join(save_path, filename)
    with open(filepath, "wb") as f:
        f.write(resp.content)
    logging.info(f"Saved response to {filepath}")


async def fetch(
    req: httpx.Request,
    client: httpx.AsyncClient,
    limiter: AsyncLimiter,
    retries=5,
    backoff=1,
    save_path=None,
):
    """Send one request through the rate limiter, retrying with exponential backoff."""
    for i in range(retries):
        req_start = datetime.now()
        async with limiter:
            resp = await client.send(req)
        req_end = datetime.now()
        duration = (req_end - req_start).total_seconds()
        logging.info(f"{req.method} {req.url} {resp.status_code} {duration:.2f}s")
        if resp.status_code < 400:
            if save_path:
                save_response(req, resp, save_path, int(req_start.timestamp()))
            return resp
        elif resp.status_code == 404:
            # A 404 won't get better on retry; bail out early.
            break
        else:
            # Back off exponentially before the next attempt.
            await asyncio.sleep(backoff * 2**i)
    raise RuntimeError(f"gave up: {req.url}")
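# With the defaults (retries=5, backoff=1), failed attempts sleep
# 1s, 2s, 4s, 8s, 16s (backoff * 2**i) before fetch() gives up.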


async def fetch_all(requests, retries=5, max_rate="5/1", save_path=None, timeout=60):
    """Fetch all requests concurrently, collecting responses (or exceptions) as they finish."""
    # "5/1" means at most 5 requests per 1-second window.
    limiter = AsyncLimiter(*[int(v) for v in max_rate.split("/")])
    async with httpx.AsyncClient(timeout=timeout) as client:
        tasks = [fetch(req, client, limiter, retries=retries, save_path=save_path) for req in requests]
        results = []
        # tqdm.as_completed wraps asyncio.as_completed in a progress bar.
        for task in tqdm.as_completed(tasks, total=len(tasks)):
            try:
                results.append(await task)
            except Exception as e:
                results.append(e)
        return results
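# fetch_all can also be driven by hand, e.g. from the --repl shell below
# (hypothetical rate: "10/60" means 10 requests per 60 seconds):
#   results = asyncio.run(fetch_all(reqs, max_rate="10/60", save_path="out"))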


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("urls", nargs="*", help="URLs to fetch")
    parser.add_argument("-X", "--method", default="GET", help="HTTP method")
    parser.add_argument("-d", "--data", help="Request body (text)")
    parser.add_argument("-H", "--header", action="append", help="Headers as 'Key: Value'")
    parser.add_argument("-r", "--rate", default="5/1", help="Rate limit. Default: 5/1 (5 per second)")
    parser.add_argument("--log", help="Log file path")
    parser.add_argument("--save", help="Directory to save responses")
    parser.add_argument("--retries", type=int, default=5, help="Number of retries on failure")
    parser.add_argument("--repl", action="store_true", help="Start IPython instead of fetching")
    args = parser.parse_args()

    # Log to a file if requested; otherwise discard log records.
    log_handlers = [logging.NullHandler()]
    if args.log:
        log_handlers.append(logging.FileHandler(args.log))
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(levelname)s | %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
        handlers=log_handlers,
    )

    # Parse repeated -H 'Key: Value' flags into a header dict.
    headers = {}
    if args.header:
        for h in args.header:
            if ":" in h:
                k, v = h.split(":", 1)
                headers[k.strip()] = v.strip()

    data = args.data or None
    if args.save:
        os.makedirs(args.save, exist_ok=True)
    reqs = [httpx.Request(args.method.upper(), url, headers=headers, content=data) for url in args.urls]

    if args.repl:
        # Drop into IPython with `reqs` (and fetch_all) in scope, then exit.
        import IPython
        IPython.embed(colors="neutral")
        sys.exit()

    asyncio.run(fetch_all(reqs, retries=args.retries, max_rate=args.rate, save_path=args.save))


if __name__ == "__main__":
    main()
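

# Example usage (hypothetical URLs and filename; the uv shebang lets the
# script run directly once it is executable):
#   ./fetch.py https://example.com/1 https://example.com/2 -r 10/1 --save out --log fetch.log
#   ./fetch.py -X POST -H 'content-type: application/json' -d '{"k": "v"}' https://example.com/api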
# vim: ft=python