データの変換

生データを取り込んだら、次のフェーズは処理済みデータ ステージの作成です。このステージでは、元データを検証、クレンジング、適合化されたデータセットに変換します。このドキュメントでは、Managed Service for Apache Spark で PySpark を使用して、この処理済みデータ ステージを構築する方法について説明します。

始める前に

  1. Google Cloud アカウントにログインします。 Google Cloudを初めて使用する場合は、 アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  8. Cloud Storage バケットで元のデータを利用できるようにします。
  9. Managed Service for Apache Spark クラスタを作成して、PySpark ジョブを実行します。

データをクリーンアップして標準化する

最初の手順は、一般的なデータ品質の問題に対処することです。PySpark DataFrame API には、基本的なクリーンアップ タスクを分散方式で実行する関数が用意されています。

欠損値を処理する

NULL 値を含む行を削除するか、デフォルト値で NULL 値を埋めて、NULL 値を処理します。

  1. 特定の列が null の行を削除するには、dropna() メソッドを使用します。

  2. 特定の列の null 値をデフォルトで埋めるには、fillna() メソッドを使用します。

    # Assuming 'df' is the DataFrame read from the raw data stage
    # Drop rows where 'word' or 'corpus' is null
    cleaned_df = df.dropna(subset=["word", "corpus"])
    
    # Fill nulls in 'word_count' with 0
    cleaned_df = cleaned_df.fillna(0, subset=["word_count"])
    

データ型を修正する

データの完全性とパフォーマンスを確保するために、正しいスキーマと一貫性のあるスキーマを適用します。cast() メソッドで withColumn() 変換を使用して、列のデータ型を変更します。

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Cast the 'word_count' column to integer
typed_df = cleaned_df.withColumn("word_count_int", col("word_count").cast(IntegerType()))

重複するレコードを削除する

重複するレコードがあると、分析結果に偏りが生じる可能性があります。DataFrame から重複する行を削除するには、dropDuplicates() メソッドを使用します。特定の列のセットに基づいて一意性を定義するには、列名のリストを subset パラメータに渡します。

# Remove rows that are complete duplicates across all columns
deduped_df = typed_df.dropDuplicates()

# Remove rows with duplicate 'word' and 'corpus' combinations
key_deduped_df = typed_df.dropDuplicates(subset=["word", "corpus"])

外れ値をフィルタする

外れ値は集計に影響する可能性があります。外れ値を特定する一般的な方法は、四分位範囲(IQR)です。IQR で定義された範囲外の値を除外します。

from pyspark.sql.functions import col

# Calculate Q1, Q3, and IQR for the 'word_count_int' column
quantiles = key_deduped_df.approxQuantile("word_count_int", [0.25, 0.75], 0.05)
q1 = quantiles[0]
q3 = quantiles[1]
iqr = q3 - q1

lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr

# Filter out the outliers
no_outliers_df = key_deduped_df.filter(
    (col("word_count_int") >= lower_bound) & (col("word_count_int") <= upper_bound)
)

ビジネスルールに対してデータを検証する

ビジネスルールと品質制約に照らしてデータをプログラムで検証します。Spark の組み込み関数を使用して検証を行います。たとえば、結合を使用して、既知の参照値のセットに対してデータの整合性を確認します。内部結合では、有効なレコードのみが保持され、無効な値を含むレコードは除外されます。

データセットを拡充する

データをクリーンアップして検証したら、ビジネス ロジックを適用します。このステップでは、多くの場合、他のデータソースと組み合わせて集計を実行することで、データセットを拡充します。

# Example: Enriching with a hypothetical corpus metadata DataFrame
corpus_metadata_data = [("Corpus A", 2010, "Fiction"), ("Corpus B", 1998, "News"), ("Corpus C", 2015, "Scientific")]
corpus_metadata_df = spark.createDataFrame(corpus_metadata_data, ["corpus_name", "est_year", "genre"])

# Join the main DataFrame with the metadata
enriched_df = validated_df.join(
    corpus_metadata_df,
    validated_df.corpus == corpus_metadata_df.corpus_name,
    how="left"
)

処理されたデータを Cloud Storage に書き込む

変換された DataFrame を Cloud Storage の処理済みデータ ステージに永続化します。このレイヤには、Parquet などの最適化されたカラム型ストレージ形式を使用します。クエリのパフォーマンスを向上させるため、頻繁にフィルタリングされる列のデータをパーティショニングします。

# --- Configuration ---
processed_path = f"gs://BUCKET_NAME/processed/processed_data"

# --- Write to Cloud Storage in Parquet format, partitioned by corpus ---
enriched_df.write \
  .format("parquet") \
  .partitionBy("corpus") \
  .mode("overwrite") \
  .save(processed_path)

print(f"Successfully wrote transformed data to {processed_path}")

mode("overwrite") オプションを使用すると、パイプラインの各実行で以前の出力が置き換えられ、ジョブがべき等になります。

次のステップ