Cloud Run-Job ausführen, der in Cloud Storage gespeicherte Ereignisdaten verarbeitet

Sie können Workflows verwenden, um einen Cloud Run-Job als Teil eines Workflows auszuführen, der eine komplexere Datenverarbeitung durchführt oder ein System vorhandener Jobs orchestriert.

In dieser Anleitung wird gezeigt, wie Sie mit Workflows einen Cloud Run-Job ausführen, der Ereignisdaten verarbeitet, die in einem Cloud Storage-Bucket gespeichert sind. Wenn Sie die Ereignisnutzlast in einem Cloud Storage-Bucket speichern, können Sie die Daten mit vom Kunden verwalteten Verschlüsselungsschlüsseln verschlüsseln. Das ist nicht möglich, wenn Sie die Ereignisdaten als Umgebungsvariablen an den Cloud Run-Job übergeben.

Das folgende Diagramm bietet einen groben Überblick:

Das Pub/Sub-Ereignis wird vom Eventarc-Trigger an Workflows weitergeleitet und in einem Cloud Storage-Bucket gespeichert. Der Cloud Run-Job verarbeitet Ereignisdaten, die in einem Bucket gespeichert sind.

Cloud Run-Job erstellen

In dieser Anleitung wird Beispielcode verwendet, den Sie auf GitHub finden. Das Bereitstellungsskript erstellt ein Container-Image, um einen Cloud Run-Job zu erstellen. Mit dem Skript wird auch ein Cloud Storage-Bucket erstellt. Der Cloud Run-Job liest alle Ereignisdaten, die im Cloud Storage-Bucket gespeichert sind, und gibt sie dann aus.

  1. Wenn Sie das Bereitstellungsskript in Cloud Shell ausführen und das Compute Engine-Standarddienstkonto nicht die Rolle „Bearbeiter“ hat, weisen Sie dem Compute Engine-Standarddienstkonto die folgenden Rollen für das Projekt zu. Andernfalls können Sie diesen Schritt überspringen und mit dem Klonen des Beispiel-App-Repositorys im nächsten Schritt fortfahren.

    1. Weisen Sie die Rolle „Artifact Registry-Autor“ (roles/artifactregistry.writer) zu:

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/artifactregistry.writer

      Ersetzen Sie PROJECT_NUMBER durch die Google CloudProjektnummer. Sie finden Ihre Projektnummer auf der Willkommensseite der Google Cloud -Konsole oder durch Ausführen des folgenden Befehls:

      gcloud projects describe PROJECT_ID --format='value(projectNumber)'

    2. Weisen Sie die Storage-Objekt-Nutzerrolle (roles/storage.objectUser) zu:

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/storage.objectUser
    3. Weisen Sie die Rolle „Logautor“ (roles/logging.logWriter) zu:

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/logging.logWriter
  2. Rufen Sie den Beispielcode ab, indem Sie das Repository der Beispiel-App auf Ihren lokalen Computer klonen:

    git clone https://github.com/GoogleCloudPlatform/workflows-demos.git

    Alternativ können Sie das Beispiel als ZIP-Datei herunterladen.

  3. Wechseln Sie zu dem Verzeichnis, das den Beispielcode enthält:

    cd workflows-demos/cloud-run-jobs-payload-gcs/message-payload-job
  4. Erstellen Sie den Cloud Run-Job, indem Sie das Bereitstellungsskript ausführen:

    ./deploy-job.sh

Das Skript erstellt einen Cloud Storage-Bucket mit dem Namen message-payload-PROJECT_ID, wobei PROJECT_ID die ID Ihres Google Cloud Projekts ist. Außerdem wird ein Cloud Run-Job mit dem Namen message-payload-job erstellt.

Workflow bereitstellen, der den Cloud Run-Job ausführt

Definieren Sie einen Workflow, der den gerade erstellten Cloud Run-Job ausführt, und stellen Sie ihn bereit. Ein Workflow besteht aus einer Reihe von Schritten, die mit der Workflows-Syntax beschrieben werden.

Der Workflow empfängt ein Ereignis, speichert die Ereignisdaten in einem Cloud Storage-Bucket und führt dann einen Cloud Run-Job aus, um die Ereignisdaten zu verarbeiten.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Workflows auf:

    Zur Seite "Workflows"

  2. Klicken Sie auf  Erstellen.

  3. Geben Sie einen Namen für den neuen Workflow ein, z. B. message-payload-workflow.

  4. Wählen Sie eine geeignete Region aus, z. B. us-central1.

  5. Wählen Sie im Feld Dienstkonto das zuvor erstellte Dienstkonto aus.

    Das Dienstkonto dient als Identität des Workflows. Sie sollten dem Dienstkonto bereits die folgenden Rollen zugewiesen haben:

    • Cloud Run Admin: zum Ausführen von Cloud Run-Jobs
    • Logautor: zum Schreiben von Logeinträgen
    • Storage-Objekt-Ersteller: zum Erstellen von Objekten in Cloud Storage
  6. Klicken Sie auf Weiter.

  7. Geben Sie im Workflow-Editor die folgende Definition für Ihren Workflow ein:

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  8. Klicken Sie auf Bereitstellen.

gcloud

  1. Erstellen Sie eine Quellcodedatei für Ihren Workflow:

    touch message-payload-workflow.yaml
  2. Kopieren Sie die folgende Workflowdefinition in message-payload-workflow.yaml:

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  3. Stellen Sie den Workflow bereit. Geben Sie hierzu den folgenden Befehl ein:

    gcloud workflows deploy message-payload-workflow \
        --location=us-central1 \
        --source=message-payload-workflow.yaml \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    Ersetzen Sie Folgendes:

    • SERVICE_ACCOUNT_NAME: der Name des Dienstkontos, das Sie zuvor erstellt haben
    • PROJECT_ID: die ID IhresGoogle Cloud -Projekts

    Das Dienstkonto dient als Identität des Workflows. Sie sollten dem Dienstkonto bereits die folgenden Rollen zugewiesen haben:

    • roles/logging.logWriter: Logeinträge schreiben
    • roles/run.admin: zum Ausführen von Cloud Run-Jobs
    • roles/storage.objectCreator: zum Erstellen von Objekten in Cloud Storage

Der Workflow führt folgende Aktionen aus:

  1. init-Schritt: Akzeptiert ein Ereignis als Argument und legt die erforderlichen Variablen fest.

  2. log_event-Schritt: Erstellt mit der Funktion sys.log einen Logeintrag in Cloud Logging.

  3. write_payload_to_gcs-Schritt: Stellt eine HTTP-POST-Anfrage und schreibt die Nutzlastdaten des Ereignisses in eine Cloud Storage-Bucket-Datei.

  4. Im run_job_to_process_payload-Schritt wird die Methode googleapis.run.v1.namespaces.jobs.run des Cloud Run Admin API-Connectors verwendet, um den Job auszuführen. Der Cloud Storage-Bucket und der Dateiname werden als Überschreibungsvariablen vom Workflow an den Job übergeben.

  5. finish-Schritt: Gibt Informationen zur Jobausführung als Ergebnis des Workflows zurück.

Pub/Sub-Thema erstellen

Erstellen Sie ein Pub/Sub-Thema, damit Sie eine Nachricht darin veröffentlichen können. Ein Pub/Sub-Ereignis wird verwendet, um zu demonstrieren, wie ein Ereignis mit Workflows weitergeleitet und in Cloud Storage gespeichert wird, damit ein Cloud Run-Job die Ereignisdaten verarbeiten kann.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Themen auf.

    Themen aufrufen

  2. Klicken Sie auf Thema erstellen.

  3. Geben Sie im Feld Themen-ID eine ID für das Thema ein, z. B. message-payload-topic.

  4. Behalten Sie die Option Standardabo hinzufügen bei.

  5. Wählen Sie die anderen Optionen nicht aus.

  6. Klicken Sie auf Erstellen.

gcloud

Führen Sie den Befehl gcloud pubsub topics create aus, um ein Thema mit der ID message-payload-topic zu erstellen:

gcloud pubsub topics create message-payload-topic

Eventarc-Trigger erstellen, um Ereignisse an den Workflow weiterzuleiten

Wenn Sie den Workflow und damit den Cloud Run-Job automatisch ausführen möchten, erstellen Sie einen Eventarc-Trigger, der auf Pub/Sub-Ereignisse reagiert und die Ereignisse an den Workflow weiterleitet. Immer wenn eine Nachricht in das Pub/Sub-Thema geschrieben wird, löst das Ereignis die Ausführung des Workflows aus.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Workflows auf:

    Zur Seite "Workflows"

  2. Klicken Sie auf den Namen Ihres Workflows, z. B. message-payload-workflow.

  3. Klicken Sie auf der Seite Workflow-Details auf Bearbeiten.

  4. Klicken Sie auf der Seite Workflow bearbeiten im Bereich Trigger auf Neuen Trigger hinzufügen > Eventarc.

    Der Bereich Eventarc-Trigger wird geöffnet.

  5. Geben Sie im Feld Triggername einen Namen für den Trigger ein, z. B. message-payload-trigger.

  6. Wählen Sie in der Liste Ereignisanbieter die Option Cloud Pub/Sub aus.

  7. Wählen Sie in der Liste Ereignis die Option google.cloud.pubsub.topic.v1.messagePublished aus.

  8. Wählen Sie in der Liste Cloud Pub/Sub-Thema auswählen das Pub/Sub-Thema aus, das Sie zuvor erstellt haben.

  9. Wählen Sie im Feld Dienstkonto das zuvor erstellte Dienstkonto aus.

    Das Dienstkonto dient als Identität des Triggers. Sie sollten dem Dienstkonto bereits die folgenden Rollen zugewiesen haben:

    • Eventarc-Ereignisempfänger: zum Empfangen von Ereignissen
    • Workflow-Aufrufer: zum Ausführen von Workflows
  10. Klicken Sie auf Trigger speichern.

    Der Eventarc-Trigger wird jetzt im Abschnitt Trigger auf der Seite Workflow bearbeiten angezeigt.

  11. Klicken Sie auf Weiter.

  12. Klicken Sie auf Bereitstellen.

gcloud

Erstellen Sie einen Eventarc-Trigger mit dem folgenden Befehl:

gcloud eventarc triggers create message-payload-trigger \
    --location=us-central1 \
    --destination-workflow=message-payload-workflow \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=projects/PROJECT_ID/topics/message-payload-topic \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

Ersetzen Sie Folgendes:

  • PROJECT_ID: die ID Ihres Google Cloud -Projekts
  • SERVICE_ACCOUNT_NAME: Der Name des zuvor erstellten Dienstkontos.

Das Dienstkonto dient als Identität des Triggers. Sie sollten dem Dienstkonto bereits die folgenden Rollen zugewiesen haben:

  • roles/eventarc.eventReceiver: Ereignisse empfangen
  • roles/workflows.invoker: zum Ausführen von Workflows

Workflow auslösen

Testen Sie das End-to-End-System, indem Sie eine Nachricht im Pub/Sub-Thema veröffentlichen und ein Ereignis generieren. Weitere Informationen finden Sie unter Workflow mit Ereignissen oder Pub/Sub-Nachrichten auslösen.

  1. Senden Sie eine Nachricht an das Pub/Sub-Thema, um ein Ereignis zu generieren:

    gcloud pubsub topics publish message-payload-topic --message="Hello World"
    

    Das Ereignis wird an den Workflow weitergeleitet, der die Ereignisnachricht protokolliert, die Ereignisdaten im Cloud Storage-Bucket speichert und den Cloud Run-Job ausführt, um die in Cloud Storage gespeicherten Daten zu verarbeiten. Das kann einen Moment dauern.

  2. Prüfen Sie, ob der Cloud Run-Job wie erwartet ausgeführt wurde, indem Sie die Jobausführungen ansehen:

    gcloud run jobs executions list --job=message-payload-job

    In der Ausgabe sollte eine neue Jobausführung angezeigt werden.

  3. Führen Sie den folgenden Befehl aus, um die ereignisbezogenen Logeinträge aufzurufen, die durch das Auslösen des Workflows erstellt wurden:

    gcloud logging read "resource.type=cloud_run_job AND textPayload:Payload"
    
  4. Suchen Sie nach einem Logeintrag wie dem folgenden:

    textPayload: "Payload: {'message': {'data': 'SGVsbG8gV29ybGQ=', 'messageId': '8834108235224238',\
    \ 'publishTime': '2023-09-20T17:07:52.921Z'}, 'subscription': 'projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741'}"
    ...
    resource:
    labels:
      job_name: message-payload-job
      location: us-central1
      project_id: MY_PROJECT
    type: cloud_run_job
    textPayload: Processing message payload gs://message-payload-MY_PROJECT/8254002311197919.data.json
    
  5. Sie können prüfen, ob die Ergebnisse wie erwartet sind, indem Sie sich die Ereignisdaten im Cloud Storage-Bucket-Objekt ansehen.

    1. Rufen Sie den Namen Ihres Buckets ab:

      gcloud storage ls

      Die Ausgabe sieht etwa so aus:

      gs://message-payload-PROJECT_ID/

    2. Objekte in Ihrem Bucket auflisten:

      gcloud storage ls gs://message-payload-PROJECT_ID/** --recursive

      Die Ausgabe sollte in etwa so aussehen:

      gs://message-payload-PROJECT_ID/OBJECT_ID.data.json

      Notieren Sie sich die OBJECT_ID, die Sie im nächsten Schritt benötigen.

    3. So laden Sie das Objekt in Ihrem Bucket als Datei herunter:

      gcloud storage cp gs://message-payload-PROJECT_ID/OBJECT_ID.data.json message-event.txt

      Ersetzen Sie OBJECT_ID durch die im vorherigen Schritt zurückgegebene ID.

    4. Öffnen Sie die Datei message-event.txt in einem Texteditor. Der in die Datei geschriebene Ereignistext sollte in etwa so aussehen:

      {
        "message": {
          "data": "SGVsbG8gV29ybGQ=",
          "messageId": "8254002311197919",
          "publishTime": "2023-09-20T16:54:29.312Z"
        },
        "subscription": "projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741"
      }
      

      Beachten Sie, dass wenn Sie den Datenwert von SGVsbG8gV29ybGQ= aus seinem Base64-Format decodieren, „Hello World“ zurückgegeben wird.