שימוש בתיעוד מקורות הנתונים עם Serverless for Apache Spark

במאמר הזה מוסבר איך להפעיל שקיפות נתונים ב-Google Cloud Serverless for Apache Spark לעומסי עבודה של אצווה ולסשנים אינטראקטיביים ברמה של פרויקט, עומס עבודה של אצווה או סשן אינטראקטיבי.

סקירה כללית

‫Data lineage היא תכונה של Dataplex Universal Catalog שמאפשרת לעקוב אחרי תנועת הנתונים במערכות: מאיפה הם מגיעים, לאן הם מועברים ואילו טרנספורמציות מוחלות עליהם.

‫Google Cloud Serverless for Apache Spark (בלי שרת) לעומסי עבודה ולסשנים של Apache Spark מתעד אירועים של שושלת נתונים ומפרסם אותם ב-Data Lineage API של Dataplex Universal Catalog. ‫Serverless (בלי שרת) ל-Apache Spark משתלב עם Data Lineage API דרך OpenLineage, באמצעות OpenLineage Spark plugin.

אפשר לגשת למידע על השתלשלות הנתונים דרך Dataplex Universal Catalog באמצעות תרשימי השתלשלות הנתונים ו-Data Lineage API. מידע נוסף זמין במאמר בנושא הצגת תרשימי שושלת ב-Dataplex Universal Catalog.

זמינות

התכונה 'מקורות נתונים' תומכת במקורות נתונים של BigQuery ו-Cloud Storage, והיא זמינה לעומסי עבודה ולסשנים שמופעלים עם גרסאות זמן ריצה נתמכות של Serverless for Apache Spark, עם היוצאים מן הכלל והמגבלות הבאים:

  • השגת שושלת נתונים לא אפשרית עבור עומסי עבודה או סשנים של SparkR או Spark Streaming.

לפני שמתחילים

  1. בדף לבחירת הפרויקט במסוף Google Cloud , בוחרים את הפרויקט שבו רוצים להשתמש עבור עומסי העבודה או הסשנים של Serverless for Apache Spark.

    כניסה לדף לבחירת הפרויקט

  2. מפעילים את Data Lineage API.

    הפעלת ממשקי ה-API

    שינויים צפויים בתיעוד מקורות הנתונים של Spark אפשר לעיין בהערות לגבי הגרסה של Serverless for Apache Spark כדי לקרוא על שינוי שיגרום לכך שתיעוד מקורות הנתונים של Spark יהיה זמין באופן אוטומטי לפרויקטים, לעומסי עבודה של אצווה ולסשנים אינטראקטיביים כשמפעילים את Data Lineage API (ראו שליטה בהטמעת תיעוד מקורות הנתונים בשירות) בלי שיהיה צורך בהגדרות נוספות של פרויקט, עומס עבודה של אצווה או סשן אינטראקטיבי.

התפקידים הנדרשים

אם עומס העבודה של אצווה משתמש בחשבון השירות שמוגדר כברירת מחדל ב-Serverless for Apache Spark, הוא כולל את התפקיד Dataproc Worker, שמכיל את ההרשאות שנדרשות לתיעוד מקורות הנתונים.

עם זאת, אם עומס העבודה של אצווה משתמש בחשבון שירות בהתאמה אישית כדי להפעיל את מעקב המקורות של הנתונים, צריך להעניק לחשבון השירות בהתאמה אישית אחד מהתפקידים שמפורטים בפסקה הבאה, שכוללים את ההרשאות שנדרשות למעקב המקורות של הנתונים.

כדי לקבל את ההרשאות שדרושות לשימוש ב-Data Lineage עם Dataproc, צריך לבקש מהאדמין להקצות לכם את תפקידי ה-IAM הבאים בחשבון השירות המותאם אישית של עומס העבודה של Batch:

להסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

יכול להיות שאפשר לקבל את ההרשאות הנדרשות גם באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש.

הפעלת מעקב אחר מקורות נתונים ב-Spark

אפשר להפעיל את תכונת שרשרת המקור של נתוני Spark בפרויקט, בעומס עבודה של אצווה או בסשן אינטראקטיבי.

הפעלת מעקב אחר מקורות נתונים ברמת הפרויקט

אחרי שמפעילים את מעקב המקורות של נתוני Spark ברמת הפרויקט, מעקב המקורות של נתוני Spark יופעל במשימות Spark הבאות שיופעלו בעומס עבודה באצווה או בסשן אינטראקטיבי.

כדי להפעיל את תכונת שרשרת המקור של נתוני Spark בפרויקט, מגדירים את המטא-נתונים המותאמים אישית הבאים של הפרויקט:

מפתח ערך
DATAPROC_LINEAGE_ENABLED true

כדי להשבית את מעקב המקורות של נתוני Spark בפרויקט, צריך להגדיר את המטא-נתונים DATAPROC_LINEAGE_ENABLED לערך false.

הפעלת שושלת נתונים בעומס עבודה של אצווה ב-Spark

כדי להפעיל את מעקב המקורות בעומס עבודה של אצווה, מגדירים את המאפיין spark.dataproc.lineage.enabled לערך true כששולחים את עומס העבודה. ההגדרה הזו מבטלת כל הגדרה של מעקב אחר מקורות נתונים ב-Spark ברמת הפרויקט: אם מעקב אחר מקורות נתונים ב-Spark מושבת ברמת הפרויקט אבל מופעל לעומס העבודה של האצווה, ההגדרה של עומס העבודה של האצווה מקבלת עדיפות.

כדי להשבית את מעקב המקורות של נתוני Spark במשימת אצווה של Spark, צריך להגדיר את המאפיין spark.dataproc.lineage.enabled לערך false כששולחים את המשימה.

בדוגמה הזו משתמשים ב-CLI של gcloud כדי לשלוח עומס עבודה של lineage-example.pybatch עם מעקב אחר מקורות נתונים של Spark.

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --deps-bucket=gs://BUCKET \
    --properties=spark.dataproc.lineage.enabled=true

קטע הקוד הבא ב-lineage-example.py קורא נתונים מטבלה ציבורית ב-BigQuery, ואז כותב את הפלט לטבלה חדשה במערך נתונים קיים ב-BigQuery. הוא משתמש בקטגוריה של Cloud Storage לאחסון זמני.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .option('writeMethod', 'direct') \
  .save()

מחליפים את מה שכתוב בשדות הבאים:

  • REGION: האזור שבו יופעל עומס העבודה
  • BUCKET: השם של קטגוריה קיימת ב-Cloud Storage לאחסון תלות
  • PROJECT_ID,‏ DATASET ו-TABLE: מזהה הפרויקט, השם של מערך נתונים קיים ב-BigQuery והשם של טבלה חדשה שרוצים ליצור במערך הנתונים (הטבלה לא יכולה להיות קיימת)

אפשר לראות את תרשים השושלת בממשק המשתמש של Dataplex Universal Catalog.

תרשים שושלת של Spark

הפעלת מעקב אחר מקורות נתונים בסשן אינטראקטיבי של Spark או בתבנית סשן

כדי להפעיל את מעקב המקורות בסשן אינטראקטיבי של Spark או בתבנית סשן, צריך להגדיר את המאפיין spark.dataproc.lineage.enabled לערך true כשיוצרים את הסשן או את תבנית הסשן. ההגדרה הזו מחליפה כל הגדרה של שושלת נתוני Spark ברמת הפרויקט: אם שושלת נתוני Spark מושבתת ברמת הפרויקט אבל מופעלת בסשן האינטראקטיבי, ההגדרה של הסשן האינטראקטיבי מקבלת עדיפות.

כדי להשבית את מעקב המקורות של נתוני Spark בסשן אינטראקטיבי של Spark או בתבנית סשן, צריך להגדיר את המאפיין spark.dataproc.lineage.enabled לערך false כשיוצרים את הסשן האינטראקטיבי או את תבנית הסשן.

הקוד הבא במחברת PySpark מגדיר סשן אינטראקטיבי של Serverless for Apache Spark עם מעקב אחרי מקורות הנתונים של Spark. לאחר מכן, המערכת יוצרת סשן של Spark Connect שמריץ שאילתת ספירת מילים במערך נתונים ציבורי של שייקספיר ב-BigQuery, ואז כותבת את הפלט לטבלה חדשה במערך נתונים קיים ב-BigQuery (ראו יצירת סשן של Spark במחברת BigQuery Studio) .

# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session

session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"

# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()

# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

מחליפים את מה שכתוב בשדות הבאים:

  • PROJECT_ID,‏ DATASET ו-TABLE: מזהה הפרויקט, השם של מערך נתונים קיים ב-BigQuery והשם של טבלה חדשה שרוצים ליצור במערך הנתונים (הטבלה לא יכולה להיות קיימת)

כדי לראות את גרף שרשרת המקורות של הנתונים, לוחצים על שם טבלת היעד שמופיע בחלונית הניווט בדף Explorer ב-BigQuery, ואז בוחרים בכרטיסייה 'שרשרת מקורות' בחלונית פרטי הטבלה.

תרשים שושלת של Spark

הצגת שרשרת המקור ב-Dataplex Universal Catalog

תרשים שושלת מציג את הקשרים בין משאבי הפרויקט לבין התהליכים שיצרו אותם. אתם יכולים לצפות במידע על שושלת הנתונים במסוף Google Cloud או לאחזר את המידע מ-Data Lineage API כנתוני JSON.

המאמרים הבאים