Bidirektionales Streaming in der Agent Runtime ermöglicht eine dauerhafte bidirektionale Kommunikation zwischen Ihrer Anwendung und einem Agenten, die über herkömmliche Anfrage-Antwort-Muster hinausgeht. In diesem Dokument wird erläutert, wie Sie bidirektionale Streaming-Agenten für Echtzeit-Anwendungsfälle wie Audio- oder Videointeraktion entwickeln, testen und bereitstellen.
Übersicht
Bidirektionales Streaming bietet einen dauerhaften bidirektionalen Kommunikationskanal zwischen Ihrer Anwendung und dem Agenten, mit dem Sie über turnbasierte Anfrage-Antwort-Muster hinausgehen können. Bidirektionales Streaming eignet sich für Anwendungsfälle, in denen Ihr Agent Informationen verarbeiten und kontinuierlich antworten muss, z. B. bei der Interaktion mit Audio- oder Videoeingaben mit niedriger Latenz.
Bidirektionales Streaming mit der Agent Runtime unterstützt interaktive Echtzeit-Anwendungsfälle für Agenten und den Datenaustausch für multimodale Live-APIs. Bidirektionales Streaming wird für alle Frameworks unterstützt. Benutzerdefinierte bidirektionale Streaming Methoden sind durch die Registrierung benutzerdefinierter Methoden verfügbar. Sie können bidirektionales Streaming verwenden, um direkt mit der Gemini Live API oder mit dem Agent Development Kit (ADK) auf der Agent Platform zu interagieren.
Die Bereitstellung eines Remote-Agenten mit bidirektionalen Abfragemethoden wird vom Google GenAI SDK gut unterstützt. Wenn Sie einen bidirektionalen Agenten bereitstellen möchten, legen Sie den Agentenservermodus EXPERIMENTAL fest, wenn Sie das SDK verwenden oder die Agent Platform API aufrufen.
Agent entwickeln
Führen Sie beim Entwickeln eines Agenten die folgenden Schritte aus, um bidirektionales Streaming zu implementieren:
Benutzerdefinierte Methoden registrieren (optional)
Bidirektionale Streaming-Abfragemethode definieren
Damit Ihr Agent bidirektional arbeiten kann, müssen Sie eine bidi_stream_query-Methode definieren, die asynchron Streamanfragen als Eingabe akzeptiert und Streamingantworten ausgibt. Die folgende Vorlage erweitert beispielsweise die grundlegende Vorlage, um Anfragen und Antworten zu streamen, und kann auf der Gemini Enterprise Agent Platform bereitgestellt werden:
import asyncio
from typing import Any, AsyncIterable
class BidiStreamingAgent(StreamingAgent):
async def bidi_stream_query(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
from langchain.load.dump import dumpd
while True:
request = await request_queue.get()
# This is just an illustration, you're free to use any termination mechanism.
if request == "END":
break
for chunk in self.graph.stream(request):
yield dumpd(chunk)
agent = BidiStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Beachten Sie bei der Verwendung der bidirektionalen Streaming-API Folgendes:
asyncio.Queue: Sie können jeden Datentyp in diese Anfragewarteschlange einfügen, um ihn an die Modell-API zu senden.Maximale Zeitüberschreitung: Die maximale Zeitüberschreitung für die bidirektionale Streaming-Abfrage beträgt 10 Minuten. Wenn Ihr Agent längere Verarbeitungszeiten benötigt, sollten Sie die Aufgabe in kleinere Teile aufteilen und Sitzungen oder Arbeitsspeicher verwenden, um den Status beizubehalten.
Inhaltsverbrauch drosseln: Wenn Sie Inhalte aus einem bidirektionalen Stream verwenden, ist es wichtig, die Rate zu verwalten, mit der Ihr Agent eingehende Daten verarbeitet. Wenn Ihr Agent Daten zu langsam verarbeitet, kann dies zu Problemen wie erhöhter Latenz oder Arbeitsspeicherlast auf der Serverseite führen. Implementieren Sie Mechanismen, um Daten aktiv abzurufen, wenn Ihr Agent bereit ist, sie zu verarbeiten, und vermeiden Sie blockierende Vorgänge, die den Inhaltsverbrauch unterbrechen könnten.
Inhaltsgenerierung drosseln: Wenn Sie Probleme mit dem Rückstau haben (der Producer generiert Daten schneller als der Consumer sie verarbeiten kann), sollten Sie die Rate der Inhaltsgenerierung drosseln. So lassen sich Pufferüberläufe vermeiden und ein reibungsloses Streaming gewährleisten.
Bidirektionale Streaming-Abfragemethode testen
Sie können die bidirektionale Streaming-Abfrage lokal testen, indem Sie die Methode bidi_stream_query aufrufen und die Ergebnisse durchlaufen:
import asyncio
import pprint
import time
request_queue = asyncio.Queue()
async def generate_input():
# This is just an illustration, you're free to use any appropriate input generator.
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Swedish currency"}
)
time.sleep(5)
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Euro currency"}
)
time.sleep(5)
request_queue.put_nowait("END")
async def print_query_result():
async for chunk in agent.bidi_stream_query(request_queue):
pprint.pprint(chunk, depth=1)
input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())
await asyncio.gather(input_task, output_task, return_exceptions=True)
Dieselbe bidirektionale Abfrageverbindung kann mehrere Anfragen und Antworten verarbeiten. Für jede neue Anfrage aus der Warteschlange generiert das folgende Beispiel einen Stream von Chunks mit verschiedenen Informationen zur Antwort:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}
Optional: Benutzerdefinierte Methoden registrieren
Vorgänge können entweder als Standardmodus (durch einen leeren String "" dargestellt), Streamingmodus (stream) oder bidirektionaler Streamingmodus (bidi_stream) registriert werden.
from typing import AsyncIterable, Iterable
class CustomAgent(BidiStreamingAgent):
# ... same get_state and get_state_history function definition.
async def get_state_bidi_mode(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
while True:
request = await request_queue.get()
if request == "END":
break
yield self.graph.get_state(request)._asdict()
def register_operations(self):
return {
# The list of synchrounous operations to be registered
"": ["query", "get_state"]
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"]
# The list of bidi streaming operations to be registered
"bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
}
Agent bereitstellen
Nachdem Sie Ihren Agenten als live_agent entwickelt haben, können Sie ihn auf der Agent Platform bereitstellen, indem Sie eine Agent Platform-Instanz erstellen.
Beachten Sie, dass mit dem GenAI SDK alle Bereitstellungskonfigurationen (zusätzliche Pakete und benutzerdefinierte Ressourcensteuerungen) als Wert von config zugewiesen werden, wenn die Agent Platform-Instanz erstellt wird.
Initialisieren Sie den GenAI-Client:
import vertexai
from vertexai import types as vertexai_types
client = vertexai.Client(project=PROJECT, location=LOCATION)
Stellen Sie den Agenten auf der Agent Platform bereit. Für einen Agenten, der bidirektionales Streaming unterstützt, ist der agent_server_mode EXPERIMENTAL erforderlich:
remote_live_agent = client.agent_engines.create(
agent=live_agent,
config={
"staging_bucket": STAGING_BUCKET,
"requirements": [
"google-cloud-aiplatform[agent_engines,adk]==1.88.0",
"cloudpickle==3.0",
"websockets"
],
"agent_server_mode": vertexai_types.AgentServerMode.EXPERIMENTAL,
},
)
Informationen zu den Schritten, die im Hintergrund während der Bereitstellung ausgeführt werden, finden Sie unter Agent Runtime-Instanz erstellen.
Rufen Sie die Agentenressourcen-ID ab:
remote_live_agent.api_resource.name
Agent verwenden
Wenn Sie beim Entwickeln Ihres Agenten einen bidi_stream_query-Vorgang definiert haben, können Sie den Agenten mit dem GenAI SDK für Python asynchron bidirektional abfragen.
Sie können das folgende Beispiel mit allen Daten ändern, die von Ihrem Agenten erkannt werden, und eine beliebige geeignete Beendigungslogik für den Eingabe- und Ausgabestream verwenden:
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={"class_method": "bidi_stream_query"}
) as connection:
while True:
#
input_str = input("Enter your question: ")
if input_str == "exit":
break
await connection.send({"input": input_str})
while True:
response = await connection.receive()
print(response)
if response["bidiStreamOutput"]["output"] == "end of turn":
break
Die Agent Runtime streamt Antworten als Sequenz von iterativ generierten Objekten. Ein Satz von zwei Antworten in der ersten Runde könnte beispielsweise so aussehen:
Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}
Enter your next question: exit
Agent Development Kit-Agent verwenden
Wenn Sie Ihren Agenten mit dem Agent Development Kit (ADK) entwickelt haben, können Sie bidirektionales Streaming verwenden, um mit der Gemini Live API zu interagieren.
Im folgenden Beispiel wird ein Konversationsagent erstellt, der Textfragen von Nutzern entgegennimmt und Audiodaten von Gemini Live API-Antworten empfängt:
import numpy as np
from google.adk.agents.live_request_queue import LiveRequest
from google.adk.events import Event
from google.genai import types
def prepare_live_request(input_text: str) -> LiveRequest:
part = types.Part.from_text(text=input_text)
content = types.Content(parts=[part])
return LiveRequest(content=content)
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={
"class_method": "bidi_stream_query",
"input": {"input_str": "hello"},
}) as connection:
first_req = True
while True:
input_text = input("Enter your question: ")
if input_text == "exit":
break
if first_req:
await connection.send({
"user_id": USER_ID,
"live_request": prepare_live_request(input_text).dict()
})
first_req = False
else:
await connection.send(prepare_live_request(input_text).dict())
audio_data = []
while True:
async def receive():
return await connection.receive()
receiving = asyncio.Task(receive())
done, _ = await asyncio.wait([receiving])
if receiving not in done:
receiving.cancel()
break
event = Event.model_validate(receiving.result()["bidiStreamOutput"])
part = event.content and event.content.parts and event.content.parts[0]
if part.inline_data and part.inline_data.data:
chunk_data = part.inline_data.data
data = np.frombuffer(chunk_data, dtype=np.int16)
audio_data.append(data)
else:
print(part)
if audio_data:
concatenated_audio = np.concatenate(audio_data)
display(Audio(concatenated_audio, rate=24000, autoplay=True))