Crea un agente de Agent2Agent

Agent Runtime te permite desarrollar y, luego, implementar agentes con el protocolo Agent2Agent (A2A). A2A es un estándar abierto diseñado para permitir la comunicación y la colaboración fluidas entre agentes de IA.

En este documento, se explica cómo desarrollar y probar un agente de A2A de forma local, lo que incluye la definición de componentes como AgentCard y AgentExecutor.

Para obtener más información sobre cómo administrar los agentes implementados, consulta Administra agentes implementados.

El flujo de trabajo principal implica los siguientes pasos:

  1. Define los componentes clave
  2. Crear agente local
  3. Prueba el agente local

Define los componentes del agente

Para crear un agente de A2A, debes definir los siguientes componentes: un AgentCard, un AgentExecutor y un LlmAgent de ADK.

  • AgentCard contiene un documento de metadatos que describe las capacidades de tu agente. AgentCard es como una tarjeta de presentación que otros agentes pueden usar para descubrir lo que tu agente puede hacer. Para obtener más detalles, consulta la especificación de la tarjeta de agente.
  • AgentExecutor contiene la lógica principal del agente y define cómo maneja las tareas. Aquí es donde implementas el comportamiento del agente. Puedes obtener más información sobre este tema en la especificación del protocolo A2A.
  • Opcional: LlmAgent define el agente del ADK, incluidas sus instrucciones del sistema, su modelo generativo y sus herramientas.

Define un AgentCard

En la siguiente muestra de código, se define un AgentCard para un agente de tipo de cambio:

from a2a.types import AgentCard, AgentSkill
from vertexai.agent_engines.templates.a2a import create_agent_card

# Define the skill for the CurrencyAgent
currency_skill = AgentSkill(
    id='get_exchange_rate',
    name='Get Currency Exchange Rate',
    description='Retrieves the exchange rate between two currencies on a specified date.',
    tags=['Finance', 'Currency', 'Exchange Rate'],
    examples=[
        'What is the exchange rate from USD to EUR?',
        'How many Japanese Yen is 1 US dollar worth today?',
    ],
)

# Create the agent card using the utility function
agent_card = create_agent_card(
    agent_name='Currency Exchange Agent',
    description='An agent that can provide currency exchange rates',
    skills=[currency_skill]
)

Define un AgentExecutor

En el siguiente ejemplo de código, se define un AgentExecutor que responde con el tipo de cambio. Toma una instancia de CurrencyAgent y, luego, inicializa el ADK Runner para ejecutar solicitudes.

import requests
from a2a.server.agent_execution.agent_executor import AgentExecutor
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a import types as a2a_types
from a2a.types import Part

from google.adk import Runner
from google.adk.agents import LlmAgent
from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.sessions.in_memory_session_service import InMemorySessionService
from google.genai import types as genai_types

class CurrencyAgentExecutorWithRunner(AgentExecutor):
    """Executor that takes an LlmAgent instance and initializes the ADK Runner internally."""

    def __init__(self, agent: LlmAgent):
        self.agent = agent
        self.runner = None

    def _init_adk(self):
        if not self.runner:
            self.runner = Runner(
                app_name=self.agent.name,
                agent=self.agent,
                artifact_service=InMemoryArtifactService(),
                session_service=InMemorySessionService(),
                memory_service=InMemoryMemoryService(),
            )

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        task_id = context.task_id
        updater = TaskUpdater(
            event_queue=event_queue,
            task_id=task_id or "",
            context_id=context.context_id or "",
        )
        await updater.cancel()

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        self._init_adk() # Initialize on first execute call

        if not context.message:
            return

        user_id = context.message.metadata.get('user_id') if context.message and context.message.metadata else 'a2a_user'

        updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        
        task = a2a_types.Task(
            id=context.task_id,
            context_id=context.context_id,
            status=a2a_types.TaskStatus(state=a2a_types.TaskState.TASK_STATE_SUBMITTED),
            history=[context.message] if context.message else [],
        )
        await event_queue.enqueue_event(task)

        await updater.start_work()

        query = context.get_user_input()
        content = genai_types.Content(role='user', parts=[genai_types.Part.from_text(text=query)])

        try:
            session = await self.runner.session_service.get_session(
                app_name=self.runner.app_name,
                user_id=user_id,
                session_id=context.context_id,
            ) or await self.runner.session_service.create_session(
                app_name=self.runner.app_name,
                user_id=user_id,
                session_id=context.context_id,
            )

            final_event = None
            async for event in self.runner.run_async(
                session_id=session.id,
                user_id=user_id,
                new_message=content
            ):
                if event.is_final_response():
                    final_event = event

            if final_event and final_event.content and final_event.content.parts:
                response_text = "".join(
                    part.text for part in final_event.content.parts if hasattr(part, 'text') and part.text
                )
                if response_text:
                    await updater.add_artifact(
                        [Part(text=response_text)],
                        name='result',
                        last_chunk=True,
                    )
                    await updater.complete()
                    return

            await updater.update_status(
                a2a_types.TaskState.TASK_STATE_FAILED,
                message=updater.new_agent_message([Part(text='Failed to generate a final response with text content.')]),
            )

        except Exception as e:
            await updater.update_status(
                a2a_types.TaskState.TASK_STATE_FAILED,
                message=updater.new_agent_message([Part(text=f"An error occurred: {str(e)}")]),
            )

Define un LlmAgent

Primero, define una herramienta de cambio de divisas para que la use LlmAgent:

def get_exchange_rate(
    currency_from: str = "USD",
    currency_to: str = "EUR",
    currency_date: str = "latest",
):
    """Retrieves the exchange rate between two currencies on a specified date.
    Uses the Frankfurter API (https://api.frankfurter.app/) to obtain
    exchange rate data.
    """
    try:
        response = requests.get(
            f"https://api.frankfurter.app/{currency_date}",
            params={"from": currency_from, "to": currency_to},
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        return {"error": str(e)}

Luego, define un LlmAgent del ADK que use la herramienta.

my_llm_agent = LlmAgent(
    model='gemini-2.0-flash',
    name='currency_exchange_agent',
    description='An agent that can provide currency exchange rates.',
    instruction="""You are a helpful currency exchange assistant.
                   Use the get_exchange_rate tool to answer user questions.
                   If the tool returns an error, inform the user about the error.""",
    tools=[get_exchange_rate],
)

Crea un agente local

Una vez que hayas definido los componentes de tu agente, crea una instancia de la clase A2aAgent que use AgentCard, AgentExecutor y LlmAgent para comenzar las pruebas locales.

from vertexai.agent_engines.templates.a2a import A2aAgent

a2a_agent = A2aAgent(
    agent_card=agent_card, # Assuming agent_card is defined
    agent_executor_builder=lambda: CurrencyAgentExecutorWithRunner(
        agent=my_llm_agent,
    )
)
a2a_agent.set_up()

La plantilla de agente A2A te ayuda a crear un servicio compatible con A2A. El servicio actúa como un wrapper, abstrayendo la capa de conversión.

Prueba el agente local

El agente de tipo de cambio admite los siguientes tres métodos:

  • handle_authenticated_agent_card
  • on_message_send
  • on_get_task

Prueba handle_authenticated_agent_card

El siguiente código recupera la tarjeta autenticada del agente, que describe sus capacidades.

# Test the `authenticated_agent_card` endpoint.
response_get_card = await a2a_agent.handle_authenticated_agent_card(request=None, context=None)
print(response_get_card)

Prueba on_message_send

El siguiente código simula un cliente que envía un mensaje nuevo al agente. El método A2aAgent crea una tarea nueva y devuelve su ID.

from a2a.types import SendMessageRequest, Message, Part
from a2a.server.context import ServerCallContext

# 1. Define the message
message = Message(
    role="ROLE_USER",
    message_id="local-test-message-id",
    parts=[Part(text="What is the exchange rate from USD to EUR today?")]
)

# 2. Construct the request
request = SendMessageRequest(message=message)

# 3. Construct context
context = ServerCallContext()

# 4. Call the agent
send_message_response = await a2a_agent.on_message_send(request=request, context=context)

print(send_message_response)

Prueba on_get_task

El siguiente código recupera el estado y el resultado de una tarea. El resultado muestra que la tarea se completó y que incluye el artefacto de respuesta "Hello World".

from a2a.types import GetTaskRequest

# 1. Provide the task_id from the previous step.
# In a real application, you would store and retrieve this ID.
task_id_to_get = send_message_response.id

# 2. Construct the request
request = GetTaskRequest(id=task_id_to_get)

# 3. Call the agent's handler to get the task status.
# Reusing the context constructed in the previous step
task_status_response = await a2a_agent.on_get_task(request=request, context=context)

print(f"Successfully retrieved status for Task ID: {task_id_to_get}")
print("\nFull task status response:")
print(task_status_response)

¿Qué sigue?

Guía

Conoce las cinco formas de implementar un agente en el entorno de ejecución de Agent Platform según tus necesidades de desarrollo.

Guía

Usar un agente Agent2Agent con el entorno de ejecución de Agent Platform

Guía

Crea e implementa un agente básico, y usa Gen AI Evaluation Service para evaluarlo

Solución de problemas

Obtén información para resolver errores comunes cuando creas agentes personalizados.

Recurso

Encuentra recursos y asistencia para Google Agent Platform.

Recurso

Explora ejemplos de Agent2Agent en Python en GitHub.