データの変換
生データを取り込んだら、次のフェーズは処理済みデータ ステージの作成です。このステージでは、元データを検証、クレンジング、適合化されたデータセットに変換します。このドキュメントでは、Managed Service for Apache Spark で PySpark を使用して、この処理済みデータ ステージを構築する方法について説明します。
始める前に
- Google Cloud アカウントにログインします。 Google Cloudを初めて使用する場合は、 アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- Cloud Storage バケットで元のデータを利用できるようにします。
- Managed Service for Apache Spark クラスタを作成して、PySpark ジョブを実行します。
データをクリーンアップして標準化する
最初の手順は、一般的なデータ品質の問題に対処することです。PySpark DataFrame API には、基本的なクリーンアップ タスクを分散方式で実行する関数が用意されています。
欠損値を処理する
NULL 値を含む行を削除するか、デフォルト値で NULL 値を埋めて、NULL 値を処理します。
特定の列が null の行を削除するには、
dropna()メソッドを使用します。特定の列の null 値をデフォルトで埋めるには、
fillna()メソッドを使用します。# Assuming 'df' is the DataFrame read from the raw data stage # Drop rows where 'word' or 'corpus' is null cleaned_df = df.dropna(subset=["word", "corpus"]) # Fill nulls in 'word_count' with 0 cleaned_df = cleaned_df.fillna(0, subset=["word_count"])
データ型を修正する
データの完全性とパフォーマンスを確保するために、正しいスキーマと一貫性のあるスキーマを適用します。cast() メソッドで withColumn() 変換を使用して、列のデータ型を変更します。
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
# Cast the 'word_count' column to integer
typed_df = cleaned_df.withColumn("word_count_int", col("word_count").cast(IntegerType()))
重複するレコードを削除する
重複するレコードがあると、分析結果に偏りが生じる可能性があります。DataFrame から重複する行を削除するには、dropDuplicates() メソッドを使用します。特定の列のセットに基づいて一意性を定義するには、列名のリストを subset パラメータに渡します。
# Remove rows that are complete duplicates across all columns
deduped_df = typed_df.dropDuplicates()
# Remove rows with duplicate 'word' and 'corpus' combinations
key_deduped_df = typed_df.dropDuplicates(subset=["word", "corpus"])
外れ値をフィルタする
外れ値は集計に影響する可能性があります。外れ値を特定する一般的な方法は、四分位範囲(IQR)です。IQR で定義された範囲外の値を除外します。
from pyspark.sql.functions import col
# Calculate Q1, Q3, and IQR for the 'word_count_int' column
quantiles = key_deduped_df.approxQuantile("word_count_int", [0.25, 0.75], 0.05)
q1 = quantiles[0]
q3 = quantiles[1]
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
# Filter out the outliers
no_outliers_df = key_deduped_df.filter(
(col("word_count_int") >= lower_bound) & (col("word_count_int") <= upper_bound)
)
ビジネスルールに対してデータを検証する
ビジネスルールと品質制約に照らしてデータをプログラムで検証します。Spark の組み込み関数を使用して検証を行います。たとえば、結合を使用して、既知の参照値のセットに対してデータの整合性を確認します。内部結合では、有効なレコードのみが保持され、無効な値を含むレコードは除外されます。
データセットを拡充する
データをクリーンアップして検証したら、ビジネス ロジックを適用します。このステップでは、多くの場合、他のデータソースと組み合わせて集計を実行することで、データセットを拡充します。
# Example: Enriching with a hypothetical corpus metadata DataFrame
corpus_metadata_data = [("Corpus A", 2010, "Fiction"), ("Corpus B", 1998, "News"), ("Corpus C", 2015, "Scientific")]
corpus_metadata_df = spark.createDataFrame(corpus_metadata_data, ["corpus_name", "est_year", "genre"])
# Join the main DataFrame with the metadata
enriched_df = validated_df.join(
corpus_metadata_df,
validated_df.corpus == corpus_metadata_df.corpus_name,
how="left"
)
処理されたデータを Cloud Storage に書き込む
変換された DataFrame を Cloud Storage の処理済みデータ ステージに永続化します。このレイヤには、Parquet などの最適化されたカラム型ストレージ形式を使用します。クエリのパフォーマンスを向上させるため、頻繁にフィルタリングされる列のデータをパーティショニングします。
# --- Configuration ---
processed_path = f"gs://BUCKET_NAME/processed/processed_data"
# --- Write to Cloud Storage in Parquet format, partitioned by corpus ---
enriched_df.write \
.format("parquet") \
.partitionBy("corpus") \
.mode("overwrite") \
.save(processed_path)
print(f"Successfully wrote transformed data to {processed_path}")
mode("overwrite") オプションを使用すると、パイプラインの各実行で以前の出力が置き換えられ、ジョブがべき等になります。
次のステップ
- 処理されたデータを BigQuery などのデータ ウェアハウスに読み込む方法を確認する。
- Managed Service for Apache Spark の詳細を確認する。