Last active
May 8, 2026 13:41
-
-
Save twobob/e8069711d5357ccebc685ebca53c5435 to your computer and use it in GitHub Desktop.
Speech-to-text (STT) using Parakeet, and optionally Canary or Granite
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# setup_complete_pytorch_stack_128_270_313.ps1
#
# Installs Miniconda for the current user, creates a Python 3.13 conda
# environment, and installs PyTorch 2.7.0 (CUDA 12.8 wheels) together with the
# packages the speech-to-text script imports.

# Make failing cmdlets (e.g. the download) stop the script instead of letting
# the following steps run against a missing installer.
$ErrorActionPreference = "Stop"

$EnvName = "128_270_313"
$InstallDir = "$env:USERPROFILE\Miniconda3"
$CondaPath = "$InstallDir\Scripts\conda.exe"
$InstallerPath = ".\miniconda_installer.exe"

function Assert-ExitCode {
    # Native executables do not raise PowerShell errors; check their exit code
    # explicitly so a failed step aborts the script.
    param([string]$Step)
    if ($LASTEXITCODE -ne 0) {
        throw "$Step failed with exit code $LASTEXITCODE"
    }
}

Write-Host "1. Downloading Miniconda..."
Invoke-WebRequest -Uri "https://repo.anaconda.com/miniconda/Miniconda3-latest-Windows-x86_64.exe" -OutFile $InstallerPath

Write-Host "2. Installing Miniconda..."
$Installer = Start-Process -FilePath $InstallerPath -ArgumentList "/InstallationType=JustMe /RegisterPython=0 /S /D=$InstallDir" -Wait -PassThru
if ($Installer.ExitCode -ne 0) {
    throw "Miniconda installer failed with exit code $($Installer.ExitCode)"
}

Write-Host "3. Initializing shell profiles..."
& $CondaPath init powershell
Assert-ExitCode "conda init powershell"
& $CondaPath init cmd.exe
Assert-ExitCode "conda init cmd.exe"
Remove-Item $InstallerPath

Write-Host "4. Accepting Anaconda Terms of Service..."
& $CondaPath tos accept --override-channels --channel https://repo.anaconda.com/pkgs/main
Assert-ExitCode "conda tos accept (pkgs/main)"
& $CondaPath tos accept --override-channels --channel https://repo.anaconda.com/pkgs/r
Assert-ExitCode "conda tos accept (pkgs/r)"
& $CondaPath tos accept --override-channels --channel https://repo.anaconda.com/pkgs/msys2
Assert-ExitCode "conda tos accept (pkgs/msys2)"

Write-Host "5. Provisioning Python 3.13 environment ('$EnvName')..."
& $CondaPath create --name $EnvName python=3.13 -y
Assert-ExitCode "conda create"

Write-Host "6. Installing PyTorch 2.7.0 and NVIDIA CUDA 12.8 toolkit via PIP wheel index... this may take a while, be patient"
& $CondaPath run -n $EnvName pip install torch==2.7.0 torchvision==0.22.0 torchaudio==2.7.0 --index-url https://download.pytorch.org/whl/cu128
Assert-ExitCode "pip install torch"

Write-Host "7. Installing editdistance..."
& $CondaPath install --name $EnvName editdistance -y
Assert-ExitCode "conda install editdistance"

Write-Host "8. installing extras, one sec"
# Quote the requirement so the extras brackets reach pip as a single literal argument.
& $CondaPath run -n $EnvName pip install "nemo_toolkit[asr]" keyboard sounddevice soundfile pyperclip
Assert-ExitCode "pip install extras"

Write-Host "Operation Complete. Restart your terminal and run 'conda activate $EnvName' to begin development."
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import os | |
| import queue | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import threading | |
| import time | |
| import re | |
| from functools import reduce | |
| from pathlib import Path | |
| from typing import Any | |
# Model identifiers; each can be overridden through the environment.
DEFAULT_PARAKEET_MODEL = os.environ.get("PARAKEET_MODEL", "nvidia/parakeet-tdt-0.6b-v3")
DEFAULT_CANARY_MODEL = os.environ.get("CANARY_MODEL", "nvidia/canary-qwen-2.5b")
DEFAULT_GRANITE_MODEL = os.environ.get("GRANITE_MODEL", "ibm-granite/granite-speech-4.1-2b")

# Prefix of the conda environment that holds the model stack. The default is
# derived from the current user's home directory (the location the setup script
# installs to) instead of one specific user's profile path.
DEFAULT_MODEL_ENV_PREFIX = Path(
    os.environ.get(
        "ANTIGRAVITY_MODEL_ENV_PREFIX",
        str(Path.home() / "Miniconda3" / "envs" / "128_270_313"),
    )
).expanduser()


class HelpFormatter(argparse.ArgumentDefaultsHelpFormatter, argparse.RawDescriptionHelpFormatter):
    """Help formatter that shows argument defaults and keeps description/epilog layout verbatim."""


# Inference settings ("auto" is resolved when a model is loaded).
DEFAULT_DEVICE = os.environ.get("TRANSCRIBE_DEVICE", "auto").strip().lower()
DEFAULT_DTYPE = os.environ.get("TRANSCRIBE_DTYPE", "auto").strip().lower()

# Microphone capture and speech-detection defaults.
DEFAULT_SAMPLE_RATE = int(os.environ.get("MIC_SAMPLE_RATE", "16000"))
DEFAULT_START_THRESHOLD = float(os.environ.get("MIC_START_THRESHOLD", "0.015"))
DEFAULT_STOP_THRESHOLD = float(os.environ.get("MIC_STOP_THRESHOLD", "0.010"))
DEFAULT_MIN_SPEECH_SECONDS = float(os.environ.get("MIC_MIN_SPEECH_SECONDS", "0.18"))
DEFAULT_SILENCE_SECONDS = float(os.environ.get("MIC_SILENCE_SECONDS", "0.85"))
DEFAULT_PREROLL_SECONDS = float(os.environ.get("MIC_PREROLL_SECONDS", "0.35"))
DEFAULT_MAX_RECORD_SECONDS = float(os.environ.get("MIC_MAX_RECORD_SECONDS", "60.0"))
DEFAULT_BLOCK_MS = int(os.environ.get("MIC_BLOCK_MS", "30"))

# Location of the JSON file that stores the preferred microphone.
DEFAULT_CONFIG_PATH = Path(
    os.environ.get(
        "ANTIGRAVITY_STT_CONFIG",
        str(Path.home() / ".config" / "antigravity" / "stt_config.json") if os.name != "nt"
        else str(Path.home() / "AppData" / "Local" / "Antigravity" / "stt_config.json"),
    )
).expanduser()
class StatusReporter:
    """Single-line progress display written to stderr.

    Every update starts with a carriage return so it overwrites the previous
    status on the same terminal line. All methods are no-ops while the
    reporter is disabled.
    """

    def __init__(self, enabled: bool = True) -> None:
        self.enabled = enabled
        self._lock = threading.RLock()
        # Length of the most recently shown line; used to blank leftovers.
        self._last_len = 0

    def set_enabled(self, enabled: bool) -> None:
        """Turn status output on or off."""
        with self._lock:
            self.enabled = enabled

    def show(self, message: str) -> None:
        """Overwrite the current status line with ``message``."""
        with self._lock:
            if self.enabled:
                text = f"[STT] {message}"
                # Pad with spaces when the previous line was longer.
                rendered = text.ljust(self._last_len)
                print(f"\r{rendered}", file=sys.stderr, end="", flush=True)
                self._last_len = len(text)

    def clear(self) -> None:
        """Blank out the current status line, if one is displayed."""
        with self._lock:
            if self.enabled and self._last_len > 0:
                blanks = " " * (self._last_len + 6)
                print("\r" + blanks + "\r", file=sys.stderr, end="", flush=True)
                self._last_len = 0

    def done(self, message: str) -> None:
        """Show a final ``message`` and move to a fresh line."""
        with self._lock:
            if self.enabled:
                self.show(message)
                print(file=sys.stderr, flush=True)
                self._last_len = 0
| STATUS = StatusReporter(enabled=True) | |
class ConfigManager:
    """Reads and writes the JSON configuration file at ``path``."""

    def __init__(self, path: Path) -> None:
        self.path = path
        self._lock = threading.RLock()

    def load(self) -> dict[str, Any]:
        """Return the stored configuration.

        An empty dict is returned when the file is missing, unreadable, not
        valid JSON, or when its top-level JSON value is not an object, so
        callers can always use mapping methods on the result.
        """
        with self._lock:
            try:
                data = json.loads(self.path.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                # OSError: missing/unreadable file. ValueError covers both
                # json.JSONDecodeError and UnicodeDecodeError.
                return {}
            return data if isinstance(data, dict) else {}

    def save(self, data: dict[str, Any]) -> None:
        """Write ``data`` as JSON, creating parent directories as needed."""
        with self._lock:
            self.path.parent.mkdir(parents=True, exist_ok=True)
            # Write to a sibling temp file first, then swap it into place.
            tmp = self.path.with_suffix(self.path.suffix + ".tmp")
            tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
            tmp.replace(self.path)

    def get_preferred_microphone(self) -> dict[str, Any] | None:
        """Return the stored ``preferred_microphone`` entry, or None if absent or not a dict."""
        item = self.load().get("preferred_microphone")
        return item if isinstance(item, dict) else None

    def set_preferred_microphone(self, microphone: dict[str, Any] | None) -> None:
        """Store ``microphone`` as the preferred device; ``None`` removes the entry."""
        data = self.load()
        if microphone is None:
            data.pop("preferred_microphone", None)
        else:
            data["preferred_microphone"] = microphone
        self.save(data)
| CONFIG = ConfigManager(DEFAULT_CONFIG_PATH) | |
class ModelManager:
    """Lazily loads and caches the Parakeet, Canary and Granite models.

    One instance of each model is kept; asking for a different model name
    replaces the cached instance. Heavy libraries are imported only when a
    model is first requested.
    """

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._parakeet = None
        self._parakeet_name: str | None = None
        self._canary = None
        self._canary_name: str | None = None
        self._granite = None
        self._granite_processor = None
        self._granite_name: str | None = None

    def _torch(self):
        """Import torch on demand and return the module."""
        import torch
        return torch

    def resolve_device(self) -> str:
        """Return the device name to use; "auto" picks CUDA when available, else CPU."""
        torch = self._torch()
        if DEFAULT_DEVICE == "auto":
            return "cuda" if torch.cuda.is_available() else "cpu"
        return DEFAULT_DEVICE

    def resolve_dtype(self):
        """Return the torch dtype selected by ``DEFAULT_DTYPE``.

        "auto" picks float16 on CUDA and float32 otherwise.

        Raises:
            ValueError: if ``DEFAULT_DTYPE`` is not "auto", "float16",
                "float32" or "bfloat16".
        """
        torch = self._torch()
        if DEFAULT_DTYPE == "auto":
            return torch.float16 if self.resolve_device() == "cuda" else torch.float32
        supported = {
            "float16": torch.float16,
            "float32": torch.float32,
            "bfloat16": torch.bfloat16,
        }
        if DEFAULT_DTYPE not in supported:
            choices = ", ".join(["auto", *supported])
            raise ValueError(f"Unsupported TRANSCRIBE_DTYPE {DEFAULT_DTYPE!r}; expected one of: {choices}")
        return supported[DEFAULT_DTYPE]

    def get_parakeet(self, model_name: str):
        """Return the cached Parakeet model, loading ``model_name`` if needed."""
        with self._lock:
            if self._parakeet is not None and self._parakeet_name == model_name:
                return self._parakeet
            STATUS.show(f"DOWNLOADING / LOADING PARAKEET: {model_name}")
            import nemo.collections.asr as nemo_asr
            model = nemo_asr.models.ASRModel.from_pretrained(model_name=model_name)
            if self.resolve_device() == "cuda":
                model = model.cuda()
            model.eval()
            self._parakeet = model
            self._parakeet_name = model_name
            STATUS.show(f"PARAKEET READY: {model_name}")
            return model

    def get_canary(self, model_name: str):
        """Return the cached Canary (SALM) model, loading ``model_name`` if needed.

        Moving the model to the resolved device/dtype is best-effort: on
        failure the model is still cached and returned as loaded, and the
        reason is reported on the status line instead of being dropped.
        """
        with self._lock:
            if self._canary is not None and self._canary_name == model_name:
                return self._canary
            STATUS.show(f"DOWNLOADING / LOADING CANARY: {model_name}")
            from nemo.collections.speechlm2.models import SALM
            model = SALM.from_pretrained(model_name)
            try:
                if self.resolve_device() == "cuda":
                    model = model.cuda()
                if hasattr(model, "to"):
                    model = model.to(dtype=self.resolve_dtype())
                model.eval()
            except Exception as exc:
                # done() ends the line so the warning is not overwritten by
                # the READY message below.
                STATUS.done(f"CANARY DEVICE/DTYPE SETUP SKIPPED: {exc}")
            self._canary = model
            self._canary_name = model_name
            STATUS.show(f"CANARY READY: {model_name}")
            return model

    def get_granite(self, model_name: str):
        """Return ``(processor, model)`` for Granite, loading ``model_name`` if needed."""
        with self._lock:
            if self._granite is not None and self._granite_processor is not None and self._granite_name == model_name:
                return self._granite_processor, self._granite
            STATUS.show(f"DOWNLOADING / LOADING GRANITE: {model_name}")
            from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
            model = AutoModelForSpeechSeq2Seq.from_pretrained(
                model_name,
                torch_dtype=self.resolve_dtype(),
            )
            if hasattr(model, "to"):
                model = model.to(self.resolve_device())
            if hasattr(model, "eval"):
                model.eval()
            processor = AutoProcessor.from_pretrained(model_name)
            self._granite = model
            self._granite_processor = processor
            self._granite_name = model_name
            STATUS.show(f"GRANITE READY: {model_name}")
            return processor, model
| MODELS = ModelManager() | |
def get_model_env_python(env_prefix: Path | None = None) -> Path | None:
    """Return the Python interpreter inside the model environment, if present.

    ``env_prefix`` falls back to ``DEFAULT_MODEL_ENV_PREFIX``. The interpreter
    is looked up as ``python.exe`` on Windows and ``bin/python`` elsewhere;
    ``None`` is returned when that path does not exist.
    """
    root = (env_prefix or DEFAULT_MODEL_ENV_PREFIX).expanduser()
    if os.name == "nt":
        interpreter = root / "python.exe"
    else:
        interpreter = root / "bin/python"
    if not interpreter.exists():
        return None
    return interpreter
def is_running_in_model_env(env_prefix: Path | None = None) -> bool:
    """Report whether the running interpreter lives under the model environment.

    ``env_prefix`` falls back to ``DEFAULT_MODEL_ENV_PREFIX``. Any failure
    while resolving or comparing the paths yields ``False``.
    """
    root = (env_prefix or DEFAULT_MODEL_ENV_PREFIX).expanduser()
    try:
        interpreter = str(Path(sys.executable).resolve())
        resolved_root = str(root.resolve())
        # The interpreter is inside the prefix when their common path is the prefix itself.
        shared = os.path.commonpath([interpreter, resolved_root])
    except Exception:
        return False
    return shared == resolved_root
def should_delegate_model_inference(mode: str) -> bool:
    """Decide whether transcription should run in the model environment's Python.

    Delegation is refused when this process is already the delegated child
    (``ANTIGRAVITY_MODEL_ENV_ACTIVE`` is "1"), when the model environment's
    interpreter cannot be found, or when this interpreter already belongs to
    that environment. Otherwise it applies to the three model-backed modes.
    """
    already_delegated = os.environ.get("ANTIGRAVITY_MODEL_ENV_ACTIVE") == "1"
    if already_delegated:
        return False
    if get_model_env_python() is None:
        return False
    if is_running_in_model_env():
        return False
    return mode in {"fast", "granite", "high_quality"}
def run_transcription_in_model_env(audio_path: Path, mode: str) -> str:
    """Transcribe ``audio_path`` by re-running this script with the model environment's Python.

    The child runs the ``internal-transcribe`` command with
    ``ANTIGRAVITY_MODEL_ENV_ACTIVE=1`` so it does not delegate again. The last
    non-empty stdout line must be a JSON object with a string ``text`` field.

    Returns:
        The transcribed text, stripped of surrounding whitespace.

    Raises:
        RuntimeError: if the interpreter is missing, the child exits non-zero,
            or its output is empty, not valid JSON, or lacks a string ``text``.
    """
    model_python = get_model_env_python()
    if model_python is None:
        raise RuntimeError(f"Model environment Python not found under: {DEFAULT_MODEL_ENV_PREFIX}")
    env = os.environ.copy()
    env["ANTIGRAVITY_MODEL_ENV_ACTIVE"] = "1"
    env["PYTHONIOENCODING"] = "utf-8"
    proc = subprocess.run(
        [
            str(model_python),
            str(Path(__file__).resolve()),
            "internal-transcribe",
            "--audio-path",
            str(audio_path),
            "--mode",
            mode,
        ],
        capture_output=True,
        text=True,
        encoding="utf-8",
        env=env,
    )
    if proc.returncode != 0:
        stderr = (proc.stderr or "").strip()
        stdout = (proc.stdout or "").strip()
        detail = stderr or stdout or f"exit code {proc.returncode}"
        raise RuntimeError(f"Model env transcription failed: {detail}")
    # Libraries may log to stdout; only the final non-empty line is the JSON payload.
    lines = [line.strip() for line in (proc.stdout or "").splitlines() if line.strip()]
    if not lines:
        raise RuntimeError("Model env transcription produced no output.")
    try:
        payload = json.loads(lines[-1])
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"Model env transcription returned invalid JSON: {exc}") from exc
    # A JSON array/number/string has no .get(); treat it like a missing text field.
    text = payload.get("text") if isinstance(payload, dict) else None
    if not isinstance(text, str):
        raise RuntimeError("Model env transcription response did not include text.")
    return text.strip()
def query_microphones() -> dict[str, Any]:
    """List the input-capable audio devices reported by sounddevice.

    Returns:
        A dict with ``devices`` (one record per device that has at least one
        input channel: ``index``, ``name``, ``max_input_channels``,
        ``default_samplerate``, ``is_default_input``) and
        ``default_input_index`` (an int, or ``None`` when no usable default
        input device is reported).
    """
    import sounddevice as sd
    devices = sd.query_devices()
    try:
        default_input_index = int(sd.default.device[0])
    except Exception:
        default_input_index = None
    # A negative index means "no device"; do not hand it on as a real default.
    if default_input_index is not None and default_input_index < 0:
        default_input_index = None
    result = []
    for idx, raw in enumerate(devices):
        rec = {
            "index": int(idx),
            "name": str(raw.get("name", f"Input {idx}")),
            "max_input_channels": int(raw.get("max_input_channels", 0) or 0),
            "default_samplerate": float(raw.get("default_samplerate", 0.0) or 0.0),
            "is_default_input": default_input_index is not None and int(idx) == default_input_index,
        }
        if rec["max_input_channels"] > 0:
            result.append(rec)
    return {"devices": result, "default_input_index": default_input_index}
def resolve_microphone(device: int | str | None) -> tuple[int | None, dict[str, Any] | None]:
    """Map a microphone selector to ``(device_index, device_record)``.

    * ``None``: the preferred microphone from the config if its index is still
      listed, otherwise the default input device. ``(None, None)`` when there
      is no default, and ``(index, None)`` when the default index is not among
      the listed input devices.
    * an int or an all-digit string: looked up by index.
    * any other value: matched by name, case-insensitively; an exact match
      wins over the first substring match.

    Raises:
        ValueError: if an explicit index or name matches no input device.
    """
    info = query_microphones()
    mics = info["devices"]

    def by_index(wanted: int) -> dict[str, Any] | None:
        return next((m for m in mics if int(m["index"]) == wanted), None)

    if device is None:
        preferred = CONFIG.get_preferred_microphone()
        if preferred is not None and isinstance(preferred.get("index"), int):
            hit = by_index(int(preferred["index"]))
            if hit is not None:
                return int(hit["index"]), hit
        default_index = info["default_input_index"]
        if default_index is None:
            return None, None
        hit = by_index(int(default_index))
        if hit is not None:
            return int(hit["index"]), hit
        return int(default_index), None

    numeric = isinstance(device, int) or (isinstance(device, str) and device.isdigit())
    if numeric:
        idx = int(device)
        hit = by_index(idx)
        if hit is None:
            raise ValueError(f"Microphone index not found: {idx}")
        return idx, hit

    target = str(device).strip().lower()
    named = [(str(m["name"]).lower(), m) for m in mics]
    exact = next((m for name, m in named if name == target), None)
    partial = next((m for name, m in named if target in name), None)
    match = exact or partial
    if match is None:
        raise ValueError(f"Microphone name not found: {device}")
    return int(match["index"]), match
def pick_record_samplerate(requested: int, mic: dict[str, Any] | None) -> int:
    """Choose the recording sample rate.

    A positive ``requested`` rate wins; otherwise the microphone's positive
    ``default_samplerate`` is used; otherwise ``DEFAULT_SAMPLE_RATE``.
    """
    if requested > 0:
        return requested
    device_rate = 0 if mic is None else int(float(mic.get("default_samplerate") or 0.0))
    return device_rate if device_rate > 0 else DEFAULT_SAMPLE_RATE
def record_phrase_to_wav(
    output_path: Path,
    device: int | None,
    sample_rate: int,
    start_threshold: float,
    stop_threshold: float,
    min_speech_seconds: float,
    silence_seconds: float,
    preroll_seconds: float,
    max_record_seconds: float,
    block_ms: int,
    allow_empty: bool = False,
) -> dict[str, Any] | None:
    """Record one utterance from the microphone and write it to ``output_path`` as 16-bit PCM WAV.

    Audio is read in blocks of ``block_ms`` milliseconds. Capture starts once
    the block RMS has stayed at or above ``start_threshold`` for
    ``min_speech_seconds``; the blocks buffered just before that point (up to
    ``preroll_seconds``) are kept. Capture ends after ``silence_seconds`` of
    blocks at or below ``stop_threshold``, or when ``max_record_seconds`` of
    audio has been read in total.

    Returns:
        Timing and level statistics (``capture_ms``, ``write_wav_ms``,
        ``total_capture_ms``, ``peak_rms``), or ``None`` when nothing was
        captured and ``allow_empty`` is true.

    Raises:
        RuntimeError: if nothing was captured and ``allow_empty`` is false.
    """
    import collections
    import numpy as np
    import sounddevice as sd
    import soundfile as sf
    # Convert every duration into a whole number of blocks (at least one).
    block_frames = max(1, int(sample_rate * (block_ms / 1000.0)))
    preroll_blocks = max(1, int(round(preroll_seconds * sample_rate / block_frames)))
    min_speech_blocks = max(1, int(round(min_speech_seconds * sample_rate / block_frames)))
    silence_blocks_to_stop = max(1, int(round(silence_seconds * sample_rate / block_frames)))
    max_blocks = max(1, int(round(max_record_seconds * sample_rate / block_frames)))
    q: queue.Queue[Any] = queue.Queue()
    preroll = collections.deque(maxlen=preroll_blocks)
    utterance: list[np.ndarray] = []
    speech_started = False
    speech_count = 0
    silence_count = 0
    total_blocks = 0
    peak_rms = 0.0

    def callback(indata, frames, time_info, status):
        # Copy the block before queueing it for the consumer loop below.
        q.put(indata.copy())

    STATUS.show("LISTENING")
    t0 = time.perf_counter()
    with sd.InputStream(
        samplerate=sample_rate,
        channels=1,
        dtype="float32",
        blocksize=block_frames,
        callback=callback,
        device=device,
    ):
        while True:
            block = q.get()
            total_blocks += 1
            mono = block[:, 0] if block.ndim > 1 else block
            rms = float((mono.astype("float64") ** 2).mean() ** 0.5)
            peak_rms = max(peak_rms, rms)
            if not speech_started:
                preroll.append(block)
                if rms >= start_threshold:
                    speech_count += 1
                else:
                    speech_count = 0
                if speech_count >= min_speech_blocks:
                    speech_started = True
                    STATUS.show("SPEECH DETECTED")
                    # The pre-roll already ends with the current block, so it
                    # must not be appended a second time.
                    utterance.extend(preroll)
                    silence_count = 0
            else:
                utterance.append(block)
                if rms <= stop_threshold:
                    silence_count += 1
                else:
                    silence_count = 0
                if silence_count >= silence_blocks_to_stop:
                    break
            if total_blocks >= max_blocks:
                break
    t1 = time.perf_counter()
    if not utterance:
        if allow_empty:
            return None
        raise RuntimeError("No speech detected.")
    audio = np.concatenate(utterance, axis=0).astype("float32", copy=False)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    sf.write(str(output_path), audio, sample_rate, subtype="PCM_16")
    t2 = time.perf_counter()
    return {
        "capture_ms": round((t1 - t0) * 1000.0, 3),
        "write_wav_ms": round((t2 - t1) * 1000.0, 3),
        "total_capture_ms": round((t2 - t0) * 1000.0, 3),
        "peak_rms": round(peak_rms, 6),
    }
def _transcribe_with_parakeet(audio_path: Path) -> str:
    """Transcribe ``audio_path`` with the Parakeet model and return the stripped text."""
    model = MODELS.get_parakeet(DEFAULT_PARAKEET_MODEL)
    try:
        result = model.transcribe(
            [str(audio_path)],
            batch_size=1,
            verbose=False,
            return_hypotheses=True,
        )
    except TypeError:
        # Retry without return_hypotheses when the call signature rejects it.
        result = model.transcribe(
            [str(audio_path)],
            batch_size=1,
            verbose=False,
        )
    # Normalise the possible return shapes to a list and take the first entry.
    if isinstance(result, tuple):
        result = result[0]
    if not isinstance(result, list):
        result = [result]
    hyp = result[0] if result else None
    if hasattr(hyp, "text"):
        return str(hyp.text or "").strip()
    return str(hyp or "").strip()


def _transcribe_with_granite(audio_path: Path) -> str:
    """Transcribe ``audio_path`` with the Granite model and return the stripped text."""
    import numpy as np
    import soundfile as sf
    processor, model = MODELS.get_granite(DEFAULT_GRANITE_MODEL)
    tokenizer = getattr(processor, "tokenizer", processor)
    audio, sample_rate = sf.read(str(audio_path), dtype="float32", always_2d=False)
    audio = np.asarray(audio, dtype="float32")
    if audio.ndim > 1:
        # Down-mix multi-channel audio to mono.
        audio = audio.mean(axis=1)
    if int(sample_rate) != 16000:
        # Linear-interpolation resample to 16 kHz.
        src_positions = np.linspace(0.0, 1.0, num=max(1, audio.shape[0]), endpoint=False)
        dst_length = max(1, int(round(audio.shape[0] * 16000 / int(sample_rate))))
        dst_positions = np.linspace(0.0, 1.0, num=dst_length, endpoint=False)
        audio = np.interp(dst_positions, src_positions, audio).astype("float32", copy=False)
        sample_rate = 16000
    prompt_text = "<|audio|>transcribe the speech with proper punctuation and capitalization."
    if hasattr(tokenizer, "apply_chat_template"):
        prompt_text = tokenizer.apply_chat_template(
            [{"role": "user", "content": prompt_text}],
            tokenize=False,
            add_generation_prompt=True,
        )
    model_inputs = processor(prompt_text, audio, return_tensors="pt")
    if hasattr(model_inputs, "to"):
        model_inputs = model_inputs.to(MODELS.resolve_device())
    model_outputs = model.generate(
        **model_inputs,
        max_new_tokens=200,
        do_sample=False,
        num_beams=1,
    )
    # Drop the prompt tokens so only newly generated tokens are decoded.
    num_input_tokens = model_inputs["input_ids"].shape[-1] if "input_ids" in model_inputs else 0
    new_tokens = model_outputs[:, num_input_tokens:] if num_input_tokens else model_outputs
    if hasattr(tokenizer, "batch_decode"):
        return str(
            tokenizer.batch_decode(
                new_tokens,
                add_special_tokens=False,
                skip_special_tokens=True,
            )[0]
        ).strip()
    return str(new_tokens).strip()


def _transcribe_with_canary(audio_path: Path) -> str:
    """Transcribe ``audio_path`` with the Canary (SALM) model and return the cleaned text.

    Raises:
        RuntimeError: if generation or decoding fails; the original exception
            is attached as the cause.
    """
    model = MODELS.get_canary(DEFAULT_CANARY_MODEL)
    audio_locator = getattr(model, "audio_locator_tag", "<|audioplaceholder|>")
    audio_str = str(audio_path)
    # Exact schema mandated by NeMo SALM documentation
    prompts = [
        [
            {
                "role": "user",
                "content": f"Transcribe the following: {audio_locator}",
                "audio": [audio_str]
            }
        ]
    ]
    try:
        answer_ids = model.generate(prompts=prompts, max_new_tokens=1024)
    except Exception as exc:
        raise RuntimeError(f"Canary generation failed: {exc}") from exc
    try:
        if hasattr(answer_ids, "cpu"):
            tokens = answer_ids.cpu().tolist()
        else:
            tokens = answer_ids
        # Flatten nested lists sequentially
        while isinstance(tokens, list) and len(tokens) > 0 and isinstance(tokens[0], list):
            tokens = tokens[0]
        if hasattr(model.tokenizer, "decode"):
            text_out = model.tokenizer.decode(tokens, skip_special_tokens=True)
        elif hasattr(model.tokenizer, "ids_to_text"):
            text_out = model.tokenizer.ids_to_text(tokens)
        else:
            text_out = str(tokens)
    except Exception as exc:
        raise RuntimeError(f"Failed to decode Canary output: {exc}") from exc
    # Strip Qwen ChatML artifacts
    if "<|im_start|>assistant" in text_out:
        text_out = text_out.split("<|im_start|>assistant")[-1]
    text_out = text_out.replace("<|im_end|>", "").replace("<|im_start|>", "").strip()
    return text_out


def transcribe_file(audio_path: Path, mode: str) -> str:
    """Transcribe the audio file at ``audio_path`` and return the text.

    When delegation applies, the work is handed to the model environment's
    interpreter. Otherwise ``mode`` selects the backend: "fast" uses Parakeet,
    "granite" uses Granite, and any other value uses Canary.

    Raises:
        RuntimeError: if delegated transcription fails or the Canary backend
            fails to generate or decode.
    """
    STATUS.show(f"TRANSCRIBING ({mode})")
    if should_delegate_model_inference(mode):
        STATUS.show(f"TRANSCRIBING ({mode} via env)")
        return run_transcription_in_model_env(audio_path, mode)
    if mode == "fast":
        return _transcribe_with_parakeet(audio_path)
    if mode == "granite":
        return _transcribe_with_granite(audio_path)
    return _transcribe_with_canary(audio_path)
# Ordered (compiled pattern, replacement template) pairs for US -> UK spelling.
# Each rule is limited to known word stems so unrelated words ("door",
# "feeling", "hedge", "blog", "check") are left untouched.
_US_TO_UK_RULES = tuple(
    (re.compile(pattern, re.IGNORECASE), replacement)
    for pattern, replacement in (
        # analyze -> analyse
        (r"\b(\w+)yz(e|es|ed|ing)\b", r"\1ys\2"),
        # organize -> organise, organization -> organisation; words that keep
        # their z (size, prize, seize, ...) are excluded with their inflections.
        (
            r"\b(?!(?:(?:re|down|up|over|under|out|super)?siz|priz|capsiz|seiz|maiz|assiz)(?:e|es|ed|ing)\b)"
            r"(\w+)iz(e|es|ed|ing|ation|ations)\b",
            r"\1is\2",
        ),
        # color -> colour, favorite -> favourite (suffixes like -ary/-ous keep "or")
        (
            r"\b((?:dis|mis|multi|tri|un|re|bi)?"
            r"(?:ardo|armo|behavio|cando|clamo|colo|demeano|endeavo|favo|fervo|flavo|glamo|harbo|hono|humo"
            r"|labo|neighbo|odo|parlo|ranco|rigo|rumo|savio|savo|splendo|tumo|valo|vapo|vigo))"
            r"r(s|ed|ing|ful|less|able|ite|ites|hood|hoods|ly|er|ers|)\b",
            r"\1ur\2",
        ),
        # traveled -> travelled, modeling -> modelling
        (
            r"\b((?:re|un|mis|out|em)?"
            r"(?:travel|cancel|model|label|level|signal|fuel|dial|counsel|marvel|quarrel|channel|tunnel|funnel"
            r"|panel|pencil|jewel|equal|total|rival|spiral|shovel|swivel|grovel|shrivel|ravel|parcel|libel|duel))"
            r"(ed|ing|er|ers)\b",
            r"\1l\2",
        ),
        # center -> centre, kilometer -> kilometre, somber -> sombre
        (
            r"\b((?:kilo|centi|milli)?(?:met|lit)|cent|theat|fib|somb|meag|calib|lust|spect|sepulch)er(s|)\b",
            r"\1re\2",
        ),
        # catalog -> catalogue
        (r"\b(catalog|dialog|analog|monolog|epilog|travelog|demagog|pedagog)(s|)\b", r"\1ue\2"),
        # defense -> defence
        (r"\b(def|off|pret)ense(s|)\b", r"\1ence\2"),
        # anemia -> anaemia, pediatric -> paediatric, hemoglobin -> haemoglobin
        (
            r"\b(an(?=emi[ac]|esthe[st])|p(?=ediatr)|orthop(?=edi)|gyn(?=ecolog)|leuk(?=emi)"
            r"|arch(?=eolog)|encyclop(?=edi)|h(?=emoglob|emorrh|emophil|ematol))e",
            r"\1ae",
        ),
        # estrogen -> oestrogen (the leading letter is part of the match so
        # capitalisation carries over: Estrogen -> Oestrogen)
        (r"\be(strogen|sophag(?:us|eal)|dema)\b", r"oe\1"),
        # maneuver(s) -> manoeuvre(s); maneuvered/maneuvering drop the final "e"
        (r"\bmaneuver(s|)\b", r"manoeuvre\1"),
        (r"\bmaneuver(ed|ing)\b", r"manoeuvr\1"),
        (r"\baluminum\b", "aluminium"),
        # Only the unambiguous banking compound; plain "check" stays as is.
        (r"\bcheckbook(s|)\b", r"chequebook\1"),
        (r"\bjewelry\b", "jewellery"),
        (r"\bprogram(s|)\b", r"programme\1"),
        (r"\bmold(s|)\b", r"mould\1"),
        (r"\bgray\b", "grey"),
    )
)


def _match_case(original: str, replacement: str) -> str:
    """Give ``replacement`` the casing style of ``original`` (ALL CAPS, Capitalised, or lower)."""
    if original.isupper():
        return replacement.upper()
    if original[:1].isupper():
        return replacement[:1].upper() + replacement[1:]
    return replacement.lower()


def convert_us_to_uk_orthography_oneliner(text: str) -> str:
    """Convert common US spellings in ``text`` to their UK forms.

    This is a heuristic, rule-based pass: each rule in ``_US_TO_UK_RULES`` is
    applied in order, case-insensitively, and the casing of the matched text
    is carried over to the replacement. Ambiguous words are not resolved by
    context ("program" always becomes "programme"), while plain "check" is
    deliberately left unchanged.
    """
    for pattern, replacement in _US_TO_UK_RULES:
        text = pattern.sub(
            lambda m, template=replacement: _match_case(m.group(0), m.expand(template)),
            text,
        )
    return text
def emit_text_at_cursor(text: str, paste: bool = True) -> None:
    """Insert ``text`` at the active cursor position.

    With ``paste`` true the text is copied to the clipboard and Ctrl+V is
    sent; otherwise it is typed through the ``keyboard`` module. Empty text
    does nothing.
    """
    if not text:
        return
    STATUS.show("PASTING")
    if not paste:
        import keyboard
        keyboard.write(text, delay=0)
    else:
        import pyperclip
        import keyboard
        pyperclip.copy(text)
        # Short pause between the clipboard write and the paste keystroke.
        time.sleep(0.05)
        keyboard.press_and_release("ctrl+v")
def capture_and_transcribe(microphone: int | str | None, mode: str, sample_rate: int, paste: bool, no_uk_spelling: bool = False) -> str:
    """Capture one utterance with the default detection thresholds and return its text.

    Thin wrapper around ``capture_and_transcribe_once`` that always treats a
    capture without speech as an error.
    """
    transcript = capture_and_transcribe_once(
        microphone=microphone,
        mode=mode,
        sample_rate=sample_rate,
        paste=paste,
        no_uk_spelling=no_uk_spelling,
        allow_no_speech=False,
    )
    return transcript
def capture_and_transcribe_once(
    microphone: int | str | None,
    mode: str,
    sample_rate: int,
    paste: bool,
    start_threshold: float = DEFAULT_START_THRESHOLD,
    stop_threshold: float = DEFAULT_STOP_THRESHOLD,
    min_speech_seconds: float = DEFAULT_MIN_SPEECH_SECONDS,
    no_uk_spelling: bool = False,
    allow_no_speech: bool = False,
) -> str:
    """Record one utterance, transcribe it, and emit the text at the cursor.

    The utterance is recorded to a temporary WAV file that is removed
    afterwards. Unless ``no_uk_spelling`` is set, the transcript goes through
    the US-to-UK spelling conversion. On success a JSON summary (text,
    microphone, sample rate, capture stats, mode) is printed to stdout.

    Returns:
        The emitted text, or "" when nothing was captured (only possible with
        ``allow_no_speech``) or the transcript was empty.
    """
    idx, mic = resolve_microphone(microphone)
    rate = pick_record_samplerate(sample_rate, mic)
    mic_name = str(mic["name"]) if mic is not None and "name" in mic else f"device {idx}"
    STATUS.show(f"INITIALIZING MIC: {mic_name} @ {rate} Hz")
    # Create the file only to reserve a unique path; the recorder writes to it by name.
    tmp = tempfile.NamedTemporaryFile(prefix="antigravity_phrase_", suffix=".wav", delete=False)
    tmp.close()
    wav_path = Path(tmp.name)
    try:
        stats = record_phrase_to_wav(
            wav_path,
            device=idx,
            sample_rate=rate,
            # Caller-supplied detection settings, each passed exactly once.
            start_threshold=start_threshold,
            stop_threshold=stop_threshold,
            min_speech_seconds=min_speech_seconds,
            silence_seconds=DEFAULT_SILENCE_SECONDS,
            preroll_seconds=DEFAULT_PREROLL_SECONDS,
            max_record_seconds=DEFAULT_MAX_RECORD_SECONDS,
            block_ms=DEFAULT_BLOCK_MS,
            allow_empty=allow_no_speech,
        )
        if stats is None:
            return ""
        text = transcribe_file(wav_path, mode=mode)
        if not no_uk_spelling:
            text = convert_us_to_uk_orthography_oneliner(text)
        text = text.strip()
        if not text:
            STATUS.clear()
            return ""
        emit_text_at_cursor(text, paste=paste)
        STATUS.done("DONE")
        print(json.dumps({
            "text": text,
            "microphone": mic,
            "sample_rate": rate,
            "stats": stats,
            "mode": mode,
        }, ensure_ascii=False))
        return text
    finally:
        # Best-effort cleanup of the temporary recording.
        try:
            wav_path.unlink(missing_ok=True)
        except Exception:
            pass
def cmd_list_mics() -> int:
    """Print the available input devices as indented JSON and return exit code 0."""
    listing = query_microphones()
    print(json.dumps(listing, ensure_ascii=False, indent=2))
    return 0
def cmd_internal_transcribe(args: argparse.Namespace) -> int:
    """Transcribe ``args.audio_path`` in ``args.mode`` and print ``{"text": ...}`` as JSON.

    Status output is disabled before transcribing. Returns exit code 0.
    """
    STATUS.set_enabled(False)
    audio_file = Path(args.audio_path)
    transcript = transcribe_file(audio_file, mode=args.mode)
    payload = {"text": transcript}
    print(json.dumps(payload, ensure_ascii=False))
    return 0
def cmd_set_mic(device: str) -> int:
    """Store ``device`` (index or name) as the preferred microphone and print the result.

    Raises:
        RuntimeError: if the selector resolves to no device record.
    """
    _, mic = resolve_microphone(device)
    if mic is None:
        raise RuntimeError("Microphone not found.")
    CONFIG.set_preferred_microphone(mic)
    summary = {"preferred_microphone": mic, "config_path": str(CONFIG.path)}
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
def cmd_clear_mic() -> int:
    """Remove the stored preferred microphone, print the result, and return exit code 0."""
    CONFIG.set_preferred_microphone(None)
    summary = {"preferred_microphone": None, "config_path": str(CONFIG.path)}
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
def cmd_once(args: argparse.Namespace) -> int:
    """Capture and transcribe a single utterance using the CLI options in ``args``.

    Exactly one capture is performed, with the detection thresholds given on
    the command line. Returns exit code 0.
    """
    STATUS.set_enabled(not args.quiet)
    capture_and_transcribe_once(
        args.microphone,
        args.mode,
        args.sample_rate,
        paste=not args.type_keys,
        start_threshold=args.start_threshold,
        stop_threshold=args.stop_threshold,
        min_speech_seconds=args.min_speech_seconds,
        no_uk_spelling=args.no_uk_spelling,
    )
    return 0
def cmd_continuous(args: argparse.Namespace) -> int:
    """Capture and transcribe utterances in a loop until Ctrl+C is pressed.

    A capture without speech is not an error in this mode. Any other
    exception is reported on the status line and the loop continues.
    Returns exit code 0.
    """
    STATUS.set_enabled(not args.quiet)
    print("Always listening. Press Ctrl+C to stop.")
    capture_options = {
        "paste": not args.type_keys,
        "start_threshold": args.start_threshold,
        "stop_threshold": args.stop_threshold,
        "min_speech_seconds": args.min_speech_seconds,
        "no_uk_spelling": args.no_uk_spelling,
        "allow_no_speech": True,
    }
    listening = True
    while listening:
        try:
            capture_and_transcribe_once(
                args.microphone,
                args.mode,
                args.sample_rate,
                **capture_options,
            )
        except KeyboardInterrupt:
            listening = False
        except Exception as e:
            STATUS.show(f"ERROR: {e}")
            print(file=sys.stderr)
    return 0
def cmd_hotkey(args: argparse.Namespace) -> int:
    """Capture one utterance each time the capture hotkey is pressed.

    Hotkey presses only enqueue an action; this loop then performs exactly
    one capture per press, with the detection thresholds from ``args``.
    Capture errors are reported on the status line and the loop continues.
    The quit hotkey or Ctrl+C ends the loop. Returns exit code 0.
    """
    import keyboard
    STATUS.set_enabled(not args.quiet)
    print(f"Ready. Press {args.hotkey} to capture one utterance. Press {args.quit_hotkey} to exit.")
    action_queue = queue.Queue()
    keyboard.add_hotkey(args.hotkey, lambda: action_queue.put("capture"))
    keyboard.add_hotkey(args.quit_hotkey, lambda: action_queue.put("quit"))
    while True:
        try:
            action = action_queue.get()
            if action == "quit":
                break
            elif action == "capture":
                try:
                    capture_and_transcribe_once(
                        args.microphone,
                        args.mode,
                        args.sample_rate,
                        paste=not args.type_keys,
                        start_threshold=args.start_threshold,
                        stop_threshold=args.stop_threshold,
                        min_speech_seconds=args.min_speech_seconds,
                        no_uk_spelling=args.no_uk_spelling,
                    )
                except Exception as e:
                    STATUS.show(f"ERROR: {e}")
                    print(file=sys.stderr)
        except KeyboardInterrupt:
            break
    # Best-effort removal of the registered hotkeys on the way out.
    try:
        keyboard.unhook_all()
    except Exception:
        pass
    return 0
def parse_mode_alias(value: str) -> str:
    """Map a user-supplied mode alias to "fast", "granite" or "high_quality".

    Matching ignores case and surrounding whitespace.

    Raises:
        argparse.ArgumentTypeError: if ``value`` is not a known alias.
    """
    aliases = {
        "fast": ("fast", "quick", "parakeet"),
        "granite": ("granite", "granite-4.1", "granite4", "granite-speech"),
        "high_quality": ("high_quality", "high-quality", "hq", "good", "canary", "best"),
    }
    val = value.strip().lower()
    for mode, names in aliases.items():
        if val in names:
            return mode
    raise argparse.ArgumentTypeError(f"Invalid mode alias: '{value}'. Use 'fast', 'granite', or 'good/hq'.")
def add_shared_args(parser_obj: argparse.ArgumentParser) -> None:
    """Register the transcription and speech-detection options on ``parser_obj``."""
    group = parser_obj.add_argument_group("Transcription Options")
    group.add_argument("--microphone", default=None, help="Explicit microphone index or name to use (overrides config).")
    group.add_argument(
        "--mode",
        type=parse_mode_alias,
        default="fast",
        help="Transcription mode. For fast (Parakeet) use: 'fast', 'quick'. For Granite 4.1 use: 'granite', 'granite-4.1'. For high-quality (Canary) use: 'good', 'hq', 'high-quality', 'high_quality'."
    )
    group.add_argument("--sample-rate", type=int, default=0, help="Explicit sample rate for recording (0 to use device default).")
    group.add_argument("--type-keys", action="store_true", help="Type characters natively instead of using clipboard-paste.")
    group.add_argument("--no-uk-spelling", action="store_true", help="Bypass US to UK orthography conversion.")
    group.add_argument("--quiet", action="store_true", help="Disable runtime status messages output to stderr.")
    vad_group = parser_obj.add_argument_group("Speech Detection Tuning")
    vad_group.add_argument(
        "--start-threshold",
        type=float,
        default=DEFAULT_START_THRESHOLD,
        help="RMS threshold required to start speech detection. Raise slightly to make triggering less sensitive.",
    )
    vad_group.add_argument(
        "--stop-threshold",
        type=float,
        default=DEFAULT_STOP_THRESHOLD,
        help="RMS threshold below which speech is treated as silence after capture has started.",
    )
    vad_group.add_argument(
        "--min-speech-seconds",
        type=float,
        default=DEFAULT_MIN_SPEECH_SECONDS,
        help="Minimum sustained speech duration required before capture starts. Raise slightly to reduce accidental triggers.",
    )
def build_parser() -> argparse.ArgumentParser:
    """Build the top-level command-line parser with all of its sub-commands.

    The shared transcription options are registered on the top-level parser
    and again on each recording sub-command (``once``, ``continuous``,
    ``hotkey``), so they are accepted either before or after the command
    name. The sub-command is optional; when it is omitted ``args.cmd`` is
    ``None``.

    Returns:
        The fully configured ``argparse.ArgumentParser``.
    """
    examples = (
        "Examples:\n"
        "  python stt.py\n"
        "  python stt.py --start-threshold 0.020\n"
        "  python stt.py --start-threshold 0.020 --min-speech-seconds 0.24\n"
        "  python stt.py hotkey --start-threshold 0.018\n"
        "\n"
        "To make detection slightly less sensitive, try raising `--start-threshold` a bit\n"
        "from 0.015 to 0.018 or 0.020, and optionally raise `--min-speech-seconds`\n"
        "from 0.18 to 0.22 or 0.24."
    )
    # Each keyword is passed exactly once; repeating ``description=`` or
    # ``formatter_class=`` is a SyntaxError.
    parser = argparse.ArgumentParser(
        description="Local microphone STT that types/pastes transcript at the active cursor. If no command is specified, continuous mode is used.",
        formatter_class=HelpFormatter,
        epilog=examples,
    )
    add_shared_args(parser)

    # A parser may own only one sub-parser action, so ``add_subparsers`` is
    # called once. ``required=False`` lets the command be left out.
    sub = parser.add_subparsers(
        dest="cmd",
        required=False,
        title="Commands",
        metavar="{list-mics,set-mic,clear-mic,once,continuous,hotkey}",
    )

    # Microphone management commands.
    sub.add_parser("list-mics", help="List all available microphones and their indices.")
    p_set = sub.add_parser("set-mic", help="Set the preferred default microphone by index or name.")
    p_set.add_argument("device", help="The index or substring name of the microphone to set.")
    sub.add_parser("clear-mic", help="Clear the preferred microphone configuration.")

    # Recording commands: each is registered once and repeats the shared options.
    p_once = sub.add_parser("once", help="Record and transcribe a single utterance.", formatter_class=HelpFormatter)
    add_shared_args(p_once)

    p_continuous = sub.add_parser("continuous", help="Continuously listen for speech and transcribe each utterance.", formatter_class=HelpFormatter)
    add_shared_args(p_continuous)

    p_hotkey = sub.add_parser("hotkey", help="Run in the background and listen for hotkeys to trigger recording.", formatter_class=HelpFormatter)
    add_shared_args(p_hotkey)
    p_hotkey.add_argument("--hotkey", default="ctrl+alt+space", help="Keyboard shortcut to trigger recording.")
    p_hotkey.add_argument("--quit-hotkey", default="ctrl+alt+q", help="Keyboard shortcut to terminate the listener loop.")

    return parser
def build_internal_transcribe_parser() -> argparse.ArgumentParser:
    """Return the minimal parser used for the ``internal-transcribe`` invocation.

    Both ``--audio-path`` and ``--mode`` are mandatory, and no ``-h/--help``
    option is registered.
    """
    internal_parser = argparse.ArgumentParser(add_help=False)
    mandatory_options = (
        ("--audio-path", {}),
        ("--mode", {"type": parse_mode_alias}),
    )
    for flag, extra_kwargs in mandatory_options:
        internal_parser.add_argument(flag, required=True, **extra_kwargs)
    return internal_parser
def main() -> int:
    """Parse the command line and dispatch to the matching command handler.

    An ``internal-transcribe`` first argument is routed to its own minimal
    parser. Everything else goes through the public parser, with a missing
    command treated as ``continuous``.

    Returns:
        The value returned by the selected command handler.

    Raises:
        RuntimeError: If the parsed command has no handler.
    """
    argv = sys.argv[1:]
    if argv and argv[0] == "internal-transcribe":
        args = build_internal_transcribe_parser().parse_args(argv[1:])
        return cmd_internal_transcribe(args)

    parser = build_parser()
    # Parse once, from the explicit argv. ``parse_known_args`` returns
    # (namespace, leftovers); unrecognised extra arguments are tolerated
    # rather than rejected, and only the namespace is used.
    args = parser.parse_known_args(argv)[0]
    if args.cmd is None:
        args.cmd = "continuous"

    if args.cmd == "list-mics":
        return cmd_list_mics()
    if args.cmd == "set-mic":
        return cmd_set_mic(args.device)
    if args.cmd == "clear-mic":
        return cmd_clear_mic()
    if args.cmd == "once":
        return cmd_once(args)
    if args.cmd == "continuous":
        return cmd_continuous(args)
    if args.cmd == "hotkey":
        return cmd_hotkey(args)
    raise RuntimeError(f"Unsupported command: {args.cmd}")
if __name__ == "__main__":
    # Script entry point: a Ctrl+C during main() ends the process with
    # status 0 instead of a KeyboardInterrupt traceback.
    try:
        exit_status = main()
    except KeyboardInterrupt:
        exit_status = 0
    sys.exit(exit_status)
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
usage: stt.py hotkey [-h] [--microphone MICROPHONE] [--mode MODE] [--sample-rate SAMPLE_RATE] [--type-keys]
[--no-uk-spelling] [--quiet] [--start-threshold START_THRESHOLD]
[--stop-threshold STOP_THRESHOLD] [--min-speech-seconds MIN_SPEECH_SECONDS] [--hotkey HOTKEY]
[--quit-hotkey QUIT_HOTKEY]
options:
-h, --help show this help message and exit
--hotkey HOTKEY Keyboard shortcut to trigger recording. (default: ctrl+alt+space)
--quit-hotkey QUIT_HOTKEY
Keyboard shortcut to terminate the listener loop. (default: ctrl+alt+q)
Transcription Options:
--microphone MICROPHONE
Explicit microphone index or name to use (overrides config). (default: None)
--mode MODE Transcription mode. For fast (Parakeet) use: 'fast', 'quick'. For Granite 4.1 use: 'granite',
'granite-4.1'. For high-quality (Canary) use: 'good', 'hq', 'high-quality', 'high_quality'.
(default: fast)
--sample-rate SAMPLE_RATE
Explicit sample rate for recording (0 to use device default). (default: 0)
--type-keys Type characters natively instead of using clipboard-paste. (default: False)
--no-uk-spelling Bypass US to UK orthography conversion. (default: False)
--quiet Disable runtime status messages output to stderr. (default: False)
Speech Detection Tuning:
--start-threshold START_THRESHOLD
RMS threshold required to start speech detection. Raise slightly to make triggering less
sensitive. (default: 0.015)
--stop-threshold STOP_THRESHOLD
RMS threshold below which speech is treated as silence after capture has started. (default:
0.01)
--min-speech-seconds MIN_SPEECH_SECONDS
Minimum sustained speech duration required before capture starts. Raise slightly to reduce
accidental triggers. (default: 0.18)