BigQuery 订阅

本文档简要介绍了 BigQuery 订阅, 其工作流和相关属性。

BigQuery 订阅是一种导出订阅 可在收到消息时将其写入现有 BigQuery 表。您无需配置单独的订阅者客户端。 您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API 来创建、更新、列出、分离或删除 BigQuery 订阅。

如果没有 BigQuery 订阅类型, 您需要拉取或推送订阅以及订阅者 (例如 Dataflow)以读取消息并将其写入 BigQuery 表。如果消息在存储到 BigQuery 表之前不需要进行 额外处理,则无需运行 Dataflow 作业, 您可以改用 BigQuery 订阅。

不过,对于 Pub/Sub 系统,如果数据在存储到 BigQuery 表之前需要进行一些数据转换,我们仍建议使用 Dataflow 流水线。如需了解如何使用 Dataflow 将数据从 Pub/Sub 流式传输到 BigQuery 并进行转换,请参阅从 Pub/Sub 流式传输到 BigQuery

Dataflow 中的 Pub/Sub Subscription to BigQuery 模板默认强制执行“恰好一次”传送 。这通常通过 Dataflow 流水线中的重复数据消除机制来实现。不过,BigQuery 订阅仅支持“至少一次”传送。如果您的使用场景对精确重复数据消除至关重要,请考虑在 BigQuery 中使用下游流程来处理潜在的重复项。

准备工作

在阅读本文档之前,请确保您熟悉以下内容:

BigQuery 订阅工作流

下图显示了 BigQuery 订阅和 BigQuery 之间的工作流。

BigQuery 订阅的消息流
图 1.BigQuery 订阅的工作流

下面简要介绍了引用图 1 的工作流:

  1. Pub/Sub 使用 BigQuery Storage Write API 将数据发送到 BigQuery 表。
  2. 消息会分批发送到 BigQuery 表。
  3. 成功完成写入操作后,API 会返回 OK 响应。
  4. 如果写入操作中存在任何失败, Pub/Sub 消息本身会被否定确认。然后,系统会重新发送该 消息。如果消息失败次数过多,并且订阅中配置了 死信主题,则该消息会被移至 死信主题。

BigQuery 订阅的属性

您为 BigQuery 订阅配置的属性 决定了 Pub/Sub 将消息写入的 BigQuery 表以及该表的架构类型。

如需了解详情,请参阅 BigQuery 属性

架构兼容性

仅当您在创建 BigQuery 订阅时选择 使用主题架构 选项时,本部分才适用。

Pub/Sub 和 BigQuery 使用不同的方式来 定义其架构。Pub/Sub 架构以 Apache Avro 或 Protocol Buffer 格式定义,而 BigQuery 架构则使用 多种格式定义。

以下是有关 Pub/Sub 主题与 BigQuery 表之间的架构兼容性的 重要信息列表。

  • 任何包含格式不正确的字段的消息都不会写入 BigQuery。

  • 在 BigQuery 架构中,INTSMALLINTINTEGERBIGINTTINYINTBYTEINTINTEGER 的别名;DECIMALNUMERIC 的别名;BIGDECIMALBIGNUMERIC 的别名。

  • 如果主题架构中的类型为 string,而 BigQuery 表中的类型为 JSONTIMESTAMPDATETIMEDATETIMENUMERICBIGNUMERIC,则 Pub/Sub 消息中此字段的任何值都必须遵循为 BigQuery 数据 类型指定的格式。

  • 系统支持一些 Avro 逻辑类型,如下表所示。 任何未列出的逻辑类型都仅与它们 注释的等效 Avro 类型匹配,如 Avro 规范中所述。

以下是将不同架构格式映射到 BigQuery 数据类型的集合。

Avro 类型

Avro 类型 BigQuery 数据类型
null Any NULLABLE
boolean BOOLEAN
int INTEGERNUMERICBIGNUMERIC
long INTEGERNUMERICBIGNUMERIC
float FLOAT64NUMERICBIGNUMERIC
double FLOAT64NUMERICBIGNUMERIC
bytes BYTESNUMERICBIGNUMERIC
string STRINGJSON、 、TIMESTAMPDATETIME、 、DATETIME、 、NUMERICBIGNUMERIC
record RECORD/STRUCT
array of Type REPEATED Type
map with value type ValueType REPEATED STRUCT <key STRING, value ValueType>
union with two types, one that is null and the other Type NULLABLE Type
other unions 无法映射
fixed BYTESNUMERICBIGNUMERIC
enum INTEGER

Avro 逻辑类型

Avro 逻辑类型 BigQuery 数据类型
timestamp-micros TIMESTAMP
timestamp-millis TIMESTAMP
date DATE
time-micros TIME
time-millis TIME
duration INTERVAL
decimal NUMERICBIGNUMERIC

Protocol Buffer 类型

Protocol Buffer 类型 BigQuery 数据类型
double FLOAT64NUMERICBIGNUMERIC
float FLOAT64NUMERICBIGNUMERIC
int32 INTEGERNUMERIC、 、BIGNUMERICDATE
int64 INTEGERNUMERIC、 、BIGNUMERICDATE、 、DATETIMETIMESTAMP
uint32 INTEGERNUMERIC、 、BIGNUMERICDATE
uint64 NUMERICBIGNUMERIC
sint32 INTEGERNUMERICBIGNUMERIC
sint64 INTEGERNUMERIC、 、BIGNUMERICDATE、 、DATETIMETIMESTAMP
fixed32 INTEGERNUMERIC、 、BIGNUMERICDATE
fixed64 NUMERICBIGNUMERIC
sfixed32 INTEGERNUMERIC、 、BIGNUMERICDATE
sfixed64 INTEGERNUMERIC、 、BIGNUMERICDATE、 、DATETIMETIMESTAMP
bool BOOLEAN
string STRINGJSON、 、TIMESTAMPDATETIME、 、DATETIME、 、NUMERICBIGNUMERIC
bytes BYTESNUMERICBIGNUMERIC
enum INTEGER
message RECORD/STRUCT
oneof 无法映射
map<KeyType, ValueType> REPEATED RECORD<key KeyType, value ValueType>
enum INTEGER
repeated/array of Type REPEATED Type

日期和时间整数表示法

从整数映射到日期或时间类型之一时,该数字必须 表示正确的值。以下是从 BigQuery 数据 类型到表示它们的整数的映射。

BigQuery 数据类型 整数表示法
DATE 自 Unix 纪元(1970 年 1 月 1 日)以来的天数
DATETIME 以微秒为单位的日期和时间,使用 CivilTimeEncoder表示为民用时间
TIME 以微秒为单位的时间,使用 CivilTimeEncoder 表示为民用时间
TIMESTAMP 自 Unix 纪元(1970 年 1 月 1 日 00:00:00 UTC)以来的微秒数

BigQuery 变更数据捕获注入

当订阅属性中的 use_table_schemause_topic_schema 设置为 true 时,BigQuery 订阅支持 变更数据捕获 (CDC) 注入更新。如需将该功能与 use_topic_schema搭配使用,请使用以下字段设置主题的架构

  • _CHANGE_TYPE(必需):设置为 UPSERTDELETEstring 字段。

    • 如果写入 BigQuery 表的 Pub/Sub 消息的 _CHANGE_TYPE 设置为 UPSERT,则 BigQuery 会更新具有相同键的行(如果存在),否则会插入新行。

    • 如果写入 BigQuery 表的 Pub/Sub 消息的 _CHANGE_TYPE 设置为 DELETE, 则 BigQuery 会删除表中具有 相同键的行(如果存在)。

  • _CHANGE_SEQUENCE_NUMBER(可选):设置为确保按顺序处理对 BigQuery 表进行的 更新和删除的string字段。同一行键的消息必须包含单调 递增值,用于 _CHANGE_SEQUENCE_NUMBER。序列号 小于为某行处理的最高序列号的消息 对 BigQuery 表中的行没有任何影响。序列号必须遵循 _CHANGE_SEQUENCE_NUMBER格式

如需将该功能与 use_table_schema 搭配使用,请在 JSON 消息中添加上述字段。

如需了解价格,请参阅 BigQuery CDC 注入价格

BigQuery 中的 Apache Iceberg BigLake 表

BigQuery 订阅可以与 BigLake BigQuery 中的 Apache Iceberg 表搭配使用,无需进行任何额外更改。

BigQuery 中的 Apache Iceberg BigLake 表为在上构建开放格式湖仓一体提供了 基础 Google Cloud。这些表 提供与 标准(内置) BigQuery 表相同的全代管式体验,但使用 Parquet 将数据存储在 客户拥有的存储分区中,以便与 Iceberg 开放表格式进行互操作。

如需了解如何在 BigQuery 中创建 Apache Iceberg BigLake 表,请参阅创建 Iceberg 表

处理消息故障

如果 Pub/Sub 消息无法写入 BigQuery,则无法确认该消息。如需转发此类 无法递送的消息,请在 BigQuery 订阅中配置死信 主题。转发到死信主题的 Pub/Sub 消息 包含一个属性 CloudPubSubDeadLetterSourceDeliveryErrorMessage,其中包含 Pub/Sub 消息无法写入 BigQuery 的原因。

如果 Pub/Sub 无法将消息写入 BigQuery, 则 Pub/Sub 会以类似于 推送退避行为的方式退避消息传送。不过,如果订阅附加了死信主题,则当消息失败是由于架构兼容性错误导致时,Pub/Sub 不会退避传送。

配额和限制

每个区域的 BigQuery 订阅者 吞吐量都有配额限制。如需了解详情,请参阅 Pub/Sub 配额 和限制

BigQuery 订阅使用 BigQuery Storage Write API写入数据。如需了解 Storage Write API 的配额和限制,请参阅 Storage Write API 请求。 BigQuery 订阅仅消耗 Storage Write API 的吞吐量配额。在这种情况下,您可以忽略其他 Storage Write API 配额注意事项。

价格

如需了解 BigQuery 订阅的价格,请参阅 Pub/Sub 价格页面

后续步骤