轉換資料
擷取原始資料後,下一個階段是建立已處理資料階段。這個階段會將原始資料精煉為經過驗證、清理及符合規範的資料集。本文將說明如何使用 Managed Service for Apache Spark 上的 PySpark,建構這個處理過的資料階段。
事前準備
- 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- 確認 Cloud Storage bucket 中有原始資料。
- 建立 Managed Service for Apache Spark 叢集,執行 PySpark 工作。
清理及標準化資料
第一步是解決常見的資料品質問題。PySpark DataFrame API 提供多項函式,以分散式方式執行基礎清理工作。
處理遺漏值
處理空值,方法是捨棄含有空值的資料列,或以預設值填入空值。
如要捨棄特定資料欄為空值的資料列,請使用
dropna()方法。如要以預設值填入特定資料欄的空值,請使用
fillna()方法。# Assuming 'df' is the DataFrame read from the raw data stage # Drop rows where 'word' or 'corpus' is null cleaned_df = df.dropna(subset=["word", "corpus"]) # Fill nulls in 'word_count' with 0 cleaned_df = cleaned_df.fillna(0, subset=["word_count"])
修正資料類型
強制執行正確且一致的結構定義,確保資料完整性和效能。使用 withColumn() 轉換和 cast() 方法,即可變更資料欄的資料類型。
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
# Cast the 'word_count' column to integer
typed_df = cleaned_df.withColumn("word_count_int", col("word_count").cast(IntegerType()))
移除重複記錄
重複記錄可能會導致分析結果出現偏差。使用 dropDuplicates() 方法從 DataFrame 移除重複資料列。如要根據特定資料欄組合定義唯一性,請將資料欄名稱清單傳遞至 subset 參數。
# Remove rows that are complete duplicates across all columns
deduped_df = typed_df.dropDuplicates()
# Remove rows with duplicate 'word' and 'corpus' combinations
key_deduped_df = typed_df.dropDuplicates(subset=["word", "corpus"])
篩除離群值
離群值可能會影響匯總結果。識別離群值的常見方法是四分位數間距 (IQR)。篩除超出四分位距定義範圍的值。
from pyspark.sql.functions import col
# Calculate Q1, Q3, and IQR for the 'word_count_int' column
quantiles = key_deduped_df.approxQuantile("word_count_int", [0.25, 0.75], 0.05)
q1 = quantiles[0]
q3 = quantiles[1]
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
# Filter out the outliers
no_outliers_df = key_deduped_df.filter(
(col("word_count_int") >= lower_bound) & (col("word_count_int") <= upper_bound)
)
根據業務規則驗證資料
根據業務規則和品質限制,以程式輔助方式驗證資料。 使用 Spark 內建函式執行驗證。舉例來說,您可以使用聯結,根據一組已知的參照值檢查資料完整性。內彙整只會保留有效記錄,並篩除任何含有無效值的記錄。
充實資料集
清理及驗證資料後,請套用業務邏輯。這個步驟通常會將資料集與其他資料來源合併,並執行彙整作業,以充實資料集。
# Example: Enriching with a hypothetical corpus metadata DataFrame
corpus_metadata_data = [("Corpus A", 2010, "Fiction"), ("Corpus B", 1998, "News"), ("Corpus C", 2015, "Scientific")]
corpus_metadata_df = spark.createDataFrame(corpus_metadata_data, ["corpus_name", "est_year", "genre"])
# Join the main DataFrame with the metadata
enriched_df = validated_df.join(
corpus_metadata_df,
validated_df.corpus == corpus_metadata_df.corpus_name,
how="left"
)
將處理後的資料寫入 Cloud Storage
將轉換後的 DataFrame 持久儲存至 Cloud Storage 中的已處理資料階段。 請使用 Parquet 等最佳化欄位式儲存格式。在經常篩選的資料欄上進行資料分區,以提升查詢效能。
# --- Configuration ---
processed_path = f"gs://BUCKET_NAME/processed/processed_data"
# --- Write to Cloud Storage in Parquet format, partitioned by corpus ---
enriched_df.write \
.format("parquet") \
.partitionBy("corpus") \
.mode("overwrite") \
.save(processed_path)
print(f"Successfully wrote transformed data to {processed_path}")
mode("overwrite") 選項可確保管道每次執行時都會取代先前的輸出內容,讓工作成為冪等。
後續步驟
- 瞭解如何將處理過的資料載入 BigQuery 等資料倉儲。
- 進一步瞭解 Managed Service for Apache Spark。