Questa pagina descrive come utilizzare lo streaming bidirezionale con il runtime di Vertex AI Agent Engine.
Panoramica
Lo streaming bidirezionale fornisce un canale di comunicazione persistente e bidirezionale tra l'applicazione e l'agente, consentendoti di andare oltre i pattern di richiesta-risposta a turni. Lo streaming bidirezionale funziona per i casi d'uso in cui l'agente deve elaborare le informazioni e rispondere continuamente, ad esempio interagire con input audio o video a bassa latenza.
Lo streaming bidirezionale con il runtime di Vertex AI Agent Engine supporta i casi d'uso degli agenti interattivi in tempo reale e lo scambio di dati per le API live multimodali. Lo streaming bidirezionale è supportato per tutti i framework e i metodi di streaming bidirezionale personalizzati sono disponibili tramite la registrazione di metodi personalizzati. Puoi utilizzare lo streaming bidirezionale per interagire direttamente con l'API Gemini Live o utilizzando l'Agent Development Kit (ADK) in Vertex AI Agent Engine.
Il deployment di un agente remoto con metodi di query bidirezionali è ben supportato dall'SDK Google GenAI. Per eseguire il deployment di un agente con funzionalità bidirezionali, imposta la modalità server dell'agente EXPERIMENTAL quando utilizzi l'SDK o chiami l'API Agent Engine.
Sviluppare un agente
Durante lo sviluppo di un agente, segui questi passaggi per implementare lo streaming bidirezionale:
Registra metodi personalizzati (facoltativo)
Definisci un metodo di query di streaming bidirezionale
Per rendere l'agente "compatibile con la comunicazione bidirezionale", devi definire un metodo bidi_stream_query che accetta in modo asincrono le richieste di streaming come input e restituisce risposte di streaming. Ad esempio, il seguente modello estende il modello di base per trasmettere in streaming richieste e risposte ed è implementabile in Agent Engine:
import asyncio
from typing import Any, AsyncIterable
class BidiStreamingAgent(StreamingAgent):
async def bidi_stream_query(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
from langchain.load.dump import dumpd
while True:
request = await request_queue.get()
# This is just an illustration, you're free to use any termination mechanism.
if request == "END":
break
for chunk in self.graph.stream(request):
yield dumpd(chunk)
agent = BidiStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Quando utilizzi l'API di streaming bidirezionale, tieni presente quanto segue:
asyncio.Queue: puoi inserire qualsiasi tipo di dati in questa coda di richieste in attesa di essere inviati all'API del modello.Timeout massimo: il timeout massimo per la query di streaming bidirezionale è di 10 minuti. Se l'agente richiede tempi di elaborazione più lunghi, valuta la possibilità di suddividere l'attività in parti più piccole e utilizzare la sessione o la memoria per mantenere lo stato persistente.
Limita il consumo di contenuti: quando utilizzi contenuti da uno stream bidirezionale, è importante gestire la velocità con cui l'agente elabora i dati in entrata. Se l'agente consuma i dati troppo lentamente, possono verificarsi problemi come aumento della latenza o pressione della memoria sul lato server. Implementa meccanismi per eseguire il pull attivo dei dati quando l'agente è pronto per elaborarli ed evita operazioni di blocco che potrebbero interrompere il consumo di contenuti.
Limita la generazione di contenuti: se riscontri problemi di contropressione (il produttore genera dati più velocemente di quanto il consumatore possa elaborarli), devi limitare la velocità di generazione dei contenuti. In questo modo puoi evitare overflow del buffer e garantire un'esperienza di streaming fluida.
Testa il metodo di query di streaming bidirezionale
Puoi testare la query di streaming bidirezionale localmente chiamando il metodo bidi_stream_query e scorrendo i risultati:
import asyncio
import pprint
import time
request_queue = asyncio.Queue()
async def generate_input():
# This is just an illustration, you're free to use any appropriate input generator.
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Swedish currency"}
)
time.sleep(5)
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Euro currency"}
)
time.sleep(5)
request_queue.put_nowait("END")
async def print_query_result():
async for chunk in agent.bidi_stream_query(request_queue):
pprint.pprint(chunk, depth=1)
input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())
await asyncio.gather(input_task, output_task, return_exceptions=True)
La stessa connessione di query bidirezionale può gestire più richieste e risposte. Per ogni nuova richiesta dalla coda, l'esempio seguente genera uno stream di blocchi contenenti informazioni diverse sulla risposta:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}
(Facoltativo) Registra metodi personalizzati
Le operazioni possono essere registrate come modalità di esecuzione standard (rappresentate da una stringa vuota ""), di streaming (stream) o di streaming bidirezionale (bidi_stream).
from typing import AsyncIterable, Iterable
class CustomAgent(BidiStreamingAgent):
# ... same get_state and get_state_history function definition.
async def get_state_bidi_mode(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
while True:
request = await request_queue.get()
if request == "END":
break
yield self.graph.get_state(request)._asdict()
def register_operations(self):
return {
# The list of synchrounous operations to be registered
"": ["query", "get_state"]
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"]
# The list of bidi streaming operations to be registered
"bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
}
Esegui il deployment di un agente
Dopo aver sviluppato l'agente come live_agent, puoi eseguirne il deployment in Agent Engine creando un'istanza di Agent Engine.
Tieni presente che con l'SDK GenAI tutte le configurazioni di deployment (pacchetti aggiuntivi e controlli delle risorse personalizzati) vengono assegnate come valore di config durante la creazione dell'istanza di Agent Engine.
Inizializza il client GenAI:
import vertexai
from vertexai import types as vertexai_types
client = vertexai.Client(project=PROJECT, location=LOCATION)
Esegui il deployment dell'agente in Agent Engine. Tieni presente che agent_server_mode EXPERIMENTAL è obbligatorio per un agente che supporta lo streaming bidirezionale:
remote_live_agent = client.agent_engines.create(
agent=live_agent,
config={
"staging_bucket": STAGING_BUCKET,
"requirements": [
"google-cloud-aiplatform[agent_engines,adk]==1.88.0",
"cloudpickle==3.0",
"websockets"
],
"agent_server_mode": vertexai_types.AgentServerMode.EXPERIMENTAL,
},
)
Per informazioni sui passaggi eseguiti in background durante il deployment, consulta Creare un'istanza di AgentEngine.
Recupera l'ID risorsa dell'agente:
remote_live_agent.api_resource.name
Utilizzare un agente
Se hai definito un'operazione bidi_stream_query durante lo sviluppo dell'agente, puoi eseguire una query di streaming bidirezionale sull'agente in modo asincrono utilizzando l'SDK GenAI per Python.
Puoi modificare l'esempio seguente con tutti i dati riconoscibili dall'agente, utilizzando qualsiasi logica di terminazione applicabile per lo stream di input e lo stream di output:
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={"class_method": "bidi_stream_query"}
) as connection:
while True:
#
input_str = input("Enter your question: ")
if input_str == "exit":
break
await connection.send({"input": input_str})
while True:
response = await connection.receive()
print(response)
if response["bidiStreamOutput"]["output"] == "end of turn":
break
Il runtime di Vertex AI Agent Engine trasmette le risposte in streaming come sequenza di oggetti generati in modo iterativo. Ad esempio, un insieme di due risposte nel primo turno potrebbe avere il seguente aspetto:
Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}
Enter your next question: exit
Utilizzare un agente dell'Agent Development Kit
Se hai sviluppato l'agente utilizzando l'Agent Development Kit (ADK), puoi utilizzare lo streaming bidirezionale per interagire con l'API Gemini Live.
L'esempio seguente crea un agente conversazionale che accetta domande di testo dell'utente e riceve dati audio di risposta dell'API Gemini Live:
import numpy as np
from google.adk.agents.live_request_queue import LiveRequest
from google.adk.events import Event
from google.genai import types
def prepare_live_request(input_text: str) -> LiveRequest:
part = types.Part.from_text(text=input_text)
content = types.Content(parts=[part])
return LiveRequest(content=content)
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={
"class_method": "bidi_stream_query",
"input": {"input_str": "hello"},
}) as connection:
first_req = True
while True:
input_text = input("Enter your question: ")
if input_text == "exit":
break
if first_req:
await connection.send({
"user_id": USER_ID,
"live_request": prepare_live_request(input_text).dict()
})
first_req = False
else:
await connection.send(prepare_live_request(input_text).dict())
audio_data = []
while True:
async def receive():
return await connection.receive()
receiving = asyncio.Task(receive())
done, _ = await asyncio.wait([receiving])
if receiving not in done:
receiving.cancel()
break
event = Event.model_validate(receiving.result()["bidiStreamOutput"])
part = event.content and event.content.parts and event.content.parts[0]
if part.inline_data and part.inline_data.data:
chunk_data = part.inline_data.data
data = np.frombuffer(chunk_data, dtype=np.int16)
audio_data.append(data)
else:
print(part)
if audio_data:
concatenated_audio = np.concatenate(audio_data)
display(Audio(concatenated_audio, rate=24000, autoplay=True))
Passaggi successivi
- Scopri di più sull'API Gemini Live.