Agent2Agent-Agent erstellen

Mit der Agent Runtime können Sie Agenten mit dem Agent2Agent-Protokoll (A2A) entwickeln und bereitstellen. A2A ist ein offener Standard, der eine nahtlose Kommunikation und Zusammenarbeit zwischen KI-Agenten ermöglicht.

In diesem Dokument wird beschrieben, wie Sie einen A2A-Agenten lokal entwickeln und testen. Dazu gehört auch das Definieren von Komponenten wie AgentCard und AgentExecutor.

Weitere Informationen zum Verwalten bereitgestellter Agents finden Sie unter Bereitgestellte Agents verwalten.

Der Kern-Workflow umfasst die folgenden Schritte:

  1. Wichtige Komponenten definieren
  2. Lokalen Agenten erstellen
  3. Lokalen Agenten testen

Agent-Komponenten definieren

Zum Erstellen eines A2A-Agenten müssen Sie die folgenden Komponenten definieren: eine AgentCard, eine AgentExecutor und ein ADK LlmAgent.

  • AgentCard enthält ein Metadatendokument, in dem die Funktionen Ihres Agents beschrieben werden. AgentCard ist wie eine Visitenkarte, mit der andere Agenten herausfinden können, was Ihr Agent kann. Weitere Informationen finden Sie in der Spezifikation für Agentenkarten.
  • AgentExecutor enthält die Kernlogik des KI-Agenten und definiert, wie er Aufgaben verarbeitet. Hier implementieren Sie das Verhalten des KI-Agenten. Weitere Informationen finden Sie in der A2A-Protokollspezifikation.
  • Optional: LlmAgent definiert den ADK-Agenten, einschließlich seiner Systemanweisungen, des generativen Modells und der Tools.

AgentCard definieren

Im folgenden Codebeispiel wird ein AgentCard für einen Agenten für Währungsumrechnungskurse definiert:

from a2a.types import AgentCard, AgentSkill
from vertexai.agent_engines.templates.a2a import create_agent_card

# Define the skill for the CurrencyAgent
currency_skill = AgentSkill(
    id='get_exchange_rate',
    name='Get Currency Exchange Rate',
    description='Retrieves the exchange rate between two currencies on a specified date.',
    tags=['Finance', 'Currency', 'Exchange Rate'],
    examples=[
        'What is the exchange rate from USD to EUR?',
        'How many Japanese Yen is 1 US dollar worth today?',
    ],
)

# Create the agent card using the utility function
agent_card = create_agent_card(
    agent_name='Currency Exchange Agent',
    description='An agent that can provide currency exchange rates',
    skills=[currency_skill]
)

AgentExecutor definieren

Im folgenden Codebeispiel wird eine AgentExecutor definiert, die mit dem Währungsumrechnungskurs antwortet. Es wird eine CurrencyAgent-Instanz verwendet und der ADK Runner wird initialisiert, um Anfragen auszuführen.

import requests
from a2a.server.agent_execution.agent_executor import AgentExecutor
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a import types as a2a_types
from a2a.types import Part

from google.adk import Runner
from google.adk.agents import LlmAgent
from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.sessions.in_memory_session_service import InMemorySessionService
from google.genai import types as genai_types

class CurrencyAgentExecutorWithRunner(AgentExecutor):
    """Executor that takes an LlmAgent instance and initializes the ADK Runner internally."""

    def __init__(self, agent: LlmAgent):
        self.agent = agent
        self.runner = None

    def _init_adk(self):
        if not self.runner:
            self.runner = Runner(
                app_name=self.agent.name,
                agent=self.agent,
                artifact_service=InMemoryArtifactService(),
                session_service=InMemorySessionService(),
                memory_service=InMemoryMemoryService(),
            )

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        task_id = context.task_id
        updater = TaskUpdater(
            event_queue=event_queue,
            task_id=task_id or "",
            context_id=context.context_id or "",
        )
        await updater.cancel()

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        self._init_adk() # Initialize on first execute call

        if not context.message:
            return

        user_id = context.message.metadata.get('user_id') if context.message and context.message.metadata else 'a2a_user'

        updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        
        task = a2a_types.Task(
            id=context.task_id,
            context_id=context.context_id,
            status=a2a_types.TaskStatus(state=a2a_types.TaskState.TASK_STATE_SUBMITTED),
            history=[context.message] if context.message else [],
        )
        await event_queue.enqueue_event(task)

        await updater.start_work()

        query = context.get_user_input()
        content = genai_types.Content(role='user', parts=[genai_types.Part.from_text(text=query)])

        try:
            session = await self.runner.session_service.get_session(
                app_name=self.runner.app_name,
                user_id=user_id,
                session_id=context.context_id,
            ) or await self.runner.session_service.create_session(
                app_name=self.runner.app_name,
                user_id=user_id,
                session_id=context.context_id,
            )

            final_event = None
            async for event in self.runner.run_async(
                session_id=session.id,
                user_id=user_id,
                new_message=content
            ):
                if event.is_final_response():
                    final_event = event

            if final_event and final_event.content and final_event.content.parts:
                response_text = "".join(
                    part.text for part in final_event.content.parts if hasattr(part, 'text') and part.text
                )
                if response_text:
                    await updater.add_artifact(
                        [Part(text=response_text)],
                        name='result',
                        last_chunk=True,
                    )
                    await updater.complete()
                    return

            await updater.update_status(
                a2a_types.TaskState.TASK_STATE_FAILED,
                message=updater.new_agent_message([Part(text='Failed to generate a final response with text content.')]),
            )

        except Exception as e:
            await updater.update_status(
                a2a_types.TaskState.TASK_STATE_FAILED,
                message=updater.new_agent_message([Part(text=f"An error occurred: {str(e)}")]),
            )

LlmAgent definieren

Definieren Sie zuerst ein Tool für den Währungsumtausch, das LlmAgent verwenden soll:

def get_exchange_rate(
    currency_from: str = "USD",
    currency_to: str = "EUR",
    currency_date: str = "latest",
):
    """Retrieves the exchange rate between two currencies on a specified date.
    Uses the Frankfurter API (https://api.frankfurter.app/) to obtain
    exchange rate data.
    """
    try:
        response = requests.get(
            f"https://api.frankfurter.app/{currency_date}",
            params={"from": currency_from, "to": currency_to},
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        return {"error": str(e)}

Definieren Sie dann ein ADK-LlmAgent, das das Tool verwendet.

my_llm_agent = LlmAgent(
    model='gemini-2.0-flash',
    name='currency_exchange_agent',
    description='An agent that can provide currency exchange rates.',
    instruction="""You are a helpful currency exchange assistant.
                   Use the get_exchange_rate tool to answer user questions.
                   If the tool returns an error, inform the user about the error.""",
    tools=[get_exchange_rate],
)

Lokalen Agenten erstellen

Nachdem Sie die Komponenten Ihres Agents definiert haben, erstellen Sie eine Instanz der Klasse A2aAgent, die AgentCard, AgentExecutor und LlmAgent verwendet, um mit lokalen Tests zu beginnen.

from vertexai.agent_engines.templates.a2a import A2aAgent

a2a_agent = A2aAgent(
    agent_card=agent_card, # Assuming agent_card is defined
    agent_executor_builder=lambda: CurrencyAgentExecutorWithRunner(
        agent=my_llm_agent,
    )
)
a2a_agent.set_up()

Mit der A2A-Agent-Vorlage können Sie einen A2A-kompatiblen Dienst erstellen. Der Dienst fungiert als Wrapper und abstrahiert die Konvertierungsebene für Sie.

Lokalen Agenten testen

Der Währungsumrechnungs-Agent unterstützt die folgenden drei Methoden:

  • handle_authenticated_agent_card
  • on_message_send
  • on_get_task

handle_authenticated_agent_card testen

Mit dem folgenden Code wird die authentifizierte Karte des Agents abgerufen, in der die Funktionen des Agents beschrieben werden.

# Test the `authenticated_agent_card` endpoint.
response_get_card = await a2a_agent.handle_authenticated_agent_card(request=None, context=None)
print(response_get_card)

on_message_send testen

Der folgende Code simuliert, dass ein Client eine neue Nachricht an den Agenten sendet. Mit A2aAgent wird eine neue Aufgabe erstellt und die ID der Aufgabe zurückgegeben.

from a2a.types import SendMessageRequest, Message, Part
from a2a.server.context import ServerCallContext

# 1. Define the message
message = Message(
    role="ROLE_USER",
    message_id="local-test-message-id",
    parts=[Part(text="What is the exchange rate from USD to EUR today?")]
)

# 2. Construct the request
request = SendMessageRequest(message=message)

# 3. Construct context
context = ServerCallContext()

# 4. Call the agent
send_message_response = await a2a_agent.on_message_send(request=request, context=context)

print(send_message_response)

on_get_task testen

Mit dem folgenden Code werden der Status und das Ergebnis einer Aufgabe abgerufen. Die Ausgabe zeigt, dass die Aufgabe abgeschlossen ist, und enthält das Antwortartefakt „Hello World“.

from a2a.types import GetTaskRequest

# 1. Provide the task_id from the previous step.
# In a real application, you would store and retrieve this ID.
task_id_to_get = send_message_response.id

# 2. Construct the request
request = GetTaskRequest(id=task_id_to_get)

# 3. Call the agent's handler to get the task status.
# Reusing the context constructed in the previous step
task_status_response = await a2a_agent.on_get_task(request=request, context=context)

print(f"Successfully retrieved status for Task ID: {task_id_to_get}")
print("\nFull task status response:")
print(task_status_response)

Nächste Schritte

Leitfaden

Hier erfahren Sie, wie Sie einen Agenten in Agent Platform Runtime bereitstellen können.

Leitfaden

Agent2Agent-Agent mit Agent Platform Runtime verwenden

Leitfaden

Einen einfachen Agenten erstellen und bereitstellen und den Gen AI Evaluation Service verwenden, um den Agenten zu bewerten

Fehlerbehebung

Hier erfahren Sie, wie Sie häufige Fehler beim Erstellen benutzerdefinierter Agents beheben.

Ressource

Hier finden Sie Ressourcen und Support für die Google Agent Platform.

Ressource

Sehen Sie sich die Agent2Agent-Beispiele in Python auf GitHub an.