Questa pagina mostra come sviluppare e testare un agente Agent2Agent (A2A). Il protocollo A2A è uno standard aperto progettato per consentire una comunicazione e una collaborazione senza interruzioni tra gli agenti AI. Questa guida si concentra sul workflow locale, che ti consente di definire e verificare la funzionalità dell'agente prima del deployment.
Il flusso di lavoro principale prevede i seguenti passaggi:
Definisci i componenti dell'agente
Per creare un agente A2A, devi definire i seguenti componenti: un AgentCard, un AgentExecutor e un LlmAgent ADK.
AgentCardcontiene un documento di metadati che descrive le funzionalità dell'agente.AgentCardè come un biglietto da visita che altri agenti possono utilizzare per scoprire cosa può fare il tuo agente. Per maggiori dettagli, consulta la specifica della scheda dell'agente.AgentExecutorcontiene la logica principale dell'agente e definisce come gestisce le attività. È qui che implementi il comportamento dell'agente. Puoi saperne di più a riguardo nella specifica del protocollo A2A.- (Facoltativo)
LlmAgentdefinisce l'agente ADK, incluse le istruzioni di sistema, il modello generativo e gli strumenti.
Definisci un AgentCard
Il seguente esempio di codice definisce un AgentCard per un agente di cambio valuta:
from a2a.types import AgentCard, AgentSkill
from vertexai.preview.reasoning_engines.templates.a2a import create_agent_card
# Define the skill for the CurrencyAgent
currency_skill = AgentSkill(
id='get_exchange_rate',
name='Get Currency Exchange Rate',
description='Retrieves the exchange rate between two currencies on a specified date.',
tags=['Finance', 'Currency', 'Exchange Rate'],
examples=[
'What is the exchange rate from USD to EUR?',
'How many Japanese Yen is 1 US dollar worth today?',
],
)
# Create the agent card using the utility function
agent_card = create_agent_card(
agent_name='Currency Exchange Agent',
description='An agent that can provide currency exchange rates',
skills=[currency_skill]
)
Definisci un AgentExecutor
Il seguente esempio di codice definisce un AgentExecutor che risponde con il tasso di cambio. Accetta un'istanza di CurrencyAgent e inizializza ADK Runner per eseguire le richieste.
import requests
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskState, TextPart, UnsupportedOperationError, Part
from a2a.utils import new_agent_text_message
from a2a.utils.errors import ServerError
from google.adk import Runner
from google.adk.agents import LlmAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.sessions import InMemorySessionService
from google.genai import types
class CurrencyAgentExecutorWithRunner(AgentExecutor):
"""Executor that takes an LlmAgent instance and initializes the ADK Runner internally."""
def __init__(self, agent: LlmAgent):
self.agent = agent
self.runner = None
def _init_adk(self):
if not self.runner:
self.runner = Runner(
app_name=self.agent.name,
agent=self.agent,
artifact_service=InMemoryArtifactService(),
session_service=InMemorySessionService(),
memory_service=InMemoryMemoryService(),
)
async def cancel(self, context: RequestContext, event_queue: EventQueue):
raise ServerError(error=UnsupportedOperationError())
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
self._init_adk() # Initialize on first execute call
if not context.message:
return
user_id = context.message.metadata.get('user_id') if context.message and context.message.metadata else 'a2a_user'
updater = TaskUpdater(event_queue, context.task_id, context.context_id)
if not context.current_task:
await updater.submit()
await updater.start_work()
query = context.get_user_input()
content = types.Content(role='user', parts=[types.Part(text=query)])
try:
session = await self.runner.session_service.get_session(
app_name=self.runner.app_name,
user_id=user_id,
session_id=context.context_id,
) or await self.runner.session_service.create_session(
app_name=self.runner.app_name,
user_id=user_id,
session_id=context.context_id,
)
final_event = None
async for event in self.runner.run_async(
session_id=session.id,
user_id=user_id,
new_message=content
):
if event.is_final_response():
final_event = event
if final_event and final_event.content and final_event.content.parts:
response_text = "".join(
part.text for part in final_event.content.parts if hasattr(part, 'text') and part.text
)
if response_text:
await updater.add_artifact(
[TextPart(text=response_text)],
name='result',
)
await updater.complete()
return
await updater.update_status(
TaskState.failed,
message=new_agent_text_message('Failed to generate a final response with text content.'),
final=True
)
except Exception as e:
await updater.update_status(
TaskState.failed,
message=new_agent_text_message(f"An error occurred: {str(e)}"),
final=True,
)
Definisci un LlmAgent
Innanzitutto, definisci uno strumento di cambio valuta da utilizzare per LlmAgent:
def get_exchange_rate(
currency_from: str = "USD",
currency_to: str = "EUR",
currency_date: str = "latest",
):
"""Retrieves the exchange rate between two currencies on a specified date.
Uses the Frankfurter API (https://api.frankfurter.app/) to obtain
exchange rate data.
"""
try:
response = requests.get(
f"https://api.frankfurter.app/{currency_date}",
params={"from": currency_from, "to": currency_to},
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
return {"error": str(e)}
Quindi, definisci un LlmAgent ADK che utilizza lo strumento.
my_llm_agent = LlmAgent(
model='gemini-2.0-flash',
name='currency_exchange_agent',
description='An agent that can provide currency exchange rates.',
instruction="""You are a helpful currency exchange assistant.
Use the get_exchange_rate tool to answer user questions.
If the tool returns an error, inform the user about the error.""",
tools=[get_exchange_rate],
)
Crea un agente locale
Dopo aver definito i componenti dell'agente, crea un'istanza della
A2aAgent classe che utilizza AgentCard, AgentExecutor, e LlmAgent per
iniziare i test locali.
from vertexai.preview.reasoning_engines import A2aAgent
a2a_agent = A2aAgent(
agent_card=agent_card, # Assuming agent_card is defined
agent_executor_builder=lambda: CurrencyAgentExecutorWithRunner(
agent=my_llm_agent,
)
)
a2a_agent.set_up()
Il modello di agente A2A ti aiuta a creare un servizio conforme ad A2A. Il servizio funge da wrapper, sottraendo il livello di conversione.
Testa l'agente locale
L'agente di cambio valuta supporta i seguenti tre metodi:
handle_authenticated_agent_cardon_message_sendon_get_task
Testa handle_authenticated_agent_card
Il seguente codice recupera la scheda autenticata dell'agente, che descrive le funzionalità dell'agente.
# Test the `authenticated_agent_card` endpoint.
response_get_card = await a2a_agent.handle_authenticated_agent_card(request=None, context=None)
print(response_get_card)
Testa on_message_send
Il seguente codice simula un client che invia un nuovo messaggio all'agente. The
A2aAgent crea una nuova attività e restituisce l'ID dell'attività.
import json
from starlette.requests import Request
import asyncio
# 1. Define the message payload you want to send.
message_data = {
"message": {
"messageId": "local-test-message-id",
"content":[
{
"text": "What is the exchange rate from USD to EUR today?"
}
],
"role": "ROLE_USER",
},
}
# 2. Construct the request
scope = {
"type": "http",
"http_version": "1.1",
"method": "POST",
"headers": [(b"content-type", b"application/json")],
}
async def receive():
byte_data = json.dumps(message_data).encode("utf-8")
return {"type": "http.request", "body": byte_data, "more_body": False}
post_request = Request(scope, receive=receive)
# 3. Call the agent
send_message_response = await a2a_agent.on_message_send(request=post_request, context=None)
print(send_message_response)
Testa on_get_task
Il seguente codice recupera lo stato e il risultato di un'attività. L'output mostra che l'attività è stata completata e include l'artefatto di risposta "Hello World".
from starlette.requests import Request
import asyncio
# 1. Provide the task_id from the previous step.
# In a real application, you would store and retrieve this ID.
task_id_to_get = send_message_response['task']['id']
# 2. Define the path parameters for the request.
task_data = {"id": task_id_to_get}
# 3. Construct the starlette.requests.Request object directly.
scope = {
"type": "http",
"http_version": "1.1",
"method": "GET",
"headers": [],
"query_string": b'',
"path_params": task_data,
}
async def empty_receive():
return {"type": "http.disconnect"}
get_request = Request(scope, empty_receive)
# 4. Call the agent's handler to get the task status.
task_status_response = await a2a_agent.on_get_task(request=get_request, context=None)
print(f"Successfully retrieved status for Task ID: {task_id_to_get}")
print("\nFull task status response:")
print(task_status_response)
Passaggi successivi
- Repository di esempi di Agent2Agent
- Utilizza un agente Agent2Agent.
- Valuta un agente.
- Esegui il deployment di un agente.
- Risolvi i problemi di sviluppo di un agente.
- Richiedi assistenza.