במאמר הזה מוסבר איך ליצור משימות של בקרת איכות נתונים ב-Dataplex Universal Catalog, שמאפשרות לתזמן ולהריץ בדיקות של איכות הנתונים בטבלאות מובנות ובטבלאות חיצוניות של BigQuery.
מידע נוסף מופיע במאמר סקירה כללית על משימות שקשורות לאיכות הנתונים.
לפני שמתחילים
ההנחה במאמר הזה היא שיש לכם אגם קיים של Dataplex Universal Catalog שבו אתם רוצים ליצור את משימת איכות הנתונים.
הפעלת ממשקי API ושירותים של Google
מפעילים את Dataproc API.
מפעילים גישה פרטית ל-Google לרשת ולתת-רשת. מפעילים גישה פרטית ל-Google ברשת שמתכננים להשתמש בה עם משימות של איכות נתונים ב-Dataplex Universal Catalog. אם לא מציינים רשת או רשת משנה כשיוצרים את משימת איכות הנתונים של Dataplex Universal Catalog, המערכת משתמשת ברשת המשנה שמוגדרת כברירת מחדל. במקרה כזה, צריך להפעיל את הגישה הפרטית ל-Google ברשת המשנה שמוגדרת כברירת מחדל.
יצירת קובץ מפרט
Dataplex Universal Catalog משתמש ב-CloudDQ בקוד פתוח כתוכנת הדרייבר. הדרישות לבדיקת איכות הנתונים ב-Dataplex Universal Catalog מוגדרות בקובצי מפרט YAML של CloudDQ.
כקלט למשימת איכות הנתונים, אפשר להשתמש בקובץ YAML בודד או בארכיון ZIP בודד שמכיל קובץ YAML אחד או יותר. מומלץ לתעד את הדרישות של בדיקת איכות הנתונים בקובצי מפרט נפרדים בפורמט YAML, כשכל קובץ מתייחס לקטע אחר.
כדי להכין קובץ מפרט:
-
יוצרים קובץ או קבצים של מפרט YAML של CloudDQ שמגדירים את הדרישות של בדיקת איכות הנתונים. מידע נוסף על התחביר הנדרש מופיע בקטע מידע על קובץ המפרט במאמר הזה.
שומרים את קובץ מפרט ה-YAML בפורמט
.ymlאו.yaml. אם יוצרים כמה קובצי מפרט YAML, צריך לשמור את כל הקבצים בארכיון ZIP אחד. - יצירת קטגוריה של Cloud Storage
- מעלים את קובץ המפרט לקטגוריה של Cloud Storage.
מידע על קובץ המפרט
קובץ המפרט של CloudDQ בפורמט YAML צריך לכלול את הקטעים הבאים:
כללים (מוגדרים בצומת
rulesYAML ברמה העליונה): רשימה של כללים להפעלה. אפשר ליצור את הכללים האלה מסוגי כללים מוגדרים מראש, כמוNOT_NULLו-REGEX, או להרחיב אותם באמצעות הצהרות SQL מותאמות אישית, כמוCUSTOM_SQL_EXPRו-CUSTOM_SQL_STATEMENT. ההצהרהCUSTOM_SQL_EXPRמסמנת כל שורה שבהcustom_sql_exprמוערך כ-Falseככשל. ההצהרהCUSTOM_SQL_STATEMENTמסמנת כל ערך שמוחזר על ידי ההצהרה כולה ככישלון.מסנני שורות (מוגדרים בצומת ה-YAML ברמה העליונה
row_filters): ביטויי SQL שמחזירים ערך בוליאני שמגדירים מסננים לאחזור קבוצת משנה של נתונים מהישות הבסיסית שנתונה לאימות.קשרי כללים (מוגדרים בצומת
rule_bindingsYAML ברמה העליונה): ההגדרותrulesו-rule filtersחלות על הטבלאות.מאפייני הכלל (מוגדרים בצומת
rule_dimensionsב-YAML): מגדירים את רשימת המאפיינים המותרים של כלל איכות הנתונים, שאפשר להגדיר בכלל בשדהdimensionהמתאים.לדוגמה:
rule_dimensions: - consistency - correctness - duplication - completeness - conformance
השדה
dimensionהוא אופציונלי לכלל. חובה להשתמש בקטע rule dimensions אםdimensionמופיע בכלל כלשהו.
מידע נוסף זמין במדריך העזר של CloudDQ ובקבצים לדוגמה של מפרטים.
יצירת מערך נתונים לאחסון התוצאות
-
כדי לאחסן את התוצאות, יוצרים מערך נתונים ב-BigQuery.
מערך הנתונים צריך להיות באותו אזור שבו נמצאות הטבלאות שעליהן מריצים את משימת איכות הנתונים.
Dataplex Universal Catalog משתמש במערך הנתונים הזה, ויוצר או משתמש מחדש בטבלה שתבחרו כדי לאחסן את התוצאות.
יצירה של חשבון שירות
יוצרים חשבון שירות עם ההרשאות והתפקידים הבאים בניהול זהויות והרשאות גישה (IAM):
- הרשאת קריאה לנתיב ב-Cloud Storage שמכיל את מפרטי ה-YAML. אפשר להשתמש בתפקיד Storage Object Viewer (
roles/storage.objectViewer) בקטגוריה של Cloud Storage. - הרשאת קריאה למערכי נתונים ב-BigQuery עם נתונים שצריך לאמת.
אפשר להשתמש בתפקיד BigQuery Data Viewer (
roles/bigquery.dataViewer). - הרשאת כתיבה למערך הנתונים ב-BigQuery כדי ליצור טבלה (אם צריך) ולכתוב את התוצאות בטבלה הזו. אפשר להשתמש בתפקיד BigQuery Data Editor (
roles/bigquery.dataEditor) ברמת מערך הנתונים. - תפקיד BigQuery Job User (
roles/bigquery.jobUser) ברמת הפרויקט כדי ליצור משימות BigQuery בפרויקט. - תפקיד Dataplex Metadata Reader (
roles/dataplex.metadataReader) ברמת הפרויקט או האגם. - תפקיד שימוש בשירות (
roles/serviceusage.serviceUsageConsumer) ברמת הפרויקט. - התפקיד Dataproc Worker (
roles/dataproc.worker). - ההרשאה
iam.serviceAccounts.actAsשניתנה למשתמש ששולח את העבודה. - התפקיד 'משתמש בחשבון שירות' שניתן לחשבון השירות של Dataplex Universal Catalog lake. אפשר לראות את חשבון השירות של אגם Dataplex Universal Catalog ב Google Cloud מסוף.
שימוש בהגדרות מתקדמות
השלבים הבאים הם אופציונליים:
כברירת מחדל, BigQuery מריץ בדיקות של איכות הנתונים בפרויקט הנוכחי. אפשר לבחור פרויקט אחר להרצת העבודות ב-BigQuery. משתמשים בארגומנט
--gcp_project_idTASK_ARGSבשביל המאפיין--execution-argsשל המשימה.אם מזהה הפרויקט שצוין להרצת שאילתות BigQuery שונה מהפרויקט שבו נוצר חשבון השירות (שצוין על ידי
--execution-service-account), צריך לוודא שמדיניות הארגון שמשביתה את השימוש בחשבונות שירות בין פרויקטים (iam.disableServiceAccountCreation) מושבתת. בנוסף, צריך לוודא שלחשבון השירות יש גישה ללוח הזמנים של משימות BigQuery בפרויקט שבו מורצות שאילתות BigQuery.
מגבלות
כל הטבלאות שצוינו למשימה מסוימת של איכות נתונים צריכות להיות באותו Google Cloudאזור.
תזמון משימה של איכות נתונים
המסוף
- במסוף Google Cloud , נכנסים לדף Process בקטלוג האוניברסלי של Dataplex.
- לוחצים על Create task.
- בכרטיס בדיקת איכות הנתונים, לוחצים על יצירת משימה.
- בשדה Dataplex lake (אגם Dataplex), בוחרים את האגם.
- בשדה ID (מזהה), מזינים מזהה.
- בקטע Data quality specification (מפרט איכות הנתונים), מבצעים את הפעולות הבאות:
- בשדה Select GCS file (בחירת קובץ GCS), לוחצים על Browse (עיון).
בוחרים את הקטגוריה של Cloud Storage.
לוחצים על בחירה.
בקטע Results table:
בשדה Select BigQuery dataset (בחירת מערך נתונים ב-BigQuery), לוחצים על Browse (עיון).
בוחרים את מערך הנתונים ב-BigQuery שבו יאוחסנו תוצאות האימות.
לוחצים על בחירה.
בשדה BigQuery table (טבלה ב-BigQuery), מזינים את שם הטבלה שבה רוצים לאחסן את התוצאות. אם הטבלה לא קיימת, Dataplex Universal Catalog יוצר אותה בשבילכם. אל תשתמשו בשם
dq_summaryכי הוא שמור למשימות עיבוד פנימיות.
בקטע Service account, בוחרים חשבון שירות מהתפריט User service account.
לוחצים על Continue.
בקטע Set schedule (הגדרת תזמון), מגדירים את התזמון להרצת משימת בקרת האיכות.
לוחצים על יצירה.
CLI של gcloud
הדוגמה הבאה מציגה הרצה של משימת איכות הנתונים באמצעות הפקודה Dataplex Universal Catalog tasks ה-CLI של gcloud:
export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH"
# Google Cloud project where the Dataplex Universal Catalog task is created.
export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT"
# Google Cloud region for the Dataplex Universal Catalog lake.
export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID"
# Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region.
export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}"
# The Dataplex Universal Catalog lake where your task is created.
export DATAPLEX_LAKE_ID="DATAPLEX_LAKE_ID"
# The service account used for running the task. Ensure that this service account
has sufficient IAM permissions on your project, including
BigQuery Data Editor, BigQuery Job User,
Dataplex Universal Catalog Editor, Dataproc Worker, and Service
Usage Consumer.
export DATAPLEX_TASK_SERVICE_ACCOUNT="DATAPLEX_TASK_SERVICE_ACCOUNT"
# If you want to use a different dataset for storing the intermediate data quality summary results
and the BigQuery views associated with each rule binding, use the following:
export CLOUDDQ_BIGQUERY_DATASET="CLOUDDQ_BIGQUERY_DATASET"
# The BigQuery dataset where the final results of the data quality checks are stored.
This could be the same as CLOUDDQ_BIGQUERY_DATASET.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"
# The BigQuery table where the final results of the data quality checks are stored.
export TARGET_BQ_TABLE="TARGET_BQ_TABLE"
# The unique identifier for the task.
export TASK_ID="TASK_ID"
gcloud dataplex tasks create \
--location="${DATAPLEX_REGION_ID}" \
--lake="${DATAPLEX_LAKE_ID}" \
--trigger-type=ON_DEMAND \
--execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \
--spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \
--spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \
--execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \
"$TASK_ID"| פרמטר | תיאור |
|---|---|
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH |
הנתיב ב-Cloud Storage לקובץ ה-YAML של הגדרות איכות הנתונים, שמשמש כקלט למשימת איכות הנתונים. אפשר להעלות קובץ YAML בודד בפורמט .yml או .yaml, או ארכיון ZIP שמכיל כמה קובצי YAML. |
GOOGLE_CLOUD_PROJECT |
Google Cloud הפרויקט שבו נוצרת המשימה של Dataplex Universal Catalog והעבודות של BigQuery. |
DATAPLEX_REGION_ID |
האזור של אגם Dataplex Universal Catalog שבו נוצרה משימת איכות הנתונים. |
SERVICE_ACCOUNT |
חשבון השירות שמשמש להפעלת המשימה. מוודאים שלחשבון השירות הזה יש הרשאות IAM מספיקות, כפי שמתואר בקטע לפני שמתחילים. |
במקרה של --execution-args, צריך להעביר את הארגומנטים הבאים כארגומנטים תלויי מיקום, ולכן בסדר הזה:
| ארגומנט | תיאור |
|---|---|
clouddq-executable.zip |
קובץ הפעלה שעבר קומפילציה מראש, שהועבר ב-spark-file-uris מקטגוריית Cloud Storage ציבורית. |
ALL |
מריצים את כל קישורי הכללים. אפשר גם לציין קשרי כללים ספציפיים כרשימה מופרדת בפסיקים.
לדוגמה, RULE_1,RULE_2. |
gcp-project-id |
מזהה הפרויקט שבו מורצות השאילתות ב-BigQuery. |
gcp-region-id |
האזור להרצת משימות BigQuery לצורך אימות איכות הנתונים. האזור הזה צריך להיות זהה לאזור של gcp-bq-dataset-id ושל target_bigquery_summary_table. |
gcp-bq-dataset-id |
מערך נתונים ב-BigQuery שמשמש לאחסון התצוגות וסיכומי התוצאות של איכות הנתונים.rule_binding |
target-bigquery-summary-table |
מזהה הטבלה ב-BigQuery שבה מאוחסנות התוצאות הסופיות של בדיקות איכות הנתונים. אל תשתמשו בערך המזהה
dq_summary כי הוא שמור למשימות עיבוד פנימיות. |
--summary_to_stdout |
(אופציונלי) כשמעבירים את הדגל הזה, כל השורות של תוצאות האימות שנוצרו בטבלה dq_summary בהרצה האחרונה נרשמות כרשומות JSON ב-Cloud Logging וב-stdout. |
API
מחליפים את מה שכתוב בשדות הבאים:
PROJECT_ID = "Your Dataplex Universal Catalog Project ID" REGION = "Your Dataplex Universal Catalog lake region" LAKE_ID = "Your Dataplex Universal Catalog lake ID" SERVICE_ACC = "Your service account used for reading the data" DATAPLEX_TASK_ID = "Unique task ID for the data quality task" BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification" GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project" GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results" TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
- שליחת בקשת HTTP POST:
POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID} { "spark": { "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py", "file_uris": [ f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip", f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum", f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip" ] }, "execution_spec": { "args": { "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}" }, "service_account": "SERVICE_ACC" }, "trigger_spec": { "type": "ON_DEMAND" }, "description": "${DATAPLEX_TASK_DESCRIPTION}" }
אפשר גם לעיין בדוגמה ל-DAG של Airflow למשימת איכות הנתונים ב-Dataplex Universal Catalog.
מעקב אחרי משימה מתוזמנת של איכות נתונים
צפייה בתוצאות
התוצאות של אימות איכות הנתונים מאוחסנות במערך הנתונים ובטבלת הסיכום שצוינו ב-BigQuery, כפי שמתואר במאמר יצירת מערך נתונים לאחסון התוצאות. טבלת הסיכום מכילה את סיכום הפלט של כל שילוב של קשירת כלל וכלל לכל הפעלה של אימות. הפלט בטבלת הסיכום כולל את המידע הבא:
| שם העמודה | תיאור |
|---|---|
dataplex_lake |
(string) מזהה של אגם Dataplex Universal Catalog שמכיל את הטבלה שנבדקת. |
dataplex_zone |
(מחרוזת) מזהה האזור ב-Dataplex Universal Catalog שמכיל את הטבלה שנבדקת. |
dataplex_asset_id |
(string) המזהה של נכס Dataplex Universal Catalog שמכיל את הטבלה שנבדקת. |
execution_ts |
(timestamp) חותמת הזמן שבה הופעלה שאילתת האימות. |
rule_binding_id |
(string) מזהה של קשירת הכלל שעבורה מדווחות תוצאות האימות. |
rule_id |
(string) מזהה הכלל במסגרת קשירת הכלל שעבורו מדווחות תוצאות האימות. |
dimension |
(string) מאפיין איכות הנתונים של rule_id. הערך הזה יכול להיות רק אחד מהערכים שצוינו בצומת YAML rule_dimensions. |
table_id |
(string) מזהה הישות שעבורה מדווחות תוצאות האימות.
המזהה הזה מצוין בפרמטר entity של קישור הכלל המתאים. |
column_id |
(string) מזהה העמודה שעבורה מדווחות תוצאות האימות.
המזהה הזה מצוין בפרמטר column של קישור הכלל המתאים. |
last_modified |
(timestamp) חותמת הזמן של השינוי האחרון של table_id
שנבדק. |
metadata_json_string |
(string) צמדי מפתח/ערך של תוכן פרמטר המטא-נתונים שצוין במסגרת הקישור של הכלל או במהלך ההרצה של איכות הנתונים. |
configs_hashsum |
(string) סכום הגיבוב של מסמך ה-JSON שמכיל את הקישור של הכלל ואת כל הכללים, הקישורים של הכללים, מסנני השורות והגדרות הישות שמשויכים אליו.
configs_hashsum מאפשר מעקב אחרי שינויים בתוכן של מזהה rule_binding או של אחת מההגדרות המקושרות שלו. |
dq_run_id |
(string) מזהה ייחודי של הרשומה. |
invocation_id |
(string) המזהה של ההרצה של איכות הנתונים. כל הרשומות של סיכום איכות הנתונים שנוצרו באותו מופע של הפעלת איכות הנתונים חולקות את אותו invocation_id. |
progress_watermark |
(ערך בוליאני) קובע אם הרשומה הספציפית הזו נלקחת בחשבון בבדיקת איכות הנתונים כדי לקבוע את נקודת הציון הגבוהה ביותר לאימות מצטבר. אם הערך הוא FALSE, המערכת מתעלמת מהרשומה הרלוונטית כשמגדירים את ערך הסף העליון. המידע הזה שימושי כשמבצעים אימותים של איכות נתוני בדיקה שלא אמורים להעלות את רף המים הגבוה. כברירת מחדל, Dataplex Universal Catalog מאכלס את השדה הזה עם TRUE, אבל אפשר לשנות את הערך אם הארגומנט --progress_watermark הוא FALSE.
|
rows_validated |
(integer) המספר הכולל של הרשומות שאומתו אחרי החלת row_filters ומסנני סימון של נקודת התחלה בעמודה incremental_time_filter_column_id, אם צוינו. |
complex_rule_validation_errors_count |
(float) מספר השורות שמוחזרות על ידי כלל CUSTOM_SQL_STATEMENT. |
complex_rule_validation_success_flag |
(בוליאני) סטטוס ההצלחה של כללי CUSTOM_SQL_STATEMENT.
|
success_count |
(integer) המספר הכולל של הרשומות שעברו אימות. השדה הזה מוגדר ל-NULL עבור כללי CUSTOM_SQL_STATEMENT.
|
success_percentage |
(float) אחוז מספר הרשומות שעברו אימות מתוך המספר הכולל של הרשומות שאומתו. השדה הזה מוגדר ל-NULL עבור כללי CUSTOM_SQL_STATEMENT. |
failed_count |
(מספר שלם) המספר הכולל של הרשומות שנכשלו באימות. השדה הזה מוגדר ל-NULL עבור כללי CUSTOM_SQL_STATEMENT.
|
failed_percentage |
(float) אחוז הרשומות שנכשל באימות מתוך המספר הכולל של הרשומות שאומתו. השדה הזה מוגדר ל-NULL עבור כללי CUSTOM_SQL_STATEMENT. |
null_count |
(מספר שלם) המספר הכולל של הרשומות שהחזירו ערך null במהלך האימות.
השדה הזה מוגדר ל-NULL עבור כללי NOT_NULL ו-CUSTOM_SQL_STATEMENT. |
null_percentage |
(float) אחוז מספר הרשומות שהחזירו ערך null במהלך האימות מתוך המספר הכולל של הרשומות שאומתו. השדה הזה מוגדר כ-NULL עבור כללי NOT_NULL ו-CUSTOM_SQL_STATEMENT. |
failed_records_query |
לכל כלל שנכשל, העמודה הזו מאחסנת שאילתה שאפשר להשתמש בה כדי לקבל רשומות שנכשלו. במסמך הזה מוסבר איך לפתור בעיות בכללים שנכשלו באמצעות failed_records_query. |
לגבי ישויות BigQuery, נוצרת תצוגה לכל rule_binding שמכילה את לוגיקת האימות של SQL מההפעלה האחרונה. אפשר למצוא את התצוגות האלה במערך הנתונים של BigQuery שצוין בארגומנט --gcp-bq-dataset-id.
אופטימיזציה של עלויות
משימות של איכות נתונים מבוצעות כעבודות של BigQuery בפרויקט שלכם. כדי לשלוט בעלות של הפעלת משימות לשיפור איכות הנתונים, צריך להשתמש בתמחור של BigQuery בפרויקט שבו מופעלות משימות BigQuery. מידע נוסף זמין במאמר בנושא ניהול עומסי עבודה ב-BigQuery.
אימותים מצטברים
לעתים קרובות יש לכם טבלאות שמתעדכנות באופן שוטף עם מחיצות חדשות (שורות חדשות). אם אתם לא רוצים לאמת מחדש מחיצות ישנות בכל הפעלה, אתם יכולים להשתמש באימותים מצטברים.
באימותים מצטברים, הטבלה צריכה לכלול עמודה מסוג TIMESTAMP או DATETIME שבה הערך עולה באופן מונוטוני. אתם יכולים להשתמש בעמודות שלפיהן הטבלה שלכם ב-BigQuery מחולקת למחיצות.
כדי לציין אימות מצטבר, צריך לציין ערך לפרמטר
incremental_time_filter_column_id=TIMESTAMP/DATETIME type column
כחלק מקישור הכלל.
כשמציינים עמודה, המשימה של איכות הנתונים מתייחסת רק לשורות עם ערך TIMESTAMP שגדול מחותמת הזמן של המשימה האחרונה של איכות הנתונים שהופעלה.
קבצי מפרט לדוגמה
כדי להשתמש בדוגמאות האלה, יוצרים מערך נתונים ב-BigQuery בשם sales. אחר כך יוצרים טבלת עובדות בשם sales_orders ומוסיפים נתונים לדוגמה על ידי הפעלת שאילתה עם ההצהרות הבאות של GoogleSQL:
CREATE OR REPLACE TABLE sales.sales_orders
(
id STRING NOT NULL,
last_modified_timestamp TIMESTAMP,
customer_id STRING,
item_id STRING,
amount NUMERIC,
transaction_currency STRING
);
INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")
דוגמה 1
בדוגמת הקוד הבאה אפשר לראות איך יוצרים בדיקות של איכות הנתונים כדי לאמת את הערכים האלה:
-
amount: הערכים הם אפס או מספרים חיוביים. -
item_id: מחרוזת אלפאנומרית של 5 תווים אלפביתיים, ואחריה 15 ספרות. -
transaction_currency: סוג מטבע מותר, כפי שמוגדר ברשימה סטטית. הרשימה הסטטית בדוגמה הזו מאפשרת להשתמש ב-GBP וב-JPY כסוגי מטבע. האימות הזה חל רק על שורות שמסומנות כבינלאומיות.
# The following `NONE` row filter is required.
row_filters:
NONE:
filter_sql_expr: |-
True
# This filters for rows marked as international (INTNL).
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can apply to multiple tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
# Rule bindings associate rules to columns within tables.
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
מחליפים את מה שכתוב בשדות הבאים:
PROJECT_ID: מזהה הפרויקט.-
DATASET_ID: מזהה קבוצת הנתונים.
דוגמה 2
אם הטבלה שרוצים לבדוק היא חלק מאגם Dataplex Universal Catalog, אפשר לציין את הטבלאות באמצעות סימון של אגם או אזור. כך תוכלו לצבור את התוצאות לפי אגם או אזור. לדוגמה, אפשר ליצור ציון ברמת האזור.
כדי להשתמש בדוגמה הזו, צריך ליצור אגם Dataplex Universal Catalog עם מזהה האגם operations ומזהה האזור procurement. לאחר מכן מוסיפים את הטבלה sales_orders
כנכס לאזור.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
מחליפים את מה שכתוב בשדות הבאים:
- PROJECT_ID: מזהה הפרויקט.
- REGION_ID: מזהה האזור של אגם Dataplex Universal Catalog שבו הטבלה קיימת, לדוגמה
us-central1.
דוגמה 3
בדוגמה הזו משפרים את דוגמה 2 על ידי הוספת בדיקת SQL מותאמת אישית כדי לראות אם ערכי המזהים הם ייחודיים.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
דוגמה 4
בדוגמה הזו, שיפרנו את דוגמה 3 על ידי הוספת אימותים מצטברים באמצעות העמודה last_modified_timestamp. אפשר להוסיף אימותים מצטברים לקישורי כללים אחדים או יותר.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_CURRENCY_ID
פתרון בעיות בכללים שנכשלו באמצעות failed_records_query
לכל כלל שנכשל, בטבלת הסיכום מאוחסנת שאילתה בעמודה failed_records_query שאפשר להשתמש בה כדי לקבל את הרשומות שנכשלו.
כדי לבצע ניפוי באגים, אפשר גם להשתמש ב-reference columns בקובץ ה-YAML. כך אפשר לצרף את הפלט של failed_records_query לנתונים המקוריים כדי לקבל את הרשומה המלאה. לדוגמה, אפשר לציין עמודה primary_key או עמודה מורכבת primary_key כעמודת הפניה.
ציון עמודות להשוואה
כדי ליצור עמודות הפניה, אפשר להוסיף את ההגדרות הבאות למפרט ה-YAML:
החלק
reference_columns. בקטע הזה אפשר ליצור קבוצה אחת או יותר של עמודות להשוואה, כשכל קבוצה מציינת עמודה אחת או יותר.החלק
rule_bindings. בקטע הזה אפשר להוסיף שורה לקשירת כלל שמציינת מזהה של עמודת הפניה (reference_columns_id) לשימוש בכללים בקשירת הכלל הזו. הוא צריך להיות אחד ממערכות העמודות של ההפניה שצוינו בקטעreference_columns.
לדוגמה, קובץ ה-YAML הבא מציין קטע reference_columns ומגדיר שלוש עמודות: id, last_modified_timestamp ו-item_id כחלק מהקבוצה ORDER_DETAILS_REFERENCE_COLUMNS. בדוגמה הבאה נעשה שימוש בטבלת הדוגמה sales_orders.
reference_columns:
ORDER_DETAILS_REFERENCE_COLUMNS:
include_reference_columns:
- id
- last_modified_timestamp
- item_id
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
params:
custom_sql_expr: |-
row_filters:
NONE:
filter_sql_expr: |-
True
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
rule_ids:
- VALUE_ZERO_OR_POSITIVEשימוש בשאילתת הרשומות שנכשלו
שאילתת הרשומות שנכשלו יוצרת שורה לכל רשומה שיש לה כלל שנכשל. הוא כולל את שם העמודה שגרמה לכשל, את הערך שגרם לכשל ואת הערכים של עמודות ההפניה. הוא כולל גם מטא-נתונים שאפשר להשתמש בהם כדי להתייחס להרצה של משימת איכות הנתונים.
זוהי דוגמה לפלט של שאילתת רשומות שנכשלה עבור קובץ ה-YAML שמתואר במאמר ציון עמודות הפניה. מוצגת שגיאה בעמודה amount וערך שגוי של -10. היא גם מתעדת את הערך המתאים של עמודת ההפניה.
| _dq_validation_invocation_id | _dq_validation_rule_binding_id | _dq_validation_rule_id | _dq_validation_column_id | _dq_validation_column_value | _dq_validation_dimension | _dq_validation_simple_rule_row_is_valid | _dq_validation_complex_rule_validation_errors_count | _dq_validation_complex_rule_validation_success_flag | id | last_modified_timestamp | item_id |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 10a25be9-8dfa-446c-a42c-75f6bb4a49d9 | TRANSACTION_AMOUNT_VALID | VALUE_ZERO_OR_POSITIVE | סכום | -10 | לא נכון | order1 | 2022-01-22T02:30:06.321Z | bad_item_id |
שימוש בשאילתות של רשומות שנכשלו בכללים מסוג CUSTOM_SQL_STATEMENT
עבור כללי CUSTOM_SQL_STATEMENT, שאילתות רשומות שנכשלו כוללות את העמודה custom_sql_statement_validation_errors. העמודה custom_sql_statement_validation_errors היא עמודה מקוננת עם שדות שתואמים לפלט של הצהרת ה-SQL. עמודות ההפניה לא נכללות בשאילתות של רשומות שנכשלו עבור כללי CUSTOM_SQL_STATEMENT.
לדוגמה, כלל CUSTOM_SQL_STATEMENT יכול להיראות כך:
rules: TEST_RULE: rule_type: CUSTOM_SQL_STATEMENT custom_sql_arguments: - existing_id - replacement_id params: CUSTOM_SQL_STATEMENT: |- (SELECT product_name, product_key FROM data where $existing_id != $replacement_id)
custom_sql_statement_validation_errors, עם שורה לכל מופע שבו existing_id!=replacement_id.
כשמציגים את התוכן של תא בעמודה הזו ב-JSON, הוא יכול להיראות כך:
{
"custom_sql_statement_valdation_errors" :{
"product_name"="abc"
"product_key"="12345678"
"_rule_binding_id"="your_rule_binding"
}
}
אפשר להצטרף לתוצאות האלה לטבלה המקורית באמצעות הפניה מקוננת כמו join on custom_sql_statement_valdation_errors.product_key.
המאמרים הבאים
- מידע נוסף מופיע במפרט של CloudDQ YAML.
- דוגמאות לכללים פשוטים ומתקדמים לאיכות נתונים מופיעות במאמרים כללים פשוטים וכללים מתקדמים.
- דוגמה ל-DAG של Airflow למשימת בקרת איכות נתונים ב-Dataplex Universal Catalog