Attendere gli eventi utilizzando i callback e gli trigger Eventarc

Il workflow potrebbe dover attendere un processo esterno. Puoi utilizzare i callback HTTP per attendere che un altro servizio effettui una richiesta a un endpoint di callback; questa richiesta riprende l'esecuzione del flusso di lavoro. Puoi anche attendere utilizzando il polling.

Anziché utilizzare il polling, questo tutorial mostra come attendere eventi o messaggi Pub/Sub utilizzando i callback HTTP e i trigger Eventarc. Anche se puoi attivare un workflow con eventi o messaggi Pub/Sub, potresti voler interrompere l'esecuzione per attendere un altro evento prima di continuare. Ad esempio, un evento attiva un workflow per avviare un processo, ma il workflow deve attendere un altro evento che segnala il completamento del processo. Puoi implementarlo facendo in modo che un flusso di lavoro richiami un altro flusso di lavoro.

Crea un database Firestore

Firestore archivia i dati in documenti che contengono campi che mappano i valori. Questi documenti vengono archiviati in raccolte, che sono contenitori per i documenti che puoi utilizzare per organizzare i dati e creare query. Scopri di più su Firestore.

Tieni presente che ogni Google Cloud progetto è limitato a un database Firestore. Completa i seguenti passaggi se devi creare un nuovo database.

Console

  1. Nella console Google Cloud , vai alla pagina Inizia di Firestore.

    Vai a Inizia

  2. Fai clic su Seleziona modalità Native.

    Per indicazioni sulla selezione di una modalità di database e per un confronto funzionalità per funzionalità, vedi Scelta tra la modalità nativa e la modalità Datastore.

  3. Nell'elenco Seleziona una località, seleziona nam5 (Stati Uniti).

    La località si applica sia al database Firestore sia all'applicazione App Engine nel tuo progetto Google Cloud . Una volta creato il database, non puoi modificarne la posizione.

  4. Fai clic su Crea database.

gcloud

Per creare un database Firestore, devi prima creare un'applicazione App Engine ed eseguire il comando gcloud firestore databases create:

gcloud app create --region=us-central
gcloud firestore databases create --region=us-central

Puoi ignorare l'avviso us-central is not a valid Firestore location. App Engine e Firestore supportano le stesse località, ma la regione us-central (Iowa) di App Engine corrisponde alla multi-regione nam5 (Stati Uniti) di Firestore.

Crea un argomento Pub/Sub

Questo tutorial utilizza Pub/Sub come origine eventi. Crea un argomento Pub/Sub in modo da poter pubblicare un messaggio. Scopri di più sulla creazione e sulla gestione degli argomenti.

Console

  1. Nella console Google Cloud , vai alla pagina Argomenti di Pub/Sub.

    Vai ad Argomenti

  2. Fai clic su Crea argomento.

  3. Nel campo ID argomento, inserisci topic-callback.

  4. Accetta gli altri valori predefiniti.

  5. Fai clic su Crea argomento.

gcloud

Per creare un argomento, esegui il comando gcloud pubsub topics create:

gcloud pubsub topics create topic-callback

Crea un bucket Cloud Storage

Questo tutorial utilizza Cloud Storage come origine evento. Crea un bucket Cloud Storage in modo da poter caricare un file. Scopri di più sulla creazione di bucket di archiviazione.

Console

  1. Nella console Google Cloud , vai alla pagina Bucket in Cloud Storage.

    Vai a Cloud Storage

  2. Fai clic su Crea.

  3. Come Nome del bucket, inserisci PROJECT_ID-bucket-callback.

    L'ID progetto viene utilizzato nel flusso di lavoro callback-event-sample per identificare il bucket.

  4. Fai clic su Continua.

  5. In Tipo di località, seleziona Regione e poi us-central1 (Iowa).

  6. Accetta gli altri valori predefiniti.

  7. Fai clic su Crea.

gcloud

Per creare un bucket, esegui il comando gcloud storage buckets create:

gcloud storage buckets create gs://PROJECT_ID-bucket-callback \
    --location=us-central1

L'ID progetto viene utilizzato nel flusso di lavoro callback-event-sample per identificare il bucket.

Dopo aver creato le origini eventi, puoi eseguire il deployment del flusso di lavoro del ricevitore di eventi.

Esegui il deployment di un flusso di lavoro che è in ascolto di eventi

Il flusso di lavoro callback-event-listener viene attivato quando un messaggio viene pubblicato in un argomento Pub/Sub o quando un file viene caricato in un bucket Cloud Storage. Il flusso di lavoro riceve l'evento, recupera i dettagli di callback appropriati dal database Firestore e poi invia una richiesta HTTP all'endpoint di callback.

Console

  1. Nella console Google Cloud , vai alla pagina Workflows:

    Vai a Flussi di lavoro

  2. Fai clic su Crea.

  3. Inserisci un nome per il nuovo workflow: callback-event-listener.

  4. Nell'elenco Regione, seleziona us-central1.

  5. Seleziona il service account che hai creato in precedenza.

  6. Fai clic su Avanti.

  7. Nell'editor del workflow, inserisci la seguente definizione per il workflow:

    main:
      params: [event]
      steps:
        - log_event:
            call: sys.log
            args:
              text: ${event}
              severity: INFO
        - init:
            assign:
              - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
              - event_source_tokens: ${text.split(event.source, "/")}
              - event_source_len: ${len(event_source_tokens)}
              - event_source: ${event_source_tokens[event_source_len - 1]}
              - doc_name: ${database_root + event_source}
        - get_document_for_event_source:
            try:
              call: googleapis.firestore.v1.projects.databases.documents.get
              args:
                name: ${doc_name}
              result: document
            except:
                as: e
                steps:
                    - known_errors:
                        switch:
                        - condition: ${e.code == 404}
                          return: ${"No callbacks for event source " + event_source}
                    - unhandled_exception:
                        raise: ${e}
        - process_callback_urls:
            steps:
              - check_fields_exist:
                  switch:
                  - condition: ${not("fields" in document)}
                    return: ${"No callbacks for event source " + event_source}
                  - condition: true
                    next: processFields
              - processFields:
                  for:
                      value: key
                      in: ${keys(document.fields)}
                      steps:
                          - extract_callback_url:
                              assign:
                                  - callback_url: ${document.fields[key]["stringValue"]}
                          - log_callback_url:
                              call: sys.log
                              args:
                                text: ${"Calling back url " + callback_url}
                                severity: INFO
                          - http_post:
                              call: http.post
                              args:
                                  url: ${callback_url}
                                  auth:
                                      type: OAuth2
                                  body:
                                      event: ${event}
  8. Fai clic su Esegui il deployment.

gcloud

  1. Crea un file di codice sorgente per il workflow:

    touch callback-event-listener.yaml
  2. In un editor di testo, copia il seguente flusso di lavoro nel file del codice sorgente:

    main:
      params: [event]
      steps:
        - log_event:
            call: sys.log
            args:
              text: ${event}
              severity: INFO
        - init:
            assign:
              - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
              - event_source_tokens: ${text.split(event.source, "/")}
              - event_source_len: ${len(event_source_tokens)}
              - event_source: ${event_source_tokens[event_source_len - 1]}
              - doc_name: ${database_root + event_source}
        - get_document_for_event_source:
            try:
              call: googleapis.firestore.v1.projects.databases.documents.get
              args:
                name: ${doc_name}
              result: document
            except:
                as: e
                steps:
                    - known_errors:
                        switch:
                        - condition: ${e.code == 404}
                          return: ${"No callbacks for event source " + event_source}
                    - unhandled_exception:
                        raise: ${e}
        - process_callback_urls:
            steps:
              - check_fields_exist:
                  switch:
                  - condition: ${not("fields" in document)}
                    return: ${"No callbacks for event source " + event_source}
                  - condition: true
                    next: processFields
              - processFields:
                  for:
                      value: key
                      in: ${keys(document.fields)}
                      steps:
                          - extract_callback_url:
                              assign:
                                  - callback_url: ${document.fields[key]["stringValue"]}
                          - log_callback_url:
                              call: sys.log
                              args:
                                text: ${"Calling back url " + callback_url}
                                severity: INFO
                          - http_post:
                              call: http.post
                              args:
                                  url: ${callback_url}
                                  auth:
                                      type: OAuth2
                                  body:
                                      event: ${event}
  3. Esegui il deployment del flusso di lavoro inserendo questo comando:

    gcloud workflows deploy callback-event-listener \
        --source=callback-event-listener.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    Sostituisci SERVICE_ACCOUNT_NAME con il nome del account di servizio che hai creato in precedenza.

Esegui il deployment di un workflow che attende eventi

Il flusso di lavoro callback-event-sample memorizza i dettagli del callback in un database Firestore, interrompe l'esecuzione e attende che si verifichino eventi specifici.

Console

  1. Nella console Google Cloud , vai alla pagina Workflows:

    Vai a Flussi di lavoro

  2. Fai clic su Crea.

  3. Inserisci un nome per il nuovo workflow: callback-event-sample.

  4. Nell'elenco Regione, seleziona us-central1.

  5. Seleziona il service account che hai creato in precedenza.

  6. Fai clic su Avanti.

  7. Nell'editor del workflow, inserisci la seguente definizione per il workflow:

    main:
      steps:
        - init:
            assign:
              - pubsub_topic: topic-callback
              - storage_bucket: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "-bucket-callback"}
        - await_pubsub_message:
            call: await_callback_event
            args:
              event_source: ${pubsub_topic}
            result: pubsub_event
        - await_storage_bucket:
            call: await_callback_event
            args:
              event_source: ${storage_bucket}
            result: storage_event
        - return_events:
            return:
                pubsub_event: ${pubsub_event}
                storage_event: ${storage_event}
    
    await_callback_event:
        params: [event_source]
        steps:
            - init:
                assign:
                  - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
                  - doc_name: ${database_root + event_source}
                  - execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
                  - firestore_key: ${"exec_" + text.split(execution_id, "-")[0]}
            - create_callback:
                call: events.create_callback_endpoint
                args:
                  http_callback_method: POST
                result: callback_details
            - save_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
                  body:
                    fields:
                      ${firestore_key}:
                        stringValue: ${callback_details.url}
            - log_and_await_callback:
                try:
                  steps:
                    - log_await_start:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Started waiting 1hr for an event from source " + event_source}
                    - await_callback:
                        call: events.await_callback
                        args:
                          callback: ${callback_details}
                          timeout: 3600
                        result: callback_request
                    - log_await_stop:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Stopped waiting for an event from source " + event_source}
                except:
                    as: e
                    steps:
                        - log_error:
                            call: sys.log
                            args:
                                severity: "ERROR"
                                text: ${"Received error " + e.message}
            - delete_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
            - check_null_event:
                switch:
                  - condition: ${callback_request == null}
                    return: null
            - log_await_result:
                call: sys.log
                args:
                  severity: INFO
                  data: ${callback_request.http_request.body.event}
            - return_event:
                return: ${callback_request.http_request.body.event}
  8. Fai clic su Esegui il deployment.

gcloud

  1. Crea un file di codice sorgente per il workflow:

    touch callback-event-sample.yaml
  2. In un editor di testo, copia il seguente flusso di lavoro nel file del codice sorgente:

    main:
      steps:
        - init:
            assign:
              - pubsub_topic: topic-callback
              - storage_bucket: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "-bucket-callback"}
        - await_pubsub_message:
            call: await_callback_event
            args:
              event_source: ${pubsub_topic}
            result: pubsub_event
        - await_storage_bucket:
            call: await_callback_event
            args:
              event_source: ${storage_bucket}
            result: storage_event
        - return_events:
            return:
                pubsub_event: ${pubsub_event}
                storage_event: ${storage_event}
    
    await_callback_event:
        params: [event_source]
        steps:
            - init:
                assign:
                  - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
                  - doc_name: ${database_root + event_source}
                  - execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
                  - firestore_key: ${"exec_" + text.split(execution_id, "-")[0]}
            - create_callback:
                call: events.create_callback_endpoint
                args:
                  http_callback_method: POST
                result: callback_details
            - save_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
                  body:
                    fields:
                      ${firestore_key}:
                        stringValue: ${callback_details.url}
            - log_and_await_callback:
                try:
                  steps:
                    - log_await_start:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Started waiting 1hr for an event from source " + event_source}
                    - await_callback:
                        call: events.await_callback
                        args:
                          callback: ${callback_details}
                          timeout: 3600
                        result: callback_request
                    - log_await_stop:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Stopped waiting for an event from source " + event_source}
                except:
                    as: e
                    steps:
                        - log_error:
                            call: sys.log
                            args:
                                severity: "ERROR"
                                text: ${"Received error " + e.message}
            - delete_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
            - check_null_event:
                switch:
                  - condition: ${callback_request == null}
                    return: null
            - log_await_result:
                call: sys.log
                args:
                  severity: INFO
                  data: ${callback_request.http_request.body.event}
            - return_event:
                return: ${callback_request.http_request.body.event}
  3. Esegui il deployment del flusso di lavoro inserendo questo comando:

    gcloud workflows deploy callback-event-sample \
        --source=callback-event-sample.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    Sostituisci SERVICE_ACCOUNT_NAME con il nome del account di servizio che hai creato in precedenza.

Crea un trigger Eventarc per il routing degli eventi Pub/Sub

Un trigger Eventarc consente di instradare gli eventi specificando i filtri per il trigger, inclusa l'origine evento e il workflow di destinazione. Crea un trigger Eventarc per eseguire il flusso di lavoro callback-event-listener in seguito alla pubblicazione di un messaggio in un argomento Pub/Sub. Scopri di più sull'attivazione di un workflow.

Console

  1. Nella Google Cloud console, vai alla pagina Eventarc.

    Vai a Eventarc

  2. Fai clic su Crea trigger.

  3. Digita un Nome trigger.

    Ad esempio, trigger-pubsub-events-listener.

  4. Nell'elenco Provider di eventi, seleziona Cloud Pub/Sub.

  5. Nell'elenco Evento, seleziona google.cloud.pubsub.topic.v1.messagePublished in Personalizzato.

  6. Nell'elenco Seleziona un argomento Cloud Pub/Sub, seleziona l'argomento che hai creato in precedenza.

  7. Nell'elenco Regione, seleziona us-central1 (Iowa).

  8. Se richiesto, concedi il ruolo iam.serviceAccountTokenCreator all'account di servizio Pub/Sub.

  9. Seleziona il service account che hai creato in precedenza.

  10. Nell'elenco Destinazione evento, seleziona Workflows.

  11. Nell'elenco Seleziona un flusso di lavoro, seleziona il flusso di lavoro callback-event-listener.

  12. Fai clic su Crea.

gcloud

Per creare un trigger, esegui il comando gcloud eventarc triggers create:

gcloud eventarc triggers create trigger-pubsub-events-listener \
    --location=us-central1 \
    --destination-workflow=callback-event-listener \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=topic-callback \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

Gli eventi vengono trasformati e passati all'esecuzione del flusso di lavoro come argomenti runtime. Tieni presente che l'attivazione del nuovo trigger può richiedere fino a 2 minuti.

Crea un trigger Eventarc per il routing degli eventi di Cloud Storage

Un trigger Eventarc consente di instradare gli eventi specificando i filtri per il trigger, inclusa l'origine evento e il workflow di destinazione. Crea un trigger Eventarc per eseguire il flusso di lavoro callback-event-listener in seguito al caricamento di un file in un bucket Cloud Storage. Scopri di più sull'attivazione di un workflow.

Console

  1. Nella console Google Cloud , vai alla pagina Eventarc.

    Vai a Eventarc

  2. Fai clic su Crea trigger.

  3. Digita un Nome trigger.

    Ad esempio, trigger-storage-events-listener.

  4. Nell'elenco Provider di eventi, seleziona Cloud Storage.

  5. Nell'elenco Evento, in Diretti, seleziona google.cloud.storage.object.v1.finalized.

  6. Nell'elenco Bucket, cerca il bucket che hai creato in precedenza e selezionalo.

  7. Nell'elenco Regione, in base al bucket Cloud Storage, accetta il valore predefinito us-central1 (Iowa).

  8. Se richiesto, concedi il ruolo iam.serviceAccountTokenCreator all'account di servizio Pub/Sub.

  9. Seleziona il service account che hai creato in precedenza.

  10. Nell'elenco Destinazione evento, seleziona Workflows.

  11. Nell'elenco Seleziona un flusso di lavoro, seleziona il flusso di lavoro callback-event-listener.

  12. Fai clic su Crea.

gcloud

Per creare un trigger, esegui il comando gcloud eventarc triggers create:

gcloud eventarc triggers create trigger-storage-events-listener \
    --location=us-central1 \
    --destination-workflow=callback-event-listener \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.storage.object.v1.finalized" \
    --event-filters="bucket=PROJECT_ID-bucket-callback" \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

Gli eventi vengono trasformati e passati all'esecuzione del flusso di lavoro come argomenti runtime. Tieni presente che l'attivazione del nuovo trigger può richiedere fino a 2 minuti.

Esegui il workflow principale

L'esecuzione di un workflow esegue la definizione attuale del workflow associata al workflow. Esegui il workflow callback-event-sample. Questo è il workflow principale e attende che si verifichino eventi specifici, riprendendo l'esecuzione solo quando il workflow secondario effettua le richieste di callback appropriate.

Console

  1. Nella Google Cloud console, vai alla pagina Workflows.

    Vai a Flussi di lavoro

  2. Nella pagina Workflows, fai clic sul flusso di lavoro callback-event-sample per accedere alla relativa pagina dei dettagli.

  3. Nella pagina Dettagli workflow, fai clic su Esegui.

  4. Fai di nuovo clic su Esegui.

    Viene avviata l'esecuzione del workflow. Durante l'esecuzione, dovresti vedere uno Stato di esecuzione Running e una voce di log simile alla seguente: Started waiting 1hr for an event from source topic-callback.

gcloud

Per eseguire un workflow, esegui il comando gcloud workflows run:

gcloud workflows run callback-event-sample \
    --location=us-central1

Viene avviata l'esecuzione del workflow. Durante l'esecuzione, dovresti vedere uno stato di esecuzione simile al seguente:

Waiting for execution [a848a164-268a-449c-b2fe-396f32f2ed66] to complete...working...

Generare eventi e controllare lo stato di esecuzione

Puoi verificare che i risultati siano quelli previsti generando eventi, visualizzando le voci di log e controllando lo stato di esecuzione del workflow.

Pubblica un messaggio

Pubblica un messaggio nell'argomento Pub/Sub che hai creato in precedenza.

Console

  1. Nella console Google Cloud , vai alla pagina Argomenti di Pub/Sub.

    Vai ad Argomenti

  2. Fai clic su topic-callback.

  3. Fai clic sulla scheda Messaggi.

  4. Fai clic su Pubblica messaggio.

  5. Nel campo Corpo del messaggio, inserisci Hello World.

  6. Fai clic su Pubblica.

gcloud

Per pubblicare un messaggio, utilizza il comando gcloud pubsub topics publish:

gcloud pubsub topics publish topic-callback \
    --message="Hello World"

Caricamento di un oggetto

Carica un file nel bucket Cloud Storage che hai creato in precedenza.

Console

  1. Nella console Google Cloud , vai alla pagina Bucket in Cloud Storage.

    Vai a Bucket

  2. Fai clic sul nome del bucket che hai creato in precedenza.

  3. Nella scheda Oggetti, esegui una delle seguenti operazioni:

    • Trascina il file desiderato dal desktop o dal file manager nel riquadro principale della console Google Cloud .

    • Fai clic su Carica file, seleziona il file che vuoi caricare e poi fai clic su Apri.

gcloud

Per caricare un file, esegui il comando gcloud storage cp:

gcloud storage cp OBJECT_LOCATION gs://PROJECT_ID-bucket-callback/

Sostituisci OBJECT_LOCATION con il percorso locale del tuo oggetto. Ad esempio, random.txt.

Visualizzare le voci di log e lo stato di esecuzione

Verifica che il flusso di lavoro callback-event-sample sia stato completato correttamente.

Console

  1. Nella Google Cloud console, vai alla pagina Workflows.

    Vai a Flussi di lavoro

  2. Nella pagina Workflows, fai clic sul flusso di lavoro callback-event-sample per accedere alla relativa pagina dei dettagli.

  3. Nella pagina Dettagli del flusso di lavoro, per recuperare i dettagli di una determinata esecuzione, fai clic sull'ID esecuzione appropriato.

    Lo Stato di esecuzione deve essere Riuscito e, nel riquadro Output, dovresti vedere gli eventi Pub/Sub e Cloud Storage ricevuti.

gcloud

  1. Filtra le voci di log e restituisci l'output in formato JSON:

    gcloud logging read "resource.type=workflows.googleapis.com/Workflow AND textPayload:calling OR textPayload:waiting" \
        --format=json
  2. Cerca voci di log simili a:

    "textPayload": "Stopped waiting for an event from source..."
    "textPayload": "Calling back url https://workflowexecutions.googleapis.com/v1/projects/..."
    "textPayload": "Started waiting 1hr for an event from source..."
    
  3. Controlla lo stato dell'ultimo tentativo di esecuzione:

    gcloud workflows executions wait-last

    Il risultato dovrebbe essere simile al seguente:

    Using cached execution name: projects/1085953646031/locations/us-central1/workflows/callback-event-sample/executions/79929e4e-82c1-4da1-b068-f828034c01b7
    Waiting for execution [79929e4e-82c1-4da1-b068-f828034c01b7] to complete...done.
    [...]
    state: SUCCEEDED