Dataflow と Cloud Storage を使用して Pub/Sub からメッセージをストリーミングする
Dataflow は、ストリーミング(リアルタイム)モードのデータとバッチモードのデータを同等の信頼性と表現力で変換、活用するフルマネージド サービスです。Apache Beam SDK を使用して、簡素化されたパイプライン開発環境を提供します。Apache Beam SDK は、ウィンドウ処理とセッション分析のプリミティブが豊富に用意されているだけでなく、ソースとシンクのコネクタからなるエコシステムも提供しています。このクイックスタートでは、Dataflow を使用して次の操作を行う方法を説明します。
- Pub/Sub トピックにパブリッシュされたメッセージを読む
- タイムスタンプごとにメッセージをウィンドウ処理(グループ化)する
- Cloud Storage にメッセージを書き込む
このクイックスタートでは、Java と Python で Dataflow を使用する方法について説明します。SQL もサポートされます。このクイックスタートは、ご利用を開始していただくにあたり一時的な認証情報を提供する Google Cloud Skills Boost チュートリアルとしても掲載しています。
カスタムデータ処理を行う予定がない場合は、UI ベースの Dataflow テンプレートを使用することもできます。
始める前に
- Google Cloud アカウントにログインします。 Google Cloudを初めて使用する場合は、 アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
Google Cloud CLI をインストールします。
-
外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
-
gcloud CLI を初期化するには、次のコマンドを実行します。
gcloud init -
Google Cloud プロジェクトを作成または選択します。
プロジェクトの選択または作成に必要なロール
- プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトであれば、どのプロジェクトでも選択できます。
-
プロジェクトを作成する: プロジェクトを作成するには、
resourcemanager.projects.create権限を含むプロジェクト作成者ロール(roles/resourcemanager.projectCreator)が必要です。ロールを付与する方法を確認する。
-
Google Cloud プロジェクトを作成します。
gcloud projects create PROJECT_ID
PROJECT_IDは、作成する Google Cloud プロジェクトの名前に置き換えます。 -
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
PROJECT_IDは、 Google Cloud プロジェクトの名前に置き換えます。
Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Storage JSON API、Pub/Sub、Resource Manager、Cloud Scheduler の各 API を有効にします。 Google Cloud
API を有効にするために必要なロール
API を有効にするには、
serviceusage.services.enable権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。ロールを付与する方法を確認する。gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
認証を設定します。
-
サービス アカウントの作成 IAM ロール(
roles/iam.serviceAccountCreator)とプロジェクト IAM 管理者ロール(roles/resourcemanager.projectIamAdmin)があることを確認します。ロールを付与する方法を確認する。 -
サービス アカウントを作成します。
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
SERVICE_ACCOUNT_NAMEをサービス アカウントの名前に置き換えます。 -
サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admingcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
以下を置き換えます。
SERVICE_ACCOUNT_NAME: サービス アカウントの名前PROJECT_ID: サービス アカウントを作成したプロジェクト IDROLE: 付与するロール
-
サービス アカウントを他のリソースに関連付けるプリンシパルに必要なロールを付与します。
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
次のように置き換えます。
SERVICE_ACCOUNT_NAME: サービス アカウントの名前PROJECT_ID: サービス アカウントを作成したプロジェクト IDUSER_EMAIL: Google アカウントのメールアドレス
-
サービス アカウントの作成 IAM ロール(
-
Google Cloud CLI をインストールします。
-
外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
-
gcloud CLI を初期化するには、次のコマンドを実行します。
gcloud init -
Google Cloud プロジェクトを作成または選択します。
プロジェクトの選択または作成に必要なロール
- プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトであれば、どのプロジェクトでも選択できます。
-
プロジェクトを作成する: プロジェクトを作成するには、
resourcemanager.projects.create権限を含むプロジェクト作成者ロール(roles/resourcemanager.projectCreator)が必要です。ロールを付与する方法を確認する。
-
Google Cloud プロジェクトを作成します。
gcloud projects create PROJECT_ID
PROJECT_IDは、作成する Google Cloud プロジェクトの名前に置き換えます。 -
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
PROJECT_IDは、 Google Cloud プロジェクトの名前に置き換えます。
Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Storage JSON API、Pub/Sub、Resource Manager、Cloud Scheduler の各 API を有効にします。 Google Cloud
API を有効にするために必要なロール
API を有効にするには、
serviceusage.services.enable権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。ロールを付与する方法を確認する。gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
認証を設定します。
-
サービス アカウントの作成 IAM ロール(
roles/iam.serviceAccountCreator)とプロジェクト IAM 管理者ロール(roles/resourcemanager.projectIamAdmin)があることを確認します。ロールを付与する方法を確認する。 -
サービス アカウントを作成します。
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
SERVICE_ACCOUNT_NAMEをサービス アカウントの名前に置き換えます。 -
サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admingcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
以下を置き換えます。
SERVICE_ACCOUNT_NAME: サービス アカウントの名前PROJECT_ID: サービス アカウントを作成したプロジェクト IDROLE: 付与するロール
-
サービス アカウントを他のリソースに関連付けるプリンシパルに必要なロールを付与します。
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
次のように置き換えます。
SERVICE_ACCOUNT_NAME: サービス アカウントの名前PROJECT_ID: サービス アカウントを作成したプロジェクト IDUSER_EMAIL: Google アカウントのメールアドレス
-
サービス アカウントの作成 IAM ロール(
-
ユーザー アカウントのローカル認証情報を作成します。
gcloud auth application-default login
認証エラーが返され、外部 ID プロバイダ(IdP)を使用している場合は、 連携 ID を使用して gcloud CLI にログインしていることを確認します。
Pub/Sub プロジェクトを設定する
-
バケット、プロジェクト、およびリージョンの変数を作成します。 Cloud Storage のバケット名は、グローバルに一意である必要がある。 このクイックスタートでコマンドを実行する場所に近接した Dataflow リージョンを選択します。
REGION変数の値は有効なリージョン名にする必要があります。リージョンとロケーションの詳細については、Dataflow のロケーションをご覧ください。BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
このプロジェクトが所有する Cloud Storage バケットを作成します。
gcloud storage buckets create gs://$BUCKET_NAME
-
このプロジェクトで Pub/Sub トピックを作成します。
gcloud pubsub topics create $TOPIC_ID
-
このプロジェクトで Cloud Scheduler ジョブを作成します。このジョブは、1 分間隔で Cloud Pub/Sub トピックにメッセージをパブリッシュします。
App Engine アプリがプロジェクトに存在しない場合は、この手順で作成されます。
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
ジョブを開始します。
gcloud scheduler jobs run publisher-job --location=$REGION
-
次のコマンドを使用して、クイックスタート リポジトリのクローンを作成し、サンプルコード ディレクトリに移動します。
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Pub/Sub から Cloud Storage へのメッセージのストリーミング
コードサンプル
このサンプルコードでは、Dataflow を使用して次のことを行います。
- Pub/Sub メッセージを読み取ります。
- パブリッシュ タイムスタンプにより、固定サイズの間隔でメッセージをウィンドウ処理(グループ化)します。
各ウィンドウのメッセージを Cloud Storage のファイルに書き込みます。
Java
Python
パイプラインの開始
パイプラインを開始するには、次のコマンドを実行します。
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
上記のコマンドがローカルで実行され、クラウドで実行される Dataflow ジョブを起動します。コマンドが JOB_MESSAGE_DETAILED: Workers
have started successfully を返したら、Ctrl+C を使用してローカル プログラムを終了します。
ジョブとパイプラインの進行状況の確認
ジョブの進行状況は Dataflow コンソールで確認できます。
[ジョブの詳細] ビューを開いて、次の情報を確認します。
- ジョブの構成
- ジョブのログ
- ステージ指標
Cloud Storage に出力ファイルが表示されるまで数分間かかる場合があります。
または、以下のコマンドラインを使用して、書き出されたファイルを確認します。
gcloud storage ls gs://${BUCKET_NAME}/samples/
出力は次のようになります。
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1クリーンアップ
このページで使用したリソースについて、 Google Cloud アカウントに課金されないようにするには、リソースを含む Google Cloud プロジェクトを削除します。
Cloud Scheduler ジョブを削除します。
gcloud scheduler jobs delete publisher-job --location=$REGION
Dataflow コンソールで、ジョブを停止します。パイプラインをドレインせずにキャンセルします。
トピックを削除します。
gcloud pubsub topics delete $TOPIC_ID
パイプラインによって作成されたファイルを削除します。
gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
Cloud Storage バケットを削除します。
gcloud storage rm gs://${BUCKET_NAME} --recursive
- サービス アカウントを削除します。
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
作成した認証情報を取り消して、ローカル認証情報ファイルを削除します。
gcloud auth application-default revoke
-
(省略可)gcloud CLI から認証情報を取り消します。
gcloud auth revoke
次のステップ
カスタム タイムスタンプで Pub/Sub メッセージをウィンドウ処理する場合は、タイムスタンプを Pub/Sub メッセージ内の属性として指定し、そのカスタム タイムスタンプを PubsubIO の
withTimestampAttributeで使用できます。Google のストリーミング用に設計されたオープンソースの Dataflow テンプレートをご覧ください。
Dataflow と Pub/Sub の統合方法の詳細を確認します。
Pub/Sub から読み取り、Dataflow Flex テンプレートを使用して BigQuery に書き込むこちらのチュートリアルをご覧ください。
ウィンドウ処理の詳細については、Apache Beam モバイルゲーム パイプラインの例をご覧ください。