bot.py for Pipecat Cloud (pcc): a bot that generates tones and streams them into a Daily call
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD-2-Clause
#
import asyncio
import os
import sys
import time

import numpy as np
from dotenv import load_dotenv
from loguru import logger
from openai._types import NotGiven

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
    Frame,
    OutputAudioRawFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

# from pipecat.services.cartesia.tts import CartesiaTTSService
# from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecatcloud.agent import DailySessionArguments

load_dotenv(override=True)

logger.remove()
logger.add(sys.stderr, level="DEBUG")

AUDIO_SAMPLE_RATE = 8000


# Add environment variable validation
def validate_env_vars():
    required_vars = ["DAILY_API_KEY", "DEEPGRAM_API_KEY", "OPENAI_API_KEY", "CARTESIA_API_KEY"]
    missing = [var for var in required_vars if not os.getenv(var)]
    if missing:
        raise ValueError(f"Missing required environment variables: {', '.join(missing)}")


def generate_tone_data(freq=400, duration=5.0, sample_rate=8000):
    """Generate a sine wave tone with the specified parameters.

    Args:
        freq: Frequency of the tone in Hz
        duration: Duration of the tone in seconds
        sample_rate: Sample rate in Hz

    Returns:
        A NumPy array containing the audio data as int16
    """
    # Generate time array
    t = np.linspace(0, duration, int(sample_rate * duration), False)
    # Generate sine wave
    tone = 0.5 * np.sin(2 * np.pi * freq * t)
    # Convert to 16-bit PCM
    tone_data = (tone * 32767).astype(np.int16)
    logger.info(f"Generated tone data: {len(tone_data) / sample_rate:.1f}s at {freq}Hz")
    return tone_data
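

# Worked example (illustrative, not executed): with the defaults above,
# duration=5.0 s at sample_rate=8000 Hz gives 5.0 * 8000 = 40,000 int16 samples
# (80,000 bytes of PCM), scaled to half of full amplitude before the int16
# conversion (0.5 * 32767 ~= 16383 peak).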


class SimpleAudioFrameInserter(FrameProcessor):
    """A frame processor that inserts audio frames into the pipeline."""

    def __init__(self, audio_data=None, tone_file_path=None, sample_rate=8000):
        """Initialize the SimpleAudioFrameInserter.

        Args:
            audio_data: NumPy array containing the audio data (priority over tone_file_path)
            tone_file_path: Path to the audio file containing tone data (used if audio_data is None)
            sample_rate: Sample rate of the audio frames to be sent
        """
        super().__init__()
        self.sample_rate = sample_rate
        self.tone_file_path = tone_file_path
        self.audio_data = audio_data
        self.frame_size = int(sample_rate * 0.02)  # 20ms frames
        self.num_frames = 0
        self.running = False
        self._send_task = None
        logger.info(f"SimpleAudioFrameInserter initialized with frame_size: {self.frame_size}")

        # Set number of frames immediately if audio_data is provided
        if self.audio_data is not None:
            self._calculate_frames()

    def _calculate_frames(self):
        """Calculate the number of frames from audio data."""
        if self.audio_data is not None and len(self.audio_data) > 0:
            self.num_frames = len(self.audio_data) // self.frame_size
            logger.info(f"Calculated {self.num_frames} frames from {len(self.audio_data)} samples")
            # Fallback if we don't have enough samples for even one frame
            if self.num_frames == 0 and len(self.audio_data) > 0:
                logger.warning("Not enough audio data for a full frame, creating a single frame")
                self.num_frames = 1
        else:
            logger.warning("Cannot calculate frames: No audio data available")

    async def setup(self):
        """Prepare audio data during setup."""
        await super().setup()
        logger.info("Setting up SimpleAudioFrameInserter")
        try:
            # If audio data was already provided
            if self.audio_data is not None:
                logger.info(f"Using provided audio data: {len(self.audio_data)} samples")
                self._calculate_frames()
                return

            # Otherwise, load from file or generate
            if self.tone_file_path:
                import soundfile as sf

                logger.info(f"Loading tone file: {self.tone_file_path}")
                self.audio_data, _ = sf.read(self.tone_file_path, dtype="int16")
                logger.info(f"Loaded tone file: {len(self.audio_data)} samples")
            else:
                # Generate tone data if neither was provided
                logger.info(f"Generating tone data with sample rate {self.sample_rate}")
                self.audio_data = generate_tone_data(
                    freq=400, duration=5.0, sample_rate=self.sample_rate
                )
                logger.info(f"Generated tone data: {len(self.audio_data)} samples")

            # Calculate the number of frames and validate
            if self.frame_size <= 0:
                self.frame_size = 160  # Default: 160 samples = 20 ms at 8 kHz
                logger.warning(f"Invalid frame size, using default: {self.frame_size}")
            self._calculate_frames()

            # If we still have 0 frames, create some fallback data
            if self.num_frames == 0:
                logger.warning("No frames available after setup, creating fallback data")
                # Create 1 second of silence as fallback
                self.audio_data = np.zeros(self.sample_rate, dtype=np.int16)
                self.num_frames = self.sample_rate // self.frame_size
                logger.info(f"Created fallback data with {self.num_frames} frames")
            logger.info(f"Setup complete: {self.num_frames} frames of {self.frame_size} samples")
        except Exception as e:
            logger.error(f"Error in setup: {e}")
            # Create fallback data
            logger.warning("Creating fallback audio data due to setup error")
            self.audio_data = np.zeros(self.sample_rate, dtype=np.int16)
            self.num_frames = self.sample_rate // self.frame_size
            logger.info(f"Created fallback data with {self.num_frames} frames")

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Pass all frames through this processor."""
        await super().process_frame(frame, direction)
        await self.push_frame(frame, direction)

    def start_sending(self):
        """Start a loop that sends audio frames."""
        if self.running:
            logger.warning("Audio sender is already running")
            return

        # Double-check frames before starting
        if self.num_frames == 0:
            logger.warning("No frames available before start_sending, recalculating")
            self._calculate_frames()
            # If we still have no frames, create fallback data
            if self.num_frames == 0:
                logger.warning("Creating fallback data in start_sending")
                self.audio_data = np.zeros(self.sample_rate, dtype=np.int16)
                self.num_frames = self.sample_rate // self.frame_size

        logger.info(f"Starting audio sender with {self.num_frames} frames")
        self.running = True
        self._send_task = self.get_event_loop().create_task(self._send_loop())

    async def stop_sending(self):
        """Stop sending audio frames."""
        if not self.running:
            return

        logger.info("Stopping audio sender")
        self.running = False

        # Cancel the send task if it's running
        if self._send_task and not self._send_task.done():
            try:
                self._send_task.cancel()
                # Add timeout to prevent hanging if task doesn't respond to cancellation
                await asyncio.wait_for(self._send_task, timeout=2.0)
            except asyncio.CancelledError:
                # Awaiting a cancelled task re-raises CancelledError; this is the expected path
                pass
            except asyncio.TimeoutError:
                logger.warning("Timed out waiting for audio send task to cancel")
            except Exception as e:
                logger.error(f"Error cancelling send task: {e}")

    async def _send_loop(self):
        """Send audio frames at a consistent rate."""
        try:
            logger.debug("Starting SimpleAudioFrameInserter send loop")

            # Verify that we have valid audio data and frame configuration
            if self.audio_data is None or len(self.audio_data) == 0:
                logger.error("No audio data available in send loop")
                self.audio_data = np.zeros(self.sample_rate, dtype=np.int16)
                self.num_frames = self.sample_rate // self.frame_size
                logger.info(f"Created emergency fallback with {self.num_frames} frames")

            if self.num_frames <= 0:
                logger.error(f"Invalid number of frames: {self.num_frames}")
                self.num_frames = self.sample_rate // self.frame_size
                logger.info(f"Reset to {self.num_frames} frames")

            # Use a sliding window approach rather than a fixed start time
            # to prevent drift over long periods
            frame_index = 0
            next_frame_time = time.time()
            logger.info(f"Send loop initialized with {self.num_frames} frames")

            while self.running:
                # Calculate frame start position in the audio data
                frame_start = (frame_index % self.num_frames) * self.frame_size
                # Get the audio data for this frame
                frame_end = min(frame_start + self.frame_size, len(self.audio_data))
                frame_data = self.audio_data[frame_start:frame_end]

                # Create and send the audio frame
                audio_frame = OutputAudioRawFrame(
                    audio=frame_data.tobytes(),
                    sample_rate=self.sample_rate,
                    num_channels=1,
                )
                await self.push_frame(audio_frame)

                # Increment frame index and wrap around if needed
                frame_index += 1

                # Update next frame time (20ms per frame = 50fps)
                next_frame_time += 0.02
                # Sleep until the next frame time
                sleep_time = next_frame_time - time.time()
                # If we're significantly behind, reset the timing
                if sleep_time <= 0:
                    logger.warning(f"Audio frame sending is falling behind by {-sleep_time:.3f}s")
                    next_frame_time = time.time() + 0.02
                    sleep_time = 0.02
                await asyncio.sleep(sleep_time)
        except asyncio.CancelledError:
            logger.info("Audio sending task cancelled")
        except Exception as e:
            logger.error(f"Error in audio sending loop: {e}")
        self.running = False
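
    # Pacing note: accumulating next_frame_time by 0.02 s per iteration paces
    # output against an absolute schedule, so small sleep() overshoots do not
    # add up the way a bare `await asyncio.sleep(0.02)` per frame would; the
    # reset branch above only kicks in when the loop has measurably fallen behind.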

    async def teardown(self):
        """Clean up resources when the processor is shutting down."""
        logger.info("Tearing down SimpleAudioFrameInserter")
        await self.stop_sending()
        await super().teardown()


class DialInHandler:
    """Handles all dial-in related functionality and event handling.

    This class encapsulates the logic for incoming calls and handling
    all dial-in related events from the Daily platform.
    """

    def __init__(self, transport, task, context_aggregator):
        """Initialize the DialInHandler.

        Args:
            transport: The Daily transport instance
            task: The PipelineTask instance
            context_aggregator: The context aggregator for the LLM
        """
        self.transport = transport
        self.task = task
        self.context_aggregator = context_aggregator
        self._register_handlers()

    def _register_handlers(self):
        """Register all event handlers related to dial-in functionality."""

        @self.transport.event_handler("on_dialin_ready")
        async def on_dialin_ready(transport, data):
            """Handler for when the dial-in is ready (SIP addresses registered with the SIP network)."""
            # For Twilio, Telnyx, etc., you need to update the state of the call
            # and forward it to the sip_uri.
            logger.debug(f"Dial-in ready: {data}")

        @self.transport.event_handler("on_dialin_connected")
        async def on_dialin_connected(transport, data):
            """Handler for when a dial-in call is connected."""
            session_id = data.get("sessionId")
            logger.debug(f"Dial-in connected: {data} and set_bot_ready")
            logger.info(f"Dial-in session ID: {session_id}")

        @self.transport.event_handler("on_dialin_stopped")
        async def on_dialin_stopped(transport, data):
            """Handler for when a dial-in call is stopped."""
            logger.debug(f"Dial-in stopped: {data}")

        @self.transport.event_handler("on_dialin_error")
        async def on_dialin_error(transport, data):
            """Handler for dial-in errors."""
            logger.error(f"Dial-in error: {data}")
            # The bot should leave the call if there is an error
            await self.task.cancel()

        @self.transport.event_handler("on_dialin_warning")
        async def on_dialin_warning(transport, data):
            """Handler for dial-in warnings."""
            logger.warning(f"Dial-in warning: {data}")

        @self.transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            """Handler for when the first participant joins the call."""
            logger.info("First participant joined: {}", participant["id"])
            # Capture the participant's transcription
            await transport.capture_participant_transcription(participant["id"])
            # For the dial-in case, we want the bot to greet the user.
            # We can prompt the bot to speak by putting the context into the pipeline.
            await self.task.queue_frames([self.context_aggregator.user().get_context_frame()])


class DialOutHandler:
    """Handles a single dial-out call, including retry attempts.

    It also handles all dial-out related events from the Daily platform.
    """

    def __init__(self, transport, task, dialout_setting, max_attempts=5):
        """Initialize the DialOutHandler for a single call.

        Args:
            transport: The Daily transport instance
            task: The PipelineTask instance
            dialout_setting: Configuration for this specific outbound call
            max_attempts: Maximum number of dial-out attempts on a specific number
        """
        self.transport = transport
        self.task = task
        self.dialout_setting = dialout_setting
        self.max_attempts = max_attempts
        self.attempt_count = 0
        self.status = "pending"  # pending, connected, answered, failed, stopped
        self._register_handlers()
        logger.info(f"Initialized DialOutHandler for call: {dialout_setting}")

    async def start(self):
        """Initiates an outbound call using the configured dial-out settings."""
        self.attempt_count += 1
        if self.attempt_count > self.max_attempts:
            logger.error(
                f"Max dialout attempts ({self.max_attempts}) reached for {self.dialout_setting}"
            )
            self.status = "failed"
            return

        logger.debug(
            f"Dialout attempt {self.attempt_count}/{self.max_attempts} for {self.dialout_setting}"
        )
        try:
            if "phoneNumber" in self.dialout_setting:
                logger.info(f"Dialing number: {self.dialout_setting['phoneNumber']}")
                if "callerId" in self.dialout_setting:
                    await self.transport.start_dialout(
                        {
                            "phoneNumber": self.dialout_setting["phoneNumber"],
                            "callerId": self.dialout_setting["callerId"],
                        }
                    )
                else:
                    await self.transport.start_dialout(
                        {"phoneNumber": self.dialout_setting["phoneNumber"]}
                    )
            elif "sipUri" in self.dialout_setting:
                logger.info(f"Dialing sipUri: {self.dialout_setting['sipUri']}")
                await self.transport.start_dialout({"sipUri": self.dialout_setting["sipUri"]})
        except Exception as e:
            logger.error(f"Error starting dialout: {e}")
            self.status = "failed"

    def _register_handlers(self):
        """Register all event handlers related to the dial-out functionality."""

        @self.transport.event_handler("on_dialout_connected")
        async def on_dialout_connected(transport, data):
            """Handler for when a dial-out call is connected (starts ringing)."""
            self.status = "connected"
            logger.debug(f"Dial-out connected: {data}")

        @self.transport.event_handler("on_dialout_answered")
        async def on_dialout_answered(transport, data):
            """Handler for when a dial-out call is answered (off hook).

            We capture the transcription, but we do not queue up a context frame,
            because we are waiting for the user to speak first.
            """
            self.status = "answered"
            session_id = data.get("sessionId")
            await transport.capture_participant_transcription(session_id)
            logger.debug(f"Dial-out answered: {data}")
            logger.info(f"Dial-out session ID: {session_id}")

        @self.transport.event_handler("on_dialout_stopped")
        async def on_dialout_stopped(transport, data):
            """Handler for when a dial-out call is stopped."""
            self.status = "stopped"
            logger.debug(f"Dial-out stopped: {data}")

        @self.transport.event_handler("on_dialout_error")
        async def on_dialout_error(transport, data):
            """Handler for dial-out errors. Will retry this specific call."""
            self.status = "failed"
            logger.error(f"Dial-out error: {data}, retrying...")
            # Retry this call; attempts are capped by max_attempts in start()
            await self.start()

        @self.transport.event_handler("on_dialout_warning")
        async def on_dialout_warning(transport, data):
            """Handler for dial-out warnings."""
            logger.warning(f"Dial-out warning: {data}")


async def main(room_url: str, token: str, body: dict):
    logger.debug("Starting bot in room: {}", room_url)

    # Validate environment variables
    validate_env_vars()

    # Dial-in configuration:
    # dialin_settings are received when a call is triggered to
    # Daily via pinless_dialin. This can be a phone number on Daily or a
    # sip interconnect from Twilio or Telnyx.
    dialin_settings = None
    dialled_phonenum = None
    caller_phonenum = None
    if raw_dialin_settings := body.get("dialin_settings"):
        # these fields may arrive with the first letter capitalized
        dialled_phonenum = raw_dialin_settings.get("To") or raw_dialin_settings.get("to")
        caller_phonenum = raw_dialin_settings.get("From") or raw_dialin_settings.get("from")
        dialin_settings = {
            # these fields can be received as snake_case or camelCase.
            "call_id": raw_dialin_settings.get("callId") or raw_dialin_settings.get("call_id"),
            "call_domain": raw_dialin_settings.get("callDomain")
            or raw_dialin_settings.get("call_domain"),
        }
        logger.debug(
            f"Dialin settings: To: {dialled_phonenum}, From: {caller_phonenum}, dialin_settings: {dialin_settings}"
        )
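
    # Illustrative shape of the request body fields read in this function
    # (keys match the lookups above and below; the values are placeholders):
    #   {
    #       "dialin_settings": {"To": "+15550100", "From": "+15550199",
    #                           "callId": "...", "callDomain": "..."},
    #       "dialout_settings": [{"phoneNumber": "+15550123"}],
    #       "voicemail_detection": {...},
    #   }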

    # Dial-out configuration
    dialout_settings = body.get("dialout_settings")
    logger.debug(f"Dialout settings: {dialout_settings}")

    # Voicemail detection configuration
    voicemail_detection = body.get("voicemail_detection")
    using_voicemail_detection = bool(voicemail_detection and dialout_settings)
    logger.debug(f"Using voicemail detection: {using_voicemail_detection}")

    daily_api_key = os.getenv("DAILY_API_KEY")
    logger.info(f"DAILY API Key: {daily_api_key}")

    transport = DailyTransport(
        room_url,
        token,
        "Voice AI Bot",
        DailyParams(
            # api_url="https://api.staging.daily.co/v1",
            api_key=daily_api_key,  # needed for dial-in
            dialin_settings=dialin_settings,
            audio_out_enabled=True,
            audio_out_sample_rate=AUDIO_SAMPLE_RATE,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            vad_audio_passthrough=True,
        ),
    )

    # Generate the tone data directly in memory
    logger.info(
        f"Generating tone data with freq=400, duration=50.0, sample_rate={AUDIO_SAMPLE_RATE}"
    )
    tone_data = generate_tone_data(freq=400, duration=50.0, sample_rate=AUDIO_SAMPLE_RATE)
    logger.info(f"Generated tone data with {len(tone_data)} samples")

    # Validate tone data
    if len(tone_data) == 0:
        logger.warning("Generated tone data is empty, creating fallback data")
        tone_data = np.zeros(AUDIO_SAMPLE_RATE * 5, dtype=np.int16)  # 5 seconds of silence
        logger.info(f"Created fallback data with {len(tone_data)} samples")

    logger.info(f"Creating SimpleAudioFrameInserter with {len(tone_data)} samples")
    # Create the audio frame inserter with the generated tone data
    audio_sender = SimpleAudioFrameInserter(audio_data=tone_data, sample_rate=AUDIO_SAMPLE_RATE)

    # Configure your STT, LLM, and TTS services here
    # Swap out different processors or properties to customize your bot
    # stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
    # tts = CartesiaTTSService(
    #     api_key=os.getenv("CARTESIA_API_KEY"),
    #     voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",
    # )

    # Format phone number for better TTS pronunciation
    if caller_phonenum:
        # Remove any non-digit characters
        cleaned = "".join(filter(str.isdigit, caller_phonenum))
        # Format as +1 (XXX) XXX XXXX
        if len(cleaned) == 11 and cleaned.startswith("1"):
            caller_phonenum = f"+1 ({cleaned[1:4]}) {cleaned[4:7]} {cleaned[7:]}"
        elif len(cleaned) == 10:
            caller_phonenum = f"+1 ({cleaned[0:3]}) {cleaned[3:6]} {cleaned[6:]}"

    # Set up the initial context for the conversation
    # You can specify initial system and assistant messages here
    # or register tools for the LLM to use
    tools = NotGiven()

    # This sets up the LLM context by providing messages and tools
    messages = [
        {
            "role": "system",
            "content": """You are a helpful voice AI bot that can answer questions.
            """,
        },
    ]
    context = OpenAILLMContext(messages, tools)
    context_aggregator = llm.create_context_aggregator(context)

    # A core voice AI pipeline
    # Add additional processors to customize the bot's behavior
    pipeline = Pipeline(
        [
            transport.input(),
            audio_sender,
            transport.output(),
        ]
    )
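
    # Frame flow in this pipeline: transport.input() passes incoming frames
    # through audio_sender (which also pushes its own OutputAudioRawFrame tone
    # frames downstream once start_sending() is called), and transport.output()
    # plays them into the call. The STT/TTS services are commented out above, so
    # the LLM/context objects are created but the bot only streams the tone.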

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            allow_interruptions=False,
            audio_in_sample_rate=8000,
            audio_out_sample_rate=8000,
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
    )
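
    # Note: the 8000 Hz in/out sample rates here match AUDIO_SAMPLE_RATE and the
    # transport's audio_out_sample_rate above; 8 kHz narrowband is the usual rate
    # for PSTN/SIP telephony audio.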

    # Initialize handlers dict to keep references
    handlers = {}

    # Initialize appropriate handlers based on the call type
    if dialin_settings:
        handlers["dialin"] = DialInHandler(transport, task, context_aggregator)
    if dialout_settings:
        # Create a handler for each dial-out setting,
        # i.e., each phone number/sip address gets its own handler.
        # This allows more control over retries and state management.
        handlers["dialout"] = [
            DialOutHandler(transport, task, setting) for setting in dialout_settings
        ]

    # Set up general event handlers
    @transport.event_handler("on_call_state_updated")
    async def on_call_state_updated(transport, state):
        logger.info(f"on_call_state_updated, state: {state}")
        if state == "joined":
            audio_sender.start_sending()
            # Start all dial-out calls if configured
            if dialout_settings and "dialout" in handlers:
                for handler in handlers["dialout"]:
                    await handler.start()
        elif state in ["left", "leaving", "error"]:
            # Stop audio sending for any terminal state
            logger.info(f"Call state {state}: stopping audio sender")
            await audio_sender.stop_sending()
            await task.cancel()

    @transport.event_handler("on_joined")
    async def on_joined(transport, data):
        session_id = data["meetingSession"]["id"]
        bot_id = data["participants"]["local"]["id"]
        logger.info(f"Session ID: {session_id}, Bot ID: {bot_id}")

    @transport.event_handler("on_participant_left")
    async def on_participant_left(transport, participant, reason):
        logger.debug(f"Participant left: {participant}, reason: {reason}")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=False, force_gc=True)
    await runner.run(task)


async def bot(args: DailySessionArguments):
    """Main bot entry point compatible with the FastAPI route handler.

    Args:
        args: DailySessionArguments containing:
            room_url: The Daily room URL
            token: The Daily room token
            body: The configuration object from the request body; may contain
                dialin_settings, dialout_settings, voicemail_detection, and call_transfer
            session_id: The session ID for logging
    """
    logger.info(f"Bot process initialized {args.room_url} {args.token}")

    try:
        await main(args.room_url, args.token, args.body)
        logger.info("Bot process completed")
    except Exception as e:
        logger.exception(f"Error in bot process: {str(e)}")
        raise
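

# Minimal local-run sketch (illustrative, commented out, and not part of the
# deployed entry point). It assumes DailySessionArguments can be constructed
# with the fields listed in bot()'s docstring and that a real room URL and
# token are supplied via the environment.
#
# if __name__ == "__main__":
#     args = DailySessionArguments(
#         room_url=os.getenv("DAILY_ROOM_URL", "https://example.daily.co/room"),
#         token=os.getenv("DAILY_ROOM_TOKEN", ""),
#         body={},
#         session_id="local-test",
#     )
#     asyncio.run(bot(args))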