このチュートリアルでは、Bigtable テーブルの変更ストリームをソースとするデータベース変更のリアルタイム ストリーム用のデータ パイプラインを Dataflow にデプロイする方法について説明します。パイプラインの出力は Cloud Storage 上の一連のファイルに書き込まれます。
音楽再生アプリケーションのサンプル データセットを用意しています。このチュートリアルでは、再生された曲を追跡し、一定期間にわたり上位 5 位をランク付けします。
このチュートリアルは、コードの記述と Google Cloudへのデータ パイプラインのデプロイに精通している技術ユーザーを対象としています。
目標
このチュートリアルでは、次の方法を説明します。
- 変更ストリームを有効にして Bigtable テーブルを作成する。
- 変更ストリームを変換して出力するパイプラインを Dataflow にデプロイする。
- データ パイプラインの結果を表示する。
費用
このドキュメントでは、課金対象である次の Google Cloudコンポーネントを使用します。
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。
始める前に
- プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトであれば、どのプロジェクトでも選択できます。
-
プロジェクトを作成する: プロジェクトを作成するには、
resourcemanager.projects.create権限を含むプロジェクト作成者ロール(roles/resourcemanager.projectCreator)が必要です。ロールを付与する方法を確認する。 -
Google Cloud プロジェクトを作成します。
gcloud projects create PROJECT_ID
PROJECT_IDは、作成する Google Cloud プロジェクトの名前に置き換えます。 -
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
PROJECT_IDは、 Google Cloud プロジェクトの名前に置き換えます。 - プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトであれば、どのプロジェクトでも選択できます。
-
プロジェクトを作成する: プロジェクトを作成するには、
resourcemanager.projects.create権限を含むプロジェクト作成者ロール(roles/resourcemanager.projectCreator)が必要です。ロールを付与する方法を確認する。 -
Google Cloud プロジェクトを作成します。
gcloud projects create PROJECT_ID
PROJECT_IDは、作成する Google Cloud プロジェクトの名前に置き換えます。 -
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
PROJECT_IDは、 Google Cloud プロジェクトの名前に置き換えます。 cbtCLI を更新してインストールします。gcloud components update gcloud components install cbt
Google Cloud CLI をインストールします。インストール後、次のコマンドを実行して Google Cloud CLI を初期化します。
gcloud init外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
Google Cloud プロジェクトを作成または選択します。
プロジェクトの選択または作成に必要なロール
Google Cloud プロジェクトに対して課金が有効になっていることを確認します。
Dataflow、Cloud Bigtable API、Cloud Bigtable Admin API、Cloud Storage API を有効にします。
API を有効にするために必要なロール
API を有効にするには、serviceusage.services.enable 権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。ロールを付与する方法を確認する。
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
Google Cloud CLI をインストールします。インストール後、次のコマンドを実行して Google Cloud CLI を初期化します。
gcloud init外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
Google Cloud プロジェクトを作成または選択します。
プロジェクトの選択または作成に必要なロール
Google Cloud プロジェクトに対して課金が有効になっていることを確認します。
Dataflow、Cloud Bigtable API、Cloud Bigtable Admin API、Cloud Storage API を有効にします。
API を有効にするために必要なロール
API を有効にするには、serviceusage.services.enable 権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。ロールを付与する方法を確認する。
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
環境を準備する
コードを取得する
サンプルコードを含むリポジトリのクローンを作成します。このリポジトリをすでにダウンロードしている場合は、最新バージョンを pull して取得します。
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
バケットの作成
gcloud storage buckets create gs://BUCKET_NAME
BUCKET_NAME は、バケット名の要件を満たすバケット名に置き換えます。Bigtable インスタンスを作成する
このチュートリアルでは、既存のインスタンスを使用することも、近くのリージョンにデフォルト構成でインスタンスを作成することもできます。
テーブルを作成する
このサンプル アプリケーションは、ユーザーが再生している曲を追跡し、リッスン イベントを Bigtable に保存します。1 つの列ファミリー(cf)と 1 つの列(曲)があり、行キーにユーザー ID を使用するテーブルを作成し、変更ストリームを有効にします。
テーブルを作成します。
gcloud bigtable instances tables create song-rank \
--column-families=cf --change-stream-retention-period=7d \
--instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
次のように置き換えます。
- PROJECT_ID: 使用しているプロジェクトの ID
- BIGTABLE_INSTANCE_ID: 新しいテーブルを含むインスタンスの ID
パイプラインを開始する
このパイプラインは、次の手順で変更ストリームを変換します。
- 変更ストリームを読み取る
- 曲名を取得する
- 曲のリッスン イベントを N 秒のウィンドウにグループ化する
- トップ 5 の曲をカウントする
- 結果を出力する
パイプラインを実行します
mvn compile exec:java -Dexec.mainClass=SongRank \
"-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
--bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
--outputLocation=gs://BUCKET_NAME/ \
--runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
BIGTABLE_REGION は、Bigtable インスタンスが配置されているリージョンの ID(us-east5 など)に置き換えます。
パイプラインを理解する
パイプラインの次のコード スニペットは、実行中のコードを理解するのに役立ちます。
変更ストリームの読み取り
このサンプルコードでは、特定の Bigtable インスタンスとテーブルのパラメータを使ってソース ストリームを構成しています。
曲名の取得
曲が再生されると、曲名は列ファミリー cf と列修飾子 song に書き込まれます。コードは変更ストリームのミューテーションから値を抽出し、パイプラインの次のステップに出力します。
トップ 5 の曲のカウント
組み込みの Beam 関数 Count と Top.of を使用して、現在のウィンドウで上位 5 つの曲を取得できます。
結果の出力
このパイプラインは、結果を標準出力とファイルに書き込みます。ファイルの場合は、書き込みが 10 個の要素または 1 分間のセグメントのグループに分割されます。
パイプラインを表示する
Google Cloud コンソールで、[Dataflow] ページに移動します。
song-rank で始まる名前のジョブをクリックします。
画面下部の [表示] をクリックして、ログパネルを開きます。
[ワーカーログ] をクリックして、変更ストリームの出力ログをモニタリングします。
ストリームの書き込み
cbt CLI を使用して、さまざまなユーザーの曲の再生回数を song-rank テーブルに書き込みます。これは、時間の経過とともにストリーミングされる曲の再生をシミュレートするように数分間にわたって書き込むように設計されています。
cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
song-rank song-rank-data.csv column-family=cf batch-size=1
出力を表示する
最も人気のある曲を確認するために、Cloud Storage の出力を読み取ります。
gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
出力例:
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。
プロジェクトを削除する
Google Cloud プロジェクトを削除します。
gcloud projects delete PROJECT_ID
リソースを個別に削除する
バケットとファイルを削除します。
gcloud storage rm --recursive gs://BUCKET_NAME/テーブルで変更ストリームを無効にします。
gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \ --clear-change-stream-retention-periodテーブル
song-rankを削除します。cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank変更ストリーム パイプラインを停止します。
ジョブを一覧表示し、ジョブ ID を取得します。
gcloud dataflow jobs list --region=BIGTABLE_REGIONジョブをキャンセルします。
gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGIONJOB_ID は、前のコマンドの後に表示されたジョブ ID に置き換えます。