Agent Runtime の双方向ストリーミングにより、アプリケーションとエージェント間の永続的な双方向通信が可能になり、従来のリクエスト / レスポンス パターンを超えた通信が可能になります。このドキュメントでは、音声や動画のインタラクションなどのリアルタイム ユースケース向けに、双方向ストリーミング エージェントを開発、テスト、デプロイする方法について説明します。
概要
双方向ストリーミングは、アプリケーションとエージェント間の永続的な双方向通信チャネルを提供し、ターンベースのリクエスト / レスポンス パターンを超えた通信を可能にします。双方向ストリーミングは、エージェントが情報を処理して継続的に応答する必要があるユースケース(低レイテンシで音声入力や動画入力とやり取りするなど)に適しています。
Agent Runtime の双方向ストリーミングは、マルチモーダル ライブ API のインタラクティブなリアルタイム エージェントのユースケースとデータ交換をサポートしています。双方向 ストリーミングはすべてのフレームワークでサポートされており、カスタム双方向ストリーミング メソッドはカスタム メソッドを登録することで使用できます。 双方向ストリーミングを使用すると、 Gemini Live API と直接やり取りしたり、Agent Development Kit(ADK)を Agent Platform で使用してやり取りしたりできます。
双方向クエリメソッドを使用したリモート エージェントのデプロイは、Google GenAI SDKで十分にサポートされています。双方向対応のエージェントをデプロイするには、SDK を使用するか Agent Platform API
を呼び出すときに、EXPERIMENTAL エージェント サーバーモードを設定します。
エージェントを開発する
エージェントの開発中に双方向ストリーミングを実装する手順は次のとおりです。
双方向ストリーミング クエリメソッドを定義する
エージェントを「双方向対応」にするには、ストリーム リクエストを非同期で入力として受け取り、ストリーミング レスポンスを出力する bidi_stream_query
メソッドを定義する必要があります。たとえば、次のテンプレートは、リクエストとレスポンスをストリーミングするように基本テンプレートを拡張したもので、Gemini Enterprise Agent Platform にデプロイできます。
import asyncio
from typing import Any, AsyncIterable
class BidiStreamingAgent(StreamingAgent):
async def bidi_stream_query(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
from langchain.load.dump import dumpd
while True:
request = await request_queue.get()
# This is just an illustration, you're free to use any termination mechanism.
if request == "END":
break
for chunk in self.graph.stream(request):
yield dumpd(chunk)
agent = BidiStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
双方向ストリーミング API を使用する場合は、次の点に注意してください。
asyncio.Queue: このリクエスト キューには、モデル API に送信されるまで待機する任意のデータ型を配置できます。最大タイムアウト: 双方向ストリーミング クエリの最大タイムアウトは 10 分です。エージェントで処理時間が長くなる場合は、タスクを小さなチャンクに分割し、セッションまたはメモリを使用して状態を保持することを検討してください。
コンテンツ消費をスロットリングする: 双方向ストリームからコンテンツを消費する場合は、エージェントが受信データを処理するレートを管理することが重要です。エージェントによるデータの消費が遅すぎると、レイテンシの増加やサーバー側のメモリ負荷の増加などの問題が発生する可能性があります。エージェントがデータを処理する準備ができたら、データを積極的に pull するメカニズムを実装し、コンテンツの消費を停止させる可能性のあるオペレーションのブロックを回避します。
コンテンツ生成をスロットリングする: バックプレッシャーの問題(消費者が処理できるよりも速い速度でプロデューサーがデータを生成する)が発生した場合は、コンテンツ生成率をスロットリングする必要があります。これにより、バッファ オーバーフローを防ぎ、スムーズなストリーミングを実現できます。
双方向ストリーミング クエリメソッドをテストする
bidi_stream_query メソッドを呼び出して結果を反復処理することで、双方向ストリーミング クエリをローカルでテストできます。
import asyncio
import pprint
import time
request_queue = asyncio.Queue()
async def generate_input():
# This is just an illustration, you're free to use any appropriate input generator.
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Swedish currency"}
)
time.sleep(5)
request_queue.put_nowait(
{"input": "What is the exchange rate from US dolloars to Euro currency"}
)
time.sleep(5)
request_queue.put_nowait("END")
async def print_query_result():
async for chunk in agent.bidi_stream_query(request_queue):
pprint.pprint(chunk, depth=1)
input_task = asyncio.create_task(generate_input())
output_task = asyncio.create_task(print_query_result())
await asyncio.gather(input_task, output_task, return_exceptions=True)
同じ双方向クエリ接続で複数のリクエストとレスポンスを処理できます。次の例では、キューからの新しいリクエストごとに、レスポンスに関するさまざまな情報を含むチャンクのストリームを生成します。
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to 10.5751 SEK. \n'}
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...], 'output': 'The exchange rate from US dollars to Euro currency is 1 USD to 0.86 EUR. \n'}
省略可: カスタム メソッドを登録する
オペレーションは、標準(空の文字列 "" で表される)、ストリーミング(stream)、双方向ストリーミング(bidi_stream)のいずれかの実行モードとして登録できます。
from typing import AsyncIterable, Iterable
class CustomAgent(BidiStreamingAgent):
# ... same get_state and get_state_history function definition.
async def get_state_bidi_mode(
self,
request_queue: asyncio.Queue[Any]
) -> AsyncIterable[Any]:
while True:
request = await request_queue.get()
if request == "END":
break
yield self.graph.get_state(request)._asdict()
def register_operations(self):
return {
# The list of synchrounous operations to be registered
"": ["query", "get_state"]
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"]
# The list of bidi streaming operations to be registered
"bidi_stream": ["bidi_stream_query", "get_state_bidi_mode"]
}
エージェントをデプロイする
エージェントを live_agent として開発したら、Agent Platform インスタンスを作成してエージェントを Agent Platform にデプロイできます。
GenAI SDK では、Agent Platform インスタンスの作成時に、すべてのデプロイ構成(追加パッケージとカスタマイズされたリソース制御)が config の値として割り当てられます。
GenAI クライアントを初期化します。
import vertexai
from vertexai import types as vertexai_types
client = vertexai.Client(project=PROJECT, location=LOCATION)
エージェントを Agent Platform にデプロイします。双方向ストリーミングをサポートするエージェントには、EXPERIMENTAL
agent_server_mode が必要です。
remote_live_agent = client.agent_engines.create(
agent=live_agent,
config={
"staging_bucket": STAGING_BUCKET,
"requirements": [
"google-cloud-aiplatform[agent_engines,adk]==1.88.0",
"cloudpickle==3.0",
"websockets"
],
"agent_server_mode": vertexai_types.AgentServerMode.EXPERIMENTAL,
},
)
デプロイ中にバックグラウンドで実行される手順については、Agent Runtime インスタンスを作成するをご覧ください。
エージェントのリソース ID を取得します。
remote_live_agent.api_resource.name
エージェントを使用する
エージェントの開発時に bidi_stream_query オペレーションを定義した場合は、GenAI SDK for Python を使用してエージェントに非同期で双方向ストリーミング クエリを実行できます。
次の例は、エージェントが認識できる任意のデータで変更できます。入力ストリームと出力ストリームには、適用可能な終了ロジックを使用します。
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={"class_method": "bidi_stream_query"}
) as connection:
while True:
#
input_str = input("Enter your question: ")
if input_str == "exit":
break
await connection.send({"input": input_str})
while True:
response = await connection.receive()
print(response)
if response["bidiStreamOutput"]["output"] == "end of turn":
break
Agent Runtime は、反復的に生成されたオブジェクトのシーケンスとしてレスポンスをストリーミングします。たとえば、最初のターンの 2 つのレスポンスのセットは次のようになります。
Enter your next question: Weather in San Diego?
{'bidiStreamOutput': {'output': "FunctionCall: {'name': 'get_current_weather', 'args': {'location': 'San Diego'}}\n"}}
{'bidiStreamOutput': {'output': 'end of turn'}}
Enter your next question: exit
Agent Development Kit エージェントを使用する
Agent Development Kit(ADK)を使用してエージェントを開発した場合は、双方向ストリーミングを使用して Gemini Live API とやり取りできます。
次の例では、ユーザーのテキストによる質問を受け取り、Gemini Live API レスポンスの音声データを受け取る会話エージェントを作成します。
import numpy as np
from google.adk.agents.live_request_queue import LiveRequest
from google.adk.events import Event
from google.genai import types
def prepare_live_request(input_text: str) -> LiveRequest:
part = types.Part.from_text(text=input_text)
content = types.Content(parts=[part])
return LiveRequest(content=content)
async with client.aio.live.agent_engines.connect(
agent_engine=remote_live_agent.api_resource.name,
config={
"class_method": "bidi_stream_query",
"input": {"input_str": "hello"},
}) as connection:
first_req = True
while True:
input_text = input("Enter your question: ")
if input_text == "exit":
break
if first_req:
await connection.send({
"user_id": USER_ID,
"live_request": prepare_live_request(input_text).dict()
})
first_req = False
else:
await connection.send(prepare_live_request(input_text).dict())
audio_data = []
while True:
async def receive():
return await connection.receive()
receiving = asyncio.Task(receive())
done, _ = await asyncio.wait([receiving])
if receiving not in done:
receiving.cancel()
break
event = Event.model_validate(receiving.result()["bidiStreamOutput"])
part = event.content and event.content.parts and event.content.parts[0]
if part.inline_data and part.inline_data.data:
chunk_data = part.inline_data.data
data = np.frombuffer(chunk_data, dtype=np.int16)
audio_data.append(data)
else:
print(part)
if audio_data:
concatenated_audio = np.concatenate(audio_data)
display(Audio(concatenated_audio, rate=24000, autoplay=True))