Esegui il polling per il completamento del job BigQuery

Quando avvii un job BigQuery utilizzando il connettore BigQuery, il flusso di lavoro potrebbe non attendere automaticamente il completamento del job. Se i passaggi successivi dipendono dal successo del job, devi implementare un meccanismo di polling.

Molti connettori Workflows restituiscono una risorsa di operazione a lunga esecuzione (LRO) che può essere sottoposta a polling. Mentre alcuni connettori restituiscono una risorsa di tipo Operation per rappresentare una LRO, BigQuery utilizza una Job risorsa a questo scopo. Altri servizi, come Cloud Build o Compute Engine, restituiscono una risorsa Operation effettiva. Per un elenco dei tipi di risorse LRO, consulta Connettori supportati.

Polling automatico

Puoi eseguire automaticamente il polling per il completamento di un job BigQuery quando utilizzi metodi di connettori che restituiscono una risorsa LRO, ad esempio googleapis.bigquery.v2.jobs.insert.

Quando utilizzi un metodo di connettore di questo tipo, Workflows blocca l'esecuzione del flusso di lavoro finché l'operazione non va a buon fine o non genera un errore. Il campo connector_params ti consente di personalizzare il comportamento di polling, inclusi il timeout totale e la frequenza dei controlli di stato. Per saperne di più, vedi Richiamare una chiamata al connettore.

Ad esempio, il seguente flusso di lavoro mostra l'avvio di un job BigQuery utilizzando il metodo jobs.insert e la configurazione del polling del job con un criterio che utilizza impostazioni personalizzate anziché quelle predefinite:

YAML

# This workflow demonstrates how to automatically poll for the completion of
# a BigQuery job when using a connector method that return an LRO resource.
main:
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - query: "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
    - run_bigquery_job:
        call: googleapis.bigquery.v2.jobs.insert
        args:
          projectId: ${project_id}
          body:
            configuration:
              query:
                query: ${query}
                useLegacySql: false
          connector_params:
            timeout: 3600  # Total time in seconds to wait for the job; default is 1800
            polling_policy:
              initial_delay: 2.0  # Seconds to wait before the first poll; default is 1.0
              multiplier: 1.5     # Factor by which to increase delay between polls; default is 1.25
              max_delay: 60.0     # Maximum delay in seconds between polls; default is 60.0
        result: job_status
    - finish:
        return: ${job_status}

JSON

{
  "main": {
    "steps": [
      {
        "init": {
          "assign": [
            {
              "project_id": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
            },
            {
              "query": "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
            }
          ]
        }
      },
      {
        "run_bigquery_job": {
          "call": "googleapis.bigquery.v2.jobs.insert",
          "args": {
            "projectId": "${project_id}",
            "body": {
              "configuration": {
                "query": {
                  "query": "${query}",
                  "useLegacySql": false
                }
              }
            },
            "connector_params": {
              "timeout": 3600,
              "polling_policy": {
                "initial_delay": 2,
                "multiplier": 1.5,
                "max_delay": 60
              }
            }
          },
          "result": "job_status"
        }
      },
      {
        "finish": {
          "return": "${job_status}"
        }
      }
    ]
  }
}

Polling manuale

Il polling automatico si applica solo ai metodi che restituiscono una risorsa LRO. I metodi sincroni standard come googleapis.bigquery.v2.jobs.query non supportano il polling automatico e potrebbero richiedere il polling manuale se la query richiede più tempo del timeout dell'API interna.

Ad esempio, il seguente flusso di lavoro mostra come eseguire il polling per il completamento di un job BigQuery direttamente all'interno del flusso di lavoro utilizzando il getQueryResults metodo:

YAML

# This workflow demonstrates how to manually poll for the completion of
# a BigQuery job by using the `getQueryResults` method.
main:
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
    - startQuery:
        call: googleapis.bigquery.v2.jobs.query
        args:
          projectId: ${project_id}
          body:
            useLegacySql: false
            query: "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
        result: queryResponse
    - getJobId:
        assign:
          - jobId: ${queryResponse.jobReference.jobId}
    - pollingLoop:
        steps:
          - checkStatus:
              call: googleapis.bigquery.v2.jobs.getQueryResults
              args:
                projectId: ${project_id}
                jobId: ${jobId}
                timeoutMs: 10000  # Default wait time per call
              result: jobStatus
          - checkIfDone:
              switch:
                - condition: ${jobStatus.jobComplete}
                  return: ${jobStatus}  # Job is finished
          - wait:
              call: sys.sleep
              args:
                seconds: 5
              next: checkStatus  # Repeat check

JSON

{
  "main": {
    "steps": [
      {
        "init": {
          "assign": [
            {
              "project_id": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
            }
          ]
        }
      },
      {
        "startQuery": {
          "call": "googleapis.bigquery.v2.jobs.query",
          "args": {
            "projectId": "${project_id}",
            "body": {
              "useLegacySql": false,
              "query": "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
            }
          },
          "result": "queryResponse"
        }
      },
      {
        "getJobId": {
          "assign": [
            {
              "jobId": "${queryResponse.jobReference.jobId}"
            }
          ]
        }
      },
      {
        "pollingLoop": {
          "steps": [
            {
              "checkStatus": {
                "call": "googleapis.bigquery.v2.jobs.getQueryResults",
                "args": {
                  "projectId": "${project_id}",
                  "jobId": "${jobId}",
                  "timeoutMs": 10000
                },
                "result": "jobStatus"
              }
            },
            {
              "checkIfDone": {
                "switch": [
                  {
                    "condition": "${jobStatus.jobComplete}",
                    "return": "${jobStatus}"
                  }
                ]
              }
            },
            {
              "wait": {
                "call": "sys.sleep",
                "args": {
                  "seconds": 5
                },
                "next": "checkStatus"
              }
            }
          ]
        }
      }
    ]
  }
}

Passaggi successivi