本文档简要介绍了 BigQuery 订阅, 其工作流和相关属性。
BigQuery 订阅是一种导出订阅 可在收到消息时将其写入现有 BigQuery 表。您无需配置单独的订阅者客户端。 您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API 来创建、更新、列出、分离或删除 BigQuery 订阅。
如果没有 BigQuery 订阅类型, 您需要拉取或推送订阅以及订阅者 (例如 Dataflow)以读取消息并将其写入 BigQuery 表。如果消息在存储到 BigQuery 表之前不需要进行 额外处理,则无需运行 Dataflow 作业, 您可以改用 BigQuery 订阅。
不过,对于 Pub/Sub 系统,如果数据在存储到 BigQuery 表之前需要进行一些数据转换,我们仍建议使用 Dataflow 流水线。如需了解如何使用 Dataflow 将数据从 Pub/Sub 流式传输到 BigQuery 并进行转换,请参阅从 Pub/Sub 流式传输到 BigQuery。
Dataflow 中的 Pub/Sub Subscription to BigQuery 模板默认强制执行“恰好一次”传送 。这通常通过 Dataflow 流水线中的重复数据消除机制来实现。不过,BigQuery 订阅仅支持“至少一次”传送。如果您的使用场景对精确重复数据消除至关重要,请考虑在 BigQuery 中使用下游流程来处理潜在的重复项。
准备工作
在阅读本文档之前,请确保您熟悉以下内容:
Pub/Sub 的工作原理以及不同的 Pub/Sub 术语。
Pub/Sub 支持的不同类型的订阅 ,以及您可能想要使用 BigQuery 订阅的原因。
BigQuery 的工作原理,以及如何配置 和管理 BigQuery 表。
BigQuery 订阅工作流
下图显示了 BigQuery 订阅和 BigQuery 之间的工作流。
下面简要介绍了引用图 1 的工作流:
- Pub/Sub 使用 BigQuery Storage Write API 将数据发送到 BigQuery 表。
- 消息会分批发送到 BigQuery 表。
- 成功完成写入操作后,API 会返回 OK 响应。
- 如果写入操作中存在任何失败, Pub/Sub 消息本身会被否定确认。然后,系统会重新发送该 消息。如果消息失败次数过多,并且订阅中配置了 死信主题,则该消息会被移至 死信主题。
BigQuery 订阅的属性
您为 BigQuery 订阅配置的属性 决定了 Pub/Sub 将消息写入的 BigQuery 表以及该表的架构类型。
如需了解详情,请参阅 BigQuery 属性。
架构兼容性
仅当您在创建 BigQuery 订阅时选择 使用主题架构 选项时,本部分才适用。
Pub/Sub 和 BigQuery 使用不同的方式来 定义其架构。Pub/Sub 架构以 Apache Avro 或 Protocol Buffer 格式定义,而 BigQuery 架构则使用 多种格式定义。
以下是有关 Pub/Sub 主题与 BigQuery 表之间的架构兼容性的 重要信息列表。
任何包含格式不正确的字段的消息都不会写入 BigQuery。
在 BigQuery 架构中,
INT、SMALLINT、INTEGER、BIGINT、TINYINT和BYTEINT是INTEGER的别名;DECIMAL是NUMERIC的别名;BIGDECIMAL是BIGNUMERIC的别名。如果主题架构中的类型为
string,而 BigQuery 表中的类型为JSON、TIMESTAMP、DATETIME、DATE、TIME、NUMERIC或BIGNUMERIC,则 Pub/Sub 消息中此字段的任何值都必须遵循为 BigQuery 数据 类型指定的格式。系统支持一些 Avro 逻辑类型,如下表所示。 任何未列出的逻辑类型都仅与它们 注释的等效 Avro 类型匹配,如 Avro 规范中所述。
以下是将不同架构格式映射到 BigQuery 数据类型的集合。
Avro 类型
| Avro 类型 | BigQuery 数据类型 |
null |
Any NULLABLE |
boolean |
BOOLEAN |
int |
INTEGER、NUMERIC 或
BIGNUMERIC |
long |
INTEGER、NUMERIC 或
BIGNUMERIC |
float |
FLOAT64、NUMERIC 或
BIGNUMERIC |
double |
FLOAT64、NUMERIC 或
BIGNUMERIC |
bytes |
BYTES、NUMERIC 或
BIGNUMERIC |
string |
STRING、JSON、
、TIMESTAMP、DATETIME、
、DATE、TIME、
、NUMERIC 或 BIGNUMERIC |
record |
RECORD/STRUCT |
array of Type |
REPEATED Type |
map with value type ValueType
|
REPEATED STRUCT <key STRING, value
ValueType> |
union with two types, one that is
null and the other Type |
NULLABLE Type |
other unions |
无法映射 |
fixed |
BYTES、NUMERIC 或
BIGNUMERIC |
enum |
INTEGER |
Avro 逻辑类型
| Avro 逻辑类型 | BigQuery 数据类型 |
timestamp-micros |
TIMESTAMP |
timestamp-millis |
TIMESTAMP |
date |
DATE |
time-micros |
TIME |
time-millis |
TIME |
duration |
INTERVAL |
decimal |
NUMERIC 或 BIGNUMERIC |
Protocol Buffer 类型
| Protocol Buffer 类型 | BigQuery 数据类型 |
double |
FLOAT64、NUMERIC 或
BIGNUMERIC |
float |
FLOAT64、NUMERIC 或
BIGNUMERIC |
int32 |
INTEGER、NUMERIC、
、BIGNUMERIC 或 DATE |
int64 |
INTEGER、NUMERIC、
、BIGNUMERIC、DATE、
、DATETIME 或 TIMESTAMP |
uint32 |
INTEGER、NUMERIC、
、BIGNUMERIC 或 DATE |
uint64 |
NUMERIC 或 BIGNUMERIC |
sint32 |
INTEGER、NUMERIC 或
BIGNUMERIC |
sint64 |
INTEGER、NUMERIC、
、BIGNUMERIC、DATE、
、DATETIME 或 TIMESTAMP |
fixed32 |
INTEGER、NUMERIC、
、BIGNUMERIC 或 DATE |
fixed64 |
NUMERIC 或 BIGNUMERIC |
sfixed32 |
INTEGER、NUMERIC、
、BIGNUMERIC 或 DATE |
sfixed64 |
INTEGER、NUMERIC、
、BIGNUMERIC、DATE、
、DATETIME 或 TIMESTAMP |
bool |
BOOLEAN |
string |
STRING、JSON、
、TIMESTAMP、DATETIME、
、DATE、TIME、
、NUMERIC 或 BIGNUMERIC |
bytes |
BYTES、NUMERIC 或
BIGNUMERIC |
enum |
INTEGER |
message |
RECORD/STRUCT |
oneof |
无法映射 |
map<KeyType, ValueType> |
REPEATED RECORD<key KeyType, value
ValueType> |
enum |
INTEGER |
repeated/array of Type |
REPEATED Type |
日期和时间整数表示法
从整数映射到日期或时间类型之一时,该数字必须 表示正确的值。以下是从 BigQuery 数据 类型到表示它们的整数的映射。
| BigQuery 数据类型 | 整数表示法 |
DATE |
自 Unix 纪元(1970 年 1 月 1 日)以来的天数 |
DATETIME |
以微秒为单位的日期和时间,使用 CivilTimeEncoder表示为民用时间 |
TIME |
以微秒为单位的时间,使用 CivilTimeEncoder 表示为民用时间 |
TIMESTAMP |
自 Unix 纪元(1970 年 1 月 1 日 00:00:00 UTC)以来的微秒数 |
BigQuery 变更数据捕获注入
当订阅属性中的 use_table_schema 或 use_topic_schema 设置为 true 时,BigQuery 订阅支持 变更数据捕获 (CDC)
注入更新。如需将该功能与
use_topic_schema搭配使用,请使用以下字段设置主题的架构:
_CHANGE_TYPE(必需):设置为UPSERT或DELETE的string字段。如果写入 BigQuery 表的 Pub/Sub 消息的
_CHANGE_TYPE设置为UPSERT,则 BigQuery 会更新具有相同键的行(如果存在),否则会插入新行。如果写入 BigQuery 表的 Pub/Sub 消息的
_CHANGE_TYPE设置为DELETE, 则 BigQuery 会删除表中具有 相同键的行(如果存在)。
_CHANGE_SEQUENCE_NUMBER(可选):设置为确保按顺序处理对 BigQuery 表进行的 更新和删除的string字段。同一行键的消息必须包含单调 递增值,用于_CHANGE_SEQUENCE_NUMBER。序列号 小于为某行处理的最高序列号的消息 对 BigQuery 表中的行没有任何影响。序列号必须遵循_CHANGE_SEQUENCE_NUMBER格式。
如需将该功能与 use_table_schema 搭配使用,请在
JSON 消息中添加上述字段。
如需了解价格,请参阅 BigQuery CDC 注入价格。
BigQuery 中的 Apache Iceberg BigLake 表
BigQuery 订阅可以与 BigLake BigQuery 中的 Apache Iceberg 表搭配使用,无需进行任何额外更改。
BigQuery 中的 Apache Iceberg BigLake 表为在上构建开放格式湖仓一体提供了 基础 Google Cloud。这些表 提供与 标准(内置) BigQuery 表相同的全代管式体验,但使用 Parquet 将数据存储在 客户拥有的存储分区中,以便与 Iceberg 开放表格式进行互操作。
如需了解如何在 BigQuery 中创建 Apache Iceberg BigLake 表,请参阅创建 Iceberg 表。
处理消息故障
如果 Pub/Sub 消息无法写入
BigQuery,则无法确认该消息。如需转发此类
无法递送的消息,请在
BigQuery 订阅中配置死信
主题。转发到死信主题的 Pub/Sub 消息
包含一个属性
CloudPubSubDeadLetterSourceDeliveryErrorMessage,其中包含
Pub/Sub 消息无法写入
BigQuery 的原因。
如果 Pub/Sub 无法将消息写入 BigQuery, 则 Pub/Sub 会以类似于 推送退避行为的方式退避消息传送。不过,如果订阅附加了死信主题,则当消息失败是由于架构兼容性错误导致时,Pub/Sub 不会退避传送。
配额和限制
每个区域的 BigQuery 订阅者 吞吐量都有配额限制。如需了解详情,请参阅 Pub/Sub 配额 和限制。
BigQuery 订阅使用 BigQuery Storage Write API写入数据。如需了解 Storage Write API 的配额和限制,请参阅 Storage Write API 请求。 BigQuery 订阅仅消耗 Storage Write API 的吞吐量配额。在这种情况下,您可以忽略其他 Storage Write API 配额注意事项。
价格
如需了解 BigQuery 订阅的价格,请参阅 Pub/Sub 价格页面。
后续步骤
创建订阅,例如 BigQuery 订阅。
排查 BigQuery 订阅 问题。
了解 BigQuery。
查看 Pub/Sub 的价格,包括 BigQuery 订阅。
使用
gcloudCLI 命令创建或修改订阅。使用 REST API 创建或修改订阅。