Créer un agent Agent2Agent

Agent Runtime vous permet de développer et de déployer des agents à l'aide du protocole Agent2Agent (A2A). A2A est une norme ouverte conçue pour permettre une communication et une collaboration fluides entre les agents d'IA.

Ce document explique comment développer et tester un agent A2A localement, y compris comment définir des composants tels que AgentCard et AgentExecutor.

Pour en savoir plus sur la gestion de vos agents déployés, consultez Gérer les agents déployés.

Le workflow de base comprend les étapes suivantes :

  1. Définir les composants clés
  2. Créer un agent local
  3. Tester l'agent local

Définir les composants de l'agent

Pour créer un agent A2A, vous devez définir les composants suivants : un AgentCard, un AgentExecutor et un LlmAgent ADK.

  • AgentCard contient un document de métadonnées qui décrit les fonctionnalités de votre agent. AgentCard est comme une carte de visite que d'autres agents peuvent utiliser pour découvrir ce que votre agent peut faire. Pour en savoir plus, consultez la spécification de la carte d'agent.
  • AgentExecutor contient la logique de base de l'agent et définit la manière dont il gère les tâches. C'est là que vous implémentez le comportement de l'agent. Pour en savoir plus à ce sujet dans la spécification du protocole A2A.
  • Facultatif : LlmAgent définit l'agent ADK, y compris ses instructions système, son modèle génératif et ses outils.

Définir un AgentCard

L'exemple de code suivant définit un AgentCard pour un agent de taux de change :

from a2a.types import AgentCard, AgentSkill
from vertexai.agent_engines.templates.a2a import create_agent_card

# Define the skill for the CurrencyAgent
currency_skill = AgentSkill(
    id='get_exchange_rate',
    name='Get Currency Exchange Rate',
    description='Retrieves the exchange rate between two currencies on a specified date.',
    tags=['Finance', 'Currency', 'Exchange Rate'],
    examples=[
        'What is the exchange rate from USD to EUR?',
        'How many Japanese Yen is 1 US dollar worth today?',
    ],
)

# Create the agent card using the utility function
agent_card = create_agent_card(
    agent_name='Currency Exchange Agent',
    description='An agent that can provide currency exchange rates',
    skills=[currency_skill]
)

Définir un AgentExecutor

L'exemple de code suivant définit un AgentExecutor qui répond avec le taux de change. Il prend une instance CurrencyAgent et initialise ADK Runner pour exécuter les requêtes.

import requests
from a2a.server.agent_execution.agent_executor import AgentExecutor
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a import types as a2a_types
from a2a.types import Part

from google.adk import Runner
from google.adk.agents import LlmAgent
from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.sessions.in_memory_session_service import InMemorySessionService
from google.genai import types as genai_types

class CurrencyAgentExecutorWithRunner(AgentExecutor):
    """Executor that takes an LlmAgent instance and initializes the ADK Runner internally."""

    def __init__(self, agent: LlmAgent):
        self.agent = agent
        self.runner = None

    def _init_adk(self):
        if not self.runner:
            self.runner = Runner(
                app_name=self.agent.name,
                agent=self.agent,
                artifact_service=InMemoryArtifactService(),
                session_service=InMemorySessionService(),
                memory_service=InMemoryMemoryService(),
            )

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        task_id = context.task_id
        updater = TaskUpdater(
            event_queue=event_queue,
            task_id=task_id or "",
            context_id=context.context_id or "",
        )
        await updater.cancel()

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        self._init_adk() # Initialize on first execute call

        if not context.message:
            return

        user_id = context.message.metadata.get('user_id') if context.message and context.message.metadata else 'a2a_user'

        updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        
        task = a2a_types.Task(
            id=context.task_id,
            context_id=context.context_id,
            status=a2a_types.TaskStatus(state=a2a_types.TaskState.TASK_STATE_SUBMITTED),
            history=[context.message] if context.message else [],
        )
        await event_queue.enqueue_event(task)

        await updater.start_work()

        query = context.get_user_input()
        content = genai_types.Content(role='user', parts=[genai_types.Part.from_text(text=query)])

        try:
            session = await self.runner.session_service.get_session(
                app_name=self.runner.app_name,
                user_id=user_id,
                session_id=context.context_id,
            ) or await self.runner.session_service.create_session(
                app_name=self.runner.app_name,
                user_id=user_id,
                session_id=context.context_id,
            )

            final_event = None
            async for event in self.runner.run_async(
                session_id=session.id,
                user_id=user_id,
                new_message=content
            ):
                if event.is_final_response():
                    final_event = event

            if final_event and final_event.content and final_event.content.parts:
                response_text = "".join(
                    part.text for part in final_event.content.parts if hasattr(part, 'text') and part.text
                )
                if response_text:
                    await updater.add_artifact(
                        [Part(text=response_text)],
                        name='result',
                        last_chunk=True,
                    )
                    await updater.complete()
                    return

            await updater.update_status(
                a2a_types.TaskState.TASK_STATE_FAILED,
                message=updater.new_agent_message([Part(text='Failed to generate a final response with text content.')]),
            )

        except Exception as e:
            await updater.update_status(
                a2a_types.TaskState.TASK_STATE_FAILED,
                message=updater.new_agent_message([Part(text=f"An error occurred: {str(e)}")]),
            )

Définir un LlmAgent

Commencez par définir un outil de change que LlmAgent pourra utiliser :

def get_exchange_rate(
    currency_from: str = "USD",
    currency_to: str = "EUR",
    currency_date: str = "latest",
):
    """Retrieves the exchange rate between two currencies on a specified date.
    Uses the Frankfurter API (https://api.frankfurter.app/) to obtain
    exchange rate data.
    """
    try:
        response = requests.get(
            f"https://api.frankfurter.app/{currency_date}",
            params={"from": currency_from, "to": currency_to},
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        return {"error": str(e)}

Définissez ensuite un LlmAgent ADK qui utilise l'outil.

my_llm_agent = LlmAgent(
    model='gemini-2.0-flash',
    name='currency_exchange_agent',
    description='An agent that can provide currency exchange rates.',
    instruction="""You are a helpful currency exchange assistant.
                   Use the get_exchange_rate tool to answer user questions.
                   If the tool returns an error, inform the user about the error.""",
    tools=[get_exchange_rate],
)

Créer un agent local

Une fois que vous avez défini les composants de votre agent, créez une instance de la A2aAgent classe qui utilise les AgentCard, AgentExecutor et LlmAgent pour commencer les tests locaux.

from vertexai.agent_engines.templates.a2a import A2aAgent

a2a_agent = A2aAgent(
    agent_card=agent_card, # Assuming agent_card is defined
    agent_executor_builder=lambda: CurrencyAgentExecutorWithRunner(
        agent=my_llm_agent,
    )
)
a2a_agent.set_up()

Le modèle d'agent A2A vous aide à créer un service compatible avec A2A. Le service fait office de wrapper, en vous évitant d'avoir à gérer la couche de conversion.

Tester l'agent local

L'agent de taux de change est compatible avec les trois méthodes suivantes :

  • handle_authenticated_agent_card
  • on_message_send
  • on_get_task

Tester handle_authenticated_agent_card

Le code suivant récupère la carte authentifiée de l'agent, qui décrit ses fonctionnalités.

# Test the `authenticated_agent_card` endpoint.
response_get_card = await a2a_agent.handle_authenticated_agent_card(request=None, context=None)
print(response_get_card)

Tester on_message_send

Le code suivant simule un client envoyant un nouveau message à l'agent. A2aAgent crée une tâche et renvoie son ID.

from a2a.types import SendMessageRequest, Message, Part
from a2a.server.context import ServerCallContext

# 1. Define the message
message = Message(
    role="ROLE_USER",
    message_id="local-test-message-id",
    parts=[Part(text="What is the exchange rate from USD to EUR today?")]
)

# 2. Construct the request
request = SendMessageRequest(message=message)

# 3. Construct context
context = ServerCallContext()

# 4. Call the agent
send_message_response = await a2a_agent.on_message_send(request=request, context=context)

print(send_message_response)

Tester on_get_task

Le code suivant récupère l'état et le résultat d'une tâche. Le résultat indique que la tâche est terminée et inclut l'artefact de réponse "Hello World".

from a2a.types import GetTaskRequest

# 1. Provide the task_id from the previous step.
# In a real application, you would store and retrieve this ID.
task_id_to_get = send_message_response.id

# 2. Construct the request
request = GetTaskRequest(id=task_id_to_get)

# 3. Call the agent's handler to get the task status.
# Reusing the context constructed in the previous step
task_status_response = await a2a_agent.on_get_task(request=request, context=context)

print(f"Successfully retrieved status for Task ID: {task_id_to_get}")
print("\nFull task status response:")
print(task_status_response)

Étape suivante

Guide

Découvrez les cinq façons de déployer un agent sur Agent Platform Runtime en fonction de vos besoins de développement.

Guide

Utilisez un agent Agent2Agent avec Agent Platform Runtime.

Guide

Créez et déployez un agent de base, puis utilisez le service d'évaluation Gen AI pour l'évaluer.

Dépannage

Découvrez comment résoudre les erreurs courantes lors de la création d'agents personnalisés.

Ressource

Trouvez des ressources et de l'aide pour Google Agent Platform.

Ressource

Explorez des exemples Agent2Agent en Python sur GitHub.