Skip to content

Instantly share code, notes, and snippets.

@uniphil
Last active February 5, 2025 20:31
Show Gist options
  • Save uniphil/346acb62088022729394d2324bf2ad8a to your computer and use it in GitHub Desktop.
Save uniphil/346acb62088022729394d2324bf2ad8a to your computer and use it in GitHub Desktop.
Jetstream replay -> live-tail cutover data race checks
#!/usr/bin/env python3
# see https://github.com/bluesky-social/jetstream/pull/43
# NOTE: must prevent jetstream rate-limiter from interfering with this test. in server.go, add something like:
#
# var local_sub = 0
#
# and then around line 109
#
# subIP := fmt.Sprintf("local-%v", local_sub) //c.RealIP()
# local_sub += 1
#
# to ensure that enough concurrent connections can be established
import websockets
import asyncio
import json
import time
from collections import Counter
# websocket subscribe endpoint of the jetstream instance under test (a local one here)
STREAM = 'ws://localhost:6008/subscribe'
# number of subscriptions opened at the same time in each attempt
CONCURRENT_SUBS = 100
REWIND = 1.5 # seconds in past, enough to fill outbox(?) + get a replay cutover
CATCHUP_THRESH = 0.1 # seconds, assume we've caught up if time_us is this close to now
MAX_EVENTS = 10_000 # just quit if we didn't catch up after this many events
async def check():
    """Replay from a cursor slightly in the past and verify event ordering.

    Connects to ``STREAM`` with a cursor ``REWIND`` seconds before now and
    reads at most ``MAX_EVENTS`` events, checking that every event's
    ``time_us`` is strictly greater than the previous event's.

    Returns:
        'ok' as soon as an event within ``CATCHUP_THRESH`` seconds of the
        local clock is seen without any ordering violation before it;
        'out-of-order' on the first event whose ``time_us`` does not
        increase;
        'did not catch up' if ``MAX_EVENTS`` events were read without ever
        getting that close to now;
        'different issue (connection-closed #27/#31)' if the connection is
        closed with an error while receiving.
    """
    cursor_us = round((time.time() - REWIND) * 1_000_000)
    last_event_us = 0
    url = f'{STREAM}?cursor={cursor_us}'
    async with websockets.connect(url, open_timeout=20) as ws:
        for _ in range(MAX_EVENTS):
            try:
                message = await ws.recv()
            except websockets.exceptions.ConnectionClosedError:
                return 'different issue (connection-closed #27/#31)'
            this_event_us = json.loads(message)['time_us']
            # equal timestamps count as a violation too: strictly increasing only
            if this_event_us <= last_event_us:
                return 'out-of-order'
            last_event_us = this_event_us
            # how far the event's timestamp is behind the local clock, in seconds
            lag = time.time() - this_event_us / 1_000_000
            if lag <= CATCHUP_THRESH:
                break
        else:
            # the loop used up MAX_EVENTS without ever breaking out
            return 'did not catch up'
    return 'ok'
async def together(t, n):
    """Run ``n`` invocations of the coroutine function ``t`` concurrently.

    Returns a ``Counter`` tallying how many invocations produced each result.
    """
    outcomes = await asyncio.gather(*(t() for _ in range(n)))
    return Counter(outcomes)
if __name__ == '__main__':
    import resource
    import sys

    # Raise the soft open-file limit (presumably so that many concurrent
    # connections cannot run out of file descriptors). Only the soft limit is
    # touched: an unprivileged process is not allowed to raise its hard limit,
    # and an already-higher soft limit is left alone rather than lowered.
    wanted = 100_000
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if hard != resource.RLIM_INFINITY:
        wanted = min(wanted, hard)
    if soft != resource.RLIM_INFINITY and soft < wanted:
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (wanted, hard))
        except (ValueError, OSError) as err:
            # keep going with the existing limit instead of aborting the test
            print(f'could not raise open-file limit from {soft} to {wanted}: {err}',
                  file=sys.stderr)

    # ten independent attempts, each opening CONCURRENT_SUBS subscriptions at once
    for attempt in range(10):
        t0 = time.time()
        result = asyncio.run(together(check, CONCURRENT_SUBS))
        print(f'attempt {attempt} result: {result} ({time.time() - t0:.1f}s)')
# sample output before fix:
# attempt 0 result: Counter({'ok': 100}) (11.1s)
# attempt 1 result: Counter({'ok': 86, 'out-of-order': 14}) (11.0s)
# attempt 2 result: Counter({'ok': 96, 'out-of-order': 4}) (11.0s)
# attempt 3 result: Counter({'ok': 96, 'out-of-order': 4}) (11.2s)
# attempt 4 result: Counter({'ok': 96, 'out-of-order': 4}) (11.3s)
# attempt 5 result: Counter({'ok': 97, 'out-of-order': 3}) (11.1s)
# attempt 6 result: Counter({'ok': 100}) (11.1s)
# attempt 7 result: Counter({'ok': 97, 'out-of-order': 3}) (11.0s)
# attempt 8 result: Counter({'ok': 96, 'out-of-order': 4}) (11.1s)
# attempt 9 result: Counter({'ok': 97, 'out-of-order': 3}) (11.0s)
# sample output after adding lock:
# attempt 0 result: Counter({'ok': 100}) (11.1s)
# attempt 1 result: Counter({'ok': 100}) (11.4s)
# attempt 2 result: Counter({'ok': 100}) (11.6s)
# attempt 3 result: Counter({'ok': 100}) (11.5s)
# attempt 4 result: Counter({'ok': 100}) (11.5s)
# attempt 5 result: Counter({'ok': 100}) (11.7s)
# attempt 6 result: Counter({'ok': 100}) (11.3s)
# attempt 7 result: Counter({'ok': 100}) (11.5s)
# attempt 8 result: Counter({'ok': 100}) (11.7s)
# attempt 9 result: Counter({'ok': 100}) (11.6s)
#!/usr/bin/env python3
# see https://github.com/bluesky-social/jetstream/pull/43
import websockets
import asyncio
import json
import time
import gc
# websocket subscribe endpoint to test against
STREAM = 'wss://jetstream1.us-east.bsky.network/subscribe'
REWIND = 1.2 # seconds in past, enough to fill outbox(?) + get a replay cutover
UNTIL = 3 # seconds from connect, hopefully long enough to clear the cutover
async def check():
    """Replay from a cursor slightly in the past and verify event ordering.

    Connects to ``STREAM`` with a cursor ``REWIND`` seconds before now, then
    reads events for ``UNTIL`` seconds after the connection is established,
    checking that every event's ``time_us`` is strictly greater than the
    previous event's.

    Returns:
        'ok' if the time window elapses without an ordering violation;
        'out-of-order' on the first event whose ``time_us`` does not
        increase;
        'different issue (connection-closed #27/#31)' if the connection is
        closed with an error while receiving.
    """
    cursor_us = round((time.time() - REWIND) * 1_000_000)
    last_event_us = 0
    async with websockets.connect(f'{STREAM}?cursor={cursor_us}') as ws:
        # monotonic clock: the window length is unaffected by wall-clock changes
        deadline = time.monotonic() + UNTIL
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                # bound the wait so a quiet stream cannot hold us past the deadline
                message = await asyncio.wait_for(ws.recv(), timeout=remaining)
            except asyncio.TimeoutError:
                break
            except websockets.exceptions.ConnectionClosedError:
                return 'different issue (connection-closed #27/#31)'
            this_event_us = json.loads(message)['time_us']
            # equal timestamps count as a violation too: strictly increasing only
            if this_event_us <= last_event_us:
                return 'out-of-order'
            last_event_us = this_event_us
        await ws.close()
    return 'ok'
if __name__ == '__main__':
    # ten sequential attempts; each prints its verdict and wall-clock duration
    for attempt in range(10):
        t0 = time.time()
        result = asyncio.run(check())
        elapsed = time.time() - t0
        print(f'attempt {attempt}: {result} ({elapsed:.1f}s)')
# sample output before fix:
# attempt 0: ok (13.1s)
# attempt 1: out-of-order (10.3s)
# attempt 2: ok (3.2s)
# attempt 3: ok (13.1s)
# attempt 4: out-of-order (10.3s)
# attempt 5: out-of-order (10.3s)
# attempt 6: out-of-order (10.3s)
# attempt 7: ok (13.1s)
# attempt 8: ok (13.1s)
# attempt 9: ok (13.1s)
# attempt 10: out-of-order (10.2s)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment