Created
June 8, 2026 10:58
-
-
Save lukestanley/cdbf1e03dc6c857504990213bf7cd331 to your computer and use it in GitHub Desktop.
Single-file HTTPS MitM proxy (Astral uv script) for easy sandbox filtering
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # dependencies = [ | |
| # "cryptography", | |
| # "trio", | |
| # ] | |
| # /// | |
| from csv import DictReader | |
| from datetime import datetime, timedelta, timezone | |
| from ipaddress import ip_address | |
| from io import StringIO | |
| from os import execv | |
| from pathlib import Path | |
| from sys import argv, executable | |
| from urllib.parse import urlsplit | |
| import re | |
| import ssl | |
| import trio | |
| from cryptography import x509 | |
| from cryptography.hazmat.primitives import hashes, serialization | |
| from cryptography.hazmat.primitives.asymmetric import rsa | |
| from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID | |
# Address the proxy listens on (loopback only) and its TCP port.
LISTEN_HOST = "127.0.0.1"
LISTEN_PORT = 18080
# Per-user state directory; everything the proxy persists lives below it.
STATE_DIR = Path.home() / ".sandbox_proxy"
# Private key and certificate of the local signing CA (see ensure_ca).
CA_KEY_PATH = STATE_DIR / "sandbox_proxy_ca.key"
CA_CERT_PATH = STATE_DIR / "sandbox_proxy_ca.crt"
# Cache of per-host leaf keys/certificates (see ensure_leaf_certificate).
LEAF_DIR = STATE_DIR / "leaf_certs"
# TSV log of blocked requests, and ready-to-paste allow rows derived from them
# (both written by log_denied).
DENIED_LOG_PATH = STATE_DIR / "denied.tsv"
CANDIDATE_RULES_PATH = STATE_DIR / "candidate-allow-rules.tsv"
# Absolute path of this script; polled for changes by reload_when_file_changes.
SCRIPT_PATH = Path(__file__).resolve()
# Seconds between modification-time polls of SCRIPT_PATH.
RELOAD_POLL_SECONDS = 1
# strftime format used for the timestamp prefix of every log line.
LOG_TIME_FORMAT = "%H:%M:%S"
def local_time_text():
    """Return the current local wall-clock time formatted with LOG_TIME_FORMAT."""
    local_now = datetime.now().astimezone()
    return local_now.strftime(LOG_TIME_FORMAT)
def log(message):
    """Print *message* to stdout with a local-time prefix, flushing immediately."""
    timestamp = local_time_text()
    print(f"[{timestamp}] {message}", flush=True)
def request_url(scheme, host, port, path):
    """Build a display URL from request parts, omitting the scheme's default port.

    The default port is 443 for ``https`` and 80 for every other scheme.
    IPv6 literals are wrapped in brackets so the host/port boundary stays
    unambiguous; the authority parsers in this file return such hosts
    without brackets.
    """
    default_port = 443 if scheme == "https" else 80
    # A colon inside a bare host can only come from an IPv6 literal.
    if ":" in host and not host.startswith("["):
        host = f"[{host}]"
    authority = host if port == default_port else f"{host}:{port}"
    return f"{scheme}://{authority}{path}"
async def close_stream_quietly(stream):
    """Close *stream*, ignoring broken/closed-resource and OS errors while doing so."""
    ignorable = (trio.BrokenResourceError, trio.ClosedResourceError, OSError)
    try:
        await stream.aclose()
    except ignorable:
        return
# Most header bytes read_until_header_end will buffer before giving up (64 KiB).
MAX_HEADER_BYTES = 64 * 1024
# Largest declared Content-Length read_request will accept (200 MiB).
MAX_REQUEST_BODY_BYTES = 200 * 1024 * 1024
# Allow-list table. It is parsed with delimiter="\t" and rule names contain
# spaces, so the columns MUST be tab-separated. Keeping each row as a tuple and
# joining with "\t" means the table cannot be silently broken by an editor or
# copy/paste that turns tabs into spaces.
_READ_ONLY = "GET,HEAD"
_READ_WRITE = "GET,HEAD,POST,PUT,PATCH,DELETE"
_ANY_PATH = r"^/.*$"
_DEBIAN_PATHS = r"^/(debian|debian-security)/.*$"

_ALLOW_RULE_ROWS = (
    ("name", "scheme", "host", "port", "methods", "path"),
    ("Ollama local API", "http", "127.0.0.1", "11434", "GET,POST,HEAD", _ANY_PATH),
    (
        "Hugging Face metadata/files",
        "https",
        "huggingface.co",
        "443",
        _READ_ONLY,
        r"^/(api/models/[^?]+|[^/]+/[^/]+/resolve/[^?]+)(\?.*)?$",
    ),
    ("Hugging Face LFS CDN", "https", "cdn-lfs.huggingface.co", "443", _READ_ONLY, _ANY_PATH),
    ("Debian HTTP packages", "http", "deb.debian.org", "80", _READ_ONLY, _DEBIAN_PATHS),
    ("Debian HTTP fallback", "http", "ftp.debian.org", "80", _READ_ONLY, r"^/debian/.*$"),
    ("Debian HTTPS packages", "https", "deb.debian.org", "443", _READ_ONLY, _DEBIAN_PATHS),
    ("npm registry", "https", "registry.npmjs.org", "443", _READ_ONLY, _ANY_PATH),
    ("npm package tarballs", "https", "*.npmjs.org", "443", _READ_ONLY, _ANY_PATH),
    ("Astral uv tarballs", "https", "*.astral.sh", "443", _READ_ONLY, _ANY_PATH),
    ("Astral uv tarballs", "https", "astral.sh", "443", _READ_ONLY, _ANY_PATH),
    ("Deno", "https", "deno.land", "443", _READ_ONLY, _ANY_PATH),
    ("Deno", "https", "*.deno.land", "443", _READ_ONLY, _ANY_PATH),
    ("Deno", "https", "jsr.io", "443", _READ_ONLY, _ANY_PATH),
    ("Deno", "https", "*.jsr.io", "443", _READ_ONLY, _ANY_PATH),
    ("pypi", "https", "pypi.org", "443", _READ_ONLY, _ANY_PATH),
    ("pypi", "https", "pythonhosted.org", "443", _READ_ONLY, _ANY_PATH),
    ("pypi", "https", "*.pythonhosted.org", "443", _READ_ONLY, _ANY_PATH),
    ("OpenAI API", "https", "api.openai.com", "443", _READ_WRITE, _ANY_PATH),
    ("OpenAI subdomains", "https", "*.openai.com", "443", _READ_WRITE, _ANY_PATH),
    ("ChatGPT domain", "https", "chatgpt.com", "443", _READ_WRITE, _ANY_PATH),
    ("ChatGPT subdomains", "https", "*.chatgpt.com", "443", _READ_WRITE, _ANY_PATH),
    ("Claude.com", "https", "claude.com", "443", _READ_WRITE, _ANY_PATH),
    ("Claude subdomains", "https", "*.claude.com", "443", _READ_WRITE, _ANY_PATH),
    ("Claude.ai", "https", "claude.ai", "443", _READ_WRITE, _ANY_PATH),
    ("Claude.ai subdomains", "https", "*.claude.ai", "443", _READ_WRITE, _ANY_PATH),
    ("GitHub API", "https", "api.github.com", "443", _READ_WRITE, _ANY_PATH),
    ("GitHub domain", "https", "github.com", "443", _READ_WRITE, _ANY_PATH),
    ("GitHub subdomains", "https", "*.github.com", "443", _READ_WRITE, _ANY_PATH),
    ("GitHub user content", "https", "raw.githubusercontent.com", "443", _READ_ONLY, _ANY_PATH),
    ("GitHub objects", "https", "objects.githubusercontent.com", "443", _READ_ONLY, _ANY_PATH),
    ("GitHub releases", "https", "*.githubusercontent.com", "443", _READ_ONLY, _ANY_PATH),
    # NOTE: same target as "GitHub user content" above; the earlier row always
    # wins because rules are matched in order. Kept so the table is unchanged.
    ("GitHub raw content", "https", "raw.githubusercontent.com", "443", _READ_ONLY, _ANY_PATH),
    ("ChatGPT dump read-only host files", "http", "127.0.0.1", "18081", _READ_ONLY, _ANY_PATH),
)

# Header line followed by one tab-separated line per rule, no trailing newline.
ALLOW_RULES_TSV = "\n".join("\t".join(row) for row in _ALLOW_RULE_ROWS)
def _rule_from_row(row):
    """Convert one allow-list TSV row (a dict of strings) into a rule dict."""
    path_text = row["path"]
    return {
        "name": row["name"],
        "scheme": row["scheme"].lower(),
        "host": row["host"].lower(),
        "port": int(row["port"]),
        "methods": {method.strip().upper() for method in row["methods"].split(",")},
        "path_pattern": re.compile(path_text),
        "path_text": path_text,
    }


# Parsed allow-list in table order.
RULES = [
    _rule_from_row(row)
    for row in DictReader(StringIO(ALLOW_RULES_TSV), delimiter="\t")
]
def now():
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)
def write_private_key(path, key):
    """Write *key* to *path* as unencrypted PEM readable only by its owner.

    The file is created with mode 0o600 and the mode is re-applied before any
    key bytes are written, so the key is never on disk with umask-default
    (typically world-readable) permissions.

    Args:
        path: ``pathlib.Path`` of the key file to create or overwrite.
        key: private key object exposing ``private_bytes``.
    """
    import os  # local import: the module itself only imports os.execv

    pem = key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.TraditionalOpenSSL,
        serialization.NoEncryption(),
    )
    descriptor = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(descriptor, "wb") as key_file:
        # os.open only applies the mode when it creates the file; tighten a
        # pre-existing (now truncated) file before the key is written to it.
        path.chmod(0o600)
        key_file.write(pem)
def write_certificate(path, certificate):
    """Write *certificate* to *path* as PEM and set the file mode to 0o644."""
    pem = certificate.public_bytes(serialization.Encoding.PEM)
    path.write_bytes(pem)
    path.chmod(0o644)
def certificate_name(common_name):
    """Return an X.509 name whose only attribute is the given common name."""
    common_name_attribute = x509.NameAttribute(NameOID.COMMON_NAME, common_name)
    return x509.Name([common_name_attribute])
def ensure_ca():
    """Return ``(ca_key, ca_certificate)``, creating both on first use.

    Creates STATE_DIR and LEAF_DIR (mode 0o700) if needed. When both the key
    and certificate files exist they are loaded and returned; otherwise a new
    3072-bit RSA key and a self-signed CA certificate valid for 3650 days are
    generated, written to disk and returned.
    """
    for directory in (STATE_DIR, LEAF_DIR):
        directory.mkdir(mode=0o700, exist_ok=True)

    if CA_KEY_PATH.exists() and CA_CERT_PATH.exists():
        stored_key = serialization.load_pem_private_key(CA_KEY_PATH.read_bytes(), None)
        stored_cert = x509.load_pem_x509_certificate(CA_CERT_PATH.read_bytes())
        return stored_key, stored_cert

    ca_key = rsa.generate_private_key(public_exponent=65537, key_size=3072)
    ca_name = certificate_name("sandbox proxy local CA")
    # path_length=0: this CA may sign leaf certificates but no further CAs.
    basic_constraints = x509.BasicConstraints(ca=True, path_length=0)
    key_usage = x509.KeyUsage(
        digital_signature=True,
        key_encipherment=False,
        key_cert_sign=True,
        key_agreement=False,
        content_commitment=False,
        data_encipherment=False,
        encipher_only=False,
        decipher_only=False,
        crl_sign=True,
    )

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(ca_name).issuer_name(ca_name)
    builder = builder.public_key(ca_key.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    # Backdate by a day so modest clock skew does not reject the certificate.
    builder = builder.not_valid_before(now() - timedelta(days=1))
    builder = builder.not_valid_after(now() + timedelta(days=3650))
    builder = builder.add_extension(basic_constraints, critical=True)
    builder = builder.add_extension(key_usage, critical=True)
    ca_certificate = builder.sign(ca_key, hashes.SHA256())

    write_private_key(CA_KEY_PATH, ca_key)
    write_certificate(CA_CERT_PATH, ca_certificate)
    return ca_key, ca_certificate
# Load (or create on first run) the local CA at import time; leaf certificates
# are signed with CA_KEY and name CA_CERT's subject as their issuer.
CA_KEY, CA_CERT = ensure_ca()
def safe_host_filename(host):
    """Lower-case *host* and replace each character outside ``[a-zA-Z0-9_.-]`` with ``_``."""
    unsafe_characters = re.compile(r"[^a-zA-Z0-9_.-]")
    return unsafe_characters.sub("_", host.lower())
def host_alt_name(host):
    """Return the subject-alternative-name entry for *host*.

    IP literals become ``x509.IPAddress``; anything else becomes ``x509.DNSName``.
    """
    try:
        address = ip_address(host)
    except ValueError:
        return x509.DNSName(host)
    return x509.IPAddress(address)
def _cached_leaf_still_valid(cert_path):
    """Return True when the cached certificate parses and has at least a day left."""
    try:
        cached = x509.load_pem_x509_certificate(cert_path.read_bytes())
    except (OSError, ValueError):
        return False
    expires = getattr(cached, "not_valid_after_utc", None)
    if expires is None:
        # Older cryptography releases only expose a naive UTC datetime.
        expires = cached.not_valid_after.replace(tzinfo=timezone.utc)
    # Renew a day early so a certificate does not lapse during a session.
    return expires - timedelta(days=1) > now()


def ensure_leaf_certificate(host):
    """Return ``(key_path, cert_path)`` of a CA-signed leaf certificate for *host*.

    A cached key/certificate pair in LEAF_DIR is reused only while the
    certificate is readable and unexpired. Otherwise a fresh 2048-bit RSA key
    and a 90-day certificate are generated and written over the old files.
    Without the expiry check every cached host would fail TLS for good once
    its 90 days had elapsed.
    """
    safe_name = safe_host_filename(host)
    key_path = LEAF_DIR / f"{safe_name}.key"
    cert_path = LEAF_DIR / f"{safe_name}.crt"
    if key_path.exists() and cert_path.exists() and _cached_leaf_still_valid(cert_path):
        return key_path, cert_path
    leaf_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    certificate = (
        x509.CertificateBuilder()
        .subject_name(certificate_name(host))
        .issuer_name(CA_CERT.subject)
        .public_key(leaf_key.public_key())
        .serial_number(x509.random_serial_number())
        # Backdate by a day so modest clock skew does not reject the certificate.
        .not_valid_before(now() - timedelta(days=1))
        .not_valid_after(now() + timedelta(days=90))
        .add_extension(x509.SubjectAlternativeName([host_alt_name(host)]), critical=False)
        .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
        .add_extension(x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False)
        .sign(CA_KEY, hashes.SHA256())
    )
    write_private_key(key_path, leaf_key)
    write_certificate(cert_path, certificate)
    return key_path, cert_path
def tls_server_context_for(host):
    """Build a server-side TLS context that presents the leaf certificate for *host*.

    Only ``http/1.1`` is offered via ALPN.
    """
    leaf_key_path, leaf_cert_path = ensure_leaf_certificate(host)
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(certfile=leaf_cert_path, keyfile=leaf_key_path)
    server_context.set_alpn_protocols(["http/1.1"])
    return server_context
def tls_client_context():
    """Return a default (verifying) TLS client context that offers only ``http/1.1`` via ALPN."""
    upstream_context = ssl.create_default_context()
    upstream_context.set_alpn_protocols(["http/1.1"])
    return upstream_context
def host_matches(rule_host, request_host):
    """Return True when *request_host* satisfies *rule_host*, ignoring case.

    A rule of the form ``*.example.com`` matches any host ending in
    ``.example.com`` but not ``example.com`` itself. Any other rule must be
    equal to the request host.
    """
    pattern = rule_host.lower()
    candidate = request_host.lower()
    if pattern == candidate:
        return True
    if not pattern.startswith("*."):
        return False
    dotted_suffix = pattern[1:]
    bare_domain = dotted_suffix[1:]
    return candidate != bare_domain and candidate.endswith(dotted_suffix)
def target_matches(rule, scheme, host, port):
    """Return True when *rule* covers the given scheme, host and port."""
    if scheme != rule["scheme"]:
        return False
    if port != rule["port"]:
        return False
    return host_matches(rule["host"], host)
def matching_rule(scheme, host, port, method, path):
    """Return the first rule in RULES allowing this request, or None.

    A rule allows the request when its target matches, the upper-cased method
    is in the rule's method set, and the rule's path pattern matches *path*
    from the start of the string.
    """
    wanted_method = method.upper()
    for candidate in RULES:
        if not target_matches(candidate, scheme, host, port):
            continue
        if wanted_method not in candidate["methods"]:
            continue
        if candidate["path_pattern"].match(path):
            return candidate
    return None
def connect_target_allowed(host, port):
    """Return True when at least one ``https`` rule covers ``host:port``."""
    return any(target_matches(rule, "https", host, port) for rule in RULES)
def header_value(header_bytes, header_name):
    """Return the stripped value of the first header named *header_name*.

    The name is compared case-insensitively; the request/status line is
    skipped. Returns ``""`` when the header is absent.
    """
    wanted = header_name.lower().encode()
    header_lines = header_bytes.split(b"\r\n")[1:]
    for raw_line in header_lines:
        name, separator, value = raw_line.partition(b":")
        if separator and name.strip().lower() == wanted:
            return value.strip().decode("iso-8859-1")
    return ""
def content_length(header_bytes):
    """Return the declared Content-Length, or 0 when the header is absent or empty.

    Raises:
        ValueError: if the header is present but is not a plain ASCII decimal
            number. Treating a malformed length as 0 would forward the
            header upstream with no body. ``str.isdigit`` alone is not a
            safe check because it accepts Latin-1 superscript digits that
            ``int`` rejects.
    """
    value = header_value(header_bytes, "content-length")
    if not value:
        return 0
    if not (value.isascii() and value.isdigit()):
        raise ValueError(f"invalid Content-Length: {value!r}")
    return int(value)
def has_chunked_request_body(header_bytes):
    """Return True when the Transfer-Encoding header mentions ``chunked`` (any case)."""
    transfer_encoding = header_value(header_bytes, "transfer-encoding")
    return "chunked" in transfer_encoding.lower()
def parse_request_line(header_bytes):
    """Split the first header line into ``(method, target, version)``.

    Raises:
        ValueError: if the line has fewer than three space-separated parts.
    """
    request_line, _, _ = header_bytes.partition(b"\r\n")
    method, target, version = request_line.decode("iso-8859-1").split(" ", 2)
    return method, target, version
def path_from_target(target):
    """Return the path (plus query) part of a request target.

    Absolute ``http://`` / ``https://`` targets are reduced to path and
    query, defaulting to ``/``. Any other target is returned unchanged, or
    ``/`` when empty.
    """
    if not target.startswith(("http://", "https://")):
        return target or "/"
    parts = urlsplit(target)
    path = parts.path or "/"
    if parts.query:
        return f"{path}?{parts.query}"
    return path
def host_port_from_authority(authority, default_port):
    """Split ``host[:port]`` into ``(host, port)``.

    Bracketed IPv6 literals are returned without brackets. *default_port* is
    used when no port is given.

    Raises:
        ValueError: for a non-numeric port or an unterminated ``[`` literal.
    """
    if authority.startswith("["):
        host, after_bracket = authority[1:].split("]", 1)
        port_text = after_bracket.removeprefix(":")
        return host, int(port_text or default_port)
    host, colon, port_text = authority.rpartition(":")
    if colon:
        return host, int(port_text)
    return authority, default_port
def host_port_from_connect_target(target):
    """Split a CONNECT target into ``(host, port)``, defaulting the port to 443."""
    https_default_port = 443
    return host_port_from_authority(target, https_default_port)
def host_port_from_http_target(target, headers):
    """Return ``(scheme, host, port)`` for a plain proxied request.

    An absolute-form target supplies all three, with the port defaulting to
    443 for ``https`` and 80 otherwise. For any other target the scheme is
    ``http`` and host/port come from the ``host`` entry of *headers*.
    """
    parts = urlsplit(target)
    if not (parts.scheme and parts.hostname):
        host, port = host_port_from_authority(headers.get("host", ""), 80)
        return "http", host, port
    fallback_port = 443 if parts.scheme == "https" else 80
    return parts.scheme, parts.hostname, parts.port or fallback_port
def parsed_headers(header_bytes):
    """Parse the header block into a dict of lower-cased names to stripped values.

    The first (request/status) line and lines without a colon are skipped.
    When a header repeats, the last occurrence wins.
    """
    headers = {}
    for raw_line in header_bytes.split(b"\r\n")[1:]:
        raw_name, separator, raw_value = raw_line.partition(b":")
        if not separator:
            continue
        key = raw_name.decode("iso-8859-1").strip().lower()
        headers[key] = raw_value.decode("iso-8859-1").strip()
    return headers
def is_websocket_request(header_bytes):
    """Return True when the request asks to upgrade the connection to WebSocket."""
    headers = parsed_headers(header_bytes)
    wants_upgrade = "upgrade" in headers.get("connection", "").lower()
    to_websocket = headers.get("upgrade", "").lower() == "websocket"
    return wants_upgrade and to_websocket
def rewrite_request_for_upstream(header_bytes, body, force_close=True):
    """Return the raw request (headers + body) to send to the origin server.

    The request target is reduced to origin-form, ``Proxy-Connection`` is
    dropped, and with *force_close* any ``Connection`` header is replaced by
    ``Connection: close``.

    When the client used an absolute-form target, the received ``Host``
    header is discarded and replaced with the target's authority, as HTTP
    requires of proxies. Otherwise a client could name an allowed host in
    the target while sending a different ``Host`` upstream.
    """
    method, target, version = parse_request_line(header_bytes)
    target_authority = ""
    if target.startswith(("http://", "https://")):
        # Drop any userinfo; only host[:port] belongs in a Host header.
        target_authority = urlsplit(target).netloc.rpartition("@")[2]
    origin_form = path_from_target(target)

    rewritten_lines = [f"{method} {origin_form} {version}"]
    if target_authority:
        rewritten_lines.append(f"Host: {target_authority}")
    for line in header_bytes.decode("iso-8859-1").split("\r\n")[1:]:
        if not line:
            continue
        name = line.split(":", 1)[0].strip().lower()
        if name == "proxy-connection":
            continue
        if force_close and name == "connection":
            continue
        if target_authority and name == "host":
            continue
        rewritten_lines.append(line)
    if force_close:
        rewritten_lines.append("Connection: close")
    rewritten_header = "\r\n".join(rewritten_lines) + "\r\n\r\n"
    return rewritten_header.encode("iso-8859-1") + body
def response_is_websocket_upgrade(header_bytes):
    """Return True for a ``101`` response that upgrades the connection to WebSocket."""
    status_line = header_bytes.split(b"\r\n", 1)[0].decode("iso-8859-1")
    status_code = status_line.split(" ", 2)[1] if " " in status_line else ""
    headers = parsed_headers(header_bytes)
    if status_code != "101":
        return False
    if "upgrade" not in headers.get("connection", "").lower():
        return False
    return headers.get("upgrade", "").lower() == "websocket"
def normalised_candidate_path(path):
    """Turn a concrete request path into an anchored regex for a candidate rule.

    Any query string is dropped, the remaining path is escaped literally,
    and an optional-query suffix is appended.
    """
    path_without_query = path.partition("?")[0]
    return "^" + re.escape(path_without_query) + r"(\?.*)?$"
def candidate_rule_line(scheme, host, port, method, path):
    """Return a tab-separated allow-rule row that would permit this request."""
    path_regex = normalised_candidate_path(path)
    columns = (f"Candidate {host}", scheme, host, str(port), method.upper(), path_regex)
    return "\t".join(columns)
def append_tsv_line(path, header, line):
    """Append *line* to the file at *path*, writing *header* first if the file is missing or empty."""
    is_fresh = not path.exists() or path.stat().st_size == 0
    rows = [header, line] if is_fresh else [line]
    with path.open("a", encoding="utf-8") as output_file:
        output_file.write("".join(f"{row}\n" for row in rows))
def log_denied(reason, scheme, host, port, method, path):
    """Record a blocked request in the denied log and append a matching candidate allow row."""
    denied_fields = (
        now().isoformat(),
        reason,
        method.upper(),
        scheme,
        host,
        str(port),
        path,
    )
    append_tsv_line(
        DENIED_LOG_PATH,
        "time\treason\tmethod\tscheme\thost\tport\tpath",
        "\t".join(denied_fields),
    )
    append_tsv_line(
        CANDIDATE_RULES_PATH,
        "name\tscheme\thost\tport\tmethods\tpath",
        candidate_rule_line(scheme, host, port, method, path),
    )
async def read_until_header_end(stream):
    """Read from *stream* until a blank line ends the header block.

    Returns:
        ``(header, rest)`` where *header* includes the terminating blank line
        and *rest* is whatever was read beyond it.

    Raises:
        RuntimeError: if the stream ends first, or more than MAX_HEADER_BYTES
            were buffered.
    """
    terminator = b"\r\n\r\n"
    buffered = b""
    while terminator not in buffered:
        chunk = await stream.receive_some(8192)
        if not chunk:
            raise RuntimeError("connection closed before headers completed")
        buffered += chunk
        if len(buffered) > MAX_HEADER_BYTES:
            raise RuntimeError("request headers too large")
    head, _, remainder = buffered.partition(terminator)
    return head + terminator, remainder
async def read_exact_body(stream, already_read, wanted_length):
    """Return exactly *wanted_length* body bytes.

    Starts from *already_read* (bytes received together with the headers;
    anything beyond *wanted_length* is discarded) and reads the remainder
    from *stream*.

    Raises:
        RuntimeError: if the stream ends before the body is complete.
    """
    # bytearray grows in place; repeated ``bytes +=`` would re-copy the whole
    # body for every chunk, which is quadratic for large uploads.
    body = bytearray(already_read[:wanted_length])
    while len(body) < wanted_length:
        chunk = await stream.receive_some(min(65536, wanted_length - len(body)))
        if not chunk:
            raise RuntimeError("connection closed before body completed")
        body += chunk
    return bytes(body)
async def read_request(stream):
    """Read one request from *stream* and return ``(header, body)``.

    Raises:
        RuntimeError: for chunked request bodies (not supported) or a declared
            length above MAX_REQUEST_BODY_BYTES.
    """
    header, leftover = await read_until_header_end(stream)
    if has_chunked_request_body(header):
        raise RuntimeError("chunked request bodies are blocked by this minimal proxy")
    declared_length = content_length(header)
    if declared_length > MAX_REQUEST_BODY_BYTES:
        raise RuntimeError("request body too large")
    return header, await read_exact_body(stream, leftover, declared_length)
async def send_blocked(stream, reason, scheme="", host="", port=0, method="", path=""):
    """Log the block and send a ``403 Forbidden`` plain-text response on *stream*.

    When scheme, host, method and path are all non-empty, the request is also
    written to the denied log and a candidate allow row is recorded.
    """
    body = "\n".join(("blocked by sandbox proxy", f"reason: {reason}", ""))
    head = "\r\n".join((
        "HTTP/1.1 403 Forbidden",
        "Content-Type: text/plain; charset=utf-8",
        f"Content-Length: {len(body.encode())}",
        "Connection: close",
    ))
    response = f"{head}\r\n\r\n{body}".encode()

    log(f"BLOCK {reason}")
    has_request_details = bool(scheme and host and method and path)
    if has_request_details:
        log_denied(reason, scheme, host, port, method, path)
        log(f"candidate allow row written: {CANDIDATE_RULES_PATH}")
    await stream.send_all(response)
async def relay_response(upstream, client):
    """Copy bytes from *upstream* to *client* until upstream reaches end-of-stream.

    Stops early, with a log line, if the client connection breaks while sending.
    """
    while chunk := await upstream.receive_some(65536):
        try:
            await client.send_all(chunk)
        except trio.BrokenResourceError:
            log("client closed while response was being relayed")
            break
async def relay_bytes(source, target):
    """Copy bytes from *source* to *target* until either side goes away.

    On end-of-stream or a receive failure, *target* is closed. If sending
    fails, *source* is closed. The receive is guarded because the sibling
    relay running in the opposite direction closes this task's *source*
    when its own side ends, and that raises in the pending ``receive_some``
    here. It is normal tunnel shutdown, not an error.
    """
    stream_gone = (trio.BrokenResourceError, trio.ClosedResourceError)
    while True:
        try:
            data = await source.receive_some(65536)
        except stream_gone:
            await close_stream_quietly(target)
            return
        if not data:
            await close_stream_quietly(target)
            return
        try:
            await target.send_all(data)
        except stream_gone:
            await close_stream_quietly(source)
            return
async def relay_bidirectional(left, right):
    """Relay bytes in both directions between *left* and *right* until both relays finish."""
    async with trio.open_nursery() as nursery:
        for source, target in ((left, right), (right, left)):
            nursery.start_soon(relay_bytes, source, target)
async def read_response_header_and_rest(stream):
    """Read a response header block; returns ``(header, rest)`` like read_until_header_end."""
    header, rest = await read_until_header_end(stream)
    return header, rest
async def forward_regular_http(client, host, port, header, body):
    """Send the request to ``host:port`` over plain TCP and relay the response to *client*.

    The upstream connection is closed on every exit path, including when
    sending or relaying raises.
    """
    upstream = await trio.open_tcp_stream(host, port)
    async with upstream:
        await upstream.send_all(rewrite_request_for_upstream(header, body, force_close=True))
        await relay_response(upstream, client)
async def forward_regular_https(client, host, port, header, body):
    """Send the request to ``host:port`` over verified TLS and relay the response to *client*.

    The upstream connection is closed on every exit path, including when
    sending or relaying raises.
    """
    context = tls_client_context()
    upstream = await trio.open_ssl_over_tcp_stream(
        host,
        port,
        https_compatible=True,
        ssl_context=context,
    )
    async with upstream:
        await upstream.send_all(rewrite_request_for_upstream(header, body, force_close=True))
        await relay_response(upstream, client)
async def forward_websocket_http(client, host, port, header, body):
    """Forward a WebSocket upgrade request over plain TCP.

    The upstream response header, plus any bytes read with it, is passed to
    *client*. If upstream accepted the upgrade, the two streams are then
    relayed in both directions. The upstream connection is closed on every
    exit path, including errors.
    """
    upstream = await trio.open_tcp_stream(host, port)
    async with upstream:
        await upstream.send_all(rewrite_request_for_upstream(header, body, force_close=False))
        response_header, response_rest = await read_response_header_and_rest(upstream)
        await client.send_all(response_header + response_rest)
        if not response_is_websocket_upgrade(response_header):
            return
        log(f"WEBSOCKET ws://{host}:{port}")
        await relay_bidirectional(client, upstream)
async def forward_websocket_https(client, host, port, header, body):
    """Forward a WebSocket upgrade request over verified TLS.

    The upstream response header, plus any bytes read with it, is passed to
    *client*. If upstream accepted the upgrade, the two streams are then
    relayed in both directions. The upstream connection is closed on every
    exit path, including errors.
    """
    context = tls_client_context()
    upstream = await trio.open_ssl_over_tcp_stream(
        host,
        port,
        https_compatible=True,
        ssl_context=context,
    )
    async with upstream:
        await upstream.send_all(rewrite_request_for_upstream(header, body, force_close=False))
        response_header, response_rest = await read_response_header_and_rest(upstream)
        await client.send_all(response_header + response_rest)
        if not response_is_websocket_upgrade(response_header):
            return
        log(f"WEBSOCKET wss://{host}:{port}")
        await relay_bidirectional(client, upstream)
async def handle_http_request(client, header, body):
    """Handle a plain (non-CONNECT) proxied request: check the allow-list, then forward or block."""
    method, target, _version = parse_request_line(header)
    scheme, host, port = host_port_from_http_target(target, parsed_headers(header))
    path = path_from_target(target)

    rule = matching_rule(scheme, host, port, method, path)
    if rule is None:
        reason = f"no rule for {method} {scheme}://{host}:{port}{path}"
        await send_blocked(client, reason, scheme, host, port, method, path)
        return

    log(f"ALLOW {rule['name']}: {method} {request_url(scheme, host, port, path)}")
    forward = forward_websocket_http if is_websocket_request(header) else forward_regular_http
    await forward(client, host, port, header, body)
def _tunnel_authority_mismatch(header, request_target, host, port):
    """Return the first authority claimed inside the tunnel that differs from ``host:port``, else ``""``."""
    claimed = [parsed_headers(header).get("host", "")]
    if request_target.startswith(("http://", "https://")):
        claimed.append(urlsplit(request_target).netloc.rpartition("@")[2])
    for authority in claimed:
        if not authority:
            continue
        claimed_host, claimed_port = host_port_from_authority(authority, port)
        if claimed_host.lower() != host.lower() or claimed_port != port:
            return authority
    return ""


async def handle_connect_request(client, target):
    """Handle ``CONNECT host:port``: intercept TLS locally, filter the inner request, forward it.

    After the allow-list check on the CONNECT target, the client connection
    is wrapped in TLS using a locally signed leaf certificate. One request is
    read from the tunnel and checked against the rules, then forwarded (or
    blocked) over a fresh verified TLS connection to the real host.

    The inner request must address the same host and port as the CONNECT
    target. The upstream connection always goes to the CONNECT target, so
    without this check a request could name a different site in its Host
    header and get past the allow-list on shared infrastructure.
    """
    host, port = host_port_from_connect_target(target)
    if not connect_target_allowed(host, port):
        reason = f"CONNECT target not allowed: {host}:{port}"
        await send_blocked(client, reason, "https", host, port, "CONNECT", "/")
        return
    await client.send_all(b"HTTP/1.1 200 Connection Established\r\n\r\n")
    server_context = tls_server_context_for(host)
    tls_client = trio.SSLStream(
        client,
        server_context,
        server_side=True,
        https_compatible=True,
    )
    log(f"CONNECT allowed: {host}:{port}; starting local TLS interception")
    try:
        await tls_client.do_handshake()
    except trio.BrokenResourceError:
        log(f"TLS rejected by client for {host}:{port}; install this CA inside the sandbox trust store: {CA_CERT_PATH}")
        await close_stream_quietly(tls_client)
        return
    except ssl.SSLError as error:
        log(f"TLS handshake failed for {host}:{port}: {error}")
        await close_stream_quietly(tls_client)
        return
    try:
        header, body = await read_request(tls_client)
        method, request_target, version = parse_request_line(header)
        path = path_from_target(request_target)
        mismatch = _tunnel_authority_mismatch(header, request_target, host, port)
        if mismatch:
            # No request details are passed, so no candidate allow row is
            # written for a host that is already allowed.
            reason = f"request authority {mismatch} does not match CONNECT target {host}:{port}"
            await send_blocked(tls_client, reason)
            return
        rule = matching_rule("https", host, port, method, path)
        if not rule:
            reason = f"no rule for {method} https://{host}:{port}{path}"
            await send_blocked(tls_client, reason, "https", host, port, method, path)
            return
        log(f"ALLOW {rule['name']}: {method} {request_url('https', host, port, path)}")
        if is_websocket_request(header):
            await forward_websocket_https(tls_client, host, port, header, body)
            return
        await forward_regular_https(tls_client, host, port, header, body)
    finally:
        # Close the TLS layer on every path, not only after a regular forward.
        await close_stream_quietly(tls_client)
async def handle_client(client):
    """Serve one accepted proxy connection and always close it afterwards.

    Reads the first request and dispatches to the CONNECT or plain-HTTP
    handler. This is the per-connection boundary: every ordinary exception is
    logged here rather than propagated, because an exception escaping a
    handler would stop the listener for all clients. The final catch-all
    also covers exception groups raised by the relay nursery.
    """
    try:
        header, body = await read_request(client)
        method, target, version = parse_request_line(header)
        if method.upper() == "CONNECT":
            await handle_connect_request(client, target)
        else:
            await handle_http_request(client, header, body)
    except (trio.BrokenResourceError, trio.ClosedResourceError):
        log("client disconnected during proxy handling")
    except (RuntimeError, ValueError) as error:
        log(f"bad client request: {error}")
    except OSError as error:
        log(f"network error while handling client: {error}")
    except Exception as error:  # connection boundary: log, never kill the server
        log(f"unexpected error while handling client: {error!r}")
    finally:
        await close_stream_quietly(client)
async def reload_when_file_changes():
    """Poll this script's modification time and re-exec the process when it changes.

    Runs forever, checking every RELOAD_POLL_SECONDS. A failed ``stat`` is
    skipped rather than raised, since the path can be briefly missing while
    an editor saves by rename. The next poll picks up the new file.
    """
    last_mtime = SCRIPT_PATH.stat().st_mtime_ns
    while True:
        await trio.sleep(RELOAD_POLL_SECONDS)
        try:
            current_mtime = SCRIPT_PATH.stat().st_mtime_ns
        except OSError:
            continue
        if current_mtime == last_mtime:
            continue
        log(f"{SCRIPT_PATH} changed; restarting proxy")
        # Short pause before replacing the process, giving a save in progress
        # time to finish.
        await trio.sleep(0.5)
        execv(executable, [executable, *argv])
async def main():
    """Open the listening socket, print startup information and serve until stopped.

    Returns early, after logging, when the listen address cannot be bound.
    """
    try:
        listeners = await trio.open_tcp_listeners(LISTEN_PORT, host=LISTEN_HOST)
    except OSError as error:
        log(f"cannot listen on {LISTEN_HOST}:{LISTEN_PORT}: {error}")
        log("another sandbox proxy is probably already running")
        return

    startup_messages = (
        f"ready: http://{LISTEN_HOST}:{LISTEN_PORT}",
        f"CA certificate: {CA_CERT_PATH}",
        f"denied log: {DENIED_LOG_PATH}",
        f"candidate allow rules: {CANDIDATE_RULES_PATH}",
        "HTTPS interception needs this CA installed inside the sandbox trust store",
    )
    for message in startup_messages:
        log(message)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(reload_when_file_changes)
        await trio.serve_listeners(handle_client, listeners)
# Start the proxy only when executed as a script, so importing the module
# (for tests or tooling) does not bind the port.
if __name__ == "__main__":
    trio.run(main)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment