Skip to content

Instantly share code, notes, and snippets.

@ipeirotis
Created February 24, 2026 14:17
Show Gist options
  • Select an option

  • Save ipeirotis/646b8c10c00503febdfa70cf2a9a020d to your computer and use it in GitHub Desktop.

Select an option

Save ipeirotis/646b8c10c00503febdfa70cf2a9a020d to your computer and use it in GitHub Desktop.
Download exam transcripts from ElevenLabs
import argparse
import csv
import json
import os
import re
import sys
import time
from dataclasses import asdict, is_dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
# Optional dependency: when the zoneinfo module cannot be imported, ZoneInfo is
# bound to None and the date helpers in this file fall back to UTC.
try:
    from zoneinfo import ZoneInfo
except ImportError:
    ZoneInfo = None # type: ignore
from elevenlabs.client import ElevenLabs # official SDK
from dotenv import load_dotenv
load_dotenv()
# Timezone name used both for rendering local dates and for the cutoff midnight.
NY_TZ = "America/New_York"
# Conversations that started before local midnight of this date are skipped.
CUTOFF_LOCAL = "2025-12-11" # (c) do not download before this date (NY local time)
# Conversations whose net_id equals this value are skipped.
IGNORE_NETID = "kr888" # (d) ignore this net_id (case-insensitive)
def to_dict(obj: Any) -> Dict[str, Any]:
    """Convert a response object into a plain dictionary, best effort.

    Conversion strategies are tried in this order:
      1. ``None`` becomes an empty dict.
      2. A dict is returned as-is (same object, not a copy).
      3. Objects exposing ``model_dump()`` or ``dict()`` use that method.
      4. Dataclass *instances* go through ``dataclasses.asdict``.
      5. Anything else is round-tripped through JSON with ``default=str``.

    Args:
        obj: Any value; typically an SDK response model or a dict.

    Returns:
        A dictionary. If the JSON round-trip does not yield a dict (for
        example ``obj`` is a string, a list, or an arbitrary object that
        stringifies), an empty dict is returned so callers can safely use
        ``.get`` on the result.
    """
    if obj is None:
        return {}
    if isinstance(obj, dict):
        return obj
    if hasattr(obj, "model_dump"):
        return obj.model_dump()
    if hasattr(obj, "dict"):
        return obj.dict()
    # is_dataclass() is also true for dataclass *types*, which asdict() rejects,
    # so only instances take this branch.
    if is_dataclass(obj) and not isinstance(obj, type):
        return asdict(obj)
    converted = json.loads(json.dumps(obj, default=str))
    # default=str turns unknown objects into strings; honour the Dict contract.
    return converted if isinstance(converted, dict) else {}
def safe_get_attr(obj: Any, *names: str) -> Any:
    """Return the first of ``names`` that exists on ``obj``.

    Dicts are searched by key only; every other object is searched by
    attribute. Restricting dicts to key lookup prevents names such as
    ``"items"`` or ``"keys"`` from resolving to bound dict methods.

    Args:
        obj: A dict, an arbitrary object, or ``None``.
        *names: Candidate key/attribute names, tried in the given order.

    Returns:
        The value stored under the first name that is present (even if that
        value is ``None``), or ``None`` when ``obj`` is ``None`` or no name
        is present.
    """
    if obj is None:
        return None
    if isinstance(obj, dict):
        for n in names:
            if n in obj:
                return obj[n]
        return None
    for n in names:
        if hasattr(obj, n):
            return getattr(obj, n)
    return None
def sanitize(s: str, max_len: int = 140) -> str:
    """Make ``s`` safe to use as a filename component.

    Spaces become underscores, every run of characters outside
    ``A-Za-z0-9._-`` becomes a single ``-``, repeated dashes collapse, and
    leading/trailing ``-``, ``.`` and ``_`` are removed. The result is then
    cut to ``max_len`` characters.

    Args:
        s: Raw text; a falsy value (``None`` or ``""``) yields ``""``.
        max_len: Maximum length of the returned string.

    Returns:
        The cleaned string, never ending in ``-``, ``.`` or ``_``.
    """
    # (a) ensure no spaces in filename components
    s = (s or "").strip().replace(" ", "_")
    s = re.sub(r"[^A-Za-z0-9._-]+", "-", s)
    s = re.sub(r"-{2,}", "-", s).strip("-._")
    # Truncation can expose a separator at the cut point, so strip the tail
    # again after slicing.
    return s[:max_len].rstrip("-._")
def resolve_agent_id(client: ElevenLabs, agent_id: Optional[str], agent_name: Optional[str]) -> str:
    """Return the agent id to use, looking it up by name when necessary.

    An explicit ``agent_id`` wins. Otherwise the agents returned by
    ``client.conversational_ai.agents.list()`` are matched against
    ``agent_name`` case-insensitively: exact name matches first, then
    substring matches. When several agents match, the one with the largest
    creation timestamp is chosen.

    Args:
        client: ElevenLabs SDK client.
        agent_id: Explicit agent id, or ``None``/empty to resolve by name.
        agent_name: Agent name to search for when ``agent_id`` is not given.

    Returns:
        The resolved agent id as a string.

    Raises:
        ValueError: If neither an id nor a non-blank name is given, if no
            agent matches the name, or if the matched agent has no id.
    """
    if agent_id:
        return agent_id

    # A whitespace-only name would substring-match every agent, so treat it
    # the same as a missing name.
    wanted = (agent_name or "").strip().lower()
    if not wanted:
        raise ValueError("Provide --agent-id or --agent-name (or env ORAL_EXAM_AGENT_ID/ORAL_EXAM_AGENT_NAME).")

    # NOTE(review): only the result of a single list() call is inspected here;
    # confirm whether the SDK paginates this endpoint.
    resp = client.conversational_ai.agents.list()
    agents = safe_get_attr(resp, "agents") or safe_get_attr(resp, "data") or []

    def normalized_name(a: Any) -> str:
        return (safe_get_attr(a, "name") or "").strip().lower()

    matches = [a for a in agents if normalized_name(a) == wanted]
    if not matches:
        matches = [a for a in agents if wanted in normalized_name(a)]
    if not matches:
        raise ValueError(f"No agent found matching name '{agent_name}'.")

    def created_key(a: Any) -> int:
        # Agents without a readable creation timestamp sort as oldest (0).
        meta = safe_get_attr(a, "metadata") or {}
        return int(safe_get_attr(meta, "created_at_unix_secs", "created_at", "created_at_unix") or 0)

    matches.sort(key=created_key, reverse=True)
    resolved = safe_get_attr(matches[0], "agent_id") or safe_get_attr(matches[0], "id")
    if not resolved:
        # Without this check str(None) would be returned as the id "None".
        raise ValueError(f"Agent matching name '{agent_name}' has no usable id.")
    return str(resolved)
def list_all_conversations(client: ElevenLabs, agent_id: str, page_size: int = 100) -> List[Any]:
    """Collect every conversation summary for ``agent_id`` across all pages.

    Pages are requested with ``page_size`` clamped to the range 1..100 and
    followed via the response's ``next_cursor`` for as long as ``has_more``
    is truthy. A 0.05 s pause separates consecutive page requests.

    Args:
        client: ElevenLabs SDK client.
        agent_id: Agent whose conversations are listed.
        page_size: Requested page size before clamping.

    Returns:
        All conversation summaries in the order the pages returned them.
    """
    collected: List[Any] = []
    next_cursor = None
    while True:
        page = client.conversational_ai.conversations.list(
            agent_id=agent_id,
            cursor=next_cursor,
            page_size=min(max(page_size, 1), 100),
        )
        collected += safe_get_attr(page, "conversations") or []
        more = bool(safe_get_attr(page, "has_more"))
        next_cursor = safe_get_attr(page, "next_cursor")
        if more and next_cursor:
            # Another page is available: pause briefly, then fetch it.
            time.sleep(0.05)
            continue
        return collected
def format_local_date(ts_unix: int, tz_name: str = NY_TZ) -> Tuple[str, str]:
    """Render a Unix timestamp as date strings in the ``tz_name`` timezone.

    When ``ZoneInfo`` is unavailable or the zone cannot be constructed, the
    timestamp is rendered in UTC instead.

    Args:
        ts_unix: Seconds since the Unix epoch.
        tz_name: Timezone name handed to ``ZoneInfo``.

    Returns:
        A ``(YYYY-MM-DD, YYYY-MM-DD_HHMMSS)`` tuple.
    """
    zone = None
    if ZoneInfo is not None:
        try:
            zone = ZoneInfo(tz_name)
        except Exception:
            zone = None

    moment = datetime.fromtimestamp(ts_unix, tz=timezone.utc)
    if zone:
        moment = moment.astimezone(zone)

    day = moment.strftime("%Y-%m-%d")
    stamp = moment.strftime("%Y-%m-%d_%H%M%S")
    return day, stamp
def cutoff_unix_seconds(tz_name: str = NY_TZ) -> int:
    """Return the Unix timestamp of midnight on ``CUTOFF_LOCAL`` in ``tz_name``.

    If ``ZoneInfo`` is unavailable, or building the zone-aware timestamp
    raises, midnight is interpreted as UTC instead.

    Args:
        tz_name: Timezone name handed to ``ZoneInfo``.

    Returns:
        Whole seconds since the Unix epoch.
    """
    # Parsing a date-only string already yields 00:00:00 on that day.
    midnight = datetime.strptime(CUTOFF_LOCAL, "%Y-%m-%d")
    if ZoneInfo is not None:
        try:
            localized = midnight.replace(tzinfo=ZoneInfo(tz_name))
            return int(localized.timestamp())
        except Exception:
            pass
    # fallback: treat as UTC if zoneinfo unavailable
    as_utc = midnight.replace(tzinfo=timezone.utc)
    return int(as_utc.timestamp())
def transcript_to_text(transcript: List[Dict[str, Any]]) -> str:
    """Render transcript messages as one ``ROLE: text`` line per message.

    Each line is prefixed with ``[Ns] `` when the message carries a numeric
    ``time_in_call_secs``. The text is taken from the first truthy value
    among the ``message``, ``text`` and ``content`` keys.

    Args:
        transcript: Message dictionaries.

    Returns:
        The rendered lines joined by newlines, stripped, with exactly one
        trailing newline (so an empty transcript yields ``"\\n"``).
    """

    def render(entry: Dict[str, Any]) -> str:
        speaker = (entry.get("role") or "unknown").upper()
        offset = entry.get("time_in_call_secs")
        prefix = f"[{int(offset)}s] " if isinstance(offset, (int, float)) else ""
        body = entry.get("message") or entry.get("text") or entry.get("content") or ""
        # rstrip drops the trailing space left behind when the body is empty.
        return f"{prefix}{speaker}: {body}".rstrip()

    rendered = "\n".join(render(entry) for entry in transcript)
    return rendered.strip() + "\n"
def main() -> int:
    """Download transcripts for one agent and write text files plus an index CSV.

    Steps, in order:
      1. Parse the command-line options and read ELEVENLABS_API_KEY.
      2. Resolve the agent and list all of its conversation summaries.
      3. Fetch the details of each conversation, skipping those that started
         before the CUTOFF_LOCAL midnight or whose net_id equals IGNORE_NETID.
      4. Number attempts per (student, netid, projectid), ordered by start time.
      5. Write one ``.txt`` per conversation (plus a ``.json`` when
         ``--write-json`` is given) and an ``index.csv`` into the output directory.

    Returns:
        0 on success; 2 when ELEVENLABS_API_KEY is missing or the agent
        cannot be resolved.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--out", default="elevenlabs_transcripts", help="Output directory")
    ap.add_argument("--agent-id", default=os.getenv("ORAL_EXAM_AGENT_ID"))
    ap.add_argument("--agent-name", default=os.getenv("ORAL_EXAM_AGENT_NAME"))
    ap.add_argument("--page-size", type=int, default=100)
    ap.add_argument("--write-json", action="store_true", help="Also write raw JSON per conversation")
    ap.add_argument("--sleep", type=float, default=0.05, help="Sleep seconds between GET calls")
    args = ap.parse_args()

    api_key = os.getenv("ELEVENLABS_API_KEY")
    if not api_key:
        print("Missing ELEVENLABS_API_KEY in environment.", file=sys.stderr)
        return 2

    client = ElevenLabs(api_key=api_key)
    try:
        agent_id = resolve_agent_id(client, args.agent_id, args.agent_name)
    except ValueError as exc:
        # Report resolution problems the same way as a missing API key
        # instead of letting a traceback escape.
        print(str(exc), file=sys.stderr)
        return 2

    os.makedirs(args.out, exist_ok=True)
    cutoff_ts = cutoff_unix_seconds(NY_TZ)
    conv_summaries = list_all_conversations(client, agent_id=agent_id, page_size=args.page_size)

    pause = max(0.0, args.sleep)
    # Normalise the constant too, so the comparison is case-insensitive on both sides.
    ignored_netid = IGNORE_NETID.strip().lower()

    records = []
    for c in conv_summaries:
        conversation_id = str(safe_get_attr(c, "conversation_id") or safe_get_attr(c, "id"))
        start_time_unix = int(safe_get_attr(c, "start_time_unix_secs") or 0)

        # (c) skip early based on summary timestamp when available (saves API calls)
        if start_time_unix and start_time_unix < cutoff_ts:
            continue

        call_duration_secs = int(safe_get_attr(c, "call_duration_secs") or 0)
        message_count = int(safe_get_attr(c, "message_count") or 0)

        details = client.conversational_ai.conversations.get(conversation_id)
        # Throttle after every GET, including conversations that are skipped
        # below, so the pause really separates consecutive GET calls.
        time.sleep(pause)
        details_d = to_dict(details)

        cicd = details_d.get("conversation_initiation_client_data") or {}
        dyn = (cicd.get("dynamic_variables") or {}) if isinstance(cicd, dict) else {}
        if not isinstance(dyn, dict):
            # Only a mapping can be queried for the variables below.
            dyn = {}

        student = str(dyn.get("student") or dyn.get("Student") or "unknown_student")
        # Prefer net_id first, then netid variants
        net_id = str(
            dyn.get("net_id")
            or dyn.get("netid")
            or dyn.get("NetID")
            or dyn.get("netId")
            or "unknown_netid"
        )
        projectid = str(dyn.get("projectid") or dyn.get("project_id") or dyn.get("ProjectID") or "unknown_projectid")

        # (d) ignore conversations by kr888
        if net_id.strip().lower() == ignored_netid:
            continue

        # If summary lacked start time, fall back to details if present
        if not start_time_unix:
            start_time_unix = int(details_d.get("start_time_unix_secs") or 0)

        # (c) enforce cutoff again after we have details (handles missing/zero summary timestamps)
        if start_time_unix and start_time_unix < cutoff_ts:
            continue

        transcript = details_d.get("transcript") or []
        if not isinstance(transcript, list):
            transcript = []

        date_ymd, date_stamp = format_local_date(start_time_unix, NY_TZ)
        records.append(
            {
                "conversation_id": conversation_id,
                "start_time_unix_secs": start_time_unix,
                "date_ymd": date_ymd,
                "date_stamp": date_stamp,
                "call_duration_secs": call_duration_secs,
                "message_count": message_count or len(transcript),
                "student": student,
                "netid": net_id,  # keep column name stable
                "projectid": projectid,
                "dynamic_variables": dyn,
                "transcript": transcript,
                "raw": details_d,
            }
        )

    # Attempt numbers per (student, netid, projectid), ordered by start time.
    records.sort(key=lambda r: (r["student"], r["netid"], r["projectid"], r["start_time_unix_secs"]))
    attempt_counter: Dict[Tuple[str, str, str], int] = {}
    for r in records:
        key = (r["student"], r["netid"], r["projectid"])
        attempt_counter[key] = attempt_counter.get(key, 0) + 1
        r["attempt"] = attempt_counter[key]

    index_path = os.path.join(args.out, "index.csv")
    with open(index_path, "w", newline="", encoding="utf-8") as fcsv:
        w = csv.DictWriter(
            fcsv,
            fieldnames=[
                "student",
                "netid",
                "projectid",
                "attempt",
                "date_ymd",
                "call_duration_secs",
                "message_count",
                "conversation_id",
                "transcript_file",
                "json_file",
            ],
        )
        w.writeheader()

        for r in records:
            student_s = sanitize(r["student"])
            netid_s = sanitize(r["netid"])  # (a) no spaces
            projectid_s = sanitize(r["projectid"])
            attempt = int(r["attempt"])
            dur = int(r["call_duration_secs"])
            msgs = int(r["message_count"])
            date_stamp = sanitize(r["date_stamp"])  # defensive

            # (b) filename starts with net_id, then student, then attempt, then the rest
            base = (
                f"{netid_s}__{student_s}__attempt{attempt:02d}"
                f"__{date_stamp}__proj{projectid_s}"
                f"__dur{dur:04d}s__msgs{msgs:03d}"
                f"__{sanitize(r['conversation_id'], 60)}"
            )

            txt_name = f"{base}.txt"
            txt_path = os.path.join(args.out, txt_name)
            txt = transcript_to_text([to_dict(x) for x in r["transcript"]])
            header = (
                f"Conversation ID: {r['conversation_id']}\n"
                f"Date (local {NY_TZ}): {r['date_stamp']}\n"
                f"Student/NetID/ProjectID: {r['student']} / {r['netid']} / {r['projectid']}\n"
                f"Attempt: {attempt}\n"
                f"Duration (secs): {dur}\n"
                f"Messages: {msgs}\n"
                f"{'-'*60}\n"
            )
            with open(txt_path, "w", encoding="utf-8") as ftxt:
                ftxt.write(header)
                ftxt.write(txt)

            # The index records an empty json_file unless --write-json was given.
            json_name = ""
            if args.write_json:
                json_name = f"{base}.json"
                json_path = os.path.join(args.out, json_name)
                with open(json_path, "w", encoding="utf-8") as fjson:
                    json.dump(r["raw"], fjson, ensure_ascii=False, indent=2)

            w.writerow(
                {
                    "student": r["student"],
                    "netid": r["netid"],
                    "projectid": r["projectid"],
                    "attempt": attempt,
                    "date_ymd": r["date_ymd"],
                    "call_duration_secs": dur,
                    "message_count": msgs,
                    "conversation_id": r["conversation_id"],
                    "transcript_file": txt_name,
                    "json_file": json_name,
                }
            )

    print(f"Wrote {len(records)} transcripts to: {args.out}")
    print(f"Index: {index_path}")
    return 0
if __name__ == "__main__":
    # Run only when executed as a script; main()'s return value becomes the
    # process exit status.
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment