#!/usr/bin/env python3
"""
CLI transcription tool using OpenAI Audio Transcriptions REST API.
Requires environment variable OPENAI_API_KEY to be set.
- Accepts multiple inputs: files, directories, or glob patterns.
- Writes outputs next to inputs with .txt extension by default.
- If -o/--output is supplied:
* For multiple inputs: treated as an output directory.
* For a single input: treated as a file path if it has a suffix; otherwise as a directory.
- Skips existing outputs unless --overwrite is set.
- Converts from inefficient or unsupported formats (WAV/FLAC/OGG/OPUS/WEBM, raw AAC, containers with video, etc.)
to compact, API-friendly AAC/m4a.
- Configurable bitrate, sample rate, and channels for transcoding.
- Uses model 'gpt-4o-mini-transcribe' by default.
"""
# Section: standard library imports and typing
from __future__ import annotations  # allow PEP 604 unions in annotations on Python < 3.10
import argparse
import glob
import io
import json
import mimetypes
import os
import re
import socket
import sys
import time
import uuid
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Any, Callable
import urllib.request
import urllib.error
# Section: argument parsing
def parse_args() -> argparse.Namespace:
"""
Parse command-line arguments.
"""
p = argparse.ArgumentParser(
description="Transcribe audio files to text using the OpenAI REST API."
)
p.add_argument(
"inputs",
nargs="+",
help="Audio files, directories, or glob patterns (e.g., 'in/*.mp3').",
)
p.add_argument(
"-o",
"--output",
type=Path,
help="Output directory for multiple inputs, or output file path if a single input is provided.",
)
p.add_argument(
"--overwrite",
action="store_true",
help="Overwrite existing outputs.",
)
p.add_argument(
"--model",
default="gpt-4o-mini-transcribe",
help="OpenAI model to use (default: gpt-4o-mini-transcribe). Consider 'whisper-1' for backwards compatibility.",
)
p.add_argument(
"--sleep",
type=float,
default=0.0,
help="Seconds to sleep between API calls (useful for rate limiting).",
)
p.add_argument(
"--extensions",
default="mp3,mp4,mpeg,mpga,m4a,wav,webm,ogg,opus,oga,flac,aac,aiff,aif,mka,mkv,caf,pcm",
help="Comma-separated list of audio file extensions to include.",
)
p.add_argument(
"--response-format",
choices=("text", "srt", "vtt", "json", "verbose_json"),
default="text",
help="Transcription response format (default: text).",
)
p.add_argument(
"--language",
default=None,
help="Optional language code (e.g., 'en'). If omitted, auto-detection may occur.",
)
p.add_argument(
"--prompt",
default=None,
help="Optional prompt to guide the transcription.",
)
p.add_argument(
"--temperature",
type=float,
default=0.0,
help="Sampling temperature for transcription (0.0-1.0).",
)
p.add_argument(
"--max-retries",
type=int,
default=3,
help="Maximum number of retries on transient errors (default: 3).",
)
p.add_argument(
"--retry-backoff",
type=float,
default=2.0,
help="Exponential backoff multiplier between retries (default: 2.0).",
)
p.add_argument(
"--timeout",
type=float,
default=300.0,
help="Client request timeout in seconds (default: 300).",
)
# Transcoding controls
p.add_argument(
"--transcode",
choices=("auto", "always", "never"),
default="auto",
help="Transcode inputs to compact AAC/m4a with ffmpeg. "
"'auto' converts from inefficient/unsupported formats (default).",
)
p.add_argument(
"--ffmpeg-binary",
default=os.getenv("FFMPEG_BIN", "ffmpeg"),
help="Path or name of ffmpeg binary (default: ffmpeg).",
)
p.add_argument(
"--ffprobe-binary",
default=os.getenv("FFPROBE_BIN", "ffprobe"),
help="Path or name of ffprobe binary (default: ffprobe).",
)
p.add_argument(
"--aac-bitrate",
default="48k",
help="Target AAC audio bitrate for transcoding (e.g., '32k','48k','64k'; default: 48k).",
)
p.add_argument(
"--sample-rate",
type=int,
default=16000,
help="Target sample rate in Hz for transcoding (default: 16000).",
)
p.add_argument(
"--channels",
type=int,
default=1,
help="Target number of audio channels for transcoding (1=mono, 2=stereo; default: 1).",
)
p.add_argument(
"--keep-transcoded",
action="store_true",
help="Keep transcoded .m4a files for reuse; otherwise use a temp file and delete.",
)
p.add_argument(
"--transcoded-dir",
type=Path,
default=None,
help="Directory to place transcoded files when --keep-transcoded is set. "
"If omitted, write next to inputs.",
)
p.add_argument(
"--segment-seconds",
type=int,
default=600,
help="Split audio longer than this many seconds into segments (0 disables splitting, default: 600 to satisfy limits of 2048 output tokens at faster speaker speed, input of 25 Megabyte size and 1500 seconds length).",
)
p.add_argument(
"--keep-segments",
action="store_true",
help="Keep generated audio segments on disk; otherwise store in a temporary folder and delete after use.",
)
p.add_argument(
"--segments-dir",
type=Path,
default=None,
help="Directory to place audio segments when --keep-segments is set. If omitted, write next to inputs.",
)
return p.parse_args()
# Section: input expansion and path handling
def parse_extensions(exts_str: str) -> set[str]:
"""
Normalize a comma-separated list of extensions to a set of lowercase suffixes with leading dots.
"""
return {
"." + e.strip().lower().lstrip(".")
for e in exts_str.split(",")
if e.strip()
}
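# A quick illustration of the normalization above (hypothetical input string):
#   parse_extensions("mp3, .WAV ,flac") -> {".mp3", ".wav", ".flac"}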
def expand_inputs(inputs: list[str], exts: set[str]) -> list[Path]:
"""
Expand input tokens into concrete audio files.
Supports:
- Paths to files
- Paths to directories (searched recursively)
- Glob patterns (e.g., *.mp3)
"""
files: list[Path] = []
seen: set[Path] = set()
def add_file(p: Path):
rp = p.resolve()
if rp not in seen and rp.suffix.lower() in exts:
files.append(rp)
seen.add(rp)
for token in inputs:
paths = [Path(token)]
if any(ch in token for ch in "*?[]"):
paths = [Path(p) for p in glob.glob(token, recursive=True)]
if not paths:
print(f"Warning: pattern matched nothing: {token}", file=sys.stderr)
continue
for p in paths:
if p.is_dir():
for f in p.rglob("*"):
if f.is_file():
add_file(f)
elif p.is_file():
add_file(p)
else:
print(f"Warning: path not found: {p}", file=sys.stderr)
return files
def output_suffix_for_format(fmt: str) -> str:
"""
Return a file extension (including leading dot) appropriate for the given response format.
"""
if fmt == "text":
return ".txt"
if fmt in {"json", "verbose_json"}:
return ".json"
if fmt == "srt":
return ".srt"
if fmt == "vtt":
return ".vtt"
return ".txt"
def resolve_output_path(in_path: Path, out_arg: Path | None, single: bool, out_suffix: str) -> Path:
"""
Compute output path for a given input path, based on -o/--output semantics.
"""
if out_arg is None:
return in_path.with_suffix(out_suffix)
if single:
if out_arg.exists() and out_arg.is_dir():
out_arg.mkdir(parents=True, exist_ok=True)
return out_arg / f"{in_path.stem}{out_suffix}"
treat_as_dir = (out_arg.suffix == "") or str(out_arg).endswith(("/", "\\"))
if treat_as_dir:
out_dir = Path(str(out_arg).rstrip("/\\"))
out_dir.mkdir(parents=True, exist_ok=True)
return out_dir / f"{in_path.stem}{out_suffix}"
out_arg.parent.mkdir(parents=True, exist_ok=True)
return out_arg
if out_arg.exists() and out_arg.is_file():
raise ValueError(f"--output points to a file but multiple inputs were provided: {out_arg}")
out_dir = out_arg
out_dir.mkdir(parents=True, exist_ok=True)
return out_dir / f"{in_path.stem}{out_suffix}"
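# Worked examples of the -o semantics implemented above (illustrative paths):
#   single input talk.mp3, -o out.txt       -> out.txt                (suffix => file path)
#   single input talk.mp3, -o transcripts   -> transcripts/talk.txt   (no suffix => directory)
#   multiple inputs,       -o transcripts   -> transcripts/<stem>.txt for each input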
# Section: HTTP helpers and error classification
class HTTPStatusError(Exception):
"""
Rich HTTP error with status, headers, and body snippet.
"""
def __init__(self, status: int, reason: str, body: bytes, headers: dict[str, str] | None = None, url: str | None = None):
self.status = status
self.reason = reason
self.body = body or b""
# Ensure case-insensitive access by normalizing keys to lowercase
self.headers = {k.lower(): v for k, v in (headers or {}).items()}
self.url = url
snippet = self.body[:1000].decode("utf-8", "replace")
super().__init__(f"HTTP {status} {reason}: {snippet}")
def is_transient_error(e: Exception) -> bool:
"""
Determine whether an error is likely transient and worth retrying.
"""
if isinstance(e, HTTPStatusError):
if e.status in {408, 409, 425, 429, 500, 502, 503, 504}:
return True
if "retry-after" in e.headers:
return True
return False
if isinstance(e, urllib.error.HTTPError):
code = getattr(e, "code", None)
return code in {408, 409, 425, 429, 500, 502, 503, 504}
if isinstance(e, urllib.error.URLError):
return True
if isinstance(e, (socket.timeout, TimeoutError, ConnectionError, ConnectionResetError)):
return True
msg = str(e).lower()
transient_markers = (
"timeout",
"temporarily",
"try again",
"rate limit",
"rate-limit",
"connection reset",
"connection aborted",
"server error",
"bad gateway",
"service unavailable",
"gateway timeout",
)
return any(m in msg for m in transient_markers)
def get_retry_after_seconds(headers: dict[str, str]) -> float | None:
"""
Parse Retry-After header if present (numeric seconds only).
"""
for key in ("retry-after", "Retry-After"):
if key in headers:
val = headers[key].strip()
try:
return float(val)
except Exception:
return None
return None
def guess_mime_type(path: Path) -> str:
"""
Guess MIME type from file suffix; default to application/octet-stream.
"""
mtype, _ = mimetypes.guess_type(str(path))
return mtype or "application/octet-stream"
def build_multipart_form(fields: dict[str, str | float | int | None], file_field_name: str, file_path: Path) -> tuple[str, bytes]:
"""
Build multipart/form-data body for one binary file + multiple text fields.
Returns (content_type_header_value, body_bytes).
Note: buffers entire body in memory.
"""
boundary = "----PythonMultipartBoundary" + uuid.uuid4().hex
crlf = b"\r\n"
buf = io.BytesIO()
def write_text_field(name: str, value: str):
buf.write(b"--" + boundary.encode("ascii") + crlf)
disp = f'Content-Disposition: form-data; name="{name}"'.encode("utf-8")
buf.write(disp + crlf)
buf.write(crlf)
buf.write(value.encode("utf-8"))
buf.write(crlf)
def write_file_field(name: str, filename: str, content_type: str, data: bytes):
buf.write(b"--" + boundary.encode("ascii") + crlf)
disp = f'Content-Disposition: form-data; name="{name}"; filename="{filename}"'.encode("utf-8")
buf.write(disp + crlf)
ctype = f"Content-Type: {content_type}".encode("utf-8")
buf.write(ctype + crlf)
buf.write(crlf)
buf.write(data)
buf.write(crlf)
# Write text fields
for k, v in fields.items():
if v is None:
continue
write_text_field(k, str(v))
# Write file field
file_bytes = file_path.read_bytes()
filename = file_path.name
ctype = guess_mime_type(file_path)
write_file_field(file_field_name, filename, ctype, file_bytes)
# Closing boundary
buf.write(b"--" + boundary.encode("ascii") + b"--" + crlf)
content_type = f"multipart/form-data; boundary={boundary}"
body = buf.getvalue()
return content_type, body
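# The resulting body looks roughly like this (illustrative; boundary shortened, an .m4a upload assumed):
#   ------PythonMultipartBoundary<hex>
#   Content-Disposition: form-data; name="model"
#
#   gpt-4o-mini-transcribe
#   ------PythonMultipartBoundary<hex>
#   Content-Disposition: form-data; name="file"; filename="talk.m4a"
#   Content-Type: audio/mp4
#
#   <binary audio bytes>
#   ------PythonMultipartBoundary<hex>--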
# Section: REST call and response formatting
def request_transcription(
in_path: Path,
*,
model: str,
response_format: str,
language: str | None,
prompt: str | None,
temperature: float,
timeout: float,
) -> tuple[bytes, dict[str, str]]:
"""
Perform a single REST call to the OpenAI Audio Transcriptions endpoint and return (body_bytes, response_headers).
"""
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise RuntimeError("OPENAI_API_KEY is not set in the environment.")
base_url = (
os.getenv("OPENAI_BASE_URL")
or os.getenv("OPENAI_API_BASE")
or "https://api.openai.com/v1"
).rstrip("/")
url = f"{base_url}/audio/transcriptions"
fields: dict[str, str | float | int | None] = {
"model": model,
"response_format": response_format,
"temperature": temperature,
"language": language,
"prompt": prompt,
}
content_type, body = build_multipart_form(fields, "file", in_path)
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": content_type,
"Accept": "application/json" if response_format in {"json", "verbose_json"} else "*/*",
"User-Agent": "transcribe.py/1.1 (+https://platform.openai.com/) urllib",
}
org = os.getenv("OPENAI_ORGANIZATION") or os.getenv("OPENAI_ORG_ID")
if org:
headers["OpenAI-Organization"] = org
if os.getenv("OPENAI_ORG_ID"):
headers["OpenAI-Org-Id"] = os.getenv("OPENAI_ORG_ID") # type: ignore
project = os.getenv("OPENAI_PROJECT")
if project:
headers["OpenAI-Project"] = project
req = urllib.request.Request(url, data=body, method="POST", headers=headers)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
resp_body = resp.read()
resp_headers = {k.lower(): v for k, v in resp.headers.items()}
return resp_body, resp_headers
except urllib.error.HTTPError as he:
try:
err_body = he.read()
except Exception:
err_body = b""
hdrs = {}
try:
hdrs = {k.lower(): v for k, v in (he.headers or {}).items()}
except Exception:
hdrs = {}
raise HTTPStatusError(getattr(he, "code", 0) or 0, getattr(he, "reason", "") or "", err_body, hdrs, url=str(he.geturl() or url))
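# For reference, a roughly equivalent request with curl (illustrative, not used by this script):
#   curl https://api.openai.com/v1/audio/transcriptions \
#     -H "Authorization: Bearer $OPENAI_API_KEY" \
#     -F model=gpt-4o-mini-transcribe \
#     -F response_format=text \
#     -F file=@talk.m4a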
def format_transcription_response(body: bytes, response_format: str) -> str:
"""
Convert HTTP response bytes to a string according to response_format.
"""
if response_format in {"json", "verbose_json"}:
data = json.loads(body.decode("utf-8"))
return json.dumps(data, ensure_ascii=False, indent=2)
return body.decode("utf-8")
# Section: ffmpeg helpers
ACCEPTED_API_EXTS = {".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm"}
def _ensure_dir(p: Path) -> Path:
"""
Ensure directory exists and return it.
"""
p.mkdir(parents=True, exist_ok=True)
return p
def _resolve_executable(bin_arg: str) -> str:
"""
Resolve a binary name or absolute/relative path to an executable string.
"""
# Direct path given
if os.path.sep in bin_arg or (os.path.altsep and os.path.altsep in bin_arg):
if os.path.exists(bin_arg):
return bin_arg
raise FileNotFoundError(f"Executable not found: {bin_arg}")
# Search PATH
found = shutil.which(bin_arg)
if not found:
raise FileNotFoundError(f"Executable not found in PATH: {bin_arg}")
return found
def _run_subprocess(cmd: list[str]) -> subprocess.CompletedProcess:
"""
Run a subprocess, capturing output; raise with full stderr on failure.
"""
return subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
text=True,
)
def _parse_ffmpeg_time_to_seconds(s: str) -> float:
# Parse "HH:MM:SS.microseconds" into seconds.
try:
hh, mm, ss = s.split(":")
return int(hh) * 3600 + int(mm) * 60 + float(ss)
except Exception:
return 0.0
def _format_hms(seconds: float) -> str:
# Format seconds into "HH:MM:SS".
try:
t = max(0.0, float(seconds))
except Exception:
t = 0.0
h = int(t // 3600)
m = int((t % 3600) // 60)
s = int(t % 60)
return f"{h:02d}:{m:02d}:{s:02d}"
def _extract_input_path_from_ffmpeg_cmd(cmd: list[str]) -> str | None:
# Extract the input path passed to "-i" in the ffmpeg command.
try:
i = cmd.index("-i")
if i + 1 < len(cmd):
return cmd[i + 1]
except ValueError:
pass
return None
def _probe_duration_seconds(input_path: str) -> float | None:
# Probe input duration using ffprobe; return None if unavailable.
# Tries $FFPROBE_BIN or "ffprobe" on PATH; never raises.
try:
ffprobe_bin = os.getenv("FFPROBE_BIN", "ffprobe")
        try:
            ffprobe_bin = _resolve_executable(ffprobe_bin)
        except Exception:
            # Fall back to the plain name; it may still work if it is on PATH.
            pass
cmd = [
ffprobe_bin,
"-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
input_path,
]
res = _run_subprocess(cmd)
raw = (res.stdout or "").strip()
if not raw or raw.upper() == "N/A":
return None
return float(raw)
except Exception:
return None
def _run_ffmpeg_with_progress(cmd: list[str]) -> None:
# Render a single progress bar/percentage based on out_time vs total duration.
# Falls back to elapsed-only display if duration is unknown.
# Determine total duration before starting.
input_path = _extract_input_path_from_ffmpeg_cmd(cmd)
total_duration = _probe_duration_seconds(input_path) if input_path else None
# Add -progress to cmd before output path.
progress_cmd = cmd[:-1] + ["-progress", "pipe:1", "-nostats"] + cmd[-1:]
bar_width = 30
last_percent = -1.0
last_drawn = ""
speed_x: float | None = None
elapsed: float = 0.0
with subprocess.Popen(
progress_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1, # line-buffered for timely updates
) as p:
try:
for line in p.stdout:
line = line.strip()
if not line or "=" not in line:
continue
k, v = line.split("=", 1)
if k == "speed":
# Parse "70.1x" or "1.0x".
try:
vv = v.strip()
speed_x = float(vv[:-1]) if vv.endswith("x") else float(vv)
except Exception:
speed_x = None
continue
if k in ("out_time", "out_time_ms", "out_time_us", "out_time_ns"):
if k == "out_time":
elapsed = _parse_ffmpeg_time_to_seconds(v)
elif k == "out_time_ms":
# Note: ffmpeg historically used microseconds for *_ms; handle both safely.
try:
val = float(v)
# Heuristic: treat large values as microseconds.
elapsed = val / 1_000.0 if val < 1e10 else val / 1_000_000.0
except Exception:
elapsed = 0.0
elif k == "out_time_us":
try:
elapsed = float(v) / 1_000_000.0
except Exception:
elapsed = 0.0
elif k == "out_time_ns":
try:
elapsed = float(v) / 1_000_000_000.0
except Exception:
elapsed = 0.0
# Draw single-line progress.
if total_duration and total_duration > 0:
ratio = max(0.0, min(elapsed / total_duration, 1.0))
percent = ratio * 100.0
# Avoid overly chatty redraws.
if int(percent) != int(last_percent) or abs(percent - last_percent) >= 0.25:
filled = int(round(ratio * bar_width))
bar = "#" * filled + "-" * (bar_width - filled)
elapsed_str = _format_hms(elapsed)
total_str = _format_hms(total_duration)
sp = f" {speed_x:.1f}x" if (speed_x and speed_x > 0) else ""
msg = f"\r[ffmpeg] [{bar}] {percent:5.1f}% ({elapsed_str}/{total_str}){sp}"
if msg != last_drawn:
print(msg, end="", flush=True)
last_drawn = msg
last_percent = percent
else:
# Unknown total; show elapsed only.
elapsed_str = _format_hms(elapsed)
sp = f" {speed_x:.1f}x" if (speed_x and speed_x > 0) else ""
msg = f"\r[ffmpeg] elapsed {elapsed_str}{sp}"
if msg != last_drawn:
print(msg, end="", flush=True)
last_drawn = msg
continue
if k == "progress" and v == "end":
# Force 100% bar if total known.
if total_duration and total_duration > 0:
bar = "#" * bar_width
total_str = _format_hms(total_duration)
sp = f" {speed_x:.1f}x" if (speed_x and speed_x > 0) else ""
print(f"\r[ffmpeg] [{bar}] 100.0% ({total_str}/{total_str}){sp}", end="", flush=True)
print() # newline after progress
ret = p.wait()
if ret != 0:
err = p.stderr.read() if p.stderr else ""
raise RuntimeError(f"ffmpeg exited with {ret}: {err}")
finally:
if p and p.poll() is None:
p.kill()
def ffprobe_audio_streams(path: Path, ffprobe_bin: str) -> dict[str, Any] | None:
"""
Probe audio streams via ffprobe; return dict with key info or None on failure.
"""
try:
cmd = [
ffprobe_bin,
"-v", "error",
"-print_format", "json",
"-show_format",
"-show_streams",
"-select_streams", "a",
str(path),
]
res = _run_subprocess(cmd)
data = json.loads(res.stdout)
streams = data.get("streams") or []
fmt = data.get("format") or {}
info: dict[str, Any] = {}
        # Keep all streams so video tracks can be counted below; use the first audio stream for details.
        audio_streams = [s for s in streams if (s or {}).get("codec_type") == "audio"]
        if audio_streams:
            s0 = audio_streams[0]
info["codec_name"] = s0.get("codec_name")
info["sample_rate"] = int(s0.get("sample_rate") or 0) if s0.get("sample_rate") else None
info["channels"] = int(s0.get("channels") or 0) if s0.get("channels") else None
info["bit_rate"] = int(s0.get("bit_rate") or 0) if s0.get("bit_rate") else None
info["format_name"] = fmt.get("format_name")
# Count video streams if present
v_count = 0
for st in data.get("streams", []):
if st.get("codec_type") == "video":
v_count += 1
info["video_streams"] = v_count
return info
except Exception:
return None
def should_transcode(
in_path: Path,
*,
mode: str, # "auto" | "always" | "never"
ffprobe_bin: str | None,
target_sr: int,
target_ch: int,
) -> tuple[bool, str]:
"""
Decide whether to transcode to m4a.
Returns (should_transcode, reason).
"""
if mode == "always":
return True, "mode=always"
if mode == "never":
return False, "mode=never"
ext = in_path.suffix.lower()
# Strong candidates to transcode
heavy_or_unlisted = {
".wav", ".flac", ".aiff", ".aif", ".ogg", ".opus", ".oga", ".mka", ".mkv", ".caf", ".pcm", ".aac"
}
if ext in heavy_or_unlisted:
return True, f"ext={ext}"
# For mp4/webm, drop video if present
probe = None
if ffprobe_bin:
probe = ffprobe_audio_streams(in_path, ffprobe_bin)
if ext in {".mp4", ".webm"} and probe and probe.get("video_streams", 0) > 0:
return True, "has_video"
    # For m4a/mp4/mp3/webm/mpga/mpeg: consider downmixing/downsampling if channels or sample rate exceed targets
if probe:
ch = probe.get("channels")
sr = probe.get("sample_rate")
if ch and ch > target_ch:
return True, f"channels={ch}>target"
if sr and sr > target_sr * 2: # downsample from very high SRs
return True, f"sample_rate={sr}>>{target_sr}"
# For other accepted extensions, keep as is
return False, "already efficient/accepted"
def compute_transcoded_path(
in_path: Path,
keep: bool,
dest_dir: Path | None,
) -> Path:
"""
Compute output path for the transcoded .m4a file.
"""
if keep:
if dest_dir:
out_dir = dest_dir
else:
out_dir = in_path.parent
out_dir.mkdir(parents=True, exist_ok=True)
# Avoid clobbering a real .m4a source; use .transcoded.m4a suffix
base = f"{in_path.stem}.transcoded.m4a"
return out_dir / base
# Temp file otherwise
tmp_dir = Path(tempfile.mkdtemp(prefix="transcribe_ffmpeg_"))
return tmp_dir / (in_path.stem + ".m4a")
def transcode_to_m4a(
in_path: Path,
out_path: Path,
*,
ffmpeg_bin: str,
aac_bitrate: str,
sample_rate: int,
channels: int,
) -> None:
"""
Transcode input to AAC/m4a suitable for STT.
"""
# Ensure parent exists
out_path.parent.mkdir(parents=True, exist_ok=True)
# Build ffmpeg command:
# - Drop video, map default audio, mono, target SR, AAC-LC, specified bitrate, faststart for MP4 container.
cmd = [
ffmpeg_bin,
"-hide_banner",
"-loglevel", "error",
"-nostdin",
"-y",
"-i", str(in_path),
"-vn",
"-ac", str(channels),
"-ar", str(sample_rate),
"-c:a", "aac",
"-b:a", aac_bitrate,
"-movflags", "+faststart",
str(out_path),
]
try:
ext_name = in_path.suffix.lstrip(".").upper() or "AUDIO"
print(
f"[ffmpeg] Converting {in_path} ({ext_name}) -> {out_path.name} "
f"[AAC {aac_bitrate}, {sample_rate} Hz, {channels} ch] ...",
flush=True,
)
_run_ffmpeg_with_progress(cmd)
print(f"[ffmpeg] Conversion complete: {out_path}", flush=True)
except subprocess.CalledProcessError as cpe:
msg = cpe.stderr.strip() if cpe.stderr else "ffmpeg failed without stderr."
raise RuntimeError(f"ffmpeg transcoding failed for {in_path}: {msg}") from cpe
def split_audio_if_needed(
in_path: Path,
*,
segment_seconds: int,
ffmpeg_binary: str,
keep_segments: bool,
segments_dir: Path | None,
) -> tuple[list[Path], Callable[[], None], str]:
"""
Split audio into fixed-length segments if duration exceeds segment_seconds.
Returns (segment_paths, cleanup_callback, note).
"""
if segment_seconds <= 0:
return [in_path], (lambda: None), "no-split (disabled)"
total_duration = _probe_duration_seconds(str(in_path))
if total_duration is None or total_duration <= segment_seconds:
return [in_path], (lambda: None), "no-split (short)"
ffmpeg_bin = _resolve_executable(ffmpeg_binary)
# Choose destination directory for segments.
if keep_segments:
out_dir = segments_dir if segments_dir else in_path.parent
_ensure_dir(out_dir)
cleanup = (lambda: None)
else:
out_dir = Path(tempfile.mkdtemp(prefix="transcribe_segments_"))
def cleanup():
try:
for f in out_dir.glob("*"):
f.unlink(missing_ok=True)
out_dir.rmdir()
except Exception:
pass
ext = in_path.suffix if in_path.suffix else ".m4a"
pattern = out_dir / f"{in_path.stem}.seg%04d{ext}"
# Build ffmpeg command to segment without re-encoding and reset timestamps.
cmd = [
ffmpeg_bin,
"-hide_banner",
"-loglevel", "error",
"-nostdin",
"-y",
"-i", str(in_path),
"-vn",
"-c:a", "copy",
"-f", "segment",
"-segment_time", str(segment_seconds),
"-reset_timestamps", "1",
str(pattern),
]
print(f"[ffmpeg] Splitting {in_path.name} into ~{segment_seconds}s segments ...", flush=True)
_run_ffmpeg_with_progress(cmd)
# Collect produced segments in order.
glob_pat = str(pattern).replace("%04d", "*")
segs = sorted(Path(p) for p in glob.glob(glob_pat))
if not segs:
raise RuntimeError(f"No segments produced by ffmpeg for {in_path}")
print(f"[ffmpeg] Produced {len(segs)} segments in {out_dir}", flush=True)
return segs, cleanup, f"split-into-{len(segs)}"
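# The segmentation above is equivalent to (illustrative, with the 600 s default):
#   ffmpeg -hide_banner -loglevel error -nostdin -y -i input.m4a -vn -c:a copy \
#          -f segment -segment_time 600 -reset_timestamps 1 input.seg%04d.m4a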
def _parse_srt_time(s: str) -> float:
"""
Parse SRT timestamp 'HH:MM:SS,mmm' to seconds.
"""
m = re.match(r"^\s*(\d{2}):(\d{2}):(\d{2}),(\d{3})\s*$", s)
if not m:
return 0.0
hh, mm, ss, ms = map(int, m.groups())
return hh * 3600 + mm * 60 + ss + ms / 1000.0
def _format_srt_time(t: float) -> str:
"""
Format seconds to SRT timestamp 'HH:MM:SS,mmm'.
"""
t = max(0.0, float(t))
hh = int(t // 3600)
mm = int((t % 3600) // 60)
ss = int(t % 60)
ms = int(round((t - int(t)) * 1000.0))
if ms >= 1000:
ss += 1
ms -= 1000
if ss >= 60:
mm += 1
ss -= 60
if mm >= 60:
hh += 1
mm -= 60
return f"{hh:02d}:{mm:02d}:{ss:02d},{ms:03d}"
def _parse_vtt_time(s: str) -> float:
"""
Parse VTT timestamp 'HH:MM:SS.mmm' or 'MM:SS.mmm' to seconds.
"""
s = s.strip()
parts = s.split(":")
try:
if len(parts) == 3:
hh = int(parts[0]); mm = int(parts[1]); ss = float(parts[2])
return hh * 3600 + mm * 60 + ss
if len(parts) == 2:
mm = int(parts[0]); ss = float(parts[1])
return mm * 60 + ss
except Exception:
pass
return 0.0
def _format_vtt_time(t: float) -> str:
"""
Format seconds to VTT timestamp 'HH:MM:SS.mmm'.
"""
t = max(0.0, float(t))
hh = int(t // 3600)
mm = int((t % 3600) // 60)
ss = int(t % 60)
ms = int(round((t - int(t)) * 1000.0))
if ms >= 1000:
ss += 1
ms -= 1000
if ss >= 60:
mm += 1
ss -= 60
if mm >= 60:
hh += 1
mm -= 60
return f"{hh:02d}:{mm:02d}:{ss:02d}.{ms:03d}"
def _probe_durations(paths: list[Path]) -> list[float]:
"""
Probe durations for a list of paths; if probing fails return 0.0.
"""
durs: list[float] = []
for p in paths:
d = _probe_duration_seconds(str(p))
durs.append(float(d) if d is not None else 0.0)
return durs
def merge_segment_transcripts(
parts: list[str],
*,
response_format: str,
segment_paths: list[Path],
) -> str:
"""
Merge segment-level transcripts into a single transcript.
Handles text, srt, vtt. For json/verbose_json, concatenates text field when possible.
"""
rf = response_format
if len(parts) == 1:
return parts[0]
# Compute offsets by accumulating actual segment durations if available.
durs = _probe_durations(segment_paths)
offsets: list[float] = []
acc = 0.0
for d in durs:
offsets.append(acc)
acc += d if d > 0 else 0.0
if rf == "text":
blocks = [p.strip() for p in parts]
return ("\n\n".join(b for b in blocks if b)).rstrip() + "\n"
if rf == "srt":
out_lines: list[str] = []
idx = 1
for seg_idx, text in enumerate(parts):
off = offsets[seg_idx] if seg_idx < len(offsets) else seg_idx * 600.0
# Split into blocks separated by blank lines.
blocks = re.split(r"\r?\n\r?\n", text.strip(), flags=re.MULTILINE)
for blk in blocks:
lines = [ln for ln in blk.splitlines() if ln.strip() != ""]
if not lines:
continue
# Find the timing line. Often the pattern is:
# number
# HH:MM:SS,mmm --> HH:MM:SS,mmm
# text...
# Allow absence of numeric index.
time_line_idx = None
for i, ln in enumerate(lines[:2]): # usually within first two lines
if "-->" in ln:
time_line_idx = i
break
if time_line_idx is None and len(lines) >= 1 and "-->" in lines[0]:
time_line_idx = 0
if time_line_idx is None:
# Cannot parse; append as-is with a new index.
out_lines.append(str(idx))
out_lines.extend(lines)
out_lines.append("") # blank after cue
idx += 1
continue
time_line = lines[time_line_idx]
m = re.match(r"^\s*(.*?)\s*-->\s*(.*?)\s*$", time_line)
if not m:
out_lines.append(str(idx))
out_lines.extend(lines)
out_lines.append("")
idx += 1
continue
start_s = _parse_srt_time(m.group(1))
end_s = _parse_srt_time(m.group(2))
start_s += off
end_s += off
# Build output cue.
out_lines.append(str(idx))
out_lines.append(f"{_format_srt_time(start_s)} --> {_format_srt_time(end_s)}")
# Remaining content lines excluding index and original time line.
payload = [ln for i, ln in enumerate(lines) if i != time_line_idx and not ln.strip().isdigit()]
out_lines.extend(payload)
out_lines.append("")
idx += 1
return "\n".join(out_lines).rstrip() + "\n"
if rf == "vtt":
# Keep one header, shift times, concatenate cues.
out: list[str] = ["WEBVTT", ""]
first = True
for seg_idx, text in enumerate(parts):
off = offsets[seg_idx] if seg_idx < len(offsets) else seg_idx * 600.0
lines = text.splitlines()
i = 0
# Skip header lines in subsequent segments.
if first:
# Consume potential 'WEBVTT' header and metadata, but we already placed our own header.
if lines and lines[0].strip().upper().startswith("WEBVTT"):
i = 1
# Skip optional blank line after header.
if i < len(lines) and lines[i].strip() == "":
i += 1
else:
# Drop any header-like lines.
while i < len(lines) and (lines[i].strip() == "" or lines[i].strip().upper().startswith("WEBVTT")):
i += 1
first = False
# Process cues.
while i < len(lines):
ln = lines[i]
if "-->" in ln:
m = re.match(r"^\s*(.*?)\s*-->\s*(.*?)(\s+.*)?$", ln)
if m:
st = _parse_vtt_time(m.group(1)) + off
et = _parse_vtt_time(m.group(2)) + off
tail = m.group(3) or ""
out.append(f"{_format_vtt_time(st)} --> {_format_vtt_time(et)}{tail}")
i += 1
# Copy payload lines until blank line.
while i < len(lines) and lines[i].strip() != "":
out.append(lines[i])
i += 1
out.append("")
# Skip the blank line separator.
while i < len(lines) and lines[i].strip() == "":
i += 1
continue
# Pass through non-cue lines (e.g., NOTE) unchanged.
out.append(ln)
i += 1
return "\n".join(out).rstrip() + "\n"
if rf in {"json", "verbose_json"}:
# Best-effort merge: concatenate top-level 'text' if present.
texts: list[str] = []
segments_agg: list[dict[str, object]] = []
for seg_idx, s in enumerate(parts):
try:
obj = json.loads(s)
except Exception:
continue
t = str(obj.get("text", "")).strip() if isinstance(obj, dict) else ""
if t:
texts.append(t)
# For verbose_json, merge segments with offset applied if present.
if rf == "verbose_json" and isinstance(obj, dict) and isinstance(obj.get("segments"), list):
off = offsets[seg_idx] if seg_idx < len(offsets) else seg_idx * 600.0
for seg in obj["segments"]:
try:
seg2 = dict(seg)
if "start" in seg2:
seg2["start"] = float(seg2["start"]) + off
if "end" in seg2:
seg2["end"] = float(seg2["end"]) + off
segments_agg.append(seg2)
except Exception:
segments_agg.append(seg)
out_obj: dict[str, object] = {"text": (" ".join(texts).strip() if texts else "")}
if rf == "verbose_json":
out_obj["segments"] = segments_agg
return json.dumps(out_obj, ensure_ascii=False, indent=2)
# Fallback: concatenation.
return ("\n\n".join(p.strip() for p in parts if p.strip())).rstrip() + "\n"
def prepare_audio_for_upload(
in_path: Path,
*,
transcode_mode: str,
ffmpeg_binary: str,
ffprobe_binary: str,
aac_bitrate: str,
sample_rate: int,
channels: int,
keep_transcoded: bool,
transcoded_dir: Path | None,
) -> tuple[Path, Callable[[], None], str]:
"""
Prepare audio for upload.
Return (path_to_upload, cleanup_callback, note).
"""
# Resolve available binaries only if needed
probe_bin: str | None = None
if transcode_mode != "never":
try:
probe_bin = _resolve_executable(ffprobe_binary)
except Exception:
probe_bin = None # Optional in auto mode
do_transcode, reason = should_transcode(
in_path,
mode=transcode_mode,
ffprobe_bin=probe_bin,
target_sr=sample_rate,
target_ch=channels,
)
if not do_transcode:
return in_path, (lambda: None), f"no-transcode ({reason})"
# Require ffmpeg when transcoding
ffmpeg_bin = _resolve_executable(ffmpeg_binary)
out_path = compute_transcoded_path(
in_path,
keep=keep_transcoded,
dest_dir=transcoded_dir,
)
# Reuse if up-to-date
if keep_transcoded and out_path.exists():
try:
if out_path.stat().st_mtime >= in_path.stat().st_mtime:
print(
f"[ffmpeg] Reusing existing transcoded file: {out_path}",
flush=True,
)
return out_path, (lambda: None), f"reuse-transcoded ({reason})"
except Exception:
pass
transcode_to_m4a(
in_path,
out_path,
ffmpeg_bin=ffmpeg_bin,
aac_bitrate=aac_bitrate,
sample_rate=sample_rate,
channels=channels,
)
def cleanup():
# Remove temp folder if created under a temp dir; if kept, do nothing
if not keep_transcoded:
try:
# Remove file and parent temp dir
if out_path.exists():
out_path.unlink(missing_ok=True)
parent = out_path.parent
# Remove temp dir if empty
try:
parent.rmdir()
except Exception:
pass
except Exception:
pass
return out_path, cleanup, f"transcoded ({reason})"
# Section: retry wrapper
def transcribe_with_retries(
upload_path: Path,
*,
model: str,
response_format: str,
language: str | None,
prompt: str | None,
temperature: float,
max_retries: int,
retry_backoff: float,
timeout: float,
) -> str:
"""
Transcribe a single (possibly transcoded) file with retry/backoff on transient errors.
"""
attempt = 0
delay = 1.0
while True:
attempt += 1
try:
body, resp_headers = request_transcription(
upload_path,
model=model,
response_format=response_format,
language=language,
prompt=prompt,
temperature=temperature,
timeout=timeout,
)
return format_transcription_response(body, response_format)
except Exception as e:
is_transient = is_transient_error(e)
if attempt > max_retries or not is_transient:
raise
retry_after = None
if isinstance(e, HTTPStatusError):
retry_after = get_retry_after_seconds(e.headers)
wait = max(delay, retry_after or 0.0)
print(
f"Transient error on {upload_path.name} (attempt {attempt}/{max_retries}): {e}; retrying in {wait:.1f}s",
file=sys.stderr,
)
time.sleep(wait)
delay *= max(1.0, retry_backoff)
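# With the defaults (--max-retries 3, --retry-backoff 2.0), transient failures are retried
# after roughly 1 s, 2 s, and 4 s (or longer if the server sends a Retry-After header)
# before the error is re-raised.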
# Section: main entry
def main() -> int:
"""
Main entry point.
"""
args = parse_args()
exts = parse_extensions(args.extensions)
if sys.version_info < (3, 10):
print("Warning: Python 3.10+ is recommended for this script.", file=sys.stderr)
if not os.getenv("OPENAI_API_KEY"):
print("Error: OPENAI_API_KEY is not set in the environment.", file=sys.stderr)
return 2
if not (0.0 <= args.temperature <= 1.0):
print(f"Warning: --temperature {args.temperature} is outside [0.0, 1.0]; clamping.", file=sys.stderr)
args.temperature = max(0.0, min(1.0, args.temperature))
if args.model == "whisper-1":
print("Note: 'whisper-1' may be slower and costlier than newer STT models. Consider 'gpt-4o-mini-transcribe'.", file=sys.stderr)
files = expand_inputs(args.inputs, exts)
if not files:
print("No input audio files found.", file=sys.stderr)
return 1
out_suffix = output_suffix_for_format(args.response_format)
total = len(files)
for idx, in_path in enumerate(files, start=1):
cleanup_fn: Callable[[], None] = lambda: None
seg_cleanup_fn: Callable[[], None] = lambda: None
try:
out_path = resolve_output_path(in_path, args.output, single=(total == 1), out_suffix=out_suffix)
if out_path.exists() and not args.overwrite:
print(f"Skipping (exists): {out_path}")
continue
# Prepare audio: transcode if needed
upload_path, cleanup_fn, transcode_note = prepare_audio_for_upload(
in_path,
transcode_mode=args.transcode,
ffmpeg_binary=args.ffmpeg_binary,
ffprobe_binary=args.ffprobe_binary,
aac_bitrate=args.aac_bitrate,
sample_rate=args.sample_rate,
channels=args.channels,
keep_transcoded=args.keep_transcoded,
transcoded_dir=args.transcoded_dir,
)
# Split long audio if needed (after transcoding decision for stability).
segments, seg_cleanup_fn, split_note = split_audio_if_needed(
upload_path,
segment_seconds=args.segment_seconds,
ffmpeg_binary=args.ffmpeg_binary,
keep_segments=args.keep_segments,
segments_dir=args.segments_dir,
)
print(f"[{idx}/{total}] Transcribing ({transcode_note}; {split_note}): {in_path} -> {out_path}")
if len(segments) == 1:
transcript_str = transcribe_with_retries(
segments[0],
model=args.model,
response_format=args.response_format,
language=args.language,
prompt=args.prompt,
temperature=args.temperature,
max_retries=args.max_retries,
retry_backoff=args.retry_backoff,
timeout=args.timeout,
)
else:
# Transcribe each segment and merge.
part_texts: list[str] = []
for j, seg_path in enumerate(segments, start=1):
print(f" - Segment {j}/{len(segments)}: {seg_path.name}")
part = transcribe_with_retries(
seg_path,
model=args.model,
response_format=args.response_format,
language=args.language,
prompt=args.prompt,
temperature=args.temperature,
max_retries=args.max_retries,
retry_backoff=args.retry_backoff,
timeout=args.timeout,
)
part_texts.append(part)
if args.sleep:
time.sleep(args.sleep)
transcript_str = merge_segment_transcripts(
part_texts,
response_format=args.response_format,
segment_paths=segments,
)
out_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = out_path.with_suffix(out_path.suffix + ".tmp")
to_write = transcript_str if args.response_format != "text" else transcript_str.rstrip() + "\n"
tmp_path.write_text(to_write, encoding="utf-8")
os.replace(tmp_path, out_path)
print(f"Saved: {out_path}")
if args.sleep:
time.sleep(args.sleep)
except FileNotFoundError as ex:
print(f"Error: {ex}", file=sys.stderr)
except Exception as e:
print(f"Error transcribing {in_path}: {e}", file=sys.stderr)
finally:
try:
cleanup_fn()
except Exception:
pass
try:
seg_cleanup_fn()
except Exception:
pass
return 0
if __name__ == "__main__":
sys.exit(main())