Created July 23, 2025 00:17
openai toolcalling test
import requests
import json
import time
import copy
from enum import Enum
from typing import Dict, List, Any, Optional, Union, Iterator
from dataclasses import dataclass, field


class ModelErrorType(Enum):
    MAX_LENGTH = "1024"
    MAX_COMPLETION_TOKENS = "max_completion_tokens"
    STREAM_OPTIONS = "stream_options"
    CITATIONS = "citations"
    RATE_LIMIT = "rate_limit"


@dataclass
class ModelFeatures:
    uses_max_completion_tokens: bool = False


class OpenAICompatibleClient:
    """Synchronous OpenAI-compatible API client with comprehensive error handling"""

    def __init__(self, api_key: str, base_url: str = "https://api.openai.com/v1", provider: str = "openai"):
        self.api_key = api_key
        self.base_url = base_url.rstrip('/')
        self.provider = provider
        self.session = requests.Session()
        # Session state
        self.model_errors = {}
        self.failed_keys = []
        self.current_error = None
        # Model features mapping
        self.MODEL_FEATURES = {
            "o1": ModelFeatures(uses_max_completion_tokens=True),
            "o1-preview": ModelFeatures(uses_max_completion_tokens=True),
            "o1-mini": ModelFeatures(uses_max_completion_tokens=True),
            "o1-pro": ModelFeatures(uses_max_completion_tokens=True),
            "o3-mini": ModelFeatures(uses_max_completion_tokens=True),
        }

    def get_model_features(self, model_name: str) -> ModelFeatures:
        """Get features for a specific model"""
        # Check exact matches first
        if model_name in self.MODEL_FEATURES:
            return self.MODEL_FEATURES[model_name]
        # Check partial matches
        for key, features in self.MODEL_FEATURES.items():
            if key in model_name:
                return features
        return ModelFeatures()
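    # Annotation (added, not in the original gist): the partial match above means
    # a dated model name such as "o1-mini-2024-09-12" picks up the first entry
    # whose key appears in the name (here "o1") and so reports
    # uses_max_completion_tokens=True, while a name like "gpt-4o" matches nothing
    # and falls through to the default ModelFeatures().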
    def apply_model_transformations(self, opts: Dict[str, Any]):
        """Apply model-specific parameter transformations"""
        model = opts.get("model", "")
        if not model:
            return
        features = self.get_model_features(model)
        # Apply max_tokens -> max_completion_tokens transformation
        if (features.uses_max_completion_tokens and
                "max_tokens" in opts and
                "max_completion_tokens" not in opts):
            opts["max_completion_tokens"] = opts["max_tokens"]
            del opts["max_tokens"]

    def fix_tool_descriptions(self, opts: Dict[str, Any]):
        """Fix tool description length issues"""
        tools = opts.get("tools", [])
        if not tools:
            return
        tool_descriptions = {}
        for tool in tools:
            function = tool.get("function", {})
            description = function.get("description", "")
            if len(description) <= 1024:
                continue
            # Truncate description line by line
            truncated = ""
            remainder = ""
            for line in description.split('\n'):
                if len(truncated + line) < 1024:
                    truncated += line + '\n'
                else:
                    remainder += line + '\n'
            function["description"] = truncated.strip()
            if remainder.strip():
                tool_descriptions[function.get("name", "")] = remainder.strip()
        # Add remainder to system message if any
        if tool_descriptions:
            content = '<additional-tool-usage-instructions>\n\n'
            for name, description in tool_descriptions.items():
                content += f'<{name}>\n{description}\n</{name}>\n\n'
            content += '</additional-tool-usage-instructions>'
            messages = opts.get("messages", [])
            # Find last system message and add content after it
            for i in range(len(messages) - 1, -1, -1):
                if messages[i].get("role") == "system":
                    messages.insert(i + 1, {
                        "role": "system",
                        "content": content
                    })
                    break

    def fix_citations(self, opts: Dict[str, Any]):
        """Remove citations from message content"""
        messages = opts.get("messages", [])
        for message in messages:
            content = message.get("content")
            if isinstance(content, list):
                for item in content:
                    if isinstance(item, dict) and "citations" in item:
                        del item["citations"]
            elif isinstance(content, dict) and "citations" in content:
                del content["citations"]

    def prepare_tool_messages(self, opts: Dict[str, Any]):
        """Ensure tool messages have string content"""
        messages = opts.get("messages", [])
        for msg in messages:
            if msg.get("role") == "tool":
                content = msg.get("content")
                if isinstance(content, list):
                    # Convert array to string
                    text_parts = [
                        item.get("text", "") for item in content
                        if isinstance(item, dict) and "text" in item
                    ]
                    msg["content"] = "\n\n".join(filter(None, text_parts)) or "(empty content)"
                elif not isinstance(content, str):
                    # Convert non-string to JSON
                    if content is None:
                        msg["content"] = "(empty content)"
                    else:
                        msg["content"] = json.dumps(content)

    def create_headers(self) -> Dict[str, str]:
        """Create request headers"""
        headers = {"Content-Type": "application/json"}
        if self.api_key:
            if self.provider == "azure":
                headers["api-key"] = self.api_key
            else:
                headers["Authorization"] = f"Bearer {self.api_key}"
        return headers

    def get_endpoint(self) -> str:
        """Get the chat completions endpoint"""
        if self.provider == "azure":
            return f"{self.base_url}/chat/completions?api-version=2024-06-01"
        else:
            return f"{self.base_url}/chat/completions"

    def is_rate_limit_error(self, error_msg: str) -> bool:
        """Check if error is rate limiting"""
        if not error_msg:
            return False
        lower_msg = error_msg.lower()
        return any(keyword in lower_msg for keyword in [
            "rate limit", "too many requests", "429"
        ])

    def handle_error_response(self, response: requests.Response, opts: Dict[str, Any],
                              attempt: int, max_attempts: int):
        """Handle API error responses with retry logic"""
        try:
            error_data = response.json()
        except json.JSONDecodeError:
            error_data = {"error": {"message": f"HTTP {response.status_code}: {response.reason}"}}
        error_msg = ""
        if isinstance(error_data, dict):
            error_msg = (error_data.get("error", {}).get("message", "") or
                         error_data.get("message", ""))
        # Rate limiting
        if self.is_rate_limit_error(error_msg) or response.status_code == 429:
            if attempt < max_attempts:
                retry_after = response.headers.get("retry-after")
                delay = int(retry_after) if retry_after and retry_after.isdigit() else min(2 ** attempt, 60)
                print(f"Rate limited. Waiting {delay} seconds... (attempt {attempt + 1}/{max_attempts})")
                time.sleep(delay)
                return self.chat_completions(opts, attempt + 1, max_attempts)
        # Authentication errors
        if response.status_code in [401, 403] or any(keyword in error_msg.lower() for keyword in [
            "authentication", "unauthorized", "forbidden", "invalid api key"
        ]):
            if self.api_key not in self.failed_keys:
                self.failed_keys.append(self.api_key)
            raise Exception(f"Authentication failed: {error_msg}")
        # Max length errors
        if "Expected a string with maximum length 1024" in error_msg:
            if attempt < max_attempts:
                print(f"Fixing tool description length issue... (attempt {attempt + 1}/{max_attempts})")
                self.fix_tool_descriptions(opts)
                return self.chat_completions(opts, attempt + 1, max_attempts)
        # Max completion tokens errors
        if "Use 'max_completion_tokens'" in error_msg:
            if attempt < max_attempts and "max_tokens" in opts:
                print(f"Converting max_tokens to max_completion_tokens... (attempt {attempt + 1}/{max_attempts})")
                opts["max_completion_tokens"] = opts["max_tokens"]
                del opts["max_tokens"]
                return self.chat_completions(opts, attempt + 1, max_attempts)
        # Stream options errors
        if "stream_options" in error_msg:
            if attempt < max_attempts and "stream_options" in opts:
                print(f"Removing unsupported stream_options... (attempt {attempt + 1}/{max_attempts})")
                del opts["stream_options"]
                return self.chat_completions(opts, attempt + 1, max_attempts)
        # Citations errors
        if "Extra inputs are not permitted" in error_msg and "citations" in error_msg:
            if attempt < max_attempts:
                print(f"Removing citations... (attempt {attempt + 1}/{max_attempts})")
                self.fix_citations(opts)
                return self.chat_completions(opts, attempt + 1, max_attempts)
        # Default error
        raise Exception(f"API Error ({response.status_code}): {error_msg}")

    def process_stream(self, response: requests.Response) -> Iterator[Dict[str, Any]]:
        """Process streaming response"""
        buffer = ""
        for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
            if not chunk:
                continue
            buffer += chunk
            while '\n' in buffer:
                line, buffer = buffer.split('\n', 1)
                line = line.strip()
                if line == "data: [DONE]":
                    return
                if line.startswith("data: "):
                    data = line[6:].strip()
                    if not data:
                        continue
                    try:
                        parsed = json.loads(data)
                        yield parsed
                    except json.JSONDecodeError:
                        continue
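    # Annotation (added, not in the original gist): process_stream() expects
    # OpenAI-style server-sent events, one JSON payload per "data:" line,
    # terminated by a literal "data: [DONE]" sentinel, e.g.:
    #
    #   data: {"choices": [{"delta": {"content": "Hel"}}]}
    #   data: {"choices": [{"delta": {"content": "lo"}}]}
    #   data: [DONE]
    #
    # Any "data:" line that is not valid JSON is silently skipped.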
    def chat_completions(self, opts: Dict[str, Any], attempt: int = 0, max_attempts: int = 3):
        """Main chat completions method"""
        if attempt >= max_attempts:
            raise Exception("Max attempts reached")
        # Prepare options
        opts = copy.deepcopy(opts)
        # Apply transformations
        self.apply_model_transformations(opts)
        self.prepare_tool_messages(opts)
        # Setup request
        endpoint = self.get_endpoint()
        headers = self.create_headers()
        try:
            if opts.get("stream", False):
                response = self.session.post(endpoint, headers=headers, json=opts, stream=True)
                if not response.ok:
                    return self.handle_error_response(response, opts, attempt, max_attempts)
                return self.process_stream(response)
            else:
                response = self.session.post(endpoint, headers=headers, json=opts)
                if not response.ok:
                    return self.handle_error_response(response, opts, attempt, max_attempts)
                result = response.json()
                # Check for errors in successful response: build a minimal stand-in
                # exposing the attributes handle_error_response reads (status_code,
                # json, headers, reason) so HTTP-200 error bodies reuse the same
                # retry logic
                if isinstance(result, dict) and "error" in result:
                    fake_response = type('obj', (object,), {
                        'status_code': 200,
                        'json': lambda: result,
                        'headers': {},
                        'reason': 'OK'
                    })
                    return self.handle_error_response(fake_response, opts, attempt, max_attempts)
                return result
        except requests.exceptions.RequestException as e:
            if attempt < max_attempts - 1:
                delay = min(2 ** attempt, 30)
                print(f"Network error, retrying in {delay}s: {e}")
                time.sleep(delay)
                return self.chat_completions(opts, attempt + 1, max_attempts)
            raise Exception(f"Network error: {e}")


# Enhanced Tool Calling Tester using the new client
class AdvancedToolCallingTester:
    def __init__(self, api_key: str, base_url: str):
        self.client = OpenAICompatibleClient(api_key, base_url)
        self.test_results = []

    def create_test_tool(self):
        """Create a test tool for validation"""
        return {
            "type": "function",
            "function": {
                "name": "calculate",
                "description": "Perform basic mathematical calculations",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "expression": {
                            "type": "string",
                            "description": "Mathematical expression to evaluate (e.g., '2 + 2', '10 * 5')"
                        }
                    },
                    "required": ["expression"]
                }
            }
        }
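    # Annotation (added, not in the original gist): execute_tool_call() below
    # expects a tool_call dict as returned in an assistant message, roughly:
    #
    #   {"id": "call_abc123",   # hypothetical id
    #    "type": "function",
    #    "function": {"name": "calculate",
    #                 "arguments": "{\"expression\": \"15 + 27\"}"}}
    #
    # where "arguments" is a JSON-encoded string, as in the OpenAI chat
    # completions API.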
    def execute_tool_call(self, tool_call: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a tool call and return result"""
        function_name = tool_call.get("function", {}).get("name", "")
        arguments = tool_call.get("function", {}).get("arguments", "{}")
        call_id = tool_call.get("id", "")
        try:
            args = json.loads(arguments) if arguments else {}
            if function_name == "calculate":
                expression = args.get("expression", "")
                try:
                    # Simple eval for basic math (in production, use a safer parser)
                    result = eval(expression)
                    content = f"Calculation: {expression} = {result}"
                except Exception as e:
                    content = f"Error calculating '{expression}': {str(e)}"
            else:
                content = f"Unknown function: {function_name}"
            return {
                "tool_call_id": call_id,
                "content": content
            }
        except json.JSONDecodeError as e:
            return {
                "tool_call_id": call_id,
                "content": f"Error parsing arguments: {e}"
            }

    def test_basic_completion(self, model: str) -> Dict[str, Any]:
        """Test basic completion without tools"""
        print("Testing basic completion...")
        try:
            response = self.client.chat_completions({
                "model": model,
                "messages": [{"role": "user", "content": "Say 'Hello, World!' exactly."}],
                "max_tokens": 50
            })
            content = response.get("choices", [{}])[0].get("message", {}).get("content", "")
            return {
                "success": True,
                "content": content,
                "message": "Basic completion successful"
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "message": "Basic completion failed"
            }

    def test_tool_calling(self, model: str) -> Dict[str, Any]:
        """Test tool calling functionality"""
        print("Testing tool calling...")
        messages = [
            {"role": "system", "content": "You are a helpful assistant. Use the calculate tool when asked to do math."},
            {"role": "user", "content": "What is 15 + 27?"}
        ]
        tools = [self.create_test_tool()]
        try:
            # First request - should trigger tool call
            response = self.client.chat_completions({
                "model": model,
                "messages": messages,
                "tools": tools,
                "max_tokens": 200
            })
            message = response.get("choices", [{}])[0].get("message", {})
            tool_calls = message.get("tool_calls", [])
            if not tool_calls:
                return {
                    "success": False,
                    "message": "No tool calls generated",
                    "response": response
                }
            # Execute tool call
            tool_result = self.execute_tool_call(tool_calls[0])
            # Add assistant message and tool result
            messages.append({
                "role": "assistant",
                "content": message.get("content"),
                "tool_calls": tool_calls
            })
            messages.append({
                "role": "tool",
                **tool_result
            })
            # Second request - should respond to tool result
            final_response = self.client.chat_completions({
                "model": model,
                "messages": messages,
                "tools": tools,
                "max_tokens": 200
            })
            final_content = final_response.get("choices", [{}])[0].get("message", {}).get("content", "")
            return {
                "success": True,
                "tool_calls": tool_calls,
                "tool_result": tool_result,
                "final_response": final_content,
                "message": "Tool calling test successful"
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "message": "Tool calling test failed"
            }

    def test_streaming(self, model: str) -> Dict[str, Any]:
        """Test streaming functionality"""
        print("Testing streaming...")
        try:
            stream = self.client.chat_completions({
                "model": model,
                "messages": [{"role": "user", "content": "Count from 1 to 5."}],
                "max_tokens": 100,
                "stream": True
            })
            chunks = []
            content = ""
            for chunk in stream:
                chunks.append(chunk)
                delta = chunk.get("choices", [{}])[0].get("delta", {})
                if "content" in delta and delta["content"]:
                    content += delta["content"]
            return {
                "success": True,
                "chunks_received": len(chunks),
                "final_content": content,
                "message": "Streaming test successful"
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "message": "Streaming test failed"
            }
    def run_comprehensive_test(self, model: str = "gpt-4o"):
        """Run all tests and provide comprehensive report"""
        print(f"🚀 Running comprehensive tests with model: {model}")
        print(f"📡 Base URL: {self.client.base_url}")
        print(f"🔑 API Key: {'*' * (len(self.client.api_key) - 4) + self.client.api_key[-4:] if self.client.api_key else 'None'}")
        print("-" * 60)
        # Test 1: Basic completion
        basic_result = self.test_basic_completion(model)
        print(f"✅ Basic Completion: {'PASS' if basic_result['success'] else 'FAIL'}")
        if not basic_result['success']:
            print(f"   Error: {basic_result.get('error', 'Unknown error')}")
        # Test 2: Tool calling
        tool_result = self.test_tool_calling(model)
        print(f"🔧 Tool Calling: {'PASS' if tool_result['success'] else 'FAIL'}")
        if not tool_result['success']:
            print(f"   Error: {tool_result.get('error', 'Unknown error')}")
        else:
            print(f"   Tool calls generated: {len(tool_result.get('tool_calls', []))}")
            print(f"   Final response: {tool_result.get('final_response', '')[:100]}...")
        # Test 3: Streaming
        stream_result = self.test_streaming(model)
        print(f"📡 Streaming: {'PASS' if stream_result['success'] else 'FAIL'}")
        if not stream_result['success']:
            print(f"   Error: {stream_result.get('error', 'Unknown error')}")
        else:
            print(f"   Chunks received: {stream_result.get('chunks_received', 0)}")
        print("-" * 60)
        # Summary
        total_tests = 3
        passed_tests = sum([
            basic_result['success'],
            tool_result['success'],
            stream_result['success']
        ])
        print(f"📊 Test Summary: {passed_tests}/{total_tests} tests passed")
        if passed_tests == total_tests:
            print("🎉 All tests passed! The API fully supports OpenAI-compatible operations.")
        elif tool_result['success']:
            print("✅ Tool calling works! The API supports the core functionality you need.")
        else:
            print("⚠️ Tool calling not supported. This API may not be suitable for tool-based workflows.")
        return {
            "basic_completion": basic_result,
            "tool_calling": tool_result,
            "streaming": stream_result,
            "summary": {
                "total_tests": total_tests,
                "passed_tests": passed_tests,
                "success_rate": passed_tests / total_tests
            }
        }


# CLI Interface
if __name__ == "__main__":
    print("🔧 Advanced OpenAI Compatible API Tester")
    print("=" * 50)
    print("Enter API Base URL (default: https://api.openai.com/v1):")
    base_url = input().strip() or "https://api.openai.com/v1"
    print("Enter API Key:")
    api_key = input().strip()
    if not api_key:
        print("❌ API Key is required")
        exit(1)
    print("Enter model name (default: gpt-4o):")
    model = input().strip() or "gpt-4o"
    # Create tester and run comprehensive tests
    tester = AdvancedToolCallingTester(api_key, base_url)
    results = tester.run_comprehensive_test(model)
    # Save results
    with open("api_test_results.json", "w") as f:
        json.dump(results, f, indent=2)
    print(f"\n💾 Detailed results saved to: api_test_results.json")