使用 Java 讀取變更串流
Java 適用的 Cloud Bigtable 用戶端程式庫提供低階方法,可處理資料變更記錄。不過,在大多數情況下,我們建議您使用 Dataflow 串流處理變更,而非使用本頁面所述的方法,因為 Dataflow 會為您處理分割區分割和合併作業。
事前準備
使用 Java 讀取變更串流之前,請務必先熟悉變更串流總覽。然後完成下列必要條件。
設定驗證方法
如要在本機開發環境中使用本頁面的 Java 範例,請安裝並初始化 gcloud CLI,然後使用使用者憑證設定應用程式預設憑證。
-
安裝 Google Cloud CLI。
-
若您採用的是外部識別資訊提供者 (IdP),請先使用聯合身分登入 gcloud CLI。
-
如果您使用本機殼層,請為使用者帳戶建立本機驗證憑證:
gcloud auth application-default login
如果您使用 Cloud Shell,則不需要執行這項操作。
如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI。
詳情請參閱 這篇文章,瞭解如何設定本機開發環境的驗證機制。
如要瞭解如何設定正式環境的驗證機制,請參閱「 為在 Google Cloud上執行的程式碼設定應用程式預設憑證 」。
啟用變更串流
您必須先在資料表上啟用變更串流,才能讀取資料表。您也可以建立新資料表,並啟用變更串流。
必要的角色
如要取得讀取 Bigtable 變更串流所需的權限,請要求系統管理員授予您下列 IAM 角色。
- Bigtable 管理員
(
roles/bigtable.admin) 在包含您打算串流變更的資料表的 Bigtable 執行個體上
將 Java 用戶端程式庫新增為依附元件
在 Maven pom.xml 檔案中新增類似下列的程式碼。將 VERSION 替換為您使用的用戶端程式庫版本。版本必須為 2.21.0 以上。
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
判斷資料表的分區
如要開始發出 ReadChangeStream 要求,您必須知道資料表的分區。可以使用 GenerateInitialChangeStreamPartitions 方法判斷這項資訊。以下範例說明如何使用這個方法取得 ByteStringRanges 串流,代表資料表中的每個分區。每個 ByteStringRange 都包含分區的開始和結束鍵。
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
處理每個分區的變更
然後使用 ReadChangeStream 方法處理每個分割區的變更。以下範例說明如何從目前時間開始,開啟分區的串流。
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery 接受下列引數:
- 串流分區 (必要) - 要從中串流變更的分區
- 下列其中一項:
- 開始時間 - 開始處理變更的提交時間戳記
- 接續符記 - 代表要從哪個位置繼續串流的符記
- 結束時間 (選用) - 達到此時間時,系統會停止處理變更的修訂時間戳記。如未提供值,串流會繼續讀取。
- 心跳時間長度 (選用) - 沒有新變化時的心跳訊息頻率 (預設為五秒)
變更串流錄製格式
傳回的變更串流記錄是下列三種回應類型之一:
CloseStream:表示用戶端應停止從串流讀取資料的訊息。- 狀態 - 顯示關閉串流的原因。下列其中一項:
OK- 已達指定分區的結束時間OUT_OF_RANGE- 指定的分區已不存在,表示該分區已分割或合併。每個新分割區都必須建立新的ReadChangeStream要求。
NewPartitions:提供OUT_OF_RANGE回應的最新分區資訊。ChangeStreamContinuationTokens- 用於從相同位置繼續提出新ReadChangeStream要求的符記清單。每個NewPartition一個。
- 狀態 - 顯示關閉串流的原因。下列其中一項:
Heartbeat- 定期傳送的訊息,內含可用於檢查串流狀態的資訊。EstimatedLowWatermark- 估算指定分區的低水位線ContinuationToken:從目前位置繼續串流指定分割區的權杖。
資料變更記錄內容
如要瞭解變更串流記錄,請參閱「資料變更記錄的內容」。
處理分區異動
資料表的分區變更時,ReadChangeStream 請求會傳回 CloseStream 訊息,其中包含從新分區繼續串流所需的資訊。
如果是分割,這會包含多個新分區,以及每個分區對應的 ContinuationToken。如要從相同位置繼續串流新分區,請為每個新分區發出新的 ReadChangeStream 要求,並附上對應的符記。
舉例來說,如果您正在串流分區 [A,C),而該分區一分為二,變成 [A,B) 和 [B,C),則預期會發生下列事件序列:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
如要從傳送下列 ReadChangeStreamQuery 要求時的位置繼續串流每個分區,請執行下列操作:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
如要合併,並從相同分割區繼續作業,您需要傳送新的 ReadChangeStream 要求,其中包含合併分割區的每個權杖。
舉例來說,如果您串流兩個分區 ([A,B) 和 [B,C)),並將其合併至分區 [A,C),則預期會發生下列事件序列:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
如要從相同位置繼續串流分割區 [A, C),請傳送類似下列內容的 ReadChangeStreamQuery:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));