Skip to content

Instantly share code, notes, and snippets.

@lukestanley
Created June 8, 2026 10:58
Show Gist options
  • Select an option

  • Save lukestanley/cdbf1e03dc6c857504990213bf7cd331 to your computer and use it in GitHub Desktop.

Select an option

Save lukestanley/cdbf1e03dc6c857504990213bf7cd331 to your computer and use it in GitHub Desktop.
Single-file HTTPS MitM proxy (Astral uv script) for easy sandbox filtering
# /// script
# dependencies = [
# "cryptography",
# "trio",
# ]
# ///
from csv import DictReader
from datetime import datetime, timedelta, timezone
from ipaddress import ip_address
from io import StringIO
from os import execv
from pathlib import Path
from sys import argv, executable
from urllib.parse import urlsplit
import re
import ssl
import trio
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID
# Address and port the proxy listens on.
LISTEN_HOST = "127.0.0.1"
LISTEN_PORT = 18080
# Directory holding the CA material, cached leaf certificates and log files.
STATE_DIR = Path.home() / ".sandbox_proxy"
CA_KEY_PATH = STATE_DIR / "sandbox_proxy_ca.key"
CA_CERT_PATH = STATE_DIR / "sandbox_proxy_ca.crt"
LEAF_DIR = STATE_DIR / "leaf_certs"
# TSV files appended to whenever a request is blocked.
DENIED_LOG_PATH = STATE_DIR / "denied.tsv"
CANDIDATE_RULES_PATH = STATE_DIR / "candidate-allow-rules.tsv"
# Resolved path of this script; polled for modification to trigger a restart.
SCRIPT_PATH = Path(__file__).resolve()
RELOAD_POLL_SECONDS = 1
# strftime format used for the timestamp prefix on log lines.
LOG_TIME_FORMAT = "%H:%M:%S"
def local_time_text():
    """Return the current local time formatted with LOG_TIME_FORMAT."""
    local_now = datetime.now().astimezone()
    return local_now.strftime(LOG_TIME_FORMAT)
def log(message):
    """Print *message* prefixed with the local time, flushing immediately."""
    timestamp = local_time_text()
    print(f"[{timestamp}] {message}", flush=True)
def request_url(scheme, host, port, path):
    """Build a display URL for a request, omitting the scheme's default port.

    IPv6 literals reach this function without brackets (both ``urlsplit``
    and ``host_port_from_authority`` strip them), so they are re-bracketed
    here; otherwise ``::1`` plus a port would be unreadable.
    """
    default_port = 443 if scheme == "https" else 80
    display_host = f"[{host}]" if ":" in host else host
    authority = display_host if port == default_port else f"{display_host}:{port}"
    return f"{scheme}://{authority}{path}"
async def close_stream_quietly(stream):
    """Close *stream*, ignoring errors that only mean it is already gone."""
    try:
        await stream.aclose()
    except (trio.BrokenResourceError, trio.ClosedResourceError, OSError):
        # Best-effort cleanup: a dead or already-closed stream is fine.
        return
# Upper bound on bytes buffered while waiting for the end of a header block.
MAX_HEADER_BYTES = 64 * 1024
# Largest declared Content-Length accepted for a request body.
MAX_REQUEST_BODY_BYTES = 200 * 1024 * 1024
# Allow-list table. Columns MUST be separated by a single TAB character: the
# table is parsed with csv.DictReader(delimiter="\t"), and only the "name"
# column may contain spaces. "host" may start with "*." to match any
# subdomain; "path" is a regular expression matched from the start of the
# request path (including any query string).
ALLOW_RULES_TSV = r"""
name	scheme	host	port	methods	path
Ollama local API	http	127.0.0.1	11434	GET,POST,HEAD	^/.*$
Hugging Face metadata/files	https	huggingface.co	443	GET,HEAD	^/(api/models/[^?]+|[^/]+/[^/]+/resolve/[^?]+)(\?.*)?$
Hugging Face LFS CDN	https	cdn-lfs.huggingface.co	443	GET,HEAD	^/.*$
Debian HTTP packages	http	deb.debian.org	80	GET,HEAD	^/(debian|debian-security)/.*$
Debian HTTP fallback	http	ftp.debian.org	80	GET,HEAD	^/debian/.*$
Debian HTTPS packages	https	deb.debian.org	443	GET,HEAD	^/(debian|debian-security)/.*$
npm registry	https	registry.npmjs.org	443	GET,HEAD	^/.*$
npm package tarballs	https	*.npmjs.org	443	GET,HEAD	^/.*$
Astral uv tarballs	https	*.astral.sh	443	GET,HEAD	^/.*$
Astral uv tarballs	https	astral.sh	443	GET,HEAD	^/.*$
Deno	https	deno.land	443	GET,HEAD	^/.*$
Deno	https	*.deno.land	443	GET,HEAD	^/.*$
Deno	https	jsr.io	443	GET,HEAD	^/.*$
Deno	https	*.jsr.io	443	GET,HEAD	^/.*$
pypi	https	pypi.org	443	GET,HEAD	^/.*$
pypi	https	pythonhosted.org	443	GET,HEAD	^/.*$
pypi	https	*.pythonhosted.org	443	GET,HEAD	^/.*$
OpenAI API	https	api.openai.com	443	GET,HEAD,POST,PUT,PATCH,DELETE	^/.*$
OpenAI subdomains	https	*.openai.com	443	GET,HEAD,POST,PUT,PATCH,DELETE	^/.*$
ChatGPT domain	https	chatgpt.com	443	GET,HEAD,POST,PUT,PATCH,DELETE	^/.*$
ChatGPT subdomains	https	*.chatgpt.com	443	GET,HEAD,POST,PUT,PATCH,DELETE	^/.*$
Claude.com	https	claude.com	443	GET,HEAD,POST,PUT,PATCH,DELETE	^/.*$
Claude subdomains	https	*.claude.com	443	GET,HEAD,POST,PUT,PATCH,DELETE	^/.*$
Claude.ai	https	claude.ai	443	GET,HEAD,POST,PUT,PATCH,DELETE	^/.*$
Claude.ai subdomains	https	*.claude.ai	443	GET,HEAD,POST,PUT,PATCH,DELETE	^/.*$
GitHub API	https	api.github.com	443	GET,HEAD,POST,PUT,PATCH,DELETE	^/.*$
GitHub domain	https	github.com	443	GET,HEAD,POST,PUT,PATCH,DELETE	^/.*$
GitHub subdomains	https	*.github.com	443	GET,HEAD,POST,PUT,PATCH,DELETE	^/.*$
GitHub user content	https	raw.githubusercontent.com	443	GET,HEAD	^/.*$
GitHub objects	https	objects.githubusercontent.com	443	GET,HEAD	^/.*$
GitHub releases	https	*.githubusercontent.com	443	GET,HEAD	^/.*$
GitHub raw content	https	raw.githubusercontent.com	443	GET,HEAD	^/.*$
ChatGPT dump read-only host files	http	127.0.0.1	18081	GET,HEAD	^/.*$
""".strip()
def _rule_from_row(row):
    """Convert one parsed allow-list row into the dict form used by the matchers."""
    path_text = row["path"]
    allowed_methods = {name.strip().upper() for name in row["methods"].split(",")}
    return {
        "name": row["name"],
        "scheme": row["scheme"].lower(),
        "host": row["host"].lower(),
        "port": int(row["port"]),
        "methods": allowed_methods,
        "path_pattern": re.compile(path_text),
        "path_text": path_text,
    }


# Parsed allow rules, in table order; the first matching rule wins.
RULES = [
    _rule_from_row(row)
    for row in DictReader(StringIO(ALLOW_RULES_TSV), delimiter="\t")
]
def now():
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)
def write_private_key(path, key):
    """Write *key* to *path* as unencrypted PEM, readable by the owner only.

    The file is opened with mode 0o600 and re-chmod-ed before any key bytes
    are written, so the private key is never on disk with wider,
    umask-derived permissions (the original wrote first and restricted
    afterwards).
    """
    import os  # local import: the module itself only imports ``execv`` from os

    pem_bytes = key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.TraditionalOpenSSL,
        serialization.NoEncryption(),
    )
    descriptor = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(descriptor, "wb") as key_file:
        # O_CREAT leaves the mode of a pre-existing file untouched, so
        # tighten it explicitly before writing.
        path.chmod(0o600)
        key_file.write(pem_bytes)
def write_certificate(path, certificate):
    """Store *certificate* at *path* in PEM form with mode 0o644."""
    pem_bytes = certificate.public_bytes(serialization.Encoding.PEM)
    path.write_bytes(pem_bytes)
    path.chmod(0o644)
def certificate_name(common_name):
    """Build an X.509 name whose only attribute is the given common name."""
    common_name_attribute = x509.NameAttribute(NameOID.COMMON_NAME, common_name)
    return x509.Name([common_name_attribute])
def ensure_ca():
    """Load the proxy's CA key and certificate, creating them on first use.

    Creates STATE_DIR and LEAF_DIR (mode 0o700) when missing. If both CA
    files exist they are loaded and returned. Otherwise a 3072-bit RSA key
    and a self-signed CA certificate valid for 3650 days are generated,
    written to disk and returned.

    Returns:
        A ``(private_key, certificate)`` tuple.
    """
    for directory in (STATE_DIR, LEAF_DIR):
        directory.mkdir(mode=0o700, exist_ok=True)

    have_ca_files = CA_KEY_PATH.exists() and CA_CERT_PATH.exists()
    if have_ca_files:
        loaded_key = serialization.load_pem_private_key(CA_KEY_PATH.read_bytes(), None)
        loaded_cert = x509.load_pem_x509_certificate(CA_CERT_PATH.read_bytes())
        return loaded_key, loaded_cert

    ca_key = rsa.generate_private_key(public_exponent=65537, key_size=3072)
    ca_key_usage = x509.KeyUsage(
        digital_signature=True,
        key_encipherment=False,
        key_cert_sign=True,
        key_agreement=False,
        content_commitment=False,
        data_encipherment=False,
        encipher_only=False,
        decipher_only=False,
        crl_sign=True,
    )
    builder = x509.CertificateBuilder()
    # Self-signed: subject and issuer carry the same name.
    builder = builder.subject_name(certificate_name("sandbox proxy local CA"))
    builder = builder.issuer_name(certificate_name("sandbox proxy local CA"))
    builder = builder.public_key(ca_key.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    # Backdate by a day to tolerate clock skew.
    builder = builder.not_valid_before(now() - timedelta(days=1))
    builder = builder.not_valid_after(now() + timedelta(days=3650))
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=0), critical=True
    )
    builder = builder.add_extension(ca_key_usage, critical=True)
    certificate = builder.sign(ca_key, hashes.SHA256())

    write_private_key(CA_KEY_PATH, ca_key)
    write_certificate(CA_CERT_PATH, certificate)
    return ca_key, certificate
# Load (or create on first run) the local CA at import time; leaf
# certificates are issued under CA_CERT's subject and signed with CA_KEY.
CA_KEY, CA_CERT = ensure_ca()
def safe_host_filename(host):
    """Return *host* lower-cased, with characters outside ``[a-zA-Z0-9_.-]`` replaced by ``_``."""
    lowered = host.lower()
    return re.sub(r"[^a-zA-Z0-9_.-]", "_", lowered)
def host_alt_name(host):
    """Return the subjectAltName entry for *host*: IP address if it parses as one, else DNS name."""
    try:
        address = ip_address(host)
    except ValueError:
        return x509.DNSName(host)
    return x509.IPAddress(address)
def _leaf_certificate_is_current(cert_path):
    """Return True when the cached certificate parses and has over a day of validity left."""
    try:
        cached = x509.load_pem_x509_certificate(cert_path.read_bytes())
    except (OSError, ValueError):
        return False
    # ``not_valid_after_utc`` exists on newer cryptography releases; older
    # ones only expose the naive-UTC ``not_valid_after``.
    expires_at = getattr(cached, "not_valid_after_utc", None)
    if expires_at is None:
        expires_at = cached.not_valid_after.replace(tzinfo=timezone.utc)
    return expires_at - now() > timedelta(days=1)


def ensure_leaf_certificate(host):
    """Return ``(key_path, cert_path)`` of a CA-signed server certificate for *host*.

    A cached pair under LEAF_DIR is reused only while the certificate is
    still valid. Leaf certificates last 90 days, so without this check a
    long-lived state directory would keep serving expired certificates.
    An expired, unreadable or missing pair is replaced with a new 2048-bit
    RSA key and a certificate whose subjectAltName is *host*.
    """
    safe_name = safe_host_filename(host)
    key_path = LEAF_DIR / f"{safe_name}.key"
    cert_path = LEAF_DIR / f"{safe_name}.crt"
    if (
        key_path.exists()
        and cert_path.exists()
        and _leaf_certificate_is_current(cert_path)
    ):
        return key_path, cert_path
    leaf_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    certificate = (
        x509.CertificateBuilder()
        .subject_name(certificate_name(host))
        .issuer_name(CA_CERT.subject)
        .public_key(leaf_key.public_key())
        .serial_number(x509.random_serial_number())
        # Backdate by a day to tolerate clock skew.
        .not_valid_before(now() - timedelta(days=1))
        .not_valid_after(now() + timedelta(days=90))
        .add_extension(x509.SubjectAlternativeName([host_alt_name(host)]), critical=False)
        .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
        .add_extension(x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False)
        .sign(CA_KEY, hashes.SHA256())
    )
    write_private_key(key_path, leaf_key)
    write_certificate(cert_path, certificate)
    return key_path, cert_path
def tls_server_context_for(host):
    """Return a server-side TLS context presenting the leaf certificate for *host*.

    Only HTTP/1.1 is offered via ALPN.
    """
    key_file, cert_file = ensure_leaf_certificate(host)
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(certfile=cert_file, keyfile=key_file)
    server_context.set_alpn_protocols(["http/1.1"])
    return server_context
def tls_client_context():
    """Return a default (verifying) client TLS context that offers only HTTP/1.1 via ALPN."""
    upstream_context = ssl.create_default_context()
    upstream_context.set_alpn_protocols(["http/1.1"])
    return upstream_context
def host_matches(rule_host, request_host):
    """Return True when *request_host* satisfies *rule_host*, case-insensitively.

    A rule host matches itself exactly. A rule of the form ``*.example.com``
    matches any host ending in ``.example.com`` but not ``example.com``.
    """
    pattern = rule_host.lower()
    candidate = request_host.lower()
    if pattern == candidate:
        return True
    if not pattern.startswith("*."):
        return False
    dotted_suffix = pattern[1:]      # ".example.com"
    bare_domain = dotted_suffix[1:]  # "example.com"
    return candidate.endswith(dotted_suffix) and candidate != bare_domain
def target_matches(rule, scheme, host, port):
    """Return True when *rule* covers the given scheme, host and port."""
    if scheme != rule["scheme"]:
        return False
    if not host_matches(rule["host"], host):
        return False
    return port == rule["port"]
def matching_rule(scheme, host, port, method, path):
    """Return the first rule in RULES allowing this request, or None.

    A rule allows a request when its target matches, the upper-cased method
    is in the rule's method set, and the rule's path pattern matches *path*
    from the start.
    """
    wanted_method = method.upper()
    for candidate in RULES:
        if not target_matches(candidate, scheme, host, port):
            continue
        if wanted_method not in candidate["methods"]:
            continue
        if candidate["path_pattern"].match(path):
            return candidate
    return None
def connect_target_allowed(host, port):
    """Return True when at least one https rule targets *host*:*port*."""
    return any(target_matches(rule, "https", host, port) for rule in RULES)
def header_value(header_bytes, header_name):
    """Return the stripped value of the first header named *header_name*, or "".

    The name comparison is case-insensitive. The first line of
    *header_bytes* (the request or status line) is skipped, as are lines
    without a colon.
    """
    wanted = header_name.lower().encode()
    header_lines = header_bytes.split(b"\r\n")[1:]
    for raw_line in header_lines:
        raw_name, separator, raw_value = raw_line.partition(b":")
        if not separator:
            continue
        if raw_name.strip().lower() == wanted:
            return raw_value.strip().decode("iso-8859-1")
    return ""
def content_length(header_bytes):
    """Return the declared Content-Length, or 0 when the header is absent or empty.

    Raises:
        ValueError: the header is present but is not a plain run of ASCII
            digits. The original treated such a value as 0 while still
            forwarding the header upstream, so the proxy and the upstream
            server could disagree about where the request ends.
    """
    value = header_value(header_bytes, "content-length")
    if not value:
        return 0
    if not (value.isascii() and value.isdigit()):
        raise ValueError(f"invalid Content-Length header: {value!r}")
    return int(value)
def has_chunked_request_body(header_bytes):
    """Return True when the Transfer-Encoding header value mentions "chunked"."""
    transfer_encoding = header_value(header_bytes, "transfer-encoding")
    return "chunked" in transfer_encoding.lower()
def parse_request_line(header_bytes):
    """Split the first line of *header_bytes* into ``(method, target, version)``.

    Raises:
        ValueError: the line has fewer than three space-separated parts.
    """
    request_line, _, _ = header_bytes.partition(b"\r\n")
    method, target, version = request_line.decode("iso-8859-1").split(" ", 2)
    return method, target, version
def path_from_target(target):
    """Return the origin-form path (with query string) for a request target.

    Absolute ``http://`` / ``https://`` targets are reduced to path plus
    query. Anything else is returned unchanged, with "" becoming "/".
    """
    if not target.startswith(("http://", "https://")):
        return target or "/"
    parts = urlsplit(target)
    path = parts.path or "/"
    if parts.query:
        return f"{path}?{parts.query}"
    return path
def host_port_from_authority(authority, default_port):
    """Split ``host[:port]`` (or ``[ipv6][:port]``) into ``(host, port)``.

    *default_port* is used when no port is given. Brackets around an IPv6
    literal are removed.

    Raises:
        ValueError: the port is not an integer or a bracket is unclosed.
    """
    if authority.startswith("["):
        host, remainder = authority[1:].split("]", 1)
        port_text = remainder.removeprefix(":")
        return host, int(port_text or default_port)
    if ":" not in authority:
        return authority, default_port
    host, _, port_text = authority.rpartition(":")
    return host, int(port_text)
def host_port_from_connect_target(target):
    """Split a CONNECT target into ``(host, port)``, defaulting the port to 443."""
    host, port = host_port_from_authority(target, 443)
    return host, port
def host_port_from_http_target(target, headers):
    """Return ``(scheme, host, port)`` for a plain (non-CONNECT) proxy request.

    An absolute-URI target supplies all three, with the port defaulting by
    scheme. Otherwise the Host header is used and the scheme is "http".
    """
    url = urlsplit(target)
    if not (url.scheme and url.hostname):
        authority = headers.get("host", "")
        host, port = host_port_from_authority(authority, 80)
        return "http", host, port
    fallback_port = 443 if url.scheme == "https" else 80
    return url.scheme, url.hostname, url.port or fallback_port
def parsed_headers(header_bytes):
    """Parse header lines into a dict of lower-cased name -> stripped value.

    The first line is skipped, lines without a colon are ignored, and a
    repeated header keeps its last value.
    """
    headers = {}
    for raw_line in header_bytes.split(b"\r\n")[1:]:
        raw_name, separator, raw_value = raw_line.partition(b":")
        if not separator:
            continue
        key = raw_name.decode("iso-8859-1").strip().lower()
        headers[key] = raw_value.decode("iso-8859-1").strip()
    return headers
def is_websocket_request(header_bytes):
    """Return True when the request asks for a WebSocket upgrade.

    Requires "upgrade" in the Connection header and an Upgrade header equal
    to "websocket", both compared case-insensitively.
    """
    headers = parsed_headers(header_bytes)
    connection_tokens = headers.get("connection", "").lower()
    if "upgrade" not in connection_tokens:
        return False
    return headers.get("upgrade", "").lower() == "websocket"
def rewrite_request_for_upstream(header_bytes, body, force_close=True):
    """Return the bytes to send upstream for a proxied request.

    The request target is rewritten to origin form, Proxy-Connection is
    always dropped, and when *force_close* is true any Connection header is
    replaced by ``Connection: close``. *body* is appended unchanged.
    """
    method, raw_target, version = parse_request_line(header_bytes)
    origin_form = path_from_target(raw_target)
    dropped = {"proxy-connection", "connection"} if force_close else {"proxy-connection"}
    output_lines = [f"{method} {origin_form} {version}"]
    for header_line in header_bytes.decode("iso-8859-1").split("\r\n")[1:]:
        if not header_line:
            continue
        field_name = header_line.split(":", 1)[0].strip().lower()
        if field_name not in dropped:
            output_lines.append(header_line)
    if force_close:
        output_lines.append("Connection: close")
    head = "\r\n".join(output_lines) + "\r\n\r\n"
    return head.encode("iso-8859-1") + body
def response_is_websocket_upgrade(header_bytes):
    """Return True when the response is a ``101`` switch to the websocket protocol."""
    status_line = header_bytes.split(b"\r\n", 1)[0].decode("iso-8859-1")
    status_code = status_line.split(" ", 2)[1] if " " in status_line else ""
    if status_code != "101":
        return False
    headers = parsed_headers(header_bytes)
    if "upgrade" not in headers.get("connection", "").lower():
        return False
    return headers.get("upgrade", "").lower() == "websocket"
def normalised_candidate_path(path):
    """Return an anchored regex matching *path* literally, with any query string.

    Everything from the first "?" onward is dropped, the remainder is
    regex-escaped, and an optional-query suffix is appended.
    """
    path_without_query = path.split("?", 1)[0]
    return "^" + re.escape(path_without_query) + r"(\?.*)?$"
def candidate_rule_line(scheme, host, port, method, path):
    """Return a tab-separated allow-rule row that would permit this request."""
    columns = (
        f"Candidate {host}",
        scheme,
        host,
        str(port),
        method.upper(),
        normalised_candidate_path(path),
    )
    return "\t".join(columns)
def append_tsv_line(path, header, line):
    """Append *line* to the file at *path*, writing *header* first if the file is new or empty."""
    is_empty = not path.exists() or path.stat().st_size == 0
    rows = [header, line] if is_empty else [line]
    with path.open("a", encoding="utf-8") as output_file:
        for row in rows:
            output_file.write(row + "\n")
def log_denied(reason, scheme, host, port, method, path):
    """Record a blocked request in the denied log and the candidate-rule file.

    The request target and Host header are client-controlled and may
    contain tab, CR or LF characters. Written verbatim they would shift
    columns or forge extra rows, including fake candidate allow rules, so
    every text field is escaped first. Requests without such characters are
    logged exactly as before.
    """
    control_escapes = str.maketrans({"\t": "\\t", "\n": "\\n", "\r": "\\r"})

    def tsv_safe(text):
        # Make row/column separators visible instead of structural.
        return str(text).translate(control_escapes)

    safe_reason = tsv_safe(reason)
    safe_scheme = tsv_safe(scheme)
    safe_host = tsv_safe(host)
    safe_method = tsv_safe(method)
    safe_path = tsv_safe(path)

    denied_line = "\t".join([
        now().isoformat(),
        safe_reason,
        safe_method.upper(),
        safe_scheme,
        safe_host,
        str(port),
        safe_path,
    ])
    denied_header = "time\treason\tmethod\tscheme\thost\tport\tpath"
    append_tsv_line(DENIED_LOG_PATH, denied_header, denied_line)
    candidate_line = candidate_rule_line(safe_scheme, safe_host, port, safe_method, safe_path)
    candidate_header = "name\tscheme\thost\tport\tmethods\tpath"
    append_tsv_line(CANDIDATE_RULES_PATH, candidate_header, candidate_line)
async def read_until_header_end(stream):
    """Read from *stream* until a blank line ends the header block.

    Returns:
        ``(header_including_terminator, bytes_already_read_after_it)``.

    Raises:
        RuntimeError: the stream ends first, or more than MAX_HEADER_BYTES
            have been buffered.
    """
    terminator = b"\r\n\r\n"
    buffered = b""
    while terminator not in buffered:
        chunk = await stream.receive_some(8192)
        if not chunk:
            raise RuntimeError("connection closed before headers completed")
        buffered += chunk
        if len(buffered) > MAX_HEADER_BYTES:
            raise RuntimeError("request headers too large")
    head, _, remainder = buffered.partition(terminator)
    return head + terminator, remainder
async def read_exact_body(stream, already_read, wanted_length):
    """Return exactly *wanted_length* body bytes, reading more from *stream* as needed.

    *already_read* holds bytes received together with the headers; anything
    in it beyond *wanted_length* is discarded. The body is accumulated in a
    bytearray because repeated ``bytes +=`` copies the whole buffer on every
    chunk, which is quadratic for bodies of the size this proxy permits.

    Raises:
        RuntimeError: the stream ends before the body is complete.
    """
    buffer = bytearray(already_read[:wanted_length])
    while len(buffer) < wanted_length:
        chunk = await stream.receive_some(min(65536, wanted_length - len(buffer)))
        if not chunk:
            raise RuntimeError("connection closed before body completed")
        buffer += chunk
    return bytes(buffer)
async def read_request(stream):
    """Read one request from *stream* and return ``(header_bytes, body_bytes)``.

    Raises:
        RuntimeError: the request uses chunked transfer encoding, or its
            Content-Length exceeds MAX_REQUEST_BODY_BYTES.
    """
    header, leftover = await read_until_header_end(stream)
    if has_chunked_request_body(header):
        raise RuntimeError("chunked request bodies are blocked by this minimal proxy")
    declared_length = content_length(header)
    if declared_length > MAX_REQUEST_BODY_BYTES:
        raise RuntimeError("request body too large")
    return header, await read_exact_body(stream, leftover, declared_length)
async def send_blocked(stream, reason, scheme="", host="", port=0, method="", path=""):
    """Log a blocked request and answer it with a plain-text 403.

    When scheme, host, method and path are all non-empty, the request is
    also recorded through ``log_denied`` before the response is sent.
    """
    body_bytes = "\n".join((
        "blocked by sandbox proxy",
        f"reason: {reason}",
        "",
    )).encode()
    # The two trailing empty items produce the blank line ending the headers.
    head_bytes = "\r\n".join((
        "HTTP/1.1 403 Forbidden",
        "Content-Type: text/plain; charset=utf-8",
        f"Content-Length: {len(body_bytes)}",
        "Connection: close",
        "",
        "",
    )).encode()
    log(f"BLOCK {reason}")
    if all((scheme, host, method, path)):
        log_denied(reason, scheme, host, port, method, path)
        log(f"candidate allow row written: {CANDIDATE_RULES_PATH}")
    await stream.send_all(head_bytes + body_bytes)
async def relay_response(upstream, client):
    """Copy bytes from *upstream* to *client* until upstream reaches end-of-stream.

    Stops early, with a log line, if the client connection breaks.
    """
    while data := await upstream.receive_some(65536):
        try:
            await client.send_all(data)
        except trio.BrokenResourceError:
            log("client closed while response was being relayed")
            return
async def relay_bytes(source, target):
    """Copy bytes from *source* to *target* until either side goes away.

    On end-of-stream or a receive failure the target is closed; on a send
    failure the source is closed. Two of these tasks run against the same
    pair of streams, so one task closing a stream routinely interrupts the
    other's pending ``receive_some`` with ClosedResourceError (or
    BrokenResourceError). That is normal teardown and must not escape the
    nursery, where it would abort the connection handler.
    """
    stream_gone = (trio.BrokenResourceError, trio.ClosedResourceError)
    while True:
        try:
            data = await source.receive_some(65536)
        except stream_gone:
            await close_stream_quietly(target)
            return
        if not data:
            await close_stream_quietly(target)
            return
        try:
            await target.send_all(data)
        except stream_gone:
            await close_stream_quietly(source)
            return
async def relay_bidirectional(left, right):
    """Relay bytes in both directions between two streams until both relays finish."""
    async with trio.open_nursery() as nursery:
        for source, target in ((left, right), (right, left)):
            nursery.start_soon(relay_bytes, source, target)
async def read_response_header_and_rest(stream):
    """Read a response header block; returns ``(header, bytes_read_past_it)``."""
    header, rest = await read_until_header_end(stream)
    return header, rest
async def forward_regular_http(client, host, port, header, body):
    """Send one plain-HTTP request upstream and stream the response back to *client*.

    The upstream connection is closed on every exit path. Previously it was
    closed only on success, so a failed send or relay leaked the socket.
    """
    upstream = await trio.open_tcp_stream(host, port)
    try:
        await upstream.send_all(rewrite_request_for_upstream(header, body, force_close=True))
        await relay_response(upstream, client)
    finally:
        await close_stream_quietly(upstream)
async def forward_regular_https(client, host, port, header, body):
    """Send one request upstream over verified TLS and stream the response back to *client*.

    The upstream connection is closed on every exit path. Previously it was
    closed only on success, so a failed send or relay leaked the socket.
    """
    context = tls_client_context()
    upstream = await trio.open_ssl_over_tcp_stream(
        host,
        port,
        https_compatible=True,
        ssl_context=context,
    )
    try:
        await upstream.send_all(rewrite_request_for_upstream(header, body, force_close=True))
        await relay_response(upstream, client)
    finally:
        await close_stream_quietly(upstream)
async def forward_websocket_http(client, host, port, header, body):
    """Forward a WebSocket upgrade over plain TCP and, if accepted, relay frames both ways.

    The upstream response header is passed to the client as-is. If it is not
    a 101 websocket upgrade the exchange ends there. The upstream stream is
    closed on every exit path, including errors while sending the request or
    reading the response header, which previously leaked it.
    """
    upstream = await trio.open_tcp_stream(host, port)
    try:
        await upstream.send_all(rewrite_request_for_upstream(header, body, force_close=False))
        response_header, response_rest = await read_response_header_and_rest(upstream)
        await client.send_all(response_header + response_rest)
        if not response_is_websocket_upgrade(response_header):
            return
        log(f"WEBSOCKET ws://{host}:{port}")
        await relay_bidirectional(client, upstream)
    finally:
        await close_stream_quietly(upstream)
async def forward_websocket_https(client, host, port, header, body):
    """Forward a WebSocket upgrade over verified TLS and, if accepted, relay frames both ways.

    The upstream response header is passed to the client as-is. If it is not
    a 101 websocket upgrade the exchange ends there. The upstream stream is
    closed on every exit path, including errors while sending the request or
    reading the response header, which previously leaked it.
    """
    context = tls_client_context()
    upstream = await trio.open_ssl_over_tcp_stream(
        host,
        port,
        https_compatible=True,
        ssl_context=context,
    )
    try:
        await upstream.send_all(rewrite_request_for_upstream(header, body, force_close=False))
        response_header, response_rest = await read_response_header_and_rest(upstream)
        await client.send_all(response_header + response_rest)
        if not response_is_websocket_upgrade(response_header):
            return
        log(f"WEBSOCKET wss://{host}:{port}")
        await relay_bidirectional(client, upstream)
    finally:
        await close_stream_quietly(upstream)
async def handle_http_request(client, header, body):
    """Filter and forward a plain-HTTP proxy request.

    The request is answered with a 403 unless an allow rule matches its
    scheme, host, port, method and path. Allowed requests go to the
    WebSocket or regular forwarder depending on their headers.
    """
    method, target, _version = parse_request_line(header)
    scheme, host, port = host_port_from_http_target(target, parsed_headers(header))
    path = path_from_target(target)
    rule = matching_rule(scheme, host, port, method, path)
    if not rule:
        reason = f"no rule for {method} {scheme}://{host}:{port}{path}"
        await send_blocked(client, reason, scheme, host, port, method, path)
        return
    log(f"ALLOW {rule['name']}: {method} {request_url(scheme, host, port, path)}")
    forward = forward_websocket_http if is_websocket_request(header) else forward_regular_http
    await forward(client, host, port, header, body)
async def handle_connect_request(client, target):
    """Handle a CONNECT request by terminating TLS locally and filtering the inner request.

    The tunnel is refused with a 403 unless some https rule covers the
    target host and port. Otherwise the proxy answers 200, performs a TLS
    handshake with the client using a leaf certificate for the target host,
    reads one request from the decrypted stream, and forwards it upstream
    only if an allow rule matches its method and path.
    """
    host, port = host_port_from_connect_target(target)
    # Coarse check first: is any https rule defined for this host and port?
    if not connect_target_allowed(host, port):
        reason = f"CONNECT target not allowed: {host}:{port}"
        await send_blocked(client, reason, "https", host, port, "CONNECT", "/")
        return
    await client.send_all(b"HTTP/1.1 200 Connection Established\r\n\r\n")
    # Act as the TLS server for the client using a certificate issued for `host`.
    server_context = tls_server_context_for(host)
    tls_client = trio.SSLStream(
        client,
        server_context,
        server_side=True,
        https_compatible=True,
    )
    log(f"CONNECT allowed: {host}:{port}; starting local TLS interception")
    try:
        await tls_client.do_handshake()
    except trio.BrokenResourceError:
        log(f"TLS rejected by client for {host}:{port}; install this CA inside the sandbox trust store: {CA_CERT_PATH}")
        await close_stream_quietly(tls_client)
        return
    except ssl.SSLError as error:
        log(f"TLS handshake failed for {host}:{port}: {error}")
        await close_stream_quietly(tls_client)
        return
    # Fine-grained check on the decrypted request. Host and port come from
    # the CONNECT target, not from headers inside the tunnel.
    header, body = await read_request(tls_client)
    method, request_target, version = parse_request_line(header)
    path = path_from_target(request_target)
    rule = matching_rule("https", host, port, method, path)
    if not rule:
        reason = f"no rule for {method} https://{host}:{port}{path}"
        await send_blocked(tls_client, reason, "https", host, port, method, path)
        return
    log(f"ALLOW {rule['name']}: {method} {request_url('https', host, port, path)}")
    if is_websocket_request(header):
        await forward_websocket_https(tls_client, host, port, header, body)
        return
    await forward_regular_https(tls_client, host, port, header, body)
    await tls_client.aclose()
async def handle_client(client):
    """Serve one accepted client connection: read a request, dispatch it, always close.

    CONNECT requests go to the TLS-intercepting handler and everything else
    to the plain-HTTP handler. Expected failures are logged rather than
    propagated, because an exception escaping this handler would propagate
    into ``trio.serve_listeners``. ClosedResourceError is handled alongside
    BrokenResourceError, since a stream closed during teardown can surface
    as either.
    """
    try:
        header, body = await read_request(client)
        method, target, version = parse_request_line(header)
        if method.upper() == "CONNECT":
            await handle_connect_request(client, target)
        else:
            await handle_http_request(client, header, body)
    except (trio.BrokenResourceError, trio.ClosedResourceError):
        log("client disconnected during proxy handling")
    except (RuntimeError, ValueError) as error:
        log(f"bad client request: {error}")
    except OSError as error:
        log(f"network error while handling client: {error}")
    finally:
        await close_stream_quietly(client)
async def reload_when_file_changes():
    """Poll this script's modification time and re-exec the process when it changes.

    A failing ``stat()`` during a poll is skipped and retried on the next
    tick. Editors that save by rename can make the file vanish briefly, and
    an unhandled error here would cancel the nursery and stop the proxy.
    """
    last_mtime = SCRIPT_PATH.stat().st_mtime_ns
    while True:
        await trio.sleep(RELOAD_POLL_SECONDS)
        try:
            current_mtime = SCRIPT_PATH.stat().st_mtime_ns
        except OSError:
            continue
        if current_mtime == last_mtime:
            continue
        log(f"{SCRIPT_PATH} changed; restarting proxy")
        # Give the writer a moment to finish before re-executing.
        await trio.sleep(0.5)
        execv(executable, [executable, *argv])
async def main():
    """Open the listening socket, print startup information and serve clients until stopped.

    Returns early, after logging, when the listen address cannot be bound.
    """
    try:
        listeners = await trio.open_tcp_listeners(LISTEN_PORT, host=LISTEN_HOST)
    except OSError as error:
        for message in (
            f"cannot listen on {LISTEN_HOST}:{LISTEN_PORT}: {error}",
            "another sandbox proxy is probably already running",
        ):
            log(message)
        return
    startup_messages = (
        f"ready: http://{LISTEN_HOST}:{LISTEN_PORT}",
        f"CA certificate: {CA_CERT_PATH}",
        f"denied log: {DENIED_LOG_PATH}",
        f"candidate allow rules: {CANDIDATE_RULES_PATH}",
        "HTTPS interception needs this CA installed inside the sandbox trust store",
    )
    for message in startup_messages:
        log(message)
    async with trio.open_nursery() as nursery:
        # The reload watcher runs alongside the accept loop.
        nursery.start_soon(reload_when_file_changes)
        await trio.serve_listeners(handle_client, listeners)
# Start the proxy only when executed as a script, so that importing the
# module does not block in the server loop.
if __name__ == "__main__":
    trio.run(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment