שימוש ב-Spark data lineage

במאמר הזה מוסבר איך להפעיל שושלת נתונים למשימות של Managed Service for Apache Spark ברמת הפרויקט או ברמת האשכול.

Data lineage היא תכונה של Knowledge Catalog שמאפשרת לעקוב אחרי תנועת הנתונים במערכות: מאיפה הם מגיעים, לאן הם מועברים ואילו טרנספורמציות מוחלות עליהם.

היסטוריית הנתונים זמינה לכל המשימות של Managed Service for Apache Spark, למעט משימות של SparkR ושל Spark streaming, והיא תומכת במקורות נתונים של BigQuery ו-Cloud Storage. הוא כלול בגרסאות התמונות של Managed Service for Apache Spark‏ 2.0.74+, 2.1.22+, 2.2.50+, 2.3.1+ ו-3.0.

אחרי שמפעילים את התכונה באשכול Managed Service for Apache Spark, משימות Spark של Managed Service for Apache Spark מתעדות אירועים של שושלת נתונים ומפרסמות אותם ב-Knowledge Catalog Data Lineage API. ‫Managed Service for Apache Spark משתלב עם Data Lineage API דרך OpenLineage, באמצעות OpenLineage Spark plugin.

אפשר לגשת למידע על שרשרת מקורות הנתונים דרך Knowledge Catalog, באמצעות האפשרויות הבאות:

לפני שמתחילים

  1. במסוף Google Cloud , בדף לבחירת הפרויקט, בוחרים את הפרויקט שמכיל את אשכול Managed Service for Apache Spark שרוצים לעקוב אחרי שושלת הנתונים שלו.

    כניסה לדף לבחירת הפרויקט

  2. מפעילים את Data Lineage API.

    הפעלת ממשקי ה-API

    שינויים קרובים ב-Spark data lineage אפשר לעיין בהערות לגבי הגרסה של Managed Service for Apache Spark כדי לקרוא על שינוי שיגרום לכך ש-Spark data lineage יהיה זמין באופן אוטומטי לפרויקטים ולמקלאסטרים שלכם כשתפעילו את Data Lineage API (ראו שליטה בהטמעת lineage בשירות) בלי שתצטרכו להגדיר הגדרות נוספות ברמת הפרויקט או המקלאסטר.

התפקידים הנדרשים

אם יוצרים אשכול של Managed Service for Apache Spark באמצעות חשבון השירות של מכונה וירטואלית שמוגדר כברירת מחדל, מוקצה לו התפקיד Managed Service for Apache Spark Worker, שמאפשר מעקב אחר מקורות נתונים. אין צורך לבצע פעולה נוספת.

עם זאת, אם יוצרים אשכול של Managed Service for Apache Spark שמשתמש בחשבון שירות בהתאמה אישית, כדי להפעיל את מעקב מקורות הנתונים באשכול, צריך להקצות לחשבון השירות בהתאמה אישית את התפקיד הנדרש, כמו שמוסבר בפסקה הבאה.

כדי לקבל את ההרשאות שדרושות לשימוש ב-Data Lineage עם השירות המנוהל של Apache Spark, צריך לבקש מהאדמין להקצות לכם את תפקידי ה-IAM הבאים בחשבון השירות המותאם אישית של האשכול:

להסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

יכול להיות שאפשר לקבל את ההרשאות הנדרשות גם באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש.

הפעלת מעקב אחר מקורות נתונים ב-Spark

אפשר להפעיל את תכונת שרשרת המקור של נתוני Spark ברמת הפרויקט או ברמת האשכול.

הפעלה של מעקב אחר מקורות נתונים ב-Spark ברמת הפרויקט

אחרי שמפעילים את מעקב המקורות של נתוני Spark ברמת הפרויקט, מעקב המקורות של נתוני Spark יופעל במשימות Spark הבאות שיופעלו באשכולות של Managed Service for Apache Spark בפרויקט.

כדי להפעיל את מעקב המקורות של נתוני Spark ברמת הפרויקט, מגדירים את המטא-נתונים המותאמים אישית הבאים של הפרויקט:

מפתח ערך
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform
הגדרת היקף הגישה למכונה הווירטואלית נדרשת רק עבור אשכולות של גרסאות תמונות 2.0. היא מוגדרת אוטומטית באשכולות של גרסת תמונה 2.1 ואילך.

אפשר להשבית את מעקב המקורות של נתוני Spark ברמת הפרויקט על ידי הגדרת המטא-נתונים DATAPROC_LINEAGE_ENABLED לערך false.

הפעלה של מעקב אחר מקורות נתונים ב-Spark ברמת האשכול

אם מפעילים את תכונת השושלת של נתוני Spark כשיוצרים אשכול, תכונת השושלת של נתוני Spark תופעל במשימות Spark נתמכות שמופעלות באשכולות של Managed Service for Apache Spark. ההגדרה הזו מבטלת כל הגדרה של מעקב אחר מקורות נתונים ב-Spark ברמת הפרויקט: אם מעקב אחר מקורות נתונים ב-Spark מושבת ברמת הפרויקט אבל מופעל ברמת האשכול, ההגדרה ברמת האשכול מקבלת עדיפות, ומעקב אחר מקורות נתונים יופעל במשימות Spark נתמכות שמופעלות באשכול.

כדי להפעיל את תכונת שרשרת המקור של נתוני Spark באשכול, צריך ליצור אשכול של Managed Service for Apache Spark עם מאפיין האשכול dataproc:dataproc.lineage.enabled שמוגדר לערך true.

דוגמה ל-CLI של gcloud:

gcloud dataproc clusters create CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --properties 'dataproc:dataproc.lineage.enabled=true'

אפשר להשבית את מעקב המקורות של נתוני Spark באשכול על ידי הגדרת המאפיין dataproc:dataproc.lineage.enabled לערך false כשיוצרים את האשכול.

  • השבתת שושלת הנתונים באשכול: כדי ליצור אשכול עם השבתה של שושלת הנתונים, מגדירים את dataproc:dataproc.lineage.enabled=false. אחרי שיוצרים את האשכול, אי אפשר להשבית את מעקב המקורות של נתוני Spark באשכול. כדי להשבית את מעקב המקורות של נתוני Spark באשכול קיים, אפשר ליצור מחדש את האשכול עם המאפיין dataproc:dataproc.lineage.enabled שהערך שלו מוגדר ל-false.

  • הגדרת היקף באשכולות של גרסת תמונה 2.0: נדרש היקף של אשכול Managed Service for Apache Spark‏ cloud-platform לגישה למכונה וירטואלית לצורך מעקב אחר מקורות נתונים ב-Spark. באשכולות של Managed Service for Apache Spark שנוצרו עם גרסת האימג' 2.1 ואילך, האפשרות cloud-platform מופעלת. אם מציינים את גרסת התמונה של Managed Service for Apache Spark‏ 2.0 כשיוצרים אשכול, צריך להגדיר את ההיקף ל-cloud-platform.

השבתת מעקב אחר מקורות נתונים ב-Spark במשימה

אם התכונה 'מקורות נתונים ב-Spark' מופעלת באשכול, אפשר להשבית אותה במשימה על ידי העברת המאפיין spark.extraListeners עם ערך ריק ("") כששולחים את המשימה.

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.extraListeners=''

שליחת משימה ב-Spark

כששולחים משימת Spark נתמכת באשכול Managed Service for Apache Spark שנוצר עם האפשרות 'מעקב אחר מקורות נתונים' מופעלת, שירות Managed Service for Apache Spark מתעד את פרטי מעקב מקורות הנתונים ומדווח עליהם ל-Data Lineage API.

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

הערות:

  • הוספה של המאפיינים spark.openlineage.namespace ו-spark.openlineage.appName, שמשמשים לזיהוי ייחודי של המשרה, היא אופציונלית. אם לא מוסיפים את המאפיינים האלה, Managed Service for Apache Spark משתמש בערכי ברירת המחדל הבאים:
    • ערך ברירת המחדל של spark.openlineage.namespace: PROJECT_ID
    • ערך ברירת המחדל של spark.openlineage.appName: spark.app.name

הצגת שושלת נתונים ב-Knowledge Catalog

תרשים שושלת נתונים מציג את הקשרים בין משאבי הפרויקט לבין התהליכים שיצרו אותם. אפשר לצפות במידע על שרשרת מקורות הנתונים במסוף Google Cloud , או לאחזר אותו מ-Data Lineage API בפורמט JSON.

קוד לדוגמה ב-PySpark:

העבודה הבאה ב-PySpark קוראת נתונים מטבלה ציבורית ב-BigQuery, ואז כותבת את הפלט לטבלה חדשה במערך נתונים קיים ב-BigQuery. הוא משתמש בקטגוריה של Cloud Storage לאחסון זמני.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = 'gs://BUCKET`
spark.conf.set('temporaryCloudStorageBucket', bucket)

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

מחליפים את הפרטים הבאים:

  • BUCKET: השם של קטגוריה של Cloud Storage קיימת

  • PROJECT_ID,‏ DATASET ו-TABLE: מזהה הפרויקט, השם של מערך נתונים קיים ב-BigQuery והשם של טבלה חדשה שרוצים ליצור במערך הנתונים (הטבלה לא יכולה להיות קיימת)

אפשר לראות את תרשים שושלת הנתונים בממשק המשתמש של Knowledge Catalog.

דוגמה לתרשים של שרשרת מקורות המידע

המאמרים הבאים