Skip to content

Instantly share code, notes, and snippets.

@vladimirlagunov
Created March 20, 2025 10:33
Show Gist options
  • Save vladimirlagunov/19cd7c3406cf0f7f1bc7d950bd70372a to your computer and use it in GitHub Desktop.
Save vladimirlagunov/19cd7c3406cf0f7f1bc7d950bd70372a to your computer and use it in GitHub Desktop.
import hashlib
import random
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
class BuggyHandler(BaseHTTPRequestHandler):
rnd_states = []
seed_step = 1024 * 1024
size = 1024 * 1024 * 1024 + random.randint(0, 1024 * 1024)
def do_GET(self):
start = 0
end = self.size
if self.headers.get("Range"):
start, end = self.headers["Range"].split("=")[1].split("-")
start = int(start) if start else 0
end = min(self.size, int(end)) if end else self.size
to_send = max(0, end - start)
self.send_response(200 if end == self.size else 206)
self.send_header("Content-Type", "application/octet-stream")
self.send_header("Content-Length", str(to_send))
self.send_header("Connection", "close")
self.end_headers()
if to_send == 0:
return
# Making the life harder.
time.sleep(random.randint(1, 100) * 0.01)
seeds_idx, skip = divmod(start, self.seed_step)
remains = end - start + skip
while remains:
data_gen = random.Random()
data_gen.setstate(self.rnd_states[seeds_idx])
seeds_idx += 1
chunk = data_gen.randbytes(min(self.seed_step, remains))
remains -= len(chunk)
chunk = chunk[skip:]
skip = 0
# Making the life harder.
oops = random.randint(0, 49) == 0
if oops and len(chunk) > 64 * 1024:
chunk = chunk[:random.randrange(64 * 1024, len(chunk))]
self.wfile.write(chunk)
if oops:
break
def run_server():
host = "127.0.0.1"
port = 8080
print("Length of data:", BuggyHandler.size)
h = hashlib.sha256()
remains = BuggyHandler.size
while remains:
rnd = random.Random()
BuggyHandler.rnd_states.append(rnd.getstate())
chunk = rnd.randbytes(min(BuggyHandler.seed_step, remains))
h.update(chunk)
remains -= len(chunk)
print("SHA-256 hash of the data:", h.hexdigest())
httpd = HTTPServer((host, port), BuggyHandler, bind_and_activate=True)
httpd.allow_reuse_address = True
print(f"Starting HTTP server on port {host}:{port}")
try:
httpd.serve_forever()
except KeyboardInterrupt:
print("\nStopping server...")
httpd.server_close()
if __name__ == "__main__":
run_server()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment