Streaming dua arah

Streaming dua arah di Agent Runtime memungkinkan komunikasi dua arah yang persisten antara aplikasi Anda dan agen, sehingga melampaui pola permintaan-respons konvensional. Dokumen ini menjelaskan cara mengembangkan, menguji, dan men-deploy agen streaming dua arah untuk kasus penggunaan real-time seperti interaksi audio atau video.

Ringkasan

Streaming dua arah menyediakan saluran komunikasi dua arah yang persisten antara aplikasi Anda dan agen, sehingga Anda dapat melampaui pola permintaan-respons berbasis giliran. Streaming dua arah berfungsi untuk kasus penggunaan saat agen Anda perlu memproses informasi dan merespons secara terus-menerus, seperti berinteraksi dengan input audio atau video dengan latensi rendah.

Streaming dua arah dengan Agent Runtime mendukung kasus penggunaan agen interaktif real-time dan pertukaran data untuk Live API multimodal. Streaming dua arah didukung untuk semua framework, dan metode streaming dua arah kustom tersedia melalui pendaftaran metode kustom. Anda dapat menggunakan streaming dua arah untuk berinteraksi dengan Gemini Live API secara langsung atau menggunakan Agent Development Kit (ADK) di Agent Platform.

Deployment agen jarak jauh dengan metode kueri dua arah didukung dengan baik oleh Google GenAI SDK. Untuk men-deploy agen yang mendukung dua arah, tetapkan mode server agen EXPERIMENTAL saat menggunakan SDK atau memanggil Agent Platform API.

Mengembangkan agen

Saat mengembangkan agen, gunakan langkah-langkah berikut untuk menerapkan streaming dua arah:

Menentukan metode kueri streaming dua arah

Untuk membuat agen Anda "mendukung dua arah", Anda harus menentukan metode bidi_stream_query yang secara asinkron mengambil permintaan streaming sebagai input dan menampilkan respons streaming. Sebagai contoh, template berikut memperluas template dasar untuk melakukan streaming permintaan dan respons serta dapat di-deploy di Gemini Enterprise Agent Platform:

import asyncio
from typing import Any, AsyncIterable

class BidiStreamingAgent(StreamingAgent):

    async def bidi_stream_query(
        self,
        request_queue: asyncio.Queue[Any]
    ) -> AsyncIterable[Any]:
        from langchain.load.dump import dumpd

        while True:
            request = await request_queue.get()
            # This is just an illustration, you're free to use any termination mechanism.
            if request == "END":
                break
            for chunk in self.graph.stream(request):
                yield dumpd(chunk)

agent = BidiStreamingAgent(
    model=model,                # Required.
    tools=[get_exchange_rate],  # Optional.
    project="PROJECT_ID",
    location="LOCATION",
)
agent.set_up()

Perhatikan hal-hal berikut saat menggunakan API streaming dua arah:

  • asyncio.Queue: Anda dapat menempatkan jenis data apa pun dalam antrean permintaan ini untuk menunggu dikirim ke API model.

  • Waktu tunggu maksimum: Waktu tunggu maksimum untuk kueri streaming dua arah adalah 10 menit. Jika agen Anda memerlukan waktu pemrosesan yang lebih lama, pertimbangkan untuk membagi tugas menjadi bagian yang lebih kecil dan gunakan sesi atau memori untuk mempertahankan status.

  • Membatasi penggunaan konten: Saat menggunakan konten dari streaming dua arah, penting untuk mengelola kecepatan agen Anda memproses data yang masuk. Jika agen Anda menggunakan data terlalu lambat, hal ini dapat menyebabkan masalah seperti peningkatan latensi atau tekanan memori di sisi server. Terapkan mekanisme untuk secara aktif menarik data saat agen Anda siap memprosesnya, dan hindari operasi pemblokiran yang dapat menghentikan penggunaan konten.

  • Membatasi pembuatan konten: Jika Anda mengalami masalah tekanan balik (saat produsen membuat data lebih cepat daripada yang dapat diproses konsumen), Anda harus membatasi kecepatan pembuatan konten. Hal ini dapat membantu mencegah overflow buffer dan memastikan pengalaman streaming yang lancar.

Menguji metode kueri streaming dua arah

Anda dapat menguji kueri streaming dua arah secara lokal dengan memanggil metode bidi_stream_query dan melakukan iterasi melalui hasilnya:

import asyncio
import pprint
import time

request_queue = asyncio.Queue()

async def generate_input():
    # This is just an illustration, you're free to use any appropriate input generator.
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dolloars to Swedish currency"}
    )
    time.sleep(5)
    request_queue.put_nowait(
        {"input": "What is the exchange rate from US dolloars to Euro currency"}
    )
    time.sleep(5)
    request_queue.put_nowait("END")

async def print_query_result():
    async for chunk in agent.bidi_stream_query(request_queue):
        pprint.pprint(chunk, depth=1)

input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())

await asyncio.gather(input_task, output_task, return_exceptions=True)

Koneksi kueri dua arah yang sama dapat menangani beberapa permintaan dan respons. Untuk setiap permintaan baru dari antrean, contoh berikut akan membuat streaming potongan yang berisi informasi berbeda tentang respons:

{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}

Opsional: Mendaftarkan metode kustom

Operasi dapat didaftarkan sebagai mode eksekusi standar (diwakili oleh string kosong ""), streaming (stream), atau streaming dua arah (bidi_stream).

from typing import AsyncIterable, Iterable

class CustomAgent(BidiStreamingAgent):

    # ... same get_state and get_state_history function definition.

    async def get_state_bidi_mode(
        self,
        request_queue: asyncio.Queue[Any]
    ) -> AsyncIterable[Any]:
        while True:
            request = await request_queue.get()
            if request == "END":
                break
            yield self.graph.get_state(request)._asdict()

    def register_operations(self):
        return {
            # The list of synchrounous operations to be registered
            "": ["query", "get_state"]
            # The list of streaming operations to be registered
            "stream": ["stream_query", "get_state_history"]
            # The list of bidi streaming operations to be registered
            "bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
        }

Men-deploy agen

Setelah mengembangkan agen sebagai live_agent, Anda dapat men-deploy agen ke Agent Platform dengan membuat instance Agent Platform.

Perhatikan bahwa dengan GenAI SDK, semua konfigurasi deployment (paket tambahan dan kontrol resource yang disesuaikan) ditetapkan sebagai nilai config saat membuat instance Agent Platform.

Menginisialisasi klien GenAI:

import vertexai
from vertexai import types as vertexai_types

client = vertexai.Client(project=PROJECT, location=LOCATION)

Men-deploy agen ke Agent Platform. Perhatikan bahwa agent_server_mode EXPERIMENTAL diperlukan untuk agen yang mendukung streaming dua arah:

remote_live_agent = client.agent_engines.create(
    agent=live_agent,
    config={
        "staging_bucket": STAGING_BUCKET,
        "requirements": [
            "google-cloud-aiplatform[agent_engines,adk]==1.88.0",
            "cloudpickle==3.0",
            "websockets"
        ],
        "agent_server_mode": vertexai_types.AgentServerMode.EXPERIMENTAL,
    },
)

Untuk mengetahui informasi tentang langkah-langkah yang terjadi di latar belakang selama deployment, lihat Membuat instance Agent Runtime.

Mendapatkan ID resource agen:

remote_live_agent.api_resource.name

Menggunakan agen

Jika Anda menentukan operasi bidi_stream_query saat mengembangkan agen, Anda dapat melakukan kueri streaming dua arah secara asinkron menggunakan GenAI SDK for Python.

Anda dapat mengubah contoh berikut dengan data apa pun yang dapat dikenali oleh agen Anda, menggunakan logika penghentian yang berlaku untuk streaming input dan streaming output:

async with client.aio.live.agent_engines.connect(
        agent_engine=remote_live_agent.api_resource.name,
        config={"class_method": "bidi_stream_query"}
        ) as connection:
    while True:
        #
        input_str = input("Enter your question: ")
        if input_str == "exit":
            break
        await connection.send({"input": input_str})

        while True:
            response = await connection.receive()
            print(response)
            if response["bidiStreamOutput"]["output"] == "end of turn":
                break

Agent Runtime melakukan streaming respons sebagai urutan objek yang dibuat secara berulang. Misalnya, kumpulan dua respons pada giliran pertama mungkin terlihat seperti berikut:

Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}

Enter your next question: exit

Menggunakan agen Agent Development Kit

Jika Anda mengembangkan agen menggunakan Agent Development Kit (ADK), Anda dapat menggunakan streaming dua arah untuk berinteraksi dengan Gemini Live API.

Contoh berikut membuat agen percakapan yang mengambil pertanyaan teks pengguna dan menerima data audio respons Gemini Live API:

import numpy as np
from google.adk.agents.live_request_queue import LiveRequest
from google.adk.events import Event
from google.genai import types

def prepare_live_request(input_text: str) -> LiveRequest:
    part = types.Part.from_text(text=input_text)
    content = types.Content(parts=[part])
    return LiveRequest(content=content)

async with client.aio.live.agent_engines.connect(
        agent_engine=remote_live_agent.api_resource.name,
        config={
            "class_method": "bidi_stream_query",
            "input": {"input_str": "hello"},
        }) as connection:
    first_req = True
    while True:
        input_text = input("Enter your question: ")
        if input_text == "exit":
            break
        if first_req:
            await connection.send({
                "user_id": USER_ID,
                "live_request": prepare_live_request(input_text).dict()
            })
            first_req = False
        else:
            await connection.send(prepare_live_request(input_text).dict())
        audio_data = []
        while True:
            async def receive():
                return await connection.receive()

            receiving = asyncio.Task(receive())
            done, _ = await asyncio.wait([receiving])
            if receiving not in done:
                receiving.cancel()
                break
            event = Event.model_validate(receiving.result()["bidiStreamOutput"])
            part = event.content and event.content.parts and event.content.parts[0]

            if part.inline_data and part.inline_data.data:
                chunk_data = part.inline_data.data
                data = np.frombuffer(chunk_data, dtype=np.int16)
                audio_data.append(data)
            else:
                print(part)
        if audio_data:
            concatenated_audio = np.concatenate(audio_data)
            display(Audio(concatenated_audio, rate=24000, autoplay=True))

Langkah berikutnya