Abfragen, ob ein BigQuery-Job abgeschlossen ist

Wenn Sie einen BigQuery-Job mit dem BigQuery-Connector starten, wartet Ihr Workflow möglicherweise nicht automatisch auf den Abschluss des Jobs. Wenn nachfolgende Schritte vom Erfolg des Jobs abhängen, sollten Sie einen Polling-Mechanismus implementieren.

Viele Workflows-Connectors geben eine Vorgangsressource mit langer Ausführungszeit (LRO) zurück, die abgefragt werden kann. Während einige Connectors eine Ressource vom Typ Operation zurückgeben, um einen LRO darzustellen, verwendet BigQuery dafür eine Job-Ressource. Andere Dienste wie Cloud Build oder Compute Engine geben eine tatsächliche Operation-Ressource zurück. Eine Liste der LRO-Ressourcentypen finden Sie unter Unterstützte Connectors.

Automatische Abfragen

Sie können automatisch abfragen, ob ein BigQuery-Job abgeschlossen ist, wenn Sie Connectormethoden verwenden, die eine LRO-Ressource zurückgeben, z. B. googleapis.bigquery.v2.jobs.insert.

Wenn Sie eine solche Connectormethode verwenden, wird die Workflowausführung in Workflows blockiert, bis der Vorgang erfolgreich abgeschlossen wird oder fehlschlägt. Mit dem Feld connector_params können Sie das Polling-Verhalten anpassen, einschließlich des gesamten Zeitlimits und der Häufigkeit der Statusprüfungen. Weitere Informationen finden Sie unter Connector-Aufruf aufrufen.

Im folgenden Workflow wird beispielsweise gezeigt, wie Sie einen BigQuery-Job mit der Methode jobs.insert starten und das Polling des Jobs mit einer Richtlinie konfigurieren, die benutzerdefinierte Einstellungen anstelle der Standardeinstellungen verwendet:

YAML

# This workflow demonstrates how to automatically poll for the completion of
# a BigQuery job when using a connector method that return an LRO resource.
main:
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - query: "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
    - run_bigquery_job:
        call: googleapis.bigquery.v2.jobs.insert
        args:
          projectId: ${project_id}
          body:
            configuration:
              query:
                query: ${query}
                useLegacySql: false
          connector_params:
            timeout: 3600  # Total time in seconds to wait for the job; default is 1800
            polling_policy:
              initial_delay: 2.0  # Seconds to wait before the first poll; default is 1.0
              multiplier: 1.5     # Factor by which to increase delay between polls; default is 1.25
              max_delay: 60.0     # Maximum delay in seconds between polls; default is 60.0
        result: job_status
    - finish:
        return: ${job_status}

JSON

{
  "main": {
    "steps": [
      {
        "init": {
          "assign": [
            {
              "project_id": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
            },
            {
              "query": "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
            }
          ]
        }
      },
      {
        "run_bigquery_job": {
          "call": "googleapis.bigquery.v2.jobs.insert",
          "args": {
            "projectId": "${project_id}",
            "body": {
              "configuration": {
                "query": {
                  "query": "${query}",
                  "useLegacySql": false
                }
              }
            },
            "connector_params": {
              "timeout": 3600,
              "polling_policy": {
                "initial_delay": 2,
                "multiplier": 1.5,
                "max_delay": 60
              }
            }
          },
          "result": "job_status"
        }
      },
      {
        "finish": {
          "return": "${job_status}"
        }
      }
    ]
  }
}

Manuelle Abfrage

Das automatische Polling gilt nur für Methoden, die eine LRO-Ressource zurückgeben. Standardmäßige synchrone Methoden wie googleapis.bigquery.v2.jobs.query unterstützen kein automatisches Polling und erfordern möglicherweise manuelles Polling, wenn die Abfrage länger als das interne API-Zeitlimit dauert.

Im folgenden Workflow wird beispielsweise gezeigt, wie Sie direkt im Workflow mit der Methode getQueryResults abfragen, ob ein BigQuery-Job abgeschlossen ist:

YAML

# This workflow demonstrates how to manually poll for the completion of
# a BigQuery job by using the `getQueryResults` method.
main:
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
    - startQuery:
        call: googleapis.bigquery.v2.jobs.query
        args:
          projectId: ${project_id}
          body:
            useLegacySql: false
            query: "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
        result: queryResponse
    - getJobId:
        assign:
          - jobId: ${queryResponse.jobReference.jobId}
    - pollingLoop:
        steps:
          - checkStatus:
              call: googleapis.bigquery.v2.jobs.getQueryResults
              args:
                projectId: ${project_id}
                jobId: ${jobId}
                timeoutMs: 10000  # Default wait time per call
              result: jobStatus
          - checkIfDone:
              switch:
                - condition: ${jobStatus.jobComplete}
                  return: ${jobStatus}  # Job is finished
          - wait:
              call: sys.sleep
              args:
                seconds: 5
              next: checkStatus  # Repeat check

JSON

{
  "main": {
    "steps": [
      {
        "init": {
          "assign": [
            {
              "project_id": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
            }
          ]
        }
      },
      {
        "startQuery": {
          "call": "googleapis.bigquery.v2.jobs.query",
          "args": {
            "projectId": "${project_id}",
            "body": {
              "useLegacySql": false,
              "query": "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
            }
          },
          "result": "queryResponse"
        }
      },
      {
        "getJobId": {
          "assign": [
            {
              "jobId": "${queryResponse.jobReference.jobId}"
            }
          ]
        }
      },
      {
        "pollingLoop": {
          "steps": [
            {
              "checkStatus": {
                "call": "googleapis.bigquery.v2.jobs.getQueryResults",
                "args": {
                  "projectId": "${project_id}",
                  "jobId": "${jobId}",
                  "timeoutMs": 10000
                },
                "result": "jobStatus"
              }
            },
            {
              "checkIfDone": {
                "switch": [
                  {
                    "condition": "${jobStatus.jobComplete}",
                    "return": "${jobStatus}"
                  }
                ]
              }
            },
            {
              "wait": {
                "call": "sys.sleep",
                "args": {
                  "seconds": 5
                },
                "next": "checkStatus"
              }
            }
          ]
        }
      }
    ]
  }
}

Nächste Schritte