Bigtable 変更ストリームを処理する

このチュートリアルでは、Bigtable テーブルの変更ストリームをソースとするデータベース変更のリアルタイム ストリーム用のデータ パイプラインを Dataflow にデプロイする方法について説明します。パイプラインの出力は Cloud Storage 上の一連のファイルに書き込まれます。

音楽再生アプリケーションのサンプル データセットを用意しています。このチュートリアルでは、再生された曲を追跡し、一定期間にわたり上位 5 位をランク付けします。

このチュートリアルは、コードの記述と Google Cloudへのデータ パイプラインのデプロイに精通している技術ユーザーを対象としています。

環境を準備する

コードを取得する

サンプルコードを含むリポジトリのクローンを作成します。このリポジトリをすでにダウンロードしている場合は、最新バージョンを pull して取得します。

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

バケットの作成

  • Create a Cloud Storage bucket:
    gcloud storage buckets create gs://BUCKET_NAME
    Replace BUCKET_NAME with a bucket name that meets the bucket naming requirements.

    Bigtable インスタンスを作成する

    このチュートリアルでは、既存のインスタンスを使用することも、近くのリージョンにデフォルト構成でインスタンスを作成することもできます。

    テーブルを作成する

    このサンプル アプリケーションは、ユーザーが再生している曲を追跡し、リッスン イベントを Bigtable に保存します。1 つの列ファミリー(cf)と 1 つの列(曲)があり、行キーにユーザー ID を使用するテーブルを作成し、変更ストリームを有効にします。

    テーブルを作成します。

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    次のように置き換えます。

    • PROJECT_ID: 使用しているプロジェクトの ID
    • BIGTABLE_INSTANCE_ID: 新しいテーブルを含むインスタンスの ID

    パイプラインを開始する

    このパイプラインは、次の手順で変更ストリームを変換します。

    1. 変更ストリームを読み取る
    2. 曲名を取得する
    3. 曲のリッスン イベントを N 秒のウィンドウにグループ化する
    4. トップ 5 の曲をカウントする
    5. 結果を出力する

    パイプラインを実行します

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    BIGTABLE_REGION は、Bigtable インスタンスが配置されているリージョンの ID(us-east5 など)に置き換えます。

    パイプラインを理解する

    パイプラインの次のコード スニペットは、実行中のコードを理解するのに役立ちます。

    変更ストリームの読み取り

    このサンプルコードでは、特定の Bigtable インスタンスとテーブルのパラメータを使ってソース ストリームを構成しています。

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    曲名の取得

    曲が再生されると、曲名は列ファミリー cf と列修飾子 song に書き込まれます。コードは変更ストリームのミューテーションから値を抽出し、パイプラインの次のステップに出力します。

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    トップ 5 の曲のカウント

    組み込みの Beam 関数 CountTop.of を使用して、現在のウィンドウで上位 5 つの曲を取得できます。

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    結果の出力

    このパイプラインは、結果を標準出力とファイルに書き込みます。ファイルの場合は、書き込みが 10 個の要素または 1 分間のセグメントのグループに分割されます。

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    パイプラインを表示する

    1. Google Cloud コンソールで、[Dataflow] ページに移動します。

      Dataflow に移動

    2. song-rank で始まる名前のジョブをクリックします。

    3. 画面下部の [表示] をクリックして、ログパネルを開きます。

    4. [ワーカーログ] をクリックして、変更ストリームの出力ログをモニタリングします。

    ストリームの書き込み

    cbt CLI を使用して、さまざまなユーザーの曲の再生回数を song-rank テーブルに書き込みます。これは、時間の経過とともにストリーミングされる曲の再生をシミュレートするように数分間にわたって書き込むように設計されています。

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    出力を表示する

    最も人気のある曲を確認するために、Cloud Storage の出力を読み取ります。

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    出力例:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]