Fazer polling para conclusão do job do BigQuery

Ao iniciar um job do BigQuery usando o conector do BigQuery, seu fluxo de trabalho pode não esperar automaticamente a conclusão do job. Se as etapas subsequentes dependerem do sucesso do job, implemente um mecanismo de pesquisa.

Muitos conectores do Workflows retornam um recurso de operação de longa duração (LRO) que pode ser pesquisado. Embora alguns conectores retornem um recurso do tipo Operation para representar uma LRO, o BigQuery usa um recurso Job para essa finalidade. Outros serviços, como o Cloud Build ou o Compute Engine, retornam um recurso Operation real. Para conferir uma lista dos tipos de recursos de LRO, consulte Conectores compatíveis.

Sondagem automática

É possível fazer uma pesquisa automática para a conclusão de um job do BigQuery ao usar métodos de conector que retornam um recurso LRO, como googleapis.bigquery.v2.jobs.insert.

Quando você usa um método de conector, o Workflows bloqueia a execução do fluxo de trabalho até que a operação seja concluída ou falhe. O campo connector_params permite personalizar o comportamento de pesquisa, incluindo o tempo limite total e a frequência das verificações de status. Para mais informações, consulte Invocar uma chamada de conector.

Por exemplo, o fluxo de trabalho a seguir demonstra como iniciar um job do BigQuery usando o método jobs.insert e configurar a pesquisa do job com uma política que usa configurações personalizadas em vez das configurações padrão:

YAML

# This workflow demonstrates how to automatically poll for the completion of
# a BigQuery job when using a connector method that return an LRO resource.
main:
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - query: "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
    - run_bigquery_job:
        call: googleapis.bigquery.v2.jobs.insert
        args:
          projectId: ${project_id}
          body:
            configuration:
              query:
                query: ${query}
                useLegacySql: false
          connector_params:
            timeout: 3600  # Total time in seconds to wait for the job; default is 1800
            polling_policy:
              initial_delay: 2.0  # Seconds to wait before the first poll; default is 1.0
              multiplier: 1.5     # Factor by which to increase delay between polls; default is 1.25
              max_delay: 60.0     # Maximum delay in seconds between polls; default is 60.0
        result: job_status
    - finish:
        return: ${job_status}

JSON

{
  "main": {
    "steps": [
      {
        "init": {
          "assign": [
            {
              "project_id": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
            },
            {
              "query": "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
            }
          ]
        }
      },
      {
        "run_bigquery_job": {
          "call": "googleapis.bigquery.v2.jobs.insert",
          "args": {
            "projectId": "${project_id}",
            "body": {
              "configuration": {
                "query": {
                  "query": "${query}",
                  "useLegacySql": false
                }
              }
            },
            "connector_params": {
              "timeout": 3600,
              "polling_policy": {
                "initial_delay": 2,
                "multiplier": 1.5,
                "max_delay": 60
              }
            }
          },
          "result": "job_status"
        }
      },
      {
        "finish": {
          "return": "${job_status}"
        }
      }
    ]
  }
}

Sondagem manual

A pesquisa automática só se aplica a métodos que retornam um recurso LRO. Métodos síncronos padrão, como googleapis.bigquery.v2.jobs.query, não são compatíveis com a pesquisa automática e podem exigir pesquisa manual se a consulta demorar mais do que o tempo limite interno da API.

Por exemplo, o fluxo de trabalho a seguir demonstra como fazer uma pesquisa para a conclusão de um job do BigQuery diretamente no fluxo de trabalho usando o método getQueryResults:

YAML

# This workflow demonstrates how to manually poll for the completion of
# a BigQuery job by using the `getQueryResults` method.
main:
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
    - startQuery:
        call: googleapis.bigquery.v2.jobs.query
        args:
          projectId: ${project_id}
          body:
            useLegacySql: false
            query: "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
        result: queryResponse
    - getJobId:
        assign:
          - jobId: ${queryResponse.jobReference.jobId}
    - pollingLoop:
        steps:
          - checkStatus:
              call: googleapis.bigquery.v2.jobs.getQueryResults
              args:
                projectId: ${project_id}
                jobId: ${jobId}
                timeoutMs: 10000  # Default wait time per call
              result: jobStatus
          - checkIfDone:
              switch:
                - condition: ${jobStatus.jobComplete}
                  return: ${jobStatus}  # Job is finished
          - wait:
              call: sys.sleep
              args:
                seconds: 5
              next: checkStatus  # Repeat check

JSON

{
  "main": {
    "steps": [
      {
        "init": {
          "assign": [
            {
              "project_id": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
            }
          ]
        }
      },
      {
        "startQuery": {
          "call": "googleapis.bigquery.v2.jobs.query",
          "args": {
            "projectId": "${project_id}",
            "body": {
              "useLegacySql": false,
              "query": "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
            }
          },
          "result": "queryResponse"
        }
      },
      {
        "getJobId": {
          "assign": [
            {
              "jobId": "${queryResponse.jobReference.jobId}"
            }
          ]
        }
      },
      {
        "pollingLoop": {
          "steps": [
            {
              "checkStatus": {
                "call": "googleapis.bigquery.v2.jobs.getQueryResults",
                "args": {
                  "projectId": "${project_id}",
                  "jobId": "${jobId}",
                  "timeoutMs": 10000
                },
                "result": "jobStatus"
              }
            },
            {
              "checkIfDone": {
                "switch": [
                  {
                    "condition": "${jobStatus.jobComplete}",
                    "return": "${jobStatus}"
                  }
                ]
              }
            },
            {
              "wait": {
                "call": "sys.sleep",
                "args": {
                  "seconds": 5
                },
                "next": "checkStatus"
              }
            }
          ]
        }
      }
    ]
  }
}

A seguir