טרנספורמציה של נתונים
אחרי שמבצעים המרה של נתונים גולמיים, השלב הבא הוא ליצור שלב של נתונים מעובדים. בשלב הזה, הנתונים הגולמיים עוברים עידון והופכים למערך נתונים מאומת, מנוקה ותואם. במסמך הזה מוסבר איך ליצור את שלב עיבוד הנתונים הזה באמצעות PySpark ב-Managed Service for Apache Spark.
לפני שמתחילים
- נכנסים לחשבון Google Cloud . אם אתם משתמשים חדשים ב- Google Cloud, צרו חשבון כדי שתוכלו להעריך את הביצועים של המוצרים שלנו בתרחישים מהעולם האמיתי. לקוחות חדשים מקבלים בחינם גם קרדיט בשווי 300$ להרצה, לבדיקה ולפריסה של עומסי העבודה.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- מוודאים שיש לכם נתונים גולמיים בקטגוריה של Cloud Storage.
- יצירת אשכול Managed Service for Apache Spark להרצת משימות PySpark.
ניקוי הנתונים והפיכתם לתקניים
השלב הראשון הוא לטפל בבעיות נפוצות שקשורות לאיכות הנתונים. PySpark DataFrame API מספק פונקציות לביצוע משימות ניקוי בסיסיות באופן מבוזר.
טיפול בערכים חסרים
אפשר לטפל בערכי null על ידי השמטת שורות שמכילות אותם או על ידי מילוי שלהם בערך ברירת מחדל.
כדי להשמיט שורות שבהן עמודות ספציפיות הן null, משתמשים בשיטה
dropna().כדי למלא ערכי null בעמודות ספציפיות בערך ברירת מחדל, משתמשים בשיטה
fillna().# Assuming 'df' is the DataFrame read from the raw data stage # Drop rows where 'word' or 'corpus' is null cleaned_df = df.dropna(subset=["word", "corpus"]) # Fill nulls in 'word_count' with 0 cleaned_df = cleaned_df.fillna(0, subset=["word_count"])
סוגי נתונים נכונים
כדי לאכוף סכימה נכונה ועקבית לתקינות נתונים ולביצועים. משתמשים בהמרת withColumn() עם השיטה cast() כדי לשנות את סוג הנתונים של עמודה.
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
# Cast the 'word_count' column to integer
typed_df = cleaned_df.withColumn("word_count_int", col("word_count").cast(IntegerType()))
הסרת רשומות כפולות
רשומות כפולות עלולות לשבש את הניתוח. משתמשים בשיטה dropDuplicates() כדי להסיר שורות כפולות מ-DataFrame. כדי להגדיר ייחודיות על סמך קבוצה ספציפית של עמודות, מעבירים רשימה של שמות עמודות לפרמטר subset.
# Remove rows that are complete duplicates across all columns
deduped_df = typed_df.dropDuplicates()
# Remove rows with duplicate 'word' and 'corpus' combinations
key_deduped_df = typed_df.dropDuplicates(subset=["word", "corpus"])
סינון חריגים
ערכים חריגים יכולים להשפיע על צבירות. שיטה נפוצה לזיהוי חריגים היא טווח בין-רבעוני (IQR). סינון ערכים שלא נכללים בטווח שהוגדר על ידי ה-IQR.
from pyspark.sql.functions import col
# Calculate Q1, Q3, and IQR for the 'word_count_int' column
quantiles = key_deduped_df.approxQuantile("word_count_int", [0.25, 0.75], 0.05)
q1 = quantiles[0]
q3 = quantiles[1]
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
# Filter out the outliers
no_outliers_df = key_deduped_df.filter(
(col("word_count_int") >= lower_bound) & (col("word_count_int") <= upper_bound)
)
אימות נתונים בהתאם לכללים העסקיים
אימות נתונים באופן פרוגרמטי בהתאם לכללים עסקיים ולמגבלות איכות. משתמשים בפונקציות המובנות של Spark כדי לבצע אימותים. לדוגמה, אפשר להשתמש בפעולת join כדי לבדוק את תקינות הנתונים מול קבוצה ידועה של ערכי הפניה. ב-inner join נשמרים רק רשומות תקינות, וכל הרשומות עם ערכים לא תקינים מסוננות.
העשרת מערך הנתונים
אחרי שמנקים את הנתונים ומאמתים אותם, צריך להחיל עליהם לוגיקה עסקית. בשלב הזה, לרוב, צריך להעשיר את מערך הנתונים על ידי שילוב שלו עם מקורות נתונים אחרים וביצוע צבירות.
# Example: Enriching with a hypothetical corpus metadata DataFrame
corpus_metadata_data = [("Corpus A", 2010, "Fiction"), ("Corpus B", 1998, "News"), ("Corpus C", 2015, "Scientific")]
corpus_metadata_df = spark.createDataFrame(corpus_metadata_data, ["corpus_name", "est_year", "genre"])
# Join the main DataFrame with the metadata
enriched_df = validated_df.join(
corpus_metadata_df,
validated_df.corpus == corpus_metadata_df.corpus_name,
how="left"
)
כתיבת הנתונים המעובדים ל-Cloud Storage
שמירת ה-DataFrame שעבר טרנספורמציה בשלב של עיבוד הנתונים ב-Cloud Storage. כדאי להשתמש בפורמט אחסון אופטימלי של עמודות כמו Parquet לשכבה הזו. כדי לשפר את ביצועי השאילתות, כדאי לחלק את הנתונים לפי עמודה שמוגדרת בה מסנן בתדירות גבוהה.
# --- Configuration ---
processed_path = f"gs://BUCKET_NAME/processed/processed_data"
# --- Write to Cloud Storage in Parquet format, partitioned by corpus ---
enriched_df.write \
.format("parquet") \
.partitionBy("corpus") \
.mode("overwrite") \
.save(processed_path)
print(f"Successfully wrote transformed data to {processed_path}")
האפשרות mode("overwrite") מבטיחה שכל הרצה של צינור עיבוד הנתונים תחליף את הפלט הקודם, וכך העבודה תהיה אידמפוטנטית.