このチュートリアルでは、Bigtable テーブルの変更ストリームをソースとするデータベース変更のリアルタイム ストリーム用のデータ パイプラインを Dataflow にデプロイする方法について説明します。パイプラインの出力は Cloud Storage 上の一連のファイルに書き込まれます。
音楽再生アプリケーションのサンプル データセットを用意しています。このチュートリアルでは、再生された曲を追跡し、一定期間にわたり上位 5 位をランク付けします。
このチュートリアルは、コードの記述と Google Cloudへのデータ パイプラインのデプロイに精通している技術ユーザーを対象としています。
目標
このチュートリアルでは、次の方法を説明します。
- 変更ストリームを有効にして Bigtable テーブルを作成する。
- 変更ストリームを変換して出力するパイプラインを Dataflow にデプロイする。
- データ パイプラインの結果を表示する。
費用
このドキュメントでは、課金対象である次の Google Cloudコンポーネントを使用します。
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。
始める前に
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles. -
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_IDwith a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_IDwith your Google Cloud project name. - Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles. -
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_IDwith a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_IDwith your Google Cloud project name. cbtCLI を更新してインストールします。gcloud components update gcloud components install cbt
-
Create a Cloud Storage bucket:
Replacegcloud storage buckets create gs://BUCKET_NAME
BUCKET_NAMEwith a bucket name that meets the bucket naming requirements.Bigtable インスタンスを作成する
このチュートリアルでは、既存のインスタンスを使用することも、近くのリージョンにデフォルト構成でインスタンスを作成することもできます。
テーブルを作成する
このサンプル アプリケーションは、ユーザーが再生している曲を追跡し、リッスン イベントを Bigtable に保存します。1 つの列ファミリー(cf)と 1 つの列(曲)があり、行キーにユーザー ID を使用するテーブルを作成し、変更ストリームを有効にします。
テーブルを作成します。
gcloud bigtable instances tables create song-rank \ --column-families=cf --change-stream-retention-period=7d \ --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID次のように置き換えます。
- PROJECT_ID: 使用しているプロジェクトの ID
- BIGTABLE_INSTANCE_ID: 新しいテーブルを含むインスタンスの ID
パイプラインを開始する
このパイプラインは、次の手順で変更ストリームを変換します。
- 変更ストリームを読み取る
- 曲名を取得する
- 曲のリッスン イベントを N 秒のウィンドウにグループ化する
- トップ 5 の曲をカウントする
- 結果を出力する
パイプラインを実行します
mvn compile exec:java -Dexec.mainClass=SongRank \ "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \ --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \ --outputLocation=gs://BUCKET_NAME/ \ --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"BIGTABLE_REGION は、Bigtable インスタンスが配置されているリージョンの ID(
us-east5など)に置き換えます。パイプラインを理解する
パイプラインの次のコード スニペットは、実行中のコードを理解するのに役立ちます。
変更ストリームの読み取り
このサンプルコードでは、特定の Bigtable インスタンスとテーブルのパラメータを使ってソース ストリームを構成しています。
曲名の取得
曲が再生されると、曲名は列ファミリー
cfと列修飾子songに書き込まれます。コードは変更ストリームのミューテーションから値を抽出し、パイプラインの次のステップに出力します。トップ 5 の曲のカウント
組み込みの Beam 関数
CountとTop.ofを使用して、現在のウィンドウで上位 5 つの曲を取得できます。結果の出力
このパイプラインは、結果を標準出力とファイルに書き込みます。ファイルの場合は、書き込みが 10 個の要素または 1 分間のセグメントのグループに分割されます。
パイプラインを表示する
Google Cloud コンソールで、[Dataflow] ページに移動します。
song-rank で始まる名前のジョブをクリックします。
画面下部の [表示] をクリックして、ログパネルを開きます。
[ワーカーログ] をクリックして、変更ストリームの出力ログをモニタリングします。
ストリームの書き込み
cbtCLI を使用して、さまざまなユーザーの曲の再生回数をsong-rankテーブルに書き込みます。これは、時間の経過とともにストリーミングされる曲の再生をシミュレートするように数分間にわたって書き込むように設計されています。cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \ song-rank song-rank-data.csv column-family=cf batch-size=1出力を表示する
最も人気のある曲を確認するために、Cloud Storage の出力を読み取ります。
gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt出力例:
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}] 2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}] 2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}] バケットとファイルを削除します。
gcloud storage rm --recursive gs://BUCKET_NAME/テーブルで変更ストリームを無効にします。
gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \ --clear-change-stream-retention-periodテーブル
song-rankを削除します。cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank変更ストリーム パイプラインを停止します。
ジョブを一覧表示し、ジョブ ID を取得します。
gcloud dataflow jobs list --region=BIGTABLE_REGIONジョブをキャンセルします。
gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGIONJOB_ID は、前のコマンドの後に表示されたジョブ ID に置き換えます。
Google Cloud CLI をインストールします。 インストール後、次のコマンドを実行して Google Cloud CLI を初期化します。
gcloud init外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
Create or select a Google Cloud project.
Roles required to select or create a project
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM
role (roles/serviceusage.serviceUsageAdmin), which contains the
serviceusage.services.enable permission. Learn how to grant
roles.
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
Google Cloud CLI をインストールします。 インストール後、次のコマンドを実行して Google Cloud CLI を初期化します。
gcloud init外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
Create or select a Google Cloud project.
Roles required to select or create a project
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM
role (roles/serviceusage.serviceUsageAdmin), which contains the
serviceusage.services.enable permission. Learn how to grant
roles.
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
環境を準備する
コードを取得する
サンプルコードを含むリポジトリのクローンを作成します。このリポジトリをすでにダウンロードしている場合は、最新バージョンを pull して取得します。
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
バケットの作成
クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。
プロジェクトを削除する
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID