שימוש ב-Spark data lineage ב-Dataproc

במאמר הזה מוסבר איך להפעיל מעקב אחר מקורות נתונים למשימות Dataproc Spark ברמת הפרויקט או ברמת האשכול.

Data lineage היא תכונה של Dataplex Universal Catalog שמאפשרת לעקוב אחרי תנועת הנתונים במערכות: מאיפה הם מגיעים, לאן הם מועברים ואילו טרנספורמציות מופעלות עליהם.

היסטוריית הנתונים זמינה לכל המשימות של Dataproc Spark, למעט משימות של SparkR ושל Spark streaming, והיא תומכת במקורות נתונים של BigQuery ו-Cloud Storage. הוא כלול ב-Dataproc ב-Compute Engine בגרסאות התמונות 2.0.74+, 2.1.22+, 2.2.50+, 2.3.1+ ו-3.0.

אחרי שמפעילים את התכונה באשכול Dataproc, משימות Dataproc Spark מתעדות אירועים של שרשרת מקורות הנתונים ומפרסמות אותם ב-Data Lineage API של Dataplex Universal Catalog. ‫Dataproc משתלב עם Data Lineage API דרך OpenLineage, באמצעות OpenLineage Spark plugin.

אפשר לגשת למידע על שושלת הנתונים דרך Dataplex Universal Catalog, באמצעות האפשרויות הבאות:

לפני שמתחילים

  1. בדף לבחירת הפרויקט במסוף Google Cloud , בוחרים את הפרויקט שמכיל את אשכול Dataproc שרוצים לעקוב אחרי המקור שלו.

    כניסה לדף לבחירת הפרויקט

  2. מפעילים את Data Lineage API.

    הפעלת ממשקי ה-API

    שינויים קרובים ב-Spark data lineage אפשר לעיין בהערות לגבי הגרסה של Dataproc כדי לקרוא על שינוי שיגרום לכך ש-Spark data lineage יהיה זמין באופן אוטומטי לפרויקטים ולאשכולות שלכם כשתפעילו את Data Lineage API (ראו שליטה בהוספת נתונים של lineage לשירות) בלי שתצטרכו להגדיר הגדרות נוספות ברמת הפרויקט או האשכול.

התפקידים הנדרשים

אם יוצרים אשכול Dataproc באמצעות חשבון השירות שמוגדר כברירת מחדל למכונה וירטואלית, מוקצה לו התפקיד Dataproc Worker, שמאפשר מעקב אחר מקורות נתונים. אין צורך לבצע פעולה נוספת.

עם זאת, אם יוצרים אשכול Dataproc שמשתמש בחשבון שירות בהתאמה אישית, כדי להפעיל את מעקב מקורות הנתונים באשכול, צריך להקצות תפקיד נדרש לחשבון השירות בהתאמה אישית, כמו שמוסבר בפסקה הבאה.

כדי לקבל את ההרשאות שדרושות לשימוש בנתוני שושלת עם Dataproc, צריך לבקש מהאדמין להקצות לכם את תפקידי ה-IAM הבאים בחשבון השירות המותאם אישית של האשכול:

להסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

יכול להיות שאפשר לקבל את ההרשאות הנדרשות גם באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש.

הפעלת מעקב אחר מקורות נתונים ב-Spark

אפשר להפעיל את תכונת שרשרת המקורות של נתוני Spark ברמת הפרויקט או ברמת האשכול.

הפעלת מעקב אחר מקורות נתונים ב-Spark ברמת הפרויקט

אחרי שמפעילים את התכונה 'מעקב אחר מקורות נתונים ב-Spark' ברמת הפרויקט, היא תופעל גם במשימות Spark הבאות שיופעלו באשכולות Dataproc בפרויקט.

כדי להפעיל את מעקב המקורות של נתוני Spark ברמת הפרויקט, מגדירים את המטא-נתונים הבאים של הפרויקט בהתאמה אישית:

מפתח ערך
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform
הגדרת היקף הגישה למכונה הווירטואלית נדרשת רק עבור אשכולות של גרסאות תמונות 2.0. היא מוגדרת אוטומטית באשכולות של תמונות מגרסה 2.1 ואילך.

כדי להשבית את מעקב המקורות של נתוני Spark ברמת הפרויקט, צריך להגדיר את המטא-נתונים DATAPROC_LINEAGE_ENABLED לערך false.

הפעלה של מעקב אחר מקורות נתונים ב-Spark ברמת האשכול

אם מפעילים את תכונת שרשרת המקור של נתוני Spark כשיוצרים אשכול, משימות Spark נתמכות שמופעלות באשכולות Dataproc יכללו את התכונה הזו. ההגדרה הזו מבטלת כל הגדרה של מעקב אחר מקורות נתונים ב-Spark ברמת הפרויקט: אם מעקב אחר מקורות נתונים ב-Spark מושבת ברמת הפרויקט אבל מופעל ברמת האשכול, ההגדרה ברמת האשכול קודמת להגדרה ברמת הפרויקט, ומעקב אחר מקורות נתונים יופעל במשימות Spark נתמכות שמופעלות באשכול.

כדי להפעיל את תכונת שרשרת המקורות של נתוני Spark באשכול, צריך ליצור אשכול Dataproc עם מאפיין האשכול dataproc:dataproc.lineage.enabled שמוגדר לערך true.

דוגמה ל-CLI של gcloud:

gcloud dataproc clusters create CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --properties 'dataproc:dataproc.lineage.enabled=true'

כדי להשבית את שושלת הנתונים של Spark באשכול, צריך להגדיר את המאפיין dataproc:dataproc.lineage.enabled לערך false כשיוצרים את האשכול.

  • השבתת מעקב אחר מקורות נתונים באשכול: כדי ליצור אשכול עם מעקב מקורות נתונים מושבת, מגדירים את dataproc:dataproc.lineage.enabled=false. אחרי שיוצרים את האשכול, אי אפשר להשבית את מעקב המקורות של נתוני Spark באשכול. כדי להשבית את מעקב המקורות של נתוני Spark באשכול קיים, אפשר ליצור מחדש את האשכול עם המאפיין dataproc:dataproc.lineage.enabled שהוגדר לערך false.

  • הגדרת היקף הרשאות באשכולות של גרסת תמונה 2.0: נדרש היקף הרשאות של אשכול Dataproc גישה למכונה וירטואלית cloud-platform בשביל שושלת הנתונים של Spark. באשכולות של גרסאות תמונות Dataproc שנוצרו עם גרסת תמונה 2.1 ואילך, האפשרות cloud-platform מופעלת. אם מציינים את גרסת התמונה של Dataproc‏ 2.0 כשיוצרים אשכול, צריך להגדיר את ההיקף ל-cloud-platform.

השבתת מעקב אחר מקורות נתונים ב-Spark במשימה

אם התכונה 'מקורות נתונים ב-Spark' מופעלת באשכול, אפשר להשבית אותה במשימה על ידי העברת המאפיין spark.extraListeners עם ערך ריק ("") כששולחים את המשימה.

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.extraListeners=''

שליחת משימת Spark

כששולחים משימת Spark נתמכת באשכול Dataproc שנוצר עם האפשרות 'מעקב אחר מקורות נתונים ב-Spark' מופעלת, Dataproc מתעד את המידע על מקורות הנתונים ומדווח עליו ל-Data Lineage API.

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

הערות:

  • הוספה של המאפיינים spark.openlineage.namespace ו-spark.openlineage.appName, שמשמשים לזיהוי ייחודי של המשרה, היא אופציונלית. אם לא מוסיפים את המאפיינים האלה, Dataproc משתמש בערכי ברירת המחדל הבאים:
    • ערך ברירת המחדל של spark.openlineage.namespace: PROJECT_ID
    • ערך ברירת המחדל של spark.openlineage.appName: spark.app.name

הצגת שרשרת המקור ב-Dataplex Universal Catalog

תרשים שושלת מציג את הקשרים בין משאבי הפרויקט לבין התהליכים שיצרו אותם. אפשר לצפות במידע על שרשרת מקורות הנתונים במסוף Google Cloud או לאחזר אותו מ-Data Lineage API בפורמט JSON.

קוד לדוגמה ב-PySpark:

העבודה הבאה ב-PySpark קוראת נתונים מטבלה ציבורית ב-BigQuery, ואז כותבת את הפלט לטבלה חדשה במערך נתונים קיים ב-BigQuery. הוא משתמש בקטגוריה של Cloud Storage לאחסון זמני.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = 'gs://BUCKET`
spark.conf.set('temporaryCloudStorageBucket', bucket)

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

מחליפים את הפרטים הבאים:

  • BUCKET: השם של קטגוריה קיימת ב-Cloud Storage

  • PROJECT_ID,‏ DATASET ו-TABLE: מזהה הפרויקט, השם של מערך נתונים קיים ב-BigQuery והשם של טבלה חדשה שרוצים ליצור במערך הנתונים (הטבלה לא יכולה להיות קיימת)

אפשר לראות את תרשים השושלת בממשק המשתמש של Dataplex Universal Catalog.

דוגמה לתרשים של שרשרת מקורות המידע

המאמרים הבאים