데이터 변환

원시 데이터를 수집한 후 다음 단계는 처리된 데이터 스테이지를 만드는 것입니다. 이 단계에서는 원시 데이터를 검증되고 정리되고 표준화된 데이터 세트로 정제합니다. 이 문서에서는 Apache Spark용 관리형 서비스에서 PySpark를 사용하여 처리된 데이터 단계를 빌드하는 방법을 안내합니다.

시작하기 전에

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  8. Cloud Storage 버킷에서 원시 데이터를 사용할 수 있는지 확인합니다.
  9. PySpark 작업을 실행할 Apache Spark용 관리형 서비스 클러스터 만들기

데이터 정리 및 표준화

첫 번째 단계는 일반적인 데이터 품질 문제를 해결하는 것입니다. PySpark DataFrame API는 분산 방식으로 기본적인 정리 작업을 실행하는 함수를 제공합니다.

누락된 값 처리

null 값이 포함된 행을 삭제하거나 기본값으로 채워 null 값을 처리합니다.

  1. 특정 열이 null인 행을 삭제하려면 dropna() 메서드를 사용합니다.

  2. 특정 열의 null 값을 기본값으로 채우려면 fillna() 메서드를 사용하세요.

    # Assuming 'df' is the DataFrame read from the raw data stage
    # Drop rows where 'word' or 'corpus' is null
    cleaned_df = df.dropna(subset=["word", "corpus"])
    
    # Fill nulls in 'word_count' with 0
    cleaned_df = cleaned_df.fillna(0, subset=["word_count"])
    

데이터 유형 수정

데이터 무결성 및 성능을 위해 올바르고 일관된 스키마를 적용합니다. cast() 메서드와 함께 withColumn() 변환을 사용하여 열의 데이터 유형을 변경합니다.

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Cast the 'word_count' column to integer
typed_df = cleaned_df.withColumn("word_count_int", col("word_count").cast(IntegerType()))

중복 레코드 삭제

중복 레코드는 분석을 왜곡할 수 있습니다. dropDuplicates() 메서드를 사용하여 DataFrame에서 중복 행을 삭제합니다. 특정 열 집합을 기반으로 고유성을 정의하려면 열 이름 목록을 subset 매개변수에 전달합니다.

# Remove rows that are complete duplicates across all columns
deduped_df = typed_df.dropDuplicates()

# Remove rows with duplicate 'word' and 'corpus' combinations
key_deduped_df = typed_df.dropDuplicates(subset=["word", "corpus"])

이상치 필터링

이상치는 집계에 영향을 줄 수 있습니다. 이상치를 식별하는 일반적인 방법은 사분위수 범위 (IQR)입니다. IQR로 정의된 범위를 벗어나는 값을 필터링합니다.

from pyspark.sql.functions import col

# Calculate Q1, Q3, and IQR for the 'word_count_int' column
quantiles = key_deduped_df.approxQuantile("word_count_int", [0.25, 0.75], 0.05)
q1 = quantiles[0]
q3 = quantiles[1]
iqr = q3 - q1

lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr

# Filter out the outliers
no_outliers_df = key_deduped_df.filter(
    (col("word_count_int") >= lower_bound) & (col("word_count_int") <= upper_bound)
)

비즈니스 규칙에 따라 데이터 검증

비즈니스 규칙 및 품질 제약 조건에 따라 데이터를 프로그래매틱 방식으로 검증합니다. Spark의 기본 제공 함수를 사용하여 유효성 검사를 실행합니다. 예를 들어 조인을 사용하여 알려진 참조 값 집합에 대해 데이터의 무결성을 확인할 수 있습니다. 내부 조인은 유효한 레코드만 유지하고 유효하지 않은 값이 있는 레코드는 필터링합니다.

데이터 세트 보강

데이터를 정리하고 검증한 후 비즈니스 로직을 적용합니다. 이 단계에서는 다른 데이터 소스와 결합하고 집계를 실행하여 데이터 세트를 보강하는 경우가 많습니다.

# Example: Enriching with a hypothetical corpus metadata DataFrame
corpus_metadata_data = [("Corpus A", 2010, "Fiction"), ("Corpus B", 1998, "News"), ("Corpus C", 2015, "Scientific")]
corpus_metadata_df = spark.createDataFrame(corpus_metadata_data, ["corpus_name", "est_year", "genre"])

# Join the main DataFrame with the metadata
enriched_df = validated_df.join(
    corpus_metadata_df,
    validated_df.corpus == corpus_metadata_df.corpus_name,
    how="left"
)

처리된 데이터를 Cloud Storage에 쓰기

변환된 DataFrame을 Cloud Storage의 처리된 데이터 스테이지에 유지합니다. 이 레이어에는 Parquet와 같은 최적화된 열 형식 스토리지를 사용합니다. 자주 필터링되는 열을 기준으로 데이터를 파티셔닝하여 쿼리 성능을 개선합니다.

# --- Configuration ---
processed_path = f"gs://BUCKET_NAME/processed/processed_data"

# --- Write to Cloud Storage in Parquet format, partitioned by corpus ---
enriched_df.write \
  .format("parquet") \
  .partitionBy("corpus") \
  .mode("overwrite") \
  .save(processed_path)

print(f"Successfully wrote transformed data to {processed_path}")

mode("overwrite") 옵션을 사용하면 파이프라인을 실행할 때마다 이전 출력이 대체되므로 작업이 동일하게 유지됩니다.

다음 단계