開始使用 WebSocket 搭配 Live API

本教學課程說明如何使用 WebSocket 連線至 Live API。在本教學課程中,您將設定 Google Cloud 專案,透過 WebSocket 使用 Live API、將音訊檔案傳送至模型,並接收音訊回覆。

如要進一步瞭解 WebSocket,請參閱 WebSocket API 說明文件

事前準備

您必須先透過 Vertex AI 設定驗證,才能傳送要求。您可以選擇使用 API 金鑰或應用程式預設憑證 (ADC) 設定驗證。

在本教學課程中,最快速的入門方式是使用 API 金鑰:

如要瞭解如何改用 ADC 設定驗證,請參閱快速入門

安裝 WebSockets 程式庫

執行下列指令來安裝 websockets 程式庫:

pip install websockets

設定環境變數

設定專案 ID 和位置的環境變數。將 PROJECT_ID 替換為專案 ID。Google Cloud

export GOOGLE_CLOUD_PROJECT=PROJECT_ID
export GOOGLE_CLOUD_LOCATION=global

發起語音通話

以下範例會建立工作階段、從檔案串流音訊,並列印回覆中收到的音訊區塊大小。

import asyncio
import websockets
import json
import base64
import os
import sys

# Replace the [PROJECT_ID] and [LOCATION] with your Google Cloud Project ID and location.
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")
LOCATION = os.environ.get("GOOGLE_CLOUD_LOCATION")

# Authentication
token_list = !gcloud auth application-default print-access-token
ACCESS_TOKEN = token_list[0]

# Configuration
MODEL_ID = "gemini-live-2.5-flash-preview-native-audio-09-2025"

# Construct the WSS URL
HOST = f"{LOCATION}-aiplatform.googleapis.com"
path = "google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent"
URI = f"wss://{HOST}/ws/{path}"
MODEL_RESOURCE = f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{MODEL_ID}"


async def main():
   headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
  
   async with websockets.connect(URI, additional_headers=headers) as ws:
       print("Session established.")

       # Send Setup (Handshake)
       await ws.send(json.dumps({
           "setup": {
               "model": MODEL_RESOURCE,
               "generation_config": { "response_modalities": ["AUDIO"] }
           }
       }))

       # Define Tasks
       async def send_audio():
           # Download sample if missing
           if not os.path.exists("input.wav"):
               !wget -q https://storage.googleapis.com/cloud-samples-data/generative-ai/audio/where_the_nearest_train_station_is.wav -O input.wav

           with open("input.wav", "rb") as f:
               while chunk := f.read(1024):
                   msg = {
                       "realtime_input": {
                           "media_chunks": [{
                               "mime_type": "audio/pcm;rate=16000",
                               "data": base64.b64encode(chunk).decode("utf-8")
                           }]
                       }
                   }
                   await ws.send(json.dumps(msg))
                   await asyncio.sleep(0.01)
           print("Done sending audio.")

       async def receive_audio():
           async for msg in ws:
               data = json.loads(msg)
               try:
                   parts = data["serverContent"]["modelTurn"]["parts"]
                   for part in parts:
                       if "inlineData" in part:
                           b64_audio = part["inlineData"]["data"]
                           print(f"Received chunk: {len(b64_audio)} bytes")
               except KeyError:
                   pass

               if data.get("serverContent", {}).get("turnComplete"):
                   print("Turn complete. Exiting.")
                   break

       # Run Together
       await asyncio.gather(send_audio(), receive_audio())

if __name__ == "__main__":
   # Check if running in Jupyter/Colab
   if "ipykernel" in sys.modules:
       await main()
   else:
       asyncio.run(main())

後續步驟