במאמר הזה מוסבר איך להפעיל מעקב אחר מקורות נתונים למשימות Dataproc Spark ברמת הפרויקט או ברמת האשכול.
Data lineage היא תכונה של Dataplex Universal Catalog שמאפשרת לעקוב אחרי תנועת הנתונים במערכות: מאיפה הם מגיעים, לאן הם מועברים ואילו טרנספורמציות מופעלות עליהם.
היסטוריית הנתונים זמינה לכל המשימות של Dataproc Spark, למעט משימות של SparkR ושל Spark streaming, והיא תומכת במקורות נתונים של BigQuery ו-Cloud Storage. הוא כלול ב-Dataproc ב-Compute Engine בגרסאות התמונות 2.0.74+, 2.1.22+,
2.2.50+, 2.3.1+ ו-3.0.
אחרי שמפעילים את התכונה באשכול Dataproc, משימות Dataproc Spark מתעדות אירועים של שרשרת מקורות הנתונים ומפרסמות אותם ב-Data Lineage API של Dataplex Universal Catalog. Dataproc משתלב עם Data Lineage API דרך OpenLineage, באמצעות OpenLineage Spark plugin.
אפשר לגשת למידע על שושלת הנתונים דרך Dataplex Universal Catalog, באמצעות האפשרויות הבאות:
לפני שמתחילים
בדף לבחירת הפרויקט במסוף Google Cloud , בוחרים את הפרויקט שמכיל את אשכול Dataproc שרוצים לעקוב אחרי המקור שלו.
מפעילים את Data Lineage API.
שינויים קרובים ב-Spark data lineage אפשר לעיין בהערות לגבי הגרסה של Dataproc כדי לקרוא על שינוי שיגרום לכך ש-Spark data lineage יהיה זמין באופן אוטומטי לפרויקטים ולאשכולות שלכם כשתפעילו את Data Lineage API (ראו שליטה בהוספת נתונים של lineage לשירות) בלי שתצטרכו להגדיר הגדרות נוספות ברמת הפרויקט או האשכול.
התפקידים הנדרשים
אם יוצרים אשכול Dataproc באמצעות חשבון השירות שמוגדר כברירת מחדל למכונה וירטואלית, מוקצה לו התפקיד Dataproc Worker, שמאפשר מעקב אחר מקורות נתונים. אין צורך לבצע פעולה נוספת.
עם זאת, אם יוצרים אשכול Dataproc שמשתמש בחשבון שירות בהתאמה אישית, כדי להפעיל את מעקב מקורות הנתונים באשכול, צריך להקצות תפקיד נדרש לחשבון השירות בהתאמה אישית, כמו שמוסבר בפסקה הבאה.
כדי לקבל את ההרשאות שדרושות לשימוש בנתוני שושלת עם Dataproc, צריך לבקש מהאדמין להקצות לכם את תפקידי ה-IAM הבאים בחשבון השירות המותאם אישית של האשכול:
-
מקצים אחד מהתפקידים הבאים:
-
Dataproc Worker (
roles/dataproc.worker) -
הכלי לעריכת שושלת הנתונים (
roles/datalineage.editor) -
יצירת שושלת נתונים (
roles/datalineage.producer) -
אדמין של מעקב מקורות נתונים (
roles/datalineage.admin)
-
Dataproc Worker (
להסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.
יכול להיות שאפשר לקבל את ההרשאות הנדרשות גם באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש.
הפעלת מעקב אחר מקורות נתונים ב-Spark
אפשר להפעיל את תכונת שרשרת המקורות של נתוני Spark ברמת הפרויקט או ברמת האשכול.
הפעלת מעקב אחר מקורות נתונים ב-Spark ברמת הפרויקט
אחרי שמפעילים את התכונה 'מעקב אחר מקורות נתונים ב-Spark' ברמת הפרויקט, היא תופעל גם במשימות Spark הבאות שיופעלו באשכולות Dataproc בפרויקט.
כדי להפעיל את מעקב המקורות של נתוני Spark ברמת הפרויקט, מגדירים את המטא-נתונים הבאים של הפרויקט בהתאמה אישית:
| מפתח | ערך |
|---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_CLUSTER_SCOPES |
https://www.googleapis.com/auth/cloud-platformהגדרת היקף הגישה למכונה הווירטואלית נדרשת רק עבור אשכולות של גרסאות תמונות 2.0. היא מוגדרת אוטומטית באשכולות של תמונות מגרסה 2.1 ואילך. |
כדי להשבית את מעקב המקורות של נתוני Spark ברמת הפרויקט, צריך להגדיר את המטא-נתונים DATAPROC_LINEAGE_ENABLED לערך false.
הפעלה של מעקב אחר מקורות נתונים ב-Spark ברמת האשכול
אם מפעילים את תכונת שרשרת המקור של נתוני Spark כשיוצרים אשכול, משימות Spark נתמכות שמופעלות באשכולות Dataproc יכללו את התכונה הזו. ההגדרה הזו מבטלת כל הגדרה של מעקב אחר מקורות נתונים ב-Spark ברמת הפרויקט: אם מעקב אחר מקורות נתונים ב-Spark מושבת ברמת הפרויקט אבל מופעל ברמת האשכול, ההגדרה ברמת האשכול קודמת להגדרה ברמת הפרויקט, ומעקב אחר מקורות נתונים יופעל במשימות Spark נתמכות שמופעלות באשכול.
כדי להפעיל את תכונת שרשרת המקורות של נתוני Spark באשכול, צריך ליצור אשכול Dataproc עם מאפיין האשכול dataproc:dataproc.lineage.enabled שמוגדר לערך true.
דוגמה ל-CLI של gcloud:
gcloud dataproc clusters create CLUSTER_NAME \
--project PROJECT_ID \
--region REGION \
--properties 'dataproc:dataproc.lineage.enabled=true'כדי להשבית את שושלת הנתונים של Spark באשכול, צריך להגדיר את המאפיין dataproc:dataproc.lineage.enabled לערך false כשיוצרים את האשכול.
השבתת מעקב אחר מקורות נתונים באשכול: כדי ליצור אשכול עם מעקב מקורות נתונים מושבת, מגדירים את
dataproc:dataproc.lineage.enabled=false. אחרי שיוצרים את האשכול, אי אפשר להשבית את מעקב המקורות של נתוני Spark באשכול. כדי להשבית את מעקב המקורות של נתוני Spark באשכול קיים, אפשר ליצור מחדש את האשכול עם המאפייןdataproc:dataproc.lineage.enabledשהוגדר לערךfalse.הגדרת היקף הרשאות באשכולות של גרסת תמונה 2.0: נדרש היקף הרשאות של אשכול Dataproc גישה למכונה וירטואלית
cloud-platformבשביל שושלת הנתונים של Spark. באשכולות של גרסאות תמונות Dataproc שנוצרו עם גרסת תמונה2.1ואילך, האפשרותcloud-platformמופעלת. אם מציינים את גרסת התמונה של Dataproc2.0כשיוצרים אשכול, צריך להגדיר את ההיקף ל-cloud-platform.
השבתת מעקב אחר מקורות נתונים ב-Spark במשימה
אם התכונה 'מקורות נתונים ב-Spark' מופעלת באשכול, אפשר להשבית אותה במשימה על ידי העברת המאפיין spark.extraListeners עם ערך ריק ("") כששולחים את המשימה.
gcloud dataproc jobs submit spark \
--cluster=CLUSTER_NAME \
--project PROJECT_ID \
--region REGION \
--class CLASS \
--jars=gs://APPLICATION_BUCKET/spark-application.jar \
--properties=spark.extraListeners=''שליחת משימת Spark
כששולחים משימת Spark נתמכת באשכול Dataproc שנוצר עם האפשרות 'מעקב אחר מקורות נתונים ב-Spark' מופעלת, Dataproc מתעד את המידע על מקורות הנתונים ומדווח עליו ל-Data Lineage API.
gcloud dataproc jobs submit spark \
--cluster=CLUSTER_NAME \
--project PROJECT_ID \
--region REGION \
--class CLASS \
--jars=gs://APPLICATION_BUCKET/spark-application.jar \
--properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAMEהערות:
- הוספה של המאפיינים
spark.openlineage.namespaceו-spark.openlineage.appName, שמשמשים לזיהוי ייחודי של המשרה, היא אופציונלית. אם לא מוסיפים את המאפיינים האלה, Dataproc משתמש בערכי ברירת המחדל הבאים:- ערך ברירת המחדל של
spark.openlineage.namespace: PROJECT_ID - ערך ברירת המחדל של
spark.openlineage.appName:spark.app.name
- ערך ברירת המחדל של
הצגת שרשרת המקור ב-Dataplex Universal Catalog
תרשים שושלת מציג את הקשרים בין משאבי הפרויקט לבין התהליכים שיצרו אותם. אפשר לצפות במידע על שרשרת מקורות הנתונים במסוף Google Cloud או לאחזר אותו מ-Data Lineage API בפורמט JSON.
קוד לדוגמה ב-PySpark:
העבודה הבאה ב-PySpark קוראת נתונים מטבלה ציבורית ב-BigQuery, ואז כותבת את הפלט לטבלה חדשה במערך נתונים קיים ב-BigQuery. הוא משתמש בקטגוריה של Cloud Storage לאחסון זמני.
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
bucket = 'gs://BUCKET`
spark.conf.set('temporaryCloudStorageBucket', bucket)
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
מחליפים את הפרטים הבאים:
BUCKET: השם של קטגוריה קיימת ב-Cloud Storage
PROJECT_ID, DATASET ו-TABLE: מזהה הפרויקט, השם של מערך נתונים קיים ב-BigQuery והשם של טבלה חדשה שרוצים ליצור במערך הנתונים (הטבלה לא יכולה להיות קיימת)
אפשר לראות את תרשים השושלת בממשק המשתמש של Dataplex Universal Catalog.