La transmisión bidireccional en Agent Runtime permite una comunicación bidireccional persistente entre tu aplicación y un agente, lo que va más allá de los patrones convencionales de solicitud y respuesta. En este documento, se explica cómo desarrollar, probar e implementar agentes de transmisión bidireccional para casos de uso en tiempo real, como la interacción de audio o video.
Descripción general
La transmisión bidireccional proporciona un canal de comunicación bidireccional persistente entre tu aplicación y el agente, lo que te permite ir más allá de los patrones de solicitud y respuesta basados en turnos. La transmisión bidireccional funciona para casos de uso en los que tu agente necesita procesar información y responder de forma continua, como interactuar con entradas de audio o video con baja latencia.
La transmisión bidireccional con Agent Runtime admite casos de uso de agentes interactivos en tiempo real y el intercambio de datos para las APIs de Live multimodales. La transmisión bidireccional es compatible con todos los frameworks, y los métodos de transmisión bidireccional personalizados están disponibles a través del registro de métodos personalizados. Puedes usar la transmisión bidireccional para interactuar con la API de Gemini Live directamente o con el Kit de desarrollo de agentes (ADK) en Agent Platform.
El SDK de GenAI de Google admite bien la implementación de un agente remoto con métodos de consulta bidireccionales. Para implementar un agente con capacidad bidireccional, configura el modo de servidor del agente EXPERIMENTAL cuando uses el SDK o llames a la API de Agent Platform.
Desarrolla un agente
Mientras desarrollas un agente, sigue estos pasos para implementar la transmisión bidireccional:
Registra métodos personalizados (opcional)
Define un método de consulta de transmisión bidireccional
Para que tu agente sea "compatible con bidi", debes definir un método bidi_stream_query que tome de forma asíncrona las solicitudes de transmisión como entrada y genere respuestas de transmisión. Como ejemplo, la siguiente plantilla extiende la plantilla básica para transmitir solicitudes y respuestas, y se puede implementar en Agent Platform de Gemini Enterprise:
import asyncio
from typing import Any, AsyncIterable
class BidiStreamingAgent(StreamingAgent):
async def bidi_stream_query(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
from langchain.load.dump import dumpd
while True:
request = await request_queue.get()
# This is just an illustration, you're free to use any termination mechanism.
if request == "END":
break
for chunk in self.graph.stream(request):
yield dumpd(chunk)
agent = BidiStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Ten en cuenta lo siguiente cuando uses la API de transmisión bidireccional:
asyncio.Queue: Puedes colocar cualquier tipo de datos en esta cola de solicitudes para esperar a que se envíe a la API del modelo.Tiempo de espera máximo: El tiempo de espera máximo para la consulta de transmisión bidireccional es de 10 minutos. Si tu agente requiere tiempos de procesamiento más largos, considera dividir la tarea en partes más pequeñas y usar la sesión o la memoria para mantener el estado persistente.
Limita el consumo de contenido: Cuando consumes contenido de una transmisión bidireccional, es importante administrar la velocidad a la que tu agente procesa los datos entrantes. Si tu agente consume datos demasiado lento, puede generar problemas como el aumento de la latencia o la presión de memoria en el servidor. Implementa mecanismos para extraer datos de forma activa cuando tu agente esté listo para procesarlos y evita bloquear operaciones que puedan detener el consumo de contenido.
Limita la generación de contenido: Si tienes problemas de contrapresión (en los que el productor genera datos más rápido de lo que el consumidor puede procesarlos), debes limitar la tasa de generación de contenido. Esto puede ayudar a evitar desbordamientos de búfer y garantizar una experiencia de transmisión fluida.
Prueba el método de consulta de transmisión bidireccional
Puedes probar la consulta de transmisión bidireccional de forma local llamando al método bidi_stream_query y iterando a través de los resultados:
import asyncio
import pprint
import time
request_queue = asyncio.Queue()
async def generate_input():
# This is just an illustration, you're free to use any appropriate input generator.
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Swedish currency"}
)
time.sleep(5)
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Euro currency"}
)
time.sleep(5)
request_queue.put_nowait("END")
async def print_query_result():
async for chunk in agent.bidi_stream_query(request_queue):
pprint.pprint(chunk, depth=1)
input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())
await asyncio.gather(input_task, output_task, return_exceptions=True)
La misma conexión de consulta bidireccional puede controlar varias solicitudes y respuestas. Para cada solicitud nueva de la cola, el siguiente ejemplo genera una transmisión de fragmentos que contiene información diferente sobre la respuesta:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}
Opcional: Registra métodos personalizados
Las operaciones se pueden registrar como modos de ejecución estándar (representados por una cadena vacía ""), de transmisión (stream) o de transmisión bidireccional (bidi_stream).
from typing import AsyncIterable, Iterable
class CustomAgent(BidiStreamingAgent):
# ... same get_state and get_state_history function definition.
async def get_state_bidi_mode(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
while True:
request = await request_queue.get()
if request == "END":
break
yield self.graph.get_state(request)._asdict()
def register_operations(self):
return {
# The list of synchrounous operations to be registered
"": ["query", "get_state"]
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"]
# The list of bidi streaming operations to be registered
"bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
}
Implementa un agente
Una vez que desarrolles tu agente como live_agent, puedes implementarlo en Agent Platform creando una instancia de Agent Platform.
Ten en cuenta que, con el SDK de GenAI, todas las configuraciones de implementación (paquetes adicionales y controles de recursos personalizados) se asignan como un valor de config cuando se crea la instancia de Agent Platform.
Inicializa el cliente de GenAI:
import vertexai
from vertexai import types as vertexai_types
client = vertexai.Client(project=PROJECT, location=LOCATION)
Implementa el agente en Agent Platform. Ten en cuenta que se requiere el agent_server_mode EXPERIMENTAL para un agente que admita la transmisión bidireccional:
remote_live_agent = client.agent_engines.create(
agent=live_agent,
config={
"staging_bucket": STAGING_BUCKET,
"requirements": [
"google-cloud-aiplatform[agent_engines,adk]==1.88.0",
"cloudpickle==3.0",
"websockets"
],
"agent_server_mode": vertexai_types.AgentServerMode.EXPERIMENTAL,
},
)
Para obtener información sobre los pasos que se realizan en segundo plano durante la implementación, consulta Crea una instancia de Agent Runtime.
Obtén el ID de recurso del agente:
remote_live_agent.api_resource.name
Usa un agente
Si definiste una operación bidi_stream_query cuando desarrollaste tu agente, puedes consultar el agente de transmisión bidireccional de forma asíncrona con el SDK de GenAI para Python.
Puedes modificar el siguiente ejemplo con cualquier dato que tu agente pueda reconocer, usando cualquier lógica de finalización aplicable para la transmisión de entrada y la transmisión de salida:
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={"class_method": "bidi_stream_query"}
) as connection:
while True:
#
input_str = input("Enter your question: ")
if input_str == "exit":
break
await connection.send({"input": input_str})
while True:
response = await connection.receive()
print(response)
if response["bidiStreamOutput"]["output"] == "end of turn":
break
Agent Runtime transmite respuestas como una secuencia de objetos generados de forma iterativa. Por ejemplo, un conjunto de dos respuestas en el primer turno podría verse de la siguiente manera:
Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}
Enter your next question: exit
Usa un agente del Kit de desarrollo de agentes
Si desarrollaste tu agente con el Kit de desarrollo de agentes (ADK), puedes usar la transmisión bidireccional para interactuar con la API de Gemini Live.
En el siguiente ejemplo, se crea un agente de conversación que toma preguntas de texto del usuario y recibe datos de audio de respuesta de la API de Gemini Live:
import numpy as np
from google.adk.agents.live_request_queue import LiveRequest
from google.adk.events import Event
from google.genai import types
def prepare_live_request(input_text: str) -> LiveRequest:
part = types.Part.from_text(text=input_text)
content = types.Content(parts=[part])
return LiveRequest(content=content)
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={
"class_method": "bidi_stream_query",
"input": {"input_str": "hello"},
}) as connection:
first_req = True
while True:
input_text = input("Enter your question: ")
if input_text == "exit":
break
if first_req:
await connection.send({
"user_id": USER_ID,
"live_request": prepare_live_request(input_text).dict()
})
first_req = False
else:
await connection.send(prepare_live_request(input_text).dict())
audio_data = []
while True:
async def receive():
return await connection.receive()
receiving = asyncio.Task(receive())
done, _ = await asyncio.wait([receiving])
if receiving not in done:
receiving.cancel()
break
event = Event.model_validate(receiving.result()["bidiStreamOutput"])
part = event.content and event.content.parts and event.content.parts[0]
if part.inline_data and part.inline_data.data:
chunk_data = part.inline_data.data
data = np.frombuffer(chunk_data, dtype=np.int16)
audio_data.append(data)
else:
print(part)
if audio_data:
concatenated_audio = np.concatenate(audio_data)
display(Audio(concatenated_audio, rate=24000, autoplay=True))