Streaming bidirecional

A transmissão bidirecional no Agent Runtime permite uma comunicação persistente e bidirecional entre seu aplicativo e um agente, indo além dos padrões convencionais de solicitação-resposta. Este documento explica como desenvolver, testar e implantar agentes de streaming bidirecional para casos de uso em tempo real, como interação de áudio ou vídeo.

Visão geral

O streaming bidirecional oferece um canal de comunicação persistente e bidirecional entre seu aplicativo e o agente, permitindo que você vá além dos padrões de solicitação-resposta baseados em turnos. O streaming bidirecional funciona para casos de uso em que o agente precisa processar informações e responder continuamente, como interagir com entradas de áudio ou vídeo com baixa latência.

O streaming bidirecional com o Agent Runtime oferece suporte a casos de uso de agentes interativos e em tempo real, além de troca de dados para APIs multimodais dinâmicas. O streaming bidirecional é compatível com todos os frameworks, e os métodos personalizados de streaming bidirecional estão disponíveis registrando métodos personalizados. É possível usar o streaming bidirecional para interagir com a API Gemini Live diretamente ou usando o Kit de Desenvolvimento de Agente (ADK) na plataforma de agentes.

A implantação de um agente remoto com métodos de consulta bidirecional é bem compatível com o SDK da IA generativa do Google. Para implantar um agente bidirecional, defina o modo de servidor do agente EXPERIMENTAL ao usar o SDK ou chamar a API da Plataforma de Agentes.

Desenvolver um agente

Ao desenvolver um agente, siga estas etapas para implementar o streaming bidirecional:

Definir um método de consulta de streaming bidirecional

Para tornar seu agente compatível com bidi, defina um método bidi_stream_query que receba solicitações de stream de forma assíncrona como entrada e gere respostas de streaming. Por exemplo, o modelo a seguir estende o modelo básico para transmitir solicitações e respostas e pode ser implantado na plataforma de agentes do Gemini Enterprise:

import asyncio
from typing import Any, AsyncIterable

class BidiStreamingAgent(StreamingAgent):

    async def bidi_stream_query(
        self,
        request_queue: asyncio.Queue[Any]
    ) -> AsyncIterable[Any]:
        from langchain.load.dump import dumpd

        while True:
            request = await request_queue.get()
            # This is just an illustration, you're free to use any termination mechanism.
            if request == "END":
                break
            for chunk in self.graph.stream(request):
                yield dumpd(chunk)

agent = BidiStreamingAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Tenha em mente o seguinte ao usar a API de streaming bidirecional:

  • asyncio.Queue: você pode colocar qualquer tipo de dados nessa fila de solicitações para aguardar o envio à API do modelo.

  • Tempo limite máximo: o tempo limite máximo para consultas de streaming bidirecional é de 10 minutos. Se o agente precisar de mais tempo de processamento, divida a tarefa em partes menores e use sessão ou memória para manter o estado persistente.

  • Limitar o consumo de conteúdo: ao consumir conteúdo de um fluxo bidirecional, é importante gerenciar a taxa em que o agente processa os dados recebidos. Se o agente consumir dados muito lentamente, isso poderá causar problemas como aumento da latência ou pressão de memória no lado do servidor. Implemente mecanismos para extrair dados ativamente quando o agente estiver pronto para processá-los e evite bloquear operações que possam interromper o consumo de conteúdo.

  • Limitar a geração de conteúdo: se você encontrar problemas de contrapressão (em que o produtor gera dados mais rápido do que o consumidor consegue processar), limite a taxa de geração de conteúdo. Isso pode ajudar a evitar estouros de buffer e garantir uma experiência de streaming tranquila.

Testar o método de consulta de streaming bidirecional

Teste a consulta de streaming bidirecional localmente chamando o método bidi_stream_query e iterando os resultados:

import asyncio
import pprint
import time

request_queue = asyncio.Queue()

async def generate_input():
    # This is just an illustration, you're free to use any appropriate input generator.
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dolloars to Swedish currency"}
    )
    time.sleep(5)
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dolloars to Euro currency"}
    )
    time.sleep(5)
    request_queue.put_nowait("END")

async def print_query_result():
    async for chunk in agent.bidi_stream_query(request_queue):
        pprint.pprint(chunk, depth=1)

input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())

await asyncio.gather(input_task, output_task, return_exceptions=True)

A mesma conexão de consulta bidirecional pode processar várias solicitações e respostas. Para cada nova solicitação da fila, o exemplo a seguir gera um fluxo de partes que contêm informações diferentes sobre a resposta:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}

Opcional: registrar métodos personalizados

As operações podem ser registradas como modos de execução padrão (representados por uma string vazia ""), streaming (stream) ou streaming bidirecional (bidi_stream).

from typing import AsyncIterable, Iterable

class CustomAgent(BidiStreamingAgent):

    # ... same get_state and get_state_history function definition.

    async def get_state_bidi_mode(
        self,
        request_queue: asyncio.Queue[Any]
    ) -> AsyncIterable[Any]:
        while True:
            request = await request_queue.get()
            if request == "END":
                break
            yield self.graph.get_state(request)._asdict()

    def register_operations(self):
        return {
            # The list of synchrounous operations to be registered
            "": ["query", "get_state"]
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"]
            # The list of bidi streaming operations to be registered
            "bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
        }

Implantar um agente

Depois de desenvolver o agente como live_agent, crie uma instância do Agent Platform para implantá-lo.

Com o SDK da GenAI, todas as configurações de implantação (pacotes adicionais e controles de recursos personalizados) são atribuídas como um valor de config ao criar a instância da plataforma de agente.

Inicialize o cliente da IA generativa:

import vertexai
from vertexai import types as vertexai_types

client = vertexai.Client(project=PROJECT, location=LOCATION)

Implante o agente na plataforma de agentes. O EXPERIMENTAL agent_server_mode é obrigatório para um agente que oferece suporte a streaming bidirecional:

remote_live_agent = client.agent_engines.create(
    agent=live_agent,
    config={
        "staging_bucket": STAGING_BUCKET,
        "requirements": [
            "google-cloud-aiplatform[agent_engines,adk]==1.88.0",
            "cloudpickle==3.0",
            "websockets"
        ],
        "agent_server_mode": vertexai_types.AgentServerMode.EXPERIMENTAL,
    },
)

Para informações sobre as etapas que acontecem em segundo plano durante a implantação, consulte Criar uma instância do ambiente de execução do agente.

Consiga o ID do recurso do agente:

remote_live_agent.api_resource.name

Usar um agente

Se você definiu uma operação bidi_stream_query ao desenvolver seu agente, é possível transmitir consultas bidirecionais de forma assíncrona usando o SDK da IA generativa para Python.

Você pode modificar o exemplo a seguir com qualquer dado reconhecido pelo seu agente, usando qualquer lógica de encerramento aplicável para fluxo de entrada e fluxo de saída:

async with client.aio.live.agent_engines.connect(
        agent_engine=remote_live_agent.api_resource.name,
        config={"class_method": "bidi_stream_query"}
        ) as connection:
    while True:
        #
        input_str = input("Enter your question: ")
        if input_str == "exit":
            break
        await connection.send({"input": input_str})

        while True:
            response = await connection.receive()
            print(response)
            if response["bidiStreamOutput"]["output"] == "end of turn":
                break

O Agent Runtime transmite respostas como uma sequência de objetos gerados de forma iterativa. Por exemplo, um conjunto de duas respostas na primeira vez pode ser assim:

Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}

Enter your next question: exit

Usar um agente do Kit de Desenvolvimento de Agente

Se você desenvolveu seu agente usando o Kit de Desenvolvimento de Agente (ADK), é possível usar o streaming bidirecional para interagir com a API Gemini Live.

O exemplo a seguir cria um agente de conversa que recebe perguntas de texto do usuário e dados de áudio de resposta da API Gemini Live:

import numpy as np
from google.adk.agents.live_request_queue import LiveRequest
from google.adk.events import Event
from google.genai import types

def prepare_live_request(input_text: str) -> LiveRequest:
    part = types.Part.from_text(text=input_text)
    content = types.Content(parts=[part])
    return LiveRequest(content=content)

async with client.aio.live.agent_engines.connect(
        agent_engine=remote_live_agent.api_resource.name,
        config={
            "class_method": "bidi_stream_query",
            "input": {"input_str": "hello"},
        }) as connection:
    first_req = True
    while True:
        input_text = input("Enter your question: ")
        if input_text == "exit":
            break
        if first_req:
            await connection.send({
                "user_id": USER_ID,
                "live_request": prepare_live_request(input_text).dict()
            })
            first_req = False
        else:
            await connection.send(prepare_live_request(input_text).dict())
        audio_data = []
        while True:
            async def receive():
                return await connection.receive()

            receiving = asyncio.Task(receive())
            done, _ = await asyncio.wait([receiving])
            if receiving not in done:
                receiving.cancel()
                break
            event = Event.model_validate(receiving.result()["bidiStreamOutput"])
            part = event.content and event.content.parts and event.content.parts[0]

            if part.inline_data and part.inline_data.data:
                chunk_data = part.inline_data.data
                data = np.frombuffer(chunk_data, dtype=np.int16)
                audio_data.append(data)
            else:
                print(part)
        if audio_data:
            concatenated_audio = np.concatenate(audio_data)
            display(Audio(concatenated_audio, rate=24000, autoplay=True))

A seguir