使用 Dataflow 建構變更串流連線

本頁面說明如何使用變更串流,建立可取用及轉送 Spanner 變更資料的 Dataflow 管道。您可以使用本頁面的程式碼範例,建構自訂管道。

核心概念

以下是變更串流的 Dataflow 管道的一些核心概念。

Dataflow

Dataflow 是一項無伺服器、快速且具成本效益的服務,支援串流和批次處理作業。您可以使用開放原始碼 Apache Beam 程式庫編寫處理工作,並透過這項服務自動佈建基礎架構及管理叢集,享有可攜性。從變更串流讀取資料時,Dataflow 會提供近乎即時的串流。

您可以使用 Dataflow 和 SpannerIO 連接器,取用 Spanner 變更串流。這個連接器提供 Spanner API 的抽象層,可查詢變更串流。使用這個連結器時,您不必管理變更串流分割區生命週期,這是在直接使用 Spanner API 時的必要步驟。連接器會提供資料變更記錄串流,讓您專注於應用程式邏輯,不必費心處理特定 API 詳細資料和動態變更串流分割。在大多數需要讀取變更串流資料的情況下,我們建議使用 SpannerIO 連接器,而非 Spanner API。

Dataflow 範本是預先建構的 Dataflow 管道,可實作常見用途。如需總覽,請參閱「Dataflow 範本」。

Dataflow 管道

Spanner 變更串流 Dataflow 管道由四個主要部分組成:

  1. 含有變更串流的 Spanner 資料庫
  2. SpannerIO 連接器
  3. 使用者定義的轉換和接收器
  4. Apache Beam 接收器 I/O 寫入器

圖片

Spanner 變更串流

如要瞭解如何建立變更串流,請參閱「建立變更串流」。

Apache Beam SpannerIO 連接器

這是先前 Dataflow 區段所述的 SpannerIO 連接器。 這是來源 I/O 連接器,會將資料變更記錄的 PCollection 發送至管道的後續階段。每個發出的資料變更記錄的「事件時間」為提交時間戳記。請注意,發出的記錄是無序的,且 SpannerIO 連接器保證不會有延遲記錄

使用變更串流時,Dataflow 會使用檢查點。因此,每個工作人員最多可能要等待設定的檢查點間隔,才能緩衝變更,然後傳送變更以進行後續處理。

使用者定義的轉換

使用者定義的轉換可讓使用者在 Dataflow 管道中彙整、轉換或修改處理資料。常見用途包括移除個人識別資訊、滿足下游資料格式需求,以及排序。如需轉換的程式設計指南,請參閱 Apache Beam 官方說明文件中的轉換

Apache Beam 接收器 I/O 寫入器

Apache Beam 包含內建的 I/O 連接器,可用於從 Dataflow 管道寫入 BigQuery 等資料接收器。系統原生支援最常見的資料接收器。

Dataflow 範本

Dataflow 範本提供一種方法,可使用 Google Cloud 控制台、 Google Cloud CLI 或 REST API 呼叫,根據常見用途的預先建構 Docker 映像檔建立 Dataflow 工作。

針對 Spanner 變更串流,我們提供三種 Dataflow Flex 範本:

使用 Spanner 變更串流至 Pub/Sub 範本時,請注意下列限制:

設定 Dataflow 範本的 IAM 權限

使用上述三種彈性範本建立 Dataflow 工作前,請確認您已為下列服務帳戶取得必要 IAM 權限

如果您沒有必要的 IAM 權限,則必須指定使用者代管的工作者服務帳戶,才能建立 Dataflow 工作。詳情請參閱「Dataflow 安全性與權限」。

如果您嘗試從 Dataflow 彈性範本執行工作,但缺少所有必要權限,工作可能會失敗,並顯示「failed to read the result file」(無法讀取結果檔案) 錯誤或「permission denied on resource」(資源權限遭拒) 錯誤。詳情請參閱「排解 Flex 範本問題」。

建構 Dataflow pipeline

本節說明如何進行連接器的初始設定,並提供常見整合的範例,以搭配 Spanner 變更串流功能使用。

如要按照這些步驟操作,您需要 Dataflow 的 Java 開發環境。詳情請參閱「使用 Java 建立 Dataflow 管道」。

建立變更串流

如要瞭解如何建立變更串流,請參閱「建立變更串流」。如要繼續進行下一個步驟,您必須擁有已設定變更串流的 Spanner 資料庫。

授予精細的存取權控管權限

如果您預期任何精細存取權控管使用者會執行 Dataflow 工作,請確保使用者已獲授權存取資料庫角色,該角色在變更串流上具有 SELECT 權限,在變更串流的資料表值函式上具有 EXECUTE 權限。此外,請確保主體在 SpannerIO 設定或 Dataflow Flex 範本中指定資料庫角色。

詳情請參閱「關於精細的存取權控管機制」。

將 SpannerIO 連接器新增為依附元件

Apache Beam SpannerIO 連接器會封裝直接使用 Cloud Spanner API 消耗變更串流的複雜性,並將變更串流資料記錄的 PCollection 發送至管道的後續階段。

使用者資料流管道的其他階段可以取用這些物件。變更串流整合作業是 SpannerIO 連接器的一部分。如要使用 SpannerIO 連接器,必須在 pom.xml 檔案中新增依附元件:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

建立中繼資料庫

執行 Apache Beam 管道時,連接器需要追蹤每個分割區。連接器會在初始化期間建立 Spanner 資料表,並將中繼資料儲存在該資料表中。設定連結器時,請指定要建立這個資料表的資料庫。

如「變更串流最佳做法」一文所述,我們建議您為此目的建立新的資料庫,而不是允許連接器使用應用程式的資料庫來儲存其中繼資料表。

使用 SpannerIO 連接器的 Dataflow 工作擁有者,必須透過這個中繼資料庫設定下列 IAM 權限

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

設定連接器

Spanner 變更串流連接器可設定如下:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

以下說明 readChangeStream() 選項:

Spanner 設定 (必填)

用於設定專案、執行個體和資料庫,變更串流是在這些位置建立,且應從這些位置查詢。 此外,如果執行 Dataflow 作業的 IAM 主體是精細存取權控管使用者,也可以選擇指定要使用的資料庫角色。這項工作會假設這個資料庫角色,以存取變更串流。詳情請參閱「關於精細的存取權控管機制」。

變更串流名稱 (必填)

這個名稱是變更串流的專屬 ID。此處的名稱必須與建立時使用的名稱相同。

中繼資料執行個體 ID (選填)

這個執行個體會儲存連接器使用的中繼資料,以控管變更串流 API 資料的取用情形。

中繼資料庫 ID (必要)

這個資料庫會儲存連接器使用的中繼資料,用來控管變更串流 API 資料的取用情形。

中繼資料表名稱 (選用)

只有在更新現有管道時,才應使用這項功能。

這是連接器要使用的現有中繼資料表名稱。連接器會使用這項設定儲存中繼資料,以控管變更串流 API 資料的取用情形。如果省略這個選項,Spanner 會在連接器初始化時,建立名稱由系統產生的新資料表。

RPC 優先順序 (選填)

變更串流查詢使用的要求優先順序。如果省略這個參數,系統會使用 high priority

InclusiveStartAt (必要)

系統會將指定時間戳記之後的變更傳回給呼叫端。

InclusiveEndAt (選填)

系統會將指定時間戳記之前的變更傳回給呼叫者。如果省略這項參數,系統會無限期發出變更。

新增轉換和接收器,處理變更資料

完成上述步驟後,設定的 SpannerIO 連接器即可發出 DataChangeRecord 物件的 PCollection。如需多個範例管道設定,以各種方式處理這項串流資料,請參閱「轉換和接收器範例」。

請注意,SpannerIO 連接器發出的變更串流記錄是無序的。 這是因為 PCollection 不提供任何排序保證。如需排序過的串流,您必須在管道中將記錄分組並排序為轉換:請參閱「範例:依鍵排序」。您可以擴充這個範例,根據記錄的任何欄位排序記錄,例如交易 ID。

轉換和接收器範例

您可以定義自己的轉換作業,並指定接收器將資料寫入其中。 Apache Beam 文件提供許多可套用的轉換,以及可將資料寫入外部系統的現成 I/O 連接器

範例:依鍵排序

這個程式碼範例會使用 Dataflow 連接器,依提交時間戳記排序並依主鍵分組,發出資料變更記錄。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

這個程式碼範例會使用狀態和計時器,為每個鍵緩衝處理記錄,並將計時器的到期時間設為未來某個使用者設定的時間 T (在 BufferKeyUntilOutputTimestamp 函式中定義)。當 Dataflow 水印通過時間 T 時,這段程式碼會清除緩衝區中時間戳記小於 T 的所有記錄,依提交時間戳記排序這些記錄,並輸出鍵/值組合,其中:

  • 這個鍵是輸入鍵,也就是雜湊至大小為 1000 的 bucket 陣列的主要鍵。
  • 這個值是為鍵緩衝處理的排序資料變更記錄。

我們為每個金鑰提供下列保證:

  • 計時器保證會按照到期時間戳記的順序觸發。
  • 下游階段保證會以產生元素的順序接收元素。

舉例來說,如果鍵的值為 100,計時器會分別在 T1T10 觸發,並在每個時間戳記產生一組資料變更記錄。因為 T1 輸出的資料變更記錄是在 T10 輸出的資料變更記錄之前產生,所以系統保證下一個階段會先收到 T1 輸出的資料變更記錄,再收到 T10 輸出的資料變更記錄。這項機制可確保下游處理作業的每個主鍵都有嚴格的提交時間戳記順序。

這個程序會不斷重複,直到管道結束並處理完所有資料變更記錄為止 (如果未指定結束時間,則會無限期重複)。

請注意,這個程式碼範例使用狀態和計時器 (而非視窗),依鍵執行排序作業。理由是我們無法保證視窗會依序處理。也就是說,較舊的時間區間可能會比最近的時間區間晚處理,導致處理順序錯亂。

BreakRecordByModFn

每筆資料變更記錄可能包含多項修改。每個修訂項目代表對單一主鍵值進行插入、更新或刪除作業。這個函式會將每筆資料變更記錄分成多筆資料變更記錄,每筆記錄對應一個模組。

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

這個函式會接收 DataChangeRecord,並輸出以 Spanner 主鍵為索引的 DataChangeRecord,並將主鍵雜湊為整數值。

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

計時器和緩衝區是依按鍵而定。這個函式會緩衝處理每筆資料變更記錄,直到浮水印通過要輸出緩衝處理資料變更記錄的時間戳記為止。

這段程式碼會使用迴圈計時器,判斷何時要排清緩衝區:

  1. 當系統首次看到某個鍵的資料變更記錄時,會將計時器設為在資料變更記錄的提交時間戳記 + incrementIntervalSeconds (使用者可設定的選項) 時觸發。
  2. 計時器觸發時,系統會將緩衝區中時間戳記小於計時器到期時間的所有資料變更記錄,新增至 recordsToOutput。如果緩衝區有時間戳記大於或等於計時器到期時間的資料變更記錄,系統會將這些資料變更記錄加回緩衝區,而不是輸出。然後將下一個計時器設為目前計時器的到期時間加上 incrementIntervalInSeconds
  3. 如果 recordsToOutput 不為空白,函式會依據修訂時間戳記和交易 ID 排序 recordsToOutput 中的資料變更記錄,然後輸出這些記錄。
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

交易排序

您可以將這個管道變更為依交易 ID 和提交時間戳記排序。如要這麼做,請為每個交易 ID / 提交時間戳記配對緩衝處理記錄,而不是為每個 Spanner 鍵緩衝處理記錄。這需要修改 KeyByIdFn 中的程式碼。

範例:彙整交易

這個程式碼範例會讀取資料變更記錄,將屬於同一交易的所有資料變更記錄組合成單一元素,然後輸出該元素。請注意,這個程式碼範例輸出的交易並非依提交時間戳記排序。

這個程式碼範例會使用緩衝區,從資料變更記錄組裝交易。第一次收到屬於交易的資料變更記錄時,系統會讀取資料變更記錄中的 numberOfRecordsInTransaction 欄位,該欄位會說明屬於該交易的預期資料變更記錄數。系統會緩衝處理屬於該交易的資料變更記錄,直到緩衝處理的記錄數符合 numberOfRecordsInTransaction 為止,然後輸出成套的資料變更記錄。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

這個函式會接收 DataChangeRecord,並輸出以交易 ID 為鍵的 DataChangeRecord

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

TransactionBoundaryFn 會收到 KeyByTransactionIdFn 傳送的鍵/值組合,並根據 TransactionId 分組緩衝區。{TransactionId, DataChangeRecord}當緩衝的記錄數等於整個交易中包含的記錄數時,這個函式會依記錄順序排序群組中的 DataChangeRecord 物件,並輸出 {CommitTimestamp, TransactionId}Iterable<DataChangeRecord> 的鍵/值組合。

這裡假設 SortKey 是使用者定義的類別,代表 {CommitTimestamp, TransactionId} 配對。如要進一步瞭解 SortKey,請參閱範例實作

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

範例:依交易標記篩選

當系統為修改使用者資料的交易加上標記時,對應的代碼和類型會儲存在 DataChangeRecord 中。以下範例說明如何根據使用者定義的交易代碼和系統代碼,篩選變更串流記錄:

my-tx-tag 的使用者定義標記篩選條件:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

系統標記篩選/TTL 稽核:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

範例:擷取完整資料列

這個範例適用於名為 Singer 的 Spanner 資料表,定義如下:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

在變更串流的預設 OLD_AND_NEW_VALUES 值擷取模式下,更新 Spanner 資料列時,收到的資料變更記錄只會包含變更的資料欄。記錄中不會包含追蹤但未變更的資料欄。您可以使用模組的主鍵,在資料變更記錄的認可時間戳記執行 Spanner 快照讀取作業,擷取未變更的資料欄,甚至是擷取完整資料列。

請注意,資料庫保留政策可能需要變更為大於或等於變更串流保留政策的值,快照讀取作業才能成功。

另請注意,建議使用 NEW_ROW 值擷取類型,因為這類型的效率較高,且預設會傳回資料列的所有追蹤欄,不需要額外將快照讀取至 Spanner。

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

這項轉換作業會在收到的每筆記錄的提交時間戳記執行過時讀取作業,並將完整資料列對應至 JSON。

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

這段程式碼會建立 Spanner 資料庫用戶端,執行完整資料列擷取作業,並將工作階段集區設定為只有幾個工作階段,在 ToFullReowJsonFn 的一個執行個體中依序執行讀取作業。Dataflow 會確保產生這個函式的多個執行個體,每個執行個體都有自己的用戶端集區。

範例:從 Spanner 遷移至 Pub/Sub

在此情境中,呼叫端會盡快將記錄串流至 Pub/Sub,不會進行任何分組或彙整作業。這很適合觸發下游處理作業,例如將插入 Spanner 資料表的所有新資料列串流至 Pub/Sub,以進行後續處理。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

請注意,Pub/Sub 接收器可設定為確保「僅限一次」語意。

範例:從 Spanner 到 Cloud Storage

在這個情境中,呼叫端會將特定時間範圍內的所有記錄分組,並將各組記錄儲存在不同的 Cloud Storage 檔案中。這非常適合用於分析和時間點封存,與 Spanner 的保留期限無關。

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

請注意,Cloud Storage 接收器預設提供至少一次的語意。經過額外處理後,即可修改為僅須處理一次的語意。

我們也提供這個用途的 Dataflow 範本:請參閱「將變更串流連線至 Cloud Storage」。

範例:從 Spanner 匯出至 BigQuery (分類帳資料表)

在這裡,呼叫端會將變更記錄串流至 BigQuery。每個資料變更記錄都會反映為 BigQuery 中的一列。這類資料非常適合用於分析。這段程式碼會使用先前在「擷取完整資料列」一節中定義的函式,擷取記錄的完整資料列,並將其寫入 BigQuery。

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

請注意,BigQuery Sink 預設提供「至少一次」語意。經過額外處理後,即可修改為僅須處理一次的語意。

我們也為這個用途提供 Dataflow 範本,請參閱「將變更串流連線至 BigQuery」。

監控管道

有兩類指標可用於監控變更串流 Dataflow 管道。

標準 Dataflow 指標

Dataflow 提供多項指標,確保工作正常運作,例如資料更新間隔、系統延遲、工作輸送量、工作站 CPU 使用率等。詳情請參閱「使用 Monitoring 監控 Dataflow 管道」。

對於變更串流管道,應考量兩項主要指標:系統延遲時間資料更新間隔

系統延遲時間會顯示目前處理或等待處理資料項目的最長時間 (以秒為單位)。

資料更新間隔會顯示目前 (即時) 與輸出浮水印之間的時間間隔。時間 T 的輸出浮水印表示,事件時間 (嚴格) 早於 T 的所有元素都已處理完畢,可進行運算。換句話說,資料更新間隔會評估管道處理收到的事件時,是否為最新狀態。

如果管道資源不足,這兩項指標就會受到影響。項目需要等待較長時間才能處理,因此系統延遲時間會增加。資料更新間隔也會增加,因為管道無法跟上收到的資料量。

自訂變更串流指標

這些指標會顯示在 Cloud Monitoring 中,包括:

  • 記錄在 Spanner 中提交,到連接器將記錄發送到 PCollection 的延遲時間 (以直方圖表示)。這項指標可用於查看管道的任何效能 (延遲) 問題。
  • 讀取的資料記錄總數。這項指標會整體顯示連接器發出的記錄數。這個數字應會持續增加,反映基礎 Spanner 資料庫的寫入趨勢。
  • 正在讀取的分區數量。系統應一律讀取分區。如果這個數字為零,表示管道發生錯誤。
  • 執行連接器期間發出的查詢總數。這項指標會整體顯示管道執行期間,對 Spanner 執行個體發出的變更串流查詢。這項指標可用於估算連接器對 Spanner 資料庫造成的負載。

更新現有管道

如果工作相容性檢查通過,您就能更新使用 SpannerIO 連接器處理變更串流的執行中管道。如要這麼做,更新工作時必須明確設定新工作的 metadata table name 參數。使用要更新的工作中的 metadataTable 管道選項值。

如果您使用 Google 提供的 Dataflow 範本,請使用 spannerMetadataTableName 參數設定資料表名稱。您也可以修改現有工作,在連接器設定中透過 withMetadataTable(your-metadata-table-name) 方法明確使用中繼資料表。完成後,請按照 Dataflow 說明文件中的「啟動取代工作」一節指示,更新執行中的工作。

變更串流和 Dataflow 的最佳做法

以下是使用 Dataflow 建立變更串流連線的最佳做法。

使用獨立的中繼資料庫

建議您為 SpannerIO 連接器建立專屬資料庫,用於儲存中繼資料,而非設定為使用應用程式資料庫。

詳情請參閱「考慮使用獨立的中繼資料資料庫」。

調整叢集大小

Spanner 變更串流作業的初始工作站數量,可根據每秒 1,000 次寫入作業配置一個工作站。請注意,這項預估值可能會因多種因素而異,例如每筆交易的大小、單一交易產生的變更串流記錄數量,以及管道中使用的其他轉換、彙整或接收器。

完成初始資源配置後,請務必追蹤「監控管道」一節中提及的指標,確保管道運作正常。建議您先試用初始工作站集區大小,並監控管道處理負載的方式,視需要增加節點數量。CPU 使用率是檢查負載是否正常,以及是否需要更多節點的重要指標。

已知限制

使用 Dataflow 時,Spanner 變更串流有幾項已知限制:

自動調度資源

如要為包含 SpannerIO.readChangeStream 的任何管道啟用自動調度資源功能,必須使用 Apache Beam 2.39.0 以上版本。

如果您使用 2.39.0 之前的 Apache Beam 版本,包含 SpannerIO.readChangeStream 的管道需要明確指定自動調度資源演算法為 NONE,如「水平自動調度資源」一文所述。

如要手動調度 Dataflow 管道資源,而非使用自動調度資源功能,請參閱「手動調度串流管道資源」。

Runner V2

Spanner 變更串流連接器需要 Dataflow Runner V2。執行期間必須手動指定,否則會擲回錯誤。您可以透過 --experiments=use_unified_worker,use_runner_v2 設定工作,指定 Runner V2

快照

Spanner 變更串流連接器不支援 Dataflow 快照

排除中

Spanner 變更串流連接器不支援排空作業。只能取消現有工作。

你也可以更新現有管道,不必停止管道。

OpenCensus

如要使用 OpenCensus 監控管道,請指定 0.28.3 以上版本。

NullPointerException (管道啟動時)

Apache Beam 2.38.0 版的錯誤可能會導致在特定情況下啟動管道時發生 NullPointerException。這樣一來,工作就無法啟動,而是會顯示以下錯誤訊息:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

如要解決這個問題,請使用 Apache Beam 2.39.0 以上版本,或手動將 beam-sdks-java-core 版本指定為 2.37.0

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

更多資訊