Execute a Cloud Run job that processes event data saved in Cloud Storage

You can use Workflows to execute a Cloud Run job as part of a workflow that performs more complex data processing or that orchestrates a system of existing jobs.

This tutorial demonstrates how to use Workflows to execute a Cloud Run job that processes event data stored in a Cloud Storage bucket. Storing the event payload in a Cloud Storage bucket lets you encrypt the data using customer-managed encryption keys, which is not possible when you pass the event data to the Cloud Run job as environment variables.

The following diagram provides a high-level overview:

A Pub/Sub event is routed by an Eventarc trigger to Workflows and saved in a Cloud Storage bucket; a Cloud Run job then processes the event data stored in the bucket.

Create a Cloud Run job

This tutorial uses sample code that you can find on GitHub. The deployment script builds a container image to create a Cloud Run job. The script also creates a Cloud Storage bucket. The Cloud Run job reads any event data stored in the Cloud Storage bucket and then prints it.
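At a high level, the job's logic resembles the following minimal Python sketch. This is an illustration only: it assumes the job reads the INPUT_BUCKET and INPUT_FILE environment variables that the workflow sets later in this tutorial; see the repository for the actual sample code.

import os

from google.cloud import storage


def main():
    # The workflow passes the bucket and object names to the job as
    # environment variable overrides (variable names taken from this
    # tutorial's workflow definition).
    bucket_name = os.environ["INPUT_BUCKET"]
    file_name = os.environ["INPUT_FILE"]

    # Read the event payload that the workflow wrote to Cloud Storage.
    payload = storage.Client().bucket(bucket_name).blob(file_name).download_as_text()

    # The sample job simply prints the event data it reads.
    print(f"Processing message payload gs://{bucket_name}/{file_name}")
    print(f"Payload: {payload}")


if __name__ == "__main__":
    main()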

  1. If you are running the deployment script in Cloud Shell and the Compute Engine default service account doesn't have the Editor role, grant the following roles on the project to the Compute Engine default service account. (Otherwise, you can skip this step and proceed to cloning the sample app repository in the next step.)

    1. Grant the Artifact Registry Writer role (roles/artifactregistry.writer):

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/artifactregistry.writer

      Replace PROJECT_ID with your Google Cloud project ID and PROJECT_NUMBER with your Google Cloud project number. You can find your project number on the Welcome page of the Google Cloud console or by running the following command:

      gcloud projects describe PROJECT_ID --format='value(projectNumber)'

    2. Grant the Storage Object User role (roles/storage.objectUser):

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/storage.objectUser
    3. Grant the Logging Logs Writer role (roles/logging.logWriter):

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/logging.logWriter
  2. Get the sample code by cloning the sample app repository to your local machine:

    git clone https://github.com/GoogleCloudPlatform/workflows-demos.git

    Alternatively, you can download the sample as a ZIP file.

  3. Change to the directory that contains the sample code:

    cd workflows-demos/cloud-run-jobs-payload-gcs/message-payload-job
  4. Create the Cloud Run job by running the deployment script:

    ./deploy-job.sh

The script creates a Cloud Storage bucket with the name message-payload-PROJECT_ID, where PROJECT_ID is the ID of your Google Cloud project. A Cloud Run job named message-payload-job is also created.

Deploy a workflow that executes the Cloud Run job

Define and deploy a workflow that executes the Cloud Run job you just created. A workflow definition is made up of a series of steps described using the Workflows syntax.

The workflow receives an event, saves the event data to a Cloud Storage bucket, and then executes a Cloud Run job to process the event data.

Console

  1. In the Google Cloud console, go to the Workflows page:

    Go to Workflows

  2. Click Create.

  3. Enter a name for the new workflow, such as message-payload-workflow.

  4. Choose an appropriate region; for example, us-central1.

  5. In the Service account field, select the service account you created earlier.

    The service account serves as the workflow's identity. You should have already granted the following roles to the service account:

    • Cloud Run Admin: to execute Cloud Run jobs
    • Logs Writer: to write log entries
    • Storage Object Creator: to create objects in Cloud Storage
  6. Click Next.

  7. In the workflow editor, enter the following definition for your workflow:

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  8. Click Deploy.

gcloud

  1. Create a source code file for your workflow:

    touch message-payload-workflow.yaml
  2. Copy the following workflow definition to message-payload-workflow.yaml:

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  3. Deploy the workflow by entering the following command:

    gcloud workflows deploy message-payload-workflow \
        --location=us-central1 \
        --source=message-payload-workflow.yaml \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    Replace the following:

    • SERVICE_ACCOUNT_NAME: the name of the service account you created earlier
    • PROJECT_ID: the ID of your Google Cloud project

    The service account serves as the workflow's identity. You should have already granted the following roles to the service account:

    • roles/logging.logWriter: to write log entries
    • roles/run.admin: to execute Cloud Run jobs
    • roles/storage.objectCreator: to create objects in Cloud Storage

The workflow does the following:

  1. init step—Accepts an event as an argument and sets necessary variables.

  2. log_event step—Creates a log entry in Cloud Logging using the sys.log function.

  3. write_payload_to_gcs step—Makes an HTTP POST request and writes the event payload data to a Cloud Storage bucket file.

  4. run_job_to_process_payload step—Uses the Cloud Run Admin API connector method, googleapis.run.v1.namespaces.jobs.run, to execute the job. The Cloud Storage bucket and data filename are passed as override variables from the workflow to the job.

  5. finish step—Returns information about the job execution as the result of the workflow.
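For reference, the event that Eventarc delivers to the workflow is a CloudEvents-style object; the workflow uses its id and data fields. For a Pub/Sub message, the event is broadly similar to the following abbreviated, hypothetical example (the exact attributes depend on the trigger; the values here mirror the sample output later in this tutorial):

{
  "id": "8254002311197919",
  "type": "google.cloud.pubsub.topic.v1.messagePublished",
  "data": {
    "message": {
      "data": "SGVsbG8gV29ybGQ=",
      "messageId": "8254002311197919",
      "publishTime": "2023-09-20T16:54:29.312Z"
    },
    "subscription": "projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741"
  }
}

The workflow uses event.id to name the Cloud Storage object and writes event.data as the object's contents.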

Create a Pub/Sub topic

Create a Pub/Sub topic so that you can publish a message to it. A Pub/Sub event is used to demonstrate how to route an event using Workflows and save the event to Cloud Storage so that a Cloud Run job can process the event data.

Console

  1. In the Google Cloud console, go to the Topics page.

    Go to Topics

  2. Click Create topic.

  3. In the Topic ID field, enter an ID for the topic, such as message-payload-topic.

  4. Retain the option Add a default subscription.

  5. Don't select the other options.

  6. Click Create.

gcloud

To create a topic with the ID message-payload-topic, run the gcloud pubsub topics create command:

gcloud pubsub topics create message-payload-topic

Create an Eventarc trigger to route events to the workflow

To automatically execute the workflow, and in turn the Cloud Run job, create an Eventarc trigger that responds to Pub/Sub events and routes them to the workflow. Whenever a message is written to the Pub/Sub topic, the event triggers an execution of the workflow.

Console

  1. In the Google Cloud console, go to the Workflows page:

    Go to Workflows

  2. Click the name of your workflow, such as message-payload-workflow.

  3. On the Workflow details page, click Edit.

  4. On the Edit workflow page, in the Triggers section, click Add new trigger > Eventarc.

    The Eventarc trigger pane opens.

  5. In the Trigger name field, enter a name for the trigger, such as message-payload-trigger.

  6. From the Event provider list, select Cloud Pub/Sub.

  7. From the Event list, select google.cloud.pubsub.topic.v1.messagePublished.

  8. From the Select a Cloud Pub/Sub topic list, select the Pub/Sub topic you previously created.

  9. In the Service account field, select the service account you created earlier.

    The service account serves as the trigger's identity. You should have already granted the following roles to the service account:

    • Eventarc Event Receiver: to receive events
    • Workflows Invoker: to execute workflows
  10. Click Save trigger.

    The Eventarc trigger now appears in the Triggers section on the Edit workflow page.

  11. Click Next.

  12. Click Deploy.

gcloud

Create an Eventarc trigger by running the following command:

gcloud eventarc triggers create message-payload-trigger \
    --location=us-central1 \
    --destination-workflow=message-payload-workflow \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=projects/PROJECT_ID/topics/message-payload-topic \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

Replace the following:

  • PROJECT_ID: the ID of your Google Cloud project
  • SERVICE_ACCOUNT_NAME: the name of the service account you created earlier

The service account serves as the trigger's identity. You should have already granted the following roles to the service account:

  • roles/eventarc.eventReceiver: to receive events
  • roles/workflows.invoker: to execute workflows

Trigger the workflow

Test the end-to-end system by publishing a message to the Pub/Sub topic and generating an event. For more information, see triggering a workflow with events or Pub/Sub messages.

  1. Publish a message to the Pub/Sub topic to generate an event:

    gcloud pubsub topics publish message-payload-topic --message="Hello World"
    

    The event is routed to the workflow, which logs the event message, saves the event data to the Cloud Storage bucket, and executes the Cloud Run job to process the data saved in Cloud Storage. This can take a minute.

  2. Confirm that the Cloud Run job ran as expected by viewing the job executions:

    gcloud run jobs executions list --job=message-payload-job

    You should see a new job execution in the output.

  3. To view the event-related log entries created by triggering the workflow, run the following command:

    gcloud logging read "resource.type=cloud_run_job AND textPayload:Payload"
    
  4. Look for a log entry similar to:

    textPayload: "Payload: {'message': {'data': 'SGVsbG8gV29ybGQ=', 'messageId': '8834108235224238',\
    \ 'publishTime': '2023-09-20T17:07:52.921Z'}, 'subscription': 'projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741'}"
    ...
    resource:
    labels:
      job_name: message-payload-job
      location: us-central1
      project_id: MY_PROJECT
    type: cloud_run_job
    textPayload: Processing message payload gs://message-payload-MY_PROJECT/8254002311197919.data.json
    
  5. You can confirm that the results are as expected by viewing the event data in the Cloud Storage bucket object.

    1. Retrieve your bucket name:

      gcloud storage ls

      The output is similar to the following:

      gs://message-payload-PROJECT_ID/

    2. List the objects in your bucket:

      gcloud storage ls gs://message-payload-PROJECT_ID/** --recursive

      The output should be similar to the following:

      gs://message-payload-PROJECT_ID/OBJECT_ID.data.json

      Note the OBJECT_ID to use in the next step.

    3. Download the object in your bucket as a file:

      gcloud storage cp gs://message-payload-PROJECT_ID/OBJECT_ID.data.json message-event.txt

      Replace OBJECT_ID with the ID returned in the previous step.

    4. In a text editor, open the message-event.txt file. The event body written to the file should be similar to the following:

      {
        "message": {
          "data": "SGVsbG8gV29ybGQ=",
          "messageId": "8254002311197919",
          "publishTime": "2023-09-20T16:54:29.312Z"
        },
        "subscription": "projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741"
      }
      

      Note that if you decode the data value of SGVsbG8gV29ybGQ= from its Base64 format, "Hello World" is returned.
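      For example, you can decode it with a short Python snippet (any Base64 decoder works):

      import base64

      # Decode the Pub/Sub message data from its Base64 encoding.
      print(base64.b64decode("SGVsbG8gV29ybGQ=").decode("utf-8"))  # prints: Hello World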