轉換資料

擷取原始資料後,下一個階段是建立已處理資料階段。這個階段會將原始資料精煉為經過驗證、清理及符合規範的資料集。本文將說明如何使用 Managed Service for Apache Spark 上的 PySpark,建構這個處理過的資料階段。

事前準備

  1. 登入 Google Cloud 帳戶。如果您是 Google Cloud新手,歡迎 建立帳戶,親自評估產品在實際工作環境中的成效。新客戶還能獲得價值 $300 美元的免費抵免額,可用於執行、測試及部署工作負載。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  8. 確認 Cloud Storage bucket 中有原始資料。
  9. 建立 Managed Service for Apache Spark 叢集,執行 PySpark 工作。

清理及標準化資料

第一步是解決常見的資料品質問題。PySpark DataFrame API 提供多項函式,以分散式方式執行基礎清理工作。

處理遺漏值

處理空值,方法是捨棄含有空值的資料列,或以預設值填入空值。

  1. 如要捨棄特定資料欄為空值的資料列,請使用 dropna() 方法。

  2. 如要以預設值填入特定資料欄的空值,請使用 fillna() 方法。

    # Assuming 'df' is the DataFrame read from the raw data stage
    # Drop rows where 'word' or 'corpus' is null
    cleaned_df = df.dropna(subset=["word", "corpus"])
    
    # Fill nulls in 'word_count' with 0
    cleaned_df = cleaned_df.fillna(0, subset=["word_count"])
    

修正資料類型

強制執行正確且一致的結構定義,確保資料完整性和效能。使用 withColumn() 轉換和 cast() 方法,即可變更資料欄的資料類型。

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Cast the 'word_count' column to integer
typed_df = cleaned_df.withColumn("word_count_int", col("word_count").cast(IntegerType()))

移除重複記錄

重複記錄可能會導致分析結果出現偏差。使用 dropDuplicates() 方法從 DataFrame 移除重複資料列。如要根據特定資料欄組合定義唯一性,請將資料欄名稱清單傳遞至 subset 參數。

# Remove rows that are complete duplicates across all columns
deduped_df = typed_df.dropDuplicates()

# Remove rows with duplicate 'word' and 'corpus' combinations
key_deduped_df = typed_df.dropDuplicates(subset=["word", "corpus"])

篩除離群值

離群值可能會影響匯總結果。識別離群值的常見方法是四分位數間距 (IQR)。篩除超出四分位距定義範圍的值。

from pyspark.sql.functions import col

# Calculate Q1, Q3, and IQR for the 'word_count_int' column
quantiles = key_deduped_df.approxQuantile("word_count_int", [0.25, 0.75], 0.05)
q1 = quantiles[0]
q3 = quantiles[1]
iqr = q3 - q1

lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr

# Filter out the outliers
no_outliers_df = key_deduped_df.filter(
    (col("word_count_int") >= lower_bound) & (col("word_count_int") <= upper_bound)
)

根據業務規則驗證資料

根據業務規則和品質限制,以程式輔助方式驗證資料。 使用 Spark 內建函式執行驗證。舉例來說,您可以使用聯結,根據一組已知的參照值檢查資料完整性。內彙整只會保留有效記錄,並篩除任何含有無效值的記錄。

充實資料集

清理及驗證資料後,請套用業務邏輯。這個步驟通常會將資料集與其他資料來源合併,並執行彙整作業,以充實資料集。

# Example: Enriching with a hypothetical corpus metadata DataFrame
corpus_metadata_data = [("Corpus A", 2010, "Fiction"), ("Corpus B", 1998, "News"), ("Corpus C", 2015, "Scientific")]
corpus_metadata_df = spark.createDataFrame(corpus_metadata_data, ["corpus_name", "est_year", "genre"])

# Join the main DataFrame with the metadata
enriched_df = validated_df.join(
    corpus_metadata_df,
    validated_df.corpus == corpus_metadata_df.corpus_name,
    how="left"
)

將處理後的資料寫入 Cloud Storage

將轉換後的 DataFrame 持久儲存至 Cloud Storage 中的已處理資料階段。 請使用 Parquet 等最佳化欄位式儲存格式。在經常篩選的資料欄上進行資料分區,以提升查詢效能。

# --- Configuration ---
processed_path = f"gs://BUCKET_NAME/processed/processed_data"

# --- Write to Cloud Storage in Parquet format, partitioned by corpus ---
enriched_df.write \
  .format("parquet") \
  .partitionBy("corpus") \
  .mode("overwrite") \
  .save(processed_path)

print(f"Successfully wrote transformed data to {processed_path}")

mode("overwrite") 選項可確保管道每次執行時都會取代先前的輸出內容,讓工作成為冪等。

後續步驟