转换数据

注入原始数据后,下一个阶段是创建处理后的数据阶段。此阶段会将原始数据优化为经过验证、清理和规范化的数据集。本文档提供了一个指南,介绍了如何使用 Managed Service for Apache Spark 上的 PySpark 构建此处理后的数据阶段。

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud新手,请 创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  8. 确保您在 Cloud Storage 存储桶中拥有原始数据。
  9. 创建 Managed Service for Apache Spark 集群以运行 PySpark 作业。

清理并标准化数据

第一步是解决常见的数据质量问题。PySpark DataFrame API 提供了一些函数,可用于以分布式方式执行基本清理任务。

处理缺失值

通过以下方式处理 null 值:舍弃包含 null 值的行,或使用默认值填充 null 值。

  1. 如需舍弃特定列为 null 的行,请使用 dropna() 方法。

  2. 如需使用默认值填充特定列中的 null 值,请使用 fillna() 方法。

    # Assuming 'df' is the DataFrame read from the raw data stage
    # Drop rows where 'word' or 'corpus' is null
    cleaned_df = df.dropna(subset=["word", "corpus"])
    
    # Fill nulls in 'word_count' with 0
    cleaned_df = cleaned_df.fillna(0, subset=["word_count"])
    

更正数据类型

强制执行正确且一致的架构,以确保数据完整性和性能。将 withColumn() 转换与 cast() 方法搭配使用,可更改列的数据类型。

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Cast the 'word_count' column to integer
typed_df = cleaned_df.withColumn("word_count_int", col("word_count").cast(IntegerType()))

移除重复记录

重复记录可能会导致分析结果出现偏差。使用 dropDuplicates() 方法从 DataFrame 中移除重复的行。如需根据一组特定的列来定义唯一性,请将列名称列表传递给 subset 参数。

# Remove rows that are complete duplicates across all columns
deduped_df = typed_df.dropDuplicates()

# Remove rows with duplicate 'word' and 'corpus' combinations
key_deduped_df = typed_df.dropDuplicates(subset=["word", "corpus"])

过滤掉离群值

离群值可能会影响汇总。识别离群点的常用方法是四分位距 (IQR)。过滤掉 IQR 定义的范围之外的值。

from pyspark.sql.functions import col

# Calculate Q1, Q3, and IQR for the 'word_count_int' column
quantiles = key_deduped_df.approxQuantile("word_count_int", [0.25, 0.75], 0.05)
q1 = quantiles[0]
q3 = quantiles[1]
iqr = q3 - q1

lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr

# Filter out the outliers
no_outliers_df = key_deduped_df.filter(
    (col("word_count_int") >= lower_bound) & (col("word_count_int") <= upper_bound)
)

根据业务规则验证数据

根据业务规则和质量限制以编程方式验证数据。 使用 Spark 的内置函数执行验证。例如,使用联接根据一组已知的参考值检查数据的完整性。内联接仅保留有效记录,过滤掉所有包含无效值的记录。

丰富数据集

清理并验证数据后,应用业务逻辑。此步骤通常涉及通过将数据集与其他数据源相结合来丰富数据集,并执行汇总。

# Example: Enriching with a hypothetical corpus metadata DataFrame
corpus_metadata_data = [("Corpus A", 2010, "Fiction"), ("Corpus B", 1998, "News"), ("Corpus C", 2015, "Scientific")]
corpus_metadata_df = spark.createDataFrame(corpus_metadata_data, ["corpus_name", "est_year", "genre"])

# Join the main DataFrame with the metadata
enriched_df = validated_df.join(
    corpus_metadata_df,
    validated_df.corpus == corpus_metadata_df.corpus_name,
    how="left"
)

将处理后的数据写入 Cloud Storage

将转换后的 DataFrame 持久保存到 Cloud Storage 中的已处理数据阶段。 此层使用优化的列式存储格式(例如 Parquet)。按经常过滤的列对数据进行分区,以提高查询性能。

# --- Configuration ---
processed_path = f"gs://BUCKET_NAME/processed/processed_data"

# --- Write to Cloud Storage in Parquet format, partitioned by corpus ---
enriched_df.write \
  .format("parquet") \
  .partitionBy("corpus") \
  .mode("overwrite") \
  .save(processed_path)

print(f"Successfully wrote transformed data to {processed_path}")

mode("overwrite") 选项可确保每次运行流水线都会替换之前的输出,从而使作业具有幂等性。

后续步骤