Ejecutar una tarea de Cloud Run que procese los datos de eventos guardados en Cloud Storage

Puedes usar Workflows para ejecutar una tarea de Cloud Run como parte de un flujo de trabajo que realice un procesamiento de datos más complejo u orqueste un sistema de tareas ya creadas.

En este tutorial se muestra cómo usar Workflows para ejecutar una tarea de Cloud Run que procesa datos de eventos almacenados en un segmento de Cloud Storage. Si almacenas la carga útil del evento en un segmento de Cloud Storage, puedes cifrar los datos con claves de cifrado gestionadas por el cliente, lo que no es posible si transmites los datos del evento como variables de entorno al trabajo de Cloud Run.

En el siguiente diagrama se muestra un resumen general:

El evento de Pub/Sub se enruta mediante un activador de Eventarc a Workflows y se guarda en un segmento de Cloud Storage. La tarea de Cloud Run procesa los datos de eventos almacenados en el segmento.

Crear un trabajo de Cloud Run

En este tutorial se usa código de muestra que puedes encontrar en GitHub. El script de implementación crea una imagen de contenedor para crear un trabajo de Cloud Run. La secuencia de comandos también crea un segmento de Cloud Storage. La tarea de Cloud Run lee los datos de eventos almacenados en el segmento de Cloud Storage y, a continuación, los imprime.

  1. Si ejecutas la secuencia de comandos de implementación en Cloud Shell y la cuenta de servicio predeterminada de Compute Engine no tiene el rol Editor, asigna los siguientes roles al proyecto de la cuenta de servicio predeterminada de Compute Engine. De lo contrario, puedes saltarte este paso y clonar el repositorio de la aplicación de ejemplo en el siguiente paso.

    1. Asigna el rol Escritor de Artifact Registry (roles/artifactregistry.writer):

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/artifactregistry.writer

      Sustituye PROJECT_NUMBER por el número de tu proyecto. Google Cloud Puedes encontrar el número de tu proyecto en la página Bienvenido de la consola Google Cloud o ejecutando el siguiente comando:

      gcloud projects describe PROJECT_ID --format='value(projectNumber)'

    2. Asigna el rol de usuario de objetos de almacenamiento (roles/storage.objectUser):

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/storage.objectUser
    3. Asigna el rol Escritor de registros de Logging (roles/logging.logWriter):

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/logging.logWriter
  2. Para obtener el código de ejemplo, clona el repositorio de la aplicación de ejemplo en tu máquina local:

    git clone https://github.com/GoogleCloudPlatform/workflows-demos.git

    También puedes descargar la muestra como archivo ZIP.

  3. Accede al directorio que contiene el código de muestra:

    cd workflows-demos/cloud-run-jobs-payload-gcs/message-payload-job
  4. Crea el trabajo de Cloud Run ejecutando la secuencia de comandos de implementación:

    ./deploy-job.sh

La secuencia de comandos crea un segmento de Cloud Storage con el nombre message-payload-PROJECT_ID, donde PROJECT_ID es el ID de tu proyecto Google Cloud . También se crea una tarea de Cloud Run llamada message-payload-job.

Desplegar un flujo de trabajo que ejecute el trabajo de Cloud Run

Define y despliega un flujo de trabajo que ejecute el trabajo de Cloud Run que acabas de crear. Una definición de flujo de trabajo se compone de una serie de pasos descritos mediante la sintaxis de Workflows.

El flujo de trabajo recibe un evento, guarda los datos del evento en un segmento de Cloud Storage y, a continuación, ejecuta una tarea de Cloud Run para procesar los datos del evento.

Consola

  1. En la Google Cloud consola, ve a la página Flujos de trabajo:

    Ir a Workflows

  2. Haz clic en Crear.

  3. Escribe un nombre para el nuevo flujo de trabajo, como message-payload-workflow.

  4. Elige una región adecuada, como us-central1.

  5. En el campo Cuenta de servicio, selecciona la cuenta que has creado anteriormente.

    La cuenta de servicio actúa como identidad del flujo de trabajo. Ya deberías haber concedido los siguientes roles a la cuenta de servicio:

    • Administrador de Cloud Run: para ejecutar tareas de Cloud Run
    • Editor de registros: para escribir entradas de registro.
    • Creador de objetos de Storage: para crear objetos en Cloud Storage.
  6. Haz clic en Siguiente.

  7. En el editor del flujo de trabajo, introduce la siguiente definición para tu flujo de trabajo:

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  8. Haz clic en Desplegar.

gcloud

  1. Crea un archivo de código fuente para tu flujo de trabajo:

    touch message-payload-workflow.yaml
  2. Copia la siguiente definición de flujo de trabajo en message-payload-workflow.yaml:

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  3. Para desplegar el flujo de trabajo, introduce el siguiente comando:

    gcloud workflows deploy message-payload-workflow \
        --location=us-central1 \
        --source=message-payload-workflow.yaml \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    Haz los cambios siguientes:

    • SERVICE_ACCOUNT_NAME: el nombre de la cuenta de servicio que has creado antes
    • PROJECT_ID: el ID de tu Google Cloud proyecto

    La cuenta de servicio actúa como identidad del flujo de trabajo. Ya deberías haber concedido los siguientes roles a la cuenta de servicio:

    • roles/logging.logWriter: para escribir entradas de registro
    • roles/run.admin: para ejecutar tareas de Cloud Run
    • roles/storage.objectCreator: para crear objetos en Cloud Storage

El flujo de trabajo hace lo siguiente:

  1. Paso init: acepta un evento como argumento y define las variables necesarias.

  2. log_event: crea una entrada de registro en Cloud Logging mediante la función sys.log.

  3. write_payload_to_gcs: envía una solicitud POST HTTP y escribe los datos de la carga útil del evento en un archivo de un segmento de Cloud Storage.

  4. Paso run_job_to_process_payload: usa el método del conector de la API Admin de Cloud Run, googleapis.run.v1.namespaces.jobs.run, para ejecutar el trabajo. El segmento de Cloud Storage y el nombre del archivo de datos se transfieren a la tarea desde el flujo de trabajo como variables de anulación.

  5. Paso finish: devuelve información sobre la ejecución del trabajo como resultado del flujo de trabajo.

Crear un tema de Pub/Sub

Crea un tema de Pub/Sub para poder publicar un mensaje en él. Se usa un evento de Pub/Sub para mostrar cómo enrutar un evento mediante Workflows y guardarlo en Cloud Storage para que un trabajo de Cloud Run pueda procesar los datos del evento.

Consola

  1. En la Google Cloud consola, ve a la página Temas.

    Ir a Temas

  2. Haz clic en Crear tema.

  3. En el campo ID de tema, introduce un ID para el tema, como message-payload-topic.

  4. Mantén la opción Añadir una suscripción predeterminada.

  5. No selecciones las otras opciones.

  6. Haz clic en Crear.

gcloud

Para crear un tema con el ID message-payload-topic, ejecuta el comando gcloud pubsub topics create:

gcloud pubsub topics create message-payload-topic

Crear un activador de Eventarc para enrutar eventos al flujo de trabajo

Para ejecutar automáticamente el flujo de trabajo y, a su vez, el trabajo de Cloud Run, crea un activador de Eventarc que responda a eventos de Pub/Sub y que dirija los eventos al flujo de trabajo. Cada vez que se escribe un mensaje en el tema de Pub/Sub, el evento activa una ejecución del flujo de trabajo.

Consola

  1. En la Google Cloud consola, ve a la página Flujos de trabajo:

    Ir a Workflows

  2. Haz clic en el nombre del flujo de trabajo, como message-payload-workflow.

  3. En la página Detalles del flujo de trabajo, haz clic en Editar.

  4. En la página Editar flujo de trabajo, en la sección Activadores, haz clic en Añadir nuevo activador > Eventarc.

    Se abrirá el panel Eventarc trigger (Activador de Eventarc).

  5. En el campo Nombre del activador, escribe un nombre para el activador, como message-payload-trigger.

  6. En la lista Proveedor de eventos, selecciona Cloud Pub/Sub.

  7. En la lista Evento, selecciona google.cloud.pubsub.topic.v1.messagePublished.

  8. En la lista Seleccionar un tema de Cloud Pub/Sub, elija el tema de Pub/Sub que haya creado anteriormente.

  9. En el campo Cuenta de servicio, selecciona la cuenta que has creado anteriormente.

    La cuenta de servicio actúa como identidad del activador. Ya deberías haber concedido los siguientes roles a la cuenta de servicio:

    • Receptor de evento Eventarc: para recibir eventos
    • Invocador de flujos de trabajo: para ejecutar flujos de trabajo
  10. Haz clic en Guardar activador.

    El activador de Eventarc ahora aparece en la sección Activadores de la página Editar flujo de trabajo.

  11. Haz clic en Siguiente.

  12. Haz clic en Desplegar.

gcloud

Para crear un activador de Eventarc, ejecuta el siguiente comando:

gcloud eventarc triggers create message-payload-trigger \
    --location=us-central1 \
    --destination-workflow=message-payload-workflow \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=projects/PROJECT_ID/topics/message-payload-topic \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

Haz los cambios siguientes:

  • PROJECT_ID: el ID de tu Google Cloud proyecto
  • SERVICE_ACCOUNT_NAME: el nombre de la cuenta de servicio que has creado antes.

La cuenta de servicio actúa como identidad del activador. Ya deberías haber concedido los siguientes roles a la cuenta de servicio:

  • roles/eventarc.eventReceiver: para recibir eventos
  • roles/workflows.invoker: para ejecutar flujos de trabajo

Activar el flujo de trabajo

Prueba el sistema integral publicando un mensaje en el tema de Pub/Sub y generando un evento. Para obtener más información, consulta el artículo sobre activar un flujo de trabajo con eventos o mensajes de Pub/Sub.

  1. Publica un mensaje en el tema de Pub/Sub para generar un evento:

    gcloud pubsub topics publish message-payload-topic --message="Hello World"
    

    El evento se dirige al flujo de trabajo, que registra el mensaje del evento, guarda los datos del evento en el segmento de Cloud Storage y ejecuta el trabajo de Cloud Run para procesar los datos guardados en Cloud Storage. Este proceso puede tardar un minuto.

  2. Para confirmar que la tarea de Cloud Run se ha ejecutado correctamente, consulta las ejecuciones de la tarea:

    gcloud run jobs executions list --job=message-payload-job

    Deberías ver una nueva ejecución de trabajo en la salida.

  3. Para ver las entradas de registro relacionadas con eventos que se han creado al activar el flujo de trabajo, ejecuta el siguiente comando:

    gcloud logging read "resource.type=cloud_run_job AND textPayload:Payload"
    
  4. Busca una entrada de registro similar a la siguiente:

    textPayload: "Payload: {'message': {'data': 'SGVsbG8gV29ybGQ=', 'messageId': '8834108235224238',\
    \ 'publishTime': '2023-09-20T17:07:52.921Z'}, 'subscription': 'projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741'}"
    ...
    resource:
    labels:
      job_name: message-payload-job
      location: us-central1
      project_id: MY_PROJECT
    type: cloud_run_job
    textPayload: Processing message payload gs://message-payload-MY_PROJECT/8254002311197919.data.json
    
  5. Para confirmar que los resultados son los esperados, consulta los datos de eventos en el objeto del segmento de Cloud Storage.

    1. Recupera el nombre del segmento:

      gcloud storage ls

      El resultado debería ser similar al siguiente:

      gs://message-payload-PROJECT_ID/

    2. Lista los objetos de tu segmento:

      gcloud storage ls gs://message-payload-PROJECT_ID/** --recursive

      La salida debería ser similar a la siguiente:

      gs://message-payload-PROJECT_ID/OBJECT_ID.data.json

      Anota el OBJECT_ID para usarlo en el siguiente paso.

    3. Descarga el objeto de tu segmento como un archivo:

      gcloud storage cp gs://message-payload-PROJECT_ID/OBJECT_ID.data.json message-event.txt

      Sustituye OBJECT_ID por el ID devuelto en el paso anterior.

    4. En un editor de texto, abre el archivo message-event.txt. El cuerpo del evento escrito en el archivo debe ser similar al siguiente:

      {
        "message": {
          "data": "SGVsbG8gV29ybGQ=",
          "messageId": "8254002311197919",
          "publishTime": "2023-09-20T16:54:29.312Z"
        },
        "subscription": "projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741"
      }
      

      Ten en cuenta que, si decodificas el valor de datos de SGVsbG8gV29ybGQ= de su formato Base64, se devuelve "Hello World".