Last active
September 22, 2025 09:33
-
-
Save Konfekt/485da5aecbe47b0e76711fe2cfe8b6a5 to your computer and use it in GitHub Desktop.
stand-alone Python script to transcribe audio using OpenAI's 4o or whisper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
CLI transcription tool using OpenAI Audio Transcriptions REST API. | |
Requires environment variable OPENAI_API_KEY to be set. | |
- Accepts multiple inputs: files, directories, or glob patterns. | |
- Writes outputs next to inputs with .txt extension by default. | |
- If -o/--output is supplied: | |
* For multiple inputs: treated as an output directory. | |
* For a single input: treated as a file path if it has a suffix; otherwise as a directory. | |
- Skips existing outputs unless --overwrite is set. | |
- Converts from inefficient or unsupported formats (WAV/FLAC/OGG/OPUS/WEBM, raw AAC, containers with video, etc.) | |
to compact, API-friendly AAC/m4a. | |
- Configurable bitrate, sample rate, and channels for transcoding. | |
- Uses model 'gpt-4o-mini-transcribe' by default. | |
""" | |
# Section: standard library imports and typing | |
import argparse | |
import glob | |
import io | |
import json | |
import mimetypes | |
import os | |
import re | |
import socket | |
import sys | |
import time | |
import uuid | |
import shutil | |
import subprocess | |
import tempfile | |
from pathlib import Path | |
from typing import Any, Callable | |
import urllib.request | |
import urllib.error | |
# Section: argument parsing | |
def parse_args() -> argparse.Namespace: | |
""" | |
Parse command-line arguments. | |
""" | |
p = argparse.ArgumentParser( | |
description="Transcribe audio files to text using the OpenAI REST API." | |
) | |
p.add_argument( | |
"inputs", | |
nargs="+", | |
help="Audio files, directories, or glob patterns (e.g., 'in/*.mp3').", | |
) | |
p.add_argument( | |
"-o", | |
"--output", | |
type=Path, | |
help="Output directory for multiple inputs, or output file path if a single input is provided.", | |
) | |
p.add_argument( | |
"--overwrite", | |
action="store_true", | |
help="Overwrite existing outputs.", | |
) | |
p.add_argument( | |
"--model", | |
default="gpt-4o-mini-transcribe", | |
help="OpenAI model to use (default: gpt-4o-mini-transcribe). Consider 'whisper-1' for backwards compatibility.", | |
) | |
p.add_argument( | |
"--sleep", | |
type=float, | |
default=0.0, | |
help="Seconds to sleep between API calls (useful for rate limiting).", | |
) | |
p.add_argument( | |
"--extensions", | |
default="mp3,mp4,mpeg,mpga,m4a,wav,webm,ogg,opus,oga,flac,aac,aiff,aif,mka,mkv,caf,pcm", | |
help="Comma-separated list of audio file extensions to include.", | |
) | |
p.add_argument( | |
"--response-format", | |
choices=("text", "srt", "vtt", "json", "verbose_json"), | |
default="text", | |
help="Transcription response format (default: text).", | |
) | |
p.add_argument( | |
"--language", | |
default=None, | |
help="Optional language code (e.g., 'en'). If omitted, auto-detection may occur.", | |
) | |
p.add_argument( | |
"--prompt", | |
default=None, | |
help="Optional prompt to guide the transcription.", | |
) | |
p.add_argument( | |
"--temperature", | |
type=float, | |
default=0.0, | |
help="Sampling temperature for transcription (0.0-1.0).", | |
) | |
p.add_argument( | |
"--max-retries", | |
type=int, | |
default=3, | |
help="Maximum number of retries on transient errors (default: 3).", | |
) | |
p.add_argument( | |
"--retry-backoff", | |
type=float, | |
default=2.0, | |
help="Exponential backoff multiplier between retries (default: 2.0).", | |
) | |
p.add_argument( | |
"--timeout", | |
type=float, | |
default=300.0, | |
help="Client request timeout in seconds (default: 300).", | |
) | |
# Transcoding controls | |
p.add_argument( | |
"--transcode", | |
choices=("auto", "always", "never"), | |
default="auto", | |
help="Transcode inputs to compact AAC/m4a with ffmpeg. " | |
"'auto' converts from inefficient/unsupported formats (default).", | |
) | |
p.add_argument( | |
"--ffmpeg-binary", | |
default=os.getenv("FFMPEG_BIN", "ffmpeg"), | |
help="Path or name of ffmpeg binary (default: ffmpeg).", | |
) | |
p.add_argument( | |
"--ffprobe-binary", | |
default=os.getenv("FFPROBE_BIN", "ffprobe"), | |
help="Path or name of ffprobe binary (default: ffprobe).", | |
) | |
p.add_argument( | |
"--aac-bitrate", | |
default="48k", | |
help="Target AAC audio bitrate for transcoding (e.g., '32k','48k','64k'; default: 48k).", | |
) | |
p.add_argument( | |
"--sample-rate", | |
type=int, | |
default=16000, | |
help="Target sample rate in Hz for transcoding (default: 16000).", | |
) | |
p.add_argument( | |
"--channels", | |
type=int, | |
default=1, | |
help="Target number of audio channels for transcoding (1=mono, 2=stereo; default: 1).", | |
) | |
p.add_argument( | |
"--keep-transcoded", | |
action="store_true", | |
help="Keep transcoded .m4a files for reuse; otherwise use a temp file and delete.", | |
) | |
p.add_argument( | |
"--transcoded-dir", | |
type=Path, | |
default=None, | |
help="Directory to place transcoded files when --keep-transcoded is set. " | |
"If omitted, write next to inputs.", | |
) | |
p.add_argument( | |
"--segment-seconds", | |
type=int, | |
default=600, | |
help="Split audio longer than this many seconds into segments (0 disables splitting, default: 600 to satisfy limits of 2048 output tokens at faster speaker speed, input of 25 Megabyte size and 1500 seconds length).", | |
) | |
p.add_argument( | |
"--keep-segments", | |
action="store_true", | |
help="Keep generated audio segments on disk; otherwise store in a temporary folder and delete after use.", | |
) | |
p.add_argument( | |
"--segments-dir", | |
type=Path, | |
default=None, | |
help="Directory to place audio segments when --keep-segments is set. If omitted, write next to inputs.", | |
) | |
return p.parse_args() | |
# Section: input expansion and path handling | |
def parse_extensions(exts_str: str) -> set[str]:
    """
    Turn a comma-separated extension list into a set of lowercase suffixes.

    Each entry is stripped of surrounding whitespace, lowercased, and given
    exactly one leading dot; blank entries are skipped.
    """
    suffixes: set[str] = set()
    for raw in exts_str.split(","):
        token = raw.strip()
        if not token:
            continue
        suffixes.add("." + token.lower().lstrip("."))
    return suffixes
def expand_inputs(inputs: list[str], exts: set[str]) -> list[Path]:
    """
    Expand input tokens into concrete audio files.

    Supports:
    - Paths to files
    - Paths to directories (searched recursively)
    - Glob patterns (e.g., *.mp3)

    A token that names an existing path is always taken literally, even when it
    contains glob metacharacters (for example "talk [live].mp3"); only tokens
    that do not exist on disk are expanded as patterns.

    Args:
        inputs: Raw tokens from the command line.
        exts: Allowed lowercase suffixes including the leading dot.

    Returns:
        Resolved, de-duplicated file paths. Glob and directory expansions are
        sorted so the processing order is reproducible.
    """
    files: list[Path] = []
    seen: set[Path] = set()

    def add_file(p: Path) -> None:
        rp = p.resolve()
        if rp not in seen and rp.suffix.lower() in exts:
            files.append(rp)
            seen.add(rp)

    for token in inputs:
        literal = Path(token)
        if literal.exists() or not any(ch in token for ch in "*?[]"):
            # Existing paths win over pattern interpretation; non-pattern
            # tokens that do not exist fall through to the warning below.
            paths = [literal]
        else:
            paths = [Path(p) for p in sorted(glob.glob(token, recursive=True))]
            if not paths:
                print(f"Warning: pattern matched nothing: {token}", file=sys.stderr)
                continue
        for p in paths:
            if p.is_dir():
                for f in sorted(p.rglob("*")):
                    if f.is_file():
                        add_file(f)
            elif p.is_file():
                add_file(p)
            else:
                print(f"Warning: path not found: {p}", file=sys.stderr)
    return files
def output_suffix_for_format(fmt: str) -> str:
    """
    Map a response format name to the output file extension (with leading dot).

    Unknown formats fall back to ".txt".
    """
    suffix_by_format = {
        "text": ".txt",
        "json": ".json",
        "verbose_json": ".json",
        "srt": ".srt",
        "vtt": ".vtt",
    }
    return suffix_by_format.get(fmt, ".txt")
def resolve_output_path(in_path: Path, out_arg: Path | None, single: bool, out_suffix: str) -> Path: | |
""" | |
Compute output path for a given input path, based on -o/--output semantics. | |
""" | |
if out_arg is None: | |
return in_path.with_suffix(out_suffix) | |
if single: | |
if out_arg.exists() and out_arg.is_dir(): | |
out_arg.mkdir(parents=True, exist_ok=True) | |
return out_arg / f"{in_path.stem}{out_suffix}" | |
treat_as_dir = (out_arg.suffix == "") or str(out_arg).endswith(("/", "\\")) | |
if treat_as_dir: | |
out_dir = Path(str(out_arg).rstrip("/\\")) | |
out_dir.mkdir(parents=True, exist_ok=True) | |
return out_dir / f"{in_path.stem}{out_suffix}" | |
out_arg.parent.mkdir(parents=True, exist_ok=True) | |
return out_arg | |
if out_arg.exists() and out_arg.is_file(): | |
raise ValueError(f"--output points to a file but multiple inputs were provided: {out_arg}") | |
out_dir = out_arg | |
out_dir.mkdir(parents=True, exist_ok=True) | |
return out_dir / f"{in_path.stem}{out_suffix}" | |
# Section: HTTP helpers and error classification | |
class HTTPStatusError(Exception): | |
""" | |
Rich HTTP error with status, headers, and body snippet. | |
""" | |
def __init__(self, status: int, reason: str, body: bytes, headers: dict[str, str] | None = None, url: str | None = None): | |
self.status = status | |
self.reason = reason | |
self.body = body or b"" | |
# Ensure case-insensitive access by normalizing keys to lowercase | |
self.headers = {k.lower(): v for k, v in (headers or {}).items()} | |
self.url = url | |
snippet = self.body[:1000].decode("utf-8", "replace") | |
super().__init__(f"HTTP {status} {reason}: {snippet}") | |
def is_transient_error(e: Exception) -> bool:
    """
    Report whether an error looks temporary and is therefore worth retrying.

    HTTP errors are judged by status code (and, for HTTPStatusError, by the
    presence of a retry-after header); network-level exceptions always count;
    anything else is matched against a list of message fragments.
    """
    retryable_statuses = {408, 409, 425, 429, 500, 502, 503, 504}

    if isinstance(e, HTTPStatusError):
        return e.status in retryable_statuses or "retry-after" in e.headers

    # HTTPError must be tested before its base class URLError below.
    if isinstance(e, urllib.error.HTTPError):
        return getattr(e, "code", None) in retryable_statuses

    network_failures = (
        urllib.error.URLError,
        socket.timeout,
        TimeoutError,
        ConnectionError,
        ConnectionResetError,
    )
    if isinstance(e, network_failures):
        return True

    text = str(e).lower()
    for marker in (
        "timeout",
        "temporarily",
        "try again",
        "rate limit",
        "rate-limit",
        "connection reset",
        "connection aborted",
        "server error",
        "bad gateway",
        "service unavailable",
        "gateway timeout",
    ):
        if marker in text:
            return True
    return False
def get_retry_after_seconds(headers: dict[str, str]) -> float | None: | |
""" | |
Parse Retry-After header if present (numeric seconds only). | |
""" | |
for key in ("retry-after", "Retry-After"): | |
if key in headers: | |
val = headers[key].strip() | |
try: | |
return float(val) | |
except Exception: | |
return None | |
return None | |
def guess_mime_type(path: Path) -> str: | |
""" | |
Guess MIME type from file suffix; default to application/octet-stream. | |
""" | |
mtype, _ = mimetypes.guess_type(str(path)) | |
return mtype or "application/octet-stream" | |
def build_multipart_form(fields: dict[str, str | float | int | None], file_field_name: str, file_path: Path) -> tuple[str, bytes]: | |
""" | |
Build multipart/form-data body for one binary file + multiple text fields. | |
Returns (content_type_header_value, body_bytes). | |
Note: buffers entire body in memory. | |
""" | |
boundary = "----PythonMultipartBoundary" + uuid.uuid4().hex | |
crlf = b"\r\n" | |
buf = io.BytesIO() | |
def write_text_field(name: str, value: str): | |
buf.write(b"--" + boundary.encode("ascii") + crlf) | |
disp = f'Content-Disposition: form-data; name="{name}"'.encode("utf-8") | |
buf.write(disp + crlf) | |
buf.write(crlf) | |
buf.write(value.encode("utf-8")) | |
buf.write(crlf) | |
def write_file_field(name: str, filename: str, content_type: str, data: bytes): | |
buf.write(b"--" + boundary.encode("ascii") + crlf) | |
disp = f'Content-Disposition: form-data; name="{name}"; filename="{filename}"'.encode("utf-8") | |
buf.write(disp + crlf) | |
ctype = f"Content-Type: {content_type}".encode("utf-8") | |
buf.write(ctype + crlf) | |
buf.write(crlf) | |
buf.write(data) | |
buf.write(crlf) | |
# Write text fields | |
for k, v in fields.items(): | |
if v is None: | |
continue | |
write_text_field(k, str(v)) | |
# Write file field | |
file_bytes = file_path.read_bytes() | |
filename = file_path.name | |
ctype = guess_mime_type(file_path) | |
write_file_field(file_field_name, filename, ctype, file_bytes) | |
# Closing boundary | |
buf.write(b"--" + boundary.encode("ascii") + b"--" + crlf) | |
content_type = f"multipart/form-data; boundary={boundary}" | |
body = buf.getvalue() | |
return content_type, body | |
# Section: REST call and response formatting | |
def request_transcription( | |
in_path: Path, | |
*, | |
model: str, | |
response_format: str, | |
language: str | None, | |
prompt: str | None, | |
temperature: float, | |
timeout: float, | |
) -> tuple[bytes, dict[str, str]]: | |
""" | |
Perform a single REST call to the OpenAI Audio Transcriptions endpoint and return (body_bytes, response_headers). | |
""" | |
api_key = os.getenv("OPENAI_API_KEY") | |
if not api_key: | |
raise RuntimeError("OPENAI_API_KEY is not set in the environment.") | |
base_url = ( | |
os.getenv("OPENAI_BASE_URL") | |
or os.getenv("OPENAI_API_BASE") | |
or "https://api.openai.com/v1" | |
).rstrip("/") | |
url = f"{base_url}/audio/transcriptions" | |
fields: dict[str, str | float | int | None] = { | |
"model": model, | |
"response_format": response_format, | |
"temperature": temperature, | |
"language": language, | |
"prompt": prompt, | |
} | |
content_type, body = build_multipart_form(fields, "file", in_path) | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": content_type, | |
"Accept": "application/json" if response_format in {"json", "verbose_json"} else "*/*", | |
"User-Agent": "transcribe.py/1.1 (+https://platform.openai.com/) urllib", | |
} | |
org = os.getenv("OPENAI_ORGANIZATION") or os.getenv("OPENAI_ORG_ID") | |
if org: | |
headers["OpenAI-Organization"] = org | |
if os.getenv("OPENAI_ORG_ID"): | |
headers["OpenAI-Org-Id"] = os.getenv("OPENAI_ORG_ID") # type: ignore | |
project = os.getenv("OPENAI_PROJECT") | |
if project: | |
headers["OpenAI-Project"] = project | |
req = urllib.request.Request(url, data=body, method="POST", headers=headers) | |
try: | |
with urllib.request.urlopen(req, timeout=timeout) as resp: | |
resp_body = resp.read() | |
resp_headers = {k.lower(): v for k, v in resp.headers.items()} | |
return resp_body, resp_headers | |
except urllib.error.HTTPError as he: | |
try: | |
err_body = he.read() | |
except Exception: | |
err_body = b"" | |
hdrs = {} | |
try: | |
hdrs = {k.lower(): v for k, v in (he.headers or {}).items()} | |
except Exception: | |
hdrs = {} | |
raise HTTPStatusError(getattr(he, "code", 0) or 0, getattr(he, "reason", "") or "", err_body, hdrs, url=str(he.geturl() or url)) | |
except Exception: | |
raise | |
def format_transcription_response(body: bytes, response_format: str) -> str:
    """
    Decode the response bytes as UTF-8 and, for the JSON formats, re-serialize
    them pretty-printed (2-space indent, non-ASCII characters kept as-is).
    """
    decoded = body.decode("utf-8")
    if response_format not in {"json", "verbose_json"}:
        return decoded
    return json.dumps(json.loads(decoded), ensure_ascii=False, indent=2)
# Section: ffmpeg helpers | |
# File suffixes (lowercase, with leading dot); presumably the formats the
# transcription endpoint accepts without conversion - confirm against the API docs.
ACCEPTED_API_EXTS = {
    ".mp3",
    ".mp4",
    ".mpeg",
    ".mpga",
    ".m4a",
    ".wav",
    ".webm",
}
def _ensure_dir(p: Path) -> Path: | |
""" | |
Ensure directory exists and return it. | |
""" | |
p.mkdir(parents=True, exist_ok=True) | |
return p | |
def _resolve_executable(bin_arg: str) -> str: | |
""" | |
Resolve a binary name or absolute/relative path to an executable string. | |
""" | |
# Direct path given | |
if os.path.sep in bin_arg or (os.path.altsep and os.path.altsep in bin_arg): | |
if os.path.exists(bin_arg): | |
return bin_arg | |
raise FileNotFoundError(f"Executable not found: {bin_arg}") | |
# Search PATH | |
found = shutil.which(bin_arg) | |
if not found: | |
raise FileNotFoundError(f"Executable not found in PATH: {bin_arg}") | |
return found | |
def _run_subprocess(cmd: list[str]) -> subprocess.CompletedProcess: | |
""" | |
Run a subprocess, capturing output; raise with full stderr on failure. | |
""" | |
return subprocess.run( | |
cmd, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE, | |
check=True, | |
text=True, | |
) | |
def _parse_ffmpeg_time_to_seconds(s: str) -> float: | |
# Parse "HH:MM:SS.microseconds" into seconds. | |
try: | |
hh, mm, ss = s.split(":") | |
return int(hh) * 3600 + int(mm) * 60 + float(ss) | |
except Exception: | |
return 0.0 | |
def _format_hms(seconds: float) -> str: | |
# Format seconds into "HH:MM:SS". | |
try: | |
t = max(0.0, float(seconds)) | |
except Exception: | |
t = 0.0 | |
h = int(t // 3600) | |
m = int((t % 3600) // 60) | |
s = int(t % 60) | |
return f"{h:02d}:{m:02d}:{s:02d}" | |
def _extract_input_path_from_ffmpeg_cmd(cmd: list[str]) -> str | None: | |
# Extract the input path passed to "-i" in the ffmpeg command. | |
try: | |
i = cmd.index("-i") | |
if i + 1 < len(cmd): | |
return cmd[i + 1] | |
except ValueError: | |
pass | |
return None | |
def _probe_duration_seconds(input_path: str) -> float | None: | |
# Probe input duration using ffprobe; return None if unavailable. | |
# Tries $FFPROBE_BIN or "ffprobe" on PATH; never raises. | |
try: | |
ffprobe_bin = os.getenv("FFPROBE_BIN", "ffprobe") | |
try: | |
ffprobe_bin = _resolve_executable(ffprobe_bin) | |
except Exception: | |
# Fall back to plain name; may still work if in PATH. | |
ffprobe_bin = ffprobe_bin | |
cmd = [ | |
ffprobe_bin, | |
"-v", "error", | |
"-show_entries", "format=duration", | |
"-of", "default=noprint_wrappers=1:nokey=1", | |
input_path, | |
] | |
res = _run_subprocess(cmd) | |
raw = (res.stdout or "").strip() | |
if not raw or raw.upper() == "N/A": | |
return None | |
return float(raw) | |
except Exception: | |
return None | |
def _progress_seconds(key: str, value: str) -> float | None:
    """
    Convert one ffmpeg -progress time field to seconds.

    Returns None for a field that should not drive the display, and 0.0 for a
    value that cannot be parsed.
    """
    if key == "out_time":
        return _parse_ffmpeg_time_to_seconds(value)
    if key == "out_time_ms":
        # ffmpeg has reported microseconds under this key, so its unit cannot be
        # trusted; a size-based guess made the bar jump to 100% on every update.
        # The sibling out_time / out_time_us fields are unambiguous, so this
        # one is ignored.
        return None
    divisor = 1_000_000.0 if key == "out_time_us" else 1_000_000_000.0
    try:
        return float(value) / divisor
    except ValueError:
        return 0.0


def _run_ffmpeg_with_progress(cmd: list[str]) -> None:
    """
    Run an ffmpeg command while drawing a single-line progress display.

    The command's last element is taken to be the output path; "-progress
    pipe:1 -nostats" is inserted just before it. When the input duration can be
    probed, a bar with percentage is shown; otherwise only the elapsed time.

    Args:
        cmd: Complete ffmpeg argument list, including the program.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status; the message
            contains the exit code and everything ffmpeg wrote to stderr.
    """
    # Determine total duration before starting.
    input_path = _extract_input_path_from_ffmpeg_cmd(cmd)
    total_duration = _probe_duration_seconds(input_path) if input_path else None
    has_total = bool(total_duration and total_duration > 0)

    progress_cmd = cmd[:-1] + ["-progress", "pipe:1", "-nostats"] + cmd[-1:]
    time_keys = ("out_time", "out_time_ms", "out_time_us", "out_time_ns")
    bar_width = 30
    last_percent = -1.0
    last_drawn = ""
    speed_x: float | None = None

    def speed_suffix() -> str:
        return f" {speed_x:.1f}x" if (speed_x and speed_x > 0) else ""

    # stderr goes to a temporary file rather than a pipe: nothing drains a
    # stderr pipe while stdout is being read, so a chatty ffmpeg could fill it
    # and block forever.
    with tempfile.TemporaryFile(mode="w+", encoding="utf-8", errors="replace") as err_file:
        with subprocess.Popen(
            progress_cmd,
            stdout=subprocess.PIPE,
            stderr=err_file,
            text=True,
            bufsize=1,  # line-buffered for timely updates
        ) as p:
            try:
                for line in p.stdout:
                    line = line.strip()
                    if not line or "=" not in line:
                        continue
                    key, value = line.split("=", 1)

                    if key == "speed":
                        # Parse "70.1x" or "1.0x"; "N/A" leaves the speed unknown.
                        try:
                            speed_x = float(value.strip().removesuffix("x"))
                        except ValueError:
                            speed_x = None
                        continue

                    if key in time_keys:
                        elapsed = _progress_seconds(key, value)
                        if elapsed is None:
                            continue
                        if has_total:
                            ratio = max(0.0, min(elapsed / total_duration, 1.0))
                            percent = ratio * 100.0
                            # Avoid overly chatty redraws.
                            if int(percent) != int(last_percent) or abs(percent - last_percent) >= 0.25:
                                filled = int(round(ratio * bar_width))
                                bar = "#" * filled + "-" * (bar_width - filled)
                                msg = (
                                    f"\r[ffmpeg] [{bar}] {percent:5.1f}% "
                                    f"({_format_hms(elapsed)}/{_format_hms(total_duration)}){speed_suffix()}"
                                )
                                if msg != last_drawn:
                                    print(msg, end="", flush=True)
                                    last_drawn = msg
                                last_percent = percent
                        else:
                            # Unknown total; show elapsed only.
                            msg = f"\r[ffmpeg] elapsed {_format_hms(elapsed)}{speed_suffix()}"
                            if msg != last_drawn:
                                print(msg, end="", flush=True)
                                last_drawn = msg
                        continue

                    if key == "progress" and value == "end" and has_total:
                        # Force a full bar once ffmpeg reports completion.
                        bar = "#" * bar_width
                        total_str = _format_hms(total_duration)
                        print(
                            f"\r[ffmpeg] [{bar}] 100.0% ({total_str}/{total_str}){speed_suffix()}",
                            end="",
                            flush=True,
                        )
                print()  # newline after progress
                ret = p.wait()
            finally:
                # Do not leave ffmpeg running if reading its output failed.
                if p.poll() is None:
                    p.kill()
        if ret != 0:
            err_file.seek(0)
            raise RuntimeError(f"ffmpeg exited with {ret}: {err_file.read()}")
def ffprobe_audio_streams(path: Path, ffprobe_bin: str) -> dict[str, Any] | None: | |
""" | |
Probe audio streams via ffprobe; return dict with key info or None on failure. | |
""" | |
try: | |
cmd = [ | |
ffprobe_bin, | |
"-v", "error", | |
"-print_format", "json", | |
"-show_format", | |
"-show_streams", | |
"-select_streams", "a", | |
str(path), | |
] | |
res = _run_subprocess(cmd) | |
data = json.loads(res.stdout) | |
streams = data.get("streams") or [] | |
fmt = data.get("format") or {} | |
info: dict[str, Any] = {} | |
if streams: | |
s0 = streams[0] or {} | |
info["codec_name"] = s0.get("codec_name") | |
info["sample_rate"] = int(s0.get("sample_rate") or 0) if s0.get("sample_rate") else None | |
info["channels"] = int(s0.get("channels") or 0) if s0.get("channels") else None | |
info["bit_rate"] = int(s0.get("bit_rate") or 0) if s0.get("bit_rate") else None | |
info["format_name"] = fmt.get("format_name") | |
# Count video streams if present | |
v_count = 0 | |
for st in data.get("streams", []): | |
if st.get("codec_type") == "video": | |
v_count += 1 | |
info["video_streams"] = v_count | |
return info | |
except Exception: | |
return None | |
def should_transcode( | |
in_path: Path, | |
*, | |
mode: str, # "auto" | "always" | "never" | |
ffprobe_bin: str | None, | |
target_sr: int, | |
target_ch: int, | |
) -> tuple[bool, str]: | |
""" | |
Decide whether to transcode to m4a. | |
Returns (should_transcode, reason). | |
""" | |
if mode == "always": | |
return True, "mode=always" | |
if mode == "never": | |
return False, "mode=never" | |
ext = in_path.suffix.lower() | |
# Strong candidates to transcode | |
heavy_or_unlisted = { | |
".wav", ".flac", ".aiff", ".aif", ".ogg", ".opus", ".oga", ".mka", ".mkv", ".caf", ".pcm", ".aac" | |
} | |
if ext in heavy_or_unlisted: | |
return True, f"ext={ext}" | |
# For mp4/webm, drop video if present | |
probe = None | |
if ffprobe_bin: | |
probe = ffprobe_audio_streams(in_path, ffprobe_bin) | |
if ext in {".mp4", ".webm"} and probe and probe.get("video_streams", 0) > 0: | |
return True, "has_video" | |
# For m4a/mp4/mp3/webm/mpga/ mpeg: consider downmix/downsample if large | |
if probe: | |
ch = probe.get("channels") | |
sr = probe.get("sample_rate") | |
if ch and ch > target_ch: | |
return True, f"channels={ch}>target" | |
if sr and sr > target_sr * 2: # downsample from very high SRs | |
return True, f"sample_rate={sr}>>{target_sr}" | |
# For other accepted extensions, keep as is | |
return False, "already efficient/accepted" | |
def compute_transcoded_path( | |
in_path: Path, | |
keep: bool, | |
dest_dir: Path | None, | |
) -> Path: | |
""" | |
Compute output path for the transcoded .m4a file. | |
""" | |
if keep: | |
if dest_dir: | |
out_dir = dest_dir | |
else: | |
out_dir = in_path.parent | |
out_dir.mkdir(parents=True, exist_ok=True) | |
# Avoid clobbering a real .m4a source; use .transcoded.m4a suffix | |
base = f"{in_path.stem}.transcoded.m4a" | |
return out_dir / base | |
# Temp file otherwise | |
tmp_dir = Path(tempfile.mkdtemp(prefix="transcribe_ffmpeg_")) | |
return tmp_dir / (in_path.stem + ".m4a") | |
def transcode_to_m4a(
    in_path: Path,
    out_path: Path,
    *,
    ffmpeg_bin: str,
    aac_bitrate: str,
    sample_rate: int,
    channels: int,
) -> None:
    """
    Transcode input to AAC/m4a suitable for STT.

    Args:
        in_path: Source media file.
        out_path: Destination .m4a path; its parent directory is created.
        ffmpeg_bin: ffmpeg executable to run.
        aac_bitrate: Audio bitrate passed to ffmpeg's -b:a (e.g. "48k").
        sample_rate: Output sample rate in Hz.
        channels: Output channel count.

    Raises:
        RuntimeError: If ffmpeg fails. The message names the input file, and a
            partially written output file is removed first.
    """
    # Ensure parent exists
    out_path.parent.mkdir(parents=True, exist_ok=True)

    # Drop video (-vn), set channel count and sample rate, encode AAC at the
    # requested bitrate, and write the MP4 index up front (+faststart).
    cmd = [
        ffmpeg_bin,
        "-hide_banner",
        "-loglevel", "error",
        "-nostdin",
        "-y",
        "-i", str(in_path),
        "-vn",
        "-ac", str(channels),
        "-ar", str(sample_rate),
        "-c:a", "aac",
        "-b:a", aac_bitrate,
        "-movflags", "+faststart",
        str(out_path),
    ]

    def discard_partial_output() -> None:
        # A truncated file must not be mistaken for a finished transcode.
        try:
            out_path.unlink(missing_ok=True)
        except OSError:
            pass

    ext_name = in_path.suffix.lstrip(".").upper() or "AUDIO"
    print(
        f"[ffmpeg] Converting {in_path} ({ext_name}) -> {out_path.name} "
        f"[AAC {aac_bitrate}, {sample_rate} Hz, {channels} ch] ...",
        flush=True,
    )
    try:
        _run_ffmpeg_with_progress(cmd)
    except subprocess.CalledProcessError as cpe:
        discard_partial_output()
        msg = cpe.stderr.strip() if cpe.stderr else "ffmpeg failed without stderr."
        raise RuntimeError(f"ffmpeg transcoding failed for {in_path}: {msg}") from cpe
    except RuntimeError as err:
        # The progress runner reports a non-zero exit as RuntimeError, so this
        # is the handler that actually adds the file context.
        discard_partial_output()
        raise RuntimeError(f"ffmpeg transcoding failed for {in_path}: {err}") from err
    print(f"[ffmpeg] Conversion complete: {out_path}", flush=True)
def split_audio_if_needed( | |
in_path: Path, | |
*, | |
segment_seconds: int, | |
ffmpeg_binary: str, | |
keep_segments: bool, | |
segments_dir: Path | None, | |
) -> tuple[list[Path], Callable[[], None], str]: | |
""" | |
Split audio into fixed-length segments if duration exceeds segment_seconds. | |
Returns (segment_paths, cleanup_callback, note). | |
""" | |
if segment_seconds <= 0: | |
return [in_path], (lambda: None), "no-split (disabled)" | |
total_duration = _probe_duration_seconds(str(in_path)) | |
if total_duration is None or total_duration <= segment_seconds: | |
return [in_path], (lambda: None), "no-split (short)" | |
ffmpeg_bin = _resolve_executable(ffmpeg_binary) | |
# Choose destination directory for segments. | |
if keep_segments: | |
out_dir = segments_dir if segments_dir else in_path.parent | |
_ensure_dir(out_dir) | |
cleanup = (lambda: None) | |
else: | |
out_dir = Path(tempfile.mkdtemp(prefix="transcribe_segments_")) | |
def cleanup(): | |
try: | |
for f in out_dir.glob("*"): | |
f.unlink(missing_ok=True) | |
out_dir.rmdir() | |
except Exception: | |
pass | |
ext = in_path.suffix if in_path.suffix else ".m4a" | |
pattern = out_dir / f"{in_path.stem}.seg%04d{ext}" | |
# Build ffmpeg command to segment without re-encoding and reset timestamps. | |
cmd = [ | |
ffmpeg_bin, | |
"-hide_banner", | |
"-loglevel", "error", | |
"-nostdin", | |
"-y", | |
"-i", str(in_path), | |
"-vn", | |
"-c:a", "copy", | |
"-f", "segment", | |
"-segment_time", str(segment_seconds), | |
"-reset_timestamps", "1", | |
str(pattern), | |
] | |
print(f"[ffmpeg] Splitting {in_path.name} into ~{segment_seconds}s segments ...", flush=True) | |
_run_ffmpeg_with_progress(cmd) | |
# Collect produced segments in order. | |
glob_pat = str(pattern).replace("%04d", "*") | |
segs = sorted(Path(p) for p in glob.glob(glob_pat)) | |
if not segs: | |
raise RuntimeError(f"No segments produced by ffmpeg for {in_path}") | |
print(f"[ffmpeg] Produced {len(segs)} segments in {out_dir}", flush=True) | |
return segs, cleanup, f"split-into-{len(segs)}" | |
def _parse_srt_time(s: str) -> float: | |
""" | |
Parse SRT timestamp 'HH:MM:SS,mmm' to seconds. | |
""" | |
m = re.match(r"^\s*(\d{2}):(\d{2}):(\d{2}),(\d{3})\s*$", s) | |
if not m: | |
return 0.0 | |
hh, mm, ss, ms = map(int, m.groups()) | |
return hh * 3600 + mm * 60 + ss + ms / 1000.0 | |
def _format_srt_time(t: float) -> str: | |
""" | |
Format seconds to SRT timestamp 'HH:MM:SS,mmm'. | |
""" | |
t = max(0.0, float(t)) | |
hh = int(t // 3600) | |
mm = int((t % 3600) // 60) | |
ss = int(t % 60) | |
ms = int(round((t - int(t)) * 1000.0)) | |
if ms >= 1000: | |
ss += 1 | |
ms -= 1000 | |
if ss >= 60: | |
mm += 1 | |
ss -= 60 | |
if mm >= 60: | |
hh += 1 | |
mm -= 60 | |
return f"{hh:02d}:{mm:02d}:{ss:02d},{ms:03d}" | |
def _parse_vtt_time(s: str) -> float: | |
""" | |
Parse VTT timestamp 'HH:MM:SS.mmm' or 'MM:SS.mmm' to seconds. | |
""" | |
s = s.strip() | |
parts = s.split(":") | |
try: | |
if len(parts) == 3: | |
hh = int(parts[0]); mm = int(parts[1]); ss = float(parts[2]) | |
return hh * 3600 + mm * 60 + ss | |
if len(parts) == 2: | |
mm = int(parts[0]); ss = float(parts[1]) | |
return mm * 60 + ss | |
except Exception: | |
pass | |
return 0.0 | |
def _format_vtt_time(t: float) -> str: | |
""" | |
Format seconds to VTT timestamp 'HH:MM:SS.mmm'. | |
""" | |
t = max(0.0, float(t)) | |
hh = int(t // 3600) | |
mm = int((t % 3600) // 60) | |
ss = int(t % 60) | |
ms = int(round((t - int(t)) * 1000.0)) | |
if ms >= 1000: | |
ss += 1 | |
ms -= 1000 | |
if ss >= 60: | |
mm += 1 | |
ss -= 60 | |
if mm >= 60: | |
hh += 1 | |
mm -= 60 | |
return f"{hh:02d}:{mm:02d}:{ss:02d}.{ms:03d}" | |
def _probe_durations(paths: list[Path]) -> list[float]: | |
""" | |
Probe durations for a list of paths; if probing fails return 0.0. | |
""" | |
durs: list[float] = [] | |
for p in paths: | |
d = _probe_duration_seconds(str(p)) | |
durs.append(float(d) if d is not None else 0.0) | |
return durs | |
def _segment_start_offsets(segment_paths: list[Path], count: int) -> list[float]:
    """
    Return the start offset in seconds for each of `count` segments.

    Offsets are the running sum of the probed durations of `segment_paths`.
    A segment whose probed duration is not positive adds nothing to the
    running sum, so later offsets will be too small in that case.
    """
    offsets: list[float] = []
    acc = 0.0
    for dur in _probe_durations(segment_paths):
        offsets.append(acc)
        acc += dur if dur > 0 else 0.0
    # Fewer paths than transcripts: fall back to index * 600 seconds.
    offsets.extend(i * 600.0 for i in range(len(offsets), count))
    return offsets


def _merge_srt_segments(parts: list[str], offsets: list[float]) -> str:
    """
    Concatenate SRT transcripts, shifting cue times and renumbering cues.
    """
    out_lines: list[str] = []
    idx = 1
    for text, off in zip(parts, offsets):
        # Cues are separated by blank lines.
        for blk in re.split(r"\r?\n\r?\n", text.strip()):
            lines = [ln for ln in blk.splitlines() if ln.strip()]
            if not lines:
                continue
            # The timing line is the first or second line, depending on
            # whether the cue carries a numeric index.
            time_line_idx = next(
                (i for i, ln in enumerate(lines[:2]) if "-->" in ln), None
            )
            m = None
            if time_line_idx is not None:
                m = re.match(r"^\s*(.*?)\s*-->\s*(.*?)\s*$", lines[time_line_idx])
            out_lines.append(str(idx))
            idx += 1
            if m is None:
                # Cannot parse; keep the block as-is under a fresh index.
                out_lines.extend(lines)
                out_lines.append("")
                continue
            start_s = _parse_srt_time(m.group(1)) + off
            end_s = _parse_srt_time(m.group(2)) + off
            out_lines.append(f"{_format_srt_time(start_s)} --> {_format_srt_time(end_s)}")
            # Only a numeric line *before* the timing line is the old cue
            # index. Lines after it are caption text and are kept even when
            # they consist solely of digits (e.g. "2024").
            out_lines.extend(
                ln
                for i, ln in enumerate(lines)
                if i > time_line_idx
                or (i < time_line_idx and not ln.strip().isdigit())
            )
            out_lines.append("")
    return "\n".join(out_lines).rstrip() + "\n"


def _merge_vtt_segments(parts: list[str], offsets: list[float]) -> str:
    """
    Concatenate VTT transcripts under a single header, shifting cue times.
    """
    out: list[str] = ["WEBVTT", ""]
    for text, off in zip(parts, offsets):
        lines = text.splitlines()
        i = 0
        # Drop each segment's own header and leading blank lines; the merged
        # output already starts with exactly one header.
        while i < len(lines) and (
            not lines[i].strip() or lines[i].strip().upper().startswith("WEBVTT")
        ):
            i += 1
        while i < len(lines):
            ln = lines[i]
            i += 1
            m = None
            if "-->" in ln:
                m = re.match(r"^\s*(.*?)\s*-->\s*(.*?)(\s+.*)?$", ln)
            if m is None:
                # Pass through non-cue lines (e.g., NOTE) unchanged.
                out.append(ln)
                continue
            st = _parse_vtt_time(m.group(1)) + off
            et = _parse_vtt_time(m.group(2)) + off
            tail = m.group(3) or ""  # cue settings after the end time
            out.append(f"{_format_vtt_time(st)} --> {_format_vtt_time(et)}{tail}")
            # Copy payload lines until the blank line that ends the cue.
            while i < len(lines) and lines[i].strip():
                out.append(lines[i])
                i += 1
            out.append("")
            # Skip the blank separator line(s).
            while i < len(lines) and not lines[i].strip():
                i += 1
    return "\n".join(out).rstrip() + "\n"


def _merge_json_segments(parts: list[str], offsets: list[float], *, verbose: bool) -> str:
    """
    Merge JSON transcripts: join the 'text' fields and, when `verbose`,
    collect 'segments' entries with their start/end shifted by the offset.
    """
    texts: list[str] = []
    segments_agg: list[object] = []
    for seg_idx, raw in enumerate(parts):
        try:
            obj = json.loads(raw)
        except (TypeError, ValueError):
            # Best effort: a part that is not valid JSON is left out.
            continue
        if not isinstance(obj, dict):
            continue
        t = str(obj.get("text", "")).strip()
        if t:
            texts.append(t)
        if not (verbose and isinstance(obj.get("segments"), list)):
            continue
        off = offsets[seg_idx]
        for seg in obj["segments"]:
            try:
                seg2 = dict(seg)
                if "start" in seg2:
                    seg2["start"] = float(seg2["start"]) + off
                if "end" in seg2:
                    seg2["end"] = float(seg2["end"]) + off
            except (TypeError, ValueError):
                # Unexpected shape: keep the entry untouched.
                segments_agg.append(seg)
            else:
                segments_agg.append(seg2)
    out_obj: dict[str, object] = {"text": " ".join(texts)}
    if verbose:
        out_obj["segments"] = segments_agg
    return json.dumps(out_obj, ensure_ascii=False, indent=2)


def merge_segment_transcripts(
    parts: list[str],
    *,
    response_format: str,
    segment_paths: list[Path],
) -> str:
    """
    Merge segment-level transcripts into a single transcript.

    parts: transcript of each segment, in playback order.
    response_format: 'text', 'srt', 'vtt', 'json' or 'verbose_json'; any
        other value is merged like 'text'.
    segment_paths: the audio segment files; their probed durations give the
        time offset applied to each segment's cues.

    A single part is returned unchanged. Durations are only probed for the
    formats that carry timestamps (srt, vtt, verbose_json).
    """
    if len(parts) == 1:
        return parts[0]
    rf = response_format
    if rf == "srt":
        return _merge_srt_segments(parts, _segment_start_offsets(segment_paths, len(parts)))
    if rf == "vtt":
        return _merge_vtt_segments(parts, _segment_start_offsets(segment_paths, len(parts)))
    if rf == "verbose_json":
        offsets = _segment_start_offsets(segment_paths, len(parts))
        return _merge_json_segments(parts, offsets, verbose=True)
    if rf == "json":
        return _merge_json_segments(parts, [], verbose=False)
    # 'text' and unknown formats: non-empty parts separated by a blank line.
    return ("\n\n".join(p.strip() for p in parts if p.strip())).rstrip() + "\n"
def prepare_audio_for_upload(
    in_path: Path,
    *,
    transcode_mode: str,
    ffmpeg_binary: str,
    ffprobe_binary: str,
    aac_bitrate: str,
    sample_rate: int,
    channels: int,
    keep_transcoded: bool,
    transcoded_dir: Path | None,
) -> tuple[Path, Callable[[], None], str]:
    """
    Prepare audio for upload.

    Asks should_transcode whether `in_path` needs converting and, if so,
    transcodes it to the path chosen by compute_transcoded_path. With
    `keep_transcoded`, an existing output at least as new as the input is
    reused instead of transcoding again.

    Return (path_to_upload, cleanup_callback, note). The callback removes
    the transcoded file (and its parent directory if empty) unless
    `keep_transcoded` is set; it is a no-op when nothing was transcoded.

    Raises whatever _resolve_executable (for ffmpeg) or transcode_to_m4a
    raise. A failed transcode has its partial output removed first.
    """
    # Resolve ffprobe only if needed; it is optional, so a failure to find
    # it just means should_transcode gets no probe binary.
    probe_bin: str | None = None
    if transcode_mode != "never":
        try:
            probe_bin = _resolve_executable(ffprobe_binary)
        except Exception:
            probe_bin = None

    do_transcode, reason = should_transcode(
        in_path,
        mode=transcode_mode,
        ffprobe_bin=probe_bin,
        target_sr=sample_rate,
        target_ch=channels,
    )
    if not do_transcode:
        return in_path, (lambda: None), f"no-transcode ({reason})"

    # Require ffmpeg when transcoding.
    ffmpeg_bin = _resolve_executable(ffmpeg_binary)
    out_path = compute_transcoded_path(
        in_path,
        keep=keep_transcoded,
        dest_dir=transcoded_dir,
    )

    # Reuse if up-to-date.
    if keep_transcoded and out_path.exists():
        try:
            up_to_date = out_path.stat().st_mtime >= in_path.stat().st_mtime
        except OSError:
            up_to_date = False
        if up_to_date:
            print(
                f"[ffmpeg] Reusing existing transcoded file: {out_path}",
                flush=True,
            )
            return out_path, (lambda: None), f"reuse-transcoded ({reason})"

    def _remove_output(remove_parent: bool) -> None:
        # Best effort: deletion problems must never mask the real outcome.
        try:
            out_path.unlink(missing_ok=True)
        except OSError:
            return
        if remove_parent:
            try:
                # rmdir only succeeds on an empty directory.
                out_path.parent.rmdir()
            except OSError:
                pass

    def cleanup() -> None:
        # Kept transcodes are left in place for reuse on a later run.
        if not keep_transcoded:
            _remove_output(remove_parent=True)

    try:
        transcode_to_m4a(
            in_path,
            out_path,
            ffmpeg_bin=ffmpeg_bin,
            aac_bitrate=aac_bitrate,
            sample_rate=sample_rate,
            channels=channels,
        )
    except BaseException:
        # The caller never receives `cleanup` on failure, so tidy up here.
        # A partial file is removed even when keeping transcodes; otherwise
        # it could pass the mtime check above and be reused next time.
        _remove_output(remove_parent=not keep_transcoded)
        raise

    return out_path, cleanup, f"transcoded ({reason})"
# Section: retry wrapper | |
def transcribe_with_retries(
    upload_path: Path,
    *,
    model: str,
    response_format: str,
    language: str | None,
    prompt: str | None,
    temperature: float,
    max_retries: int,
    retry_backoff: float,
    timeout: float,
) -> str:
    """
    Request a transcription of `upload_path`, retrying transient failures.

    An error is retried only when is_transient_error accepts it and fewer
    than `max_retries` retries have been used; otherwise it is re-raised.
    The pause before a retry starts at one second, is multiplied by
    `retry_backoff` (never less than 1.0) after each retry, and is raised
    to the Retry-After value when an HTTPStatusError supplies a larger one.

    Returns the response body passed through format_transcription_response.
    """
    backoff_delay = 1.0
    attempt = 0
    while True:
        attempt += 1
        try:
            body, _headers = request_transcription(
                upload_path,
                model=model,
                response_format=response_format,
                language=language,
                prompt=prompt,
                temperature=temperature,
                timeout=timeout,
            )
            return format_transcription_response(body, response_format)
        except Exception as exc:
            transient = is_transient_error(exc)
            if not (transient and attempt <= max_retries):
                raise
            if isinstance(exc, HTTPStatusError):
                server_hint = get_retry_after_seconds(exc.headers)
            else:
                server_hint = None
            pause = max(backoff_delay, server_hint or 0.0)
            print(
                f"Transient error on {upload_path.name} (attempt {attempt}/{max_retries}): {exc}; retrying in {pause:.1f}s",
                file=sys.stderr,
            )
            time.sleep(pause)
            backoff_delay *= max(1.0, retry_backoff)
# Section: main entry | |
def main() -> int:
    """
    Main entry point.

    Parses the command line, expands the inputs, and transcribes each file
    in turn. A failure on one file is reported on stderr and processing
    continues with the next file.

    Returns the process exit status:
      0 - every file was transcribed or skipped because its output exists
      1 - no input files were found, or at least one file failed
      2 - OPENAI_API_KEY is not set
    """
    args = parse_args()
    exts = parse_extensions(args.extensions)
    if sys.version_info < (3, 10):
        print("Warning: Python 3.10+ is recommended for this script.", file=sys.stderr)
    if not os.getenv("OPENAI_API_KEY"):
        print("Error: OPENAI_API_KEY is not set in the environment.", file=sys.stderr)
        return 2
    if not (0.0 <= args.temperature <= 1.0):
        print(f"Warning: --temperature {args.temperature} is outside [0.0, 1.0]; clamping.", file=sys.stderr)
        args.temperature = max(0.0, min(1.0, args.temperature))
    if args.model == "whisper-1":
        print("Note: 'whisper-1' may be slower and costlier than newer STT models. Consider 'gpt-4o-mini-transcribe'.", file=sys.stderr)

    files = expand_inputs(args.inputs, exts)
    if not files:
        print("No input audio files found.", file=sys.stderr)
        return 1

    out_suffix = output_suffix_for_format(args.response_format)
    total = len(files)
    # Options shared by every transcription request of this run.
    transcribe_kwargs = {
        "model": args.model,
        "response_format": args.response_format,
        "language": args.language,
        "prompt": args.prompt,
        "temperature": args.temperature,
        "max_retries": args.max_retries,
        "retry_backoff": args.retry_backoff,
        "timeout": args.timeout,
    }

    def _noop() -> None:
        return None

    failures = 0
    for idx, in_path in enumerate(files, start=1):
        # Replaced once the corresponding preparation step has succeeded.
        cleanup_fn: Callable[[], None] = _noop
        seg_cleanup_fn: Callable[[], None] = _noop
        try:
            out_path = resolve_output_path(in_path, args.output, single=(total == 1), out_suffix=out_suffix)
            if out_path.exists() and not args.overwrite:
                print(f"Skipping (exists): {out_path}")
                continue

            # Prepare audio: transcode if needed.
            upload_path, cleanup_fn, transcode_note = prepare_audio_for_upload(
                in_path,
                transcode_mode=args.transcode,
                ffmpeg_binary=args.ffmpeg_binary,
                ffprobe_binary=args.ffprobe_binary,
                aac_bitrate=args.aac_bitrate,
                sample_rate=args.sample_rate,
                channels=args.channels,
                keep_transcoded=args.keep_transcoded,
                transcoded_dir=args.transcoded_dir,
            )
            # Split long audio if needed (after transcoding decision for stability).
            segments, seg_cleanup_fn, split_note = split_audio_if_needed(
                upload_path,
                segment_seconds=args.segment_seconds,
                ffmpeg_binary=args.ffmpeg_binary,
                keep_segments=args.keep_segments,
                segments_dir=args.segments_dir,
            )
            print(f"[{idx}/{total}] Transcribing ({transcode_note}; {split_note}): {in_path} -> {out_path}")

            if len(segments) == 1:
                transcript_str = transcribe_with_retries(segments[0], **transcribe_kwargs)
            else:
                # Transcribe each segment and merge.
                part_texts: list[str] = []
                for j, seg_path in enumerate(segments, start=1):
                    print(f" - Segment {j}/{len(segments)}: {seg_path.name}")
                    part_texts.append(transcribe_with_retries(seg_path, **transcribe_kwargs))
                    if args.sleep:
                        time.sleep(args.sleep)
                transcript_str = merge_segment_transcripts(
                    part_texts,
                    response_format=args.response_format,
                    segment_paths=segments,
                )

            # Write to a sibling temp file and rename, so an interrupted run
            # never leaves a truncated transcript at out_path.
            out_path.parent.mkdir(parents=True, exist_ok=True)
            tmp_path = out_path.with_suffix(out_path.suffix + ".tmp")
            to_write = transcript_str if args.response_format != "text" else transcript_str.rstrip() + "\n"
            try:
                tmp_path.write_text(to_write, encoding="utf-8")
                os.replace(tmp_path, out_path)
            except BaseException:
                # Do not leave a stale .tmp file next to the output.
                try:
                    tmp_path.unlink(missing_ok=True)
                except OSError:
                    pass
                raise
            print(f"Saved: {out_path}")
            if args.sleep:
                time.sleep(args.sleep)
        except FileNotFoundError as ex:
            failures += 1
            print(f"Error: {ex}", file=sys.stderr)
        except Exception as e:
            failures += 1
            print(f"Error transcribing {in_path}: {e}", file=sys.stderr)
        finally:
            # Cleanup is best effort; a problem here is reported but does not
            # count as a failed transcription.
            for fn in (cleanup_fn, seg_cleanup_fn):
                try:
                    fn()
                except Exception as cleanup_err:
                    print(f"Warning: cleanup failed for {in_path}: {cleanup_err}", file=sys.stderr)

    # Let callers (shells, CI) see that something went wrong.
    return 1 if failures else 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment