-
-
Save twobob/e8069711d5357ccebc685ebca53c5435 to your computer and use it in GitHub Desktop.
# setup_complete_pytorch_stack_128_270_313.ps1
#
# Installs Miniconda for the current user, creates a Python 3.13 conda
# environment named by $EnvName, and installs PyTorch 2.7.0 (cu128 wheel index)
# plus the packages the speech-to-text script imports.

# Abort on the first failing cmdlet (e.g. a failed download) instead of
# carrying on and invoking an installer or conda.exe that does not exist.
$ErrorActionPreference = "Stop"

$EnvName = "128_270_313"
$InstallDir = "$env:USERPROFILE\Miniconda3"
$CondaPath = "$InstallDir\Scripts\conda.exe"

Write-Host "1. Downloading Miniconda..."
Invoke-WebRequest -Uri "https://repo.anaconda.com/miniconda/Miniconda3-latest-Windows-x86_64.exe" -OutFile "miniconda_installer.exe"

Write-Host "2. Installing Miniconda..."
# /D must stay the last, unquoted installer argument.
Start-Process -FilePath ".\miniconda_installer.exe" -ArgumentList "/InstallationType=JustMe /RegisterPython=0 /S /D=$InstallDir" -Wait
if (-not (Test-Path $CondaPath)) {
    throw "Miniconda installation failed: '$CondaPath' was not created."
}

Write-Host "3. Initializing shell profiles..."
& $CondaPath init powershell
& $CondaPath init cmd.exe
Remove-Item ".\miniconda_installer.exe"

Write-Host "4. Accepting Anaconda Terms of Service..."
& $CondaPath tos accept --override-channels --channel https://repo.anaconda.com/pkgs/main
& $CondaPath tos accept --override-channels --channel https://repo.anaconda.com/pkgs/r
& $CondaPath tos accept --override-channels --channel https://repo.anaconda.com/pkgs/msys2

Write-Host "5. Provisioning Python 3.13 environment ('$EnvName')..."
& $CondaPath create --name $EnvName python=3.13 -y

Write-Host "6. Installing PyTorch 2.7.0 and NVIDIA CUDA 12.8 toolkit via PIP wheel index... this may take a while, be patient"
& $CondaPath run -n $EnvName pip install torch==2.7.0 torchvision==0.22.0 torchaudio==2.7.0 --index-url https://download.pytorch.org/whl/cu128

Write-Host "7. Installing editdistance..."
& $CondaPath install --name $EnvName editdistance -y

Write-Host "8. installing extras, one sec"
# The extras specifier is quoted as a whole so pip receives it literally.
& $CondaPath run -n $EnvName pip install "nemo_toolkit[asr]" keyboard sounddevice soundfile pyperclip

Write-Host "Operation Complete. Restart your terminal and run 'conda activate $EnvName' to begin development."
| #!/usr/bin/env python3 | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import os | |
| import queue | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import threading | |
| import time | |
| import re | |
| from functools import reduce | |
| from pathlib import Path | |
| from typing import Any | |
# Model identifiers handed to the loaders' ``from_pretrained`` calls; each one
# can be overridden through the environment variable named in its lookup.
DEFAULT_PARAKEET_MODEL = os.environ.get("PARAKEET_MODEL", "nvidia/parakeet-tdt-0.6b-v3")
DEFAULT_CANARY_MODEL = os.environ.get("CANARY_MODEL", "nvidia/canary-qwen-2.5b")
DEFAULT_GRANITE_MODEL = os.environ.get("GRANITE_MODEL", "ibm-granite/granite-speech-4.1-2b")
# Root of the Python environment used for delegated model inference. The
# fallback is derived from the current user's home directory instead of one
# specific account's path, so it also works on other machines.
DEFAULT_MODEL_ENV_PREFIX = Path(
    os.environ.get(
        "ANTIGRAVITY_MODEL_ENV_PREFIX",
        str(Path.home() / "Miniconda3" / "envs" / "128_270_313"),
    )
).expanduser()
class HelpFormatter(argparse.ArgumentDefaultsHelpFormatter, argparse.RawDescriptionHelpFormatter):
    """Argparse formatter that inherits from both base formatters and adds nothing of its own."""
    pass
# Runtime tunables. Each is read once at import time from the named environment
# variable, with the literal shown as the fallback. The int()/float()
# conversions raise ValueError at import if a variable is not numeric.
DEFAULT_DEVICE = os.environ.get("TRANSCRIBE_DEVICE", "auto").strip().lower()
DEFAULT_DTYPE = os.environ.get("TRANSCRIBE_DTYPE", "auto").strip().lower()
DEFAULT_SAMPLE_RATE = int(os.environ.get("MIC_SAMPLE_RATE", "16000"))
# Defaults for the recorder's start/stop thresholds and timing windows.
DEFAULT_START_THRESHOLD = float(os.environ.get("MIC_START_THRESHOLD", "0.015"))
DEFAULT_STOP_THRESHOLD = float(os.environ.get("MIC_STOP_THRESHOLD", "0.010"))
DEFAULT_MIN_SPEECH_SECONDS = float(os.environ.get("MIC_MIN_SPEECH_SECONDS", "0.18"))
DEFAULT_SILENCE_SECONDS = float(os.environ.get("MIC_SILENCE_SECONDS", "0.85"))
DEFAULT_PREROLL_SECONDS = float(os.environ.get("MIC_PREROLL_SECONDS", "0.35"))
DEFAULT_MAX_RECORD_SECONDS = float(os.environ.get("MIC_MAX_RECORD_SECONDS", "60.0"))
DEFAULT_BLOCK_MS = int(os.environ.get("MIC_BLOCK_MS", "30"))
# Location of the JSON settings file. The environment override wins; otherwise
# the path depends on os.name ("nt" -> AppData/Local, anything else -> ~/.config).
DEFAULT_CONFIG_PATH = Path(
    os.environ.get(
        "ANTIGRAVITY_STT_CONFIG",
        str(Path.home() / ".config" / "antigravity" / "stt_config.json") if os.name != "nt"
        else str(Path.home() / "AppData" / "Local" / "Antigravity" / "stt_config.json"),
    )
).expanduser()
class StatusReporter:
    """Single-line status indicator written to stderr.

    Each message is prefixed with ``[STT]`` and drawn over the previous one by
    starting with a carriage return. All methods are no-ops for output while
    ``enabled`` is false. A re-entrant lock guards the state so ``done`` can
    call ``show`` while already holding it.
    """

    def __init__(self, enabled: bool = True) -> None:
        self.enabled = enabled
        self._lock = threading.RLock()
        # Length of the last line drawn, used to blank out leftovers.
        self._last_len = 0

    def set_enabled(self, enabled: bool) -> None:
        """Turn status output on or off."""
        with self._lock:
            self.enabled = enabled

    def show(self, message: str) -> None:
        """Overwrite the current status line with ``message`` (no newline)."""
        with self._lock:
            if self.enabled:
                text = f"[STT] {message}"
                # ljust only pads when the previous line was longer, which
                # erases the tail of that older, longer message.
                print("\r" + text.ljust(self._last_len), file=sys.stderr, end="", flush=True)
                self._last_len = len(text)

    def clear(self) -> None:
        """Blank out the current status line and return the cursor to column 0."""
        with self._lock:
            if not self.enabled:
                return
            width = self._last_len
            if width > 0:
                blank = " " * (width + 6)
                print(f"\r{blank}\r", file=sys.stderr, end="", flush=True)
            self._last_len = 0

    def done(self, message: str) -> None:
        """Show ``message`` and finish the line with a newline."""
        with self._lock:
            if self.enabled:
                self.show(message)
                print(file=sys.stderr, flush=True)
                self._last_len = 0
# Module-level reporter instance used for progress output on stderr.
STATUS = StatusReporter(enabled=True)
class ConfigManager:
    """Load and store the JSON settings file at ``path``.

    The file is expected to hold a JSON object. A missing, unreadable,
    malformed or non-object file is treated as an empty configuration.
    """

    def __init__(self, path: Path) -> None:
        self.path = path
        self._lock = threading.RLock()

    def load(self) -> dict[str, Any]:
        """Return the stored settings, or ``{}`` when none can be read."""
        with self._lock:
            try:
                data = json.loads(self.path.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                # OSError: missing/unreadable file. ValueError covers both
                # JSONDecodeError and UnicodeDecodeError.
                return {}
            # Valid JSON that is not an object (e.g. a list) would break every
            # caller that uses ``.get``; treat it as empty instead.
            return data if isinstance(data, dict) else {}

    def save(self, data: dict[str, Any]) -> None:
        """Write ``data`` as JSON, creating parent directories as needed."""
        with self._lock:
            self.path.parent.mkdir(parents=True, exist_ok=True)
            # Write to a sibling temp file and rename it over the target so a
            # crash mid-write cannot leave a truncated config behind.
            tmp = self.path.with_suffix(self.path.suffix + ".tmp")
            tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
            tmp.replace(self.path)

    def get_preferred_microphone(self) -> dict[str, Any] | None:
        """Return the stored ``preferred_microphone`` dict, or ``None``."""
        item = self.load().get("preferred_microphone")
        return item if isinstance(item, dict) else None

    def set_preferred_microphone(self, microphone: dict[str, Any] | None) -> None:
        """Store ``microphone`` as the preferred device; ``None`` removes it."""
        # Hold the (re-entrant) lock across the whole read-modify-write so two
        # threads cannot interleave and lose an update.
        with self._lock:
            data = self.load()
            if microphone is None:
                data.pop("preferred_microphone", None)
            else:
                data["preferred_microphone"] = microphone
            self.save(data)
# Module-level settings store bound to DEFAULT_CONFIG_PATH.
CONFIG = ConfigManager(DEFAULT_CONFIG_PATH)
class ModelManager:
    """Lazily loads the speech models and keeps one instance of each.

    A model is reloaded only when a different ``model_name`` is requested than
    the one currently held. Heavy libraries are imported inside the methods,
    so constructing the manager is cheap. All loaders run under one re-entrant
    lock.
    """

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._parakeet = None
        self._parakeet_name: str | None = None
        self._canary = None
        self._canary_name: str | None = None
        self._granite = None
        self._granite_processor = None
        self._granite_name: str | None = None

    def _torch(self):
        """Import and return ``torch`` on demand."""
        import torch

        return torch

    def resolve_device(self) -> str:
        """Return the configured device, resolving ``"auto"`` via CUDA availability."""
        torch = self._torch()
        if DEFAULT_DEVICE != "auto":
            return DEFAULT_DEVICE
        if torch.cuda.is_available():
            return "cuda"
        return "cpu"

    def resolve_dtype(self):
        """Return the torch dtype for ``DEFAULT_DTYPE``.

        ``"auto"`` gives float16 on CUDA and float32 otherwise. Any name other
        than ``float16``/``float32``/``bfloat16`` raises ``KeyError``.
        """
        torch = self._torch()
        if DEFAULT_DTYPE != "auto":
            named = {
                "float16": torch.float16,
                "float32": torch.float32,
                "bfloat16": torch.bfloat16,
            }
            return named[DEFAULT_DTYPE]
        on_gpu = self.resolve_device() == "cuda"
        return torch.float16 if on_gpu else torch.float32

    def get_parakeet(self, model_name: str):
        """Return the Parakeet ASR model called ``model_name``, loading it if needed."""
        with self._lock:
            cached = self._parakeet
            if cached is not None and self._parakeet_name == model_name:
                return cached
            STATUS.show(f"DOWNLOADING / LOADING PARAKEET: {model_name}")
            import nemo.collections.asr as nemo_asr

            loaded = nemo_asr.models.ASRModel.from_pretrained(model_name=model_name)
            if self.resolve_device() == "cuda":
                loaded = loaded.cuda()
            loaded.eval()
            self._parakeet, self._parakeet_name = loaded, model_name
            STATUS.show(f"PARAKEET READY: {model_name}")
            return loaded

    def get_canary(self, model_name: str):
        """Return the Canary SALM model called ``model_name``, loading it if needed.

        Moving the model to the GPU, casting its dtype and switching to eval
        mode are best-effort: any exception raised there is swallowed and the
        model is kept in whatever state it reached.
        """
        with self._lock:
            cached = self._canary
            if cached is not None and self._canary_name == model_name:
                return cached
            STATUS.show(f"DOWNLOADING / LOADING CANARY: {model_name}")
            from nemo.collections.speechlm2.models import SALM

            loaded = SALM.from_pretrained(model_name)
            try:
                if self.resolve_device() == "cuda":
                    loaded = loaded.cuda()
                if hasattr(loaded, "to"):
                    loaded = loaded.to(dtype=self.resolve_dtype())
                loaded.eval()
            except Exception:
                pass
            self._canary, self._canary_name = loaded, model_name
            STATUS.show(f"CANARY READY: {model_name}")
            return loaded

    def get_granite(self, model_name: str):
        """Return ``(processor, model)`` for the Granite model ``model_name``."""
        with self._lock:
            have_both = self._granite is not None and self._granite_processor is not None
            if have_both and self._granite_name == model_name:
                return self._granite_processor, self._granite
            STATUS.show(f"DOWNLOADING / LOADING GRANITE: {model_name}")
            from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor

            loaded = AutoModelForSpeechSeq2Seq.from_pretrained(
                model_name,
                torch_dtype=self.resolve_dtype(),
            )
            if hasattr(loaded, "to"):
                loaded = loaded.to(self.resolve_device())
            if hasattr(loaded, "eval"):
                loaded.eval()
            proc = AutoProcessor.from_pretrained(model_name)
            self._granite = loaded
            self._granite_processor = proc
            self._granite_name = model_name
            STATUS.show(f"GRANITE READY: {model_name}")
            return proc, loaded
# Module-level model cache; models are loaded on first use, not here.
MODELS = ModelManager()
def get_model_env_python(env_prefix: Path | None = None) -> Path | None:
    """Locate the Python interpreter inside the model environment.

    Args:
        env_prefix: Environment root; ``DEFAULT_MODEL_ENV_PREFIX`` when omitted.

    Returns:
        ``<prefix>/python.exe`` on Windows or ``<prefix>/bin/python`` elsewhere
        if that file exists, otherwise ``None``.
    """
    base = env_prefix if env_prefix else DEFAULT_MODEL_ENV_PREFIX
    relative = "bin/python" if os.name != "nt" else "python.exe"
    interpreter = base.expanduser() / relative
    if interpreter.exists():
        return interpreter
    return None
def is_running_in_model_env(env_prefix: Path | None = None) -> bool:
    """Tell whether the running interpreter lives under the model environment.

    Both paths are resolved before comparison. Any failure while resolving or
    comparing (for example paths on different drives) yields ``False``.
    """
    root = (env_prefix or DEFAULT_MODEL_ENV_PREFIX).expanduser()
    try:
        interpreter = str(Path(sys.executable).resolve())
        resolved_root = str(root.resolve())
        shared = os.path.commonpath([interpreter, resolved_root])
    except Exception:
        return False
    return shared == resolved_root
def should_delegate_model_inference(mode: str) -> bool:
    """Decide whether transcription should run in the model environment.

    Delegation is refused when ``ANTIGRAVITY_MODEL_ENV_ACTIVE`` is ``"1"``
    (this process is already the delegate), when no model-env interpreter is
    found, or when this process already runs inside that environment.
    Otherwise it is used for the modes ``fast``, ``granite`` and
    ``high_quality``.
    """
    already_delegated = os.environ.get("ANTIGRAVITY_MODEL_ENV_ACTIVE") == "1"
    if already_delegated:
        return False
    if get_model_env_python() is None:
        return False
    if is_running_in_model_env():
        return False
    return mode in {"fast", "granite", "high_quality"}
def run_transcription_in_model_env(audio_path: Path, mode: str) -> str:
    """Transcribe ``audio_path`` by re-running this script in the model environment.

    The child is started with the ``internal-transcribe`` command and
    ``ANTIGRAVITY_MODEL_ENV_ACTIVE=1`` in its environment. The last non-empty
    line of its stdout is parsed as JSON.

    Returns:
        The ``text`` field of the child's JSON payload, stripped.

    Raises:
        RuntimeError: If the interpreter is missing, the child exits non-zero,
            prints nothing, prints invalid JSON, or the payload has no string
            ``text``.
    """
    interpreter = get_model_env_python()
    if interpreter is None:
        raise RuntimeError(f"Model environment Python not found under: {DEFAULT_MODEL_ENV_PREFIX}")

    child_env = os.environ.copy()
    child_env["ANTIGRAVITY_MODEL_ENV_ACTIVE"] = "1"
    child_env["PYTHONIOENCODING"] = "utf-8"

    command = [
        str(interpreter),
        str(Path(__file__).resolve()),
        "internal-transcribe",
        "--audio-path",
        str(audio_path),
        "--mode",
        mode,
    ]
    completed = subprocess.run(
        command,
        capture_output=True,
        text=True,
        encoding="utf-8",
        env=child_env,
    )

    if completed.returncode != 0:
        err_text = (completed.stderr or "").strip()
        out_text = (completed.stdout or "").strip()
        detail = err_text or out_text or f"exit code {completed.returncode}"
        raise RuntimeError(f"Model env transcription failed: {detail}")

    # Only the final non-empty stdout line carries the JSON payload; anything
    # printed before it is ignored.
    stripped_lines = (raw.strip() for raw in reversed((completed.stdout or "").splitlines()))
    last_line = next((candidate for candidate in stripped_lines if candidate), None)
    if last_line is None:
        raise RuntimeError("Model env transcription produced no output.")

    try:
        payload = json.loads(last_line)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"Model env transcription returned invalid JSON: {exc}") from exc

    transcript = payload.get("text")
    if not isinstance(transcript, str):
        raise RuntimeError("Model env transcription response did not include text.")
    return transcript.strip()
def query_microphones() -> dict[str, Any]:
    """List the audio devices that have at least one input channel.

    Returns:
        ``{"devices": [...], "default_input_index": ...}``. Each device entry
        has ``index``, ``name``, ``max_input_channels``, ``default_samplerate``
        and ``is_default_input``. ``default_input_index`` is ``None`` when the
        default device cannot be read.
    """
    import sounddevice as sd

    all_devices = sd.query_devices()
    try:
        default_input_index = sd.default.device[0]
    except Exception:
        default_input_index = None

    def describe(position: int, raw: Any) -> dict[str, Any]:
        # Normalise one raw device entry into plain, JSON-friendly types.
        is_default = default_input_index is not None and int(position) == int(default_input_index)
        return {
            "index": int(position),
            "name": str(raw.get("name", f"Input {position}")),
            "max_input_channels": int(raw.get("max_input_channels", 0) or 0),
            "default_samplerate": float(raw.get("default_samplerate", 0.0) or 0.0),
            "is_default_input": is_default,
        }

    described = [describe(position, raw) for position, raw in enumerate(all_devices)]
    inputs = [entry for entry in described if entry["max_input_channels"] > 0]
    return {"devices": inputs, "default_input_index": default_input_index}
def resolve_microphone(device: int | str | None) -> tuple[int | None, dict[str, Any] | None]:
    """Turn a microphone selector into ``(index, device_record)``.

    Args:
        device: ``None`` to use the configured preference and then the system
            default; an int or digit string to select by index; any other
            string to select by name (exact match first, then the first
            substring match, both case-insensitive).

    Returns:
        The device index and its record from ``query_microphones``. With
        ``device=None`` the result is ``(None, None)`` when there is no
        default input, or ``(default_index, None)`` when the default is not
        among the listed input devices.

    Raises:
        ValueError: If an explicit index or name matches no input device.
    """
    mics = query_microphones()["devices"]

    def by_index(wanted: int) -> dict[str, Any] | None:
        # First listed input device whose index equals ``wanted``.
        return next((entry for entry in mics if int(entry["index"]) == wanted), None)

    if device is None:
        preferred = CONFIG.get_preferred_microphone()
        if preferred is not None and isinstance(preferred.get("index"), int):
            found = by_index(int(preferred["index"]))
            if found is not None:
                return int(found["index"]), found
        default_index = query_default = None  # placeholder, replaced below
        del query_default
        default_index = _default_input_index_from(mics)
        if default_index is None:
            return None, None
        found = by_index(int(default_index))
        if found is not None:
            return int(found["index"]), found
        return int(default_index), None

    if isinstance(device, int) or (isinstance(device, str) and device.isdigit()):
        idx = int(device)
        found = by_index(idx)
        if found is None:
            raise ValueError(f"Microphone index not found: {idx}")
        return idx, found

    target = str(device).strip().lower()
    exact = next((entry for entry in mics if str(entry["name"]).lower() == target), None)
    partial = next((entry for entry in mics if target in str(entry["name"]).lower()), None)
    chosen = exact or partial
    if chosen is None:
        raise ValueError(f"Microphone name not found: {device}")
    return int(chosen["index"]), chosen
def pick_record_samplerate(requested: int, mic: dict[str, Any] | None) -> int:
    """Choose the recording sample rate.

    A positive ``requested`` rate wins. Otherwise the microphone's
    ``default_samplerate`` is used when it is positive, and
    ``DEFAULT_SAMPLE_RATE`` is the final fallback.
    """
    if requested > 0:
        return requested
    device_rate = int(float(mic.get("default_samplerate") or 0.0)) if mic is not None else 0
    return device_rate if device_rate > 0 else DEFAULT_SAMPLE_RATE
def record_phrase_to_wav(
    output_path: Path,
    device: int | None,
    sample_rate: int,
    start_threshold: float,
    stop_threshold: float,
    min_speech_seconds: float,
    silence_seconds: float,
    preroll_seconds: float,
    max_record_seconds: float,
    block_ms: int,
    allow_empty: bool = False,
) -> dict[str, Any] | None:
    """Record one utterance from the microphone and write it as a 16-bit WAV.

    Audio is read in blocks of ``block_ms`` milliseconds. Recording starts once
    the block RMS has stayed at or above ``start_threshold`` for
    ``min_speech_seconds``, and stops after ``silence_seconds`` of blocks at or
    below ``stop_threshold``, or when ``max_record_seconds`` worth of blocks
    has been read in total.

    Args:
        output_path: Destination WAV file; parent directories are created.
        device: Input device index passed to ``sounddevice.InputStream``.
        sample_rate: Capture sample rate in Hz.
        start_threshold: RMS level that counts as speech before capture starts.
        stop_threshold: RMS level that counts as silence after capture starts.
        min_speech_seconds: Sustained speech needed to start capturing.
        silence_seconds: Sustained silence needed to stop capturing.
        preroll_seconds: Audio kept from before the start trigger.
        max_record_seconds: Upper bound on total listening time.
        block_ms: Block length in milliseconds.
        allow_empty: Return ``None`` instead of raising when nothing was captured.

    Returns:
        Timing statistics (``capture_ms``, ``write_wav_ms``,
        ``total_capture_ms``) and ``peak_rms``, or ``None`` when no speech was
        captured and ``allow_empty`` is true.

    Raises:
        RuntimeError: If no speech was captured and ``allow_empty`` is false.
    """
    import collections

    import numpy as np
    import sounddevice as sd
    import soundfile as sf

    # Convert every duration into a whole number of blocks (at least one).
    block_frames = max(1, int(sample_rate * (block_ms / 1000.0)))
    preroll_blocks = max(1, int(round(preroll_seconds * sample_rate / block_frames)))
    min_speech_blocks = max(1, int(round(min_speech_seconds * sample_rate / block_frames)))
    silence_blocks_to_stop = max(1, int(round(silence_seconds * sample_rate / block_frames)))
    max_blocks = max(1, int(round(max_record_seconds * sample_rate / block_frames)))

    q: queue.Queue[Any] = queue.Queue()
    preroll = collections.deque(maxlen=preroll_blocks)
    utterance: list[np.ndarray] = []
    speech_started = False
    speech_count = 0
    silence_count = 0
    total_blocks = 0
    peak_rms = 0.0

    def callback(indata, frames, time_info, status):
        # Copy before queueing: the array handed to the callback is reused.
        q.put(indata.copy())

    STATUS.show("LISTENING")
    t0 = time.perf_counter()
    with sd.InputStream(
        samplerate=sample_rate,
        channels=1,
        dtype="float32",
        blocksize=block_frames,
        callback=callback,
        device=device,
    ):
        while True:
            block = q.get()
            total_blocks += 1
            mono = block[:, 0] if block.ndim > 1 else block
            rms = float((mono.astype("float64") ** 2).mean() ** 0.5)
            peak_rms = max(peak_rms, rms)

            if not speech_started:
                preroll.append(block)
                if rms >= start_threshold:
                    speech_count += 1
                else:
                    speech_count = 0
                if speech_count >= min_speech_blocks:
                    speech_started = True
                    STATUS.show("SPEECH DETECTED")
                    # The pre-roll already ends with the current block, so it
                    # must not be appended a second time (that would repeat one
                    # block of audio at the start of the utterance).
                    utterance.extend(preroll)
                    silence_count = 0
            else:
                utterance.append(block)
                if rms <= stop_threshold:
                    silence_count += 1
                else:
                    silence_count = 0
                if silence_count >= silence_blocks_to_stop:
                    break

            if total_blocks >= max_blocks:
                break
    t1 = time.perf_counter()

    if not utterance:
        if allow_empty:
            return None
        raise RuntimeError("No speech detected.")

    audio = np.concatenate(utterance, axis=0).astype("float32", copy=False)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    sf.write(str(output_path), audio, sample_rate, subtype="PCM_16")
    t2 = time.perf_counter()
    return {
        "capture_ms": round((t1 - t0) * 1000.0, 3),
        "write_wav_ms": round((t2 - t1) * 1000.0, 3),
        "total_capture_ms": round((t2 - t0) * 1000.0, 3),
        "peak_rms": round(peak_rms, 6),
    }
def transcribe_file(audio_path: Path, mode: str) -> str:
    """Transcribe one audio file with the backend selected by ``mode``.

    ``"fast"`` uses the Parakeet model, ``"granite"`` the Granite model, and
    any other value falls through to the Canary model. When
    ``should_delegate_model_inference(mode)`` is true the work is handed to
    ``run_transcription_in_model_env`` instead of loading a model here.

    Args:
        audio_path: Audio file to transcribe.
        mode: Backend selector as described above.

    Returns:
        The transcript with surrounding whitespace removed.

    Raises:
        RuntimeError: If Canary generation or decoding fails; the underlying
            exception is chained as the cause.
    """
    STATUS.show(f"TRANSCRIBING ({mode})")
    if should_delegate_model_inference(mode):
        STATUS.show(f"TRANSCRIBING ({mode} via env)")
        return run_transcription_in_model_env(audio_path, mode)

    if mode == "fast":
        model = MODELS.get_parakeet(DEFAULT_PARAKEET_MODEL)
        try:
            result = model.transcribe(
                [str(audio_path)],
                batch_size=1,
                verbose=False,
                return_hypotheses=True,
            )
        except TypeError:
            # Retry without ``return_hypotheses`` for a transcribe() signature
            # that rejects that keyword.
            result = model.transcribe(
                [str(audio_path)],
                batch_size=1,
                verbose=False,
            )
        # Normalise the result to a list and take its first entry.
        if isinstance(result, tuple):
            result = result[0]
        if not isinstance(result, list):
            result = [result]
        hyp = result[0] if result else None
        if hasattr(hyp, "text"):
            return str(hyp.text or "").strip()
        return str(hyp or "").strip()

    if mode == "granite":
        import numpy as np
        import soundfile as sf

        processor, model = MODELS.get_granite(DEFAULT_GRANITE_MODEL)
        tokenizer = getattr(processor, "tokenizer", processor)
        audio, sample_rate = sf.read(str(audio_path), dtype="float32", always_2d=False)
        audio = np.asarray(audio, dtype="float32")
        if audio.ndim > 1:
            # Down-mix multi-channel audio by averaging the channels.
            audio = audio.mean(axis=1)
        if int(sample_rate) != 16000:
            # Resample to 16 kHz with linear interpolation over normalised positions.
            src_positions = np.linspace(0.0, 1.0, num=max(1, audio.shape[0]), endpoint=False)
            dst_length = max(1, int(round(audio.shape[0] * 16000 / int(sample_rate))))
            dst_positions = np.linspace(0.0, 1.0, num=dst_length, endpoint=False)
            audio = np.interp(dst_positions, src_positions, audio).astype("float32", copy=False)
            sample_rate = 16000
        prompt_text = "<|audio|>transcribe the speech with proper punctuation and capitalization."
        if hasattr(tokenizer, "apply_chat_template"):
            prompt_text = tokenizer.apply_chat_template(
                [{"role": "user", "content": prompt_text}],
                tokenize=False,
                add_generation_prompt=True,
            )
        model_inputs = processor(prompt_text, audio, return_tensors="pt")
        if hasattr(model_inputs, "to"):
            model_inputs = model_inputs.to(MODELS.resolve_device())
        model_outputs = model.generate(
            **model_inputs,
            max_new_tokens=200,
            do_sample=False,
            num_beams=1,
        )
        # Drop the prompt tokens so only newly generated tokens are decoded.
        num_input_tokens = model_inputs["input_ids"].shape[-1] if "input_ids" in model_inputs else 0
        new_tokens = model_outputs[:, num_input_tokens:] if num_input_tokens else model_outputs
        if hasattr(tokenizer, "batch_decode"):
            return str(
                tokenizer.batch_decode(
                    new_tokens,
                    add_special_tokens=False,
                    skip_special_tokens=True,
                )[0]
            ).strip()
        return str(new_tokens).strip()

    model = MODELS.get_canary(DEFAULT_CANARY_MODEL)
    audio_locator = getattr(model, "audio_locator_tag", "<|audioplaceholder|>")
    audio_str = str(audio_path)
    # Exact schema mandated by NeMo SALM documentation
    prompts = [
        [
            {
                "role": "user",
                "content": f"Transcribe the following: {audio_locator}",
                "audio": [audio_str],
            }
        ]
    ]
    try:
        answer_ids = model.generate(prompts=prompts, max_new_tokens=1024)
    except Exception as exc:
        raise RuntimeError(f"Canary generation failed: {exc}") from exc

    try:
        if hasattr(answer_ids, "cpu"):
            tokens = answer_ids.cpu().tolist()
        else:
            tokens = answer_ids
        # Flatten nested lists sequentially
        while isinstance(tokens, list) and len(tokens) > 0 and isinstance(tokens[0], list):
            tokens = tokens[0]
        if hasattr(model.tokenizer, "decode"):
            text_out = model.tokenizer.decode(tokens, skip_special_tokens=True)
        elif hasattr(model.tokenizer, "ids_to_text"):
            text_out = model.tokenizer.ids_to_text(tokens)
        else:
            text_out = str(tokens)
    except Exception as exc:
        raise RuntimeError(f"Failed to decode Canary output: {exc}") from exc

    # Strip Qwen ChatML artifacts
    if "<|im_start|>assistant" in text_out:
        text_out = text_out.split("<|im_start|>assistant")[-1]
    text_out = text_out.replace("<|im_end|>", "").replace("<|im_start|>", "").strip()
    return text_out
def convert_us_to_uk_orthography_oneliner(text: str) -> str:
    """Rewrite common US spellings in ``text`` to UK spellings.

    A fixed list of regular-expression rules is applied in order, each
    case-insensitively. Every replacement follows the casing of the matched
    word: all-caps stays all-caps, a leading capital is kept, anything else is
    lower-cased. The rules are heuristics and can also rewrite words that are
    not US spellings.
    """
    rules = (
        (r"\b(\w+)yz(e|es|ed|ing)\b", r"\1ys\2"),
        (r"\b(?!(?:size|prize|capsize|seize|maize|assize|glaze|gaze|raze|doze|blaze)\b)(\w+)iz(e|es|ed|ing)\b", r"\1is\2"),
        (r"\b(?!(?:actor|author|doctor|error|motor|sponsor|mirror|major|minor|sensor|factor|prior|mayor|senator|governor|chancellor|successor|vendor|visitor|terror|honorary)\b)(\w{2,})or(s|)\b", r"\1our\2"),
        (r"\b(\w*[aeiou])l(ed|ing|er|ers)\b", r"\1ll\2"),
        (r"\b(cent|met|theat|lit|fib|sombr|meagr|calibr|lust|spect|sepulch)er(s|)\b", r"\1re\2"),
        (r"\b(\w+)(log|gog)(s|)\b", r"\1\2ue\3"),
        (r"\b(def|off|pret)ense(s|)\b", r"\1ence\2"),
        (r"\b(an|p|orthop|gyn|leuk|an|arch|encyclop|h)e(m|diatr|d|col|sthes|ol|matol)", r"\1ae\2"),
        (r"\b(estrogen|esophagus|edema)\b", r"o\1"),
        (r"\bmaneuver(s|ed|ing|)\b", r"manoeuvre\1"),
        (r"\baluminum\b", "aluminium"),
        (r"\bcheck(s|)\b", r"cheque\1"),
        (r"\bjewelry\b", "jewellery"),
        (r"\bprogram(s|)\b", r"programme\1"),
        (r"\bmold(s|)\b", r"mould\1"),
        (r"\bgray\b", "grey"),
    )

    def recase(matched: str, replacement: str) -> str:
        # Give the replacement the casing pattern of the matched word.
        if matched.isupper():
            return replacement.upper()
        if matched[0].isupper():
            return replacement[0].upper() + replacement[1:]
        return replacement.lower()

    converted = text
    for pattern, template in rules:
        converted = re.sub(
            pattern,
            lambda m, template=template: recase(m.group(0), m.expand(template)),
            converted,
            flags=re.IGNORECASE,
        )
    return converted
def emit_text_at_cursor(text: str, paste: bool = True) -> None:
    """Insert ``text`` at the active cursor.

    With ``paste`` true the text is copied to the clipboard and ``ctrl+v`` is
    sent after a short pause; otherwise it is typed through the ``keyboard``
    package. Empty text does nothing.
    """
    if not text:
        return
    STATUS.show("PASTING")
    if not paste:
        import keyboard

        keyboard.write(text, delay=0)
        return

    import pyperclip
    import keyboard

    pyperclip.copy(text)
    # Brief pause between filling the clipboard and sending the paste shortcut.
    time.sleep(0.05)
    keyboard.press_and_release("ctrl+v")
def capture_and_transcribe(microphone: int | str | None, mode: str, sample_rate: int, paste: bool, no_uk_spelling: bool = False) -> str:
    """Record and transcribe one utterance with the default detection thresholds.

    Thin wrapper around ``capture_and_transcribe_once`` that always passes
    ``allow_no_speech=False``.
    """
    transcript = capture_and_transcribe_once(
        microphone=microphone,
        mode=mode,
        sample_rate=sample_rate,
        paste=paste,
        no_uk_spelling=no_uk_spelling,
        allow_no_speech=False,
    )
    return transcript
def capture_and_transcribe_once(
    microphone: int | str | None,
    mode: str,
    sample_rate: int,
    paste: bool,
    start_threshold: float = DEFAULT_START_THRESHOLD,
    stop_threshold: float = DEFAULT_STOP_THRESHOLD,
    min_speech_seconds: float = DEFAULT_MIN_SPEECH_SECONDS,
    no_uk_spelling: bool = False,
    allow_no_speech: bool = False,
) -> str:
    """Record one utterance, transcribe it and emit the text at the cursor.

    Args:
        microphone: Device selector passed to ``resolve_microphone``.
        mode: Transcription mode passed to ``transcribe_file``.
        sample_rate: Requested rate; see ``pick_record_samplerate``.
        paste: Paste via the clipboard (true) or type the characters (false).
        start_threshold: RMS level that starts capture.
        stop_threshold: RMS level treated as silence once capturing.
        min_speech_seconds: Sustained speech required before capture starts.
        no_uk_spelling: Skip the US-to-UK spelling conversion.
        allow_no_speech: Return ``""`` instead of raising when nothing is heard.

    Returns:
        The emitted transcript, or ``""`` when nothing was captured or the
        transcript was empty. On success a JSON summary is printed to stdout.
    """
    idx, mic = resolve_microphone(microphone)
    rate = pick_record_samplerate(sample_rate, mic)
    mic_name = str(mic["name"]) if mic is not None and "name" in mic else f"device {idx}"
    STATUS.show(f"INITIALIZING MIC: {mic_name} @ {rate} Hz")

    # Only the name is reserved here; the handle is closed right away so the
    # recorder can open the path itself.
    tmp = tempfile.NamedTemporaryFile(prefix="antigravity_phrase_", suffix=".wav", delete=False)
    tmp.close()
    wav_path = Path(tmp.name)
    try:
        # Pass the caller's thresholds exactly once; repeating the keywords
        # with the module defaults is a SyntaxError and would ignore them.
        stats = record_phrase_to_wav(
            wav_path,
            device=idx,
            sample_rate=rate,
            start_threshold=start_threshold,
            stop_threshold=stop_threshold,
            min_speech_seconds=min_speech_seconds,
            silence_seconds=DEFAULT_SILENCE_SECONDS,
            preroll_seconds=DEFAULT_PREROLL_SECONDS,
            max_record_seconds=DEFAULT_MAX_RECORD_SECONDS,
            block_ms=DEFAULT_BLOCK_MS,
            allow_empty=allow_no_speech,
        )
        if stats is None:
            return ""
        text = transcribe_file(wav_path, mode=mode)
        if not no_uk_spelling:
            text = convert_us_to_uk_orthography_oneliner(text)
        text = text.strip()
        if not text:
            STATUS.clear()
            return ""
        emit_text_at_cursor(text, paste=paste)
        STATUS.done("DONE")
        print(json.dumps({
            "text": text,
            "microphone": mic,
            "sample_rate": rate,
            "stats": stats,
            "mode": mode,
        }, ensure_ascii=False))
        return text
    finally:
        # Best-effort removal of the temporary recording.
        try:
            wav_path.unlink(missing_ok=True)
        except Exception:
            pass
def cmd_list_mics() -> int:
    """Print the available input devices as indented JSON and return 0."""
    inventory = query_microphones()
    print(json.dumps(inventory, ensure_ascii=False, indent=2))
    return 0
def cmd_internal_transcribe(args: argparse.Namespace) -> int:
    """Transcribe ``args.audio_path`` and print ``{"text": ...}`` as one JSON line.

    Status output is disabled first.
    """
    STATUS.set_enabled(False)
    source = Path(args.audio_path)
    transcript = transcribe_file(source, mode=args.mode)
    payload = {"text": transcript}
    print(json.dumps(payload, ensure_ascii=False))
    return 0
def cmd_set_mic(device: str) -> int:
    """Store ``device`` (index or name) as the preferred microphone.

    Prints the stored record and the config path as JSON and returns 0.

    Raises:
        RuntimeError: If the selector resolves to no device record.
    """
    _, chosen = resolve_microphone(device)
    if chosen is None:
        raise RuntimeError("Microphone not found.")
    CONFIG.set_preferred_microphone(chosen)
    report = {"preferred_microphone": chosen, "config_path": str(CONFIG.path)}
    print(json.dumps(report, ensure_ascii=False, indent=2))
    return 0
def cmd_clear_mic() -> int:
    """Remove the preferred microphone from the config, report it as JSON and return 0."""
    CONFIG.set_preferred_microphone(None)
    report = {"preferred_microphone": None, "config_path": str(CONFIG.path)}
    print(json.dumps(report, ensure_ascii=False, indent=2))
    return 0
def cmd_once(args: argparse.Namespace) -> int:
    """Record, transcribe and emit exactly one utterance, then return 0.

    Uses the detection thresholds from ``args``. Errors from the capture
    (including "no speech detected") propagate to the caller.
    """
    STATUS.set_enabled(not args.quiet)
    # One capture only: an extra call to capture_and_transcribe() here would
    # record and emit a second utterance with the default thresholds.
    capture_and_transcribe_once(
        args.microphone,
        args.mode,
        args.sample_rate,
        paste=not args.type_keys,
        start_threshold=args.start_threshold,
        stop_threshold=args.stop_threshold,
        min_speech_seconds=args.min_speech_seconds,
        no_uk_spelling=args.no_uk_spelling,
    )
    return 0
def cmd_continuous(args: argparse.Namespace) -> int:
    """Capture and transcribe utterances in a loop until Ctrl+C, then return 0.

    Each round allows "no speech" without raising. Any other exception is
    shown on the status line and the loop carries on.
    """
    STATUS.set_enabled(not args.quiet)
    print("Always listening. Press Ctrl+C to stop.")
    listening = True
    while listening:
        try:
            # Options are read from ``args`` inside the try so a bad namespace
            # is reported like any other per-round error.
            options = {
                "paste": not args.type_keys,
                "start_threshold": args.start_threshold,
                "stop_threshold": args.stop_threshold,
                "min_speech_seconds": args.min_speech_seconds,
                "no_uk_spelling": args.no_uk_spelling,
                "allow_no_speech": True,
            }
            capture_and_transcribe_once(args.microphone, args.mode, args.sample_rate, **options)
        except KeyboardInterrupt:
            listening = False
        except Exception as e:
            STATUS.show(f"ERROR: {e}")
            print(file=sys.stderr)
    return 0
def cmd_hotkey(args: argparse.Namespace) -> int:
    """Wait for hotkeys and capture one utterance per trigger; return 0 on exit.

    ``args.hotkey`` queues a capture and ``args.quit_hotkey`` (or Ctrl+C) ends
    the loop. The hotkey callbacks only enqueue an action; capturing runs in
    this function's loop. A failed capture is shown on the status line and
    the loop continues.
    """
    import keyboard

    STATUS.set_enabled(not args.quiet)
    print(f"Ready. Press {args.hotkey} to capture one utterance. Press {args.quit_hotkey} to exit.")
    action_queue = queue.Queue()
    keyboard.add_hotkey(args.hotkey, lambda: action_queue.put("capture"))
    keyboard.add_hotkey(args.quit_hotkey, lambda: action_queue.put("quit"))

    while True:
        try:
            action = action_queue.get()
            if action == "quit":
                break
            elif action == "capture":
                try:
                    # One capture per key press: an extra call to
                    # capture_and_transcribe() here would record twice.
                    capture_and_transcribe_once(
                        args.microphone,
                        args.mode,
                        args.sample_rate,
                        paste=not args.type_keys,
                        start_threshold=args.start_threshold,
                        stop_threshold=args.stop_threshold,
                        min_speech_seconds=args.min_speech_seconds,
                        no_uk_spelling=args.no_uk_spelling,
                    )
                except Exception as e:
                    STATUS.show(f"ERROR: {e}")
                    print(file=sys.stderr)
        except KeyboardInterrupt:
            break

    # Best-effort removal of the registered hooks on the way out.
    try:
        keyboard.unhook_all()
    except Exception:
        pass
    return 0
def parse_mode_alias(value: str) -> str:
    """Map a user-supplied mode alias to its canonical mode name.

    Matching ignores case and surrounding whitespace. Intended as an argparse
    ``type=`` callable.

    Returns:
        ``"fast"``, ``"granite"`` or ``"high_quality"``.

    Raises:
        argparse.ArgumentTypeError: If ``value`` is not a known alias.
    """
    aliases = {
        "fast": ("fast", "quick", "parakeet"),
        "granite": ("granite", "granite-4.1", "granite4", "granite-speech"),
        "high_quality": ("high_quality", "high-quality", "hq", "good", "canary", "best"),
    }
    key = value.strip().lower()
    for canonical, names in aliases.items():
        if key in names:
            return canonical
    # Single error path whose hint lists every supported mode, granite included.
    raise argparse.ArgumentTypeError(f"Invalid mode alias: '{value}'. Use 'fast', 'granite', or 'good/hq'.")
def add_shared_args(parser_obj: argparse.ArgumentParser) -> None:
    """Register the transcription and speech-detection options on ``parser_obj``.

    Adds two argument groups: "Transcription Options" (microphone, mode,
    sample rate, typing vs. pasting, spelling conversion, quiet) and
    "Speech Detection Tuning" (start/stop thresholds and minimum speech
    duration).
    """
    group = parser_obj.add_argument_group("Transcription Options")
    group.add_argument("--microphone", default=None, help="Explicit microphone index or name to use (overrides config).")
    # Exactly one help= keyword: a repeated keyword is a SyntaxError. This is
    # the text that documents all three modes.
    group.add_argument(
        "--mode",
        type=parse_mode_alias,
        default="fast",
        help="Transcription mode. For fast (Parakeet) use: 'fast', 'quick'. For Granite 4.1 use: 'granite', 'granite-4.1'. For high-quality (Canary) use: 'good', 'hq', 'high-quality', 'high_quality'.",
    )
    group.add_argument("--sample-rate", type=int, default=0, help="Explicit sample rate for recording (0 to use device default).")
    group.add_argument("--type-keys", action="store_true", help="Type characters natively instead of using clipboard-paste.")
    group.add_argument("--no-uk-spelling", action="store_true", help="Bypass US to UK orthography conversion.")
    group.add_argument("--quiet", action="store_true", help="Disable runtime status messages output to stderr.")

    vad_group = parser_obj.add_argument_group("Speech Detection Tuning")
    vad_group.add_argument(
        "--start-threshold",
        type=float,
        default=DEFAULT_START_THRESHOLD,
        help="RMS threshold required to start speech detection. Raise slightly to make triggering less sensitive.",
    )
    vad_group.add_argument(
        "--stop-threshold",
        type=float,
        default=DEFAULT_STOP_THRESHOLD,
        help="RMS threshold below which speech is treated as silence after capture has started.",
    )
    vad_group.add_argument(
        "--min-speech-seconds",
        type=float,
        default=DEFAULT_MIN_SPEECH_SECONDS,
        help="Minimum sustained speech duration required before capture starts. Raise slightly to reduce accidental triggers.",
    )
def build_parser() -> argparse.ArgumentParser:
    """Build the top-level command-line parser and all of its subcommands.

    The shared transcription/VAD options are registered on the root parser
    as well as on the ``once``, ``continuous`` and ``hotkey`` subparsers, so
    they can be given with or without an explicit command.  The subcommand
    is optional (``required=False``); when none is supplied ``args.cmd`` is
    ``None`` and the caller decides on the fallback.

    Returns:
        The fully configured ``argparse.ArgumentParser``.
    """
    examples = (
        "Examples:\n"
        "  python stt.py\n"
        "  python stt.py --start-threshold 0.020\n"
        "  python stt.py --start-threshold 0.020 --min-speech-seconds 0.24\n"
        "  python stt.py hotkey --start-threshold 0.018\n"
        "\n"
        "To make detection slightly less sensitive, try raising `--start-threshold` a bit\n"
        "from 0.015 to 0.018 or 0.020, and optionally raise `--min-speech-seconds`\n"
        "from 0.18 to 0.22 or 0.24."
    )
    # NOTE(review): HelpFormatter is the project's own formatter class defined
    # elsewhere in this file; presumably it keeps the epilog's line breaks
    # while still showing defaults -- confirm against its definition.
    parser = argparse.ArgumentParser(
        description="Local microphone STT that types/pastes transcript at the active cursor. If no command is specified, continuous mode is used.",
        formatter_class=HelpFormatter,
        epilog=examples,
    )
    add_shared_args(parser)

    # Exactly one subparsers action may exist per parser.  The explicit
    # metavar fixes the order in which the commands are listed in usage text.
    sub = parser.add_subparsers(
        dest="cmd",
        required=False,
        title="Commands",
        metavar="{list-mics,set-mic,clear-mic,once,continuous,hotkey}",
    )

    # Microphone management commands (no shared options needed).
    sub.add_parser("list-mics", help="List all available microphones and their indices.")
    p_set = sub.add_parser("set-mic", help="Set the preferred default microphone by index or name.")
    p_set.add_argument("device", help="The index or substring name of the microphone to set.")
    sub.add_parser("clear-mic", help="Clear the preferred microphone configuration.")

    # Recording commands: each one accepts the shared option groups.
    p_once = sub.add_parser("once", help="Record and transcribe a single utterance.", formatter_class=HelpFormatter)
    add_shared_args(p_once)

    p_continuous = sub.add_parser("continuous", help="Continuously listen for speech and transcribe each utterance.", formatter_class=HelpFormatter)
    add_shared_args(p_continuous)

    p_hotkey = sub.add_parser("hotkey", help="Run in the background and listen for hotkeys to trigger recording.", formatter_class=HelpFormatter)
    add_shared_args(p_hotkey)
    p_hotkey.add_argument("--hotkey", default="ctrl+alt+space", help="Keyboard shortcut to trigger recording.")
    p_hotkey.add_argument("--quit-hotkey", default="ctrl+alt+q", help="Keyboard shortcut to terminate the listener loop.")

    return parser
def build_internal_transcribe_parser() -> argparse.ArgumentParser:
    """Create the parser for the ``internal-transcribe`` entry point.

    The parser has no ``-h/--help`` option and requires both ``--audio-path``
    and ``--mode``; the mode value is converted by ``parse_mode_alias``.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    internal_parser = argparse.ArgumentParser(add_help=False)
    # Both options are mandatory; only --mode needs a custom converter.
    required_options = (
        ("--audio-path", {}),
        ("--mode", {"type": parse_mode_alias}),
    )
    for flag, extra_kwargs in required_options:
        internal_parser.add_argument(flag, required=True, **extra_kwargs)
    return internal_parser
def main() -> int:
    """Parse the command line and dispatch to the matching command handler.

    A leading ``internal-transcribe`` argument is handled by its own
    dedicated parser.  Otherwise the public parser is used; when no
    subcommand is given, the ``continuous`` command is assumed.

    Returns:
        The value returned by the selected ``cmd_*`` handler.

    Raises:
        RuntimeError: If the parsed command has no handler.
    """
    argv = sys.argv[1:]

    # The internal entry point bypasses the public parser entirely.
    if argv and argv[0] == "internal-transcribe":
        args = build_internal_transcribe_parser().parse_args(argv[1:])
        return cmd_internal_transcribe(args)

    parser = build_parser()
    # Parse exactly once, from the argv captured above.  parse_known_args
    # is used so unrecognised extra arguments are ignored instead of
    # aborting the program.
    args = parser.parse_known_args(argv)[0]

    # The subcommand is optional; fall back to continuous listening.
    if args.cmd is None:
        args.cmd = "continuous"

    if args.cmd == "list-mics":
        return cmd_list_mics()
    if args.cmd == "set-mic":
        return cmd_set_mic(args.device)
    if args.cmd == "clear-mic":
        return cmd_clear_mic()
    if args.cmd == "once":
        return cmd_once(args)
    if args.cmd == "continuous":
        return cmd_continuous(args)
    if args.cmd == "hotkey":
        return cmd_hotkey(args)
    raise RuntimeError(f"Unsupported command: {args.cmd}")
# Script entry point: run main() and use its return value as the process
# exit status.  Ctrl+C (KeyboardInterrupt) is treated as a clean exit.
if __name__ == "__main__":
    try:
        exit_status = main()
    except KeyboardInterrupt:
        exit_status = 0
    sys.exit(exit_status)
powershell setup_complete_pytorch_stack_128_270_313.ps1
conda activate 128_270_313
Added Granite support.
It can now be called like this:
C:\Users\new\Miniconda3\envs\128_270_313\python.exe ./stt.py hotkey --mode granite
or C:\Users\new\Miniconda3\envs\128_270_313\python.exe ./stt.py --mode granite (for continuous listening)
etc
usage: stt.py hotkey [-h] [--microphone MICROPHONE] [--mode MODE] [--sample-rate SAMPLE_RATE] [--type-keys]
[--no-uk-spelling] [--quiet] [--start-threshold START_THRESHOLD]
[--stop-threshold STOP_THRESHOLD] [--min-speech-seconds MIN_SPEECH_SECONDS] [--hotkey HOTKEY]
[--quit-hotkey QUIT_HOTKEY]
options:
-h, --help show this help message and exit
--hotkey HOTKEY Keyboard shortcut to trigger recording. (default: ctrl+alt+space)
--quit-hotkey QUIT_HOTKEY
Keyboard shortcut to terminate the listener loop. (default: ctrl+alt+q)
Transcription Options:
--microphone MICROPHONE
Explicit microphone index or name to use (overrides config). (default: None)
--mode MODE Transcription mode. For fast (Parakeet) use: 'fast', 'quick'. For Granite 4.1 use: 'granite',
'granite-4.1'. For high-quality (Canary) use: 'good', 'hq', 'high-quality', 'high_quality'.
(default: fast)
--sample-rate SAMPLE_RATE
Explicit sample rate for recording (0 to use device default). (default: 0)
--type-keys Type characters natively instead of using clipboard-paste. (default: False)
--no-uk-spelling Bypass US to UK orthography conversion. (default: False)
--quiet Disable runtime status messages output to stderr. (default: False)
Speech Detection Tuning:
--start-threshold START_THRESHOLD
RMS threshold required to start speech detection. Raise slightly to make triggering less
sensitive. (default: 0.015)
--stop-threshold STOP_THRESHOLD
RMS threshold below which speech is treated as silence after capture has started. (default:
0.01)
--min-speech-seconds MIN_SPEECH_SECONDS
Minimum sustained speech duration required before capture starts. Raise slightly to reduce
accidental triggers. (default: 0.18)
canary
python .\stt.py hotkey --mode high_quality
regular
python .\stt.py hotkey