本教程介绍了如何使用 WebSocket 连接到 Live API。 您向模型发送音频文件,并收到响应的音频。如需详细了解 WebSockets,请参阅 WebSockets API 文档。
准备工作
在开始之前,完成以下前提条件。
向 Vertex AI 进行身份验证。您可以使用 API 密钥或应用默认凭证 (ADC)。如需了解详情,请参阅身份验证方法。
安装
websockets 库:pip install websockets
为项目 ID 和位置设置环境变量,将 PROJECT_ID 替换为您的 Google Cloud 项目 ID:
export GOOGLE_CLOUD_PROJECT=PROJECT_ID
export GOOGLE_CLOUD_LOCATION=global
开始音频会话
以下示例建立会话、从文件流式传输音频,并打印响应中收到的音频块的大小。
import asyncio
import base64
import json
import os
import subprocess
import sys
import urllib.request

import websockets
# Project ID and location are read from the environment variables set in
# the "Before you begin" step (GOOGLE_CLOUD_PROJECT / GOOGLE_CLOUD_LOCATION).
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")
LOCATION = os.environ.get("GOOGLE_CLOUD_LOCATION")

# Authentication: obtain an OAuth2 access token from the gcloud CLI.
# NOTE: the original used the IPython magic `!gcloud ...`, which is a
# SyntaxError outside a notebook even though the __main__ guard in this
# script supports plain-Python execution; subprocess works in both.
ACCESS_TOKEN = subprocess.run(
    ["gcloud", "auth", "application-default", "print-access-token"],
    capture_output=True,
    text=True,
    check=True,  # fail loudly if gcloud is missing or not authenticated
).stdout.strip()

# Configuration: the Live API model to talk to.
MODEL_ID = "gemini-live-2.5-flash-preview-native-audio-09-2025"

# Construct the WSS URL for the bidirectional (BidiGenerateContent) stream.
HOST = f"{LOCATION}-aiplatform.googleapis.com"
path = "google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent"
URI = f"wss://{HOST}/ws/{path}"
MODEL_RESOURCE = f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{MODEL_ID}"
async def main():
    """Open a Live API session, stream a WAV file, and print the size of
    each audio chunk received in response.

    Relies on the module-level ACCESS_TOKEN, URI, and MODEL_RESOURCE
    constants. Exits after the model signals that its turn is complete.
    """
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
    async with websockets.connect(URI, additional_headers=headers) as ws:
        print("Session established.")

        # Send Setup (handshake): select the model and ask for audio replies.
        await ws.send(json.dumps({
            "setup": {
                "model": MODEL_RESOURCE,
                "generation_config": {"response_modalities": ["AUDIO"]}
            }
        }))

        async def send_audio():
            """Stream input.wav to the server as base64-encoded PCM chunks."""
            # Download the sample file if missing. The original used the
            # notebook magic `!wget`, which is invalid outside IPython;
            # urllib works everywhere and needs no external binary.
            if not os.path.exists("input.wav"):
                urllib.request.urlretrieve(
                    "https://storage.googleapis.com/cloud-samples-data/"
                    "generative-ai/audio/where_the_nearest_train_station_is.wav",
                    "input.wav",
                )
            with open("input.wav", "rb") as f:
                while chunk := f.read(1024):
                    msg = {
                        "realtime_input": {
                            "media_chunks": [{
                                "mime_type": "audio/pcm;rate=16000",
                                "data": base64.b64encode(chunk).decode("utf-8")
                            }]
                        }
                    }
                    await ws.send(json.dumps(msg))
                    # Small pause so the upload approximates a real-time feed
                    # and yields control to the receiver task.
                    await asyncio.sleep(0.01)
            print("Done sending audio.")

        async def receive_audio():
            """Print the size of each inline audio part until turn completion."""
            async for msg in ws:
                data = json.loads(msg)
                # Non-content messages (e.g. the setup ack) lack these keys;
                # EAFP keeps the happy path simple.
                try:
                    parts = data["serverContent"]["modelTurn"]["parts"]
                    for part in parts:
                        if "inlineData" in part:
                            b64_audio = part["inlineData"]["data"]
                            print(f"Received chunk: {len(b64_audio)} bytes")
                except KeyError:
                    pass
                if data.get("serverContent", {}).get("turnComplete"):
                    print("Turn complete. Exiting.")
                    break

        # Run sender and receiver concurrently over the same socket.
        await asyncio.gather(send_audio(), receive_audio())
if __name__ == "__main__":
    # In Jupyter/Colab an event loop is already running, so asyncio.run()
    # would raise RuntimeError; schedule main() on the existing loop as a
    # Task instead. The original used a bare top-level `await main()`,
    # which is a SyntaxError when this file runs as a plain .py script.
    if "ipykernel" in sys.modules:
        asyncio.ensure_future(main())
    else:
        asyncio.run(main())