O streaming bidirecional no Agent Runtime permite uma comunicação persistente e bidirecional entre o aplicativo e um agente, indo além dos padrões convencionais de solicitação-resposta. Este documento explica como desenvolver, testar e implantar agentes de streaming bidirecional para casos de uso em tempo real, como interação de áudio ou vídeo.
Visão geral
O streaming bidirecional oferece um canal de comunicação persistente e bidirecional entre o aplicativo e o agente, permitindo que você vá além dos padrões de solicitação-resposta baseados em turnos. O streaming bidirecional funciona para casos de uso em que o agente precisa processar informações e responder continuamente, como interagir com entradas de áudio ou vídeo com baixa latência.
O streaming bidirecional com o Agent Runtime oferece suporte a casos de uso de agentes interativos e em tempo real e troca de dados para APIs multimodais em tempo real. O streaming bidirecional é compatível com todas as estruturas, e métodos de streaming bidirecional personalizados estão disponíveis através de registrando métodos personalizados. É possível usar o streaming bidirecional para interagir com a API Gemini Live diretamente ou usando o Kit de Desenvolvimento de Agente (ADK) na plataforma de agentes.
A implantação de um agente remoto com métodos de consulta bidirecional tem bom suporte do SDK da IA generativa do Google. Para implantar um agente com capacidade bidirecional, defina o modo de servidor do agente EXPERIMENTAL ao usar o SDK ou chamar a API do Agent Platform.
Desenvolver um agente
Ao desenvolver um agente, siga estas etapas para implementar o streaming bidirecional:
Registrar métodos personalizados (opcional)
Definir um método de consulta de streaming bidirecional
Para tornar seu agente "compatível com bidi", é necessário definir um método bidi_stream_query que receba solicitações de stream de forma assíncrona como entrada e gere respostas de streaming. Como exemplo, o modelo a seguir estende o modelo básico para transmitir solicitações e respostas e pode ser implantado na Gemini Enterprise Agent Platform:
import asyncio
from typing import Any, AsyncIterable
class BidiStreamingAgent(StreamingAgent):
async def bidi_stream_query(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
from langchain.load.dump import dumpd
while True:
request = await request_queue.get()
# This is just an illustration, you're free to use any termination mechanism.
if request == "END":
break
for chunk in self.graph.stream(request):
yield dumpd(chunk)
agent = BidiStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Ao usar a API de streaming bidirecional, lembre-se do seguinte:
asyncio.Queue: é possível colocar qualquer tipo de dados nessa fila de solicitações para aguardar o envio à API do modelo.Tempo limite máximo: o tempo limite máximo para consulta de streaming bidirecional é de 10 minutos. Se o agente exigir tempos de processamento mais longos, considere dividir a tarefa em partes menores e usar a sessão ou a memória para manter o estado persistido.
Limitar o consumo de conteúdo: ao consumir conteúdo de um stream bidirecional, é importante gerenciar a taxa em que o agente processa os dados recebidos. Se o agente consumir dados muito lentamente, isso poderá gerar problemas como aumento da latência ou pressão de memória no lado do servidor. Implemente mecanismos para extrair dados ativamente quando o agente estiver pronto para processá-los e evite bloquear operações que possam interromper o consumo de conteúdo.
Limitar a geração de conteúdo: se você encontrar problemas de contrapressão (em que o produtor gera dados mais rápido do que o consumidor pode processá-los), limite a taxa de geração de conteúdo. Isso pode ajudar a evitar estouros de buffer e garantir uma experiência de streaming tranquila.
Testar o método de consulta de streaming bidirecional
É possível testar a consulta de streaming bidirecional localmente chamando o método bidi_stream_query e iterando os resultados:
import asyncio
import pprint
import time
request_queue = asyncio.Queue()
async def generate_input():
# This is just an illustration, you're free to use any appropriate input generator.
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Swedish currency"}
)
time.sleep(5)
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Euro currency"}
)
time.sleep(5)
request_queue.put_nowait("END")
async def print_query_result():
async for chunk in agent.bidi_stream_query(request_queue):
pprint.pprint(chunk, depth=1)
input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())
await asyncio.gather(input_task, output_task, return_exceptions=True)
A mesma conexão de consulta bidirecional pode processar várias solicitações e respostas. Para cada nova solicitação da fila, o exemplo a seguir gera um stream de blocos que contém informações diferentes sobre a resposta:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}
Opcional: registrar métodos personalizados
As operações podem ser registradas como modos de execução padrão (representados por uma string vazia ""), de streaming (stream) ou de streaming bidirecional (bidi_stream).
from typing import AsyncIterable, Iterable
class CustomAgent(BidiStreamingAgent):
# ... same get_state and get_state_history function definition.
async def get_state_bidi_mode(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
while True:
request = await request_queue.get()
if request == "END":
break
yield self.graph.get_state(request)._asdict()
def register_operations(self):
return {
# The list of synchrounous operations to be registered
"": ["query", "get_state"]
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"]
# The list of bidi streaming operations to be registered
"bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
}
Implantar um agente
Depois de desenvolver o agente como live_agent, é possível implantá-lo no Agent Platform criando uma instância do Agent Platform.
Observe que, com o SDK da IA generativa, todas as configurações de implantação (pacotes adicionais e controles de recursos personalizados) são atribuídas como um valor de config ao criar a instância do Agent Platform.
Inicialize o cliente da IA generativa:
import vertexai
from vertexai import types as vertexai_types
client = vertexai.Client(project=PROJECT, location=LOCATION)
Implante o agente na plataforma de agentes. Observe que o EXPERIMENTAL
agent_server_mode é necessário para um agente que oferece suporte a streaming bidirecional:
remote_live_agent = client.agent_engines.create(
agent=live_agent,
config={
"staging_bucket": STAGING_BUCKET,
"requirements": [
"google-cloud-aiplatform[agent_engines,adk]==1.88.0",
"cloudpickle==3.0",
"websockets"
],
"agent_server_mode": vertexai_types.AgentServerMode.EXPERIMENTAL,
},
)
Para informações sobre as etapas que acontecem em segundo plano durante a implantação, consulte Criar uma instância do Agent Runtime.
Receba o ID do recurso do agente:
remote_live_agent.api_resource.name
Usar um agente
Se você definiu uma operação bidi_stream_query ao desenvolver o agente, é possível consultar o agente de streaming bidirecional de forma assíncrona usando o SDK da IA generativa para Python.
É possível modificar o exemplo a seguir com qualquer dado reconhecível pelo agente, usando qualquer lógica de encerramento aplicável para o stream de entrada e o stream de saída:
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={"class_method": "bidi_stream_query"}
) as connection:
while True:
#
input_str = input("Enter your question: ")
if input_str == "exit":
break
await connection.send({"input": input_str})
while True:
response = await connection.receive()
print(response)
if response["bidiStreamOutput"]["output"] == "end of turn":
break
O Agent Runtime transmite respostas como uma sequência de objetos gerados de forma iterativa. Por exemplo, um conjunto de duas respostas no primeiro turno pode ser semelhante ao seguinte:
Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}
Enter your next question: exit
Usar um agente do Kit de Desenvolvimento de Agente
Se você desenvolveu o agente usando o Kit de Desenvolvimento de Agente (ADK), é possível usar o streaming bidirecional para interagir com a API Gemini Live.
O exemplo a seguir cria um agente de conversa que recebe perguntas de texto do usuário e recebe dados de áudio de resposta da API Gemini Live:
import numpy as np
from google.adk.agents.live_request_queue import LiveRequest
from google.adk.events import Event
from google.genai import types
def prepare_live_request(input_text: str) -> LiveRequest:
part = types.Part.from_text(text=input_text)
content = types.Content(parts=[part])
return LiveRequest(content=content)
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={
"class_method": "bidi_stream_query",
"input": {"input_str": "hello"},
}) as connection:
first_req = True
while True:
input_text = input("Enter your question: ")
if input_text == "exit":
break
if first_req:
await connection.send({
"user_id": USER_ID,
"live_request": prepare_live_request(input_text).dict()
})
first_req = False
else:
await connection.send(prepare_live_request(input_text).dict())
audio_data = []
while True:
async def receive():
return await connection.receive()
receiving = asyncio.Task(receive())
done, _ = await asyncio.wait([receiving])
if receiving not in done:
receiving.cancel()
break
event = Event.model_validate(receiving.result()["bidiStreamOutput"])
part = event.content and event.content.parts and event.content.parts[0]
if part.inline_data and part.inline_data.data:
chunk_data = part.inline_data.data
data = np.frombuffer(chunk_data, dtype=np.int16)
audio_data.append(data)
else:
print(part)
if audio_data:
concatenated_audio = np.concatenate(audio_data)
display(Audio(concatenated_audio, rate=24000, autoplay=True))