Get started with the Live API using WebSockets

This tutorial shows you how to connect to the Live API by using WebSockets. You send the model an audio file and receive audio in response. To learn more about WebSockets, see the WebSockets API documentation.

Before you begin

Before you start, complete the following prerequisites.

  1. Authenticate to Vertex AI. You can use an API key or Application Default Credentials (ADC). For more information, see Authentication methods. A Python sketch for fetching an ADC access token programmatically appears after this list.

  2. Install the websockets library.

    pip install websockets
    
  3. Set environment variables for your project ID and location. Replace PROJECT_ID with your Google Cloud project ID.

    export GOOGLE_CLOUD_PROJECT=PROJECT_ID
    export GOOGLE_CLOUD_LOCATION=global
    
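The example later in this tutorial fetches the access token by shelling out to the gcloud CLI. If you prefer to stay in Python, the following sketch does the same with the google-auth library (pip install google-auth); it assumes ADC is already configured, for example with gcloud auth application-default login.

    import google.auth
    import google.auth.transport.requests

    # Load Application Default Credentials with the Cloud Platform scope.
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )

    # Refresh the credentials to obtain a short-lived bearer access token.
    credentials.refresh(google.auth.transport.requests.Request())
    ACCESS_TOKEN = credentials.token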

Start an audio session

The following example establishes a session, streams audio from a file, and prints the size of each audio chunk received in the response.

import asyncio
import base64
import json
import os
import subprocess
import urllib.request

import websockets

# Read the project ID and location from the environment variables set earlier.
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")
LOCATION = os.environ.get("GOOGLE_CLOUD_LOCATION")

# Authentication: fetch an Application Default Credentials access token
# through the gcloud CLI.
ACCESS_TOKEN = subprocess.run(
    ["gcloud", "auth", "application-default", "print-access-token"],
    capture_output=True,
    text=True,
    check=True,
).stdout.strip()

# Configuration
MODEL_ID = "gemini-live-2.5-flash-preview-native-audio-09-2025"

# Construct the WSS URL
HOST = f"{LOCATION}-aiplatform.googleapis.com"
path = "google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent"
URI = f"wss://{HOST}/ws/{path}"
MODEL_RESOURCE = f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{MODEL_ID}"


async def main():
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}

    async with websockets.connect(URI, additional_headers=headers) as ws:
        print("Session established.")

        # Send the setup message (handshake) to configure the session.
        await ws.send(json.dumps({
            "setup": {
                "model": MODEL_RESOURCE,
                "generation_config": {"response_modalities": ["AUDIO"]}
            }
        }))

        # Define tasks
        async def send_audio():
            # Download the sample audio file if it is missing.
            if not os.path.exists("input.wav"):
                urllib.request.urlretrieve(
                    "https://storage.googleapis.com/cloud-samples-data/generative-ai/audio/where_the_nearest_train_station_is.wav",
                    "input.wav",
                )

            # Stream the file as base64-encoded 16 kHz PCM chunks.
            with open("input.wav", "rb") as f:
                # Skip the sample's 44-byte RIFF/WAV header (assuming a
                # canonical PCM WAV) so that only raw PCM bytes are streamed.
                f.read(44)
                while chunk := f.read(1024):
                    msg = {
                        "realtime_input": {
                            "media_chunks": [{
                                "mime_type": "audio/pcm;rate=16000",
                                "data": base64.b64encode(chunk).decode("utf-8")
                            }]
                        }
                    }
                    await ws.send(json.dumps(msg))
                    await asyncio.sleep(0.01)
            print("Done sending audio.")

        async def receive_audio():
            async for msg in ws:
                data = json.loads(msg)
                try:
                    parts = data["serverContent"]["modelTurn"]["parts"]
                    for part in parts:
                        if "inlineData" in part:
                            audio_bytes = base64.b64decode(part["inlineData"]["data"])
                            print(f"Received chunk: {len(audio_bytes)} bytes")
                except KeyError:
                    pass

                # The server sets turnComplete when its reply is finished.
                if data.get("serverContent", {}).get("turnComplete"):
                    print("Turn complete. Exiting.")
                    break

        # Run both tasks concurrently.
        await asyncio.gather(send_audio(), receive_audio())


if __name__ == "__main__":
    # In a Jupyter or Colab notebook, call `await main()` in a cell instead;
    # asyncio.run() cannot be nested inside the notebook's running event loop.
    asyncio.run(main())
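
The example above only prints the size of each audio chunk. To play the response back, collect the base64 data fields in receive_audio, decode them, and write the raw PCM to a WAV file. The following sketch shows one way to do that; the save_audio helper is illustrative, and the 24 kHz, 16-bit, mono output format is an assumption, so check the documented output format for your model.

    import base64
    import wave

    # Hypothetical helper: write collected base64 audio chunks to a playable WAV file.
    def save_audio(b64_chunks, path="output.wav"):
        pcm = b"".join(base64.b64decode(c) for c in b64_chunks)
        with wave.open(path, "wb") as wf:
            wf.setnchannels(1)      # mono (assumed)
            wf.setsampwidth(2)      # 16-bit samples (assumed)
            wf.setframerate(24000)  # 24 kHz output rate (assumed)
            wf.writeframes(pcm)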

Next steps