このチュートリアルでは、Eventarc を使用して、一般公開 BigQuery データセットに対するクエリのスケジューリング、データに基づいたグラフの生成、メールによるグラフのリンクの共有を行う処理パイプラインを構築する方法について説明します。
SendGrid API キーを作成する
SendGrid はクラウドベースのメール プロバイダで、メールサーバーを維持せずにメールを送信できます。
- SendGrid にログインし、[Settings] > [API Keys] の順に移動します。
- [Create API Key] をクリックします。
- キーの権限を選択します。メールを送信するには、キーに少なくとも Mail send(メール送信)の権限が必要です。
- [Save] をクリックして、キーを作成します。
- SendGrid によって、新しいキーが生成されます。これは、キーの唯一のコピーであるため、後で使用できるようにキーをコピーして保存してください。
GKE クラスタを作成する
Workload Identity Federation for GKE を有効にしてクラスタを作成し、GKE 内で実行されているアプリケーションから Google Cloud サービスにアクセスできるようにします。Eventarc を使用してイベントを転送するには、Workload Identity Federation for GKE も必要です。
CloudRun
、HttpLoadBalancing
、HorizontalPodAutoscaling
の各アドオンを有効にした Knative serving 用の GKE クラスタを作成します。gcloud beta container clusters create $CLUSTER_NAME \ --addons=HttpLoadBalancing,HorizontalPodAutoscaling,CloudRun \ --machine-type=n1-standard-4 \ --enable-autoscaling --min-nodes=2 --max-nodes=10 \ --no-issue-client-certificate --num-nodes=2 \ --logging=SYSTEM,WORKLOAD \ --monitoring=SYSTEM \ --scopes=cloud-platform,logging-write,monitoring-write,pubsub \ --zone us-central1 \ --release-channel=rapid \ --workload-pool=$PROJECT_ID.svc.id.goog
クラスタの作成が完了するまで数分待ちます。処理中に、無視しても問題のない警告が表示されることがあります。クラスタが作成されると、出力は次のようになります。
Creating cluster ...done. Created [https://container.googleapis.com/v1beta1/projects/my-project/zones/us-central1/clusters/events-cluster].
Docker コンテナ イメージを保存する Artifact Registry 標準リポジトリを作成します。
gcloud artifacts repositories create REPOSITORY \ --repository-format=docker \ --location=$CLUSTER_LOCATION
REPOSITORY
は、リポジトリの一意の名前に置き換えます。
GKE サービス アカウントを構成する
デフォルトのコンピューティング サービス アカウントとして機能する GKE サービス アカウントを構成します。
サービス アカウント間に Identity and Access Management(IAM)バインディングを作成します。
PROJECT_NUMBER="$(gcloud projects describe $(gcloud config get-value project) --format='value(projectNumber)')" gcloud iam service-accounts add-iam-policy-binding \ --role roles/iam.workloadIdentityUser \ --member "serviceAccount:$PROJECT_ID.svc.id.goog[default/default]" \ $PROJECT_NUMBER-compute@developer.gserviceaccount.com
コンピューティング サービス アカウントのメールアドレスを使用して、
iam.gke.io/gcp-service-account
アノテーションを GKE サービス アカウントに追加します。kubectl annotate serviceaccount \ --namespace default \ default \ iam.gke.io/gcp-service-account=$PROJECT_NUMBER-compute@developer.gserviceaccount.com
GKE の宛先を有効にする
Eventarc が GKE クラスタ内のリソースを管理できるようにするには、GKE の宛先を有効にし、Eventarc サービス アカウントを必要なロールにバインドします。
Eventarc 用に GKE の宛先を有効にします。
gcloud eventarc gke-destinations init
必要なロールをバインドするよう求められたら、「
y
」と入力します。次のロールがバインドされます。
roles/compute.viewer
roles/container.developer
roles/iam.serviceAccountAdmin
サービス アカウントを作成してアクセスロールをバインドする
Eventarc トリガーを作成する前に、ユーザー管理のサービス アカウントを設定して特定のロールを付与し、Eventarc が Pub/Sub イベントを転送できるようにします。
TRIGGER_GSA
という名前のサービス アカウントを作成します。TRIGGER_GSA=eventarc-bigquery-triggers gcloud iam service-accounts create $TRIGGER_GSA
サービス アカウントに
pubsub.subscriber
、monitoring.metricWriter
、eventarc.eventReceiver
のロールを付与します。PROJECT_ID=$(gcloud config get-value project) gcloud projects add-iam-policy-binding $PROJECT_ID \ --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \ --role "roles/pubsub.subscriber" gcloud projects add-iam-policy-binding $PROJECT_ID \ --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \ --role "roles/monitoring.metricWriter" gcloud projects add-iam-policy-binding $PROJECT_ID \ --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \ --role "roles/eventarc.eventReceiver"
Cloud Storage バケットを作成する
グラフを保存する Cloud Storage バケットを作成します。バケットとグラフが GKE サービスと同じリージョンで一般公開されていることを確認します。
export BUCKET="$(gcloud config get-value core/project)-charts" gcloud storage buckets create gs://${BUCKET} --location=$(gcloud config get-value run/region) gcloud storage buckets update gs://${BUCKET} --uniform-bucket-level-access gcloud storage buckets add-iam-policy-binding gs://${BUCKET} --member=allUsers --role=roles/storage.objectViewer
リポジトリのクローンを作成する
GitHub リポジトリのクローンを作成します。
git clone https://github.com/GoogleCloudPlatform/eventarc-samples cd eventarc-samples/processing-pipelines
Notifier サービスをデプロイする
bigquery/notifier/python
ディレクトリから、Knative serving サービスをデプロイします。このサービスは、Chart Creator イベントを受信し、SendGrid を使用して、生成されたグラフへのリンクをメールで送信します。
コンテナ イメージをビルドして push します。
pushd bigquery/notifier/python export SERVICE_NAME=notifier docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 . docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 popd
コンテナ イメージを Knative serving にデプロイし、メールの送信先アドレスと SendGrid API キーを渡します。
export TO_EMAILS=EMAIL_ADDRESS export SENDGRID_API_KEY=YOUR_SENDGRID_API_KEY gcloud run deploy ${SERVICE_NAME} \ --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \ --update-env-vars TO_EMAILS=${TO_EMAILS},SENDGRID_API_KEY=${SENDGRID_API_KEY},BUCKET=${BUCKET}
次のように置き換えます。
EMAIL_ADDRESS
: 生成されたグラフのリンクを送信するメールアドレスYOUR_SENDGRID_API_KEY
: 前にメモした SendGrid API キー
サービス URL が表示されたら、デプロイは完了しています。
Notifier サービスのトリガーを作成する
Knative serving にデプロイされた Notifier サービスの Eventarc トリガーで、methodName が storage.objects.create
の Cloud Storage 監査ログをフィルタします。
トリガーを作成します。
gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \ --destination-gke-cluster=$CLUSTER_NAME \ --destination-gke-location=$CLUSTER_LOCATION \ --destination-gke-namespace=default \ --destination-gke-service=$SERVICE_NAME \ --destination-gke-path=/ \ --event-filters="type=google.cloud.audit.log.v1.written" \ --event-filters="serviceName=storage.googleapis.com" \ --event-filters="methodName=storage.objects.create" \ --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com
これにより、
trigger-notifier-gke
というトリガーが作成されます。
Chart Creator サービスをデプロイする
bigquery/chart-creator/python
ディレクトリで、Query Runner イベントを受信し、特定の国の BigQuery テーブルからデータを取得して、そのデータから Matplotlib を使用してグラフを生成する Knative serving サービスをデプロイします。グラフが Cloud Storage バケットにアップロードされます。
コンテナ イメージをビルドして push します。
pushd bigquery/chart-creator/python export SERVICE_NAME=chart-creator docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 . docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 popd
BUCKET
を渡してコンテナ イメージを Knative serving にデプロイします。gcloud run deploy ${SERVICE_NAME} \ --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \ --update-env-vars BUCKET=${BUCKET}
サービス URL が表示されたら、デプロイは完了しています。
Chart Creator サービスのトリガーを作成する
Knative serving にデプロイされた Chart Creator サービスの Eventarc トリガーで、Pub/Sub トピックに公開されたメッセージをフィルタします。
トリガーを作成します。
gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \ --destination-gke-cluster=$CLUSTER_NAME \ --destination-gke-location=$CLUSTER_LOCATION \ --destination-gke-namespace=default \ --destination-gke-service=$SERVICE_NAME \ --destination-gke-path=/ \ --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \ --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com
これにより、
trigger-chart-creator-gke
というトリガーが作成されます。Pub/Sub トピックの環境変数を設定します。
export TOPIC_QUERY_COMPLETED=$(basename $(gcloud eventarc triggers describe trigger-${SERVICE_NAME}-gke --format='value(transport.pubsub.topic)'))
Query Runner サービスをデプロイする
processing-pipelines
ディレクトリで、Cloud Scheduler イベントを受信し、一般公開の COVID-19 データセットからデータを取得して、結果を新しい BigQuery テーブルに保存する Knative serving サービスをデプロイします。
コンテナ イメージをビルドして push します。
export SERVICE_NAME=query-runner docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 -f Dockerfile . docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
PROJECT_ID
とTOPIC_QUERY_COMPLETED
を渡してコンテナ イメージを Knative serving にデプロイします。gcloud run deploy ${SERVICE_NAME} \ --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \ --update-env-vars PROJECT_ID=$(gcloud config get-value project),TOPIC_ID=${TOPIC_QUERY_COMPLETED}
サービス URL が表示されたら、デプロイは完了しています。
Query Runner サービスのトリガーを作成する
Knative serving にデプロイされた Query Runner サービスの Eventarc トリガーで、Pub/Sub トピックに公開されたメッセージをフィルタします。
トリガーを作成します。
gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \ --destination-gke-cluster=$CLUSTER_NAME \ --destination-gke-location=$CLUSTER_LOCATION \ --destination-gke-namespace=default \ --destination-gke-service=$SERVICE_NAME \ --destination-gke-path=/ \ --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \ --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com
これにより、
trigger-query-runner-gke
というトリガーが作成されます。Pub/Sub トピックの環境変数を設定します。
export TOPIC_QUERY_SCHEDULED=$(gcloud eventarc triggers describe trigger-${SERVICE_NAME}-gke --format='value(transport.pubsub.topic)')
ジョブのスケジュール設定
処理パイプラインは、2 つの Cloud Scheduler ジョブによってトリガーされます。
Cloud Scheduler に必要な App Engine アプリを作成し、適切なロケーション(
europe-west
など)を指定します。export APP_ENGINE_LOCATION=LOCATION gcloud app create --region=${APP_ENGINE_LOCATION}
1 日に 1 回 Pub/Sub トピックに公開する Cloud Scheduler ジョブを 2 つ作成します。
gcloud scheduler jobs create pubsub cre-scheduler-uk \ --schedule="0 16 * * *" \ --topic=${TOPIC_QUERY_SCHEDULED} \ --message-body="United Kingdom"
gcloud scheduler jobs create pubsub cre-scheduler-cy \ --schedule="0 17 * * *" \ --topic=${TOPIC_QUERY_SCHEDULED} \ --message-body="Cyprus"
スケジュールは unix-cron 形式で指定します。たとえば、
0 16 * * *
はジョブが毎日午後 4 時(UTC)に実行されることを意味します。
パイプラインの実行
すべてのトリガーが正常に作成されたことを確認します。
gcloud eventarc triggers list
出力は次のようになります。
NAME TYPE DESTINATION ACTIVE LOCATION trigger-chart-creator-gke google.cloud.pubsub.topic.v1.messagePublished GKE:chart-creator Yes us-central1 trigger-notifier-gke google.cloud.audit.log.v1.written GKE:notifier Yes us-central1 trigger-query-runner-gke google.cloud.pubsub.topic.v1.messagePublished GKE:query-runner Yes us-central1
Cloud Scheduler ジョブ ID を取得します。
gcloud scheduler jobs list
出力は次のようになります。
ID LOCATION SCHEDULE (TZ) TARGET_TYPE STATE cre-scheduler-cy us-central1 0 17 * * * (Etc/UTC) Pub/Sub ENABLED cre-scheduler-uk us-central1 0 16 * * * (Etc/UTC) Pub/Sub ENABLED
ジョブは毎日午後 4 時から午後 5 時に実行されるようにスケジュールされていますが、Cloud Scheduler ジョブを手動で実行することもできます。
gcloud scheduler jobs run cre-scheduler-cy gcloud scheduler jobs run cre-scheduler-uk
数分後、Cloud Storage バケットに 2 つのグラフが作成されていることを確認します。
gcloud storage ls gs://${BUCKET}
出力は次のようになります。
gs://PROJECT_ID-charts/chart-cyprus.png gs://PROJECT_ID-charts/chart-unitedkingdom.png
これで、グラフのリンクを含むメールが 2 通届いたはずです。