בדיקה חוזרת לגבי השלמת משימה ב-BigQuery

כשמפעילים משימה ב-BigQuery באמצעות המחבר של BigQuery, יכול להיות שתהליך העבודה לא ימתין אוטומטית לסיום המשימה. אם השלבים הבאים תלויים בהצלחת העבודה, כדאי להטמיע מנגנון של בדיקת סטטוס.

מחברים רבים של Workflows מחזירים משאב של פעולה ממושכת (LRO) שאפשר לדגום. יש מחברים שמחזירים משאב מסוג Operation כדי לייצג פעולה ארוכת טווח (LRO), אבל ב-BigQuery משתמשים במשאב Job למטרה הזו. שירותים אחרים, כמו Cloud Build או Compute Engine, מחזירים משאב Operation בפועל. רשימה של סוגי משאבים של LRO זמינה במאמר בנושא מקורות נתונים נתמכים.

הצבעה אוטומטית

כשמשתמשים בשיטות של מחבר שמחזירות משאב LRO, כמו googleapis.bigquery.v2.jobs.insert, אפשר לבצע באופן אוטומטי בדיקה חוזרת כדי לראות אם משימת BigQuery הסתיימה.

כשמשתמשים בשיטה כזו של מחבר, Workflows חוסם את ההפעלה של תהליך העבודה עד שהפעולה מצליחה או נכשלת. השדה connector_params מאפשר לכם להתאים אישית את אופן הפעולה של הדגימה, כולל הזמן הכולל להמתנה והתדירות של בדיקות הסטטוס. מידע נוסף זמין במאמר בנושא הפעלת שיחה עם מחבר.

לדוגמה, בתהליך העבודה הבא מוצג איך להתחיל משימת BigQuery באמצעות השיטה jobs.insert, ואיך להגדיר את הסקר של המשימה באמצעות מדיניות שמשתמשת בהגדרות בהתאמה אישית ולא בהגדרות ברירת המחדל:

YAML

# This workflow demonstrates how to automatically poll for the completion of
# a BigQuery job when using a connector method that return an LRO resource.
main:
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - query: "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
    - run_bigquery_job:
        call: googleapis.bigquery.v2.jobs.insert
        args:
          projectId: ${project_id}
          body:
            configuration:
              query:
                query: ${query}
                useLegacySql: false
          connector_params:
            timeout: 3600  # Total time in seconds to wait for the job; default is 1800
            polling_policy:
              initial_delay: 2.0  # Seconds to wait before the first poll; default is 1.0
              multiplier: 1.5     # Factor by which to increase delay between polls; default is 1.25
              max_delay: 60.0     # Maximum delay in seconds between polls; default is 60.0
        result: job_status
    - finish:
        return: ${job_status}

JSON

{
  "main": {
    "steps": [
      {
        "init": {
          "assign": [
            {
              "project_id": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
            },
            {
              "query": "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
            }
          ]
        }
      },
      {
        "run_bigquery_job": {
          "call": "googleapis.bigquery.v2.jobs.insert",
          "args": {
            "projectId": "${project_id}",
            "body": {
              "configuration": {
                "query": {
                  "query": "${query}",
                  "useLegacySql": false
                }
              }
            },
            "connector_params": {
              "timeout": 3600,
              "polling_policy": {
                "initial_delay": 2,
                "multiplier": 1.5,
                "max_delay": 60
              }
            }
          },
          "result": "job_status"
        }
      },
      {
        "finish": {
          "return": "${job_status}"
        }
      }
    ]
  }
}

שליחת בקשות ידנית

תשאול אוטומטי רלוונטי רק לשיטות שמחזירות משאב LRO. שיטות סינכרוניות רגילות כמו googleapis.bigquery.v2.jobs.query לא תומכות בסקר אוטומטי, ויכול להיות שיידרש סקר ידני אם השאילתה נמשכת יותר מהזמן הקצוב לתפוגה של ה-API הפנימי.

לדוגמה, בתהליך העבודה הבא מוצג איך לבדוק אם עבודת BigQuery הסתיימה ישירות בתהליך העבודה באמצעות השיטה getQueryResults:

YAML

# This workflow demonstrates how to manually poll for the completion of
# a BigQuery job by using the `getQueryResults` method.
main:
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
    - startQuery:
        call: googleapis.bigquery.v2.jobs.query
        args:
          projectId: ${project_id}
          body:
            useLegacySql: false
            query: "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
        result: queryResponse
    - getJobId:
        assign:
          - jobId: ${queryResponse.jobReference.jobId}
    - pollingLoop:
        steps:
          - checkStatus:
              call: googleapis.bigquery.v2.jobs.getQueryResults
              args:
                projectId: ${project_id}
                jobId: ${jobId}
                timeoutMs: 10000  # Default wait time per call
              result: jobStatus
          - checkIfDone:
              switch:
                - condition: ${jobStatus.jobComplete}
                  return: ${jobStatus}  # Job is finished
          - wait:
              call: sys.sleep
              args:
                seconds: 5
              next: checkStatus  # Repeat check

JSON

{
  "main": {
    "steps": [
      {
        "init": {
          "assign": [
            {
              "project_id": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
            }
          ]
        }
      },
      {
        "startQuery": {
          "call": "googleapis.bigquery.v2.jobs.query",
          "args": {
            "projectId": "${project_id}",
            "body": {
              "useLegacySql": false,
              "query": "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
            }
          },
          "result": "queryResponse"
        }
      },
      {
        "getJobId": {
          "assign": [
            {
              "jobId": "${queryResponse.jobReference.jobId}"
            }
          ]
        }
      },
      {
        "pollingLoop": {
          "steps": [
            {
              "checkStatus": {
                "call": "googleapis.bigquery.v2.jobs.getQueryResults",
                "args": {
                  "projectId": "${project_id}",
                  "jobId": "${jobId}",
                  "timeoutMs": 10000
                },
                "result": "jobStatus"
              }
            },
            {
              "checkIfDone": {
                "switch": [
                  {
                    "condition": "${jobStatus.jobComplete}",
                    "return": "${jobStatus}"
                  }
                ]
              }
            },
            {
              "wait": {
                "call": "sys.sleep",
                "args": {
                  "seconds": 5
                },
                "next": "checkStatus"
              }
            }
          ]
        }
      }
    ]
  }
}

המאמרים הבאים