Last active
October 13, 2025 14:35
-
-
Save spvkgn/82b396ff3f9ba0c19d1536a1e5c4777c to your computer and use it in GitHub Desktop.
Capture TLS ClientHello / QUIC Initial
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| capture_tls_quic.py — Capture TLS ClientHello and QUIC Initial | |
| -t : capture TLS ClientHello (default) | |
| -q : capture QUIC Initial | |
| -a : capture both (TLS + QUIC) | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import socket | |
| import subprocess | |
| import sys | |
| import threading | |
| import time | |
| import shutil | |
| import platform | |
| import functools | |
| from datetime import datetime | |
| from pathlib import Path | |
| from contextlib import contextmanager | |
| from typing import Iterator, Tuple | |
# Platform flag consulted by safe_bind() for its Windows-only fallback bind.
IS_WINDOWS = platform.system() == "Windows"

# Capture tuning knobs.
MAX_BUFFER_SIZE = 2048  # bytes requested per recv()/recvfrom() call
CAPTURE_TIMEOUT = 1  # seconds; socket timeouts and curl --max-time
RETRIES = 3  # attempts made by main() in single-protocol mode
OUT_DIR = Path.cwd()  # capture files are written to the current directory

# Use curl-quiche when it is found on PATH, otherwise plain curl.
CURL_CMD_ALT = "curl-quiche"
CURL_CMD = "curl" if shutil.which(CURL_CMD_ALT) is None else CURL_CMD_ALT

# protocol key -> (short log prefix, human-readable name)
PROTOCOL_INFO = dict(
    tls_clienthello=("tls", "TLS ClientHello"),
    quic_initial=("quic", "QUIC Initial"),
)
def timestamp():
    """Return the current local time as a compact ``YYYYMMDDTHHMMSS`` string."""
    return f"{datetime.now():%Y%m%dT%H%M%S}"
def resolve_host(host: str) -> str:
    """Resolve *host* to a single IP address string, preferring IPv4.

    The proxy sockets in this script are created with ``AF_INET``, so an
    IPv4 result is returned whenever the resolver offers one; the first
    result of any family is used only as a fallback.

    Args:
        host: Hostname or literal IP address.

    Returns:
        The chosen IP address as a string.

    Raises:
        RuntimeError: If resolution fails for any reason.
    """
    try:
        results = socket.getaddrinfo(host, 443, family=socket.AF_UNSPEC, type=socket.SOCK_STREAM)
        if not results:
            raise socket.gaierror(f"Could not resolve {host}")
        # First pass: take the first IPv4 address, if any.
        for family, _socktype, _proto, _canonname, sockaddr in results:
            if family == socket.AF_INET:
                return sockaddr[0]
        # No IPv4 address available: fall back to whatever came first.
        return results[0][4][0]
    except socket.gaierror as e:
        raise RuntimeError(f'❌ Could not resolve {host}: {e}') from e
    except Exception as e:
        raise RuntimeError(f'❌ Unexpected error resolving {host}: {e}') from e
def safe_bind(sock: socket.socket, addr: tuple[str, int]):
    """Bind *sock* to *addr* with SO_REUSEADDR, with a loopback fallback on Windows.

    Args:
        sock: Socket to bind.
        addr: ``(host, port)`` pair to bind to.

    Raises:
        OSError: If the bind fails and no fallback applies (non-Windows), or
            if the Windows fallback bind to ``("127.0.0.1", 0)`` fails too.
    """
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(addr)
        return
    except OSError as e:
        print(f"❗[warn] bind failed on {addr}: {e}")
        if not IS_WINDOWS:
            # There is no fallback on this platform; swallowing the error
            # would hand the caller an unbound socket.
            raise
    # Windows only: retry on an ephemeral loopback port.
    try:
        sock.bind(("127.0.0.1", 0))
    except OSError as e2:
        print(f"❌ [fatal] Fallback bind also failed: {e2}")
        raise
def file_operation_handler(func):
    """Decorate a ``(path, protocol_prefix, ...)`` file helper so errors are printed, not raised.

    The wrapper returns whatever *func* returns; when *func* raises, a
    message is printed and ``None`` is returned instead.
    """
    name = func.__name__

    @functools.wraps(func)
    def wrapper(path: Path, protocol_prefix: str = "", *args, **kwargs):
        label = "" if not protocol_prefix else f"[{protocol_prefix}]"
        try:
            outcome = func(path, protocol_prefix, *args, **kwargs)
        except IOError as err:
            print(f"❌ {label} [{name}] Error processing file {path.name}: {err}")
        except Exception as err:
            print(f"❌ {label} [{name}] Unexpected error: {err}")
        else:
            return outcome
        return None

    return wrapper
@file_operation_handler
def trim_trailing_zeros(path: Path, protocol_prefix: str = ""):
    """Strip every trailing zero byte from *path*, rewriting the file only when needed."""
    with open(path, "rb") as src:
        original = src.read()

    kept = original.rstrip(b"\x00")
    removed = len(original) - len(kept)
    if removed == 0:
        # Nothing to trim: leave the file untouched and stay silent.
        return

    with open(path, "wb") as dst:
        dst.write(kept)
    label = "" if not protocol_prefix else f"[{protocol_prefix}]"
    print(f"🔄 {label} [trim] Removed {removed} trailing zero bytes from {path.name}")
@file_operation_handler
def hexdump_bin_file(filename: Path, protocol_prefix: str = "", bytes_count=16):
    """Print a one-line hexdump of the first *bytes_count* bytes of *filename*.

    The line shows an offset of ``00000000``, the bytes as two-digit hex in
    groups of eight, and a printable-ASCII rendering where other bytes are
    shown as ``.``.

    Args:
        filename: Binary file to read.
        protocol_prefix: Optional tag shown in brackets in the output.
        bytes_count: Number of leading bytes to dump.
    """
    # Build the tag once so the empty-file message uses it too and does not
    # print a literal "[]" when no prefix was given.
    tag = f"[{protocol_prefix}]" if protocol_prefix else ""
    with open(filename, 'rb') as f:
        data = f.read(bytes_count)
    if not data:
        print(f"{tag} File {filename.name} is empty".lstrip())
        return
    hex_groups = [
        ' '.join(f'{b:02x}' for b in data[i:i + 8])
        for i in range(0, len(data), 8)
    ]
    hex_line = ' '.join(hex_groups).ljust(47)
    ascii_part = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in data)
    print(f"✅ {tag} [hexdump] 00000000 {hex_line} |{ascii_part}|")
class ManagedSocket:
    """Own one IPv4 socket and guarantee that it is closed at most once.

    The socket is reachable through :attr:`sock` until :meth:`close` is
    called; afterwards :attr:`sock` raises ``RuntimeError``.
    """

    # Class-level defaults keep close()/__del__ safe on an instance whose
    # __init__ failed before the socket could be created.
    _sock = None
    _is_closed = True

    def __init__(self, sock_type: int):
        """Create an ``AF_INET`` socket of *sock_type*.

        Raises:
            RuntimeError: If the socket cannot be created.
        """
        try:
            self._sock = socket.socket(socket.AF_INET, sock_type)
        except socket.error as e:
            raise RuntimeError(f"Failed to create socket: {e}") from e
        self._is_closed = False

    @property
    def sock(self) -> socket.socket:
        """The underlying socket; raises ``RuntimeError`` once closed."""
        if self._is_closed:
            raise RuntimeError("Socket is closed")
        return self._sock

    def close(self):
        """Close the socket if it is still open; safe to call repeatedly."""
        if self._is_closed or not self._sock:
            return
        try:
            self._sock.close()
        except socket.error as e:
            print(f"⚠️ [warn] Error closing socket: {e}")
        finally:
            self._is_closed = True

    def __del__(self):
        self.close()
@contextmanager
def managed_tcp_connection(addr: Tuple[str, int]) -> Iterator[socket.socket]:
    """Open a TCP connection to *addr* and close it when the ``with`` block ends.

    Connection errors are reported and re-raised. Errors raised inside the
    ``with`` body propagate untouched: only the connect step is wrapped, so
    they are not mislabelled as connection failures.

    Args:
        addr: ``(host, port)`` to connect to.

    Yields:
        The connected socket.

    Raises:
        socket.timeout: If the connection attempt times out.
        OSError: If the connection attempt fails for another reason.
    """
    try:
        sock = socket.create_connection(addr, timeout=CAPTURE_TIMEOUT)
    except socket.timeout:
        print(f"❌ [tcp] Connection timeout to {addr}")
        raise
    except socket.error as e:
        print(f"❌ [tcp] Connection failed to {addr}: {e}")
        raise
    try:
        yield sock
    finally:
        try:
            sock.close()
        except socket.error:
            # Best effort: the connection is being discarded anyway.
            pass
class BaseProxyCapture:
    """Shared workflow for capturing the first client packet through a local proxy.

    A subclass supplies ``create_sockets()`` (which must set
    ``self.local_port``) and ``handle_proxy_loop()`` (which runs in a
    background thread and calls ``save_captured_data()``). ``run_capture()``
    drives the whole sequence: open sockets, start the proxy thread, point
    curl at the local port, then report whether a capture file was written.
    """

    def __init__(self, domain: str, remote_ip: str, protocol: str):
        """Store the target and derive the output path for this capture.

        Args:
            domain: Hostname curl is asked to fetch.
            remote_ip: Address the proxy forwards traffic to (port 443).
            protocol: Key into ``PROTOCOL_INFO``; also the file-name prefix.
        """
        self.domain = domain
        self.remote_ip = remote_ip
        self.protocol = protocol
        self.saved_path = OUT_DIR / f"{protocol}_{domain}_{timestamp()}.bin"
        self.captured = None  # bytes of the first captured packet, once seen
        self.stop_event = threading.Event()
        self.local_port = None  # filled in by create_sockets()
        self.managed_sockets = []  # every ManagedSocket to close in cleanup()

    @property
    def prefix(self) -> str:
        """Short tag used in log lines (falls back to the protocol key)."""
        return PROTOCOL_INFO.get(self.protocol, (self.protocol, ""))[0]

    @property
    def display_name(self) -> str:
        """Human-readable protocol name (falls back to a title-cased key)."""
        return PROTOCOL_INFO.get(self.protocol, (self.protocol, self.protocol.replace('_', ' ').title()))[1]

    def create_sockets(self) -> None:
        """Create the listening socket(s) and set ``self.local_port``.

        Subclasses must override this hook; run_capture() calls it first.
        """
        raise NotImplementedError(f"{type(self).__name__} must implement create_sockets()")

    def handle_proxy_loop(self) -> None:
        """Proxy traffic and save the first packet; runs in a worker thread.

        Subclasses must override this hook.
        """
        raise NotImplementedError(f"{type(self).__name__} must implement handle_proxy_loop()")

    def setup_socket(self, sock_type: int, bind_addr: tuple[str, int] = ("127.0.0.1", 0)) -> ManagedSocket:
        """Create and bind a socket safely, register it for cleanup"""
        sock_mgr = ManagedSocket(sock_type)
        safe_bind(sock_mgr.sock, bind_addr)
        self.managed_sockets.append(sock_mgr)
        return sock_mgr

    def get_curl_command(self) -> list[str]:
        """Build the curl argv that connects to the local proxy port."""
        base_cmd = [
            CURL_CMD,
            "-ISs",
            "--connect-to", f"{self.domain}:443:127.0.0.1:{self.local_port}",
            "--max-time", str(CAPTURE_TIMEOUT),
            "--curves", "X25519",
            f"https://{self.domain}",
        ]
        if self.protocol == "quic_initial":
            # Placed right after "-ISs" so curl uses HTTP/3 only.
            base_cmd.insert(2, "--http3-only")
        return base_cmd

    def save_captured_data(self, data: bytes) -> None:
        """Write *data* to ``self.saved_path`` and print a short hexdump.

        For QUIC captures, trailing zero bytes are trimmed from the saved
        file. Any failure is reported on stdout rather than raised.
        """
        try:
            self.captured = bytes(data)
            with open(self.saved_path, "wb") as f:
                f.write(self.captured)
            print(f"✅ [{self.prefix}] Saved {self.display_name} to {self.saved_path.name}")
            if self.protocol == "quic_initial":
                trim_trailing_zeros(self.saved_path, self.prefix)
            hexdump_bin_file(self.saved_path, self.prefix)
        except Exception as e:
            print(f"❌ [{self.prefix}] Error saving data: {e}")

    def run_capture(self) -> Path | None:
        """Run one capture attempt.

        Returns:
            The path of the capture file when it exists and is non-empty,
            otherwise ``None``. Errors are printed, never raised.
        """
        try:
            self.create_sockets()
            print(f"🔄 [{self.prefix}] Listening on 127.0.0.1:{self.local_port}, forwarding to {self.remote_ip}:443")
            th = threading.Thread(target=self.handle_proxy_loop, daemon=True)
            th.start()
            cmd = self.get_curl_command()
            print(f"🔄 [{self.prefix}] Running:", " ".join(cmd))
            try:
                result = subprocess.run(
                    cmd,
                    check=False,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.PIPE,
                    timeout=CAPTURE_TIMEOUT + 1
                )
                # Exit status 28 is tolerated silently: curl runs under a
                # short --max-time, so timing out is an expected outcome.
                if result.returncode not in (0, 28):
                    stderr_text = result.stderr.decode(errors="ignore").strip()
                    msg = f"⚠️ [{self.prefix}] curl exited ({result.returncode})"
                    if stderr_text:
                        msg += f": {stderr_text.splitlines()[-1]}"
                    print(msg)
            except FileNotFoundError:
                print(f"❌ [{self.prefix}] curl not found; please install curl")
                self.stop_event.set()
                return None
            except subprocess.TimeoutExpired:
                print(f"⚠️ [{self.prefix}] curl command timed out")
            except Exception as e:
                print(f"⚠️ [{self.prefix}] curl execution warning: {e}")
            # Give the proxy thread a moment to finish writing the capture.
            th.join(timeout=2)
            self.stop_event.set()
            if self.saved_path.exists() and self.saved_path.stat().st_size > 0:
                return self.saved_path
            print(f"❌ [{self.prefix}] No data captured or file is empty")
            return None
        except socket.error as e:
            print(f"❌ [{self.prefix}] Network error: {e}")
            return None
        except IOError as e:
            print(f"❌ [{self.prefix}] I/O error: {e}")
            return None
        except Exception as e:
            print(f"❌ [{self.prefix}] Unexpected error: {e}")
            return None
        finally:
            self.cleanup()

    def cleanup(self) -> None:
        """Close every socket registered through setup_socket()."""
        for managed_sock in self.managed_sockets:
            try:
                managed_sock.close()
            except Exception as e:
                print(f"⚠️ [{self.prefix}] Warning during socket cleanup: {e}")
        self.managed_sockets.clear()
class TCPProxyCapture(BaseProxyCapture):
    """TCP proxy that saves the first bytes a TLS client sends (the ClientHello)."""

    def __init__(self, domain: str, remote_ip: str):
        super().__init__(domain, remote_ip, "tls_clienthello")
        self.server_sock_manager = None

    def create_sockets(self) -> None:
        """Open the listening TCP socket and record its port in ``local_port``."""
        try:
            self.server_sock_manager = self.setup_socket(socket.SOCK_STREAM)
            self.server_sock_manager.sock.listen(1)
            self.local_port = self.server_sock_manager.sock.getsockname()[1]
        except Exception as e:
            print(f"❌ [tcp] Failed to create server socket: {e}")
            raise

    def handle_proxy_loop(self) -> None:
        """Accept one client, save its first read, then relay both directions.

        Errors are printed rather than raised. ``stop_event`` is always set
        on exit, and the accepted client socket is always closed.
        """
        client_sock = None
        try:
            server_sock = self.server_sock_manager.sock
            server_sock.settimeout(CAPTURE_TIMEOUT)
            try:
                client_sock, _client_addr = server_sock.accept()
            except socket.timeout:
                print("⚠️ [tcp] No client connection received (timeout)")
                return
            with managed_tcp_connection((self.remote_ip, 443)) as remote_sock:
                first_data = client_sock.recv(MAX_BUFFER_SIZE)
                if first_data:
                    self.save_captured_data(first_data)
                    remote_sock.sendall(first_data)

                def forward(src, dst):
                    """Copy bytes from *src* to *dst* until EOF or a socket error."""
                    try:
                        while True:
                            data = src.recv(MAX_BUFFER_SIZE)
                            if not data:
                                break
                            dst.sendall(data)
                    except OSError:
                        # Expected once either side times out or is closed
                        # underneath us; the relay is best-effort.
                        pass

                t1 = threading.Thread(target=forward, args=(client_sock, remote_sock), daemon=True)
                t2 = threading.Thread(target=forward, args=(remote_sock, client_sock), daemon=True)
                t1.start()
                t2.start()
                t1.join(timeout=CAPTURE_TIMEOUT)
                t2.join(timeout=CAPTURE_TIMEOUT)
        except Exception as e:
            print(f"❌ [tcp] Proxy error: {e}")
        finally:
            # The accepted socket is not tracked by managed_sockets, so it
            # has to be released here.
            if client_sock is not None:
                try:
                    client_sock.close()
                except OSError:
                    pass
            self.stop_event.set()
class UDPProxyCapture(BaseProxyCapture):
    """One-way UDP relay that saves the first datagram it receives (the QUIC Initial)."""

    def __init__(self, domain: str, remote_ip: str):
        super().__init__(domain, remote_ip, "quic_initial")
        self.sock_in_manager = None
        self.sock_out_manager = None

    def create_sockets(self) -> None:
        """Open the inbound (loopback) and outbound UDP sockets."""
        try:
            inbound = self.setup_socket(socket.SOCK_DGRAM)
            self.sock_in_manager = inbound
            self.local_port = inbound.sock.getsockname()[1]
            self.sock_out_manager = self.setup_socket(socket.SOCK_DGRAM, ("0.0.0.0", 0))
        except Exception as err:
            print(f"❌ [udp] Failed to create UDP sockets: {err}")
            raise

    def handle_proxy_loop(self) -> None:
        """Forward datagrams to the remote until stopped or the receive times out.

        Only the first datagram is saved; every datagram, including that
        one, is forwarded to ``(remote_ip, 443)``.
        """
        try:
            listener = self.sock_in_manager.sock
            sender = self.sock_out_manager.sock
            listener.settimeout(CAPTURE_TIMEOUT)
            while True:
                if self.stop_event.is_set():
                    break
                try:
                    datagram, _peer = listener.recvfrom(MAX_BUFFER_SIZE)
                except socket.timeout:
                    break
                if self.captured is None:
                    self.save_captured_data(datagram)
                try:
                    sender.sendto(datagram, (self.remote_ip, 443))
                except socket.error as err:
                    print(f"⚠️ [udp] Failed to send data: {err}")
        except Exception as err:
            print(f"❌ [udp] Proxy error: {err}")
        finally:
            self.stop_event.set()
def run_tcp_proxy_capture(domain: str, remote_ip: str) -> Path | None:
    """Run one TLS ClientHello capture; return the saved path, or None on failure."""
    try:
        return TCPProxyCapture(domain, remote_ip).run_capture()
    except Exception as err:
        print(f"❌ [tcp] Capture setup failed: {err}")
    return None
def run_udp_proxy_capture(domain: str, remote_ip: str) -> Path | None:
    """Run one QUIC Initial capture; return the saved path, or None on failure."""
    try:
        return UDPProxyCapture(domain, remote_ip).run_capture()
    except Exception as err:
        print(f"❌ [udp] Capture setup failed: {err}")
    return None
def main():
    """Parse the command line and run the requested capture(s).

    With ``-a`` both captures run once each. Otherwise the selected capture
    (``-q`` for QUIC, else TLS) is retried up to ``RETRIES`` times.

    Returns:
        0 when every requested capture produced a file, 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="Capture TLS ClientHello / QUIC Initial")
    parser.add_argument("-t", action="store_true", help="Capture TLS ClientHello (default)")
    parser.add_argument("-q", action="store_true", help="Capture QUIC Initial")
    parser.add_argument("-a", action="store_true", help="Capture both TLS and QUIC")
    parser.add_argument("host", help="Target hostname (example.com)")
    args = parser.parse_args()
    if not (args.t or args.q or args.a):
        args.t = True

    host = args.host.strip()
    if not host:
        print("❌ Error: Host cannot be empty")
        return 1

    try:
        remote_ip = resolve_host(host)
    except Exception as e:
        print(f"❌ {e}")
        return 1
    print(f"🔄 Resolved {host} -> {remote_ip}")

    if args.a:
        tls_path = run_tcp_proxy_capture(host, remote_ip)
        quic_path = run_udp_proxy_capture(host, remote_ip)
        return 0 if (tls_path and quic_path) else 1

    capture = run_udp_proxy_capture if args.q else run_tcp_proxy_capture
    for attempt in range(1, RETRIES + 1):
        print(f"🔄 Attempt {attempt}/{RETRIES}")
        if capture(host, remote_ip):
            return 0
        if attempt < RETRIES:
            print("⏳ Retrying...")
            time.sleep(1)
    print("❌ All attempts failed")
    return 1
if __name__ == "__main__":
    try:
        # Hand whatever main() returns to the shell as the exit status;
        # a None return exits with status 0.
        sys.exit(main())
    except KeyboardInterrupt:
        print("\n⏹️ Operation cancelled by user")
        sys.exit(1)
    except Exception as e:
        print(f"❌ Fatal error: {e}")
        sys.exit(1)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash
# Abort on any failing command, on use of an unset variable, and on a
# failure anywhere inside a pipeline.
set -o errexit -o nounset -o pipefail
# Print the command-line help to stdout and terminate with status 1.
usage() {
  local prog=${0##*/}
  printf '%s\n' \
    "Usage: $prog [-t|-q|-a] example.com" \
    "Options:" \
    "-t Capture TLS ClientHello (default)" \
    "-q Capture QUIC Initial" \
    "-a Capture both TLS and QUIC"
  exit 1
}
# Capture mode selected by the last of -t / -q / -a; TLS is the default.
MODE=tls
while getopts "tqa" opt; do
  case $opt in
    t) MODE=tls ;;
    q) MODE=quic ;;
    a) MODE=all ;;
    *) usage ;;
  esac
done
shift $((OPTIND-1))

# Check the argument count before touching "$1": with `set -u` an unset $1
# would abort with "unbound variable" instead of showing the usage text.
if [ $# -lt 1 ] || [ -z "$1" ]; then
  usage
fi

TOOL=curl # supported: curl (Debian 13 Trixie), gocurl
DOMAIN="$1"
# Output and log file names use the domain with dots replaced by underscores.
TLS_FILE="tls_clienthello_${DOMAIN//./_}.bin"
QUIC_FILE="quic_initial_${DOMAIN//./_}.bin"
LOG_FILE="${DOMAIN//./_}.socat.log"
# Terminate socat listeners and remove the socat log file.
# NOTE(review): the pgrep pattern matches any socat listener on the machine,
# not only the ones this script started.
cleanup() {
  local pids
  # pgrep exits 1 when nothing matches. Under errexit + pipefail that would
  # abort this handler before the log file is removed and turn a successful
  # run into a non-zero exit, so the status is deliberately ignored.
  pids=$(pgrep -f "socat.*-LISTEN:*" | tr '\n' ' ') || true
  if [ -n "${pids// /}" ]; then
    # Word splitting is intended: one PID per argument.
    # shellcheck disable=SC2086
    kill -TERM $pids 2>/dev/null || true
  fi
  rm -f "$LOG_FILE"
}
# Report a capture file: $1 is the file, $2 a label for the messages.
# Returns 1 (and deletes the file) when the file is missing or empty.
show_result() {
  local file="$1" type="$2"
  if [ ! -s "$file" ]; then
    echo -e "\n❌ No $type captured"
    rm -f "$file"
    return 1
  fi
  # The command substitutions stay inside the argument list so that a
  # failing helper does not abort the script.
  printf '%s\n' \
    "✅ $type saved to $file" \
    "Size: $(wc -c < "$file") bytes" \
    "First 32 bytes:" \
    "$(hexdump -C -n 32 "$file" | head -n 2)"
}
# Capture a TLS ClientHello: socat listens on a random local TCP port and
# writes whatever the client sends to $TLS_FILE while $TOOL connects to it.
capture_tls() {
  local port=$((RANDOM % 63001 + 2000))
  # Expansions are quoted so the domain is never word-split or globbed;
  # -k is passed once (the original repeated it).
  local -a curl_opts=(--tlsv1.3 -k --connect-to "$DOMAIN:443:127.0.0.1:$port" "https://$DOMAIN")
  local -a curl_cmd
  case "$TOOL" in
    curl*) curl_cmd=(-IS "${curl_opts[@]}") ;;
    gocurl) curl_cmd=(-I "${curl_opts[@]}") ;;
    *) echo "❌ Unknown tool: $TOOL (supported: curl, gocurl)" >&2; return 1 ;;
  esac
  socat TCP-LISTEN:"$port",reuseaddr,shut-none - > "$TLS_FILE" &
  # Give socat time to start listening before the client connects.
  sleep 0.5
  # The request itself is expected to fail (nothing answers the client),
  # so its exit status is ignored.
  "$TOOL" "${curl_cmd[@]}" >/dev/null 2>&1 || true
  show_result "$TLS_FILE" "TLS ClientHello"
}
# Capture a QUIC Initial: socat relays UDP between a random local port and
# the real server while logging a hex dump; the wanted datagram is then
# extracted from that log and written to $QUIC_FILE.
# Returns 0 on success, 1 when resolution fails or every attempt fails.
capture_quic() {
  local port=$((RANDOM % 63001 + 2000))
  local ip retries=10 attempt=1 tmp_file="${QUIC_FILE}.tmp"
  local -a curl_opts=(-k --connect-to "$DOMAIN:443:127.0.0.1:$port" "https://$DOMAIN")
  local size pkt_length pkt_from
  local -a curl_cmd

  # pkt_length / pkt_from select which logged datagram to extract; they are
  # matched against the "length=… from=… to=…" header lines in the socat log.
  case "$TOOL" in
    curl*)
      if [ -n "${TERMUX_VERSION:-}" ]; then
        pkt_length=1200 pkt_from=0
      else
        pkt_length=1200 pkt_from=1200
      fi
      curl_cmd=(-IS --http3-only "${curl_opts[@]}")
      ;;
    gocurl)
      pkt_length=1252 pkt_from=0
      curl_cmd=(-I --http3 "${curl_opts[@]}")
      ;;
    *)
      echo "❌ Unknown tool: $TOOL (supported: curl, gocurl)" >&2
      return 1
      ;;
  esac
  local pkt_to=$((pkt_from + pkt_length - 1))

  # ping is used only for its "PING host (ip)" banner. Its exit status is
  # ignored: with errexit + pipefail an unanswered ping would otherwise
  # abort the script even though the address was parsed.
  ip=$(ping -4 -c 1 "$DOMAIN" | awk -F'[()]' '/PING/{ print $2 }') || true
  if [ -z "$ip" ]; then
    echo "❌ Cannot resolve $DOMAIN"
    return 1
  fi

  socat -v -x UDP4-LISTEN:"$port",reuseaddr,fork UDP4:"$ip":443 > "$LOG_FILE" 2>&1 &
  # Give socat time to start listening before the client connects.
  sleep 0.5

  while [ "$attempt" -le "$retries" ]; do
    echo "🔄 QUIC Initial capture attempt #$attempt"
    rm -f "$tmp_file"
    "$TOOL" "${curl_cmd[@]}" >/dev/null 2>&1 || true
    # Find the client->server header with the wanted length/from/to, then
    # collect the hex columns that follow until the "--" separator.
    if awk -v len="$pkt_length" -v fr="$pkt_from" -v to="$pkt_to" '
      $0 ~ /^> .*length=[0-9]+ from=[0-9]+ to=[0-9]+$/ {
        if ($0 ~ "length=" len " from=" fr " to=" to) { flag=1; next }
      }
      flag && /^--$/ { exit }
      flag { print substr($0,2,48) }
    ' "$LOG_FILE" | tr -d ' \n' | xxd -r -p > "$tmp_file"
    then
      size=$(wc -c < "$tmp_file" 2>/dev/null || echo 0)
      if [ "$size" -eq "$pkt_length" ]; then
        mv "$tmp_file" "$QUIC_FILE"
        show_result "$QUIC_FILE" "QUIC Initial"
        return 0
      else
        echo "❌ Received $size bytes (need $pkt_length), retrying..."
      fi
    else
      echo "❌ Failed to extract QUIC Initial on attempt #$attempt"
    fi
    attempt=$((attempt + 1))
  done
  echo -e "\n❌ All $retries attempts failed to capture ${pkt_length}-byte QUIC Initial"
  return 1
}
# Run cleanup whenever the shell exits.
trap cleanup EXIT
# Turn INT/TERM into a real exit, which in turn fires the EXIT trap. A
# handler that only ran cleanup would return and let the script carry on
# after Ctrl-C.
trap 'exit 130' INT
trap 'exit 143' TERM

case $MODE in
  tls) capture_tls ;;
  quic) capture_quic ;;
  all) capture_tls; capture_quic ;;
esac
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash
# Preconditions: the quicly client must be on PATH and a domain must be given.
command -v quicly >/dev/null 2>&1 || {
  echo "❌ Error: quicly not found!"
  echo "You can install it from: https://github.com/h2o/quicly"
  exit 1
}
(( $# >= 1 )) || {
  echo "Usage: $0 domain [port]"
  echo "Example: $0 example.com"
  exit 1
}

QUICLY_PID=""                               # set once quicly has been started
DOMAIN="$1"
PORT="${2:-443}"                            # optional second argument
LOG_FILE="quicly.log.txt"                   # quicly output, removed by cleanup
BIN_FILE="quic_initial_${DOMAIN//./_}.bin"  # dots in the domain become underscores
# Stop the background quicly process (TERM first, KILL if it is still alive
# after 0.1 s), reap it, and remove the log file.
cleanup() {
  if [ -z "$QUICLY_PID" ]; then
    # quicly was never started: only the log file needs removing.
    rm -f "$LOG_FILE"
    return
  fi
  kill -TERM "$QUICLY_PID" 2>/dev/null
  sleep 0.1
  kill -0 "$QUICLY_PID" 2>/dev/null && kill -KILL "$QUICLY_PID" 2>/dev/null
  wait "$QUICLY_PID" 2>/dev/null
  rm -f "$LOG_FILE"
}
# Run cleanup whenever the shell exits.
trap cleanup EXIT
# Turn INT/TERM into a real exit, which in turn fires the EXIT trap. A
# handler that only ran cleanup would return and let the script carry on
# with extraction after Ctrl-C.
trap 'exit 130' INT
trap 'exit 143' TERM

# Datagram size passed to quicly via -u: 554 plus the domain length, plus
# another 32 for domains longer than 26 characters.
# NOTE(review): the constants are taken as-is from the script; their
# derivation is not shown here.
DOMAIN_LENGTH=${#DOMAIN}
if [ "$DOMAIN_LENGTH" -gt 26 ]; then
  UDP_SIZE=$((554 + DOMAIN_LENGTH + 32))
else
  UDP_SIZE=$((554 + DOMAIN_LENGTH))
fi

echo "Domain: $DOMAIN"
echo "Port: $PORT"
rm -f "$BIN_FILE"

echo "Running quicly in background..."
quicly -vv -u "$UDP_SIZE" -x x25519 -y aes128gcmsha256 "$DOMAIN" "$PORT" > "$LOG_FILE" 2>&1 &
QUICLY_PID=$!
# Give quicly time to log its first packet before the log is parsed.
sleep 1

if [ ! -f "$LOG_FILE" ]; then
  echo "❌ Error: Log file $LOG_FILE was not created!"
  exit 1
fi

echo "Extracting binary data..."
# Collect the hex lines that follow the first "sendmsg (N bytes):" header,
# stop at the second header, keep at most UDP_SIZE bytes (two hex digits
# each) and convert them to binary.
awk '
  /^sendmsg \([0-9]+ bytes\):/ {
    if (found) exit
    found = 1
    next
  }
  found && /^[[:xdigit:][:space:]]+$/ {
    gsub(/[^0-9a-fA-F]/, "")
    printf "%s", $0
  }
' "$LOG_FILE" | head -c "$((UDP_SIZE * 2))" | xxd -r -p > "$BIN_FILE"

size=$(wc -c < "$BIN_FILE")
if [ "$size" -eq "$UDP_SIZE" ]; then
  echo "✅ QUIC Initial saved to $BIN_FILE ($size bytes)"
  echo "First 16 bytes:"
  xxd -l 16 "$BIN_FILE"
else
  echo "❌ File size $size != expected $UDP_SIZE bytes"
  exit 1
fi
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment