Membuat polling untuk penyelesaian tugas BigQuery

Saat memulai tugas BigQuery menggunakan konektor BigQuery, alur kerja Anda mungkin tidak otomatis menunggu hingga tugas selesai. Jika langkah selanjutnya bergantung pada keberhasilan tugas, Anda harus menerapkan mekanisme polling.

Banyak konektor Workflows menampilkan resource operasi yang berjalan lama (LRO) yang dapat dipolling. Meskipun beberapa konektor menampilkan resource berjenis Operation untuk merepresentasikan LRO, BigQuery menggunakan resource Job untuk tujuan ini. Layanan lain, seperti Cloud Build atau Compute Engine, menampilkan resource Operation yang sebenarnya. Untuk mengetahui daftar jenis resource LRO, lihat Konektor yang didukung.

Polling otomatis

Anda dapat secara otomatis melakukan polling untuk penyelesaian tugas BigQuery saat menggunakan metode konektor yang menampilkan resource LRO, seperti googleapis.bigquery.v2.jobs.insert.

Saat Anda menggunakan metode konektor tersebut, Workflows akan memblokir eksekusi alur kerja hingga operasi berhasil atau gagal. Kolom connector_params memungkinkan Anda menyesuaikan perilaku polling, termasuk total waktu tunggu dan frekuensi pemeriksaan status. Untuk mengetahui informasi selengkapnya, lihat Memanggil panggilan konektor.

Misalnya, alur kerja berikut menunjukkan cara memulai tugas BigQuery menggunakan metode jobs.insert, dan mengonfigurasi polling tugas dengan kebijakan yang menggunakan setelan kustom, bukan setelan default:

YAML

# This workflow demonstrates how to automatically poll for the completion of
# a BigQuery job when using a connector method that return an LRO resource.
main:
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - query: "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
    - run_bigquery_job:
        call: googleapis.bigquery.v2.jobs.insert
        args:
          projectId: ${project_id}
          body:
            configuration:
              query:
                query: ${query}
                useLegacySql: false
          connector_params:
            timeout: 3600  # Total time in seconds to wait for the job; default is 1800
            polling_policy:
              initial_delay: 2.0  # Seconds to wait before the first poll; default is 1.0
              multiplier: 1.5     # Factor by which to increase delay between polls; default is 1.25
              max_delay: 60.0     # Maximum delay in seconds between polls; default is 60.0
        result: job_status
    - finish:
        return: ${job_status}

JSON

{
  "main": {
    "steps": [
      {
        "init": {
          "assign": [
            {
              "project_id": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
            },
            {
              "query": "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
            }
          ]
        }
      },
      {
        "run_bigquery_job": {
          "call": "googleapis.bigquery.v2.jobs.insert",
          "args": {
            "projectId": "${project_id}",
            "body": {
              "configuration": {
                "query": {
                  "query": "${query}",
                  "useLegacySql": false
                }
              }
            },
            "connector_params": {
              "timeout": 3600,
              "polling_policy": {
                "initial_delay": 2,
                "multiplier": 1.5,
                "max_delay": 60
              }
            }
          },
          "result": "job_status"
        }
      },
      {
        "finish": {
          "return": "${job_status}"
        }
      }
    ]
  }
}

Polling manual

Polling otomatis hanya berlaku untuk metode yang menampilkan resource LRO. Metode sinkron standar seperti googleapis.bigquery.v2.jobs.query tidak mendukung polling otomatis dan mungkin memerlukan polling manual jika kueri memerlukan waktu lebih lama daripada waktu tunggu API internal.

Misalnya, alur kerja berikut menunjukkan cara melakukan polling untuk penyelesaian tugas BigQuery langsung dalam alur kerja menggunakan metode getQueryResults:

YAML

# This workflow demonstrates how to manually poll for the completion of
# a BigQuery job by using the `getQueryResults` method.
main:
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
    - startQuery:
        call: googleapis.bigquery.v2.jobs.query
        args:
          projectId: ${project_id}
          body:
            useLegacySql: false
            query: "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
        result: queryResponse
    - getJobId:
        assign:
          - jobId: ${queryResponse.jobReference.jobId}
    - pollingLoop:
        steps:
          - checkStatus:
              call: googleapis.bigquery.v2.jobs.getQueryResults
              args:
                projectId: ${project_id}
                jobId: ${jobId}
                timeoutMs: 10000  # Default wait time per call
              result: jobStatus
          - checkIfDone:
              switch:
                - condition: ${jobStatus.jobComplete}
                  return: ${jobStatus}  # Job is finished
          - wait:
              call: sys.sleep
              args:
                seconds: 5
              next: checkStatus  # Repeat check

JSON

{
  "main": {
    "steps": [
      {
        "init": {
          "assign": [
            {
              "project_id": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
            }
          ]
        }
      },
      {
        "startQuery": {
          "call": "googleapis.bigquery.v2.jobs.query",
          "args": {
            "projectId": "${project_id}",
            "body": {
              "useLegacySql": false,
              "query": "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
            }
          },
          "result": "queryResponse"
        }
      },
      {
        "getJobId": {
          "assign": [
            {
              "jobId": "${queryResponse.jobReference.jobId}"
            }
          ]
        }
      },
      {
        "pollingLoop": {
          "steps": [
            {
              "checkStatus": {
                "call": "googleapis.bigquery.v2.jobs.getQueryResults",
                "args": {
                  "projectId": "${project_id}",
                  "jobId": "${jobId}",
                  "timeoutMs": 10000
                },
                "result": "jobStatus"
              }
            },
            {
              "checkIfDone": {
                "switch": [
                  {
                    "condition": "${jobStatus.jobComplete}",
                    "return": "${jobStatus}"
                  }
                ]
              }
            },
            {
              "wait": {
                "call": "sys.sleep",
                "args": {
                  "seconds": 5
                },
                "next": "checkStatus"
              }
            }
          ]
        }
      }
    ]
  }
}

Langkah berikutnya