Transform data
After ingesting raw data, the next stage is to create the processed data stage. This stage refines the raw data into a validated, cleaned, and normalized dataset. This document provides a guide to building this processed data stage by using PySpark on Managed Service for Apache Spark.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
- Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
- Verify that billing is enabled for your Google Cloud project.
- Enable the Dataproc API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.
- Make sure that you have raw data in a Cloud Storage bucket.
- Create a Managed Service for Apache Spark cluster to run PySpark jobs.
Clean and standardize data
The first step is to resolve common data quality issues. The PySpark DataFrame API provides functions for performing basic cleaning tasks in a distributed way.
Handle missing values
Handle null values by dropping rows that contain them, or by filling them with a default value.
To drop rows where specific columns are null, use the dropna() method. To fill null values in specific columns with a default value, use the fillna() method.
# Assuming 'df' is the DataFrame read from the raw data stage
# Drop rows where 'word' or 'corpus' is null
cleaned_df = df.dropna(subset=["word", "corpus"])
# Fill nulls in 'word_count' with 0
cleaned_df = cleaned_df.fillna(0, subset=["word_count"])
Correct data types
Enforce a correct and consistent schema to ensure data integrity and performance. Use the withColumn() transformation with the cast() method to change a column's data type.
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
# Cast the 'word_count' column to integer
typed_df = cleaned_df.withColumn("word_count_int", col("word_count").cast(IntegerType()))
Remove duplicate records
Duplicate records can skew analysis results. Use the dropDuplicates() method to remove duplicate rows from a DataFrame. To define uniqueness based on a specific set of columns, pass a list of column names to the subset parameter.
# Remove rows that are complete duplicates across all columns
deduped_df = typed_df.dropDuplicates()
# Remove rows with duplicate 'word' and 'corpus' combinations
key_deduped_df = typed_df.dropDuplicates(subset=["word", "corpus"])
Filter out outliers
Outliers can distort aggregations. A common method for identifying outliers is the interquartile range (IQR). Filter out values that fall outside the range defined by the IQR.
from pyspark.sql.functions import col
# Calculate Q1, Q3, and IQR for the 'word_count_int' column
quantiles = key_deduped_df.approxQuantile("word_count_int", [0.25, 0.75], 0.05)
q1 = quantiles[0]
q3 = quantiles[1]
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
# Filter out the outliers
no_outliers_df = key_deduped_df.filter(
(col("word_count_int") >= lower_bound) & (col("word_count_int") <= upper_bound)
)
Validate data against business rules
Programmatically validate data against business rules and quality constraints. Use Spark's built-in functions to perform the validation. For example, use a join to check the integrity of the data against a set of known reference values. An inner join keeps only the valid records and filters out any records that contain invalid values.
Enrich the dataset
After the data is cleaned and validated, apply business logic. This step often involves enriching the dataset by combining it with other data sources and performing aggregations.
# Example: Enriching with a hypothetical corpus metadata DataFrame
corpus_metadata_data = [("Corpus A", 2010, "Fiction"), ("Corpus B", 1998, "News"), ("Corpus C", 2015, "Scientific")]
corpus_metadata_df = spark.createDataFrame(corpus_metadata_data, ["corpus_name", "est_year", "genre"])
# Join the main DataFrame with the metadata
enriched_df = validated_df.join(
corpus_metadata_df,
validated_df.corpus == corpus_metadata_df.corpus_name,
how="left"
)
Write the processed data to Cloud Storage
Persist the transformed DataFrame to the processed data stage in Cloud Storage. This layer uses an optimized columnar storage format, such as Parquet. Partition the data by frequently filtered columns to improve query performance.
# --- Configuration ---
processed_path = f"gs://BUCKET_NAME/processed/processed_data"
# --- Write to Cloud Storage in Parquet format, partitioned by corpus ---
enriched_df.write \
.format("parquet") \
.partitionBy("corpus") \
.mode("overwrite") \
.save(processed_path)
print(f"Successfully wrote transformed data to {processed_path}")
The mode("overwrite") option ensures that each pipeline run replaces the previous output, which makes the job idempotent.
What's next
- Learn how to load the processed data into a data warehouse such as BigQuery.
- Learn more about Managed Service for Apache Spark.