Skip to content

Instantly share code, notes, and snippets.

@samisalkosuo
Created November 6, 2025 12:04
Show Gist options
  • Select an option

  • Save samisalkosuo/5149e6725d6d3138298c33848cf4f855 to your computer and use it in GitHub Desktop.

Select an option

Save samisalkosuo/5149e6725d6d3138298c33848cf4f855 to your computer and use it in GitHub Desktop.
# Sample app to send messages to DataStax Astra DB
import os, time, threading, random, sys
from datetime import datetime
from astrapy import DataAPIClient
# Runtime configuration, read once from the environment when the module loads.
TOKEN = os.environ.get("ASTRA_TOKEN")  # required
ENDPOINT = os.environ.get("ASTRA_API_ENDPOINT")  # required, e.g. https://<id>-<region>.apps.astra.datastax.com
KEYSPACE = os.environ.get("ASTRA_KEYSPACE", "default_keyspace")
COLL_NAME = os.environ.get("ASTRA_COLLECTION", "test_messages")
# int() raises ValueError here if MESSAGES_PER_SECOND is not an integer literal.
MPS = int(os.environ.get("MESSAGES_PER_SECOND", "1"))

# Both required settings must be present and non-empty; otherwise explain what
# to set on stderr and exit with status 1.
if not (TOKEN and ENDPOINT):
    print(
        "Set ASTRA_TOKEN and ASTRA_API_ENDPOINT "
        "(and optionally ASTRA_KEYSPACE, ASTRA_COLLECTION, MESSAGES_PER_SECOND).",
        file=sys.stderr,
    )
    sys.exit(1)
# Open the Data API connection once at module load; `col` is the collection
# handle used by one_second_sender().
print("Connecting to Astra DB...", end="", flush=True)
client = DataAPIClient(TOKEN)
# Pass the configured keyspace explicitly so that ASTRA_KEYSPACE actually
# selects where the collection lives (main() reports this keyspace in its
# status line). With the variable unset this is "default_keyspace".
database = client.get_database(ENDPOINT, keyspace=KEYSPACE)
# NOTE(review): presumably a no-op when the collection already exists with the
# same options — confirm against the astrapy version in use.
col = database.create_collection(COLL_NAME)
print("done.", flush=True)
def one_second_sender():
    """Insert one random document into ``col`` per loop pass, forever.

    Each document holds the current naive local time as an ISO-8601 string
    under ``"timestamp"`` and three ``random.random()`` floats under
    ``"value1"``, ``"value2"`` and ``"value3"``. Any exception raised while
    building or inserting the document is printed to stderr and the loop
    continues. The function sleeps one second after every attempt and never
    returns.
    """
    while True:
        try:
            document = {
                "timestamp": datetime.now().isoformat(),
                # Keys are generated in order, so the random draws happen as
                # value1, value2, value3.
                **{f"value{index}": random.random() for index in (1, 2, 3)},
            }
            col.insert_one(document)
        except Exception as exc:
            # Best-effort sender: report the failure and keep going.
            print(f"Insert failed: {exc}", file=sys.stderr)
        # Pause after every attempt, whether it succeeded or failed.
        time.sleep(1)
def main():
    """Start the sender threads, then block until Ctrl+C.

    One daemon thread running ``one_second_sender`` is started per configured
    message-per-second, with a minimum of one. Because the threads are daemons,
    they end when the main thread returns after a KeyboardInterrupt.
    """
    # Values of MPS below 1 still get a single sender.
    sender_count = MPS if MPS > 1 else 1

    started = 0
    while started < sender_count:
        worker = threading.Thread(target=one_second_sender, daemon=True)
        worker.start()
        started += 1

    print(f"Sending ~{sender_count} msgs/sec to {ENDPOINT} [{KEYSPACE}.{COLL_NAME}].")
    print("Ctrl+C to stop.")

    # Keep the main thread alive; the sleep length is arbitrary because the
    # loop only exists to wait for the interrupt.
    try:
        while True:
            time.sleep(3600)
    except KeyboardInterrupt:
        print("\nStopped.")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment