Created
May 11, 2024 20:11
-
-
Save matteobertozzi/31767b48745c9c36675266cbcfb94456 to your computer and use it in GitHub Desktop.
HTTP Long Tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one | |
# or more contributor license agreements. See the NOTICE file | |
# distributed with this work for additional information | |
# regarding copyright ownership. The ASF licenses this file | |
# to you under the Apache License, Version 2.0 (the | |
# "License"); you may not use this file except in compliance | |
# with the License. You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# Handling Long Tasks in terms of execution times and large uploads or downloads with HTTP | |
# - If the endpoint execution is fast and there is a single chunk to download, | |
# the behaviour is the same as a normal http req/resp. (/fast/small example) | |
# - If the endpoint execution is fast and there are multiple chunks to download (/fast/large example) | |
# the response will contain an header "X-Task-HasMore-Chunks: True" which means there is more data to fetch. | |
# to fetch the data you need the Task ExecutionId that you can find in the response header "X-Task-ExecutionId". | |
# the endpoint /runtime/download?executionId=ID&chunkId=N will return the data for the specific executionId/chunkId. | |
# - If the endpoint execution is slow, instead of receiving the response back you'll get a "202 Accepted" | |
# with an ExecutionId that will be used to poll the result. | |
# You can use the /runtime/download with chunkId=0 or you can use the /runtime/poll | |
#   If the task is not yet completed you'll get back a "202 Accepted" with state WAITING
# and once ready you'll get back the response. (example /slow/large) | |
# - If the endpoint takes a large input you can chunk the upload | |
# the first call to the endpoint will specify the header "X-Task-HasMore-Chunks: True" | |
# and it will get back a "202 Accepted" with the ExecutionId. | |
# Using /runtime/upload?executionId=ID&chunkId=N you can upload the chunks and remember the proper "X-Task-HasMore-Chunks". | |
# Once all the chunks are available the server will start the execution, | |
# and you'll receive the direct response if the execution is fast, or a "202 Accepted" if the execution is still in progress. | |
# ------------------------------------------------------------------------------------------------------------------- | |
# Long Tasks Demo Nov 2021 | |
# ------------------------------------------------------------------------------------------------------------------- | |
# Simulate a normal fast call with a single chunk result | |
# curl -v http://localhost:57025/fast/small | |
# Simulate a fast task with a large number of chunks to download | |
# curl -v http://localhost:57025/fast/large | |
# curl -v "http://localhost:57025/runtime/download?executionId=71cd1ca0-7fca-4ab0-95ce-38dc512a58fa&chunkId=1" | |
# Simulate a slow task with a small number of chunks to download, or none
# curl -v http://localhost:57025/slow/small | |
# curl -v "http://localhost:57025/runtime/poll?executionId=bcee18e7-e600-48a9-9ee1-2b6185e14677" | |
# Simulate a slow task with a large number of chunks to download
# curl -v http://localhost:57025/slow/large | |
# curl -v "http://localhost:57025/runtime/poll?executionId=b1a4feda-e89d-46d1-a070-cb4bd92751fe" | |
# curl -v "http://localhost:57025/runtime/download?executionId=b1a4feda-e89d-46d1-a070-cb4bd92751fe&chunkId=1" | |
# Simulate a fast task with 3 chunks to upload and a small number of chunks to download, or none
# curl -v http://localhost:57025/fast/small -H "X-Task-HasMore-Chunks: true" -d '[1, 2, 3]' | |
# curl -v "http://localhost:57025/runtime/upload?executionId=9e50ac92-4c19-4173-997a-f716391d074a&chunkId=1" -H "X-Task-HasMore-Chunks: true" -d '[4, 5, 6]' | |
# curl -v "http://localhost:57025/runtime/upload?executionId=9e50ac92-4c19-4173-997a-f716391d074a&chunkId=3" -H "X-Task-HasMore-Chunks: false" -d '[10, 11, 12]' | |
# curl -v "http://localhost:57025/runtime/upload?executionId=9e50ac92-4c19-4173-997a-f716391d074a&chunkId=2" -H "X-Task-HasMore-Chunks: true" -d '[7, 8, 9]' | |
from collections.abc import Iterable | |
from http.server import SimpleHTTPRequestHandler | |
from urllib.parse import parse_qs, urlparse | |
from socketserver import TCPServer | |
from random import randint | |
from uuid import uuid4 | |
from time import time | |
import logging | |
import json | |
class ExecutionData:
    """State of a single task execution: which input chunks have been
    uploaded and how many result chunks are available for download.

    Chunk payloads are not stored (this is a demo): only the fact that a
    chunk arrived is tracked, and the downloadable result data is
    generated on the fly by the HTTP handler.
    """

    # Simulated execution time (seconds) for tasks on /slow/* endpoints.
    # Hoisted to a constant so the log message and the readiness check
    # can never disagree (the value used to be hard-coded in both).
    SLOW_TASK_SECONDS = 20

    def __init__(self, path: str) -> None:
        self.execution_id = str(uuid4())
        self.start_time = time()
        self.update_time = self.start_time
        self.path = path
        # chunk_id -> True for every chunk that arrived (payload discarded, demo only)
        self.chunks = {}
        # id of the final chunk; known once a chunk arrives with has_more=False
        self.last_chunk = None
        # dummy function result: /large endpoints produce several result
        # chunks, the others produce zero or one
        if path.endswith('/large'):
            self.download_chunks = randint(2, 8)
        else:
            self.download_chunks = randint(0, 1)

    def add_chunk(self, chunk_id: int, chunk: bytes, has_more: bool) -> None:
        """Record the arrival of an uploaded chunk.

        When has_more is False this chunk is the last of the sequence,
        which lets is_ready() detect holes in 0..last_chunk.
        """
        logger = logging.getLogger('state-store')
        self.update_time = time()
        logger.info('execId:%s - add chunk chunkId:%d hasMore:%d length:%d', self.execution_id, chunk_id, has_more, len(chunk))
        if not has_more:
            self.last_chunk = chunk_id
        self.chunks[chunk_id] = True

    def is_ready(self) -> bool:
        """Return True once every input chunk has arrived and the (simulated) execution is done."""
        logger = logging.getLogger('state-store')
        if self.last_chunk is None:
            logger.debug('execId:%s - last chunk not yet available', self.execution_id)
            return False
        # chunks may arrive out of order: verify none before the last is missing
        for chunk_id in range(self.last_chunk):
            if not self.chunks.get(chunk_id):
                logger.debug('execId:%s - chunk %d not yet available', self.execution_id, chunk_id)
                return False
        # dummy function fast/slow simulation
        if self.path.startswith('/fast/'):
            return True
        elapsed = time() - self.update_time
        # original message read '%.2felapsed' (no space): "12.34elapsed"
        logger.debug('execId:%s - the function takes %dsec, %.2f elapsed', self.execution_id, self.SLOW_TASK_SECONDS, elapsed)
        return elapsed > self.SLOW_TASK_SECONDS
class TaskRuntime: | |
def __init__(self) -> None: | |
self.state_store = {} | |
def add(self, execution_data: ExecutionData) -> None: | |
self.state_store[execution_data.execution_id] = execution_data | |
def get_state(self, execution_id: str) -> ExecutionData: | |
return self.state_store.get(execution_id) | |
# Module-level singleton holding the state of every known task execution.
task_runtime = TaskRuntime()
class HttpHandler(SimpleHTTPRequestHandler): | |
def __init__(self, *args, **kwargs): | |
self.trace_id = str(uuid4()) | |
super().__init__(*args, **kwargs) | |
def do_GET(self): | |
self._do_exec() | |
def do_POST(self): | |
self._do_exec() | |
def _do_exec(self) -> None: | |
logger = logging.getLogger('http-handler') | |
try: | |
uri = urlparse(self.path) | |
query = parse_qs(uri.query) | |
content_length = int(self.headers.get('Content-Length', 0)) | |
body = self.rfile.read(content_length) | |
logger.debug('received request: %s - query: %s - content-length:%d', uri.path, query, content_length) | |
# dummy endpoints & /runtime endpoinds | |
if uri.path in ('/slow/small', '/slow/large', '/fast/small', '/fast/large'): | |
self.runtime_exec(uri.path, body) | |
elif uri.path == '/runtime/poll': | |
self.runtime_poll(query) | |
elif uri.path == '/runtime/upload': | |
self.runtime_upload(query, body) | |
elif uri.path == '/runtime/download': | |
self.runtime_download(query) | |
else: | |
self.respond_not_found() | |
except Exception as e: | |
logger.error('unhandled exception during execution: %s', e, stack_info=True) | |
self.respond_internal_server_error() | |
# ----------------------------------------------------------------------------------------------- | |
# Runtime Handlers | |
# ----------------------------------------------------------------------------------------------- | |
def runtime_poll(self, query: dict[str, list[str]]) -> None: | |
params = self.verify_query_args_avail(query, ['executionId']) | |
if not params: return | |
execution_data = self._get_ready_execution_data(params['executionId'][0]) | |
if not execution_data: return | |
self._fetch_first_page(execution_data) | |
def runtime_upload(self, query: dict[str, list[str]], body: bytes) -> None: | |
params = self.verify_query_args_avail(query, ('executionId', 'chunkId')) | |
if not params: return | |
executor = self._get_execution_data(params['executionId'][0]) | |
if not executor: | |
return | |
chunk_id = int(params['chunkId'][0]) | |
has_more = self.headers.get('X-Task-HasMore-Chunks', 'False').lower() == 'true' | |
executor.add_chunk(chunk_id, body, has_more) | |
if executor.is_ready(): | |
self._fetch_first_page(executor) | |
else: | |
self.respond_json(202, {'executionId': executor.execution_id, 'state': 'WAITING'}) | |
def runtime_download(self, query: dict[str, list[str]]) -> None: | |
params = self.verify_query_args_avail(query, ('executionId', 'chunkId')) | |
if not params: return | |
executor = self._get_ready_execution_data(params['executionId'][0]) | |
if not executor: | |
return | |
chunk_id = int(params['chunkId'][0]) | |
if chunk_id < executor.download_chunks: | |
has_more = (chunk_id + 1) < executor.download_chunks | |
self.respond_json(200, {'result': list(range(chunk_id * 10, (chunk_id + 1) * 10))}, { | |
'X-Task-HasMore-Chunks': has_more, | |
'X-Task-ExecutionId': executor.execution_id | |
}) | |
else: | |
self.respond_status(404, 'CHUNK_NOT_FOUND', 'chunk not found') | |
def runtime_exec(self, path: str, body: bytes): | |
executor = ExecutionData(path) | |
has_more = self.headers.get('X-Task-HasMore-Chunks', 'False').lower() == 'true' | |
executor.add_chunk(0, body, has_more) | |
task_runtime.add(executor) | |
if executor.is_ready(): | |
self._fetch_first_page(executor) | |
else: | |
self.respond_json(202, {'executionId': executor.execution_id, 'state': 'WAITING'}) | |
# ----------------------------------------------------------------------------------------------- | |
# Runtime Helpers | |
# ----------------------------------------------------------------------------------------------- | |
def _get_execution_data(self, execution_id: str) -> ExecutionData: | |
executor = task_runtime.get_state(execution_id) | |
if not executor: | |
self.respond_status(400, 'INVALID_EXECUTION_ID', 'invalid executionId') | |
return None | |
return executor | |
def _get_ready_execution_data(self, execution_id: str) -> ExecutionData: | |
executor = self._get_execution_data(execution_id) | |
if not executor: | |
return None | |
if not executor.is_ready(): | |
self.respond_json(202, {'executionId': executor.execution_id, 'state': 'WAITING'}) | |
return None | |
return executor | |
def _fetch_first_page(self, execution_data: ExecutionData): | |
if execution_data.download_chunks > 0: | |
has_more = execution_data.download_chunks > 1 | |
self.respond_json(200, {'result': list(range(0, 10))}, { | |
'X-Task-HasMore-Chunks': has_more, | |
'X-Task-ExecutionId': execution_data.execution_id | |
}) | |
else: | |
self.respond_json(204, b'') | |
# ----------------------------------------------------------------------------------------------- | |
# HTTP Helpers | |
# ----------------------------------------------------------------------------------------------- | |
def respond_not_found(self): | |
self.respond_status(404, 'NOT_FOUND', 'page not found') | |
def respond_internal_server_error(self): | |
self.respond_status(500, 'INTERNAL_SERVER_ERROR', 'internal server error') | |
def respond_status(self, http_code: int, status: str, message: str): | |
self.respond_json(http_code, {'status': status, 'message': message, 'traceId': self.trace_id}) | |
def respond_json(self, http_code: int, json_data, headers=None): | |
content = json.dumps(json_data).encode('utf-8') if json_data else b'' | |
self.send_response(http_code) | |
self.send_header('content-type', 'application/json') | |
self.send_header('content-length', len(content)) | |
if headers: | |
for key, value in headers.items(): | |
self.send_header(key, value) | |
self.end_headers() | |
self.wfile.write(content) | |
self.wfile.flush() | |
# ----------------------------------------------------------------------------------------------- | |
# Param Utils | |
# ----------------------------------------------------------------------------------------------- | |
def verify_query_args_avail(self, query: dict[str, list[str]], keys: Iterable[str]): | |
params = {} | |
for k in keys: | |
values = query.get(k) | |
if not values or len(values) == 0: | |
self.respond_status(400, 'VERIFY_ARG', k + ' is missing') | |
return None | |
params[k] = values | |
return params | |
def get_query_param(self, query: dict[str, list[str]], key: str): | |
values = query.get(key) | |
if values and len(values) == 1: | |
return values[0] | |
return None | |
class ReuseAddrTCPServer(TCPServer):
    # Set SO_REUSEADDR so the demo can be restarted immediately without
    # "address already in use" errors while old sockets sit in TIME_WAIT.
    allow_reuse_address = True
if __name__ == '__main__':
    # Verbose format with source location, useful when tracing the demo flow.
    log_format = '%(asctime)-15s %(levelname)s %(name)-8s %(filename)s %(funcName)s():%(lineno)s - %(message)s'
    logging.basicConfig(level=logging.DEBUG, format=log_format)
    # NOTE: the original also assigned logging.getLogger('') to an unused
    # variable; basicConfig() already configures the root logger.
    PORT = 57025
    with ReuseAddrTCPServer(("", PORT), HttpHandler) as httpd:
        try:
            print("serving at port", PORT)
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("Keyboard interrupt received, exiting.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment