במסמך הזה מוסבר איך להפעיל שקיפות נתוני המקור בעומסי עבודה של אצווה ובסשנים אינטראקטיביים של Managed Service for Apache Spark ברמת הפרויקט, עומס העבודה של האצווה או הסשן האינטראקטיבי.
סקירה כללית
Data lineage היא תכונה של Knowledge Catalog שמאפשרת לעקוב אחרי תנועת הנתונים במערכות שלכם: מאיפה הם מגיעים, לאן הם מועברים ואילו טרנספורמציות מוחלות עליהם.
Managed Service for Apache Spark לעומסי עבודה ולסשנים של Apache Spark מתעד אירועי שושלת נתונים ומפרסם אותם ב-Data Lineage API של Knowledge Catalog. Managed Service for Apache Spark משתלב עם Data Lineage API דרך OpenLineage, באמצעות OpenLineage Spark plugin.
אפשר לגשת למידע על השתלשלות הנתונים דרך Knowledge Catalog באמצעות גרפים של השתלשלות הנתונים ו-Data Lineage API. מידע נוסף זמין במאמר צפייה בתרשימי שושלת נתונים ב-Knowledge Catalog.
זמינות
שושלת נתונים, שתומכת במקורות נתונים של BigQuery ו-Cloud Storage, זמינה לעומסי עבודה ולסשנים שמופעלים עם גרסאות נתמכות של זמן ריצה של Managed Service for Apache Spark, עם היוצאים מן הכלל והמגבלות הבאים:
- היסטוריית הנתונים לא זמינה עבור עומסי עבודה או סשנים של SparkR או Spark Streaming.
לפני שמתחילים
בדף לבחירת הפרויקט במסוף Google Cloud , בוחרים את הפרויקט שבו רוצים להשתמש עבור עומסי העבודה או הסשנים של Managed Service for Apache Spark.
מפעילים את Data Lineage API.
שינויים קרובים ב-Spark data lineage אפשר לעיין בהערות הגרסה של Managed Service for Apache Spark כדי לקרוא על שינוי שיגרום לכך ש-Spark data lineage יהיה זמין אוטומטית לפרויקטים, לעומסי עבודה של אצווה ולסשנים אינטראקטיביים כשמפעילים את Data Lineage API (ראו שליטה בהטמעת lineage בשירות) בלי שיהיה צורך בהגדרות נוספות של פרויקט, עומס עבודה של אצווה או סשן אינטראקטיבי.
התפקידים הנדרשים
אם עומס העבודה של אצווה משתמש בחשבון השירות שמוגדר כברירת מחדל בשירות Managed Service for Apache Spark, הוא כולל את התפקיד Managed Service for Apache Spark Worker, שמכיל את ההרשאות שנדרשות למעקב אחר מקורות נתונים.
עם זאת, אם עומס העבודה של אצווה משתמש בחשבון שירות בהתאמה אישית כדי להפעיל את תכונת שרשרת מקורות הנתונים, צריך להעניק לחשבון השירות בהתאמה אישית אחד מהתפקידים שמפורטים בפסקה הבאה, שמכילים את ההרשאות שנדרשות לשרשרת מקורות הנתונים.
כדי לקבל את ההרשאות שדרושות לשימוש במעקב אחר מקורות נתונים עם Managed Service for Apache Spark, צריך לבקש מהאדמין להקצות לכם את תפקידי ה-IAM הבאים בחשבון השירות המותאם אישית של עומס העבודה של אצווה:
-
מקצים אחד מהתפקידים הבאים:
- Managed Service for Apache Spark Worker (
roles/dataproc.worker) - הכלי לעריכת שושלת הנתונים (
roles/datalineage.editor) - Data Lineage Producer (
roles/datalineage.producer) - אדמין של מעקב מקורות נתונים (
roles/datalineage.admin)
- Managed Service for Apache Spark Worker (
להסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.
יכול להיות שאפשר לקבל את ההרשאות הנדרשות גם באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש.
הפעלת מעקב אחר מקורות נתונים ב-Spark
אתם יכולים להפעיל את תכונת שרשרת המקור של נתוני Spark בפרויקט, בעומס עבודה של אצווה או בסשן אינטראקטיבי.
הפעלת מעקב אחר מקורות נתונים ברמת הפרויקט
אחרי שמפעילים את מעקב המקורות של נתוני Spark ברמת הפרויקט, מעקב המקורות של נתוני Spark יופעל במשימות Spark הבאות שיופעלו בעומס עבודה באצווה או בסשן אינטראקטיבי.
כדי להפעיל את תכונת שרשרת המקור של נתוני Spark בפרויקט, מגדירים את המטא-נתונים המותאמים אישית הבאים של הפרויקט:
| מפתח | ערך |
|---|---|
DATAPROC_LINEAGE_ENABLED |
true |
כדי להשבית את מעקב המקורות של נתוני Spark בפרויקט, צריך להגדיר את המטא-נתונים DATAPROC_LINEAGE_ENABLED לערך false.
הפעלת שושלת נתונים בעומס עבודה של אצווה ב-Spark
כדי להפעיל את מעקב המקורות בעומס עבודה של אצווה, מגדירים את המאפיין spark.dataproc.lineage.enabled לערך true כששולחים את עומס העבודה. ההגדרה הזו מבטלת כל הגדרה של מעקב אחר מקורות נתונים ב-Spark ברמת הפרויקט: אם מעקב אחר מקורות נתונים ב-Spark מושבת ברמת הפרויקט אבל מופעל עבור עומס העבודה של האצווה, ההגדרה של עומס העבודה של האצווה מקבלת עדיפות.
כדי להשבית את מעקב המקורות של נתוני Spark במטלת אצווה של Spark, צריך להגדיר את המאפיין spark.dataproc.lineage.enabled לערך false כששולחים את המטלה.
בדוגמה הזו משתמשים ב-CLI של gcloud כדי לשלוח עומס עבודה של lineage-example.pybatch עם מעקב אחר מקורות נתונים של Spark.
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --deps-bucket=gs://BUCKET \ --properties=spark.dataproc.lineage.enabled=true
קוד lineage-example.py הבא קורא נתונים מטבלה ציבורית ב-BigQuery, ואז כותב את הפלט לטבלה חדשה במערך נתונים קיים ב-BigQuery. הוא משתמש בקטגוריה של Cloud Storage לאחסון זמני.
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.option('writeMethod', 'direct') \
.save()
מחליפים את מה שכתוב בשדות הבאים:
- REGION: האזור שבו יופעל עומס העבודה
- BUCKET: השם של קטגוריה קיימת ב-Cloud Storage לאחסון תלות
- PROJECT_ID, DATASET ו-TABLE: מזהה הפרויקט, השם של מערך נתונים קיים ב-BigQuery והשם של טבלה חדשה שרוצים ליצור במערך הנתונים (הטבלה לא יכולה להיות קיימת)
אפשר לראות את תרשים שושלת הנתונים בממשק המשתמש של Knowledge Catalog.
הפעלת מעקב אחר מקורות נתונים בסשן אינטראקטיבי של Spark או בתבנית סשן
כדי להפעיל את שושלת הנתונים בסשן אינטראקטיבי של Spark או בתבנית סשן, צריך להגדיר את המאפיין spark.dataproc.lineage.enabled לערך true כשיוצרים את הסשן או את תבנית הסשן. ההגדרה הזו מבטלת כל הגדרה של שושלת נתוני Spark ברמת הפרויקט: אם שושלת נתוני Spark מושבתת ברמת הפרויקט אבל מופעלת בסשן האינטראקטיבי, ההגדרה של הסשן האינטראקטיבי מקבלת עדיפות.
כדי להשבית את מעקב המקורות של נתוני Spark בסשן אינטראקטיבי של Spark או בתבנית סשן, צריך להגדיר את המאפיין spark.dataproc.lineage.enabled לערך false כשיוצרים את הסשן האינטראקטיבי או את תבנית הסשן.
קוד ה-Notebook הבא של PySpark מגדיר סשן אינטראקטיבי של Managed Service for Apache Spark עם שושלת נתוני Spark מופעלת. לאחר מכן, המערכת יוצרת סשן של Spark Connect שמריץ שאילתה לספירת מילים במערך נתונים ציבורי של שייקספיר ב-BigQuery, ואז כותבת את הפלט לטבלה חדשה במערך נתונים קיים ב-BigQuery (ראו יצירת סשן של Spark במחברת BigQuery Studio) .
# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
מחליפים את מה שכתוב בשדות הבאים:
- PROJECT_ID, DATASET ו-TABLE: מזהה הפרויקט, השם של מערך נתונים קיים ב-BigQuery והשם של טבלה חדשה שרוצים ליצור במערך הנתונים (הטבלה לא יכולה להיות קיימת)
כדי לראות את גרף שרשרת המקורות של הנתונים, לוחצים על שם טבלת היעד שמופיע בחלונית הניווט בדף Explorer ב-BigQuery, ואז בוחרים בכרטיסייה 'שרשרת מקורות' בחלונית הפרטים של הטבלה.
הצגת שושלת נתונים ב-Knowledge Catalog
תרשים שושלת נתונים מציג את הקשרים בין משאבי הפרויקט לבין התהליכים שיצרו אותם. אפשר לצפות במידע על שושלת הנתונים במסוף Google Cloud או לאחזר את המידע מ-Data Lineage API כנתוני JSON.
המאמרים הבאים
- מידע נוסף על שרשרת מקורות הנתונים
- אפשר לנסות את התכונה 'השתלשלות נתונים' במעבדה אינטראקטיבית: Capture and Explore Data Updates With Data Lineage and OpenLineage in Dataplex.