Crear un flujo de procesamiento de BigQuery para Knative Serving con Eventarc

En este tutorial se explica cómo usar Eventarc para crear un flujo de procesamiento que programe consultas en un conjunto de datos público de BigQuery, genere gráficos basados en los datos y comparta enlaces a los gráficos por correo electrónico.

Crear una clave de API de SendGrid

SendGrid es un proveedor de correo basado en la nube que te permite enviar correos sin tener que mantener servidores de correo.

  1. Inicia sesión en SendGrid y ve a Configuración > Claves de API.
  2. Haz clic en Crear clave de API.
  3. Selecciona los permisos de la clave. Como mínimo, la clave debe tener permisos de Envío de correo para enviar correos.
  4. Haz clic en Guardar para crear la clave.
  5. SendGrid genera una nueva clave. Esta es la única copia de la clave, así que asegúrate de copiarla y guardarla para más adelante.

Crear un clúster de GKE

Crea un clúster con Workload Identity Federation for GKE habilitado para que pueda acceder a los servicios de las aplicaciones que se ejecutan en GKE. Google Cloud También necesitas Workload Identity Federation para GKE para reenviar eventos mediante Eventarc.

  1. Crea un clúster de GKE para Knative Serving con los complementos CloudRun, HttpLoadBalancing y HorizontalPodAutoscaling habilitados:

    gcloud beta container clusters create $CLUSTER_NAME \
        --addons=HttpLoadBalancing,HorizontalPodAutoscaling,CloudRun \
        --machine-type=n1-standard-4 \
        --enable-autoscaling --min-nodes=2 --max-nodes=10 \
        --no-issue-client-certificate --num-nodes=2  \
        --logging=SYSTEM,WORKLOAD \
        --monitoring=SYSTEM \
        --scopes=cloud-platform,logging-write,monitoring-write,pubsub \
        --zone us-central1 \
        --release-channel=rapid \
        --workload-pool=$PROJECT_ID.svc.id.goog
    
  2. Espera unos minutos a que se cree el clúster. Durante el proceso, es posible que veas advertencias que puedes ignorar sin problema. Cuando se haya creado el clúster, el resultado será similar al siguiente:

    Creating cluster ...done.
    Created [https://container.googleapis.com/v1beta1/projects/my-project/zones/us-central1/clusters/events-cluster].
    
  3. Crea un repositorio estándar de Artifact Registry para almacenar tu imagen de contenedor Docker:

    gcloud artifacts repositories create REPOSITORY \
        --repository-format=docker \
        --location=$CLUSTER_LOCATION

    Sustituye REPOSITORY por un nombre único para el repositorio.

Configurar la cuenta de servicio de GKE

Configura una cuenta de servicio de GKE para que actúe como cuenta de servicio de Compute predeterminada.

  1. Crea un enlace de gestión de identidades y accesos (IAM) entre las cuentas de servicio:

    PROJECT_NUMBER="$(gcloud projects describe $(gcloud config get-value project) --format='value(projectNumber)')"
    
    gcloud iam service-accounts add-iam-policy-binding \
        --role roles/iam.workloadIdentityUser \
        --member "serviceAccount:$PROJECT_ID.svc.id.goog[default/default]" \
        $PROJECT_NUMBER-compute@developer.gserviceaccount.com
  2. Añade la anotación iam.gke.io/gcp-service-account a la cuenta de servicio de GKE con la dirección de correo de la cuenta de servicio de Compute:

    kubectl annotate serviceaccount \
        --namespace default \
        default \
        iam.gke.io/gcp-service-account=$PROJECT_NUMBER-compute@developer.gserviceaccount.com

Habilitar destinos de GKE

Para permitir que Eventarc gestione recursos en el clúster de GKE, habilita los destinos de GKE y vincula la cuenta de servicio de Eventarc con los roles necesarios.

  1. Habilita los destinos de GKE para Eventarc:

    gcloud eventarc gke-destinations init
  2. Cuando se te pida que enlaces los roles necesarios, introduce y.

    Se han vinculado los siguientes roles:

    • roles/compute.viewer
    • roles/container.developer
    • roles/iam.serviceAccountAdmin

Crear una cuenta de servicio y vincular roles de acceso

Antes de crear el activador de Eventarc, configura una cuenta de servicio gestionada por el usuario y concédele roles específicos para que Eventarc pueda reenviar eventos de Pub/Sub.

  1. Crea una cuenta de servicio llamada TRIGGER_GSA:

    TRIGGER_GSA=eventarc-bigquery-triggers
    gcloud iam service-accounts create $TRIGGER_GSA
  2. Asigna los roles pubsub.subscriber, monitoring.metricWriter y eventarc.eventReceiver a la cuenta de servicio:

    PROJECT_ID=$(gcloud config get-value project)
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/pubsub.subscriber"
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/monitoring.metricWriter"
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/eventarc.eventReceiver"

Crea un segmento de Cloud Storage

Crea un segmento de Cloud Storage para guardar los gráficos. Asegúrate de que el segmento y los gráficos estén disponibles públicamente y en la misma región que tu servicio de GKE:

export BUCKET="$(gcloud config get-value core/project)-charts"
gcloud storage buckets create gs://${BUCKET} --location=$(gcloud config get-value run/region)
gcloud storage buckets update gs://${BUCKET} --uniform-bucket-level-access
gcloud storage buckets add-iam-policy-binding gs://${BUCKET} --member=allUsers --role=roles/storage.objectViewer

Clonar el repositorio

Clona el repositorio de GitHub.

git clone https://github.com/GoogleCloudPlatform/eventarc-samples
cd eventarc-samples/processing-pipelines

Implementar el servicio de notificaciones

Desde el directorio bigquery/notifier/python, despliega un servicio de Knative Serving que reciba eventos de creador de gráficos y use SendGrid para enviar por correo enlaces a los gráficos generados.

  1. Crea y envía la imagen del contenedor:

    pushd bigquery/notifier/python
    export SERVICE_NAME=notifier
    docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 .
    docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
    popd
  2. Despliega la imagen de contenedor en Knative Serving. Para ello, indica una dirección a la que enviar correos y la clave de API de SendGrid:

    export TO_EMAILS=EMAIL_ADDRESS
    export SENDGRID_API_KEY=YOUR_SENDGRID_API_KEY
    gcloud run deploy ${SERVICE_NAME} \
        --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars TO_EMAILS=${TO_EMAILS},SENDGRID_API_KEY=${SENDGRID_API_KEY},BUCKET=${BUCKET}

    Haz los cambios siguientes:

    • EMAIL_ADDRESS: una dirección de correo a la que enviar los enlaces a los gráficos generados
    • YOUR_SENDGRID_API_KEY: la clave de API de SendGrid que has anotado anteriormente

Cuando veas la URL del servicio, el despliegue se habrá completado.

Crear un activador para el servicio de notificaciones

El activador de Eventarc del servicio de notificaciones implementado en Knative Serving filtra los registros de auditoría de Cloud Storage en los que methodName es storage.objects.create.

  1. Crea el activador:

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.audit.log.v1.written" \
        --event-filters="serviceName=storage.googleapis.com" \
        --event-filters="methodName=storage.objects.create" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

    De esta forma, se crea un activador llamado trigger-notifier-gke.

Desplegar el servicio de creación de gráficos

En el directorio bigquery/chart-creator/python, implementa un servicio de Knative Serving que reciba eventos de Query Runner, recupere datos de una tabla de BigQuery de un país específico y, a continuación, genere un gráfico con Matplotlib a partir de los datos. El gráfico se sube a un segmento de Cloud Storage.

  1. Crea y envía la imagen del contenedor:

    pushd bigquery/chart-creator/python
    export SERVICE_NAME=chart-creator
    docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 .
    docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
    popd
  2. Despliega la imagen de contenedor en Knative Serving. Para ello, introduce lo siguiente: BUCKET

    gcloud run deploy ${SERVICE_NAME} \
        --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars BUCKET=${BUCKET}

Cuando veas la URL del servicio, el despliegue se habrá completado.

Crear un activador para el servicio de creación de gráficos

El activador de Eventarc del servicio de creación de gráficos implementado en Knative Serving filtra los mensajes publicados en un tema de Pub/Sub.

  1. Crea el activador:

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

    De esta forma, se crea un activador llamado trigger-chart-creator-gke.

  2. Define la variable de entorno del tema de Pub/Sub.

    export TOPIC_QUERY_COMPLETED=$(basename $(gcloud eventarc triggers describe trigger-${SERVICE_NAME}-gke --format='value(transport.pubsub.topic)'))

Desplegar el servicio de ejecución de consultas

En el directorio processing-pipelines, implementa un servicio de Knative Serving que reciba eventos de Cloud Scheduler, obtenga datos de un conjunto de datos público sobre el COVID-19 y guarde los resultados en una tabla de BigQuery.

  1. Crea y envía la imagen del contenedor:

    export SERVICE_NAME=query-runner
    docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 -f Dockerfile .
    docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
  2. Despliega la imagen de contenedor en Knative Serving. Para ello, introduce PROJECT_ID y TOPIC_QUERY_COMPLETED:

    gcloud run deploy ${SERVICE_NAME} \
        --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars PROJECT_ID=$(gcloud config get-value project),TOPIC_ID=${TOPIC_QUERY_COMPLETED}

Cuando veas la URL del servicio, el despliegue se habrá completado.

Crear un activador para el servicio de ejecución de consultas

El activador de Eventarc del servicio de ejecución de consultas implementado en Knative Serving filtra los mensajes publicados en un tema de Pub/Sub.

  1. Crea el activador:

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

    De esta forma, se crea un activador llamado trigger-query-runner-gke.

  2. Define una variable de entorno para el tema de Pub/Sub.

    export TOPIC_QUERY_SCHEDULED=$(gcloud eventarc triggers describe trigger-${SERVICE_NAME}-gke --format='value(transport.pubsub.topic)')

Programar las tareas

El flujo de procesamiento se activa mediante dos tareas de Cloud Scheduler.

  1. Crea una aplicación de App Engine, que es necesaria para Cloud Scheduler, y especifica una ubicación adecuada (por ejemplo, europe-west):

    export APP_ENGINE_LOCATION=LOCATION
    gcloud app create --region=${APP_ENGINE_LOCATION}
  2. Crea dos tareas de Cloud Scheduler que publiquen en un tema de Pub/Sub una vez al día:

    gcloud scheduler jobs create pubsub cre-scheduler-uk \
        --schedule="0 16 * * *" \
        --topic=${TOPIC_QUERY_SCHEDULED} \
        --message-body="United Kingdom"
    gcloud scheduler jobs create pubsub cre-scheduler-cy \
        --schedule="0 17 * * *" \
        --topic=${TOPIC_QUERY_SCHEDULED} \
        --message-body="Cyprus"

    La programación se especifica en formato cron de UNIX. Por ejemplo, 0 16 * * * significa que los trabajos se ejecutan a las 16:00 (4:00 PM) UTC todos los días.

Ejecutar el flujo de procesamiento

  1. Confirma que todos los activadores se han creado correctamente:

    gcloud eventarc triggers list

    La salida debería ser similar a la siguiente:

    NAME                       TYPE                                            DESTINATION         ACTIVE  LOCATION
    trigger-chart-creator-gke  google.cloud.pubsub.topic.v1.messagePublished   GKE:chart-creator   Yes     us-central1
    trigger-notifier-gke       google.cloud.audit.log.v1.written               GKE:notifier        Yes     us-central1
    trigger-query-runner-gke   google.cloud.pubsub.topic.v1.messagePublished   GKE:query-runner    Yes     us-central1
    
  2. Recupera los IDs de las tareas de Cloud Scheduler:

    gcloud scheduler jobs list

    La salida debería ser similar a la siguiente:

    ID                LOCATION      SCHEDULE (TZ)         TARGET_TYPE  STATE
    cre-scheduler-cy  us-central1   0 17 * * * (Etc/UTC)  Pub/Sub      ENABLED
    cre-scheduler-uk  us-central1   0 16 * * * (Etc/UTC)  Pub/Sub      ENABLED
    
  3. Aunque las tareas están programadas para ejecutarse todos los días a las 16:00 y a las 17:00, también puedes ejecutarlas manualmente:

    gcloud scheduler jobs run cre-scheduler-cy
    gcloud scheduler jobs run cre-scheduler-uk
  4. Al cabo de unos minutos, confirma que hay dos gráficos en el segmento de Cloud Storage:

    gcloud storage ls gs://${BUCKET}

    La salida debería ser similar a la siguiente:

    gs://PROJECT_ID-charts/chart-cyprus.png
    gs://PROJECT_ID-charts/chart-unitedkingdom.png
    

¡Enhorabuena! También deberías recibir dos correos con enlaces a los gráficos.