Dataflow で変更をストリーミングする

Bigtable Beam コネクタを使用すると、Dataflow でコード内のパーティション変更の追跡や処理を行うことなく、Bigtable データ変更レコードを読み取ることができます。これは、このロジックをコネクタが処理するためです。

このドキュメントでは、Bigtable Beam コネクタを構成して使用し、Dataflow パイプラインを使用して変更ストリームを読み取る方法について説明します。このドキュメントを読む前に、変更ストリームの概要Dataflow について理解しておく必要があります。

独自のパイプラインの構築に代わる方法

独自の Dataflow パイプラインを構築しない場合は、次のいずれかのオプションを使用できます。

Google 提供の Dataflow テンプレートを使用できます。

コードの出発点として、Bigtable のチュートリアルまたはクイックスタートのコードサンプルを利用することもできます。

生成するコードが google cloud libraries-bom バージョン 26.14.0 以降を使用していることを確認します。

コネクタの詳細

Bigtable Beam コネクタ メソッド(BigtableIO.readChangeStream)を使用すると、処理可能なデータ変更レコード(ChangeStreamMutation)のストリームを読み取ることができます。Bigtable Beam コネクタは、Apache Beam GitHub リポジトリのコンポーネントです。コネクタコードの説明については、BigtableIO.java のコメントをご覧ください。

Beam バージョン 2.48.0 以降のコネクタを使用する必要があります。Apache Beam ランタイムのサポートを確認して、サポートされているバージョンの Java を使用していることを確認します。コネクタを使用するパイプラインを Dataflow にデプロイできます。これは、リソースのプロビジョニングと管理を行い、ストリーム データ処理のスケーラビリティと信頼性をサポートします。

Apache Beam プログラミング モデルの詳細については、Beam のドキュメントをご覧ください。

イベント時間のないデータのグループ化

Bigtable Beam コネクタを使用してストリーミングされるデータ変更レコードは、イベント時間に依存する Dataflow 関数と互換性がありません。

レプリケーションとウォーターマークで説明したように、パーティションのレプリケーションがインスタンスの残りの部分に追いついていない場合、低ウォーターマークは前に進まなくなります。低ウォーターマークが進まなくなると、変更ストリームが停止する可能性があります。

ストリームの停滞を防ぐため、Bigtable Beam コネクタは出力タイムスタンプをゼロにして、すべてのデータを出力します。Dataflow では、ゼロのタイムスタンプを持つデータ変更レコードはすべて遅延データとみなされます。そのため、イベント時間に依存する Dataflow 機能には、Bigtable 変更ストリームと互換性がありません。たとえば、ウィンドウ関数イベント時間トリガーイベント時間タイマーは使用できません。

代わりに、チュートリアルの例に示されているように、非イベント時間トリガーで GlobalWindows を使用して、この遅延データをペインにグループ化できます。トリガーとペインの詳細については、Beam プログラミング ガイドのトリガーをご覧ください。

自動スケーリング

コネクタは Dataflow 自動スケーリングをサポートしています。Runner v2 を使用する場合、これはデフォルトで有効になっています(必須)。Dataflow の自動スケーリング アルゴリズムは、推定された変更ストリームのバックログを考慮します。これは、Dataflow モニタリング ページの Backlog セクションでモニタリングできます。ワーカー数の上限を設定するジョブをデプロイする場合は、--maxNumWorkers フラグを使用します。

自動スケーリングを使用する代わりにパイプラインを手動でスケーリングするには、ストリーミング パイプラインを手動でスケーリングするをご覧ください。

制限事項

Dataflow で Bigtable Beam コネクタを使用する前に、次の制限事項を確認してください。

Dataflow Runner V2

コネクタを実行するには、Dataflow Runner v2 を使用する必要があります。これを有効にするには、コマンドライン引数で --experiments=use_runner_v2 を指定します。Runner v1 で実行すると、次の例外によりパイプラインが失敗します。

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

スナップショット

コネクタは Dataflow のスナップショットをサポートしていません。

重複

Bigtable Beam コネクタは、各行キーと各クラスターの変更をコミット タイムスタンプ順にストリーミングしますが、ストリーム内の以前の時点から再開されることがあるため、重複が発生する可能性があります。

パイプラインの再起動

Dataflow パイプラインが長時間停止している場合、データ変更レコードが保持境界から外れる可能性があります。パイプラインが再開されると、Bigtable はパイプラインを失敗させます。これにより、保持期間内の新しいリクエスト開始時刻で新しいパイプラインを開始できます。Bigtable は、元のパイプラインのリクエスト時間をサイレントに進めるのではなく、これを行うことで、指定された保持期間外のタイムスタンプを持つデータ変更レコードが意図せず削除されるのを防ぎます。

始める前に

コネクタを使用する前に、次の前提条件を満たしてください。

認証を設定する

ローカル開発環境でこのページの Java サンプルを使用するには、gcloud CLI をインストールして初期化し、ユーザー認証情報を使用してアプリケーションのデフォルト認証情報を設定します。

  1. Google Cloud CLI をインストールします。

  2. 外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。

  3. ローカルシェルを使用している場合は、ユーザー アカウントのローカル認証情報を作成します。

    gcloud auth application-default login

    Cloud Shell を使用している場合は、この操作を行う必要はありません。

    認証エラーが返され、外部 ID プロバイダ(IdP)を使用している場合は、 連携 ID を使用して gcloud CLI にログインしていることを確認します。

詳細については、 ローカル開発環境の認証を設定するをご覧ください。

本番環境での認証の設定については、 Google Cloudで実行されるコードのアプリケーションのデフォルト認証情報を設定する をご覧ください。

変更ストリームを有効にする

変更ストリームを読み取るには、まずテーブルで変更ストリームを有効にする必要があります。新しいテーブルを作成して、変更ストリームを有効にすることもできます。

変更ストリーム メタデータ テーブル

Dataflow を使用して変更をストリーミングすると、Bigtable Beam コネクタによって、デフォルトで __change_stream_md_table という名前のメタデータ テーブルが作成されます。変更ストリームのメタデータ テーブルは、コネクタのオペレーション状態を管理し、データ変更レコードに関するメタデータを保存します。

デフォルトでは、コネクタはストリーミングされるテーブルと同じインスタンスにテーブルを作成します。テーブルが正しく機能するようにするには、メタデータ テーブルのアプリ プロファイルで単一クラスタ ルーティングを使用し、単一行のトランザクションを有効にする必要があります。

Cloud Bigtable コネクタを使用して Bigtable から変更をストリーミングする方法の詳細については、BigtableIO のドキュメントをご覧ください。

必要なロール

Dataflow を使用して Bigtable 変更ストリームの読み取りに必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。

Bigtable から変更を読み取るには、次のロールが必要です。

  • 変更のストリーミング元となるテーブルを含む Bigtable インスタンスの Bigtable 管理者(roles/bigtable.admin

Dataflow ジョブを実行するには、次のロールが必要です。

ロールの付与の詳細については、アクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

Bigtable Beam コネクタを依存関係として追加する

Maven pom.xml ファイルに次の依存関係のようなコードを追加します。バージョンは 2.48.0 以降にする必要があります。

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

変更ストリームを読み取る

データ変更レコードを読み取る Dataflow パイプラインを構築するには、コネクタを構成してから、変換とシンクを追加します。次に、コネクタを使用して、Beam パイプラインの ChangeStreamMutation オブジェクトを読み込みます。

このセクションの Java で記述されたコードサンプルは、パイプラインを作成して使用し Key-Value ペアを文字列に変換する方法を示しています。各ペアは、行キーと ChangeStreamMutation オブジェクトで構成されます。このパイプラインは、各オブジェクトのエントリをカンマ区切りの文字列に変換します。

パイプラインをビルドする

次の Java のコードサンプルは、パイプラインの構築方法を示しています。

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

データ変更レコードを処理する

このサンプルは、行のデータ変更レコード内のエントリをすべてループ処理し、エントリのタイプに基づいて文字列の変換メソッドを呼び出す方法を示しています。

データ変更レコードに含めることができるエントリタイプのリストについては、データ変更レコードの内容をご覧ください。

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

このサンプルでは、書き込みエントリが変換されています。

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

この例では、セルの削除のエントリが変換されます。

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

このサンプルでは、列ファミリーの削除エントリが変換されています。


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

モニタリング

Google Cloud コンソールの次のリソースでは、Dataflow パイプラインを実行して Bigtable の変更ストリームを読み取る際にGoogle Cloud リソースをモニタリングできます。

特に、次の指標を確認してください。

  • Bigtable のシステム分析情報ページで、次の指標を確認します。
    • 指標 cpu_load_by_app_profile_by_method_by_table変更ストリーム別の CPU 使用率データ。変更ストリームがクラスタの CPU 使用率に与える影響を示します。
    • 変更ストリームのストレージ使用量(バイト)change_stream_log_used_bytes
  • Dataflow のモニタリング ページで、データの更新速度を確認します。この指標は、現在の時刻とウォーターマークの差を示します。通常は 2 分程度ですが、急激な増大により 1 ~ 2 分長くなる場合もあります。データの更新速度からは、データ変更レコードの処理速度が遅いかどうかはわかりません。重要なアプリケーションの健全性とパフォーマンスを継続的に維持するには、Dataflow のデータ鮮度指標をモニタリングし、次の操作を行います。

    • データの更新速度の指標がしきい値を常に上回っている場合は、パイプラインのリソースが不足している可能性があります。Dataflow ワーカーを追加することをおすすめします。
    • Dataflow ワーカーが適切にプロビジョニングされているにもかかわらず、データの更新速度が上昇しているか、常に高い場合は、Google Cloud サポートにお問い合わせください。
  • Dataflow processing_delay_from_commit_timestamp_MEAN 指標は、ジョブの存続期間中のデータ変更レコードの平均処理時間を示します。

Bigtable の server/latencies 指標は、Bigtable 変更ストリームを読み取っている時のデータ変更レコードの処理遅延ではなく、ストリーミング リクエストの継続時間を反映するため、Dataflow パイプラインを監視する場合には役に立ちません。変更ストリームでレイテンシが高くても、リクエストの処理が遅いことを意味するわけではありません。接続が長時間オープン状態だったことを意味します。

次のステップ