Skip to content

Instantly share code, notes, and snippets.

@twobob
Last active May 8, 2026 13:41
Show Gist options
  • Select an option

  • Save twobob/e8069711d5357ccebc685ebca53c5435 to your computer and use it in GitHub Desktop.

Select an option

Save twobob/e8069711d5357ccebc685ebca53c5435 to your computer and use it in GitHub Desktop.
stt using parakeet and optionally canary or granite
# setup_complete_pytorch_stack_128_270_313.ps1
#
# Installs Miniconda for the current user, creates a Python 3.13 conda
# environment, and installs PyTorch 2.7.0 (CUDA 12.8 wheels) together with the
# packages used by the speech-to-text script.

# Name of the conda environment to create, and where Miniconda is installed.
$EnvName = "128_270_313"
$InstallDir = "$env:USERPROFILE\Miniconda3"
$CondaPath = "$InstallDir\Scripts\conda.exe"

Write-Host "1. Downloading Miniconda..."
Invoke-WebRequest -Uri "https://repo.anaconda.com/miniconda/Miniconda3-latest-Windows-x86_64.exe" -OutFile "miniconda_installer.exe"

# /S runs the installer silently; /D selects the target directory.
Write-Host "2. Installing Miniconda..."
Start-Process -FilePath ".\miniconda_installer.exe" -ArgumentList "/InstallationType=JustMe /RegisterPython=0 /S /D=$InstallDir" -Wait

Write-Host "3. Initializing shell profiles..."
& $CondaPath init powershell
& $CondaPath init cmd.exe
Remove-Item ".\miniconda_installer.exe"

Write-Host "4. Accepting Anaconda Terms of Service..."
& $CondaPath tos accept --override-channels --channel https://repo.anaconda.com/pkgs/main
& $CondaPath tos accept --override-channels --channel https://repo.anaconda.com/pkgs/r
& $CondaPath tos accept --override-channels --channel https://repo.anaconda.com/pkgs/msys2

Write-Host "5. Provisioning Python 3.13 environment ('$EnvName')..."
& $CondaPath create --name $EnvName python=3.13 -y

# PyTorch is installed with pip from the cu128 wheel index rather than from conda.
Write-Host "6. Installing PyTorch 2.7.0 and NVIDIA CUDA 12.8 toolkit via PIP wheel index... this may take a while, be patient"
& $CondaPath run -n $EnvName pip install torch==2.7.0 torchvision==0.22.0 torchaudio==2.7.0 --index-url https://download.pytorch.org/whl/cu128

Write-Host "7. Installing editdistance..."
& $CondaPath install --name $EnvName editdistance -y

Write-Host "8. installing extras, one sec"
& $CondaPath run -n $EnvName pip install nemo_toolkit["asr"] keyboard sounddevice soundfile pyperclip

Write-Host "Operation Complete. Restart your terminal and run 'conda activate $EnvName' to begin development."
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import queue
import subprocess
import sys
import tempfile
import threading
import time
import re
from functools import reduce
from pathlib import Path
from typing import Any
# Model identifiers for each backend; every value can be overridden through
# the environment variable named in the lookup.
DEFAULT_PARAKEET_MODEL = os.getenv("PARAKEET_MODEL", "nvidia/parakeet-tdt-0.6b-v3")
DEFAULT_CANARY_MODEL = os.getenv("CANARY_MODEL", "nvidia/canary-qwen-2.5b")
DEFAULT_GRANITE_MODEL = os.getenv("GRANITE_MODEL", "ibm-granite/granite-speech-4.1-2b")

# Prefix directory of the Python environment whose interpreter is used when
# transcription is delegated to a separate process.
DEFAULT_MODEL_ENV_PREFIX = Path(
    os.getenv("ANTIGRAVITY_MODEL_ENV_PREFIX", r"C:\Users\new\Miniconda3\envs\128_270_313")
).expanduser()
class HelpFormatter(argparse.ArgumentDefaultsHelpFormatter, argparse.RawDescriptionHelpFormatter):
    """Help formatter combining argument-default display with raw description/epilog text."""
# Inference device and dtype selectors ("auto" defers the choice to ModelManager).
DEFAULT_DEVICE = os.getenv("TRANSCRIBE_DEVICE", "auto").strip().lower()
DEFAULT_DTYPE = os.getenv("TRANSCRIBE_DTYPE", "auto").strip().lower()

# Microphone capture and speech-detection defaults, each overridable by the
# environment variable named in the lookup.
DEFAULT_SAMPLE_RATE = int(os.getenv("MIC_SAMPLE_RATE", "16000"))
DEFAULT_START_THRESHOLD = float(os.getenv("MIC_START_THRESHOLD", "0.015"))
DEFAULT_STOP_THRESHOLD = float(os.getenv("MIC_STOP_THRESHOLD", "0.010"))
DEFAULT_MIN_SPEECH_SECONDS = float(os.getenv("MIC_MIN_SPEECH_SECONDS", "0.18"))
DEFAULT_SILENCE_SECONDS = float(os.getenv("MIC_SILENCE_SECONDS", "0.85"))
DEFAULT_PREROLL_SECONDS = float(os.getenv("MIC_PREROLL_SECONDS", "0.35"))
DEFAULT_MAX_RECORD_SECONDS = float(os.getenv("MIC_MAX_RECORD_SECONDS", "60.0"))
DEFAULT_BLOCK_MS = int(os.getenv("MIC_BLOCK_MS", "30"))

# Location of the JSON settings file: under AppData\Local on Windows,
# under ~/.config elsewhere, unless ANTIGRAVITY_STT_CONFIG is set.
DEFAULT_CONFIG_PATH = Path(
    os.getenv(
        "ANTIGRAVITY_STT_CONFIG",
        str(
            Path.home() / "AppData" / "Local" / "Antigravity" / "stt_config.json"
            if os.name == "nt"
            else Path.home() / ".config" / "antigravity" / "stt_config.json"
        ),
    )
).expanduser()
class StatusReporter:
    """Single-line status display written to stderr.

    Each message is prefixed with ``[STT]`` and drawn after a carriage return
    so it overwrites the previous one; shorter messages are padded with spaces
    to erase leftovers.  Every method is a no-op while ``enabled`` is false.
    """

    def __init__(self, enabled: bool = True) -> None:
        self.enabled = enabled
        self._lock = threading.RLock()
        # Length of the line most recently drawn; used for padding and erasing.
        self._last_len = 0

    def set_enabled(self, enabled: bool) -> None:
        """Turn status output on or off."""
        with self._lock:
            self.enabled = enabled

    def show(self, message: str) -> None:
        """Draw *message* over the current status line without a newline."""
        with self._lock:
            if not self.enabled:
                return
            line = f"[STT] {message}"
            # Pad to the previous length so a shorter message fully covers it.
            padded = line.ljust(self._last_len)
            print(f"\r{padded}", file=sys.stderr, end="", flush=True)
            self._last_len = len(line)

    def clear(self) -> None:
        """Blank out the current status line and return the cursor to column 0."""
        with self._lock:
            if not self.enabled:
                return
            if self._last_len > 0:
                print("\r" + (" " * (self._last_len + 6)) + "\r", file=sys.stderr, end="", flush=True)
            self._last_len = 0

    def done(self, message: str) -> None:
        """Draw *message* and terminate the status line with a newline."""
        with self._lock:
            if not self.enabled:
                return
            self.show(message)
            print(file=sys.stderr, flush=True)
            self._last_len = 0
# Module-wide status reporter shared by the functions in this file.
STATUS = StatusReporter(enabled=True)
class ConfigManager:
    """Reads and writes the JSON settings file at ``path``.

    File access is serialised with a re-entrant lock, and writes go through a
    temporary file that is then moved over the target.
    """

    def __init__(self, path: Path) -> None:
        self.path = path
        self._lock = threading.RLock()

    def load(self) -> dict[str, Any]:
        """Return the stored settings.

        An empty dict is returned when the file is missing, unreadable, not
        valid JSON, or does not contain a JSON object at the top level.
        """
        with self._lock:
            try:
                data = json.loads(self.path.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                # ValueError covers both JSON and text-decoding failures.
                return {}
            # Callers use dict methods on the result, so reject other JSON types.
            return data if isinstance(data, dict) else {}

    def save(self, data: dict[str, Any]) -> None:
        """Write *data* as JSON, creating the parent directory if needed."""
        with self._lock:
            self.path.parent.mkdir(parents=True, exist_ok=True)
            tmp = self.path.with_suffix(self.path.suffix + ".tmp")
            tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
            # Move into place so a partially written file never replaces the config.
            tmp.replace(self.path)

    def get_preferred_microphone(self) -> dict[str, Any] | None:
        """Return the saved ``preferred_microphone`` entry, or ``None`` if absent or not a dict."""
        item = self.load().get("preferred_microphone")
        return item if isinstance(item, dict) else None

    def set_preferred_microphone(self, microphone: dict[str, Any] | None) -> None:
        """Store *microphone* as the preferred device; ``None`` removes the entry."""
        # Hold the lock across the read-modify-write so updates are not interleaved.
        with self._lock:
            data = self.load()
            if microphone is None:
                data.pop("preferred_microphone", None)
            else:
                data["preferred_microphone"] = microphone
            self.save(data)
# Module-wide configuration store backed by DEFAULT_CONFIG_PATH.
CONFIG = ConfigManager(DEFAULT_CONFIG_PATH)
class ModelManager:
    """Loads the speech models on first use and caches them.

    One model per backend (Parakeet, Canary, Granite) is kept; asking for a
    different model name for a backend loads and caches that one instead.
    Loading is serialised with a re-entrant lock.
    """

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._parakeet = None
        self._parakeet_name: str | None = None
        self._canary = None
        self._canary_name: str | None = None
        self._granite = None
        self._granite_processor = None
        self._granite_name: str | None = None

    def _torch(self):
        """Import and return ``torch`` (deferred so the module loads without it)."""
        import torch

        return torch

    def resolve_device(self) -> str:
        """Return the device name: DEFAULT_DEVICE, or cuda/cpu when it is "auto"."""
        torch = self._torch()
        if DEFAULT_DEVICE == "auto":
            return "cuda" if torch.cuda.is_available() else "cpu"
        return DEFAULT_DEVICE

    def resolve_dtype(self):
        """Return the torch dtype selected by DEFAULT_DTYPE.

        "auto" picks float16 on cuda and float32 otherwise.

        Raises:
            ValueError: if DEFAULT_DTYPE is not a recognised name.
        """
        torch = self._torch()
        if DEFAULT_DTYPE == "auto":
            return torch.float16 if self.resolve_device() == "cuda" else torch.float32
        dtypes = {
            "float16": torch.float16,
            "float32": torch.float32,
            "bfloat16": torch.bfloat16,
        }
        try:
            return dtypes[DEFAULT_DTYPE]
        except KeyError:
            raise ValueError(
                f"Unsupported TRANSCRIBE_DTYPE {DEFAULT_DTYPE!r}; "
                "expected 'auto', 'float16', 'float32' or 'bfloat16'."
            ) from None

    def get_parakeet(self, model_name: str):
        """Return the cached Parakeet model, loading *model_name* if necessary."""
        with self._lock:
            if self._parakeet is not None and self._parakeet_name == model_name:
                return self._parakeet
            STATUS.show(f"DOWNLOADING / LOADING PARAKEET: {model_name}")
            import nemo.collections.asr as nemo_asr

            model = nemo_asr.models.ASRModel.from_pretrained(model_name=model_name)
            if self.resolve_device() == "cuda":
                model = model.cuda()
            model.eval()
            self._parakeet = model
            self._parakeet_name = model_name
            STATUS.show(f"PARAKEET READY: {model_name}")
            return model

    def get_canary(self, model_name: str):
        """Return the cached Canary (SALM) model, loading *model_name* if necessary.

        Moving the model to the target device/dtype is best-effort: a failure
        there is reported on the status line and the model is used as loaded.
        """
        with self._lock:
            if self._canary is not None and self._canary_name == model_name:
                return self._canary
            STATUS.show(f"DOWNLOADING / LOADING CANARY: {model_name}")
            from nemo.collections.speechlm2.models import SALM

            model = SALM.from_pretrained(model_name)
            try:
                if self.resolve_device() == "cuda":
                    model = model.cuda()
                if hasattr(model, "to"):
                    model = model.to(dtype=self.resolve_dtype())
                model.eval()
            except Exception as exc:
                # Keep going with the model as loaded, but do not hide the problem.
                STATUS.done(f"CANARY: could not apply device/dtype settings ({exc}); using model as loaded")
            self._canary = model
            self._canary_name = model_name
            STATUS.show(f"CANARY READY: {model_name}")
            return model

    def get_granite(self, model_name: str):
        """Return ``(processor, model)`` for Granite, loading *model_name* if necessary."""
        with self._lock:
            if (
                self._granite is not None
                and self._granite_processor is not None
                and self._granite_name == model_name
            ):
                return self._granite_processor, self._granite
            STATUS.show(f"DOWNLOADING / LOADING GRANITE: {model_name}")
            from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor

            model = AutoModelForSpeechSeq2Seq.from_pretrained(
                model_name,
                torch_dtype=self.resolve_dtype(),
            )
            if hasattr(model, "to"):
                model = model.to(self.resolve_device())
            if hasattr(model, "eval"):
                model.eval()
            processor = AutoProcessor.from_pretrained(model_name)
            self._granite = model
            self._granite_processor = processor
            self._granite_name = model_name
            STATUS.show(f"GRANITE READY: {model_name}")
            return processor, model
# Module-wide ModelManager instance; loaded models are cached on it.
MODELS = ModelManager()
def get_model_env_python(env_prefix: Path | None = None) -> Path | None:
    """Return the Python interpreter inside the model environment, if present.

    Args:
        env_prefix: Environment root to inspect; DEFAULT_MODEL_ENV_PREFIX when omitted.

    Returns:
        ``<prefix>/python.exe`` on Windows or ``<prefix>/bin/python`` elsewhere,
        or ``None`` when that path does not exist.
    """
    prefix = (env_prefix or DEFAULT_MODEL_ENV_PREFIX).expanduser()
    relative = "python.exe" if os.name == "nt" else "bin/python"
    candidate = prefix / relative
    return candidate if candidate.exists() else None
def is_running_in_model_env(env_prefix: Path | None = None) -> bool:
    """Report whether the current interpreter lives under the model environment.

    Args:
        env_prefix: Environment root to compare against; DEFAULT_MODEL_ENV_PREFIX
            when omitted.

    Returns:
        ``True`` when the resolved ``sys.executable`` is inside the resolved
        prefix; ``False`` otherwise, including when the paths cannot be
        resolved or compared.
    """
    prefix = (env_prefix or DEFAULT_MODEL_ENV_PREFIX).expanduser()
    try:
        executable = Path(sys.executable).resolve()
        resolved_prefix = prefix.resolve()
        common = os.path.commonpath([str(executable), str(resolved_prefix)])
    except (OSError, RuntimeError, ValueError):
        # commonpath raises ValueError for paths it cannot compare
        # (for example paths on different drives).
        return False
    return common == str(resolved_prefix)
def should_delegate_model_inference(mode: str) -> bool:
    """Decide whether transcription should run in the separate model environment.

    Delegation happens only when all of the following hold: this process is
    not already the delegated child (``ANTIGRAVITY_MODEL_ENV_ACTIVE`` is not
    "1"), *mode* is one of the known modes, the model environment's
    interpreter exists, and the current interpreter is not inside that
    environment.
    """
    if os.environ.get("ANTIGRAVITY_MODEL_ENV_ACTIVE") == "1":
        return False
    # Check the mode first: it needs no filesystem access.
    if mode not in {"fast", "granite", "high_quality"}:
        return False
    return get_model_env_python() is not None and not is_running_in_model_env()
def run_transcription_in_model_env(audio_path: Path, mode: str) -> str:
    """Transcribe *audio_path* by running this script under the model environment.

    The child is started with the ``internal-transcribe`` command and
    ``ANTIGRAVITY_MODEL_ENV_ACTIVE=1`` so it does not delegate again.  The last
    non-empty line of its stdout must be a JSON object with a string ``text``.

    Returns:
        The transcribed text, stripped of surrounding whitespace.

    Raises:
        RuntimeError: if the interpreter is missing, the child exits non-zero,
            or its output is empty, not valid JSON, or lacks a string ``text``.
    """
    model_python = get_model_env_python()
    if model_python is None:
        raise RuntimeError(f"Model environment Python not found under: {DEFAULT_MODEL_ENV_PREFIX}")

    env = os.environ.copy()
    env["ANTIGRAVITY_MODEL_ENV_ACTIVE"] = "1"
    env["PYTHONIOENCODING"] = "utf-8"
    command = [
        str(model_python),
        str(Path(__file__).resolve()),
        "internal-transcribe",
        "--audio-path",
        str(audio_path),
        "--mode",
        mode,
    ]
    proc = subprocess.run(
        command,
        capture_output=True,
        text=True,
        encoding="utf-8",
        env=env,
    )
    if proc.returncode != 0:
        stderr = (proc.stderr or "").strip()
        stdout = (proc.stdout or "").strip()
        detail = stderr or stdout or f"exit code {proc.returncode}"
        raise RuntimeError(f"Model env transcription failed: {detail}")

    # Only the final non-empty stdout line is parsed; earlier lines are ignored.
    lines = [line.strip() for line in (proc.stdout or "").splitlines() if line.strip()]
    if not lines:
        raise RuntimeError("Model env transcription produced no output.")
    try:
        payload = json.loads(lines[-1])
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"Model env transcription returned invalid JSON: {exc}") from exc

    # A JSON array/number/string has no "text" key; treat it like a missing one.
    text = payload.get("text") if isinstance(payload, dict) else None
    if not isinstance(text, str):
        raise RuntimeError("Model env transcription response did not include text.")
    return text.strip()
def query_microphones() -> dict[str, Any]:
    """List the audio devices that have at least one input channel.

    Returns:
        A dict with ``devices`` (one record per input-capable device holding
        ``index``, ``name``, ``max_input_channels``, ``default_samplerate`` and
        ``is_default_input``) and ``default_input_index`` (the value reported
        by sounddevice, or ``None`` if it could not be read).
    """
    import sounddevice as sd

    devices = sd.query_devices()
    try:
        default_input_index = sd.default.device[0]
    except Exception:
        default_input_index = None

    result = []
    for idx, raw in enumerate(devices):
        channels = int(raw.get("max_input_channels", 0) or 0)
        if channels <= 0:
            # Output-only device; not usable as a microphone.
            continue
        result.append(
            {
                "index": int(idx),
                "name": str(raw.get("name", f"Input {idx}")),
                "max_input_channels": channels,
                "default_samplerate": float(raw.get("default_samplerate", 0.0) or 0.0),
                "is_default_input": default_input_index is not None and int(idx) == int(default_input_index),
            }
        )
    return {"devices": result, "default_input_index": default_input_index}
def resolve_microphone(device: int | str | None) -> tuple[int | None, dict[str, Any] | None]:
    """Resolve a microphone selector to ``(device_index, device_record)``.

    Args:
        device: ``None`` to use the saved preferred microphone (matched by
            index) and then the default input; an int or all-digit string to
            select by index; any other string to select by name
            (case-insensitive exact match first, then first substring match).

    Returns:
        The index and its record from ``query_microphones()``.  With
        ``device=None`` the result is ``(None, None)`` when there is no default
        input, and ``(index, None)`` when the default index is not among the
        listed input devices.

    Raises:
        ValueError: if an explicit index or name matches no input device.
    """
    info = query_microphones()
    by_index = {int(mic["index"]): mic for mic in info["devices"]}

    if device is None:
        preferred = CONFIG.get_preferred_microphone()
        if preferred is not None and isinstance(preferred.get("index"), int):
            mic = by_index.get(int(preferred["index"]))
            if mic is not None:
                return int(mic["index"]), mic
        default_index = info["default_input_index"]
        if default_index is None:
            return None, None
        mic = by_index.get(int(default_index))
        if mic is not None:
            return int(mic["index"]), mic
        return int(default_index), None

    if isinstance(device, int) or (isinstance(device, str) and device.isdigit()):
        idx = int(device)
        mic = by_index.get(idx)
        if mic is None:
            raise ValueError(f"Microphone index not found: {idx}")
        return idx, mic

    target = str(device).strip().lower()
    partial = None
    match = None
    for mic in info["devices"]:
        name = str(mic["name"]).lower()
        if name == target:
            match = mic
            break
        if partial is None and target in name:
            # Remember only the first substring hit; an exact match still wins.
            partial = mic
    match = match or partial
    if match is None:
        raise ValueError(f"Microphone name not found: {device}")
    return int(match["index"]), match
def pick_record_samplerate(requested: int, mic: dict[str, Any] | None) -> int:
    """Choose the recording sample rate.

    A positive *requested* rate wins; otherwise the microphone's
    ``default_samplerate`` is used when positive; otherwise DEFAULT_SAMPLE_RATE.
    """
    if requested > 0:
        return requested
    if mic is not None:
        rate = int(float(mic.get("default_samplerate") or 0.0))
        if rate > 0:
            return rate
    return DEFAULT_SAMPLE_RATE
def record_phrase_to_wav(
    output_path: Path,
    device: int | None,
    sample_rate: int,
    start_threshold: float,
    stop_threshold: float,
    min_speech_seconds: float,
    silence_seconds: float,
    preroll_seconds: float,
    max_record_seconds: float,
    block_ms: int,
    allow_empty: bool = False,
) -> dict[str, Any] | None:
    """Record one utterance from the microphone and write it as a 16-bit WAV.

    Audio is read in blocks of *block_ms* milliseconds.  Recording starts once
    the block RMS has stayed at or above *start_threshold* for
    *min_speech_seconds*, and stops after *silence_seconds* of blocks at or
    below *stop_threshold*, or when *max_record_seconds* of audio has been
    read.  Up to *preroll_seconds* of audio before the trigger is kept.

    Args:
        output_path: Destination WAV file; parent directories are created.
        device: Input device index passed to sounddevice (``None`` for its default).
        sample_rate: Capture sample rate in Hz.
        allow_empty: Return ``None`` instead of raising when no speech was captured.

    Returns:
        Timing statistics in milliseconds plus ``peak_rms``, or ``None`` when
        nothing was captured and *allow_empty* is true.

    Raises:
        RuntimeError: if nothing was captured and *allow_empty* is false.
    """
    import collections

    import numpy as np
    import sounddevice as sd
    import soundfile as sf

    # Convert all durations to whole block counts.
    block_frames = max(1, int(sample_rate * (block_ms / 1000.0)))
    preroll_blocks = max(1, int(round(preroll_seconds * sample_rate / block_frames)))
    min_speech_blocks = max(1, int(round(min_speech_seconds * sample_rate / block_frames)))
    silence_blocks_to_stop = max(1, int(round(silence_seconds * sample_rate / block_frames)))
    max_blocks = max(1, int(round(max_record_seconds * sample_rate / block_frames)))

    q: queue.Queue[Any] = queue.Queue()
    preroll = collections.deque(maxlen=preroll_blocks)
    utterance: list[np.ndarray] = []
    speech_started = False
    speech_count = 0
    silence_count = 0
    total_blocks = 0
    peak_rms = 0.0

    def callback(indata, frames, time_info, status):
        # Copy the block before queueing it for the consumer loop below.
        q.put(indata.copy())

    STATUS.show("LISTENING")
    t0 = time.perf_counter()
    with sd.InputStream(
        samplerate=sample_rate,
        channels=1,
        dtype="float32",
        blocksize=block_frames,
        callback=callback,
        device=device,
    ):
        while True:
            block = q.get()
            total_blocks += 1
            mono = block[:, 0] if block.ndim > 1 else block
            rms = float((mono.astype("float64") ** 2).mean() ** 0.5)
            peak_rms = max(peak_rms, rms)
            if not speech_started:
                preroll.append(block)
                if rms >= start_threshold:
                    speech_count += 1
                else:
                    speech_count = 0
                if speech_count >= min_speech_blocks:
                    speech_started = True
                    STATUS.show("SPEECH DETECTED")
                    # The pre-roll already ends with the current block, so it
                    # must not be appended a second time.
                    utterance.extend(preroll)
                    silence_count = 0
            else:
                utterance.append(block)
                if rms <= stop_threshold:
                    silence_count += 1
                else:
                    silence_count = 0
                if silence_count >= silence_blocks_to_stop:
                    break
            if total_blocks >= max_blocks:
                break
    t1 = time.perf_counter()

    if not utterance:
        if allow_empty:
            return None
        raise RuntimeError("No speech detected.")

    audio = np.concatenate(utterance, axis=0).astype("float32", copy=False)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    sf.write(str(output_path), audio, sample_rate, subtype="PCM_16")
    t2 = time.perf_counter()
    return {
        "capture_ms": round((t1 - t0) * 1000.0, 3),
        "write_wav_ms": round((t2 - t1) * 1000.0, 3),
        "total_capture_ms": round((t2 - t0) * 1000.0, 3),
        "peak_rms": round(peak_rms, 6),
    }
def _transcribe_with_parakeet(audio_path: Path) -> str:
    """Transcribe *audio_path* with the Parakeet model and return the stripped text."""
    model = MODELS.get_parakeet(DEFAULT_PARAKEET_MODEL)
    try:
        result = model.transcribe(
            [str(audio_path)],
            batch_size=1,
            verbose=False,
            return_hypotheses=True,
        )
    except TypeError:
        # Retry without return_hypotheses when the call rejects the arguments.
        result = model.transcribe(
            [str(audio_path)],
            batch_size=1,
            verbose=False,
        )
    # Normalise the result to a list and take its first item.
    if isinstance(result, tuple):
        result = result[0]
    if not isinstance(result, list):
        result = [result]
    hyp = result[0] if result else None
    if hasattr(hyp, "text"):
        return str(hyp.text or "").strip()
    return str(hyp or "").strip()


def _transcribe_with_granite(audio_path: Path) -> str:
    """Transcribe *audio_path* with the Granite speech model and return the stripped text."""
    import numpy as np
    import soundfile as sf

    processor, model = MODELS.get_granite(DEFAULT_GRANITE_MODEL)
    tokenizer = getattr(processor, "tokenizer", processor)

    audio, sample_rate = sf.read(str(audio_path), dtype="float32", always_2d=False)
    audio = np.asarray(audio, dtype="float32")
    if audio.ndim > 1:
        # Down-mix multi-channel audio by averaging the channels.
        audio = audio.mean(axis=1)
    if int(sample_rate) != 16000:
        # Resample to 16 kHz with linear interpolation.
        src_positions = np.linspace(0.0, 1.0, num=max(1, audio.shape[0]), endpoint=False)
        dst_length = max(1, int(round(audio.shape[0] * 16000 / int(sample_rate))))
        dst_positions = np.linspace(0.0, 1.0, num=dst_length, endpoint=False)
        audio = np.interp(dst_positions, src_positions, audio).astype("float32", copy=False)
        sample_rate = 16000

    prompt_text = "<|audio|>transcribe the speech with proper punctuation and capitalization."
    if hasattr(tokenizer, "apply_chat_template"):
        prompt_text = tokenizer.apply_chat_template(
            [{"role": "user", "content": prompt_text}],
            tokenize=False,
            add_generation_prompt=True,
        )
    model_inputs = processor(prompt_text, audio, return_tensors="pt")
    if hasattr(model_inputs, "to"):
        model_inputs = model_inputs.to(MODELS.resolve_device())
    model_outputs = model.generate(
        **model_inputs,
        max_new_tokens=200,
        do_sample=False,
        num_beams=1,
    )
    # Drop the prompt tokens so only newly generated tokens are decoded.
    num_input_tokens = model_inputs["input_ids"].shape[-1] if "input_ids" in model_inputs else 0
    new_tokens = model_outputs[:, num_input_tokens:] if num_input_tokens else model_outputs
    if hasattr(tokenizer, "batch_decode"):
        return str(
            tokenizer.batch_decode(
                new_tokens,
                add_special_tokens=False,
                skip_special_tokens=True,
            )[0]
        ).strip()
    return str(new_tokens).strip()


def _transcribe_with_canary(audio_path: Path) -> str:
    """Transcribe *audio_path* with the Canary (SALM) model and return the cleaned text.

    Raises:
        RuntimeError: if generation or decoding fails.
    """
    model = MODELS.get_canary(DEFAULT_CANARY_MODEL)
    audio_locator = getattr(model, "audio_locator_tag", "<|audioplaceholder|>")
    audio_str = str(audio_path)
    # Exact schema mandated by NeMo SALM documentation
    prompts = [
        [
            {
                "role": "user",
                "content": f"Transcribe the following: {audio_locator}",
                "audio": [audio_str],
            }
        ]
    ]
    try:
        answer_ids = model.generate(prompts=prompts, max_new_tokens=1024)
    except Exception as exc:
        raise RuntimeError(f"Canary generation failed: {exc}") from exc

    try:
        if hasattr(answer_ids, "cpu"):
            tokens = answer_ids.cpu().tolist()
        else:
            tokens = answer_ids
        # Unwrap nested lists until a flat token list remains (first item each level).
        while isinstance(tokens, list) and len(tokens) > 0 and isinstance(tokens[0], list):
            tokens = tokens[0]
        if hasattr(model.tokenizer, "decode"):
            text_out = model.tokenizer.decode(tokens, skip_special_tokens=True)
        elif hasattr(model.tokenizer, "ids_to_text"):
            text_out = model.tokenizer.ids_to_text(tokens)
        else:
            text_out = str(tokens)
    except Exception as exc:
        raise RuntimeError(f"Failed to decode Canary output: {exc}") from exc

    # Strip Qwen ChatML artifacts
    if "<|im_start|>assistant" in text_out:
        text_out = text_out.split("<|im_start|>assistant")[-1]
    text_out = text_out.replace("<|im_end|>", "").replace("<|im_start|>", "").strip()
    return text_out


def transcribe_file(audio_path: Path, mode: str) -> str:
    """Transcribe the audio file at *audio_path* and return the text.

    Args:
        audio_path: Audio file to transcribe.
        mode: ``"fast"`` (Parakeet) or ``"granite"``; any other value uses Canary.

    The work is handed to the separate model environment when
    ``should_delegate_model_inference(mode)`` says so; otherwise the model is
    loaded and run in this process.
    """
    STATUS.show(f"TRANSCRIBING ({mode})")
    if should_delegate_model_inference(mode):
        STATUS.show(f"TRANSCRIBING ({mode} via env)")
        return run_transcription_in_model_env(audio_path, mode)
    if mode == "fast":
        return _transcribe_with_parakeet(audio_path)
    if mode == "granite":
        return _transcribe_with_granite(audio_path)
    return _transcribe_with_canary(audio_path)
def _match_case(original: str, replacement: str) -> str:
    """Return *replacement* re-cased like *original* (ALL CAPS, Capitalised or lower)."""
    if original.isupper():
        return replacement.upper()
    if original[:1].isupper():
        return replacement[:1].upper() + replacement[1:].lower()
    return replacement.lower()


# Ordered (pattern, replacement template) pairs, applied one after another.
# The pattern-based rules are restricted to explicit word stems so that
# unrelated words (door, editor, filed, blog, arched, hemisphere, ...) are
# left untouched.
_US_TO_UK_RULES = tuple(
    (re.compile(pattern, re.IGNORECASE), template)
    for pattern, template in (
        # analyze -> analyse
        (r"\b(\w+)yz(e|es|ed|ing)\b", r"\1ys\2"),
        # organize -> organise; size/prize/seize/maize/assize and their
        # inflections keep their "z".
        (
            r"\b(?!(?:(?:re|down|up|over|under|out|cap)?siz|priz|seiz|maiz|assiz)(?:e|es|ed|ing)\b)"
            r"(\w+)iz(e|es|ed|ing)\b",
            r"\1is\2",
        ),
        # color -> colour, for the words that actually take -our.
        (
            r"\b((?:dis|mis|un|multi|tri|bi|water)?"
            r"(?:arb|ard|arm|behavi|cand|clam|col|demean|endeav|fav|ferv|flav|harb|hon|hum|lab"
            r"|neighb|od|parl|ranc|rig|rum|savi|sav|splend|succ|tum|val|vap|vig))"
            r"or(s|ed|ing|ful|less|able|ite|ites|hood|hoods|ly|y|)\b",
            r"\1our\2",
        ),
        # traveled -> travelled, for verbs that double the final "l".
        (
            r"\b((?:un|re|mis|dis|out)?"
            r"(?:barrel|bevel|cancel|carol|channel|chisel|counsel|dial|duel|enamel|equal|fuel"
            r"|funnel|grovel|initial|jewel|label|level|libel|marshal|marvel|model|panel|parcel"
            r"|pedal|pencil|quarrel|ravel|revel|rival|shovel|shrivel|signal|snorkel|spiral"
            r"|stencil|swivel|total|towel|travel|trial|tunnel|yodel))"
            r"(ed|ing|er|ers)\b",
            r"\1l\2",
        ),
        # center -> centre
        (r"\b(cent|met|theat|lit|fib|somb|meag|calib|lust|spect|sepulch)er(s|)\b", r"\1re\2"),
        # catalog -> catalogue
        (r"\b((?:cata|dia|ana|mono|epi|travel)log|(?:dema|peda|syna)gog)(s|)\b", r"\1ue\2"),
        # defense -> defence
        (r"\b(def|off|pret)ense(s|)\b", r"\1ence\2"),
        # anemia -> anaemia; each prefix is tied to the stem that follows it.
        (
            r"\b(an(?=emi[ac]|esthe[st])|leuk(?=emi[ac])|p(?=ediatr)|orthop(?=edi)"
            r"|encyclop(?=edi)|gyn(?=ecol)|arch(?=eolog)|h(?=em(?:o|at)))e",
            r"\1ae",
        ),
        # estrogen -> oestrogen
        (r"\b(estrogen|esophagus|edema)\b", r"o\1"),
        # maneuver(s) -> manoeuvre(s); maneuvered -> manoeuvred
        (r"\bmaneuver(s|)\b", r"manoeuvre\1"),
        (r"\bmaneuver(ed|ing|able)\b", r"manoeuvr\1"),
        (r"\baluminum\b", "aluminium"),
        # NOTE(review): "check" and "program" are converted unconditionally,
        # which is only right for the banking / broadcast senses - confirm
        # this is intended.
        (r"\bcheck(s|)\b", r"cheque\1"),
        (r"\bjewelry\b", "jewellery"),
        (r"\bprogram(s|)\b", r"programme\1"),
        (r"\bmold(s|)\b", r"mould\1"),
        (r"\bgray\b", "grey"),
    )
)


def convert_us_to_uk_orthography_oneliner(text: str) -> str:
    """Convert common US spellings in *text* to their UK forms.

    The rules in ``_US_TO_UK_RULES`` are applied in order, case-insensitively.
    Each replacement follows the casing of the text it replaces: all-caps
    stays all-caps, a leading capital is kept, anything else is lower-cased.

    Args:
        text: Text to convert; may be empty.

    Returns:
        The converted text.
    """

    def recased(template: str):
        # Build the substitution callback for one rule.
        return lambda match: _match_case(match.group(0), match.expand(template))

    for pattern, template in _US_TO_UK_RULES:
        text = pattern.sub(recased(template), text)
    return text
def emit_text_at_cursor(text: str, paste: bool = True) -> None:
    """Insert *text* at the active cursor position.

    Args:
        text: Text to insert; nothing happens when it is empty.
        paste: When true, copy the text to the clipboard and send Ctrl+V;
            otherwise type it with simulated key presses.
    """
    if not text:
        return
    STATUS.show("PASTING")
    import keyboard

    if paste:
        import pyperclip

        pyperclip.copy(text)
        # Short pause between the clipboard write and the paste keystroke.
        time.sleep(0.05)
        keyboard.press_and_release("ctrl+v")
        return
    keyboard.write(text, delay=0)
def capture_and_transcribe(
    microphone: int | str | None,
    mode: str,
    sample_rate: int,
    paste: bool,
    no_uk_spelling: bool = False,
) -> str:
    """Record one utterance, transcribe it and emit it, using default detection thresholds.

    Thin wrapper around ``capture_and_transcribe_once`` with
    ``allow_no_speech=False``, so a capture without speech raises.
    """
    return capture_and_transcribe_once(
        microphone,
        mode,
        sample_rate,
        paste,
        no_uk_spelling=no_uk_spelling,
        allow_no_speech=False,
    )
def capture_and_transcribe_once(
    microphone: int | str | None,
    mode: str,
    sample_rate: int,
    paste: bool,
    start_threshold: float = DEFAULT_START_THRESHOLD,
    stop_threshold: float = DEFAULT_STOP_THRESHOLD,
    min_speech_seconds: float = DEFAULT_MIN_SPEECH_SECONDS,
    no_uk_spelling: bool = False,
    allow_no_speech: bool = False,
) -> str:
    """Record one utterance, transcribe it, and insert the text at the cursor.

    Args:
        microphone: Device selector understood by ``resolve_microphone``.
        mode: Transcription mode passed to ``transcribe_file``.
        sample_rate: Requested capture rate; ``0`` uses the device default.
        paste: Paste via the clipboard when true, type key-by-key when false.
        start_threshold: RMS level that starts speech detection.
        stop_threshold: RMS level treated as silence once capture has started.
        min_speech_seconds: Sustained speech required before capture starts.
        no_uk_spelling: Skip the US-to-UK spelling conversion.
        allow_no_speech: Return ``""`` instead of raising when nothing was captured.

    Returns:
        The emitted text, or ``""`` when nothing was captured or transcribed.
        On success a JSON summary line is also printed to stdout.
    """
    idx, mic = resolve_microphone(microphone)
    rate = pick_record_samplerate(sample_rate, mic)
    mic_name = str(mic["name"]) if mic is not None and "name" in mic else f"device {idx}"
    STATUS.show(f"INITIALIZING MIC: {mic_name} @ {rate} Hz")

    # Reserve a temp file name; the file itself is written by the recorder.
    with tempfile.NamedTemporaryFile(prefix="antigravity_phrase_", suffix=".wav", delete=False) as tmp:
        wav_path = Path(tmp.name)
    try:
        stats = record_phrase_to_wav(
            wav_path,
            device=idx,
            sample_rate=rate,
            start_threshold=start_threshold,
            stop_threshold=stop_threshold,
            min_speech_seconds=min_speech_seconds,
            silence_seconds=DEFAULT_SILENCE_SECONDS,
            preroll_seconds=DEFAULT_PREROLL_SECONDS,
            max_record_seconds=DEFAULT_MAX_RECORD_SECONDS,
            block_ms=DEFAULT_BLOCK_MS,
            allow_empty=allow_no_speech,
        )
        if stats is None:
            return ""
        text = transcribe_file(wav_path, mode=mode)
        if not no_uk_spelling:
            text = convert_us_to_uk_orthography_oneliner(text)
        text = text.strip()
        if not text:
            STATUS.clear()
            return ""
        emit_text_at_cursor(text, paste=paste)
        STATUS.done("DONE")
        print(json.dumps({
            "text": text,
            "microphone": mic,
            "sample_rate": rate,
            "stats": stats,
            "mode": mode,
        }, ensure_ascii=False))
        return text
    finally:
        # Best-effort cleanup: a leftover temp file must not mask the real outcome.
        try:
            wav_path.unlink(missing_ok=True)
        except OSError:
            pass
def cmd_list_mics() -> int:
    """Print the available microphones as indented JSON and return exit code 0."""
    print(json.dumps(query_microphones(), ensure_ascii=False, indent=2))
    return 0
def cmd_internal_transcribe(args: argparse.Namespace) -> int:
    """Transcribe ``args.audio_path`` and print ``{"text": ...}`` as one JSON line.

    Status output is disabled for the duration.  Returns exit code 0.
    """
    STATUS.set_enabled(False)
    text = transcribe_file(Path(args.audio_path), mode=args.mode)
    print(json.dumps({"text": text}, ensure_ascii=False))
    return 0
def cmd_set_mic(device: str) -> int:
    """Save *device* (index or name) as the preferred microphone and print the result.

    Raises:
        RuntimeError: if the selector resolves to no device record.
    """
    _, mic = resolve_microphone(device)
    if mic is None:
        raise RuntimeError("Microphone not found.")
    CONFIG.set_preferred_microphone(mic)
    print(json.dumps({"preferred_microphone": mic, "config_path": str(CONFIG.path)}, ensure_ascii=False, indent=2))
    return 0
def cmd_clear_mic() -> int:
    """Remove the saved preferred microphone and print the result; returns 0."""
    CONFIG.set_preferred_microphone(None)
    print(json.dumps({"preferred_microphone": None, "config_path": str(CONFIG.path)}, ensure_ascii=False, indent=2))
    return 0
def cmd_once(args: argparse.Namespace) -> int:
    """Record, transcribe and emit a single utterance; returns exit code 0."""
    STATUS.set_enabled(not args.quiet)
    # One capture only, honouring the detection thresholds from the command line.
    capture_and_transcribe_once(
        args.microphone,
        args.mode,
        args.sample_rate,
        paste=not args.type_keys,
        start_threshold=args.start_threshold,
        stop_threshold=args.stop_threshold,
        min_speech_seconds=args.min_speech_seconds,
        no_uk_spelling=args.no_uk_spelling,
    )
    return 0
def cmd_continuous(args: argparse.Namespace) -> int:
    """Listen and transcribe utterance after utterance until Ctrl+C; returns 0.

    Errors from a single capture are shown on the status line and the loop
    continues.
    """
    STATUS.set_enabled(not args.quiet)
    print("Always listening. Press Ctrl+C to stop.")
    try:
        while True:
            try:
                capture_and_transcribe_once(
                    args.microphone,
                    args.mode,
                    args.sample_rate,
                    paste=not args.type_keys,
                    start_threshold=args.start_threshold,
                    stop_threshold=args.stop_threshold,
                    min_speech_seconds=args.min_speech_seconds,
                    no_uk_spelling=args.no_uk_spelling,
                    allow_no_speech=True,
                )
            except Exception as e:
                STATUS.show(f"ERROR: {e}")
                print(file=sys.stderr)
                # Pause so a persistent failure (e.g. no microphone) does not
                # turn into a tight retry loop.
                time.sleep(0.5)
    except KeyboardInterrupt:
        pass
    return 0
def cmd_hotkey(args: argparse.Namespace) -> int:
    """Capture one utterance each time ``args.hotkey`` is pressed.

    The loop ends on ``args.quit_hotkey`` or Ctrl+C.  Errors from a capture
    are shown on the status line and the listener keeps running.  Returns 0.
    """
    import keyboard

    STATUS.set_enabled(not args.quiet)
    print(f"Ready. Press {args.hotkey} to capture one utterance. Press {args.quit_hotkey} to exit.")

    # Hotkey callbacks only enqueue an action; the work happens in this loop.
    action_queue: queue.Queue[str] = queue.Queue()
    keyboard.add_hotkey(args.hotkey, lambda: action_queue.put("capture"))
    keyboard.add_hotkey(args.quit_hotkey, lambda: action_queue.put("quit"))
    try:
        while True:
            action = action_queue.get()
            if action == "quit":
                break
            if action == "capture":
                try:
                    capture_and_transcribe_once(
                        args.microphone,
                        args.mode,
                        args.sample_rate,
                        paste=not args.type_keys,
                        start_threshold=args.start_threshold,
                        stop_threshold=args.stop_threshold,
                        min_speech_seconds=args.min_speech_seconds,
                        no_uk_spelling=args.no_uk_spelling,
                    )
                except Exception as e:
                    STATUS.show(f"ERROR: {e}")
                    print(file=sys.stderr)
    except KeyboardInterrupt:
        pass
    finally:
        # Best-effort removal of the hotkey hooks; failure here is not actionable.
        try:
            keyboard.unhook_all()
        except Exception:
            pass
    return 0
def parse_mode_alias(value: str) -> str:
    """Map a user-supplied mode alias to ``"fast"``, ``"granite"`` or ``"high_quality"``.

    Matching ignores case and surrounding whitespace.

    Raises:
        argparse.ArgumentTypeError: if *value* is not a known alias.
    """
    aliases = {
        "fast": ("fast", "quick", "parakeet"),
        "granite": ("granite", "granite-4.1", "granite4", "granite-speech"),
        "high_quality": ("high_quality", "high-quality", "hq", "good", "canary", "best"),
    }
    val = value.strip().lower()
    for mode, names in aliases.items():
        if val in names:
            return mode
    raise argparse.ArgumentTypeError(f"Invalid mode alias: '{value}'. Use 'fast', 'granite', or 'good/hq'.")
def add_shared_args(parser_obj: argparse.ArgumentParser) -> None:
    """Register the transcription and speech-detection options on *parser_obj*."""
    group = parser_obj.add_argument_group("Transcription Options")
    group.add_argument("--microphone", default=None, help="Explicit microphone index or name to use (overrides config).")
    group.add_argument(
        "--mode",
        type=parse_mode_alias,
        default="fast",
        help="Transcription mode. For fast (Parakeet) use: 'fast', 'quick'. For Granite 4.1 use: 'granite', 'granite-4.1'. For high-quality (Canary) use: 'good', 'hq', 'high-quality', 'high_quality'.",
    )
    group.add_argument("--sample-rate", type=int, default=0, help="Explicit sample rate for recording (0 to use device default).")
    group.add_argument("--type-keys", action="store_true", help="Type characters natively instead of using clipboard-paste.")
    group.add_argument("--no-uk-spelling", action="store_true", help="Bypass US to UK orthography conversion.")
    group.add_argument("--quiet", action="store_true", help="Disable runtime status messages output to stderr.")

    vad_group = parser_obj.add_argument_group("Speech Detection Tuning")
    vad_group.add_argument(
        "--start-threshold",
        type=float,
        default=DEFAULT_START_THRESHOLD,
        help="RMS threshold required to start speech detection. Raise slightly to make triggering less sensitive.",
    )
    vad_group.add_argument(
        "--stop-threshold",
        type=float,
        default=DEFAULT_STOP_THRESHOLD,
        help="RMS threshold below which speech is treated as silence after capture has started.",
    )
    vad_group.add_argument(
        "--min-speech-seconds",
        type=float,
        default=DEFAULT_MIN_SPEECH_SECONDS,
        help="Minimum sustained speech duration required before capture starts. Raise slightly to reduce accidental triggers.",
    )
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with its sub-commands.

    The sub-command is optional; ``args.cmd`` is ``None`` when none is given.
    """
    examples = (
        "Examples:\n"
        "  python stt.py\n"
        "  python stt.py --start-threshold 0.020\n"
        "  python stt.py --start-threshold 0.020 --min-speech-seconds 0.24\n"
        "  python stt.py hotkey --start-threshold 0.018\n"
        "\n"
        "To make detection slightly less sensitive, try raising `--start-threshold` a bit\n"
        "from 0.015 to 0.018 or 0.020, and optionally raise `--min-speech-seconds`\n"
        "from 0.18 to 0.22 or 0.24."
    )
    parser = argparse.ArgumentParser(
        description="Local microphone STT that types/pastes transcript at the active cursor. If no command is specified, continuous mode is used.",
        formatter_class=HelpFormatter,
        epilog=examples,
    )
    # NOTE(review): the shared options are registered on the top-level parser
    # and again on each sub-command; confirm that options given before the
    # sub-command name are not reset by the sub-parser's defaults.
    add_shared_args(parser)
    sub = parser.add_subparsers(
        dest="cmd",
        required=False,
        title="Commands",
        metavar="{list-mics,set-mic,clear-mic,once,continuous,hotkey}",
    )

    sub.add_parser("list-mics", help="List all available microphones and their indices.")
    p_set = sub.add_parser("set-mic", help="Set the preferred default microphone by index or name.")
    p_set.add_argument("device", help="The index or substring name of the microphone to set.")
    sub.add_parser("clear-mic", help="Clear the preferred microphone configuration.")

    p_once = sub.add_parser("once", help="Record and transcribe a single utterance.", formatter_class=HelpFormatter)
    add_shared_args(p_once)

    p_continuous = sub.add_parser("continuous", help="Continuously listen for speech and transcribe each utterance.", formatter_class=HelpFormatter)
    add_shared_args(p_continuous)

    p_hotkey = sub.add_parser("hotkey", help="Run in the background and listen for hotkeys to trigger recording.", formatter_class=HelpFormatter)
    add_shared_args(p_hotkey)
    p_hotkey.add_argument("--hotkey", default="ctrl+alt+space", help="Keyboard shortcut to trigger recording.")
    p_hotkey.add_argument("--quit-hotkey", default="ctrl+alt+q", help="Keyboard shortcut to terminate the listener loop.")
    return parser
def build_internal_transcribe_parser() -> argparse.ArgumentParser:
    """Build the parser for the ``internal-transcribe`` command (no ``-h`` option)."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--audio-path", required=True)
    parser.add_argument("--mode", type=parse_mode_alias, required=True)
    return parser
def main() -> int:
    """Parse the command line, run the selected command and return its exit code.

    ``internal-transcribe`` as the first argument is handled by its own
    parser.  When no command is given, ``continuous`` is used.

    Raises:
        RuntimeError: if the parsed command is not one of the known commands.
    """
    argv = sys.argv[1:]
    if argv and argv[0] == "internal-transcribe":
        args = build_internal_transcribe_parser().parse_args(argv[1:])
        return cmd_internal_transcribe(args)

    parser = build_parser()
    # parse_known_args: unrecognised arguments are ignored rather than rejected.
    args = parser.parse_known_args(argv)[0]
    if args.cmd is None:
        args.cmd = "continuous"

    if args.cmd == "list-mics":
        return cmd_list_mics()
    if args.cmd == "set-mic":
        return cmd_set_mic(args.device)
    if args.cmd == "clear-mic":
        return cmd_clear_mic()
    if args.cmd == "once":
        return cmd_once(args)
    if args.cmd == "continuous":
        return cmd_continuous(args)
    if args.cmd == "hotkey":
        return cmd_hotkey(args)
    raise RuntimeError(f"Unsupported command: {args.cmd}")
if __name__ == "__main__":
    # Exit quietly (status 0) when interrupted with Ctrl+C.
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        sys.exit(0)
@twobob
Copy link
Copy Markdown
Author

twobob commented Mar 21, 2026

canary
python .\stt.py hotkey --mode high_quality

regular
python .\stt.py hotkey

@twobob
Copy link
Copy Markdown
Author

twobob commented Apr 12, 2026

powershell setup_complete_pytorch_stack_128_270_313.ps1

conda activate 128_270_313

@twobob
Copy link
Copy Markdown
Author

twobob commented May 8, 2026

Added Granite support.
It can now be called like:

C:\Users\new\Miniconda3\envs\128_270_313\python.exe ./stt.py hotkey --mode granite
or C:\Users\new\Miniconda3\envs\128_270_313\python.exe ./stt.py --mode granite (for continuous listening)

etc

@twobob
Copy link
Copy Markdown
Author

twobob commented May 8, 2026

usage: stt.py hotkey [-h] [--microphone MICROPHONE] [--mode MODE] [--sample-rate SAMPLE_RATE] [--type-keys]
[--no-uk-spelling] [--quiet] [--start-threshold START_THRESHOLD]
[--stop-threshold STOP_THRESHOLD] [--min-speech-seconds MIN_SPEECH_SECONDS] [--hotkey HOTKEY]
[--quit-hotkey QUIT_HOTKEY]

options:
-h, --help show this help message and exit
--hotkey HOTKEY Keyboard shortcut to trigger recording. (default: ctrl+alt+space)
--quit-hotkey QUIT_HOTKEY
Keyboard shortcut to terminate the listener loop. (default: ctrl+alt+q)

Transcription Options:
--microphone MICROPHONE
Explicit microphone index or name to use (overrides config). (default: None)
--mode MODE Transcription mode. For fast (Parakeet) use: 'fast', 'quick'. For Granite 4.1 use: 'granite',
'granite-4.1'. For high-quality (Canary) use: 'good', 'hq', 'high-quality', 'high_quality'.
(default: fast)
--sample-rate SAMPLE_RATE
Explicit sample rate for recording (0 to use device default). (default: 0)
--type-keys Type characters natively instead of using clipboard-paste. (default: False)
--no-uk-spelling Bypass US to UK orthography conversion. (default: False)
--quiet Disable runtime status messages output to stderr. (default: False)

Speech Detection Tuning:
--start-threshold START_THRESHOLD
RMS threshold required to start speech detection. Raise slightly to make triggering less
sensitive. (default: 0.015)
--stop-threshold STOP_THRESHOLD
RMS threshold below which speech is treated as silence after capture has started. (default:
0.01)
--min-speech-seconds MIN_SPEECH_SECONDS
Minimum sustained speech duration required before capture starts. Raise slightly to reduce
accidental triggers. (default: 0.18)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment