Last active
January 9, 2025 06:00
-
-
Save blackmius/9b60cfa51fe4528a8578e17961b153f3 to your computer and use it in GitHub Desktop.
masque client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import asyncio | |
import ssl | |
import base64 | |
import argparse | |
import sys | |
import types | |
from collections import OrderedDict | |
from urllib.parse import urlparse | |
from aioquic.asyncio import connect | |
from aioquic.asyncio.protocol import QuicConnectionProtocol | |
from aioquic.h3.connection import H3_ALPN, H3Connection | |
from aioquic.h3.events import HeadersReceived, DataReceived, H3Event, DatagramReceived | |
from aioquic.quic.connection import QuicConnection | |
from aioquic.quic.configuration import QuicConfiguration | |
from aioquic.quic.events import ConnectionTerminated, HandshakeCompleted | |
from aioquic.quic.logger import QuicFileLogger | |
import aioquic.tls as tls | |
DEFAULT_PORT = 443 | |
CONFIG = argparse.Namespace() | |
class Http3ClientError(Exception): | |
pass | |
def create_quic_configuration(): | |
config = QuicConfiguration(is_client=True) | |
config.alpn_protocols = H3_ALPN | |
if CONFIG.verbose: | |
print('* ALPN', config.alpn_protocols) | |
config.max_datagram_size = CONFIG.max_datagram_size | |
config.max_datagram_frame_size = 10000 | |
if CONFIG.insecure: | |
config.verify_mode = ssl.CERT_NONE | |
return config | |
def wrap_tls_context_methods(context): | |
_handle_reassembled_message = context._handle_reassembled_message | |
def wrapped_handle_reassembled_message( | |
self, message_type, input_buf, output_buf | |
): | |
handshake_type = tls.HandshakeType(message_type) | |
print(f'* TLSv1.3 (IN) {handshake_type.name}({handshake_type.value}):') | |
_handle_reassembled_message(message_type, input_buf, output_buf) | |
context._handle_reassembled_message = types.MethodType(wrapped_handle_reassembled_message, context) | |
_set_state = context._set_state | |
def wrapped_set_state(self, state): | |
state = tls.State(state) | |
print(f'* TLSv1.3 (STATE) {state.name}({state.value}):') | |
_set_state(state) | |
context._set_state = types.MethodType(wrapped_set_state, context) | |
def wrap_tls_events(quic): | |
if quic._connect_called: | |
return wrap_tls_context_methods(quic.tls) | |
fn = quic._connect | |
def _connect(self, now): | |
fn(now) | |
client_hello = tls.HandshakeType.CLIENT_HELLO | |
print(f'* TLSv1.3 (OUT) {client_hello.name}({client_hello.value}):') | |
wrap_tls_context_methods(self.tls) | |
quic._connect = types.MethodType(_connect, quic) | |
class H3ClientProtocol(QuicConnectionProtocol): | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self._http = H3Connection(self._quic, True) | |
self._request_waiter = self._loop.create_future() | |
self.http_response_headers = OrderedDict() | |
self.http_response_data = bytearray() | |
if CONFIG.verbose: | |
wrap_tls_events(self._quic) | |
def http_event_received(self, event: H3Event) -> None: | |
if CONFIG.debug: | |
print(self.__class__.__name__, 'http_event_received', event) | |
if isinstance(event, DataReceived): | |
self.http_response_data.extend(event.data) | |
elif isinstance(event, HeadersReceived): | |
self.http_response_headers.update( | |
{k.decode(): v.decode() for k, v in event.headers} | |
) | |
if event.stream_ended and not self._request_waiter.done(): | |
self._request_waiter.set_result(True) | |
def quic_event_received(self, event): | |
if CONFIG.debug: | |
print(self.__class__.__name__, 'quic_event_received', event) | |
for http_event in self._http.handle_event(event): | |
self.http_event_received(http_event) | |
if isinstance(event, HandshakeCompleted) and CONFIG.verbose: | |
print(f'* ALPN server accepted', event.alpn_protocol) | |
cert = self._quic.tls._peer_certificate | |
print('* Server certificate:') | |
print(f'* subject: {cert.subject.rfc4514_string()}') | |
print(f'* start date: {cert.not_valid_before_utc}') | |
print(f'* expire date: {cert.not_valid_after_utc}') | |
print(f'* issuer: {cert.issuer.rfc4514_string()}') | |
for k, v in self.sent_headers: | |
print(f'> {k.decode()}: {v.decode()}') | |
print('>') | |
if isinstance(event, ConnectionTerminated) and not self._request_waiter.done(): | |
error_message = 'Connection terminated' | |
if event.reason_phrase: | |
error_message += f': {event.reason_phrase}' | |
self._request_waiter.set_exception(Http3ClientError(error_message)) | |
async def send_http_request(self, url, method='GET', headers=None, content=None, proxy=None, | |
proxy_auth=None): | |
if method == 'HEAD': | |
self._http._check_content_length = types.MethodType( | |
lambda self, stream: None, self._http) | |
self.sent_headers = [ | |
(b':method', method.encode()), | |
(b':scheme', b'https'), | |
(b':authority', url.hostname.encode()), | |
(b':path', (url.path or '/').encode()), | |
] + [(k.encode(), v.encode()) for k, v in (headers or {}).items()] | |
stream_id = self._quic.get_next_available_stream_id() | |
self._http.send_headers(stream_id, self.sent_headers, end_stream=not content) | |
if content: | |
self.sent_data = content | |
self._http.send_data(stream_id, data=content.encode(), end_stream=True) | |
self.transmit() | |
await asyncio.shield(self._request_waiter) | |
return self.http_response_data, self.http_response_headers | |
class ProxyBadStatus(Http3ClientError): | |
def __init__(self, headers): | |
self.headers = headers | |
class HTTPProxiedTransport: | |
def __init__(self, http, stream_id, transmit): | |
self.http = http | |
self.stream_id = stream_id | |
self.transmit = transmit | |
def sendto(self, data, addr): | |
self.http.send_datagram(self.stream_id, b'\x00'+data) | |
self.transmit() | |
class H3ProxyProtocol(H3ClientProtocol): | |
def http_headers_received(self, event: HeadersReceived): | |
headers = {k.decode(): v.decode() for k, v in event.headers} | |
self.http_response_headers = headers | |
if headers[':status'] == '200' and headers['capsule-protocol'] == '?1': | |
self._request_waiter.set_result(True) | |
else: | |
self._request_waiter.set_exception(ProxyBadStatus(headers)) | |
def http_event_received(self, event: H3Event) -> None: | |
if CONFIG.debug: | |
print(self.__class__.__name__, 'http_event_received', event) | |
if isinstance(event, DatagramReceived): | |
self.proxy_quic.receive_datagram(event.data[1:], self.proxy_addr, self._loop.time()) | |
elif isinstance(event, HeadersReceived): | |
self.http_headers_received(event) | |
async def send_http_request(self, url, method='GET', headers=None, content=None, proxy=None, | |
proxy_auth=None): | |
if CONFIG.verbose: | |
print('* Connecting to proxy') | |
stream_id = self._quic.get_next_available_stream_id() | |
template_url = f'/.well-known/masque/udp/{url.hostname}/{url.port or DEFAULT_PORT}/' | |
proxy_headers = [ | |
(b':method', b'CONNECT'), | |
(b':scheme', b'https'), | |
(b':authority', proxy.hostname.encode()), | |
(b':path', template_url.encode()), | |
(b':protocol', b'connect-udp'), | |
(b'capsule-protocol', b'?1'), | |
] | |
if proxy_auth: | |
token = base64.b64encode(proxy_auth.encode()) | |
proxy_headers.append((b'proxy-authorization', b'Basic '+token)) | |
self.sent_headers = proxy_headers | |
self._http.send_headers(stream_id, self.sent_headers, end_stream=False) | |
self.transmit() | |
await asyncio.shield(self._request_waiter) | |
if CONFIG.verbose: | |
print('* Request completely sent off') | |
for k, v in self.http_response_headers.items(): | |
print(f'< {k}: {v}') | |
print('<') | |
local_port = self._transport._sock.getsockname()[1] | |
print(f'* Proxy connected: 127.0.0.1:{local_port} -> {proxy.netloc} -> {url.netloc}') | |
print('* Starting request') | |
configuration = create_quic_configuration() | |
configuration.max_datagram_size = 1200 | |
configuration.server_name = url.hostname | |
self.proxy_quic = QuicConnection(configuration=configuration) | |
self.proxy_addr = (url.hostname, url.port or DEFAULT_PORT) | |
self.proxy_quic.connect(self.proxy_addr, self._loop.time()) | |
self.proxy_http = H3ClientProtocol(self.proxy_quic) | |
self.proxy_http._transport = HTTPProxiedTransport(self._http, stream_id, self.transmit) | |
return await self.proxy_http.send_http_request(url, method, headers, content) | |
async def send_request(host, port, url, method='GET', content=None, headers=None, | |
proxy=None, proxy_auth=None): | |
async with connect( | |
host=host, | |
port=port, | |
create_protocol=H3ProxyProtocol if proxy else H3ClientProtocol, | |
configuration=create_quic_configuration(), | |
wait_connected=False | |
) as client: | |
try: | |
data, headers = await client.send_http_request(url, method, headers, content, proxy, proxy_auth) | |
if CONFIG.show_headers or method == 'HEAD': | |
print("\n".join([f'{k}: {v}' for k, v in headers.items()])) | |
if data: | |
print("\n"+data.decode()) | |
except ProxyBadStatus as e: | |
print("\n".join([f'{k}: {v}' for k, v in e.headers.items()])) | |
print("Proxy responded with non-200 status") | |
except Http3ClientError as e: | |
print(f"HTTP/3 client error: {e}") | |
except Exception as e: | |
print(f"Unexpected error: {e}") | |
raise | |
def parse_args(): | |
parser = argparse.ArgumentParser(description='Make HTTP3 request to a given URL') | |
parser.add_argument('method', nargs='?', default='GET', | |
help=( | |
'The HTTP method to use for the request. The default method is GET. ' | |
'Other common methods include POST, PUT, DELETE, PATCH, etc.' | |
)) | |
parser.add_argument('url', type=str, | |
help=( | |
'The URL to which the HTTP3 request will be made. This is a required argument. ' | |
'Example: https://example.com. Note URL scheme may be only https:// or not specified and then automaticaly added https' | |
)) | |
parser.add_argument('-H', dest='headers', action='append', | |
help=( | |
'HTTP headers to send with the request. ' | |
'Provide each header in the format "Key: Value". ' | |
'Multiple headers can be provided by repeating the -H option. ' | |
'Example: -H "User-Agent: CustomAgent" -H "Authorization: Bearer <token>"' | |
)) | |
parser.add_argument('-d', '--data', type=str, | |
help=( | |
'The request payload (data) to send with the request. ' | |
'This is typically used for POST, PUT, or PATCH requests to send data in the body. ' | |
'Example: -d "name=John&age=30"' | |
)) | |
parser.add_argument('-i', '--show-headers', action='store_true', | |
help=( | |
'Shows the response headers in the output. ' | |
'This can help you debug the HTTP3 request and inspect details like status codes, content type, ' | |
'cookies, etc. The response body will not be shown unless explicitly requested.' | |
)) | |
parser.add_argument('--proxy', type=str, | |
help=( | |
'Specify the HTTP3 proxy server address to use for making the request. ' | |
'The proxy should use the CONNECT-UDP protocol, which is compatible with HTTP3 connections. ' | |
'Provide the proxy in the format "hostname:port". ' | |
'Example: --proxy "proxy.example.com:8888"' | |
)) | |
parser.add_argument('--proxy-auth', type=str, | |
help=( | |
'Proxy authentication credentials, provided in the format "username:password". ' | |
'This is used to authenticate against the proxy server if it requires authentication.' | |
)) | |
parser.add_argument('-k', '--insecure', action='store_true', | |
help=( | |
'Skips SSL certificate verification for QUIC connetions. ' | |
'This is useful when testing with self-signed certificates or untrusted certificate authorities. ' | |
'Be cautious as this reduces security.' | |
)) | |
parser.add_argument('--max-datagram-size', default=1350, type=int, | |
help=( | |
'Sets the maximum datagram size for QUIC connections. ' | |
'This can be useful for networks with a small maximum transmission unit (MTU). ' | |
'The default is 1350 bytes.' | |
)) | |
parser.add_argument('-v', '--verbose', action='store_true', | |
help=( | |
'Enables verbose output, which provides more detailed logs of the request process, ' | |
'including headers, payloads, and connection details.' | |
)) | |
parser.add_argument('--debug', action='store_true', | |
help=( | |
'Enables debugging output. This will show extremely detailed logs, including internal processes, networking details, ' | |
'and possibly lower-level debug information from the HTTP3 client.' | |
)) | |
return parser.parse_args() | |
def panic(message): | |
print(message) | |
sys.exit(1) | |
def validate_headers(headers): | |
valid_headers = {} | |
for kv in headers or []: | |
try: | |
key, value = kv.split(':', 1) | |
valid_headers[key.strip()] = value.strip() | |
except ValueError: | |
panic(f"Invalid header format: '{kv}'. Headers must be in 'key:value' format.") | |
return valid_headers | |
def validate_https_url(url, panic_message): | |
if not urlparse(url).scheme: | |
url = 'https://' + url | |
url = urlparse(url) | |
if url.scheme != 'https': | |
panic(panic_message) | |
return url | |
def main(): | |
args = parse_args() | |
for k, v in args._get_kwargs(): | |
setattr(CONFIG, k, v) | |
args.method = args.method.upper() | |
url = validate_https_url(args.url, 'Only https:// URLs are supported') | |
proxy = validate_https_url(args.proxy, 'Proxy supports only https:// scheme') \ | |
if args.proxy else None | |
headers = validate_headers(args.headers) | |
if args.data: | |
if args.method == 'GET': | |
panic('Payload cant be used with GET method') | |
headers['content-length'] = str(len(args.data)) | |
host = proxy.hostname if proxy else url.hostname | |
port = proxy.port if proxy else url.port or DEFAULT_PORT | |
asyncio.run(send_request( | |
host, port, url, method=args.method, headers=headers, | |
content=args.data, proxy=proxy, proxy_auth=args.proxy_auth | |
)) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment