Created
February 24, 2026 14:17
-
-
Save ipeirotis/646b8c10c00503febdfa70cf2a9a020d to your computer and use it in GitHub Desktop.
Download exam transcripts from ElevenLabs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import csv | |
| import json | |
| import os | |
| import re | |
| import sys | |
| import time | |
| from dataclasses import asdict, is_dataclass | |
| from datetime import datetime, timezone | |
| from typing import Any, Dict, List, Optional, Tuple | |
| try: | |
| from zoneinfo import ZoneInfo | |
| except ImportError: | |
| ZoneInfo = None # type: ignore | |
| from elevenlabs.client import ElevenLabs # official SDK | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
# IANA time zone name used both to interpret CUTOFF_LOCAL and to format
# conversation start times for file names and the index.
NY_TZ = "America/New_York"

# (c) do not download before this date (NY local time); parsed as %Y-%m-%d
CUTOFF_LOCAL = "2025-12-11"

# (d) ignore this net_id (case-insensitive); kept lowercase because it is
# compared against the lowercased net_id of each conversation.
IGNORE_NETID = "kr888"
def to_dict(obj: Any) -> Dict[str, Any]:
    """Convert ``obj`` into plain, JSON-style Python data.

    Conversion rules, tried in order:

    * ``None`` becomes an empty dict.
    * A dict is returned as-is (the same object, not a copy).
    * An object exposing ``model_dump()`` or, failing that, ``dict()`` is
      converted by calling that method (presumably pydantic-style SDK models).
    * A dataclass is converted with ``dataclasses.asdict``.
    * Anything else is round-tripped through JSON, stringifying values that
      JSON cannot encode. Note that this last branch can return a non-dict
      (for example a list or a string) despite the annotation.
    """
    if obj is None:
        return {}
    if isinstance(obj, dict):
        return obj

    # Duck-typed converters, most preferred first.
    for method_name in ("model_dump", "dict"):
        if hasattr(obj, method_name):
            return getattr(obj, method_name)()

    if is_dataclass(obj):
        return asdict(obj)

    # Last resort: let json normalise the value, using str() for unknown types.
    encoded = json.dumps(obj, default=str)
    return json.loads(encoded)
def safe_get_attr(obj: Any, *names: str) -> Any:
    """Return the value of the first of ``names`` present on ``obj``.

    Args:
        obj: A dict, an arbitrary object, or ``None``.
        *names: Candidate key/attribute names, tried in order.

    Returns:
        For a dict, the value stored under the first name that is a key.
        For any other object, the value of the first name that is an
        attribute. ``None`` when ``obj`` is ``None`` or nothing matches.
        A present key/attribute whose value is ``None`` still counts as a
        match, so later names are not consulted in that case.
    """
    if obj is None:
        return None

    if isinstance(obj, dict):
        # Dicts are searched by key only. Falling back to attribute lookup
        # would make names such as "items" or "get" resolve to bound dict
        # methods instead of being reported as missing.
        for name in names:
            if name in obj:
                return obj[name]
        return None

    for name in names:
        if hasattr(obj, name):
            return getattr(obj, name)
    return None
def sanitize(s: str, max_len: int = 140) -> str:
    """Turn ``s`` into a string usable as a file-name component.

    Surrounding whitespace is dropped, spaces become underscores, every run
    of characters outside ``A-Za-z0-9._-`` becomes a single hyphen, repeated
    hyphens are collapsed, leading/trailing ``-``, ``.`` and ``_`` are
    removed, and the result is cut to at most ``max_len`` characters.
    A falsy ``s`` (``None`` or ``""``) yields ``""``.
    """
    # (a) ensure no spaces in filename components
    text = s if s else ""
    text = text.strip().replace(" ", "_")
    text = re.sub(r"[^A-Za-z0-9._-]+", "-", text)
    text = re.sub(r"-{2,}", "-", text)
    trimmed = text.strip("-._")
    return trimmed[:max_len]
def resolve_agent_id(client: ElevenLabs, agent_id: Optional[str], agent_name: Optional[str]) -> str:
    """Return the agent id to use, looking it up by name when necessary.

    Args:
        client: ElevenLabs SDK client used to list agents.
        agent_id: Explicit agent id; returned unchanged when truthy.
        agent_name: Agent name to search for when no id is given.

    Returns:
        The agent id as a string.

    Raises:
        ValueError: If neither an id nor a name is supplied, if no agent
            matches the name, or if the matching agent exposes no id.

    Matching is case-insensitive: exact name matches are preferred, with a
    substring match used only when there is no exact match. Among several
    matches the one with the largest creation timestamp wins.

    NOTE: only the agents returned by a single ``agents.list()`` call are
    searched; no further pages are requested here.
    """
    if agent_id:
        return agent_id
    if not agent_name:
        raise ValueError("Provide --agent-id or --agent-name (or env ORAL_EXAM_AGENT_ID/ORAL_EXAM_AGENT_NAME).")

    resp = client.conversational_ai.agents.list()
    agents = safe_get_attr(resp, "agents") or safe_get_attr(resp, "data") or []
    wanted = agent_name.strip().lower()

    def normalized_name(agent: Any) -> str:
        return (safe_get_attr(agent, "name") or "").strip().lower()

    def created_key(agent: Any) -> int:
        meta = safe_get_attr(agent, "metadata") or {}
        return int(safe_get_attr(meta, "created_at_unix_secs", "created_at", "created_at_unix") or 0)

    matches = [a for a in agents if normalized_name(a) == wanted]
    if not matches:
        # No exact match: fall back to a substring match.
        matches = [a for a in agents if wanted in normalized_name(a)]
    if not matches:
        raise ValueError(f"No agent found matching name '{agent_name}'.")

    # max() returns the first element with the largest key, which is the same
    # element a stable descending sort would place first.
    newest = max(matches, key=created_key)

    resolved = safe_get_attr(newest, "agent_id") or safe_get_attr(newest, "id")
    if not resolved:
        # str(None) would otherwise hand the literal id "None" to later API calls.
        raise ValueError(f"Agent matching name '{agent_name}' has no agent id.")
    return str(resolved)
def list_all_conversations(client: ElevenLabs, agent_id: str, page_size: int = 100) -> List[Any]:
    """Collect every conversation summary for ``agent_id`` across all pages.

    Pages are requested with ``page_size`` clamped to the range 1..100.
    Paging continues while the response reports ``has_more`` and supplies a
    ``next_cursor``; a 0.05 s pause separates consecutive page requests.

    Returns:
        The summaries from all pages, in the order received.
    """
    per_page = min(max(page_size, 1), 100)
    collected: List[Any] = []
    next_cursor = None
    keep_going = True

    while keep_going:
        page = client.conversational_ai.conversations.list(
            agent_id=agent_id,
            cursor=next_cursor,
            page_size=per_page,
        )
        collected += safe_get_attr(page, "conversations") or []

        more_flag = bool(safe_get_attr(page, "has_more"))
        next_cursor = safe_get_attr(page, "next_cursor")
        keep_going = more_flag and bool(next_cursor)
        if keep_going:
            # Brief pause before requesting the next page.
            time.sleep(0.05)

    return collected
def format_local_date(ts_unix: int, tz_name: str = NY_TZ) -> Tuple[str, str]:
    """Format a Unix timestamp as local date strings.

    Args:
        ts_unix: Seconds since the Unix epoch.
        tz_name: Time zone name to convert into.

    Returns:
        ``(date, stamp)`` where ``date`` is ``YYYY-MM-DD`` and ``stamp`` is
        ``YYYY-MM-DD_HHMMSS``. If ``zoneinfo`` is unavailable or the zone
        cannot be loaded, both strings are expressed in UTC instead.
    """
    zone = None
    if ZoneInfo is not None:
        try:
            zone = ZoneInfo(tz_name)
        except Exception:
            # Zone could not be loaded; stay on UTC.
            zone = None

    moment = datetime.fromtimestamp(ts_unix, tz=timezone.utc)
    if zone:
        moment = moment.astimezone(zone)

    day = moment.strftime("%Y-%m-%d")
    stamp = moment.strftime("%Y-%m-%d_%H%M%S")
    return day, stamp
def cutoff_unix_seconds(tz_name: str = NY_TZ) -> int:
    """Return the Unix timestamp of midnight on ``CUTOFF_LOCAL``.

    Midnight is taken in ``tz_name`` when ``zoneinfo`` can provide that
    zone; otherwise the cutoff date is treated as midnight UTC.
    """
    # strptime with a date-only format already yields 00:00:00.
    midnight = datetime.strptime(CUTOFF_LOCAL, "%Y-%m-%d")

    if ZoneInfo is not None:
        try:
            localized = midnight.replace(tzinfo=ZoneInfo(tz_name))
            return int(localized.timestamp())
        except Exception:
            pass

    # fallback: treat as UTC if zoneinfo unavailable
    as_utc = midnight.replace(tzinfo=timezone.utc)
    return int(as_utc.timestamp())
def transcript_to_text(transcript: List[Dict[str, Any]]) -> str:
    """Render transcript messages as plain text, one message per line.

    Each line has the form ``[<secs>s] ROLE: text``. The time prefix appears
    only when ``time_in_call_secs`` is an int or float. The role defaults to
    ``UNKNOWN``, and the text is the first truthy value among ``message``,
    ``text`` and ``content``. Trailing whitespace is removed from every line,
    surrounding whitespace from the whole text, and the result always ends
    with exactly one newline (an empty transcript yields ``"\\n"``).
    """

    def render(entry: Dict[str, Any]) -> str:
        speaker = (entry.get("role") or "unknown").upper()
        secs = entry.get("time_in_call_secs")
        prefix = f"[{int(secs)}s] " if isinstance(secs, (int, float)) else ""
        body = entry.get("message") or entry.get("text") or entry.get("content") or ""
        return f"{prefix}{speaker}: {body}".rstrip()

    rendered = [render(entry) for entry in transcript]
    return "\n".join(rendered).strip() + "\n"
def _first_truthy(values: Dict[str, Any], keys: Tuple[str, ...], default: str) -> str:
    """Return ``str()`` of the first truthy value found under ``keys``, else ``default``."""
    for key in keys:
        value = values.get(key)
        if value:
            return str(value)
    return default


def _fetch_records(client: Any, conv_summaries: List[Any], cutoff_ts: int, sleep_secs: float) -> List[Dict[str, Any]]:
    """Fetch details for each eligible conversation summary and build one record per kept conversation."""
    pause = max(0.0, sleep_secs)
    records: List[Dict[str, Any]] = []

    for c in conv_summaries:
        raw_id = safe_get_attr(c, "conversation_id") or safe_get_attr(c, "id")
        if not raw_id:
            # Without an id there is nothing to fetch; str(None) would
            # otherwise request the conversation literally named "None".
            print("Skipping conversation summary without an id.", file=sys.stderr)
            continue
        conversation_id = str(raw_id)

        start_time_unix = int(safe_get_attr(c, "start_time_unix_secs") or 0)
        # (c) skip early based on summary timestamp when available (saves API calls)
        if start_time_unix and start_time_unix < cutoff_ts:
            continue

        call_duration_secs = int(safe_get_attr(c, "call_duration_secs") or 0)
        message_count = int(safe_get_attr(c, "message_count") or 0)

        details = client.conversational_ai.conversations.get(conversation_id)
        # Throttle after every GET, including those whose conversation is
        # filtered out below, so --sleep really applies between GET calls.
        time.sleep(pause)
        details_d = to_dict(details)

        cicd = details_d.get("conversation_initiation_client_data") or {}
        dyn = (cicd.get("dynamic_variables") or {}) if isinstance(cicd, dict) else {}
        if not isinstance(dyn, dict):
            dyn = {}

        student = _first_truthy(dyn, ("student", "Student"), "unknown_student")
        # Prefer net_id first, then netid variants
        net_id = _first_truthy(dyn, ("net_id", "netid", "NetID", "netId"), "unknown_netid")
        projectid = _first_truthy(dyn, ("projectid", "project_id", "ProjectID"), "unknown_projectid")

        # (d) ignore conversations by kr888
        if net_id.strip().lower() == IGNORE_NETID:
            continue

        # NOTE(review): the conversation-details payload is expected to nest
        # timing fields under "metadata"; the top-level keys are still checked
        # first. Confirm against the ElevenLabs "Get conversation" reference.
        meta = details_d.get("metadata") or {}
        if not isinstance(meta, dict):
            meta = {}

        # If summary lacked start time / duration, fall back to details if present
        if not start_time_unix:
            start_time_unix = int(
                details_d.get("start_time_unix_secs") or meta.get("start_time_unix_secs") or 0
            )
        if not call_duration_secs:
            call_duration_secs = int(
                details_d.get("call_duration_secs") or meta.get("call_duration_secs") or 0
            )

        # (c) enforce cutoff again after we have details (handles missing/zero summary timestamps)
        if start_time_unix and start_time_unix < cutoff_ts:
            continue

        transcript = details_d.get("transcript") or []
        if not isinstance(transcript, list):
            transcript = []

        date_ymd, date_stamp = format_local_date(start_time_unix, NY_TZ)
        records.append(
            {
                "conversation_id": conversation_id,
                "start_time_unix_secs": start_time_unix,
                "date_ymd": date_ymd,
                "date_stamp": date_stamp,
                "call_duration_secs": call_duration_secs,
                "message_count": message_count or len(transcript),
                "student": student,
                "netid": net_id,  # keep column name stable
                "projectid": projectid,
                "dynamic_variables": dyn,
                "transcript": transcript,
                "raw": details_d,
            }
        )

    return records


def _assign_attempts(records: List[Dict[str, Any]]) -> None:
    """Sort ``records`` in place and number attempts per (student, netid, projectid) by start time."""
    records.sort(key=lambda r: (r["student"], r["netid"], r["projectid"], r["start_time_unix_secs"]))
    attempt_counter: Dict[Tuple[str, str, str], int] = {}
    for r in records:
        key = (r["student"], r["netid"], r["projectid"])
        attempt_counter[key] = attempt_counter.get(key, 0) + 1
        r["attempt"] = attempt_counter[key]


def _write_outputs(records: List[Dict[str, Any]], out_dir: str, write_json: bool) -> str:
    """Write one transcript file per record (plus optional raw JSON) and index.csv; return the index path."""
    index_path = os.path.join(out_dir, "index.csv")
    with open(index_path, "w", newline="", encoding="utf-8") as fcsv:
        w = csv.DictWriter(
            fcsv,
            fieldnames=[
                "student",
                "netid",
                "projectid",
                "attempt",
                "date_ymd",
                "call_duration_secs",
                "message_count",
                "conversation_id",
                "transcript_file",
                "json_file",
            ],
        )
        w.writeheader()

        for r in records:
            student_s = sanitize(r["student"])
            netid_s = sanitize(r["netid"])  # (a) no spaces
            projectid_s = sanitize(r["projectid"])
            attempt = int(r["attempt"])
            dur = int(r["call_duration_secs"])
            msgs = int(r["message_count"])
            date_stamp = sanitize(r["date_stamp"])  # defensive

            # (b) filename starts with net_id, then student, then attempt, then the rest
            base = (
                f"{netid_s}__{student_s}__attempt{attempt:02d}"
                f"__{date_stamp}__proj{projectid_s}"
                f"__dur{dur:04d}s__msgs{msgs:03d}"
                f"__{sanitize(r['conversation_id'], 60)}"
            )

            txt_name = f"{base}.txt"
            txt_path = os.path.join(out_dir, txt_name)
            txt = transcript_to_text([to_dict(x) for x in r["transcript"]])
            header = (
                f"Conversation ID: {r['conversation_id']}\n"
                f"Date (local {NY_TZ}): {r['date_stamp']}\n"
                f"Student/NetID/ProjectID: {r['student']} / {r['netid']} / {r['projectid']}\n"
                f"Attempt: {attempt}\n"
                f"Duration (secs): {dur}\n"
                f"Messages: {msgs}\n"
                f"{'-'*60}\n"
            )
            with open(txt_path, "w", encoding="utf-8") as ftxt:
                ftxt.write(header)
                ftxt.write(txt)

            json_name = ""
            if write_json:
                json_name = f"{base}.json"
                json_path = os.path.join(out_dir, json_name)
                with open(json_path, "w", encoding="utf-8") as fjson:
                    json.dump(r["raw"], fjson, ensure_ascii=False, indent=2)

            w.writerow(
                {
                    "student": r["student"],
                    "netid": r["netid"],
                    "projectid": r["projectid"],
                    "attempt": attempt,
                    "date_ymd": r["date_ymd"],
                    "call_duration_secs": dur,
                    "message_count": msgs,
                    "conversation_id": r["conversation_id"],
                    "transcript_file": txt_name,
                    "json_file": json_name,
                }
            )

    return index_path


def main() -> int:
    """Download transcripts for one agent and write them, with an index, to disk.

    Command-line options: ``--out`` (output directory), ``--agent-id`` /
    ``--agent-name`` (defaulting to the ORAL_EXAM_AGENT_ID /
    ORAL_EXAM_AGENT_NAME environment variables), ``--page-size``,
    ``--write-json`` and ``--sleep``. The API key is read from the
    ELEVENLABS_API_KEY environment variable.

    Conversations that started before the cutoff date, or whose net_id equals
    IGNORE_NETID, are skipped. Every remaining conversation produces a
    ``.txt`` transcript (and a ``.json`` dump with ``--write-json``) plus one
    row in ``index.csv``.

    Returns:
        0 on success, 2 when ELEVENLABS_API_KEY is not set.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--out", default="elevenlabs_transcripts", help="Output directory")
    ap.add_argument("--agent-id", default=os.getenv("ORAL_EXAM_AGENT_ID"))
    ap.add_argument("--agent-name", default=os.getenv("ORAL_EXAM_AGENT_NAME"))
    ap.add_argument("--page-size", type=int, default=100)
    ap.add_argument("--write-json", action="store_true", help="Also write raw JSON per conversation")
    ap.add_argument("--sleep", type=float, default=0.05, help="Sleep seconds between GET calls")
    args = ap.parse_args()

    api_key = os.getenv("ELEVENLABS_API_KEY")
    if not api_key:
        print("Missing ELEVENLABS_API_KEY in environment.", file=sys.stderr)
        return 2

    client = ElevenLabs(api_key=api_key)
    agent_id = resolve_agent_id(client, args.agent_id, args.agent_name)
    os.makedirs(args.out, exist_ok=True)

    cutoff_ts = cutoff_unix_seconds(NY_TZ)
    conv_summaries = list_all_conversations(client, agent_id=agent_id, page_size=args.page_size)

    records = _fetch_records(client, conv_summaries, cutoff_ts, args.sleep)
    # Attempt numbers per (student, netid, projectid), ordered by start time.
    _assign_attempts(records)
    index_path = _write_outputs(records, args.out, args.write_json)

    print(f"Wrote {len(records)} transcripts to: {args.out}")
    print(f"Index: {index_path}")
    return 0
if __name__ == "__main__":
    # Run as a script: use main()'s return value as the process exit status.
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment