Transformer les données
Une fois les données brutes ingérées, l'étape suivante consiste à créer une étape de données traitées. Cette étape consiste à affiner les données brutes pour obtenir un ensemble de données validé, nettoyé et conforme. Ce document explique comment créer cette étape de traitement des données à l'aide de PySpark sur Managed Service pour Apache Spark.
Avant de commencer
- Connectez-vous à votre compte Google Cloud . Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $de crédits sans frais pour exécuter, tester et déployer des charges de travail.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- Assurez-vous de disposer de données brutes dans un bucket Cloud Storage.
- Créez un cluster Managed Service pour Apache Spark pour exécuter des jobs PySpark.
Nettoyer et normaliser les données
La première étape consiste à résoudre les problèmes courants de qualité des données. L'API PySpark DataFrame fournit des fonctions permettant d'effectuer des tâches de nettoyage de base de manière distribuée.
Gérer les valeurs manquantes
Gérez les valeurs nulles en supprimant les lignes qui en contiennent ou en les remplaçant par une valeur par défaut.
Pour supprimer les lignes dans lesquelles des colonnes spécifiques sont nulles, utilisez la méthode
dropna().Pour remplir les valeurs nulles dans des colonnes spécifiques avec une valeur par défaut, utilisez la méthode
fillna().# Assuming 'df' is the DataFrame read from the raw data stage # Drop rows where 'word' or 'corpus' is null cleaned_df = df.dropna(subset=["word", "corpus"]) # Fill nulls in 'word_count' with 0 cleaned_df = cleaned_df.fillna(0, subset=["word_count"])
Corriger les types de données
Appliquez un schéma correct et cohérent pour assurer l'intégrité et les performances des données. Utilisez la transformation withColumn() avec la méthode cast() pour modifier le type de données d'une colonne.
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
# Cast the 'word_count' column to integer
typed_df = cleaned_df.withColumn("word_count_int", col("word_count").cast(IntegerType()))
Supprimer les enregistrements en double
Les enregistrements en double peuvent fausser les données analytiques. Utilisez la méthode dropDuplicates() pour supprimer les lignes en double d'un DataFrame. Pour définir l'unicité en fonction d'un ensemble spécifique de colonnes, transmettez une liste de noms de colonnes au paramètre subset.
# Remove rows that are complete duplicates across all columns
deduped_df = typed_df.dropDuplicates()
# Remove rows with duplicate 'word' and 'corpus' combinations
key_deduped_df = typed_df.dropDuplicates(subset=["word", "corpus"])
Filtrer les valeurs aberrantes
Les valeurs aberrantes peuvent affecter les agrégations. Une méthode courante pour identifier les valeurs aberrantes est l'écart interquartile (IQR). Filtrez les valeurs qui se situent en dehors de la plage définie par l'IQR.
from pyspark.sql.functions import col
# Calculate Q1, Q3, and IQR for the 'word_count_int' column
quantiles = key_deduped_df.approxQuantile("word_count_int", [0.25, 0.75], 0.05)
q1 = quantiles[0]
q3 = quantiles[1]
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
# Filter out the outliers
no_outliers_df = key_deduped_df.filter(
(col("word_count_int") >= lower_bound) & (col("word_count_int") <= upper_bound)
)
Valider les données par rapport aux règles métier
Validez les données de manière programmatique par rapport aux règles métier et aux contraintes de qualité. Utilisez les fonctions intégrées de Spark pour effectuer des validations. Par exemple, utilisez une jointure pour vérifier l'intégrité des données par rapport à un ensemble connu de valeurs de référence. Une jointure interne ne conserve que les enregistrements valides et filtre ceux qui contiennent des valeurs non valides.
Enrichir l'ensemble de données
Après avoir nettoyé et validé les données, appliquez la logique métier. Cette étape consiste souvent à enrichir l'ensemble de données en le combinant avec d'autres sources de données et en effectuant des agrégations.
# Example: Enriching with a hypothetical corpus metadata DataFrame
corpus_metadata_data = [("Corpus A", 2010, "Fiction"), ("Corpus B", 1998, "News"), ("Corpus C", 2015, "Scientific")]
corpus_metadata_df = spark.createDataFrame(corpus_metadata_data, ["corpus_name", "est_year", "genre"])
# Join the main DataFrame with the metadata
enriched_df = validated_df.join(
corpus_metadata_df,
validated_df.corpus == corpus_metadata_df.corpus_name,
how="left"
)
Écrire les données traitées dans Cloud Storage
Conservez le DataFrame transformé dans l'étape de données traitées de Cloud Storage. Utilisez un format de stockage en colonnes optimisé comme Parquet pour cette couche. Partitionnez les données sur une colonne fréquemment filtrée pour améliorer les performances des requêtes.
# --- Configuration ---
processed_path = f"gs://BUCKET_NAME/processed/processed_data"
# --- Write to Cloud Storage in Parquet format, partitioned by corpus ---
enriched_df.write \
.format("parquet") \
.partitionBy("corpus") \
.mode("overwrite") \
.save(processed_path)
print(f"Successfully wrote transformed data to {processed_path}")
L'option mode("overwrite") garantit que chaque exécution du pipeline remplace la sortie précédente, ce qui rend le job idempotent.
Étapes suivantes
- Découvrez comment charger les données traitées dans un entrepôt de données tel que BigQuery.
- En savoir plus sur Managed Service pour Apache Spark