데이터 변환
원시 데이터를 수집한 후 다음 단계는 처리된 데이터 스테이지를 만드는 것입니다. 이 단계에서는 원시 데이터를 검증되고 정리되고 표준화된 데이터 세트로 정제합니다. 이 문서에서는 Apache Spark용 관리형 서비스에서 PySpark를 사용하여 처리된 데이터 단계를 빌드하는 방법을 안내합니다.
시작하기 전에
- Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- Cloud Storage 버킷에서 원시 데이터를 사용할 수 있는지 확인합니다.
- PySpark 작업을 실행할 Apache Spark용 관리형 서비스 클러스터 만들기
데이터 정리 및 표준화
첫 번째 단계는 일반적인 데이터 품질 문제를 해결하는 것입니다. PySpark DataFrame API는 분산 방식으로 기본적인 정리 작업을 실행하는 함수를 제공합니다.
누락된 값 처리
null 값이 포함된 행을 삭제하거나 기본값으로 채워 null 값을 처리합니다.
특정 열이 null인 행을 삭제하려면
dropna()메서드를 사용합니다.특정 열의 null 값을 기본값으로 채우려면
fillna()메서드를 사용하세요.# Assuming 'df' is the DataFrame read from the raw data stage # Drop rows where 'word' or 'corpus' is null cleaned_df = df.dropna(subset=["word", "corpus"]) # Fill nulls in 'word_count' with 0 cleaned_df = cleaned_df.fillna(0, subset=["word_count"])
데이터 유형 수정
데이터 무결성 및 성능을 위해 올바르고 일관된 스키마를 적용합니다. cast() 메서드와 함께 withColumn() 변환을 사용하여 열의 데이터 유형을 변경합니다.
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
# Cast the 'word_count' column to integer
typed_df = cleaned_df.withColumn("word_count_int", col("word_count").cast(IntegerType()))
중복 레코드 삭제
중복 레코드는 분석을 왜곡할 수 있습니다. dropDuplicates() 메서드를 사용하여 DataFrame에서 중복 행을 삭제합니다. 특정 열 집합을 기반으로 고유성을 정의하려면 열 이름 목록을 subset 매개변수에 전달합니다.
# Remove rows that are complete duplicates across all columns
deduped_df = typed_df.dropDuplicates()
# Remove rows with duplicate 'word' and 'corpus' combinations
key_deduped_df = typed_df.dropDuplicates(subset=["word", "corpus"])
이상치 필터링
이상치는 집계에 영향을 줄 수 있습니다. 이상치를 식별하는 일반적인 방법은 사분위수 범위 (IQR)입니다. IQR로 정의된 범위를 벗어나는 값을 필터링합니다.
from pyspark.sql.functions import col
# Calculate Q1, Q3, and IQR for the 'word_count_int' column
quantiles = key_deduped_df.approxQuantile("word_count_int", [0.25, 0.75], 0.05)
q1 = quantiles[0]
q3 = quantiles[1]
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
# Filter out the outliers
no_outliers_df = key_deduped_df.filter(
(col("word_count_int") >= lower_bound) & (col("word_count_int") <= upper_bound)
)
비즈니스 규칙에 따라 데이터 검증
비즈니스 규칙 및 품질 제약 조건에 따라 데이터를 프로그래매틱 방식으로 검증합니다. Spark의 기본 제공 함수를 사용하여 유효성 검사를 실행합니다. 예를 들어 조인을 사용하여 알려진 참조 값 집합에 대해 데이터의 무결성을 확인할 수 있습니다. 내부 조인은 유효한 레코드만 유지하고 유효하지 않은 값이 있는 레코드는 필터링합니다.
데이터 세트 보강
데이터를 정리하고 검증한 후 비즈니스 로직을 적용합니다. 이 단계에서는 다른 데이터 소스와 결합하고 집계를 실행하여 데이터 세트를 보강하는 경우가 많습니다.
# Example: Enriching with a hypothetical corpus metadata DataFrame
corpus_metadata_data = [("Corpus A", 2010, "Fiction"), ("Corpus B", 1998, "News"), ("Corpus C", 2015, "Scientific")]
corpus_metadata_df = spark.createDataFrame(corpus_metadata_data, ["corpus_name", "est_year", "genre"])
# Join the main DataFrame with the metadata
enriched_df = validated_df.join(
corpus_metadata_df,
validated_df.corpus == corpus_metadata_df.corpus_name,
how="left"
)
처리된 데이터를 Cloud Storage에 쓰기
변환된 DataFrame을 Cloud Storage의 처리된 데이터 스테이지에 유지합니다. 이 레이어에는 Parquet와 같은 최적화된 열 형식 스토리지를 사용합니다. 자주 필터링되는 열을 기준으로 데이터를 파티셔닝하여 쿼리 성능을 개선합니다.
# --- Configuration ---
processed_path = f"gs://BUCKET_NAME/processed/processed_data"
# --- Write to Cloud Storage in Parquet format, partitioned by corpus ---
enriched_df.write \
.format("parquet") \
.partitionBy("corpus") \
.mode("overwrite") \
.save(processed_path)
print(f"Successfully wrote transformed data to {processed_path}")
mode("overwrite") 옵션을 사용하면 파이프라인을 실행할 때마다 이전 출력이 대체되므로 작업이 동일하게 유지됩니다.
다음 단계
- 처리된 데이터를 BigQuery와 같은 데이터 웨어하우스에 로드하는 방법을 알아봅니다.
- Apache Spark용 관리형 서비스에 대해 자세히 알아보세요.