BigQuery ジョブの完了をポーリングする

BigQuery コネクタを使用して BigQuery ジョブを開始すると、ワークフローがジョブの完了を自動的に待機しないことがあります。後続の手順がジョブの成功に依存する場合は、ポーリング メカニズムを実装する必要があります。

多くの Workflows コネクタは、ポーリング可能な長時間実行オペレーション(LRO)リソースを返します。一部のコネクタは LRO を表すために Operation 型のリソースを返しますが、BigQuery はこの目的で Job リソースを使用します。Cloud Build や Compute Engine などの他のサービスは、実際の Operation リソースを返します。LRO リソースタイプの一覧については、サポートされているコネクタをご覧ください。

自動ポーリング

googleapis.bigquery.v2.jobs.insert などの LRO リソースを返すコネクタ メソッドを使用すると、BigQuery ジョブの完了を自動的にポーリングできます。

このようなコネクタ メソッドを使用すると、Workflows はオペレーションが成功または失敗するまでワークフローの実行をブロックします。connector_params フィールドを使用すると、合計タイムアウトやステータス チェックの頻度など、ポーリングの動作をカスタマイズできます。詳細については、コネクタ コールを呼び出すをご覧ください。

たとえば、次のワークフローは、jobs.insert メソッドを使用して BigQuery ジョブを開始し、デフォルト設定ではなくカスタム設定を使用するポリシーでジョブのポーリングを構成する方法を示しています。

YAML

# This workflow demonstrates how to automatically poll for the completion of
# a BigQuery job when using a connector method that return an LRO resource.
main:
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - query: "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
    - run_bigquery_job:
        call: googleapis.bigquery.v2.jobs.insert
        args:
          projectId: ${project_id}
          body:
            configuration:
              query:
                query: ${query}
                useLegacySql: false
          connector_params:
            timeout: 3600  # Total time in seconds to wait for the job; default is 1800
            polling_policy:
              initial_delay: 2.0  # Seconds to wait before the first poll; default is 1.0
              multiplier: 1.5     # Factor by which to increase delay between polls; default is 1.25
              max_delay: 60.0     # Maximum delay in seconds between polls; default is 60.0
        result: job_status
    - finish:
        return: ${job_status}

JSON

{
  "main": {
    "steps": [
      {
        "init": {
          "assign": [
            {
              "project_id": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
            },
            {
              "query": "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
            }
          ]
        }
      },
      {
        "run_bigquery_job": {
          "call": "googleapis.bigquery.v2.jobs.insert",
          "args": {
            "projectId": "${project_id}",
            "body": {
              "configuration": {
                "query": {
                  "query": "${query}",
                  "useLegacySql": false
                }
              }
            },
            "connector_params": {
              "timeout": 3600,
              "polling_policy": {
                "initial_delay": 2,
                "multiplier": 1.5,
                "max_delay": 60
              }
            }
          },
          "result": "job_status"
        }
      },
      {
        "finish": {
          "return": "${job_status}"
        }
      }
    ]
  }
}

手動ポーリング

自動ポーリングは、LRO リソースを返すメソッドにのみ適用されます。googleapis.bigquery.v2.jobs.query などの標準の同期メソッドは自動ポーリングをサポートしていないため、クエリが内部 API タイムアウトよりも長くかかる場合は、手動ポーリングが必要になることがあります。

たとえば、次のワークフローは、getQueryResults メソッドを使用して、ワークフロー内で BigQuery ジョブの完了を直接ポーリングする方法を示しています。

YAML

# This workflow demonstrates how to manually poll for the completion of
# a BigQuery job by using the `getQueryResults` method.
main:
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
    - startQuery:
        call: googleapis.bigquery.v2.jobs.query
        args:
          projectId: ${project_id}
          body:
            useLegacySql: false
            query: "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
        result: queryResponse
    - getJobId:
        assign:
          - jobId: ${queryResponse.jobReference.jobId}
    - pollingLoop:
        steps:
          - checkStatus:
              call: googleapis.bigquery.v2.jobs.getQueryResults
              args:
                projectId: ${project_id}
                jobId: ${jobId}
                timeoutMs: 10000  # Default wait time per call
              result: jobStatus
          - checkIfDone:
              switch:
                - condition: ${jobStatus.jobComplete}
                  return: ${jobStatus}  # Job is finished
          - wait:
              call: sys.sleep
              args:
                seconds: 5
              next: checkStatus  # Repeat check

JSON

{
  "main": {
    "steps": [
      {
        "init": {
          "assign": [
            {
              "project_id": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
            }
          ]
        }
      },
      {
        "startQuery": {
          "call": "googleapis.bigquery.v2.jobs.query",
          "args": {
            "projectId": "${project_id}",
            "body": {
              "useLegacySql": false,
              "query": "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
            }
          },
          "result": "queryResponse"
        }
      },
      {
        "getJobId": {
          "assign": [
            {
              "jobId": "${queryResponse.jobReference.jobId}"
            }
          ]
        }
      },
      {
        "pollingLoop": {
          "steps": [
            {
              "checkStatus": {
                "call": "googleapis.bigquery.v2.jobs.getQueryResults",
                "args": {
                  "projectId": "${project_id}",
                  "jobId": "${jobId}",
                  "timeoutMs": 10000
                },
                "result": "jobStatus"
              }
            },
            {
              "checkIfDone": {
                "switch": [
                  {
                    "condition": "${jobStatus.jobComplete}",
                    "return": "${jobStatus}"
                  }
                ]
              }
            },
            {
              "wait": {
                "call": "sys.sleep",
                "args": {
                  "seconds": 5
                },
                "next": "checkStatus"
              }
            }
          ]
        }
      }
    ]
  }
}

次のステップ