Last active
February 5, 2025 20:31
-
-
Save uniphil/346acb62088022729394d2324bf2ad8a to your computer and use it in GitHub Desktop.
Jetstream replay -> live-tail cutover data race checks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# see https://github.com/bluesky-social/jetstream/pull/43
# NOTE: you must prevent the jetstream rate-limiter from interfering with this test. In server.go, add something like:
#
#   var local_sub = 0
#
# and then, around line 109:
#
#   subIP := fmt.Sprintf("local-%v", local_sub) //c.RealIP()
#   local_sub += 1
#
# to ensure that enough concurrent connections can be established.
import websockets | |
import asyncio | |
import json | |
import time | |
from collections import Counter | |
# Subscribe endpoint of a jetstream server running on localhost.
STREAM: str = 'ws://localhost:6008/subscribe'
# Number of subscribers connected at the same time in each attempt.
CONCURRENT_SUBS: int = 100
# How far into the past the cursor starts, in seconds: enough to fill the
# outbox(?) and force a replay -> live-tail cutover.
REWIND: float = 1.5
# A subscriber counts as caught up once an event's time_us is within this
# many seconds of the local clock.
CATCHUP_THRESH: float = 0.1
# Give up ("did not catch up") after reading this many events.
MAX_EVENTS: int = 10_000
async def check():
    """Replay from a recent cursor and verify events arrive in strictly increasing order.

    Connects to STREAM with a cursor REWIND seconds in the past, so the
    server has to replay older events before handing over to the live tail.
    Events are read until one is within CATCHUP_THRESH seconds of the local
    clock (the subscriber has caught up), or until MAX_EVENTS have been read.

    Returns one of:
      'ok'               -- caught up with every ``time_us`` strictly increasing
      'out-of-order'     -- an event's ``time_us`` was <= the previous one
      'did not catch up' -- MAX_EVENTS read without getting within CATCHUP_THRESH
      'connection closed before catch-up' -- the server closed the socket cleanly
      'different issue (connection-closed #27/#31)' -- the connection closed abnormally
    """
    # Cursor and time_us are both treated as microseconds since the epoch.
    cursor_us = round((time.time() - REWIND) * 1_000_000)
    last_event_us = 0
    async with websockets.connect(f'{STREAM}?cursor={cursor_us}', open_timeout=20) as ws:
        for _ in range(MAX_EVENTS):
            try:
                message = await ws.recv()
            except websockets.exceptions.ConnectionClosedError:
                return 'different issue (connection-closed #27/#31)'
            except websockets.exceptions.ConnectionClosedOK:
                # Without this, a clean close would propagate out of check()
                # and fail the surrounding asyncio.gather() for every subscriber.
                return 'connection closed before catch-up'
            event = json.loads(message)
            this_event_us = event['time_us']
            # Strict ordering: a repeated or earlier time_us means the
            # replay -> live cutover delivered events out of order.
            if this_event_us <= last_event_us:
                return 'out-of-order'
            last_event_us = this_event_us
            lag_s = time.time() - this_event_us / 1_000_000
            if lag_s <= CATCHUP_THRESH:
                return 'ok'
        return 'did not catch up'
async def together(t, n):
    """Run ``n`` concurrent instances of coroutine function ``t`` and tally results.

    Returns a Counter mapping each result to the number of runs that produced
    it. If one run raises, the exception is recorded as
    ``'error: <ExceptionType>'`` instead of propagating. With the default
    ``return_exceptions=False``, the first failure (e.g. a connect timeout)
    would propagate out of gather() and the other runs' results would be lost.
    """
    results = await asyncio.gather(*(t() for _ in range(n)), return_exceptions=True)
    return Counter(
        f'error: {type(r).__name__}' if isinstance(r, BaseException) else r
        for r in results
    )
def ensure_nofile_limit(wanted=100_000):
    """Best-effort raise of the soft RLIMIT_NOFILE to at least ``wanted``.

    Only the soft limit is raised, and never above the current hard limit.
    setrlimit() raises ValueError when a process tries to raise its hard
    limit, so asking for an unlimited hard limit fails for unprivileged users.
    A failure is reported and otherwise ignored, because the run may still
    succeed if the existing limit is large enough.

    Returns the soft limit in effect afterwards. Returns None where the
    ``resource`` module is unavailable (it is Unix-only).
    """
    try:
        import resource
    except ImportError:
        return None
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if hard != resource.RLIM_INFINITY:
        wanted = min(wanted, hard)
    # RLIM_INFINITY can be -1 on some platforms, so test it explicitly rather
    # than relying on a numeric comparison.
    if soft == resource.RLIM_INFINITY or soft >= wanted:
        return soft
    try:
        resource.setrlimit(resource.RLIMIT_NOFILE, (wanted, hard))
    except (ValueError, OSError) as err:
        print(f'warning: could not raise RLIMIT_NOFILE to {wanted}: {err}')
        return soft
    return wanted


if __name__ == '__main__':
    # Every concurrent subscriber holds an open socket, so let the process
    # open enough file descriptors before starting.
    ensure_nofile_limit(100_000)
    for attempt in range(10):
        t0 = time.time()
        result = asyncio.run(together(check, CONCURRENT_SUBS))
        print(f'attempt {attempt} result: {result} ({time.time() - t0:.1f}s)')
# sample output before fix:
# attempt 0 result: Counter({'ok': 100}) (11.1s)
# attempt 1 result: Counter({'ok': 86, 'out-of-order': 14}) (11.0s)
# attempt 2 result: Counter({'ok': 96, 'out-of-order': 4}) (11.0s)
# attempt 3 result: Counter({'ok': 96, 'out-of-order': 4}) (11.2s)
# attempt 4 result: Counter({'ok': 96, 'out-of-order': 4}) (11.3s)
# attempt 5 result: Counter({'ok': 97, 'out-of-order': 3}) (11.1s)
# attempt 6 result: Counter({'ok': 100}) (11.1s)
# attempt 7 result: Counter({'ok': 97, 'out-of-order': 3}) (11.0s)
# attempt 8 result: Counter({'ok': 96, 'out-of-order': 4}) (11.1s)
# attempt 9 result: Counter({'ok': 97, 'out-of-order': 3}) (11.0s)
# sample output after adding lock:
# attempt 0 result: Counter({'ok': 100}) (11.1s)
# attempt 1 result: Counter({'ok': 100}) (11.4s)
# attempt 2 result: Counter({'ok': 100}) (11.6s)
# attempt 3 result: Counter({'ok': 100}) (11.5s)
# attempt 4 result: Counter({'ok': 100}) (11.5s)
# attempt 5 result: Counter({'ok': 100}) (11.7s)
# attempt 6 result: Counter({'ok': 100}) (11.3s)
# attempt 7 result: Counter({'ok': 100}) (11.5s)
# attempt 8 result: Counter({'ok': 100}) (11.7s)
# attempt 9 result: Counter({'ok': 100}) (11.6s)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# see https://github.com/bluesky-social/jetstream/pull/43
import websockets | |
import asyncio | |
import json | |
import time | |
import gc | |
# Remote jetstream subscribe endpoint (TLS websocket).
STREAM: str = 'wss://jetstream1.us-east.bsky.network/subscribe'
# How far into the past the cursor starts, in seconds: enough to fill the
# outbox(?) and force a replay -> live-tail cutover.
REWIND: float = 1.2
# Seconds to keep reading after connecting; hopefully long enough to get
# past the cutover.
UNTIL: int = 3
async def check():
    """Subscribe across a replay -> live-tail cutover and verify event ordering.

    Opens STREAM with a cursor REWIND seconds in the past and reads events
    until UNTIL seconds after connecting. Every ``time_us`` must be strictly
    greater than the one before it.

    Returns one of:
      'ok'           -- the deadline passed with no ordering violation
      'out-of-order' -- an event's ``time_us`` was <= the previous one
      'connection closed before deadline' -- the server closed the socket cleanly
      'different issue (connection-closed #27/#31)' -- the connection closed abnormally
    """
    # Cursor and time_us are both treated as microseconds since the epoch.
    cursor_us = round((time.time() - REWIND) * 1_000_000)
    last_event_us = 0
    async with websockets.connect(f'{STREAM}?cursor={cursor_us}') as ws:
        deadline = time.time() + UNTIL
        while True:
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            try:
                # Bound each recv() by the time left, so a quiet or stalled
                # stream cannot keep the check open past UNTIL.
                message = await asyncio.wait_for(ws.recv(), timeout=remaining)
            except asyncio.TimeoutError:
                break
            except websockets.exceptions.ConnectionClosedError:
                return 'different issue (connection-closed #27/#31)'
            except websockets.exceptions.ConnectionClosedOK:
                return 'connection closed before deadline'
            event = json.loads(message)
            this_event_us = event['time_us']
            if this_event_us <= last_event_us:
                return 'out-of-order'
            last_event_us = this_event_us
    # Leaving the ``async with`` block closes the connection, so an explicit
    # ws.close() is not needed before reporting success.
    return 'ok'
if __name__ == '__main__':
    # Ten sequential single-subscriber runs, each on a fresh event loop.
    attempt = 0
    while attempt < 10:
        began = time.time()
        outcome = asyncio.run(check())
        print(f'attempt {attempt}: {outcome} ({time.time() - began:.1f}s)')
        attempt += 1
# sample output before fix:
# (this capture includes an "attempt 10" line, so it came from a run with
# more attempts than the 10 configured above)
# attempt 0: ok (13.1s)
# attempt 1: out-of-order (10.3s)
# attempt 2: ok (3.2s)
# attempt 3: ok (13.1s)
# attempt 4: out-of-order (10.3s)
# attempt 5: out-of-order (10.3s)
# attempt 6: out-of-order (10.3s)
# attempt 7: ok (13.1s)
# attempt 8: ok (13.1s)
# attempt 9: ok (13.1s)
# attempt 10: out-of-order (10.2s)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment