Build a BigQuery processing pipeline for Knative serving with Eventarc

This tutorial shows you how to use Eventarc to build a processing pipeline that schedules queries to a public BigQuery dataset, generates charts based on the data, and shares links to the charts through email.

Create a SendGrid API key

SendGrid is a cloud-based email provider that lets you send email without having to maintain email servers.

  1. Sign in to SendGrid and go to Settings > API Keys.
  2. Click Create API Key.
  3. Select the permissions for the key. At a minimum, the key must have Mail Send permissions to send email.
  4. Click Save to create the key.
  5. SendGrid generates a new key. This is the only copy of the key, so make sure that you copy it and save it for later.

Create a GKE cluster

Create a cluster with Workload Identity Federation for GKE enabled so that applications running within GKE can access Google Cloud services. Workload Identity Federation for GKE is also required to forward events using Eventarc.

  1. Create a GKE cluster for Knative serving with the CloudRun, HttpLoadBalancing, and HorizontalPodAutoscaling addons enabled:

    gcloud beta container clusters create $CLUSTER_NAME \
        --addons=HttpLoadBalancing,HorizontalPodAutoscaling,CloudRun \
        --machine-type=n1-standard-4 \
        --enable-autoscaling --min-nodes=2 --max-nodes=10 \
        --no-issue-client-certificate --num-nodes=2 \
        --logging=SYSTEM,WORKLOAD \
        --monitoring=SYSTEM \
        --scopes=cloud-platform,logging-write,monitoring-write,pubsub \
        --zone us-central1 \
        --release-channel=rapid \
        --workload-pool=$PROJECT_ID.svc.id.goog
    
  2. Wait a few minutes for the cluster creation to complete. During the process, you might see warnings that you can safely ignore. When the cluster has been created, the output is similar to the following:

    Creating cluster ...done.
    Created [https://container.googleapis.com/v1beta1/projects/my-project/zones/us-central1/clusters/events-cluster].
    
  3. Create an Artifact Registry standard repository to store your Docker container image:

    gcloud artifacts repositories create REPOSITORY \
        --repository-format=docker \
        --location=$CLUSTER_LOCATION

    Replace REPOSITORY with a unique name for the repository.
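
Later sections reuse shell variables such as CLUSTER_NAME, CLUSTER_LOCATION, and PROJECT_ID, so an unset variable can produce confusing gcloud errors. The following is a minimal sketch of a check you could run before continuing. The require_vars helper is hypothetical and is not part of the tutorial or of gcloud.

```shell
# Hypothetical helper: report every named variable that is unset or empty.
# Returns 0 when all variables are set, 1 otherwise.
require_vars() {
    missing=0
    for name in "$@"; do
        # Indirectly read the variable whose name is stored in $name.
        eval "value=\${$name:-}"
        if [ -z "$value" ]; then
            echo "Missing required variable: $name" >&2
            missing=1
        fi
    done
    return "$missing"
}

# Check the variables that the commands in this tutorial reference.
require_vars CLUSTER_NAME CLUSTER_LOCATION PROJECT_ID \
    || echo "Set the missing variables before continuing"
```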

Configure the GKE service account

Configure a GKE (Kubernetes) service account to act as the Compute Engine default service account.

  1. Create an Identity and Access Management (IAM) binding between the service accounts:

    PROJECT_NUMBER="$(gcloud projects describe $(gcloud config get-value project) --format='value(projectNumber)')"
    
    gcloud iam service-accounts add-iam-policy-binding \
        --role roles/iam.workloadIdentityUser \
        --member "serviceAccount:$PROJECT_ID.svc.id.goog[default/default]" \
        $PROJECT_NUMBER-compute@developer.gserviceaccount.com
  2. Add the iam.gke.io/gcp-service-account annotation to the GKE service account, using the email address of the compute service account:

    kubectl annotate serviceaccount \
        --namespace default \
        default \
        iam.gke.io/gcp-service-account=$PROJECT_NUMBER-compute@developer.gserviceaccount.com
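
The two commands above depend on two string formats: the workload identity member, which names a Kubernetes service account inside a namespace, and the email address of the Compute Engine default service account. As an illustration only, the sketch below builds both strings from placeholder values. The function names and the project values are assumptions, not part of the tutorial.

```shell
# Build the IAM member string for a Kubernetes service account (KSA)
# in the project's workload identity pool.
workload_identity_member() {
    project_id=$1 namespace=$2 ksa=$3
    printf 'serviceAccount:%s.svc.id.goog[%s/%s]\n' "$project_id" "$namespace" "$ksa"
}

# Build the email address of the Compute Engine default service account.
compute_default_sa() {
    project_number=$1
    printf '%s-compute@developer.gserviceaccount.com\n' "$project_number"
}

# Placeholder project ID and project number, for illustration only.
workload_identity_member my-project default default
# prints: serviceAccount:my-project.svc.id.goog[default/default]
compute_default_sa 123456789012
# prints: 123456789012-compute@developer.gserviceaccount.com
```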

Enable GKE destinations

To allow Eventarc to manage resources in the GKE cluster, enable GKE destinations and bind the Eventarc service account with the required roles.

  1. Enable GKE destinations for Eventarc:

    gcloud eventarc gke-destinations init
  2. At the prompt to bind the required roles, enter y.

    The following roles are bound:

    • roles/compute.viewer
    • roles/container.developer
    • roles/iam.serviceAccountAdmin

Create a service account and bind access roles

Before creating the Eventarc trigger, set up a user-managed service account and grant it specific roles so that Eventarc can forward Pub/Sub events.

  1. Create a service account and store its name in the TRIGGER_GSA variable:

    TRIGGER_GSA=eventarc-bigquery-triggers
    gcloud iam service-accounts create $TRIGGER_GSA
  2. Grant the pubsub.subscriber, monitoring.metricWriter, and eventarc.eventReceiver roles to the service account:

    PROJECT_ID=$(gcloud config get-value project)
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/pubsub.subscriber"
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/monitoring.metricWriter"
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/eventarc.eventReceiver"
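
The three bindings differ only in the role, so they can also be expressed as a loop. The sketch below is one possible way to do that. It prints the commands by default and runs them only when DRY_RUN=0 is set. The run and grant_trigger_roles helpers and the my-project value are hypothetical.

```shell
# Print each command instead of running it, unless DRY_RUN=0 is set.
run() {
    if [ "${DRY_RUN:-1}" = "0" ]; then
        "$@"
    else
        echo "$@"
    fi
}

# Grant the three roles named in the step above to the trigger service account.
grant_trigger_roles() {
    project_id=$1 trigger_gsa=$2
    member="serviceAccount:${trigger_gsa}@${project_id}.iam.gserviceaccount.com"
    for role in roles/pubsub.subscriber roles/monitoring.metricWriter roles/eventarc.eventReceiver; do
        run gcloud projects add-iam-policy-binding "$project_id" \
            --member "$member" \
            --role "$role"
    done
}

# Placeholder project ID; prints three gcloud commands without executing them.
grant_trigger_roles my-project eventarc-bigquery-triggers
```

Reviewing the printed commands before running them with DRY_RUN=0 makes it easier to catch a wrong project ID or account name.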

Create a Cloud Storage bucket

Create a Cloud Storage bucket to save the charts. Make sure that the bucket and the charts are publicly readable, and that the bucket is in the same region as your GKE service:

export BUCKET="$(gcloud config get-value core/project)-charts"
gcloud storage buckets create gs://${BUCKET} --location=$(gcloud config get-value run/region)
gcloud storage buckets update gs://${BUCKET} --uniform-bucket-level-access
gcloud storage buckets add-iam-policy-binding gs://${BUCKET} --member=allUsers --role=roles/storage.objectViewer
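
Because allUsers is granted the objectViewer role, each object in the bucket can be read through a public HTTPS URL. The sketch below shows how such a URL is formed. The bucket and object names are placeholders, and whether the notifier service emails links in exactly this form is an assumption.

```shell
# Build the public HTTPS URL for an object in a Cloud Storage bucket.
public_object_url() {
    bucket=$1 object=$2
    printf 'https://storage.googleapis.com/%s/%s\n' "$bucket" "$object"
}

# Placeholder bucket name following the PROJECT_ID-charts pattern used above.
public_object_url my-project-charts chart-cyprus.png
# prints: https://storage.googleapis.com/my-project-charts/chart-cyprus.png
```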

Clone the repository

Clone the GitHub repository.

git clone https://github.com/GoogleCloudPlatform/eventarc-samples
cd eventarc-samples/processing-pipelines

Deploy the notifier service

From the bigquery/notifier/python directory, deploy a Knative serving service that receives chart creator events and uses SendGrid to email links to the generated charts.

  1. Build and push the container image:

    pushd bigquery/notifier/python
    export SERVICE_NAME=notifier
    docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 .
    docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
    popd
  2. Deploy the container image to Knative serving, passing in an address to send emails to, and the SendGrid API key:

    export TO_EMAILS=EMAIL_ADDRESS
    export SENDGRID_API_KEY=YOUR_SENDGRID_API_KEY
    gcloud run deploy ${SERVICE_NAME} \
        --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars TO_EMAILS=${TO_EMAILS},SENDGRID_API_KEY=${SENDGRID_API_KEY},BUCKET=${BUCKET}

    Replace the following:

    • EMAIL_ADDRESS: the email address that receives the links to the generated charts
    • YOUR_SENDGRID_API_KEY: the SendGrid API key you noted previously

When you see the service URL, the deployment is complete.
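
The build, push, and deploy commands all repeat the same Artifact Registry image path. As a sketch, the path can be assembled once from its parts. The image_uri helper and the my-project and my-repo values are hypothetical stand-ins for your project ID and the REPOSITORY name you chose.

```shell
# Assemble an Artifact Registry Docker image path:
# LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/SERVICE:TAG
image_uri() {
    location=$1 project_id=$2 repository=$3 service=$4 tag=$5
    printf '%s-docker.pkg.dev/%s/%s/%s:%s\n' \
        "$location" "$project_id" "$repository" "$service" "$tag"
}

# Placeholder values, for illustration only.
image_uri us-central1 my-project my-repo notifier v1
# prints: us-central1-docker.pkg.dev/my-project/my-repo/notifier:v1
```

Storing the result in a variable, for example IMAGE=$(image_uri ...), keeps the docker and gcloud commands consistent with each other.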

Create a trigger for the notifier service

The Eventarc trigger for the notifier service deployed on Knative serving filters for Cloud Storage audit logs where the methodName is storage.objects.create.

  1. Create the trigger:

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.audit.log.v1.written" \
        --event-filters="serviceName=storage.googleapis.com" \
        --event-filters="methodName=storage.objects.create" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

    This creates a trigger called trigger-notifier-gke.
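
The trigger name is derived from SERVICE_NAME, which is reassigned in each deployment section of this tutorial. The sketch below shows the naming pattern for the three services. The trigger_name helper is hypothetical.

```shell
# Derive the trigger name used in this tutorial from a service name.
trigger_name() {
    printf 'trigger-%s-gke\n' "$1"
}

for service in notifier chart-creator query-runner; do
    trigger_name "$service"
done
# prints:
# trigger-notifier-gke
# trigger-chart-creator-gke
# trigger-query-runner-gke
```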

Deploy the chart creator service

From the bigquery/chart-creator/python directory, deploy a Knative serving service that receives query runner events, retrieves data from a BigQuery table for a specific country, and then generates a chart, using Matplotlib, from the data. The chart is uploaded to a Cloud Storage bucket.

  1. Build and push the container image:

    pushd bigquery/chart-creator/python
    export SERVICE_NAME=chart-creator
    docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 .
    docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
    popd
  2. Deploy the container image to Knative serving, passing in BUCKET:

    gcloud run deploy ${SERVICE_NAME} \
        --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars BUCKET=${BUCKET}

When you see the service URL, the deployment is complete.

Create a trigger for the chart creator service

The Eventarc trigger for the chart creator service deployed on Knative serving filters for messages published to a Pub/Sub topic.

  1. Create the trigger:

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

    This creates a trigger called trigger-chart-creator-gke.

  2. Set an environment variable for the ID of the Pub/Sub topic used by the trigger:

    export TOPIC_QUERY_COMPLETED=$(basename $(gcloud eventarc triggers describe trigger-${SERVICE_NAME}-gke --format='value(transport.pubsub.topic)'))
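
The describe command returns the topic as a full resource path of the form projects/PROJECT_ID/topics/TOPIC_ID, and basename keeps only the final segment. The sketch below illustrates this with a made-up path. The real topic name is generated by Eventarc and will differ.

```shell
# Extract the topic ID (the last path segment) from a full topic resource path.
topic_id() {
    basename "$1"
}

# Hypothetical topic path, for illustration only.
topic_path="projects/my-project/topics/eventarc-us-central1-trigger-chart-creator-gke-123"

topic_id "$topic_path"
# prints: eventarc-us-central1-trigger-chart-creator-gke-123

# Shell parameter expansion gives the same result without an external command.
echo "${topic_path##*/}"
```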

Deploy the query runner service

From the processing-pipelines directory, deploy a Knative serving service that receives Cloud Scheduler events, retrieves data from a public COVID-19 dataset, and saves the results in a new BigQuery table.

  1. Build and push the container image:

    export SERVICE_NAME=query-runner
    docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 -f Dockerfile .
    docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
  2. Deploy the container image to Knative serving, passing in PROJECT_ID and TOPIC_ID (set to the value of TOPIC_QUERY_COMPLETED):

    gcloud run deploy ${SERVICE_NAME} \
        --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars PROJECT_ID=$(gcloud config get-value project),TOPIC_ID=${TOPIC_QUERY_COMPLETED}

When you see the service URL, the deployment is complete.
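
The --update-env-vars flag takes a comma-separated list of KEY=VALUE pairs. As a sketch, the list can be built from separate arguments, which keeps long deploy commands readable. The join_env_vars helper and the example values are hypothetical.

```shell
# Join KEY=VALUE arguments with commas.
join_env_vars() {
    # "$*" joins the arguments with the first character of IFS;
    # the subshell keeps the IFS change local to this function.
    ( IFS=,; printf '%s\n' "$*" )
}

# Placeholder values, for illustration only.
join_env_vars "PROJECT_ID=my-project" "TOPIC_ID=my-topic"
# prints: PROJECT_ID=my-project,TOPIC_ID=my-topic
```

This simple join assumes that no value contains a comma. A value with commas would need gcloud's alternate delimiter syntax, described in gcloud topic escaping.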

Create a trigger for the query runner service

The Eventarc trigger for the query runner service deployed on Knative serving filters for messages published to a Pub/Sub topic.

  1. Create the trigger:

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

    This creates a trigger called trigger-query-runner-gke.

  2. Set an environment variable for the Pub/Sub topic used by the trigger:

    export TOPIC_QUERY_SCHEDULED=$(gcloud eventarc triggers describe trigger-${SERVICE_NAME}-gke --format='value(transport.pubsub.topic)')

Schedule the jobs

The processing pipeline is triggered by two Cloud Scheduler jobs.

  1. Create an App Engine app, which is required by Cloud Scheduler, replacing LOCATION with an appropriate App Engine location (for example, europe-west):

    export APP_ENGINE_LOCATION=LOCATION
    gcloud app create --region=${APP_ENGINE_LOCATION}
  2. Create two Cloud Scheduler jobs that publish to a Pub/Sub topic once per day:

    gcloud scheduler jobs create pubsub cre-scheduler-uk \
        --schedule="0 16 * * *" \
        --topic=${TOPIC_QUERY_SCHEDULED} \
        --message-body="United Kingdom"
    gcloud scheduler jobs create pubsub cre-scheduler-cy \
        --schedule="0 17 * * *" \
        --topic=${TOPIC_QUERY_SCHEDULED} \
        --message-body="Cyprus"

    The schedule is specified in unix-cron format. For example, 0 16 * * * means that the job runs at 16:00 (4 PM) UTC every day.
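
A unix-cron expression has five whitespace-separated fields: minute, hour, day of month, month, and day of week. The sketch below labels the fields of an expression to make a schedule easier to read. The describe_cron helper is hypothetical, and it only splits the fields without validating their values.

```shell
# Label the five fields of a unix-cron expression.
# The function body runs in a subshell so that "set -f" does not leak out.
describe_cron() (
    # Disable globbing so that "*" fields are not expanded to file names.
    set -f
    # Split the expression on whitespace into positional parameters.
    set -- $1
    if [ "$#" -ne 5 ]; then
        echo "Expected 5 fields, got $#" >&2
        exit 1
    fi
    printf 'minute=%s hour=%s day-of-month=%s month=%s day-of-week=%s\n' \
        "$1" "$2" "$3" "$4" "$5"
)

describe_cron "0 16 * * *"
# prints: minute=0 hour=16 day-of-month=* month=* day-of-week=*
```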

Run the pipeline

  1. Confirm that all the triggers were successfully created:

    gcloud eventarc triggers list

    The output should be similar to the following:

    NAME                       TYPE                                            DESTINATION         ACTIVE  LOCATION
    trigger-chart-creator-gke  google.cloud.pubsub.topic.v1.messagePublished   GKE:chart-creator   Yes     us-central1
    trigger-notifier-gke       google.cloud.audit.log.v1.written               GKE:notifier        Yes     us-central1
    trigger-query-runner-gke   google.cloud.pubsub.topic.v1.messagePublished   GKE:query-runner    Yes     us-central1
    
  2. Retrieve the Cloud Scheduler job IDs:

    gcloud scheduler jobs list

    The output should be similar to the following:

    ID                LOCATION      SCHEDULE (TZ)         TARGET_TYPE  STATE
    cre-scheduler-cy  us-central1   0 17 * * * (Etc/UTC)  Pub/Sub      ENABLED
    cre-scheduler-uk  us-central1   0 16 * * * (Etc/UTC)  Pub/Sub      ENABLED
    
  3. Although the jobs are scheduled to run daily at 4 PM and 5 PM UTC, you can also run the Cloud Scheduler jobs manually:

    gcloud scheduler jobs run cre-scheduler-cy
    gcloud scheduler jobs run cre-scheduler-uk
  4. After a few minutes, confirm that there are two charts in the Cloud Storage bucket:

    gcloud storage ls gs://${BUCKET}

    The output should be similar to the following:

    gs://PROJECT_ID-charts/chart-cyprus.png
    gs://PROJECT_ID-charts/chart-unitedkingdom.png
    

Congratulations! You should also receive two emails with links to the charts.
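
The object names in the sample output suggest that each chart is named after the country in the scheduler message body, lowercased and with spaces removed. Assuming that pattern holds (it is inferred from the sample output, not confirmed from the service code), the sketch below predicts the object name to look for. The chart_object_name helper is hypothetical.

```shell
# Predict the chart object name for a country, assuming the
# chart-<lowercased country without spaces>.png pattern seen in the sample output.
chart_object_name() {
    country=$(printf '%s' "$1" | tr '[:upper:]' '[:lower:]' | tr -d ' ')
    printf 'chart-%s.png\n' "$country"
}

chart_object_name "United Kingdom"
# prints: chart-unitedkingdom.png
chart_object_name "Cyprus"
# prints: chart-cyprus.png
```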