|
from agents import OpenAIResponsesModel, ModelSettings |
|
from agents.models.openai_responses import ConvertedTools |
|
from openai import AsyncOpenAI, AsyncStream |
|
from openai.types.responses import Response, ResponseStreamEvent |
|
from agents.items import TResponseInputItem |
|
from agents.agent_output import AgentOutputSchema |
|
from agents.handoffs import Handoff |
|
from agents.tool import Tool, ComputerTool, FunctionTool, WebSearchTool, FileSearchTool |
|
from agents.models.interface import ModelTracing |
|
from typing import Optional, Union, Literal, TypeVar, override, Any |
|
from agents.items import ItemHelpers |
|
from openai.types.responses import ( |
|
Response, |
|
ResponseCompletedEvent, |
|
ResponseStreamEvent, |
|
ResponseTextConfigParam, |
|
ToolParam, |
|
WebSearchToolParam, |
|
response_create_params, |
|
) |
|
from agents.exceptions import UserError |
|
from agents.version import __version__ |
|
from agents.logger import logger |
|
from agents import _debug |
|
import json |
|
|
|
|
|
# String values (taken from the Responses API) that may appear in the
# ``include`` list sent with a request.
IncludeLiteral = Literal[
    "file_search_call.results",
    "message.input_image.image_url",
    "computer_call_output.output.image_url",
]
|
# User-Agent string embedding the imported package version; ``_HEADERS`` is
# passed as ``extra_headers`` when a response is requested.
_USER_AGENT = f"Agents/Python {__version__}"
_HEADERS = {"User-Agent": _USER_AGENT}

# Type variable backing the generic ``NotGivenOr`` alias.
_T = TypeVar("_T")
|
# Sentinel class used until PEP 0661 is accepted |
|
class NotGiven: |
|
""" |
|
A sentinel singleton class used to distinguish omitted keyword arguments |
|
from those passed in with the value None (which may have different behavior). |
|
|
|
For example: |
|
|
|
```py |
|
def get(timeout: Union[int, NotGiven, None] = NotGiven()) -> Response: ... |
|
|
|
|
|
get(timeout=1) # 1s timeout |
|
get(timeout=None) # No timeout |
|
get() # Default timeout behavior, which may not be statically known at the method definition. |
|
``` |
|
""" |
|
|
|
def __bool__(self) -> Literal[False]: |
|
return False |
|
|
|
@override |
|
def __repr__(self) -> str: |
|
return "NOT_GIVEN" |
|
|
|
|
|
# Module-level sentinel instance returned by helpers in this file when a
# request parameter should be omitted.
NOT_GIVEN = NotGiven()

# ``NotGivenOr[X]`` describes a value that is either an ``X`` or the sentinel.
NotGivenOr = Union[_T, NotGiven]
|
|
|
class CustomResponsesModel(OpenAIResponsesModel):
    """Custom Responses model that allows a previous response id to be passed in.

    Assign ``previous_response_id`` on an instance to have that value sent as
    the ``previous_response_id`` argument of each ``responses.create`` call
    made by ``_fetch_response``. While it is ``None`` the argument is left out
    of the request.
    """

    # ID of the response to continue from; ``None`` means "do not send one".
    previous_response_id: Optional[str] = None

    def __init__(self, model: str, openai_client: AsyncOpenAI) -> None:
        """Initialise the parent model and start with no previous response.

        Args:
            model: Model name, forwarded to the parent constructor.
            openai_client: Async OpenAI client, forwarded to the parent
                constructor.
        """
        super().__init__(model, openai_client)
        self.previous_response_id = None

    async def _fetch_response(
        self,
        system_instructions: str | None,
        input: str | list[TResponseInputItem],
        model_settings: ModelSettings,
        tools: list[Tool],
        output_schema: AgentOutputSchema | None,
        handoffs: list[Handoff],
        stream: bool = False,
    ) -> Response | AsyncStream[ResponseStreamEvent]:
        """Assemble the request arguments and call ``responses.create``.

        Args:
            system_instructions: Sent as ``instructions`` when not omitted.
            input: A string or item list, normalised to an input list.
            model_settings: Source of temperature, top_p, truncation,
                max_tokens, tool_choice and parallel_tool_calls.
            tools: Tools converted with ``Converter.convert_tools``.
            output_schema: Converted with ``Converter.get_response_format``.
            handoffs: Handoffs converted alongside ``tools``.
            stream: Forwarded as the ``stream`` argument.

        Returns:
            Whatever ``self._client.responses.create`` returns for the
            assembled arguments.
        """
        list_input = ItemHelpers.input_to_new_input_list(input)

        # Only opt in when the setting is enabled AND at least one tool is
        # present; otherwise the argument is omitted from the request.
        parallel_tool_calls = (
            True if model_settings.parallel_tool_calls and tools else NOT_GIVEN
        )

        tool_choice = Converter.convert_tool_choice(model_settings.tool_choice)
        converted_tools = Converter.convert_tools(tools, handoffs)
        response_format = Converter.get_response_format(output_schema)

        if _debug.DONT_LOG_MODEL_DATA:
            logger.debug("Calling LLM")
        else:
            logger.debug(
                f"Calling LLM {self.model} with input:\n"
                f"{json.dumps(list_input, indent=2)}\n"
                f"Tools:\n{json.dumps(converted_tools.tools, indent=2)}\n"
                f"Stream: {stream}\n"
                f"Tool choice: {tool_choice}\n"
                f"Response format: {response_format}\n"
            )

        # Reported through the module logger instead of print() so that
        # applications decide via logging configuration whether to see it.
        logger.debug("previous_response_id: %s", self.previous_response_id)

        # Optional parameters - only sent if they have valid values.
        # NOTE(review): `_non_null_or_not_given` is inherited and not visible
        # here; if it returns a sentinel that is not this module's `NotGiven`,
        # that value survives the filter below and reaches the client as-is.
        # Confirm that is intended.
        optional_params: dict[str, Any] = {
            "instructions": self._non_null_or_not_given(system_instructions),
            "include": converted_tools.includes,
            "tools": converted_tools.tools,
            "temperature": self._non_null_or_not_given(model_settings.temperature),
            "top_p": self._non_null_or_not_given(model_settings.top_p),
            "truncation": self._non_null_or_not_given(model_settings.truncation),
            "max_output_tokens": self._non_null_or_not_given(model_settings.max_tokens),
            "tool_choice": tool_choice,
            "parallel_tool_calls": parallel_tool_calls,
            "stream": stream,
            "extra_headers": _HEADERS,
            "text": response_format,
            "previous_response_id": self.previous_response_id,
        }

        # Required parameters first, then every optional one that is neither
        # None nor an instance of this module's NotGiven sentinel.
        params: dict[str, Any] = {
            "model": self.model,
            "input": list_input,
        }
        params.update(
            (key, value)
            for key, value in optional_params.items()
            if value is not None and not isinstance(value, NotGiven)
        )

        return await self._client.responses.create(**params)
|
|
|
class Converter:
    """Helpers that turn agent tool/handoff/output settings into request parameters."""

    @classmethod
    def convert_tool_choice(
        cls, tool_choice: Literal["auto", "required", "none"] | str | None
    ) -> response_create_params.ToolChoice | NotGiven:
        """Map a tool-choice setting to the value sent as ``tool_choice``.

        Args:
            tool_choice: ``None``, one of ``"auto"``/``"required"``/``"none"``,
                one of ``"file_search"``/``"web_search_preview"``/
                ``"computer_use_preview"``, or any other string.

        Returns:
            ``NOT_GIVEN`` for ``None``; the string itself for the three mode
            names; ``{"type": <name>}`` for the three tool type names; and
            ``{"type": "function", "name": <tool_choice>}`` for anything else.
        """
        if tool_choice is None:
            return NOT_GIVEN
        if tool_choice in ("required", "auto", "none"):
            return tool_choice
        if tool_choice in ("file_search", "web_search_preview", "computer_use_preview"):
            return {"type": tool_choice}
        # Any remaining string is treated as the name of a function tool.
        return {
            "type": "function",
            "name": tool_choice,
        }

    @classmethod
    def get_response_format(
        cls, output_schema: AgentOutputSchema | None
    ) -> ResponseTextConfigParam | NotGiven:
        """Build the ``text`` request parameter for a structured output schema.

        Args:
            output_schema: The output schema, or ``None``.

        Returns:
            ``NOT_GIVEN`` when there is no schema or ``is_plain_text()`` is
            true; otherwise a ``json_schema`` format dict named
            ``"final_output"`` built from the schema.
        """
        if output_schema is None or output_schema.is_plain_text():
            return NOT_GIVEN
        return {
            "format": {
                "type": "json_schema",
                "name": "final_output",
                "schema": output_schema.json_schema(),
                "strict": output_schema.strict_json_schema,
            }
        }

    @classmethod
    def convert_tools(
        cls,
        tools: list[Tool],
        handoffs: list[Handoff[Any]],
    ) -> ConvertedTools:
        """Convert tools and handoffs into one tool-parameter list.

        Args:
            tools: Tools to convert, in order.
            handoffs: Handoffs, appended after the tools as function tools.

        Returns:
            A ``ConvertedTools`` holding the converted tool list and the
            ``include`` values requested by the converted tools.

        Raises:
            UserError: If more than one ``ComputerTool`` is supplied, or a
                tool has a type ``_convert_tool`` does not handle.
        """
        computer_tool_count = sum(1 for tool in tools if isinstance(tool, ComputerTool))
        if computer_tool_count > 1:
            raise UserError(f"You can only provide one computer tool. Got {computer_tool_count}")

        converted_tools: list[ToolParam] = []
        includes: list[IncludeLiteral] = []
        for tool in tools:
            converted_tool, include = cls._convert_tool(tool)
            converted_tools.append(converted_tool)
            if include:
                includes.append(include)

        converted_tools.extend(cls._convert_handoff_tool(handoff) for handoff in handoffs)

        return ConvertedTools(tools=converted_tools, includes=includes)

    @classmethod
    def _convert_tool(cls, tool: Tool) -> tuple[ToolParam, IncludeLiteral | None]:
        """Return the converted tool and the ``include`` value it needs, if any.

        Raises:
            UserError: If ``tool`` is not a ``FunctionTool``, ``WebSearchTool``,
                ``FileSearchTool`` or ``ComputerTool``.
        """
        if isinstance(tool, FunctionTool):
            function_tool: ToolParam = {
                "name": tool.name,
                "parameters": tool.params_json_schema,
                "strict": tool.strict_json_schema,
                "type": "function",
                "description": tool.description,
            }
            return function_tool, None

        if isinstance(tool, WebSearchTool):
            web_search_tool: WebSearchToolParam = {
                "type": "web_search_preview",
                "user_location": tool.user_location,
                "search_context_size": tool.search_context_size,
            }
            return web_search_tool, None

        if isinstance(tool, FileSearchTool):
            file_search_tool: ToolParam = {
                "type": "file_search",
                "vector_store_ids": tool.vector_store_ids,
            }
            # Optional settings are only sent when they hold a truthy value.
            if tool.max_num_results:
                file_search_tool["max_num_results"] = tool.max_num_results
            if tool.ranking_options:
                file_search_tool["ranking_options"] = tool.ranking_options
            if tool.filters:
                file_search_tool["filters"] = tool.filters

            include: IncludeLiteral | None = (
                "file_search_call.results" if tool.include_search_results else None
            )
            return file_search_tool, include

        if isinstance(tool, ComputerTool):
            computer_tool: ToolParam = {
                "type": "computer_use_preview",
                "environment": tool.computer.environment,
                "display_width": tool.computer.dimensions[0],
                "display_height": tool.computer.dimensions[1],
            }
            return computer_tool, None

        # The original message ended with the literal word "tool" because the
        # interpolation braces were missing; include the offending object.
        raise UserError(f"Unknown tool type: {type(tool)}, tool: {tool}")

    @classmethod
    def _convert_handoff_tool(cls, handoff: Handoff) -> ToolParam:
        """Describe a handoff as a ``function`` tool parameter dict."""
        return {
            "name": handoff.tool_name,
            "parameters": handoff.input_json_schema,
            "strict": handoff.strict_json_schema,
            "type": "function",
            "description": handoff.tool_description,
        }