WebSocket を使用して Live API を使ってみる

このチュートリアルでは、WebSocket を使用して Live API に接続する方法について説明します。このチュートリアルでは、WebSocket で Live API を使用する Google Cloud プロジェクトを設定し、音声ファイルをモデルに送信して、レスポンスとして音声を受信します。

WebSocket の詳細については、WebSocket API のドキュメントをご覧ください。

始める前に

リクエストを送信する前に、Vertex AI で認証を設定する必要があります。認証は、API キーまたはアプリケーションのデフォルト認証情報(ADC)を使用して設定できます。

このチュートリアルでは、API キーを使用するのが最も簡単な方法です。

代わりに ADC を使用して認証を設定する手順については、クイックスタートをご覧ください。

WebSockets ライブラリをインストールする

以下を実行して websockets ライブラリをインストールします。

pip install websockets

環境変数を設定する

プロジェクト ID とロケーションの環境変数を設定します。PROJECT_ID は、実際のGoogle Cloud プロジェクト ID に置き換えます。

export GOOGLE_CLOUD_PROJECT=PROJECT_ID
export GOOGLE_CLOUD_LOCATION=global

オーディオ セッションを開始する

次の例では、セッションを確立し、ファイルから音声をストリーミングして、レスポンスで受信した音声チャンクのサイズを出力します。

import asyncio
import websockets
import json
import base64
import os
import sys

# Replace the [PROJECT_ID] and [LOCATION] with your Google Cloud Project ID and location.
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")
LOCATION = os.environ.get("GOOGLE_CLOUD_LOCATION")

# Authentication
token_list = !gcloud auth application-default print-access-token
ACCESS_TOKEN = token_list[0]

# Configuration
MODEL_ID = "gemini-live-2.5-flash-preview-native-audio-09-2025"

# Construct the WSS URL
HOST = f"{LOCATION}-aiplatform.googleapis.com"
path = "google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent"
URI = f"wss://{HOST}/ws/{path}"
MODEL_RESOURCE = f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{MODEL_ID}"


async def main():
   headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
  
   async with websockets.connect(URI, additional_headers=headers) as ws:
       print("Session established.")

       # Send Setup (Handshake)
       await ws.send(json.dumps({
           "setup": {
               "model": MODEL_RESOURCE,
               "generation_config": { "response_modalities": ["AUDIO"] }
           }
       }))

       # Define Tasks
       async def send_audio():
           # Download sample if missing
           if not os.path.exists("input.wav"):
               !wget -q https://storage.googleapis.com/cloud-samples-data/generative-ai/audio/where_the_nearest_train_station_is.wav -O input.wav

           with open("input.wav", "rb") as f:
               while chunk := f.read(1024):
                   msg = {
                       "realtime_input": {
                           "media_chunks": [{
                               "mime_type": "audio/pcm;rate=16000",
                               "data": base64.b64encode(chunk).decode("utf-8")
                           }]
                       }
                   }
                   await ws.send(json.dumps(msg))
                   await asyncio.sleep(0.01)
           print("Done sending audio.")

       async def receive_audio():
           async for msg in ws:
               data = json.loads(msg)
               try:
                   parts = data["serverContent"]["modelTurn"]["parts"]
                   for part in parts:
                       if "inlineData" in part:
                           b64_audio = part["inlineData"]["data"]
                           print(f"Received chunk: {len(b64_audio)} bytes")
               except KeyError:
                   pass

               if data.get("serverContent", {}).get("turnComplete"):
                   print("Turn complete. Exiting.")
                   break

       # Run Together
       await asyncio.gather(send_audio(), receive_audio())

if __name__ == "__main__":
   # Check if running in Jupyter/Colab
   if "ipykernel" in sys.modules:
       await main()
   else:
       asyncio.run(main())

次のステップ