Esegui un job Cloud Run che elabora i dati sugli eventi salvati in Cloud Storage

Puoi utilizzare Workflows per eseguire un job Cloud Run nell'ambito di un flusso di lavoro che esegue un'elaborazione dei dati più complessa o che orchestra un sistema di job esistenti.

Questo tutorial mostra come utilizzare Workflows per eseguire un job Cloud Run che elabora i dati degli eventi archiviati in un bucket Cloud Storage. L'archiviazione del payload dell'evento in un bucket Cloud Storage ti consente di criptare i dati utilizzando le chiavi di crittografia gestite dal cliente, il che non è possibile se trasferisci i dati degli eventi come variabili di ambiente al job Cloud Run.

Il seguente diagramma fornisce una panoramica generale:

L'evento Pub/Sub viene instradato dal trigger Eventarc a Workflows e salvato nel bucket Cloud Storage. Il job Cloud Run elabora i dati degli eventi archiviati nel bucket.

Crea un job Cloud Run

Questo tutorial utilizza codice campione che puoi trovare su GitHub. Lo script di deployment crea un'immagine container per creare un job Cloud Run. Lo script crea anche un bucket Cloud Storage. Il job Cloud Run legge i dati degli eventi archiviati nel bucket Cloud Storage e li stampa.

  1. Se esegui lo script di deployment in Cloud Shell e se l'account di servizio predefinito di Compute Engine non dispone del ruolo Editor, concedi i seguenti ruoli nel progetto all'account di servizio predefinito di Compute Engine. Altrimenti, puoi saltare questo passaggio e procedere con la clonazione del repository dell'app di esempio nel passaggio successivo.

    1. Concedi il ruolo Writer Artifact Registry (roles/artifactregistry.writer):

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/artifactregistry.writer

      Sostituisci PROJECT_NUMBER con il numero del tuo progetto Google Cloud. Puoi trovare il numero di progetto nella pagina Benvenuto della console Google Cloud o eseguendo questo comando:

      gcloud projects describe PROJECT_ID --format='value(projectNumber)'

    2. Concedi il ruolo Storage Object User (roles/storage.objectUser):

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/storage.objectUser
    3. Concedi il ruolo Writer log di Logging (roles/logging.logWriter):

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/logging.logWriter
  2. Recupera il codice campione clonando il repository dell'app di esempio sulla tua macchina locale:

    git clone https://github.com/GoogleCloudPlatform/workflows-demos.git

    In alternativa, puoi scaricare il campione come file ZIP

  3. Passa alla directory che contiene il codice di esempio:

    cd workflows-demos/cloud-run-jobs-payload-gcs/message-payload-job
  4. Crea il job Cloud Run eseguendo lo script di deployment:

    ./deploy-job.sh

Lo script crea un bucket Cloud Storage con il nome message-payload-PROJECT_ID, dove PROJECT_ID è l'ID del tuo progetto Google Cloud . Viene creato anche un job Cloud Run denominato message-payload-job.

Esegui il deployment di un flusso di lavoro che esegue il job Cloud Run

Definisci ed esegui il deployment di un flusso di lavoro che esegue il job Cloud Run che hai appena creato. Una definizione di workflow è costituita da una serie di passaggi descritti utilizzando la sintassi di Workflows.

Il flusso di lavoro riceve un evento, salva i dati dell'evento in un bucket Cloud Storage ed esegue un job Cloud Run per elaborare i dati dell'evento.

Console

  1. Nella console Google Cloud , vai alla pagina Workflows:

    Vai a Flussi di lavoro

  2. Fai clic su Crea.

  3. Inserisci un nome per il nuovo flusso di lavoro, ad esempio message-payload-workflow.

  4. Scegli una regione appropriata, ad esempio us-central1.

  5. Nel campo Service account, seleziona il account di servizio che hai creato in precedenza.

    L'account di servizio funge da identità del flusso di lavoro. Dovresti aver già concesso i seguenti ruoli al account di servizio:

    • Amministratore di Cloud Run: per eseguire job Cloud Run
    • Writer log: per scrivere voci di log
    • Storage Object Creator: per creare oggetti in Cloud Storage
  6. Fai clic su Avanti.

  7. Nell'editor del workflow, inserisci la seguente definizione per il workflow:

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  8. Fai clic su Esegui il deployment.

gcloud

  1. Crea un file di codice sorgente per il workflow:

    touch message-payload-workflow.yaml
  2. Copia la seguente definizione del workflow in message-payload-workflow.yaml:

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  3. Esegui il deployment del flusso di lavoro inserendo questo comando:

    gcloud workflows deploy message-payload-workflow \
        --location=us-central1 \
        --source=message-payload-workflow.yaml \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    Sostituisci quanto segue:

    • SERVICE_ACCOUNT_NAME: il nome del account di servizio che hai creato in precedenza
    • PROJECT_ID: l'ID del tuo Google Cloud progetto

    L'account di servizio funge da identità del flusso di lavoro. Dovresti aver già concesso i seguenti ruoli al account di servizio:

    • roles/logging.logWriter: per scrivere voci di log
    • roles/run.admin: per eseguire i job Cloud Run
    • roles/storage.objectCreator: per creare oggetti in Cloud Storage

Il flusso di lavoro esegue le seguenti operazioni:

  1. init: accetta un evento come argomento e imposta le variabili necessarie.

  2. Passaggio log_event: crea una voce di log in Cloud Logging utilizzando la funzione sys.log.

  3. Passaggio write_payload_to_gcs: esegue una richiesta HTTP POST e scrive i dati del payload dell'evento in un file del bucket Cloud Storage.

  4. run_job_to_process_payload: utilizza il metodo del connettore API Cloud Run Admin, googleapis.run.v1.namespaces.jobs.run, per eseguire il job. Il bucket Cloud Storage e il nome del file di dati vengono trasferiti come variabili di override dal flusso di lavoro al job.

  5. Passaggio finish: restituisce informazioni sull'esecuzione del job come risultato del workflow.

Crea un argomento Pub/Sub

Crea un argomento Pub/Sub in modo da poter pubblicare un messaggio. Un evento Pub/Sub viene utilizzato per dimostrare come instradare un evento utilizzando Workflows e salvare l'evento in Cloud Storage in modo che un job Cloud Run possa elaborare i dati dell'evento.

Console

  1. Nella console Google Cloud , vai alla pagina Argomenti.

    Vai ad Argomenti

  2. Fai clic su Crea argomento.

  3. Nel campo ID argomento, inserisci un ID per l'argomento, ad esempio message-payload-topic.

  4. Mantieni l'opzione Aggiungi una sottoscrizione predefinita.

  5. Non selezionare le altre opzioni.

  6. Fai clic su Crea.

gcloud

Per creare un argomento con ID message-payload-topic, esegui il comando gcloud pubsub topics create:

gcloud pubsub topics create message-payload-topic

Crea un trigger Eventarc per il routing degli eventi al flusso di lavoro

Per eseguire automaticamente il workflow e, di conseguenza, il job Cloud Run, crea un trigger Eventarc che risponda agli eventi Pub/Sub e che li indirizzi al workflow. Ogni volta che viene scritto un messaggio nell'argomento Pub/Sub, l'evento attiva l'esecuzione del flusso di lavoro.

Console

  1. Nella console Google Cloud , vai alla pagina Workflows:

    Vai a Flussi di lavoro

  2. Fai clic sul nome del flusso di lavoro, ad esempio message-payload-workflow.

  3. Nella pagina Dettagli del flusso di lavoro, fai clic su Modifica.

  4. Nella pagina Modifica flusso di lavoro, nella sezione Trigger, fai clic su Aggiungi nuovo trigger > Eventarc.

    Viene visualizzato il riquadro Trigger Eventarc.

  5. Nel campo Nome trigger, inserisci un nome per il trigger, ad esempio message-payload-trigger.

  6. Nell'elenco Provider di eventi, seleziona Cloud Pub/Sub.

  7. Nell'elenco Evento, seleziona google.cloud.pubsub.topic.v1.messagePublished.

  8. Nell'elenco Seleziona un argomento Cloud Pub/Sub, seleziona l'argomento Pub/Sub che hai creato in precedenza.

  9. Nel campo Service account, seleziona il account di servizio che hai creato in precedenza.

    L'account di servizio funge da identità del trigger. Dovresti aver già concesso i seguenti ruoli al account di servizio:

    • Eventarc Event Receiver: per ricevere eventi
    • Workflows Invoker: per eseguire i workflow
  10. Fai clic su Salva trigger.

    Il trigger Eventarc ora viene visualizzato nella sezione Trigger della pagina Modifica flusso di lavoro.

  11. Fai clic su Avanti.

  12. Fai clic su Esegui il deployment.

gcloud

Crea un trigger Eventarc eseguendo il seguente comando:

gcloud eventarc triggers create message-payload-trigger \
    --location=us-central1 \
    --destination-workflow=message-payload-workflow \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=projects/PROJECT_ID/topics/message-payload-topic \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del tuo progetto Google Cloud
  • SERVICE_ACCOUNT_NAME: il nome del service account creato in precedenza.

L'account di servizio funge da identità del trigger. Dovresti aver già concesso i seguenti ruoli al account di servizio:

  • roles/eventarc.eventReceiver: per ricevere eventi
  • roles/workflows.invoker: per eseguire i workflow

Attivare il workflow

Testa il sistema end-to-end pubblicando un messaggio nell'argomento Pub/Sub e generando un evento. Per maggiori informazioni, consulta la sezione Attivare un flusso di lavoro con eventi o messaggi Pub/Sub.

  1. Pubblica un messaggio nell'argomento Pub/Sub per generare un evento:

    gcloud pubsub topics publish message-payload-topic --message="Hello World"
    

    L'evento viene indirizzato al flusso di lavoro che registra il messaggio dell'evento, salva i dati dell'evento nel bucket Cloud Storage ed esegue il job Cloud Run per elaborare i dati salvati in Cloud Storage. L'operazione potrebbe richiedere un minuto.

  2. Verifica che il job Cloud Run sia stato eseguito come previsto visualizzando le esecuzioni del job:

    gcloud run jobs executions list --job=message-payload-job

    Nell'output dovresti vedere una nuova esecuzione del job.

  3. Per visualizzare le voci di log correlate all'evento create attivando il flusso di lavoro, esegui questo comando:

    gcloud logging read "resource.type=cloud_run_job AND textPayload:Payload"
    
  4. Cerca una voce di log simile alla seguente:

    textPayload: "Payload: {'message': {'data': 'SGVsbG8gV29ybGQ=', 'messageId': '8834108235224238',\
    \ 'publishTime': '2023-09-20T17:07:52.921Z'}, 'subscription': 'projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741'}"
    ...
    resource:
    labels:
      job_name: message-payload-job
      location: us-central1
      project_id: MY_PROJECT
    type: cloud_run_job
    textPayload: Processing message payload gs://message-payload-MY_PROJECT/8254002311197919.data.json
    
  5. Puoi verificare che i risultati siano quelli previsti visualizzando i dati degli eventi nell'oggetto bucket Cloud Storage.

    1. Recupera il nome del bucket:

      gcloud storage ls

      L'output è simile al seguente:

      gs://message-payload-PROJECT_ID/

    2. Elenca gli oggetti nel bucket:

      gcloud storage ls gs://message-payload-PROJECT_ID/** --recursive

      L'output dovrebbe essere simile al seguente:

      gs://message-payload-PROJECT_ID/OBJECT_ID.data.json

      Prendi nota di OBJECT_ID da utilizzare nel passaggio successivo.

    3. Scarica l'oggetto nel bucket come file:

      gcloud storage cp gs://message-payload-PROJECT_ID/OBJECT_ID.data.json message-event.txt

      Sostituisci OBJECT_ID con l'ID restituito nel passaggio precedente.

    4. In un editor di testo, apri il file message-event.txt. Il corpo dell'evento scritto nel file dovrebbe essere simile al seguente:

      {
        "message": {
          "data": "SGVsbG8gV29ybGQ=",
          "messageId": "8254002311197919",
          "publishTime": "2023-09-20T16:54:29.312Z"
        },
        "subscription": "projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741"
      }
      

      Tieni presente che se decodifichi il valore dei dati di SGVsbG8gV29ybGQ= dal formato Base64, viene restituito "Hello World".