@donbr
Last active October 29, 2025 00:32

a2a notebook
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Agent-to-Agent (A2A) Protocol - Client Tutorial\n",
"\n",
"This notebook demonstrates how to interact with an A2A-compliant agent service using the A2A client library. You'll learn how to:\n",
"\n",
"- Discover an agent's capabilities through AgentCards\n",
"- Send single messages to an agent\n",
"- Handle multi-turn conversations with context\n",
"- Work with streaming responses\n",
"\n",
"## Prerequisites\n",
"- A running A2A agent service (default: http://localhost:10000)\n",
"- Python 3.10+\n",
"- Required packages: a2a-sdk, httpx\n",
"\n",
"## What is A2A?\n",
"The Agent-to-Agent (A2A) protocol is a standardized way for AI agents to communicate with each other. It uses AgentCards (similar to OpenAPI specs) to describe agent capabilities and JSON-RPC for communication.\n",
"\n",
"## Starting the Agent Service\n",
"\n",
"Before running this notebook, start the A2A agent service:\n",
"\n",
"```bash\n",
"# From the project root\n",
"cd a2a_service\n",
"uv run python -m a2a_service\n",
"```\n",
"\n",
"The service should start on http://localhost:10000"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"✓ All imports successful!\n"
]
}
],
"source": [
"# Import required libraries\n",
"import logging\n",
"from typing import Any\n",
"from uuid import uuid4\n",
"import httpx\n",
"\n",
"from a2a.client import A2ACardResolver, ClientFactory, ClientConfig\n",
"from a2a.types import (\n",
"    AgentCard,\n",
"    MessageSendParams,\n",
"    SendMessageRequest,\n",
"    SendStreamingMessageRequest,\n",
")\n",
"from a2a.utils.constants import (\n",
"    AGENT_CARD_WELL_KNOWN_PATH,\n",
"    EXTENDED_AGENT_CARD_PATH,\n",
")\n",
"\n",
"# Configure logging to see what's happening\n",
"logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')\n",
"logger = logging.getLogger(__name__)\n",
"\n",
"print(\"✓ All imports successful!\")\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Configuration\n",
"\n",
"Before we start, let's configure the connection to our A2A agent service. The service should be running locally on port 10000.\n",
"\n",
"**Key Configuration Parameters:**\n",
"- `base_url`: The URL where your A2A agent service is running\n",
"- `timeout`: HTTP timeout for requests (increased to 60s for LLM responses)\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Agent Service URL: http://localhost:10000\n",
"Request Timeout: 60.0s\n"
]
}
],
"source": [
"# Configuration\n",
"base_url = 'http://localhost:10000'\n",
"timeout_seconds = 60.0\n",
"\n",
"print(f\"Agent Service URL: {base_url}\")\n",
"print(f\"Request Timeout: {timeout_seconds}s\")\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Part 1: Agent Discovery via AgentCards\n",
"\n",
"An **AgentCard** is a machine-readable description of an agent's capabilities, similar to an OpenAPI specification. It tells clients:\n",
"- What the agent can do\n",
"- What endpoints are available\n",
"- What authentication is required\n",
"- What features are supported\n",
"\n",
"### Two Types of AgentCards:\n",
"\n",
"1. **Public AgentCard** (`/.well-known/agent-card.json`, the SDK's `AGENT_CARD_WELL_KNOWN_PATH`):\n",
" - Publicly accessible without authentication\n",
" - Contains basic agent information\n",
" - May indicate if an extended card is available\n",
"\n",
"2. **Extended AgentCard** (served at the SDK's `EXTENDED_AGENT_CARD_PATH`):\n",
" - Requires authentication\n",
" - May contain additional capabilities or sensitive information\n",
" - Only available if indicated in the public card\n"
]
},
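{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before fetching anything, it helps to see the shape a client works with. The sketch below operates on a plain dict whose fields mirror the card this notebook's service returns (the dict itself is illustrative, not fetched):\n",
"\n",
"```python\n",
"# Illustrative fragment of an agent card, mirroring the fields shown below\n",
"card = {\n",
"    'capabilities': {'streaming': True, 'pushNotifications': True},\n",
"    'skills': [\n",
"        {'id': 'web_search', 'name': 'Web Search Tool'},\n",
"        {'id': 'arxiv_search', 'name': 'Academic Paper Search'},\n",
"    ],\n",
"}\n",
"\n",
"# A client can branch on advertised capabilities before choosing a call style\n",
"skill_ids = [skill['id'] for skill in card['skills']]\n",
"supports_streaming = card['capabilities'].get('streaming', False)\n",
"\n",
"print(skill_ids)           # ['web_search', 'arxiv_search']\n",
"print(supports_streaming)  # True\n",
"```\n"
]
},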
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"✓ HTTP client and resolver initialized\n"
]
}
],
"source": [
"# Create an async HTTP client with extended timeout\n",
"httpx_client = httpx.AsyncClient(timeout=httpx.Timeout(timeout_seconds))\n",
"\n",
"# Initialize the A2ACardResolver\n",
"# This helps us discover and fetch AgentCards from the service\n",
"resolver = A2ACardResolver(\n",
"    httpx_client=httpx_client,\n",
"    base_url=base_url,\n",
")\n",
"\n",
"print(\"✓ HTTP client and resolver initialized\")\n"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO: Fetching public agent card from: http://localhost:10000/.well-known/agent-card.json\n",
"INFO: HTTP Request: GET http://localhost:10000/.well-known/agent-card.json \"HTTP/1.1 200 OK\"\n",
"INFO: Successfully fetched agent card data from http://localhost:10000/.well-known/agent-card.json: {'capabilities': {'pushNotifications': True, 'streaming': True}, 'defaultInputModes': ['text', 'text/plain'], 'defaultOutputModes': ['text', 'text/plain'], 'description': 'A helpful AI assistant with web search, academic paper search, and document retrieval capabilities', 'name': 'General Purpose Agent', 'preferredTransport': 'JSONRPC', 'protocolVersion': '0.3.0', 'skills': [{'description': 'Search the web for current information', 'examples': ['What are the latest news about AI?'], 'id': 'web_search', 'name': 'Web Search Tool', 'tags': ['search', 'web', 'internet']}, {'description': 'Search for academic papers on arXiv', 'examples': ['Find recent papers on large language models'], 'id': 'arxiv_search', 'name': 'Academic Paper Search', 'tags': ['research', 'papers', 'academic']}, {'description': 'Search through loaded documents for specific information', 'examples': ['What do the policy documents say about student loans?'], 'id': 'rag_search', 'name': 'Document Retrieval', 'tags': ['documents', 'rag', 'retrieval']}], 'url': 'http://localhost:10000/', 'version': '1.0.0'}\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"============================================================\n",
"PUBLIC AGENT CARD\n",
"============================================================\n",
"{\n",
" \"capabilities\": {\n",
" \"pushNotifications\": true,\n",
" \"streaming\": true\n",
" },\n",
" \"defaultInputModes\": [\n",
" \"text\",\n",
" \"text/plain\"\n",
" ],\n",
" \"defaultOutputModes\": [\n",
" \"text\",\n",
" \"text/plain\"\n",
" ],\n",
" \"description\": \"A helpful AI assistant with web search, academic paper search, and document retrieval capabilities\",\n",
" \"name\": \"General Purpose Agent\",\n",
" \"preferredTransport\": \"JSONRPC\",\n",
" \"protocolVersion\": \"0.3.0\",\n",
" \"skills\": [\n",
" {\n",
" \"description\": \"Search the web for current information\",\n",
" \"examples\": [\n",
" \"What are the latest news about AI?\"\n",
" ],\n",
" \"id\": \"web_search\",\n",
" \"name\": \"Web Search Tool\",\n",
" \"tags\": [\n",
" \"search\",\n",
" \"web\",\n",
" \"internet\"\n",
" ]\n",
" },\n",
" {\n",
" \"description\": \"Search for academic papers on arXiv\",\n",
" \"examples\": [\n",
" \"Find recent papers on large language models\"\n",
" ],\n",
" \"id\": \"arxiv_search\",\n",
" \"name\": \"Academic Paper Search\",\n",
" \"tags\": [\n",
" \"research\",\n",
" \"papers\",\n",
" \"academic\"\n",
" ]\n",
" },\n",
" {\n",
" \"description\": \"Search through loaded documents for specific information\",\n",
" \"examples\": [\n",
" \"What do the policy documents say about student loans?\"\n",
" ],\n",
" \"id\": \"rag_search\",\n",
" \"name\": \"Document Retrieval\",\n",
" \"tags\": [\n",
" \"documents\",\n",
" \"rag\",\n",
" \"retrieval\"\n",
" ]\n",
" }\n",
" ],\n",
" \"url\": \"http://localhost:10000/\",\n",
" \"version\": \"1.0.0\"\n",
"}\n",
"\n",
"✓ Successfully fetched public agent card\n"
]
}
],
"source": [
"# Fetch the public agent card\n",
"try:\n",
"    logger.info(f'Fetching public agent card from: {base_url}{AGENT_CARD_WELL_KNOWN_PATH}')\n",
"    public_card = await resolver.get_agent_card()\n",
"\n",
"    print(\"\\n\" + \"=\"*60)\n",
"    print(\"PUBLIC AGENT CARD\")\n",
"    print(\"=\"*60)\n",
"    print(public_card.model_dump_json(indent=2, exclude_none=True))\n",
"\n",
"    # Track which card we'll use\n",
"    final_agent_card = public_card\n",
"\n",
"    print(\"\\n✓ Successfully fetched public agent card\")\n",
"\n",
"except Exception as e:\n",
"    logger.error(f'Failed to fetch public agent card: {e}')\n",
"    raise RuntimeError('Cannot continue without agent card') from e\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Part 2: Initialize the A2A Client\n",
"\n",
"Now that we have the AgentCard, we can create a client to communicate with the agent. The client uses:\n",
"- **ClientFactory**: Creates properly configured clients\n",
"- **JSON-RPC transport**: Default protocol for A2A communication\n"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"✓ A2A Client initialized successfully\n",
" Ready to communicate with agent at: http://localhost:10000\n"
]
}
],
"source": [
"# Create ClientFactory with configuration\n",
"factory = ClientFactory(\n",
"    ClientConfig(\n",
"        httpx_client=httpx_client,\n",
"        # JSON-RPC is the default transport\n",
"    )\n",
")\n",
"\n",
"# Create client using the factory and agent card\n",
"client = factory.create(card=final_agent_card)\n",
"\n",
"print(\"✓ A2A Client initialized successfully\")\n",
"print(f\" Ready to communicate with agent at: {base_url}\")\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Part 3: Sending a Single Message\n",
"\n",
"Let's send our first message to the agent! This is a simple one-shot interaction.\n",
"\n",
"### Key Components:\n",
"- **Message**: Contains the user's question/request\n",
"- **Parts**: Can include text, images, or other content types\n",
"- **message_id**: Unique identifier for tracking\n",
"- **Events**: With a `ClientFactory`-created client, `send_message()` yields response events that we consume with `async for`\n"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO: HTTP Request: POST http://localhost:10000/ \"HTTP/1.1 200 OK\"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Sending message to agent...\n",
"\n",
"============================================================\n",
"STREAMING RESPONSE CHUNKS\n",
"============================================================\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO: New task created with id: be3ebcff-66b3-4c5e-ba02-462897182869\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"--- Chunk 1 ---\n",
"\n",
"============================================================\n",
"STREAMING COMPLETE - Received 1 chunks\n",
"============================================================\n"
]
},
{
"ename": "AttributeError",
"evalue": "'tuple' object has no attribute 'model_dump'",
"output_type": "error",
"traceback": [
"\u001b[31m---------------------------------------------------------------------------\u001b[39m",
"\u001b[31mAttributeError\u001b[39m Traceback (most recent call last)",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[9]\u001b[39m\u001b[32m, line 22\u001b[39m\n\u001b[32m 20\u001b[39m chunk_count += \u001b[32m1\u001b[39m\n\u001b[32m 21\u001b[39m \u001b[38;5;28mprint\u001b[39m(\u001b[33mf\u001b[39m\u001b[33m\"\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[33m--- Chunk \u001b[39m\u001b[38;5;132;01m{\u001b[39;00mchunk_count\u001b[38;5;132;01m}\u001b[39;00m\u001b[33m ---\u001b[39m\u001b[33m\"\u001b[39m)\n\u001b[32m---> \u001b[39m\u001b[32m22\u001b[39m \u001b[38;5;28mprint\u001b[39m(\u001b[43mchunk\u001b[49m\u001b[43m.\u001b[49m\u001b[43mmodel_dump\u001b[49m(mode=\u001b[33m\"\u001b[39m\u001b[33mjson\u001b[39m\u001b[33m\"\u001b[39m, exclude_none=\u001b[38;5;28;01mTrue\u001b[39;00m))\n\u001b[32m 23\u001b[39m \u001b[38;5;28;01mfinally\u001b[39;00m:\n\u001b[32m 24\u001b[39m \u001b[38;5;28mprint\u001b[39m(\u001b[33m\"\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[33m\"\u001b[39m + \u001b[33m\"\u001b[39m\u001b[33m=\u001b[39m\u001b[33m\"\u001b[39m * \u001b[32m60\u001b[39m)\n",
"\u001b[31mAttributeError\u001b[39m: 'tuple' object has no attribute 'model_dump'"
]
}
],
"source": [
"# Construct the message (NOT a SendMessageRequest)\n",
"message = {\n",
"    \"role\": \"user\",\n",
"    \"parts\": [\n",
"        {\"kind\": \"text\", \"text\": \"What are the latest developments in artificial intelligence?\"}\n",
"    ],\n",
"    \"message_id\": uuid4().hex,\n",
"}\n",
"\n",
"print(\"Sending message to agent...\")\n",
"print(\"\\n\" + \"=\" * 60)\n",
"print(\"STREAMING RESPONSE CHUNKS\")\n",
"print(\"=\" * 60)\n",
"\n",
"chunk_count = 0\n",
"try:\n",
"    # BaseClient.send_message expects a Message (dict or model).\n",
"    # When the client is configured for streaming, this returns an async generator of events.\n",
"    async for chunk in client.send_message(message):\n",
"        chunk_count += 1\n",
"        print(f\"\\n--- Chunk {chunk_count} ---\")\n",
"        # Pitfall: chunks can arrive as (Task, update) tuples rather than models,\n",
"        # so this call raises AttributeError; the next cell normalizes the shapes\n",
"        print(chunk.model_dump(mode=\"json\", exclude_none=True))\n",
"finally:\n",
"    print(\"\\n\" + \"=\" * 60)\n",
"    print(f\"STREAMING COMPLETE - Received {chunk_count} chunks\")\n",
"    print(\"=\" * 60)\n"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO: HTTP Request: POST http://localhost:10000/ \"HTTP/1.1 200 OK\"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Sending message to agent...\n",
"\n",
"============================================================\n",
"STREAMING RESPONSE CHUNKS\n",
"============================================================\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO: New task created with id: a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"--- Chunk 1 (artifacts=None context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6' history=[Message(context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6', extensions=None, kind='message', message_id='ee844a5befa5430da1a7cf901a0e53ed', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='What are the latest developments in artificial intelligence?'))], reference_task_ids=None, role=<Role.user: 'user'>, task_id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6')] id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6' kind='task' metadata=None status=TaskStatus(message=None, state=<TaskState.submitted: 'submitted'>, timestamp=None)) ---\n",
"null\n",
"\n",
"--- Chunk 2 (artifacts=None context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6' history=[Message(context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6', extensions=None, kind='message', message_id='ee844a5befa5430da1a7cf901a0e53ed', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='What are the latest developments in artificial intelligence?'))], reference_task_ids=None, role=<Role.user: 'user'>, task_id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6'), Message(context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6', extensions=None, kind='message', message_id='fa9fcf69-c049-48df-a535-7fc72ccb6c94', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='Searching for information...'))], reference_task_ids=None, role=<Role.agent: 'agent'>, task_id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6')] id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6' kind='task' metadata=None status=TaskStatus(message=Message(context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6', extensions=None, kind='message', message_id='fa9fcf69-c049-48df-a535-7fc72ccb6c94', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='Searching for information...'))], reference_task_ids=None, role=<Role.agent: 'agent'>, task_id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6'), state=<TaskState.working: 'working'>, timestamp='2025-10-28T23:39:18.474921+00:00')) ---\n",
"{\n",
" \"contextId\": \"94f9cbdb-857a-479f-a71d-6c5fe87e91c6\",\n",
" \"final\": false,\n",
" \"kind\": \"status-update\",\n",
" \"status\": {\n",
" \"message\": {\n",
" \"contextId\": \"94f9cbdb-857a-479f-a71d-6c5fe87e91c6\",\n",
" \"kind\": \"message\",\n",
" \"messageId\": \"fa9fcf69-c049-48df-a535-7fc72ccb6c94\",\n",
" \"parts\": [\n",
" {\n",
" \"kind\": \"text\",\n",
" \"text\": \"Searching for information...\"\n",
" }\n",
" ],\n",
" \"role\": \"agent\",\n",
" \"taskId\": \"a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6\"\n",
" },\n",
" \"state\": \"working\",\n",
" \"timestamp\": \"2025-10-28T23:39:18.474921+00:00\"\n",
" },\n",
" \"taskId\": \"a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6\"\n",
"}\n",
"\n",
"--- Chunk 3 (artifacts=None context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6' history=[Message(context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6', extensions=None, kind='message', message_id='ee844a5befa5430da1a7cf901a0e53ed', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='What are the latest developments in artificial intelligence?'))], reference_task_ids=None, role=<Role.user: 'user'>, task_id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6'), Message(context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6', extensions=None, kind='message', message_id='fa9fcf69-c049-48df-a535-7fc72ccb6c94', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='Searching for information...'))], reference_task_ids=None, role=<Role.agent: 'agent'>, task_id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6'), Message(context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6', extensions=None, kind='message', message_id='acb00d8d-352e-4b8e-a76d-612f68a84ec3', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='Processing the results...'))], reference_task_ids=None, role=<Role.agent: 'agent'>, task_id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6')] id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6' kind='task' metadata=None status=TaskStatus(message=Message(context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6', extensions=None, kind='message', message_id='acb00d8d-352e-4b8e-a76d-612f68a84ec3', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='Processing the results...'))], reference_task_ids=None, role=<Role.agent: 'agent'>, task_id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6'), state=<TaskState.working: 'working'>, timestamp='2025-10-28T23:39:22.244464+00:00')) ---\n",
"{\n",
" \"contextId\": \"94f9cbdb-857a-479f-a71d-6c5fe87e91c6\",\n",
" \"final\": false,\n",
" \"kind\": \"status-update\",\n",
" \"status\": {\n",
" \"message\": {\n",
" \"contextId\": \"94f9cbdb-857a-479f-a71d-6c5fe87e91c6\",\n",
" \"kind\": \"message\",\n",
" \"messageId\": \"acb00d8d-352e-4b8e-a76d-612f68a84ec3\",\n",
" \"parts\": [\n",
" {\n",
" \"kind\": \"text\",\n",
" \"text\": \"Processing the results...\"\n",
" }\n",
" ],\n",
" \"role\": \"agent\",\n",
" \"taskId\": \"a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6\"\n",
" },\n",
" \"state\": \"working\",\n",
" \"timestamp\": \"2025-10-28T23:39:22.244464+00:00\"\n",
" },\n",
" \"taskId\": \"a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6\"\n",
"}\n",
"\n",
"--- Chunk 4 (artifacts=[Artifact(artifact_id='712a07b2-a577-4e58-bd4b-e8e97083b486', description=None, extensions=None, metadata=None, name='result', parts=[Part(root=TextPart(kind='text', metadata=None, text=\"The latest developments in artificial intelligence include several exciting trends and advancements:\\n\\n1. Quantum AI: This approach leverages quantum computing to enhance AI algorithms, enabling faster solutions to complex problems in fields like material science, encryption, and system optimization.\\n\\n2. AutoML and Multi-modal AI: Automation in machine learning processes is advancing, and AI systems are increasingly capable of processing and integrating multiple data types such as text, images, and speech.\\n\\n3. Explainable AI: There is a growing focus on transparency in AI decision-making to make AI systems more understandable and trustworthy.\\n\\n4. Enhanced Computer Vision and Neural Networks: Improvements continue in AI's ability to interpret visual data and learn more efficiently.\\n\\n5. Digital Twins: AI-driven digital simulations of real-world objects are becoming more prominent, aiding industries in modeling and optimization.\\n\\n6. Autonomous AI Agents: AI systems are being designed to proactively perform tasks and make decisions, reducing human cognitive load and improving workflows in sectors like healthcare and banking.\\n\\n7. AI as Scientific Collaborators: AI systems are now autonomously generating, testing, and validating scientific hypotheses, contributing to research in biology and other fields.\\n\\n8. Structured Reasoning in Embodied AI: AI systems are beginning to reason step-by-step before acting in the physical world, enhancing robotics and automation.\\n\\n9. 
AI Safety and Policy: There is ongoing development in AI safety research and evolving geopolitical approaches to AI governance.\\n\\nThese developments reflect a broad and dynamic evolution of AI technologies impacting various industries and research domains.\"))])] context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6' history=[Message(context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6', extensions=None, kind='message', message_id='ee844a5befa5430da1a7cf901a0e53ed', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='What are the latest developments in artificial intelligence?'))], reference_task_ids=None, role=<Role.user: 'user'>, task_id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6'), Message(context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6', extensions=None, kind='message', message_id='fa9fcf69-c049-48df-a535-7fc72ccb6c94', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='Searching for information...'))], reference_task_ids=None, role=<Role.agent: 'agent'>, task_id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6'), Message(context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6', extensions=None, kind='message', message_id='acb00d8d-352e-4b8e-a76d-612f68a84ec3', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='Processing the results...'))], reference_task_ids=None, role=<Role.agent: 'agent'>, task_id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6')] id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6' kind='task' metadata=None status=TaskStatus(message=Message(context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6', extensions=None, kind='message', message_id='acb00d8d-352e-4b8e-a76d-612f68a84ec3', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='Processing the results...'))], reference_task_ids=None, role=<Role.agent: 'agent'>, task_id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6'), state=<TaskState.working: 'working'>, timestamp='2025-10-28T23:39:22.244464+00:00')) ---\n",
"{\n",
" \"artifact\": {\n",
" \"artifactId\": \"712a07b2-a577-4e58-bd4b-e8e97083b486\",\n",
" \"name\": \"result\",\n",
" \"parts\": [\n",
" {\n",
" \"kind\": \"text\",\n",
" \"text\": \"The latest developments in artificial intelligence include several exciting trends and advancements:\\n\\n1. Quantum AI: This approach leverages quantum computing to enhance AI algorithms, enabling faster solutions to complex problems in fields like material science, encryption, and system optimization.\\n\\n2. AutoML and Multi-modal AI: Automation in machine learning processes is advancing, and AI systems are increasingly capable of processing and integrating multiple data types such as text, images, and speech.\\n\\n3. Explainable AI: There is a growing focus on transparency in AI decision-making to make AI systems more understandable and trustworthy.\\n\\n4. Enhanced Computer Vision and Neural Networks: Improvements continue in AI's ability to interpret visual data and learn more efficiently.\\n\\n5. Digital Twins: AI-driven digital simulations of real-world objects are becoming more prominent, aiding industries in modeling and optimization.\\n\\n6. Autonomous AI Agents: AI systems are being designed to proactively perform tasks and make decisions, reducing human cognitive load and improving workflows in sectors like healthcare and banking.\\n\\n7. AI as Scientific Collaborators: AI systems are now autonomously generating, testing, and validating scientific hypotheses, contributing to research in biology and other fields.\\n\\n8. Structured Reasoning in Embodied AI: AI systems are beginning to reason step-by-step before acting in the physical world, enhancing robotics and automation.\\n\\n9. AI Safety and Policy: There is ongoing development in AI safety research and evolving geopolitical approaches to AI governance.\\n\\nThese developments reflect a broad and dynamic evolution of AI technologies impacting various industries and research domains.\"\n",
" }\n",
" ]\n",
" },\n",
" \"contextId\": \"94f9cbdb-857a-479f-a71d-6c5fe87e91c6\",\n",
" \"kind\": \"artifact-update\",\n",
" \"taskId\": \"a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6\"\n",
"}\n",
"\n",
"--- Chunk 5 (artifacts=[Artifact(artifact_id='712a07b2-a577-4e58-bd4b-e8e97083b486', description=None, extensions=None, metadata=None, name='result', parts=[Part(root=TextPart(kind='text', metadata=None, text=\"The latest developments in artificial intelligence include several exciting trends and advancements:\\n\\n1. Quantum AI: This approach leverages quantum computing to enhance AI algorithms, enabling faster solutions to complex problems in fields like material science, encryption, and system optimization.\\n\\n2. AutoML and Multi-modal AI: Automation in machine learning processes is advancing, and AI systems are increasingly capable of processing and integrating multiple data types such as text, images, and speech.\\n\\n3. Explainable AI: There is a growing focus on transparency in AI decision-making to make AI systems more understandable and trustworthy.\\n\\n4. Enhanced Computer Vision and Neural Networks: Improvements continue in AI's ability to interpret visual data and learn more efficiently.\\n\\n5. Digital Twins: AI-driven digital simulations of real-world objects are becoming more prominent, aiding industries in modeling and optimization.\\n\\n6. Autonomous AI Agents: AI systems are being designed to proactively perform tasks and make decisions, reducing human cognitive load and improving workflows in sectors like healthcare and banking.\\n\\n7. AI as Scientific Collaborators: AI systems are now autonomously generating, testing, and validating scientific hypotheses, contributing to research in biology and other fields.\\n\\n8. Structured Reasoning in Embodied AI: AI systems are beginning to reason step-by-step before acting in the physical world, enhancing robotics and automation.\\n\\n9. 
AI Safety and Policy: There is ongoing development in AI safety research and evolving geopolitical approaches to AI governance.\\n\\nThese developments reflect a broad and dynamic evolution of AI technologies impacting various industries and research domains.\"))])] context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6' history=[Message(context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6', extensions=None, kind='message', message_id='ee844a5befa5430da1a7cf901a0e53ed', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='What are the latest developments in artificial intelligence?'))], reference_task_ids=None, role=<Role.user: 'user'>, task_id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6'), Message(context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6', extensions=None, kind='message', message_id='fa9fcf69-c049-48df-a535-7fc72ccb6c94', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='Searching for information...'))], reference_task_ids=None, role=<Role.agent: 'agent'>, task_id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6'), Message(context_id='94f9cbdb-857a-479f-a71d-6c5fe87e91c6', extensions=None, kind='message', message_id='acb00d8d-352e-4b8e-a76d-612f68a84ec3', metadata=None, parts=[Part(root=TextPart(kind='text', metadata=None, text='Processing the results...'))], reference_task_ids=None, role=<Role.agent: 'agent'>, task_id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6')] id='a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6' kind='task' metadata=None status=TaskStatus(message=None, state=<TaskState.completed: 'completed'>, timestamp='2025-10-28T23:39:29.619466+00:00')) ---\n",
"{\n",
" \"contextId\": \"94f9cbdb-857a-479f-a71d-6c5fe87e91c6\",\n",
" \"final\": true,\n",
" \"kind\": \"status-update\",\n",
" \"status\": {\n",
" \"state\": \"completed\",\n",
" \"timestamp\": \"2025-10-28T23:39:29.619466+00:00\"\n",
" },\n",
" \"taskId\": \"a3ef20e7-d3b6-4f6c-84f0-5fd3f17206c6\"\n",
"}\n",
"\n",
"============================================================\n",
"STREAMING COMPLETE - Received 5 chunks\n",
"============================================================\n"
]
}
],
"source": [
"import json\n",
"import dataclasses\n",
"from dataclasses import asdict\n",
"\n",
"# Construct the message (NOT a SendMessageRequest)\n",
"message = {\n",
"    \"role\": \"user\",\n",
"    \"parts\": [\n",
"        {\"kind\": \"text\", \"text\": \"What are the latest developments in artificial intelligence?\"}\n",
"    ],\n",
"    \"message_id\": uuid4().hex,\n",
"}\n",
"\n",
"def to_jsonable(obj):\n",
"    # pydantic v2 models\n",
"    if hasattr(obj, \"model_dump\"):\n",
"        return obj.model_dump(mode=\"json\", exclude_none=True)\n",
"    # dataclasses\n",
"    if dataclasses.is_dataclass(obj):\n",
"        return asdict(obj)\n",
"    # dicts are fine\n",
"    if isinstance(obj, dict):\n",
"        return obj\n",
"    # strings/numbers, etc.\n",
"    return obj\n",
"\n",
"print(\"Sending message to agent...\")\n",
"print(\"\\n\" + \"=\" * 60)\n",
"print(\"STREAMING RESPONSE CHUNKS\")\n",
"print(\"=\" * 60)\n",
"\n",
"chunk_count = 0\n",
"try:\n",
"    # BaseClient.send_message yields streaming events\n",
"    async for chunk in client.send_message(message):\n",
"        chunk_count += 1\n",
"\n",
"        # Normalize common shapes:\n",
"        # 1) (event_type, payload)\n",
"        # 2) object with .type/.delta/.message\n",
"        # 3) raw dict / model\n",
"        if isinstance(chunk, tuple) and len(chunk) == 2:\n",
"            etype, payload = chunk\n",
"        else:\n",
"            etype = getattr(chunk, \"type\", None)\n",
"            payload = (\n",
"                getattr(chunk, \"delta\", None)\n",
"                or getattr(chunk, \"message\", None)\n",
"                or getattr(chunk, \"payload\", None)\n",
"                or chunk\n",
"            )\n",
"\n",
"        print(f\"\\n--- Chunk {chunk_count} ({etype or 'event'}) ---\")\n",
"\n",
"        # Prefer printing text deltas inline for a nicer UX\n",
"        if isinstance(payload, str):\n",
"            print(payload, end=\"\", flush=True)\n",
"        else:\n",
"            print(json.dumps(to_jsonable(payload), indent=2, default=str))\n",
"\n",
"finally:\n",
"    print(\"\\n\" + \"=\" * 60)\n",
"    print(f\"STREAMING COMPLETE - Received {chunk_count} chunks\")\n",
"    print(\"=\" * 60)\n"
]
},
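{
"cell_type": "markdown",
"metadata": {},
"source": [
"The raw dumps above are verbose; often you only want the final answer text. The helper below is a sketch that assumes events have been normalized to dicts shaped like the `artifact-update` payloads printed above:\n",
"\n",
"```python\n",
"# Sketch: collect answer text from normalized event dicts, assuming the\n",
"# 'status-update' / 'artifact-update' shapes shown in the chunks above\n",
"events = [\n",
"    {'kind': 'status-update', 'status': {'state': 'working'}},\n",
"    {'kind': 'artifact-update', 'artifact': {\n",
"        'name': 'result',\n",
"        'parts': [{'kind': 'text', 'text': 'Answer text here.'}],\n",
"    }},\n",
"    {'kind': 'status-update', 'final': True, 'status': {'state': 'completed'}},\n",
"]\n",
"\n",
"def artifact_text(events):\n",
"    # Concatenate every text part carried by artifact-update events\n",
"    chunks = []\n",
"    for event in events:\n",
"        if event.get('kind') == 'artifact-update':\n",
"            for part in event['artifact'].get('parts', []):\n",
"                if part.get('kind') == 'text':\n",
"                    chunks.append(part['text'])\n",
"    return ''.join(chunks)\n",
"\n",
"print(artifact_text(events))  # Answer text here.\n",
"```\n"
]
},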
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Part 4: Multi-Turn Conversations\n",
"\n",
"A2A supports multi-turn conversations where the agent maintains context across messages. This is crucial for follow-up questions or complex interactions.\n",
"\n",
"### Key Concepts:\n",
"- **context_id**: Identifies the conversation thread\n",
"- **task_id**: Identifies the specific task within a context\n",
"- Both IDs must be included in follow-up messages to maintain context\n"
]
},
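{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the threading concrete, here is the shape of a follow-up message as a plain dict (the id values are placeholders; in practice they come from the Task returned for the first message):\n",
"\n",
"```python\n",
"from uuid import uuid4\n",
"\n",
"# Placeholder ids standing in for values captured from the first response\n",
"task_id = 'task-123'\n",
"context_id = 'ctx-456'\n",
"\n",
"follow_up = {\n",
"    'role': 'user',\n",
"    'parts': [{'kind': 'text', 'text': 'Can you summarize the key findings?'}],\n",
"    'message_id': uuid4().hex,\n",
"    # Both ids tie this message back to the earlier exchange\n",
"    'task_id': task_id,\n",
"    'context_id': context_id,\n",
"}\n",
"\n",
"print(sorted(follow_up))\n",
"```\n"
]
},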
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# First message in a multi-turn conversation.\n",
"# As in Part 3, the factory-created client takes the message itself\n",
"# and yields events; we capture the Task to get its ids for follow-ups.\n",
"first_message = {\n",
"    'role': 'user',\n",
"    'parts': [\n",
"        {\n",
"            'kind': 'text',\n",
"            'text': 'Find me recent papers on transformer architectures',\n",
"        }\n",
"    ],\n",
"    'message_id': uuid4().hex,\n",
"}\n",
"\n",
"print(\"Sending first message in conversation...\")\n",
"\n",
"task_id = None\n",
"context_id = None\n",
"async for event in client.send_message(first_message):\n",
"    # Events may arrive as (Task, update) tuples; the Task carries the ids\n",
"    task = event[0] if isinstance(event, tuple) else event\n",
"    if hasattr(task, 'id'):\n",
"        task_id = task.id\n",
"        context_id = task.context_id\n",
"\n",
"print(\"\\n✓ Captured conversation IDs:\")\n",
"print(f\"  task_id: {task_id}\")\n",
"print(f\"  context_id: {context_id}\")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Second message - includes context IDs for continuation\n",
"second_message_payload = {\n",
" 'message': {\n",
" 'role': 'user',\n",
" 'parts': [\n",
" {\n",
" 'kind': 'text',\n",
" 'text': 'Can you summarize the key findings?'\n",
" }\n",
" ],\n",
" 'message_id': uuid4().hex,\n",
" 'task_id': task_id,\n",
" 'context_id': context_id,\n",
" },\n",
"}\n",
"\n",
"second_request = SendMessageRequest(\n",
" id=str(uuid4()),\n",
" params=MessageSendParams(**second_message_payload),\n",
")\n",
"\n",
"print(\"Sending follow-up message with context...\")\n",
"second_response = await client.send_message(second_request)\n",
"\n",
"print(\"\\n\" + \"=\"*60)\n",
"print(\"FOLLOW-UP RESPONSE\")\n",
"print(\"=\"*60)\n",
"print(second_response.model_dump(mode='json', exclude_none=True))\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Part 5: Streaming Responses\n",
"\n",
"For longer responses, streaming allows you to receive the agent's reply incrementally as it's generated, rather than waiting for the complete response.\n",
"\n",
"### Benefits:\n",
"- Lower latency - see results as they're generated\n",
"- Better user experience - can start displaying content immediately\n",
"- More efficient for long-running operations\n",
"\n",
"### How it Works:\n",
"- Use `SendStreamingMessageRequest` instead of `SendMessageRequest`\n",
"- Call `send_message_streaming()` instead of `send_message()`\n",
"- Iterate over the response stream with `async for`\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Reuse the first message payload for streaming\n",
"streaming_request = SendStreamingMessageRequest(\n",
" id=str(uuid4()), params=MessageSendParams(**send_message_payload)\n",
")\n",
"\n",
"print(\"Sending streaming request...\")\n",
"print(\"\\n\" + \"=\"*60)\n",
"print(\"STREAMING RESPONSE CHUNKS\")\n",
"print(\"=\"*60 + \"\\n\")\n",
"\n",
"# Get the streaming response\n",
"stream_response = client.send_message_streaming(streaming_request)\n",
"\n",
"# Process each chunk as it arrives\n",
"chunk_count = 0\n",
"async for chunk in stream_response:\n",
" chunk_count += 1\n",
" print(f\"--- Chunk {chunk_count} ---\")\n",
" print(chunk.model_dump(mode='json', exclude_none=True))\n",
" print()\n",
"\n",
"print(f\"\\n✓ Received {chunk_count} chunks total\")\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Cleanup\n",
"\n",
"Always remember to close the HTTP client when you're done to free up resources.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Close the HTTP client when done\n",
"await httpx_client.aclose()\n",
"print(\"✓ HTTP client closed\")\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Summary\n",
"\n",
"Congratulations! You've learned how to:\n",
"\n",
"1. **Discover agents** using AgentCards (both public and extended)\n",
"2. **Initialize an A2A client** using ClientFactory\n",
"3. **Send single messages** for one-shot interactions\n",
"4. **Maintain context** in multi-turn conversations using task_id and context_id\n",
"5. **Stream responses** for better user experience with long-running operations\n",
"\n",
"## Next Steps\n",
"\n",
"- Explore different message types (with images, files, etc.)\n",
"- Implement error handling for production use\n",
"- Build a conversation UI that maintains context\n",
"- Experiment with different agents supporting the A2A protocol\n",
"- Integrate A2A clients into your applications\n",
"\n",
"## Troubleshooting\n",
"\n",
"### Common Issues:\n",
"\n",
"1. **Connection Refused Error**\n",
" - Make sure the agent service is running: `cd a2a_service && uv run python -m a2a_service`\n",
" - Check that the service is on the correct port (default: 10000)\n",
"\n",
"2. **Timeout Errors**\n",
" - LLM responses can take time, especially with the helpfulness evaluation loop\n",
" - The timeout is set to 60 seconds, but you can increase it if needed\n",
"\n",
"3. **Import Errors**\n",
" - Make sure you have installed dependencies: `uv sync` or `pip install a2a-sdk httpx`\n",
"\n",
"4. **Authentication Errors for Extended Card**\n",
" - The example uses a dummy token - replace with real authentication in production\n",
"\n",
"## Additional Resources\n",
"\n",
"- [A2A Protocol Specification](https://github.com/a2aproject/A2A)\n",
"- [LangGraph Documentation](https://langchain-ai.github.io/langgraph/)\n",
"- [Project README](../README.md)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
],
"source": [
"import httpx, uuid, asyncio, json\n",
"\n",
"REGISTRY_RPC = \"https://api.a2a-registry.dev/jsonrpc\"\n",
"\n",
"async def search_agents(skills=None, query=None):\n",
" payload = {\n",
" \"jsonrpc\": \"2.0\",\n",
" \"method\": \"search_agents\",\n",
" \"params\": {\n",
" \"skills\": skills,\n",
" \"query\": query,\n",
" \"protocol_version\": \"0.3.0\"\n",
" },\n",
" \"id\": str(uuid.uuid4())\n",
" }\n",
" async with httpx.AsyncClient(timeout=15) as client:\n",
" r = await client.post(REGISTRY_RPC, json=payload)\n",
" r.raise_for_status()\n",
" res = r.json()\n",
" return res.get(\"result\", [])\n",
"\n",
"# Use top-level await: asyncio.run() raises RuntimeError inside Jupyter's running event loop\n",
"agents = await search_agents(skills=[\"translate\"])\n",
"print(json.dumps(agents, indent=2))\n"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO: HTTP Request: POST https://api.a2a-registry.dev/jsonrpc \"HTTP/1.1 405 Method Not Allowed\"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"🔍 Searching public A2A Registry...\n"
]
},
{
"ename": "HTTPStatusError",
"evalue": "Client error '405 Method Not Allowed' for url 'https://api.a2a-registry.dev/jsonrpc'\nFor more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/405",
"output_type": "error",
"traceback": [
"\u001b[31m---------------------------------------------------------------------------\u001b[39m",
"\u001b[31mHTTPStatusError\u001b[39m Traceback (most recent call last)",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[12]\u001b[39m\u001b[32m, line 31\u001b[39m\n\u001b[32m 28\u001b[39m \u001b[38;5;28mprint\u001b[39m(json.dumps(agents, indent=\u001b[32m2\u001b[39m))\n\u001b[32m 30\u001b[39m \u001b[38;5;66;03m# ✅ Works fine inside Jupyter/IPython\u001b[39;00m\n\u001b[32m---> \u001b[39m\u001b[32m31\u001b[39m \u001b[38;5;28;01mawait\u001b[39;00m main()\n",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[12]\u001b[39m\u001b[32m, line 27\u001b[39m, in \u001b[36mmain\u001b[39m\u001b[34m()\u001b[39m\n\u001b[32m 25\u001b[39m \u001b[38;5;28;01masync\u001b[39;00m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34mmain\u001b[39m():\n\u001b[32m 26\u001b[39m \u001b[38;5;28mprint\u001b[39m(\u001b[33m\"\u001b[39m\u001b[33m🔍 Searching public A2A Registry...\u001b[39m\u001b[33m\"\u001b[39m)\n\u001b[32m---> \u001b[39m\u001b[32m27\u001b[39m agents = \u001b[38;5;28;01mawait\u001b[39;00m search_agents(skills=[\u001b[33m\"\u001b[39m\u001b[33mtranslate\u001b[39m\u001b[33m\"\u001b[39m]) \u001b[38;5;66;03m# change skills/query here\u001b[39;00m\n\u001b[32m 28\u001b[39m \u001b[38;5;28mprint\u001b[39m(json.dumps(agents, indent=\u001b[32m2\u001b[39m))\n",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[12]\u001b[39m\u001b[32m, line 22\u001b[39m, in \u001b[36msearch_agents\u001b[39m\u001b[34m(skills, query)\u001b[39m\n\u001b[32m 20\u001b[39m \u001b[38;5;28;01masync\u001b[39;00m \u001b[38;5;28;01mwith\u001b[39;00m httpx.AsyncClient(timeout=\u001b[32m15\u001b[39m) \u001b[38;5;28;01mas\u001b[39;00m client:\n\u001b[32m 21\u001b[39m response = \u001b[38;5;28;01mawait\u001b[39;00m client.post(REGISTRY_RPC, json=payload)\n\u001b[32m---> \u001b[39m\u001b[32m22\u001b[39m \u001b[43mresponse\u001b[49m\u001b[43m.\u001b[49m\u001b[43mraise_for_status\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 23\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m response.json().get(\u001b[33m\"\u001b[39m\u001b[33mresult\u001b[39m\u001b[33m\"\u001b[39m, [])\n",
"\u001b[36mFile \u001b[39m\u001b[32m~/AIE8-Staging/15_A2A_LangGraph/.venv/lib/python3.12/site-packages/httpx/_models.py:829\u001b[39m, in \u001b[36mResponse.raise_for_status\u001b[39m\u001b[34m(self)\u001b[39m\n\u001b[32m 827\u001b[39m error_type = error_types.get(status_class, \u001b[33m\"\u001b[39m\u001b[33mInvalid status code\u001b[39m\u001b[33m\"\u001b[39m)\n\u001b[32m 828\u001b[39m message = message.format(\u001b[38;5;28mself\u001b[39m, error_type=error_type)\n\u001b[32m--> \u001b[39m\u001b[32m829\u001b[39m \u001b[38;5;28;01mraise\u001b[39;00m HTTPStatusError(message, request=request, response=\u001b[38;5;28mself\u001b[39m)\n",
"\u001b[31mHTTPStatusError\u001b[39m: Client error '405 Method Not Allowed' for url 'https://api.a2a-registry.dev/jsonrpc'\nFor more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/405"
]
}
],
"source": [
"import httpx\n",
"import uuid\n",
"import json\n",
"\n",
"REGISTRY_RPC = \"https://api.a2a-registry.dev/jsonrpc\"\n",
"\n",
"async def search_agents(skills=None, query=None):\n",
" payload = {\n",
" \"jsonrpc\": \"2.0\",\n",
" \"method\": \"search_agents\",\n",
" \"params\": {\n",
" \"skills\": skills,\n",
" \"query\": query,\n",
" \"protocol_version\": \"0.3.0\"\n",
" },\n",
" \"id\": str(uuid.uuid4())\n",
" }\n",
"\n",
" async with httpx.AsyncClient(timeout=15) as client:\n",
" response = await client.post(REGISTRY_RPC, json=payload)\n",
" response.raise_for_status()\n",
" return response.json().get(\"result\", [])\n",
"\n",
"async def main():\n",
" print(\"🔍 Searching public A2A Registry...\")\n",
" agents = await search_agents(skills=[\"translate\"]) # change skills/query here\n",
" print(json.dumps(agents, indent=2))\n",
"\n",
"# ✅ Works fine inside Jupyter/IPython\n",
"await main()\n"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"🔎 Searching public registry for skills=['translate'] …\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO: HTTP Request: GET https://api.a2a-registry.dev/health \"HTTP/1.1 404 Not Found\"\n",
"INFO: HTTP Request: POST https://api.a2a-registry.dev/jsonrpc \"HTTP/1.1 405 Method Not Allowed\"\n",
"INFO: HTTP Request: POST https://api.a2a-registry.dev/agents/search \"HTTP/1.1 405 Method Not Allowed\"\n",
"INFO: HTTP Request: POST https://api.a2a-registry.dev/v1/agents/search \"HTTP/1.1 405 Method Not Allowed\"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"[info] JSON-RPC https://api.a2a-registry.dev/jsonrpc returned 405; trying REST …\n",
"[info] REST POST https://api.a2a-registry.dev/agents/search returned 405\n",
"[info] REST POST https://api.a2a-registry.dev/v1/agents/search returned 405\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO: HTTP Request: GET https://api.a2a-registry.dev/agents?skills=translate \"HTTP/1.1 404 Not Found\"\n",
"INFO: HTTP Request: GET https://api.a2a-registry.dev/agents?skills=translate \"HTTP/1.1 404 Not Found\"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"[info] REST GET https://api.a2a-registry.dev/agents params={'skills': ['translate']} → 404\n",
"[info] REST GET https://api.a2a-registry.dev/agents params={'skills': 'translate'} → 404\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO: HTTP Request: GET https://api.a2a-registry.dev/v1/agents?skills=translate \"HTTP/1.1 404 Not Found\"\n",
"INFO: HTTP Request: GET https://api.a2a-registry.dev/v1/agents?skills=translate \"HTTP/1.1 404 Not Found\"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"[info] REST GET https://api.a2a-registry.dev/v1/agents params={'skills': ['translate']} → 404\n",
"[info] REST GET https://api.a2a-registry.dev/v1/agents params={'skills': 'translate'} → 404\n",
"Meta: {'error': 'no_supported_endpoint_found', 'base': 'https://api.a2a-registry.dev'}\n",
"[]\n"
]
}
],
"source": [
"import json\n",
"import uuid\n",
"import httpx\n",
"\n",
"BASE = \"https://api.a2a-registry.dev\" # change if you want to test others\n",
"JSONRPC = f\"{BASE}/jsonrpc\"\n",
"REST_SEARCHS = [\n",
" f\"{BASE}/agents/search\",\n",
" f\"{BASE}/v1/agents/search\",\n",
"]\n",
"REST_GETS = [\n",
" f\"{BASE}/agents\",\n",
" f\"{BASE}/v1/agents\",\n",
"]\n",
"HEALTH = f\"{BASE}/health\"\n",
"\n",
"async def registry_search(skills=None, query=None, protocol_version=\"0.3.0\", timeout=20):\n",
" \"\"\"Search an A2A-like registry via JSON-RPC with REST fallbacks.\n",
" Returns: list (possibly empty) and a dict describing which path worked.\n",
" \"\"\"\n",
" headers = {\"Accept\": \"application/json\", \"Content-Type\": \"application/json\"}\n",
"\n",
" async with httpx.AsyncClient(timeout=timeout, headers=headers) as cli:\n",
"        # 0) Health probe (best-effort; failures and non-2xx are non-fatal)\n",
"        try:\n",
"            await cli.get(HEALTH)\n",
"        except Exception:\n",
"            pass\n",
"\n",
" # 1) Try JSON-RPC\n",
" payload = {\n",
" \"jsonrpc\": \"2.0\",\n",
" \"method\": \"search_agents\",\n",
" \"params\": {\"skills\": skills, \"query\": query, \"protocol_version\": protocol_version},\n",
" \"id\": str(uuid.uuid4()),\n",
" }\n",
" try:\n",
" r = await cli.post(JSONRPC, json=payload)\n",
" if r.status_code == 200:\n",
" data = r.json()\n",
" if isinstance(data, dict) and \"result\" in data:\n",
" return data[\"result\"], {\"path\": JSONRPC, \"mode\": \"jsonrpc\"}\n",
" else:\n",
" # 405/404/etc: fall through\n",
" print(f\"[info] JSON-RPC {JSONRPC} returned {r.status_code}; trying REST …\")\n",
" except httpx.HTTPError as e:\n",
" print(f\"[info] JSON-RPC failed: {e}; trying REST …\")\n",
"\n",
" # 2) Try REST POST variants\n",
" rest_body = {\"skills\": skills, \"query\": query, \"protocol_version\": protocol_version}\n",
" for url in REST_SEARCHS:\n",
" try:\n",
" r = await cli.post(url, json=rest_body)\n",
" if r.status_code == 200:\n",
" return r.json(), {\"path\": url, \"mode\": \"rest_post\"}\n",
" else:\n",
" print(f\"[info] REST POST {url} returned {r.status_code}\")\n",
" except httpx.HTTPError as e:\n",
" print(f\"[info] REST POST {url} failed: {e}\")\n",
"\n",
" # 3) Try REST GET variants with query params\n",
" params = {}\n",
" if query: params[\"query\"] = query\n",
" if skills:\n",
" # registries may accept repeated params or comma-separated\n",
" # we’ll try both styles\n",
" candidates = [\n",
" params | {\"skills\": skills}, # skills=['a','b']\n",
" params | {\"skills\": \",\".join(skills)}, # skills='a,b'\n",
" ]\n",
" else:\n",
" candidates = [params]\n",
"\n",
" for url in REST_GETS:\n",
" for p in candidates:\n",
" try:\n",
" r = await cli.get(url, params=p)\n",
" if r.status_code == 200:\n",
" return r.json(), {\"path\": url, \"mode\": \"rest_get\", \"params\": p}\n",
" else:\n",
" print(f\"[info] REST GET {url} params={p} → {r.status_code}\")\n",
" except httpx.HTTPError as e:\n",
" print(f\"[info] REST GET {url} failed: {e}\")\n",
"\n",
" # If nothing hit, return empty with reason\n",
" return [], {\"error\": \"no_supported_endpoint_found\", \"base\": BASE}\n",
"\n",
"async def main():\n",
" print(\"🔎 Searching public registry for skills=['translate'] …\")\n",
" results, meta = await registry_search(skills=[\"translate\"])\n",
" print(\"Meta:\", meta)\n",
" print(json.dumps(results, indent=2))\n",
"\n",
"# ✅ Notebook-friendly\n",
"await main()\n"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"import json, uuid\n",
"import httpx\n",
"\n",
"# Candidate well-known paths across spec versions\n",
"WELL_KNOWN_CANDIDATES = [\n",
" \"/.well-known/agent-card.json\", # newer naming\n",
" \"/.well-known/agent.json\", # earlier spec\n",
" \"/agent.card.json\" # some servers expose an extended card here\n",
"]\n",
"\n",
"async def fetch_agent_card(base_url: str, timeout: float = 10.0):\n",
" \"\"\"Try common well-known paths and return the first AgentCard found.\"\"\"\n",
" base = base_url.rstrip(\"/\")\n",
" headers = {\"Accept\": \"application/json\"}\n",
" async with httpx.AsyncClient(timeout=timeout, headers=headers) as cli:\n",
" for path in WELL_KNOWN_CANDIDATES:\n",
" try:\n",
" r = await cli.get(base + path)\n",
" if r.status_code == 200 and r.headers.get(\"content-type\",\"\").startswith(\"application/json\"):\n",
" return {\"source\": base + path, \"card\": r.json()}\n",
" except httpx.HTTPError:\n",
" pass\n",
" return None\n",
"\n",
"async def search_self_hosted_registry(registry_base: str, skills=None, query=None, protocol_version=\"0.3.0\", timeout: float = 15.0):\n",
" \"\"\"Query your own A2A Registry (if you run one). Tries JSON-RPC then REST as per docs.\"\"\"\n",
" base = registry_base.rstrip(\"/\")\n",
" headers = {\"Accept\": \"application/json\", \"Content-Type\": \"application/json\"}\n",
" jsonrpc = f\"{base}/jsonrpc\"\n",
" rest_search = f\"{base}/agents/search\"\n",
"\n",
" payload = {\n",
" \"jsonrpc\": \"2.0\",\n",
" \"method\": \"search_agents\",\n",
" \"params\": {\"skills\": skills, \"query\": query, \"protocol_version\": protocol_version},\n",
" \"id\": str(uuid.uuid4())\n",
" }\n",
" async with httpx.AsyncClient(timeout=timeout, headers=headers) as cli:\n",
" # Try JSON-RPC\n",
" try:\n",
" r = await cli.post(jsonrpc, json=payload)\n",
" if r.status_code == 200:\n",
" data = r.json()\n",
" if isinstance(data, dict) and \"result\" in data:\n",
" return {\"mode\": \"jsonrpc\", \"results\": data[\"result\"]}\n",
" except httpx.HTTPError:\n",
" pass\n",
" # Fallback to REST\n",
" try:\n",
" r2 = await cli.post(rest_search, json={\"skills\": skills, \"query\": query, \"protocol_version\": protocol_version})\n",
" if r2.status_code == 200:\n",
" return {\"mode\": \"rest\", \"results\": r2.json()}\n",
" except httpx.HTTPError:\n",
" pass\n",
" return {\"mode\": None, \"results\": []}\n",
"\n",
"# Example usage in a notebook:\n",
"# 1) Direct well-known discovery against known agent hosts you control:\n",
"# cards = [await fetch_agent_card(u) for u in [\"https://your-agent.example.com\", \"http://localhost:10000\"]]\n",
"# print(json.dumps([c for c in cards if c], indent=2))\n",
"\n",
"# 2) If you run a registry locally or in your VPC:\n",
"# reg = await search_self_hosted_registry(\"http://localhost:8000\", skills=[\"weather_forecast\"])\n",
"# print(json.dumps(reg, indent=2))\n"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO: HTTP Request: GET http://localhost:10000/.well-known/agent-card.json \"HTTP/1.1 200 OK\"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"=== AgentCard discovery (well-known) ===\n",
"\n",
"Host: http://localhost:10000\n",
" tried http://localhost:10000/.well-known/agent-card.json -> 200 (application/json)\n",
" ✅ FOUND AgentCard at: http://localhost:10000/.well-known/agent-card.json\n",
" name/title: General Purpose Agent\n",
" capabilities keys: ['pushNotifications', 'streaming']\n",
"\n",
"=== Self-hosted registry search (optional) ===\n",
"No SELF_HOSTED_REGISTRY set. Skipping registry query.\n"
]
}
],
"source": [
"import json, uuid\n",
"import httpx\n",
"from typing import List, Optional, Dict, Any\n",
"\n",
"# Candidate well-known paths across spec versions\n",
"WELL_KNOWN_CANDIDATES = [\n",
" \"/.well-known/agent-card.json\", # newer naming\n",
" \"/.well-known/agent.json\", # earlier spec\n",
" \"/agent.card.json\", # some servers expose an extended card here\n",
"]\n",
"\n",
"async def fetch_agent_card(base_url: str, timeout: float = 10.0) -> Dict[str, Any]:\n",
" \"\"\"\n",
" Try common well-known paths and return the first AgentCard found.\n",
" Always returns diagnostics: list of (url, status|ERROR, content_type|message).\n",
" \"\"\"\n",
" base = base_url.rstrip(\"/\")\n",
" headers = {\"Accept\": \"application/json\"}\n",
" attempts = []\n",
" async with httpx.AsyncClient(timeout=timeout, headers=headers) as cli:\n",
" for path in WELL_KNOWN_CANDIDATES:\n",
" url = base + path\n",
" try:\n",
" r = await cli.get(url)\n",
" attempts.append((url, r.status_code, r.headers.get(\"content-type\")))\n",
" if r.status_code == 200 and (r.headers.get(\"content-type\") or \"\").startswith(\"application/json\"):\n",
" return {\"source\": url, \"card\": r.json(), \"attempts\": attempts}\n",
" except httpx.HTTPError as e:\n",
" attempts.append((url, \"ERROR\", str(e)))\n",
" return {\"source\": None, \"card\": None, \"attempts\": attempts}\n",
"\n",
"async def search_self_hosted_registry(\n",
" registry_base: str,\n",
" skills: Optional[List[str]] = None,\n",
" query: Optional[str] = None,\n",
" protocol_version: str = \"0.3.0\",\n",
" timeout: float = 15.0,\n",
") -> Dict[str, Any]:\n",
" \"\"\"\n",
" Query your own A2A Registry (if you run one). Tries JSON-RPC then REST.\n",
" Returns diagnostics and results (possibly empty).\n",
" \"\"\"\n",
" base = registry_base.rstrip(\"/\")\n",
" headers = {\"Accept\": \"application/json\", \"Content-Type\": \"application/json\"}\n",
" jsonrpc = f\"{base}/jsonrpc\"\n",
" rest_search = f\"{base}/agents/search\"\n",
" diag = {\"jsonrpc\": None, \"rest\": None}\n",
"\n",
" payload = {\n",
" \"jsonrpc\": \"2.0\",\n",
" \"method\": \"search_agents\",\n",
" \"params\": {\"skills\": skills, \"query\": query, \"protocol_version\": protocol_version},\n",
" \"id\": str(uuid.uuid4())\n",
" }\n",
"\n",
" async with httpx.AsyncClient(timeout=timeout, headers=headers) as cli:\n",
" # JSON-RPC\n",
" try:\n",
" r = await cli.post(jsonrpc, json=payload)\n",
" diag[\"jsonrpc\"] = {\"url\": jsonrpc, \"status\": r.status_code, \"ct\": r.headers.get(\"content-type\")}\n",
" if r.status_code == 200:\n",
" data = r.json()\n",
" if isinstance(data, dict) and \"result\" in data:\n",
" return {\"mode\": \"jsonrpc\", \"results\": data[\"result\"], \"diagnostics\": diag}\n",
" except httpx.HTTPError as e:\n",
" diag[\"jsonrpc\"] = {\"url\": jsonrpc, \"status\": \"ERROR\", \"error\": str(e)}\n",
"\n",
" # REST\n",
" try:\n",
" r2 = await cli.post(rest_search, json={\"skills\": skills, \"query\": query, \"protocol_version\": protocol_version})\n",
" diag[\"rest\"] = {\"url\": rest_search, \"status\": r2.status_code, \"ct\": r2.headers.get(\"content-type\")}\n",
" if r2.status_code == 200:\n",
" return {\"mode\": \"rest\", \"results\": r2.json(), \"diagnostics\": diag}\n",
" except httpx.HTTPError as e:\n",
" diag[\"rest\"] = {\"url\": rest_search, \"status\": \"ERROR\", \"error\": str(e)}\n",
"\n",
" return {\"mode\": None, \"results\": [], \"diagnostics\": diag}\n",
"\n",
"# -------- Runner that ALWAYS prints something -------- #\n",
"\n",
"AGENT_HOSTS = [\n",
" # 🔧 Put the hosts you control here:\n",
" \"http://localhost:10000\",\n",
" # \"https://your-agent.example.com\",\n",
"]\n",
"\n",
"SELF_HOSTED_REGISTRY = None # e.g. \"http://localhost:8000\" if you run one\n",
"\n",
"async def main():\n",
" print(\"=== AgentCard discovery (well-known) ===\")\n",
" if not AGENT_HOSTS:\n",
" print(\"No AGENT_HOSTS configured. Add base URLs to probe.\")\n",
" else:\n",
" found_any = False\n",
" for host in AGENT_HOSTS:\n",
" result = await fetch_agent_card(host)\n",
" # Print diagnostics for each attempt\n",
" print(f\"\\nHost: {host}\")\n",
" for url, status, info in result[\"attempts\"]:\n",
" print(f\" tried {url} -> {status} ({info})\")\n",
" if result[\"card\"]:\n",
" found_any = True\n",
" print(\" ✅ FOUND AgentCard at:\", result[\"source\"])\n",
" # print condensed summary\n",
" card = result[\"card\"]\n",
" name = card.get(\"name\") or card.get(\"title\")\n",
" caps = card.get(\"capabilities\") or card.get(\"skills\")\n",
" print(\" name/title:\", name)\n",
" if isinstance(caps, list):\n",
" print(\" skills:\", [c.get(\"id\") or c.get(\"name\") for c in caps])\n",
" else:\n",
" print(\" capabilities keys:\", list((caps or {}).keys()))\n",
" if not found_any:\n",
" print(\"\\nNo AgentCards found on provided hosts.\")\n",
"\n",
" print(\"\\n=== Self-hosted registry search (optional) ===\")\n",
" if SELF_HOSTED_REGISTRY:\n",
" reg = await search_self_hosted_registry(SELF_HOSTED_REGISTRY, skills=[\"weather_forecast\"])\n",
" print(\"Diagnostics:\", json.dumps(reg.get(\"diagnostics\"), indent=2))\n",
" print(\"Mode:\", reg.get(\"mode\"))\n",
" print(\"Results:\", json.dumps(reg.get(\"results\"), indent=2))\n",
" else:\n",
" print(\"No SELF_HOSTED_REGISTRY set. Skipping registry query.\")\n",
"\n",
"# ✅ Notebook-friendly: just run\n",
"await main()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.9"
}
},
"nbformat": 4,
"nbformat_minor": 4
}

Short answer: LangGraph doesn’t (yet) ship a first-class A2A wrapper. Today you’ve got three pragmatic patterns:

  1. Use MCP as the bridge (recommended when the remote exposes MCP or you can add an adapter). LangGraph has mature, supported MCP adapters (multi-server, streamable HTTP/stdio). You can expose your graph as MCP and/or consume MCP tools inside a graph. (Langchain AI)

  2. Call A2A directly from a node (custom “handoff” node). Treat the A2A agent like a remote service (JSON-RPC/SSE). This is a small amount of glue code and works today.

  3. Run an adapter service (A2A⇄MCP shim). Present A2A agents as MCP tools (or vice versa) so you can stay on the happy path with LangGraph’s MCP support. Community packages exist that advertise LangGraph compatibility. (PyPI)

There are community requests for native A2A nodes/sub-graphs in LangGraph, but they’re not part of the public, stable API yet. (LangChain Forum)


Minimal, production-lean code

A) Direct A2A call from a LangGraph node (Python)

Use a normal node/handoff that calls your A2A server via JSON-RPC. Keep it pure I/O so it’s easy to retry/trace.

from typing import TypedDict, Annotated, Sequence
import asyncio, uuid, httpx
from langgraph.graph import StateGraph, END

A2A_URL = "http://localhost:10000/jsonrpc"  # your A2A server's JSON-RPC endpoint (the `url` from its AgentCard)

class AgentState(TypedDict):
    messages: Annotated[Sequence[dict], "conversation history"]

async def a2a_send(message: dict) -> dict:
    payload = {
        "jsonrpc": "2.0",
        "method": "message/send",  # A2A JSON-RPC method name (spec 0.3.x)
        "params": {"message": message},
        "id": str(uuid.uuid4()),
    }
    async with httpx.AsyncClient(timeout=60) as cli:
        r = await cli.post(A2A_URL, json=payload)
        r.raise_for_status()
        data = r.json()
        if "error" in data:
            raise RuntimeError(data["error"])
        return data["result"]

async def call_remote_agent(state: AgentState) -> AgentState:
    user_msg = next(m for m in reversed(state["messages"]) if m["role"] == "user")
    result = await a2a_send({
        "role": "user",
        "parts": [{"kind": "text", "text": user_msg["content"]}],
        "message_id": uuid.uuid4().hex,
    })
    # Normalize A2A's response into your state schema
    content = result.get("content") or result.get("text") or str(result)
    return {"messages": [*state["messages"], {"role": "assistant", "content": content}]}

# Graph
graph = StateGraph(AgentState)
graph.add_node("remote_a2a", call_remote_agent)
graph.set_entry_point("remote_a2a")
graph.add_edge("remote_a2a", END)
app = graph.compile()

Notes

  • If your A2A server streams, replace a2a_send with an async generator consuming SSE and build up the assistant message before returning.
  • Wrap with retries/jitter and log a corr_id (UUID) into LangSmith run metadata for observability.

B) Using MCP adapters (preferred when available)

If your remote exposes MCP (or you add a shim), you can load tools and let the graph’s agent do tool-use natively.

from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
import asyncio

async def build_agent():
    # Servers are keyed by name; supported transports include "stdio" and "streamable_http"
    client = MultiServerMCPClient({
        "remote": {"transport": "streamable_http", "url": "https://mcp.your-host.com/mcp"},
    })
    tools = await client.get_tools()
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    agent = create_react_agent(llm, tools)   # tools are MCP-backed
    return agent

# agent = asyncio.run(build_agent())
  • LangGraph/LC officially support this path; you get multi-server discovery, streaming, auth headers, etc. out of the box. (Langchain AI)
  • LangGraph Server can expose your graph as an MCP endpoint at /mcp, so other frameworks (or your own graphs) can use it as a tool. (LangChain Docs)

When to choose which

  • You control both sides or can add adapters → MCP route (cleanest, best-supported; future-proof in LangGraph). (Langchain AI)
  • You must talk to a raw A2A server with no MCP → implement the small handoff node (snippet A).
  • Heterogeneous estate (A2A + MCP + others) → run a small A2A⇄MCP shim so agents appear as tools. Community packages advertise this pattern. (PyPI)

Related LangGraph work

  • LangGraph’s broader Agent Protocol/interoperability push (runs/threads/memory) points in the same direction, but it’s distinct from A2A. Useful context if you’re designing for cross-framework federation. (LangChain Changelog)

If you share the exact A2A server semantics (method names, streaming shape), I can drop in a streaming-aware version of the node and a tiny “capability cache” so the graph lazily discovers the remote’s skills once and avoids prompt/token bloat on every call.
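
On that last point, the capability cache can be tiny. A sketch (the TTL and the async fetcher(base_url) signature are my assumptions; plug in something like the notebook's fetch_agent_card as the fetcher):

```python
import time

class CapabilityCache:
    """Remember each remote's AgentCard so skills are discovered once per TTL."""

    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self._entries: dict[str, tuple[float, dict]] = {}

    async def get(self, base_url: str, fetcher):
        """Return the cached card for base_url, calling the async
        fetcher(base_url) only when the entry is missing or stale."""
        now = time.monotonic()
        hit = self._entries.get(base_url)
        if hit is not None and now - hit[0] < self.ttl:
            return hit[1]
        card = await fetcher(base_url)
        self._entries[base_url] = (now, card)
        return card
```

A graph node can consult the cache before each handoff, so skill discovery happens once per TTL instead of on every call.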
