Crea un agente Agent2Agent

Agent Runtime ti consente di sviluppare ed eseguire il deployment di agenti utilizzando il protocollo Agent2Agent (A2A). A2A è uno standard aperto progettato per consentire una comunicazione e una collaborazione fluide tra gli agenti AI.

Questo documento spiega come sviluppare e testare un agente A2A localmente, inclusa la definizione di componenti come AgentCard e AgentExecutor.

Per saperne di più sulla gestione degli agenti di cui è stato eseguito il deployment, vedi Gestire gli agenti di cui è stato eseguito il deployment.

Il flusso di lavoro principale prevede i seguenti passaggi:

  1. Definisci i componenti chiave
  2. Crea agente locale
  3. Testare l'agente locale

Definisci i componenti dell'agente

Per creare un agente A2A, devi definire i seguenti componenti: un AgentCard, un AgentExecutor e un LlmAgent ADK.

  • AgentCard contiene un documento di metadati che descrive le funzionalità dell'agente. AgentCard è come un biglietto da visita che altri agenti possono utilizzare per scoprire cosa può fare il tuo agente. Per maggiori dettagli, consulta le specifiche della scheda dell'agente.
  • AgentExecutor contiene la logica principale dell'agente e definisce il modo in cui gestisce le attività. È qui che implementi il comportamento dell'agente. Per saperne di più, consulta la specifica del protocollo A2A.
  • (Facoltativo) LlmAgent definisce l'agente ADK, incluse le istruzioni di sistema, il modello generativo e gli strumenti.

Definisci un AgentCard

Il seguente esempio di codice definisce un AgentCard per un agente di tassi di cambio:

from a2a.types import AgentCard, AgentSkill
from vertexai.agent_engines.templates.a2a import create_agent_card

# Define the skill for the CurrencyAgent
currency_skill = AgentSkill(
    id='get_exchange_rate',
    name='Get Currency Exchange Rate',
    description='Retrieves the exchange rate between two currencies on a specified date.',
    tags=['Finance', 'Currency', 'Exchange Rate'],
    examples=[
        'What is the exchange rate from USD to EUR?',
        'How many Japanese Yen is 1 US dollar worth today?',
    ],
)

# Create the agent card using the utility function
agent_card = create_agent_card(
    agent_name='Currency Exchange Agent',
    description='An agent that can provide currency exchange rates',
    skills=[currency_skill]
)

Definisci un AgentExecutor

Il seguente esempio di codice definisce un AgentExecutor che risponde con il tasso di cambio. Prende un'istanza CurrencyAgent e inizializza ADK Runner per eseguire le richieste.

import requests
from a2a.server.agent_execution.agent_executor import AgentExecutor
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a import types as a2a_types
from a2a.types import Part

from google.adk import Runner
from google.adk.agents import LlmAgent
from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.sessions.in_memory_session_service import InMemorySessionService
from google.genai import types as genai_types

class CurrencyAgentExecutorWithRunner(AgentExecutor):
    """Executor that takes an LlmAgent instance and initializes the ADK Runner internally."""

    def __init__(self, agent: LlmAgent):
        self.agent = agent
        self.runner = None

    def _init_adk(self):
        if not self.runner:
            self.runner = Runner(
                app_name=self.agent.name,
                agent=self.agent,
                artifact_service=InMemoryArtifactService(),
                session_service=InMemorySessionService(),
                memory_service=InMemoryMemoryService(),
            )

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        task_id = context.task_id
        updater = TaskUpdater(
            event_queue=event_queue,
            task_id=task_id or "",
            context_id=context.context_id or "",
        )
        await updater.cancel()

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        self._init_adk() # Initialize on first execute call

        if not context.message:
            return

        user_id = context.message.metadata.get('user_id') if context.message and context.message.metadata else 'a2a_user'

        updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        
        task = a2a_types.Task(
            id=context.task_id,
            context_id=context.context_id,
            status=a2a_types.TaskStatus(state=a2a_types.TaskState.TASK_STATE_SUBMITTED),
            history=[context.message] if context.message else [],
        )
        await event_queue.enqueue_event(task)

        await updater.start_work()

        query = context.get_user_input()
        content = genai_types.Content(role='user', parts=[genai_types.Part.from_text(text=query)])

        try:
            session = await self.runner.session_service.get_session(
                app_name=self.runner.app_name,
                user_id=user_id,
                session_id=context.context_id,
            ) or await self.runner.session_service.create_session(
                app_name=self.runner.app_name,
                user_id=user_id,
                session_id=context.context_id,
            )

            final_event = None
            async for event in self.runner.run_async(
                session_id=session.id,
                user_id=user_id,
                new_message=content
            ):
                if event.is_final_response():
                    final_event = event

            if final_event and final_event.content and final_event.content.parts:
                response_text = "".join(
                    part.text for part in final_event.content.parts if hasattr(part, 'text') and part.text
                )
                if response_text:
                    await updater.add_artifact(
                        [Part(text=response_text)],
                        name='result',
                        last_chunk=True,
                    )
                    await updater.complete()
                    return

            await updater.update_status(
                a2a_types.TaskState.TASK_STATE_FAILED,
                message=updater.new_agent_message([Part(text='Failed to generate a final response with text content.')]),
            )

        except Exception as e:
            await updater.update_status(
                a2a_types.TaskState.TASK_STATE_FAILED,
                message=updater.new_agent_message([Part(text=f"An error occurred: {str(e)}")]),
            )

Definisci un LlmAgent

Innanzitutto, definisci uno strumento di conversione della valuta da utilizzare per LlmAgent:

def get_exchange_rate(
    currency_from: str = "USD",
    currency_to: str = "EUR",
    currency_date: str = "latest",
):
    """Retrieves the exchange rate between two currencies on a specified date.
    Uses the Frankfurter API (https://api.frankfurter.app/) to obtain
    exchange rate data.
    """
    try:
        response = requests.get(
            f"https://api.frankfurter.app/{currency_date}",
            params={"from": currency_from, "to": currency_to},
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        return {"error": str(e)}

Poi, definisci un ADK LlmAgent che utilizza lo strumento.

my_llm_agent = LlmAgent(
    model='gemini-2.0-flash',
    name='currency_exchange_agent',
    description='An agent that can provide currency exchange rates.',
    instruction="""You are a helpful currency exchange assistant.
                   Use the get_exchange_rate tool to answer user questions.
                   If the tool returns an error, inform the user about the error.""",
    tools=[get_exchange_rate],
)

Crea un agente locale

Dopo aver definito i componenti dell'agente, crea un'istanza della classe A2aAgent che utilizza AgentCard, AgentExecutor e LlmAgent per iniziare i test locali.

from vertexai.agent_engines.templates.a2a import A2aAgent

a2a_agent = A2aAgent(
    agent_card=agent_card, # Assuming agent_card is defined
    agent_executor_builder=lambda: CurrencyAgentExecutorWithRunner(
        agent=my_llm_agent,
    )
)
a2a_agent.set_up()

Il modello di agente A2A ti aiuta a creare un servizio conforme ad A2A. Il servizio funge da wrapper, astraendo il livello di conversione.

Testa l'agente locale

L'agente del tasso di cambio supporta i seguenti tre metodi:

  • handle_authenticated_agent_card
  • on_message_send
  • on_get_task

Test handle_authenticated_agent_card

Il codice seguente recupera la scheda autenticata dell'agente, che descrive le sue funzionalità.

# Test the `authenticated_agent_card` endpoint.
response_get_card = await a2a_agent.handle_authenticated_agent_card(request=None, context=None)
print(response_get_card)

Test on_message_send

Il seguente codice simula un client che invia un nuovo messaggio all'agente. A2aAgent crea una nuova attività e restituisce il relativo ID.

from a2a.types import SendMessageRequest, Message, Part
from a2a.server.context import ServerCallContext

# 1. Define the message
message = Message(
    role="ROLE_USER",
    message_id="local-test-message-id",
    parts=[Part(text="What is the exchange rate from USD to EUR today?")]
)

# 2. Construct the request
request = SendMessageRequest(message=message)

# 3. Construct context
context = ServerCallContext()

# 4. Call the agent
send_message_response = await a2a_agent.on_message_send(request=request, context=context)

print(send_message_response)

Test on_get_task

Il seguente codice recupera lo stato e il risultato di un'attività. L'output mostra che l'attività è completata e include l'artefatto di risposta "Hello World".

from a2a.types import GetTaskRequest

# 1. Provide the task_id from the previous step.
# In a real application, you would store and retrieve this ID.
task_id_to_get = send_message_response.id

# 2. Construct the request
request = GetTaskRequest(id=task_id_to_get)

# 3. Call the agent's handler to get the task status.
# Reusing the context constructed in the previous step
task_status_response = await a2a_agent.on_get_task(request=request, context=context)

print(f"Successfully retrieved status for Task ID: {task_id_to_get}")
print("\nFull task status response:")
print(task_status_response)

Passaggi successivi

Guida

Scopri i cinque modi per eseguire il deployment di un agente su Agent Platform Runtime in base alle tue esigenze di sviluppo.

Guida

Utilizza un agente Agent2Agent con Agent Platform Runtime.

Guida

Crea e implementa un agente di base e utilizza Gen AI evaluation service per valutarlo

Risoluzione dei problemi

Scopri come risolvere gli errori comuni durante la creazione di agenti personalizzati.

Risorsa

Trova risorse e assistenza per Google Agent Platform.

Risorsa

Esplora gli esempi di Agent2Agent in Python su GitHub.