使用流式 API 构建多模态点餐体验

本指南为工程师提供了相关说明和最佳实践,帮助他们使用 FoodOrderingService.BidiProcessOrder RPC 方法打造外卖订购体验。 这种实时双向流式传输 API 是订餐 AI 智能体的核心,可在各种应用(例如移动应用、语音助理、汽车餐厅和自助服务终端)中实现动态的对话式订单接收。

BidiProcessOrder 概览

BidiProcessOrder 方法会在客户端应用与订餐 AI 代理之间建立持久的双向通信渠道。与标准的一元请求和响应 RPC 不同,这种流式传输方法允许:

  • 低延迟互动:持续交换信息,无需重复发送 HTTP 请求,从而减少开销。
  • 多模态输入:处理音频流(用于语音订购)、文本输入和客户端事件。
  • 实时响应:智能体可以在对话进行过程中发送音频、文本、订单更新和其他信号。

BidiProcessOrder 无法使用 REST 调用。集成必须使用面向连接的协议:

  • gRPC(推荐):为双向流式传输提供强大而高效的框架。
  • WebSocket:适用于因编程语言或网络限制而无法使用 gRPC 的客户端或环境。

如需了解详细的类型定义,请参阅 BidiProcessOrder API 参考文档。WebSocket 集成使用这些类型的 JSON 表示形式,如 WebSocket 部分中所述。

前提条件

在与 BidiProcessOrder 集成之前:

  1. 启用 API:确保已在您的 Google Cloud项目中启用 Food Ordering AI Agent API。 bash gcloud services enable foodorderingaiagent.googleapis.com --project=PROJECT_ID

  2. 身份验证:确定身份验证方法,并设置所有必需的服务账号和 IAM 角色,如身份验证中所述。

  3. 菜单提取:必须提取有效的菜单并将其与 Store 相关联。如需了解详情,请参阅集成菜单数据

身份验证

为了安全地连接到 BidiProcessOrder RPC,您的应用必须使用 Google Cloud 服务账号进行身份验证。

1. 配置服务账号

  • 创建服务账号:在您的 Google Cloud 项目中,创建一个服务账号,供您的应用用于向订餐 AI Agent API 进行身份验证。请参阅创建和管理服务账号
  • 授予 IAM 角色:向此服务账号授予必要的 IAM 角色。调用 BidiProcessOrder 所需的主要角色是:

    • Food Ordering Agent User (roles/foodorderingaiagent.agentUser):允许服务账号连接到订餐服务并处理会话。

    您可以使用 Google Cloud 控制台或 gcloud 授予此角色: bash gcloud projects add-iam-policy-binding PROJECT_ID \ --member="serviceAccount:SERVICE_ACCOUNT_EMAIL" \ --role="roles/foodorderingaiagent.agentUser"

2. 应用身份验证流程

确切的身份验证流程取决于您的应用架构,尤其是客户端应用(例如移动应用、信息亭软件)是直接连接还是通过您自己的后端连接。

常见场景:对面向消费者的客户端应用进行身份验证

这是移动应用或 Web 应用的典型模式:

  1. Client-to-YourAuth::最终用户客户端应用(移动应用、Web 应用)通过现有的用户身份验证系统(可以是 Firebase Authentication、您自己的 OAuth 服务器等)进行身份验证。
  2. 令牌交换:客户端应用在对用户进行身份验证后,会从您控制的安全后端服务(例如“API 令牌服务”)请求短期有效的令牌。
  3. 访问令牌生成:您的后端服务使用在第 1 步中配置的 Google Cloud 服务账号主账号的凭据,为https://www.googleapis.com/auth/cloud-platform范围生成标准 OAuth 2.0 访问令牌。您可以使用 Google Cloud 身份验证客户端库来实现此目的。

    • 安全性:用于生成这些令牌的服务账号密钥或凭据必须安全地存储在您的后端并进行妥善管理。切勿直接向最终用户客户端应用公开服务账号私钥。 请参阅管理服务账号的最佳实践
  4. 令牌到客户端:您的后端服务将生成的 Google 访问令牌返回给客户端应用。

  5. API 调用:客户端应用使用此 Google 访问令牌对其与 BidiProcessOrder RPC 的 gRPC 或 WebSocket 连接进行身份验证。

3. 使用令牌

  • gRPC:如果提供了服务账号凭据,Google gRPC 客户端库通常会处理令牌刷新和在调用元数据中包含令牌。
  • WebSocket(非浏览器):在 Authorization: Bearer TOKEN 标头中添加令牌。
  • WebSocket(浏览器):如 WebSocket 部分中所述,直接浏览器 WebSocket 连接无法使用授权标头。需要服务器端流式传输代理来验证客户端与 Google Cloud的连接。

连接到 API

您可以使用 gRPC 客户端库或 WebSocket 连接建立流。

gRPC

建议使用 gRPC。您将使用所选语言(例如 Node.js)的客户端库,这些库基于 BidiProcessOrder API 参考

基本步骤包括:

  1. 创建指向订餐 AI Agent API 端点(例如 foodorderingaiagent.googleapis.com)的 gRPC 通道。
  2. 获取 FoodOrderingService 的客户端存根。
  3. 调用 BidiProcessOrder 方法,该方法会返回一个用于发送请求和接收响应的流对象。
  4. 根据您的使用情形实现业务逻辑,该逻辑可同时执行以下操作:
    • 发送来自最终用户的音频、文本和事件输入。
    • 处理来自代理的消息,包括音频、文本和事件。

Node.js


const {FoodOrderingServiceClient} = require('@google-cloud/foodorderingaiagent');

const client = new FoodOrderingServiceClient();

// The stream is initialized immediately. You can now write commands and attach listeners.

const stream = client.bidiProcessOrder();

WebSocket

对于 WebSocket 连接,网址路径为:

wss://foodorderingaiagent.googleapis.com/ws/google.cloud.foodorderingaiagent.v1beta.FoodOrderingService/BidiProcessOrder/locations/LOCATION

  • LOCATION:例如,us

必需的标头

  • AuthorizationBearer TOKEN - 其中 TOKEN 是为您的服务账号获取的 OAuth 2.0 访问令牌。

消息格式

  • 客户端到服务器:发送到 API 的消息(例如 ConfigAudioInputTextInputEventInput)必须是 BidiProcessOrderRequest proto 的 JSON 表示形式,以 websocket.TextMessage 形式发送。
  • 服务器到客户端:从 API (BidiProcessOrderResponse) 收到的消息将作为 websocket.BinaryMessage 发送,但这些二进制消息的内容是 JSON 载荷。
  • 二进制数据:JSON 载荷中的二进制数据(例如,AudioInput 中的 customerAudioAgentAudio 中的 agentAudio)必须采用 base64 编码。

Node.js WebSocket 示例

以下示例展示了如何在 Node.js 中使用 ws 库通过 WebSocket 连接到 API 并与之互动:

const WebSocket = require('ws');

// Replace with your actual values
const location = 'LOCATION';
const projectId = 'PROJECT_ID';
const sessionId = 'SESSION_ID';
const brandId = 'BRAND_ID';
const storeId = 'STORE_ID';
const token = 'OAUTH_TOKEN';

const wsUrl = `wss://foodorderingaiagent.googleapis.com/ws/google.cloud.foodorderingaiagent.v1beta.FoodOrderingService/BidiProcessOrder/locations/${location}`;

const ws = new WebSocket(wsUrl, {
  headers: {
    'Authorization': `Bearer ${token}`
  }
});

ws.on('open', () => {
  console.log('Connected to WebSocket');

  // 1. Send the required initial Config message
  const configRequest = {
    config: {
      session: `projects/${projectId}/locations/${location}/sessions/${sessionId}`,
      store: `projects/${projectId}/locations/${location}/brands/${brandId}/stores/${storeId}`
    }
  };

  // Client-to-server messages are sent as TextMessage
  ws.send(JSON.stringify(configRequest));
  console.log('Sent Config message');
});

ws.on('message', (data, isBinary) => {
  // The documentation specifies that server-to-client messages
  // are sent as BinaryMessage containing a JSON payload.
  if (isBinary) {
    try {
      const response = JSON.parse(data.toString('utf8'));
      console.log('Received response:', response);

      if (response.agentText) {
        console.log(`Agent: ${response.agentText.text}`);
      }

      if (response.agentAudio) {
        const audioBytes = Buffer.from(response.agentAudio.agentAudio, 'base64');
        console.log(`Received ${audioBytes.length} bytes of agent audio.`);
        // Play or process the audio bytes here
      }

      if (response.endSession) {
        console.log('Session ended by agent.');
        ws.close();
      }
    } catch (e) {
      console.error('Failed to parse JSON response:', e);
    }
  }
});

ws.on('close', () => {
  console.log('Connection closed');
});

会话生命周期

每次调用 BidiProcessOrder 都会启动一个会话。只要流处于打开状态,会话就会一直保持活动状态。

1. 启动(配置消息)

  • 建立连接后,客户端发送的第一条消息必须是包含 Config 消息的 BidiProcessOrderRequest
  • Config 中的必填字段
    • session:由客户端生成的唯一会话标识符。格式: projects/PROJECT/locations/LOCATION/sessions/SESSION_ID
      • storeStore 的资源名称。格式: projects/PROJECT/locations/LOCATION/brands/BRAND/stores/STORE
        • 代理使用 store 加载相应的菜单和配置。

Node.js

// Send the first message containing Config
stream.write({
  config: {
    session: client.sessionPath(projectId, location, sessionId),
    store: client.storePath(projectId, location, brandId, storeId),
  }
});

2. 发送输入

  • 在初始 Config 之后,客户端可以发送包含以下输入之一的 BidiProcessOrderRequest 消息流:
    • AudioInput:原始音频数据(通常为 16 位线性 PCM,采样率为 16000 Hz,无标头)。 用于语音互动。
    • TextInput:用户的短信。
    • EventInput:用于指示事件的信号,例如 DriveOffEvent(用于车辆驶离时的得来速用例)、CrewInterjectionEvent(用于在对话过程中有人接管订单接收角色的任何情况)或 OrderStateUpdateEvent(如果订单在客户端修改,例如使用触摸界面)。

Node.js

// Stream user inputs over the active connection
stream.write({textInput: {text: 'Hi, I\'d like to order a cheeseburger.'}});

3. 接收回答

  • 同时,代理会发送回 BidiProcessOrderResponse 消息流。您的客户端必须准备好处理 oneof response 字段中的各种响应类型:
    • AgentAudio:要播放给用户的合成音频字节,用于语音互动。
    • AgentText:代理回复的文本版本。
    • SpeechRecognition:识别出的用户语音的转写内容。
    • UpdatedOrderState: 包含客户的完整当前状态Order, 每当代理更新该状态时,系统都会更新此字段。使用此方法更新应用的订单表示形式。这通常会导致订单状态信息的用户界面或记录系统(例如销售终端系统)发生更新。
    • InterruptionSignal:表示用户打断了智能体的语音。客户端应立即停止播放任何传出的 AgentAudio
    • AgentEvent:需要客户端采取行动的特殊事件,例如 RestartOrder
    • SuggestedOptions:提供用户接下来可能会选择的上下文相关选项,非常适合在屏幕上显示。
    • EndSession: 表示会话已由代理终止(例如,订单完成、用户离开或代理升级)。

Node.js

// Attach event listeners to handle responses sequentially
stream.on('data', (response) => {
  if (response.agentAudio) {
    console.log(`Received ${response.agentAudio.agentAudio.length} bytes of agent audio.`);
  } else if (response.agentText) {
    console.log(`Agent: ${response.agentText.text}`);
  } else if (response.speechRecognition) {
    console.log(`Recognized User Speech: ${response.speechRecognition.transcript}`);
  } else if (response.updatedOrderState) {
    console.log('Order updated.');
  } else if (response.interruptionSignal) {
    console.log('User interrupted the agent. Stop playing audio!');
  } else if (response.endSession) {
    console.log(`Session ended. Type: ${response.endSession.type}, Reason: ${response.endSession.reason}`);
    stream.end();
  }
});

stream.on('error', (err) => {
  console.error('Stream error:', err);
});

4. 关闭数据流

  • 数据流可由客户端或服务器关闭。通常,服务器会使用 EndSession 消息来表示对话结束。当收到此消息时,客户端应关闭流。

处理特定类型的消息

以下部分介绍了如何处理客户端在调用 BidiProcessOrder 时会收到的特定响应类型。

AudioInput

  • 以块的形式流式传输音频(在可用时)。
  • 格式:16 位线性 PCM,16000 Hz 采样率。
  • 音频块包含通常位于 WAV 文件开头的音频文件头。
  • 对于启用了回声消除功能 (Config 中的 enable_echo_cancellation) 的免下车场景,请同时提供 customer_audiocrew_audio

UpdatedOrderState

  • 此消息每次发送时都会提供订单的完整状态。 将订单的任何本地缓存替换为收到的 Order 消息的内容。
  • 使用 Order 商品和修饰符中的 custom_integration_attributesOrder 内容映射到应用记录系统中的等效实体。

InterruptionSignal

  • 收到后,立即停止播放任何 AgentAudio 并清除所有缓冲的代理音频。这样可确保在用户打断智能体的讲话时,对话流程自然流畅。

EndSession

  • 检查 EndType(例如 DRIVE_OFFAGENT_ESCALATION)。
  • 应用应妥善关闭连接并适当过渡用户(例如,在 AGENT_ESCALATION 的情况下通知人工主管,或过渡到订单确认状态)。

最佳做法

  • 异步处理消息:使用线程或非阻塞 I/O 并发发送请求和处理传入的响应,从而最大限度地缩短延迟时间。
  • 重新连接逻辑:在出现网络问题时,实现可靠的重新连接逻辑,并记得发送具有相同会话 ID 的初始 Config 消息以尝试恢复。
  • 错误处理:监控流中的错误。gRPC 和 WebSocket 库提供用于检测流关闭或传输错误的机制。记录这些事件并从容处理。
  • 音频缓冲:仔细管理音频缓冲区,并在必要时实现缓冲,以确保 AgentAudio 的流畅播放和 AudioInput 的及时交付。在确定缓冲方案时,请仔细考虑延迟时间和播放质量之间的权衡取舍。
  • 会话 ID 管理:确保每个不同的订单/对话的会话 ID 都是唯一的。
  • 资源管理:在会话完成时或发生不可恢复的错误时,关闭流并释放资源。
  • 超时:虽然流本身可以长时间存在(默认情况下最长为 15 分钟),但如果需要,请考虑针对特定状态设置应用级超时。

集成流程示例(概念性)

  1. 客户端应用(例如移动应用)发起订单。
  2. 建立与 BidiProcessOrder 的 gRPC/WebSocket 连接。
  3. 发送包含 Config(会话 ID、商店 ID)的 BidiProcessOrderRequest
  4. 接收初始 AgentAudio(例如,欢迎辞)并播放。
  5. 用户说话:捕获音频,以 AudioInput 消息的形式进行流式传输。
  6. 接收 SpeechRecognition(显示转写内容)、AgentAudio(播放回答)和可能的 UpdatedOrderState(更新界面购物车)。
  7. 如果用户中断,则接收 InterruptionSignal 并停止播放。
  8. 继续交换音频或文本输入内容和代理回答。
  9. 用户确认订单:代理发送最终的 UpdatedOrderState
  10. 代理发送 EndSession:客户端关闭流并使用来自最后一个 UpdatedOrderState 的数据在 POS 系统中完成订单。

端到端示例

虽然上述说明逐一介绍了流式传输概念,但下面展示的是完整的端到端集成流程。

Node.js

在尝试此示例之前,请按照《订餐 AI 智能体快速入门:使用客户端库》中的 Node.js 设置说明执行操作。

如需向订餐 AI 智能体进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

const {FoodOrderingServiceClient} = require('@google-cloud/foodorderingaiagent');

async function bidiProcessOrderSample(projectId, location, brand, store, sessionId) {
  const client = new FoodOrderingServiceClient();

  // Create the resource names
  const sessionPath = client.sessionPath(projectId, location, sessionId);
  const storePath = client.storePath(projectId, location, brand, store);

  // Initialize the stream using gRPC. See the WebSocket section for the equivalent WebSocket implementation.
  const stream = client.bidiProcessOrder();

  // Attach event listeners to handle responses sequentially
  stream.on('data', (response) => {
    if (response.agentAudio) {
      console.log(`Received ${response.agentAudio.agentAudio.length} bytes of agent audio.`);
    } else if (response.agentText) {
      console.log(`Agent: ${response.agentText.text}`);
    } else if (response.speechRecognition) {
      console.log(`Recognized User Speech: ${response.speechRecognition.transcript}`);
    } else if (response.updatedOrderState) {
      console.log('Order updated.');
    } else if (response.interruptionSignal) {
      console.log('User interrupted the agent. Stop playing audio!');
    } else if (response.endSession) {
      console.log(`Session ended. Type: ${response.endSession.type}, Reason: ${response.endSession.reason}`);
      stream.end();
    }
  });

  stream.on('error', (err) => {
    console.error('Stream error:', err);
  });

  // 1. Send the first message containing Config
  stream.write({
    config: {
      session: sessionPath,
      store: storePath,
    }
  });

  // 2. Stream user inputs over the active connection
  stream.write({textInput: {text: 'Hi, I\'d like to order a cheeseburger.'}});
}