使用 Java 读取变更数据流
Java 版 Cloud Bigtable 客户端库提供处理数据更改记录的初级方法。不过,在大多数情况下,我们建议您使用 Dataflow 流式传输更改,而不是使用本页面中描述的方法,因为 Dataflow 会为您处理分区拆分和合并。
准备工作
在使用 Java 读取变更数据流之前,请务必先熟悉变更数据流概览。然后,完成以下前提条件。
设置身份验证
如需在本地开发环境中使用本页面上的 Java 示例,请安装并初始化 gcloud CLI,然后使用您的用户凭据设置应用默认凭据。
-
安装 Google Cloud CLI。
-
如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI。
-
如果您使用的是本地 shell,请为您的用户账号创建本地身份验证凭证:
gcloud auth application-default login
如果您使用的是 Cloud Shell,则无需执行此操作。
如果系统返回身份验证错误,并且您使用的是外部身份提供方 (IdP),请确认您已 使用联合身份登录 gcloud CLI。
如需了解详情,请参阅 为本地开发环境设置身份验证。
如需了解如何为生产环境设置身份验证,请参阅 为在 Google Cloud上运行的代码设置应用默认凭据 。
启用变更数据流
您必须先在表上启用变更数据流,然后才能读取数据流。您还可以创建启用了变更数据流的新表。
所需的角色
如需获得读取 Bigtable 变更数据流所需的权限,请让您的管理员授予您以下 IAM 角色。
- 针对 Bigtable 实例(包含您计划从中流式传输更改的表)的 Bigtable Administrator (
roles/bigtable.admin)
将 Java 客户端库添加为依赖项
将类似于以下示例的代码添加到您的 Maven pom.xml 文件中。将 VERSION 替换为您正在使用的客户端库的版本。版本必须为 2.21.0 或更高版本。
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
确定表的分区
如需开始发出 ReadChangeStream 请求,您需要知道表的分区。您可以使用 GenerateInitialChangeStreamPartitions 方法确定。以下示例展示了如何使用此方法获取表示表中每个分区的 ByteStringRanges 流。每个 ByteStringRange 都包含分区的开始和结束键。
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
处理每个分区的更改
然后,您可以使用 ReadChangeStream 方法处理每个分区的更改。以下示例演示了如何从当前时间开始打开分区的数据流。
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery 接受以下参数:
- 数据流分区(必需)- 从中流式传输更改的分区
- 下列其中一项:
- 开始时间 - 提交开始处理更改的时间戳
- 连续令牌 - 表示要继续流式传输的位置的令牌
- 结束时间(可选)- 提交在到达时停止处理更改的时间戳。如果您不提供此值,数据流就会继续读取。
- 检测信号时长(可选)- 没有新更改时的检测信号消息的频率(默认为 5 秒)
变更数据流记录格式
返回的变更数据流记录是以下三种响应类型之一:
ChangeStreamMutation- 表示数据更改记录的消息。CloseStream- 表示客户端应停止从数据流读取的消息。- 状态 - 表示关闭数据流的原因。以下状态之一:
OK- 已达到指定分区的结束时间OUT_OF_RANGE- 指定分区不复存在,表示此分区上发生了拆分或合并。需要为每个新分区创建一个新的ReadChangeStream请求。
NewPartitions- 提供有关OUT_OF_RANGE响应的分区信息更新。ChangeStreamContinuationTokens- 用于从同一位置恢复新ReadChangeStream请求的令牌列表。每个NewPartition一个。
- 状态 - 表示关闭数据流的原因。以下状态之一:
Heartbeat- 包含可用于检查数据流状态的检查点的定期消息。EstimatedLowWatermark- 指定分区的低水印估计值ContinuationToken- 用于从当前位置继续流式传输指定分区的令牌。
数据更改记录内容
如需了解变更数据流记录,请参阅数据更改记录中包含的内容。
处理分区中的更改
当表分区发生更改时,ReadChangeStream 请求会返回一条 CloseStream 消息,其中包含从新分区继续流式传输所需的信息。
发生拆分时,将包含多个新分区以及每个分区的相应 ContinuationToken。如需继续从同一位置流式传输新分区,请使用相应令牌对每个新分区发出新的 ReadChangeStream 请求。
例如,如果您要流式传输分区 [A,C) 并将其拆分为 [A,B) 和 [B,C) 两个分区,则会出现以下事件序列:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
若要继续从同一位置流式传输每个分区,请发送以下 ReadChangeStreamQuery 请求:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
发生合并时,如需从同一分区继续,您需要发送包含合并分区中每个令牌的新 ReadChangeStream 请求。
例如,如果您要流式传输 [A,B) 和 [B,C) 两个分区,并且它们合并到分区 [A,C) 中,则会出现以下事件序列:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
如需从同一位置继续流式传输分区 [A, C),请按如下所示发送 ReadChangeStreamQuery:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));