このチュートリアルでは、Bigtable テーブルの変更ストリームをソースとするデータベース変更のリアルタイム ストリーム用のデータ パイプラインを Dataflow にデプロイする方法について説明します。パイプラインの出力は Cloud Storage 上の一連のファイルに書き込まれます。
音楽再生アプリケーションのサンプル データセットを用意しています。このチュートリアルでは、再生された曲を追跡し、一定期間にわたり上位 5 位をランク付けします。
このチュートリアルは、コードの記述と Google Cloudへのデータ パイプラインのデプロイに精通している技術ユーザーを対象としています。
環境を準備する
コードを取得する
サンプルコードを含むリポジトリのクローンを作成します。このリポジトリをすでにダウンロードしている場合は、最新バージョンを pull して取得します。
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
バケットの作成
gcloud storage buckets create gs://BUCKET_NAME
BUCKET_NAME
with a bucket name
that meets the bucket naming requirements.
Bigtable インスタンスを作成する
このチュートリアルでは、既存のインスタンスを使用することも、近くのリージョンにデフォルト構成でインスタンスを作成することもできます。
テーブルを作成する
このサンプル アプリケーションは、ユーザーが再生している曲を追跡し、リッスン イベントを Bigtable に保存します。1 つの列ファミリー(cf)と 1 つの列(曲)があり、行キーにユーザー ID を使用するテーブルを作成し、変更ストリームを有効にします。
テーブルを作成します。
gcloud bigtable instances tables create song-rank \
--column-families=cf --change-stream-retention-period=7d \
--instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
次のように置き換えます。
- PROJECT_ID: 使用しているプロジェクトの ID
- BIGTABLE_INSTANCE_ID: 新しいテーブルを含むインスタンスの ID
パイプラインを開始する
このパイプラインは、次の手順で変更ストリームを変換します。
- 変更ストリームを読み取る
- 曲名を取得する
- 曲のリッスン イベントを N 秒のウィンドウにグループ化する
- トップ 5 の曲をカウントする
- 結果を出力する
パイプラインを実行します
mvn compile exec:java -Dexec.mainClass=SongRank \
"-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
--bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
--outputLocation=gs://BUCKET_NAME/ \
--runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
BIGTABLE_REGION は、Bigtable インスタンスが配置されているリージョンの ID(us-east5
など)に置き換えます。
パイプラインを理解する
パイプラインの次のコード スニペットは、実行中のコードを理解するのに役立ちます。
変更ストリームの読み取り
このサンプルコードでは、特定の Bigtable インスタンスとテーブルのパラメータを使ってソース ストリームを構成しています。
曲名の取得
曲が再生されると、曲名は列ファミリー cf
と列修飾子 song
に書き込まれます。コードは変更ストリームのミューテーションから値を抽出し、パイプラインの次のステップに出力します。
トップ 5 の曲のカウント
組み込みの Beam 関数 Count
と Top.of
を使用して、現在のウィンドウで上位 5 つの曲を取得できます。
結果の出力
このパイプラインは、結果を標準出力とファイルに書き込みます。ファイルの場合は、書き込みが 10 個の要素または 1 分間のセグメントのグループに分割されます。
パイプラインを表示する
Google Cloud コンソールで、[Dataflow] ページに移動します。
song-rank で始まる名前のジョブをクリックします。
画面下部の [表示] をクリックして、ログパネルを開きます。
[ワーカーログ] をクリックして、変更ストリームの出力ログをモニタリングします。
ストリームの書き込み
cbt
CLI を使用して、さまざまなユーザーの曲の再生回数を song-rank
テーブルに書き込みます。これは、時間の経過とともにストリーミングされる曲の再生をシミュレートするように数分間にわたって書き込むように設計されています。
cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
song-rank song-rank-data.csv column-family=cf batch-size=1
出力を表示する
最も人気のある曲を確認するために、Cloud Storage の出力を読み取ります。
gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
出力例:
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]