在持续查询中使用流到流联接

如需请求对此功能的支持或提供反馈,请发送电子邮件至 bq-continuous-queries-feedback@google.com

持续查询支持将 JOIN 作为有状态操作。借助有状态操作,持续查询可以执行复杂的分析,而这需要跨多个行或时间间隔保留信息。借助此功能,您可以在查询运行时将必要的数据存储在内存中,从而关联来自不同流的事件。如需详细了解有状态操作,请参阅 支持的有状态操作

流到流联接是两个或多个接收面向时间的数据注入的表之间的联接操作。

支持的 JOIN 类型

持续查询支持以下 JOIN 类型:

  • 流到流 JOIN - 两个或多个接收面向时间的数据注入的表之间的联接操作。
  • INNER JOIN

不支持的 JOIN 类型

持续查询不支持以下 JOIN 类型:

  • 流到静态表 JOIN - 至少一个联接表是不接收面向时间的数据注入的静态表的联接。静态表的示例是维度表。
  • INNER 以外的其他形式的 JOIN 操作。
  • 没有面向时间的 JOIN 子句的 JOIN。

联接来自多个流的数据

以下查询展示了如何将出租车行程表联接到出租车请求表,并在 5 分钟的时间窗口内找出离请求者最近的可用出租车,并将此数据导出到另一个 BigQuery 表中:

INSERT INTO
 `real_time_taxi_streaming.matched_rides`
SELECT
 requests.timestamp AS request_time,
 requests.request_id,
 taxis.taxi_id,
 ST_DISTANCE(
   ST_GEOGPOINT(requests.longitude, requests.latitude),
   ST_GEOGPOINT(taxis.longitude, taxis.latitude)
   ) AS distance_in_meters,
 taxis.timestamp AS taxi_available_time
FROM
 APPENDS (TABLE `real_time_taxi_streaming.ride_requests`,
   CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) AS requests
INNER JOIN
 APPENDS (TABLE `real_time_taxi_streaming.taxirides`,
   CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) AS taxis
ON
 requests.geohash = taxis.geohash
WHERE
 taxis.ride_status = 'available'
 AND taxis._CHANGE_TIMESTAMP BETWEEN(requests._CHANGE_TIMESTAMP - INTERVAL 5 MINUTE) AND requests._CHANGE_TIMESTAMP
 AND ST_DWITHIN(
   ST_GEOGPOINT(requests.longitude, requests.latitude),
   ST_GEOGPOINT(taxis.longitude, taxis.latitude),
   2000 -- Distance in meters
   );

联接注意事项

以下部分介绍了执行流到流联接时需要考虑的事项。

限制

  • 仅支持 INNER JOIN 操作;不支持其他形式,例如 LEFTFULL OUTER
  • INNER JOIN 操作的每一侧都必须为持续查询指定开始时间。
  • 除了联接键(例如 table1.user_id = table2.user_id)之外,JOIN 子句还必须包含一个条件,用于将时间戳列限制为常量间隔。此条件限制了系统等待另一个流中匹配事件到达的时间。例如,您可以指定一个流中的事件只能与另一个流中的事件联接,前提是它们的时间戳在 5 分钟的时间间隔内。您不限于使用对称间隔。例如,您可以在联接条件的一侧使用 5 分钟的间隔,而在另一侧使用 1 小时的间隔。
  • 使用流到流联接启动持续查询时,仅支持 APPENDS 函数。不支持 CHANGES 函数。
  • BigQuery 系统时间列(由 _CHANGE_TIMESTAMP 定义)是联接操作唯一支持的时间戳列。不支持用户定义的列。
  • 与窗口聚合结合使用时,您必须遵循所有 已记录的窗口聚合 限制

价格注意事项

BigQuery 持续查询的费用是根据作业运行时消耗的 计算 容量(槽) 收取的。这种基于计算的模型也适用于有状态操作(例如联接)。由于联接需要系统在查询处于活动状态时存储“状态”,因此它们会消耗额外的槽资源。联接中存储的上下文或数据越多(例如,当您在 JOINWHERE 子句中使用较长的时间间隔时),需要保留的状态就越多,这会导致槽利用率更高。

后续步骤