Note: This version of the document is 100% AI generated, based on reading the code for the `chat` method. I'll apply some human editing at some point. I really just wanted to document the event flow, but it did such a nice job of breaking down the code itself that I'm going to keep it around.
The `chat` method orchestrates a chat interaction with an external language model (via an asynchronous stream of chunks) and raises a series of events along the way. These events notify client-side code about the progress of the interaction, partial outputs (such as text and audio deltas), tool calls that may be triggered, and error conditions. In addition, events are used to record the start and end of the overall interaction and to update the session history.
The method performs the following high-level steps:
- **Setup and Initialization:** Prepare the options, messages, session, and synchronization primitives.
- **Interaction Start:** Signal the start of the interaction.
- **Completion Start:** Signal the start of receiving the completion (streaming) data.
- **Streaming Response Processing:** Process each incoming chunk, raising events as new text or audio data arrives.
- **Completion End and Finalization:** Once the stream ends, signal completion and (if necessary) process any tool calls; then update the session history and signal the end of the interaction.
- **Error Handling:** In case of exceptions, signal system error events and (where applicable) complete the interaction with error notifications.
Below you’ll find a breakdown of each phase, including the events raised and their order.
- **Option Preparation:**
  The method begins by calling `self.__interaction_setup(**kwargs)`, which prepares the options (e.g., `completion_opts`, `callback_opts`) needed for the request.
- **Session and Message Setup:**
  The messages to be sent and the session manager (if provided) are extracted from the options and keyword arguments.
- **Concurrency Control:**
  An asynchronous semaphore (`async with self.semaphore`) is used to control concurrent access.
- **Event: `interaction_start`**
  Before starting the actual chat completion request, the method calls `interaction_id = await self._raise_interaction_start(**opts['callback_opts'])`. This event signals that an interaction has begun and provides an identifier (`interaction_id`) that will later be used to mark the end of the interaction.
- **Event: `completion_start`**
  Just before sending the request to the language model, the method raises `await self._raise_completion_start(opts["completion_opts"], **opts['callback_opts'])`. This notifies that the completion phase (the stream of responses) is starting.
- **Initiating the Chat Completion Request:**
  The method then creates the chat completion stream with `response = await self.client.chat.completions.create(**opts['completion_opts'])` and enters a loop to process each chunk received from the asynchronous stream (`async for chunk in response:`). A condensed sketch of this setup-and-start phase follows below.
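Taken together, the steps above might look roughly like the sketch below. This is a condensed illustration, not the verbatim implementation: the way the messages are extracted and whether `self.__interaction_setup` is synchronous are assumptions based on the description above.

```python
# Condensed sketch of the setup / start-of-interaction phase (illustrative only).
opts = self.__interaction_setup(**kwargs)          # builds completion_opts / callback_opts
messages = kwargs.get("messages", [])              # assumed: messages come from kwargs/options

async with self.semaphore:                         # limit concurrent completion requests
    # Signal the start of the interaction and remember its id for the end event.
    interaction_id = await self._raise_interaction_start(**opts['callback_opts'])

    # Signal that the completion (streaming) phase is about to begin.
    await self._raise_completion_start(opts["completion_opts"], **opts['callback_opts'])

    # Open the streaming chat completion against the model.
    response = await self.client.chat.completions.create(**opts['completion_opts'])
```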
For every chunk received:
- **Handling Missing or Empty Choices:**
  - If `chunk.choices` is `None`, the chunk is skipped.
  - If `chunk.choices` is an empty list (i.e., `len(chunk.choices) == 0`), it signals that the stream has ended. Before leaving the streaming loop, several final events are raised (see below). These two guard conditions are sketched right after this list.
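In code, the two guard conditions amount to something like the following at the top of the streaming loop (a sketch; the `...` stands in for the finalization steps described later):

```python
async for chunk in response:
    if chunk.choices is None:
        # Nothing usable in this chunk (e.g., a keep-alive or usage-only chunk): skip it.
        continue
    if len(chunk.choices) == 0:
        # Empty choices list: the stream is over. The completion_end / tool-call /
        # history / interaction_end events described below are raised before
        # leaving the streaming loop.
        ...
        break
    # ... per-choice processing of finish reason, tool calls, text, and audio ...
```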
For chunks that contain at least one choice:
- **Finish Reason Check:**
  - If the first choice (`first_choice = chunk.choices[0]`) has a `finish_reason` (i.e., it is not `None`), this value is stored in a local variable (`stop_reason`). The chunk itself is not processed further, but the finish reason is used later to determine the next steps.
- **Tool Call Fragments:**
  - If the choice's delta contains a tool call fragment (`first_choice.delta.tool_calls is not None`), the method aggregates these fragments via `self.__handle_tool_use_fragment(first_choice.delta.tool_calls[0], tool_calls)`. The fragments are collected into the `tool_calls` list for later processing.
- **Text Content Deltas:**
  - If the delta contains text content (`first_choice.delta.content is not None`):
    - The content is appended to a local accumulator (`collected_messages`).
    - **Event: `text_delta`** — An event is raised with the new text fragment: `await self._raise_text_delta(first_choice.delta.content, **opts['callback_opts'])`. This lets the client know about incremental text updates.
- **Audio Content Deltas:**
  - If the delta contains audio data (detected via `first_choice.delta.model_extra.get('audio', None)`):
    - The method extracts an audio identifier (if not already set) and checks for:
      - **Transcript:** If an audio transcript is available, it is treated like text: appended to `collected_messages`, with a `text_delta` event raised.
      - **Audio Data:** If binary (base64-encoded) audio data is provided, an audio delta event is raised: `await self._raise_event(AudioDeltaEvent(content_type="audio/L16", id=audio_id, content=b64_audio, **opts['callback_opts']))`
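Putting the per-choice handling together, the loop body for a non-empty chunk might look roughly like the sketch below. The structure of the audio dictionary (keys such as `id`, `transcript`, and `data`) is an assumption for illustration; only the event calls shown above are taken from the description.

```python
first_choice = chunk.choices[0]

if first_choice.finish_reason is not None:
    # Remember why the model stopped; this drives the tool-call vs. normal path later.
    stop_reason = first_choice.finish_reason
    continue  # this chunk is not processed further

if first_choice.delta.tool_calls is not None:
    # Accumulate streamed tool-call fragments for execution once the stream ends.
    self.__handle_tool_use_fragment(first_choice.delta.tool_calls[0], tool_calls)

if first_choice.delta.content is not None:
    # Text delta: accumulate it and notify listeners incrementally.
    collected_messages.append(first_choice.delta.content)
    await self._raise_text_delta(first_choice.delta.content, **opts['callback_opts'])

audio = first_choice.delta.model_extra.get('audio', None)
if audio is not None:
    audio_id = audio_id or audio.get('id')          # assumed key names
    if audio.get('transcript'):
        # The transcript is treated exactly like a text delta.
        collected_messages.append(audio['transcript'])
        await self._raise_text_delta(audio['transcript'], **opts['callback_opts'])
    if audio.get('data'):
        # Base64-encoded audio is forwarded as an audio delta event.
        b64_audio = audio['data']
        await self._raise_event(AudioDeltaEvent(content_type="audio/L16", id=audio_id,
                                                content=b64_audio, **opts['callback_opts']))
```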
When a chunk is received with an empty `choices` list, this signals the end of the streaming process. At this point, the method proceeds as follows:
- **Raising Completion End:**
  - **Event: `completion_end`** — The method raises a completion end event to report that streaming has finished, along with token usage data: `await self._raise_completion_end(opts["completion_opts"], stop_reason=stop_reason, input_tokens=input_tokens, output_tokens=output_tokens, **opts['callback_opts'])`
- **Handling Tool Calls vs. Regular Completion:**
  The next steps depend on the `stop_reason` determined earlier; both branches are sketched after this list.
  - **A. If `stop_reason` Indicates Tool Calls (`'tool_calls'`):**
    - **Tool Call Start Event:**
      - **Event: `tool_call_start`** — The method signals the start of tool call processing: `await self._raise_tool_call_start(tool_calls, vendor="open_ai", **opts['callback_opts'])`
    - **Processing Tool Calls:**
      The tool calls are executed via `result_messages = await self.__tool_calls_to_messages(tool_calls)`. If an error occurs during tool call execution, a system event is raised and tool call end is signaled with an empty result.
    - **Tool Call End Event:**
      - **Event: `tool_call_end`** — After processing, the tool call end event is raised: `await self._raise_tool_call_end(tool_calls, result_messages[1:], vendor="open_ai", **opts['callback_opts'])`
    - **History Update:**
      - **Event: `history_event`** — The resulting messages (from the tool calls) are appended to the session history: `await self._raise_history_event(messages, **opts['callback_opts'])`
  - **B. If the Interaction Is a Standard Chat Response (No Tool Calls):**
    - **Save the Final Message:**
      - The collected text (and possibly the audio transcript) is combined into the final output message.
      - Depending on whether audio was involved, the message is saved via the session manager: `messages.append(await self._save_audio_interaction_to_session(...))` for audio, or `messages.append(await self._save_interaction_to_session(...))` for text.
    - **History Update:**
      - **Event: `history_event`** — The updated message history is broadcast: `await self._raise_history_event(messages, **opts['callback_opts'])`
    - **Ending the Interaction:**
      - **Event: `interaction_end`** — Finally, the method signals that the interaction has ended: `await self._raise_interaction_end(id=interaction_id, **opts['callback_opts'])`
      - The processing loop is terminated by setting `interacting` to `False`.
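A condensed sketch of this end-of-stream handling is shown below. The error handling around tool execution is omitted, the argument lists of the two `_save_*_to_session` calls are elided as in the description above, and the merging of tool results into `messages` is an assumption.

```python
# Stream finished: report completion status and token usage.
await self._raise_completion_end(opts["completion_opts"], stop_reason=stop_reason,
                                 input_tokens=input_tokens, output_tokens=output_tokens,
                                 **opts['callback_opts'])

if stop_reason == 'tool_calls':
    # A. The model requested tool calls: run them and record the results.
    await self._raise_tool_call_start(tool_calls, vendor="open_ai", **opts['callback_opts'])
    result_messages = await self.__tool_calls_to_messages(tool_calls)
    await self._raise_tool_call_end(tool_calls, result_messages[1:], vendor="open_ai",
                                    **opts['callback_opts'])
    messages.extend(result_messages)                   # assumed merge into the running history
    await self._raise_history_event(messages, **opts['callback_opts'])
else:
    # B. Standard completion: persist the collected output and close the interaction.
    if audio_id is not None:
        messages.append(await self._save_audio_interaction_to_session(...))  # args elided
    else:
        messages.append(await self._save_interaction_to_session(...))        # args elided
    await self._raise_history_event(messages, **opts['callback_opts'])
    await self._raise_interaction_end(id=interaction_id, **opts['callback_opts'])
    interacting = False   # stop the outer processing loop
```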
During the operation, if any errors occur, the code enters one of several exception handlers. In each case, relevant system events are raised to notify client developers:
- **API Errors (e.g., `openai.APIError`):**
  - **Event: `system_event`** — A system event is raised with details about the API error and the retry delay.
  - **Exponential Backoff:** The method waits (using an exponential backoff strategy) before retrying.
- **Bad Request Errors (`openai.BadRequestError`):**
  - **Event: `system_event`** — Raised with an "Invalid request" message.
  - **Completion End:** The method raises a completion end event with a stop reason of `"exception"`.
  - The error is then re-raised after logging.
- **Timeout and Internal Server Errors:**
  - Both error types raise a `system_event` notifying the client about the timeout or internal server error, along with the planned delay before a retry.
- **Other Exceptions:**
  - For any other exceptions, a system event is raised to signal an exception in chat completion.
  - A completion end event is also raised with a stop reason of `"exception"`, and then the exception is re-raised.
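The overall shape of these handlers might resemble the sketch below. The helper name `_raise_system_event`, the `delay` backoff variable, and the specific `openai` exception classes used for the timeout and internal-server cases are assumptions made for illustration; only the behavior follows the description above.

```python
import asyncio
import openai

try:
    ...  # the streaming logic described in the previous sections
except openai.BadRequestError as e:
    # Invalid request: report it, close the completion, and re-raise.
    await self._raise_system_event(f"Invalid request: {e}", **opts['callback_opts'])  # assumed helper
    await self._raise_completion_end(opts["completion_opts"], stop_reason="exception",
                                     **opts['callback_opts'])
    raise
except (openai.APITimeoutError, openai.InternalServerError) as e:
    # Timeouts and server-side failures: notify the client, back off, then retry.
    await self._raise_system_event(f"{type(e).__name__}: retrying in {delay}s", **opts['callback_opts'])
    await asyncio.sleep(delay)
    delay *= 2
except openai.APIError as e:
    # Other API errors: notify the client with the retry delay and back off exponentially.
    await self._raise_system_event(f"API error: {e}; retrying in {delay}s", **opts['callback_opts'])
    await asyncio.sleep(delay)
    delay *= 2
except Exception as e:
    # Anything else: report it, close the completion with an "exception" stop reason, re-raise.
    await self._raise_system_event(f"Exception in chat completion: {e}", **opts['callback_opts'])
    await self._raise_completion_end(opts["completion_opts"], stop_reason="exception",
                                     **opts['callback_opts'])
    raise
```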
Below is a simplified outline of the events raised during a typical chat operation:
- **Initialization:**
  - (No event) Options are set up, and the semaphore is acquired.
- **Interaction Start:**
  - `interaction_start`
- **Completion Start:**
  - `completion_start`
- **During Streaming (for each chunk):**
  - `text_delta` (for each text fragment received)
  - `AudioDeltaEvent` (for audio data chunks, if any)
- **End of Streaming:**
  - `completion_end`
- **If Tool Calls Are Detected:**
  - `tool_call_start`
  - (Tool call execution occurs)
  - `tool_call_end`
  - `history_event`
- **If Standard Completion:**
  - Save final output to session.
  - `history_event`
  - `interaction_end`
- **Error Handling (if an error occurs at any point):**
  - `system_event` (with details about the error)
  - (May also trigger a `completion_end` with stop reason `"exception"`)
This flow ensures that client developers receive real-time notifications for each significant phase of the chat interaction:
- **Start and End of Interaction:** Clients know exactly when an interaction begins and finishes.
- **Incremental Updates:** Text and audio deltas are streamed incrementally, allowing for a responsive UI.
- **Tool Call Handling:** Special events signal when additional actions (tool calls) need to be processed.
- **Robust Error Handling:** System events inform the client about any issues, with details that help in diagnosing problems and implementing retry logic.
By following this documented event stream, client developers can build interfaces that respond to these events appropriately and provide users with a clear and timely view of the ongoing chat operation.
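As a purely illustrative example, a client-side handler might dispatch on the event type like this. The callback registration mechanism and the event attribute names used here (`type`, `content`, `stop_reason`) are assumptions, not the documented API:

```python
# Hypothetical client-side consumer of the event stream described above.
async def on_event(event) -> None:
    kind = getattr(event, "type", None)            # assumed: events expose their type name
    if kind == "interaction_start":
        print("[interaction started]")
    elif kind == "text_delta":
        print(event.content, end="", flush=True)    # render text as it streams in
    elif kind == "tool_call_start":
        print("\n[running tools...]")
    elif kind == "completion_end":
        print(f"\n[completion finished: {event.stop_reason}]")
    elif kind == "system_event":
        print("\n[notice]", event.content)           # surface errors / retry notices
    elif kind == "interaction_end":
        print("[interaction finished]")
```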