Mentransformasi data
Setelah Anda menyerap data mentah, fase berikutnya adalah membuat tahap data yang diproses. Tahap ini menyaring data mentah menjadi set data yang divalidasi, dibersihkan, dan disesuaikan. Dokumen ini memberikan panduan untuk membangun tahap data yang diproses ini menggunakan PySpark di Managed Service for Apache Spark.
Sebelum memulai
- Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- Pastikan Anda memiliki data mentah yang tersedia di bucket Cloud Storage.
- Buat cluster Managed Service for Apache Spark untuk menjalankan tugas PySpark.
Membersihkan dan menstandardisasi data
Langkah pertama adalah mengatasi masalah kualitas data umum. PySpark DataFrame API menyediakan fungsi untuk melakukan tugas pembersihan dasar secara terdistribusi.
Menangani nilai yang tidak ada
Tangani nilai null dengan menghapus baris yang berisi nilai null atau dengan mengisi nilai null dengan nilai default.
Untuk menghapus baris tempat kolom tertentu bernilai null, gunakan metode
dropna().Untuk mengisi nilai null di kolom tertentu dengan nilai default, gunakan metode
fillna().# Assuming 'df' is the DataFrame read from the raw data stage # Drop rows where 'word' or 'corpus' is null cleaned_df = df.dropna(subset=["word", "corpus"]) # Fill nulls in 'word_count' with 0 cleaned_df = cleaned_df.fillna(0, subset=["word_count"])
Jenis data yang benar
Terapkan skema yang benar dan konsisten untuk integritas dan performa data. Gunakan
transformasi withColumn() dengan metode cast() untuk mengubah jenis data kolom.
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
# Cast the 'word_count' column to integer
typed_df = cleaned_df.withColumn("word_count_int", col("word_count").cast(IntegerType()))
Menghapus data duplikat
Data duplikat dapat mendistorsi analisis. Gunakan metode dropDuplicates() untuk
menghapus baris duplikat dari DataFrame. Untuk menentukan keunikan berdasarkan kumpulan kolom tertentu, teruskan daftar nama kolom ke parameter subset.
# Remove rows that are complete duplicates across all columns
deduped_df = typed_df.dropDuplicates()
# Remove rows with duplicate 'word' and 'corpus' combinations
key_deduped_df = typed_df.dropDuplicates(subset=["word", "corpus"])
Memfilter pencilan
Pencilan dapat memengaruhi penggabungan. Metode umum untuk mengidentifikasi pencilan adalah Rentang Interkuartil (IQR). Mengecualikan nilai yang berada di luar rentang yang ditentukan oleh IQR.
from pyspark.sql.functions import col
# Calculate Q1, Q3, and IQR for the 'word_count_int' column
quantiles = key_deduped_df.approxQuantile("word_count_int", [0.25, 0.75], 0.05)
q1 = quantiles[0]
q3 = quantiles[1]
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
# Filter out the outliers
no_outliers_df = key_deduped_df.filter(
(col("word_count_int") >= lower_bound) & (col("word_count_int") <= upper_bound)
)
Memvalidasi data berdasarkan aturan bisnis
Memvalidasi data secara terprogram terhadap aturan bisnis dan batasan kualitas. Gunakan fungsi bawaan Spark untuk melakukan validasi. Misalnya, gunakan gabungan untuk memeriksa integritas data terhadap sekumpulan nilai referensi yang diketahui. Inner join hanya mempertahankan kumpulan data yang valid, memfilter kumpulan data yang memiliki nilai tidak valid.
Memperkaya set data
Setelah Anda membersihkan dan memvalidasi data, terapkan logika bisnis. Langkah ini sering kali melibatkan pengayaan set data dengan menggabungkannya dengan sumber data lain dan melakukan agregasi.
# Example: Enriching with a hypothetical corpus metadata DataFrame
corpus_metadata_data = [("Corpus A", 2010, "Fiction"), ("Corpus B", 1998, "News"), ("Corpus C", 2015, "Scientific")]
corpus_metadata_df = spark.createDataFrame(corpus_metadata_data, ["corpus_name", "est_year", "genre"])
# Join the main DataFrame with the metadata
enriched_df = validated_df.join(
corpus_metadata_df,
validated_df.corpus == corpus_metadata_df.corpus_name,
how="left"
)
Menulis data yang diproses ke Cloud Storage
Pertahankan DataFrame yang ditransformasikan ke tahap data yang diproses di Cloud Storage. Gunakan format penyimpanan berbasis kolom yang dioptimalkan seperti Parquet untuk lapisan ini. Partisi data pada kolom yang sering difilter untuk meningkatkan performa kueri.
# --- Configuration ---
processed_path = f"gs://BUCKET_NAME/processed/processed_data"
# --- Write to Cloud Storage in Parquet format, partitioned by corpus ---
enriched_df.write \
.format("parquet") \
.partitionBy("corpus") \
.mode("overwrite") \
.save(processed_path)
print(f"Successfully wrote transformed data to {processed_path}")
Opsi mode("overwrite") memastikan bahwa setiap operasi pipeline menggantikan
output sebelumnya, yang membuat tugas menjadi idempoten.
Langkah berikutnya
- Pelajari cara memuat data yang diproses ke dalam data warehouse seperti BigQuery.
- Pelajari lebih lanjut Managed Service for Apache Spark.