Trasformare i dati
Dopo l'importazione dei dati non elaborati, la fase successiva consiste nel creare una fase di dati elaborati. In questa fase, i dati grezzi vengono perfezionati in un set di dati convalidato, pulito e conforme. Questo documento fornisce una guida per creare questa fase di dati elaborati utilizzando PySpark su Managed Service per Apache Spark.
Prima di iniziare
- Accedi al tuo account Google Cloud . Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- Assicurati di avere a disposizione i dati non elaborati in un bucket Cloud Storage.
- Crea un cluster Managed Service per Apache Spark per eseguire i job PySpark.
Pulire e standardizzare i dati
Il primo passo è risolvere i problemi comuni relativi alla qualità dei dati. L'API PySpark DataFrame fornisce funzioni per eseguire attività di pulizia di base in modo distribuito.
Gestire i valori mancanti
Gestisci i valori nulli eliminando le righe che li contengono o riempiendole con un valore predefinito.
Per eliminare le righe in cui colonne specifiche sono nulle, utilizza il metodo
dropna().Per riempire i valori null in colonne specifiche con un valore predefinito, utilizza il metodo
fillna().# Assuming 'df' is the DataFrame read from the raw data stage # Drop rows where 'word' or 'corpus' is null cleaned_df = df.dropna(subset=["word", "corpus"]) # Fill nulls in 'word_count' with 0 cleaned_df = cleaned_df.fillna(0, subset=["word_count"])
Tipi di dati corretti
Applica uno schema corretto e coerente per l'integrità e le prestazioni dei dati. Utilizza
la trasformazione withColumn() con il metodo cast() per modificare il tipo di dati di una colonna.
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
# Cast the 'word_count' column to integer
typed_df = cleaned_df.withColumn("word_count_int", col("word_count").cast(IntegerType()))
Rimuovere i record duplicati
I record duplicati possono falsare l'analisi. Utilizza il metodo dropDuplicates() per
rimuovere le righe duplicate da un DataFrame. Per definire l'unicità in base a un insieme specifico di colonne, trasmetti un elenco di nomi di colonne al parametro subset.
# Remove rows that are complete duplicates across all columns
deduped_df = typed_df.dropDuplicates()
# Remove rows with duplicate 'word' and 'corpus' combinations
key_deduped_df = typed_df.dropDuplicates(subset=["word", "corpus"])
Filtra i valori anomali
I valori anomali possono influire sulle aggregazioni. Un metodo comune per identificare gli outlier è l'intervallo interquartile (IQR). Filtra i valori che non rientrano nell'intervallo definito dall'IQR.
from pyspark.sql.functions import col
# Calculate Q1, Q3, and IQR for the 'word_count_int' column
quantiles = key_deduped_df.approxQuantile("word_count_int", [0.25, 0.75], 0.05)
q1 = quantiles[0]
q3 = quantiles[1]
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
# Filter out the outliers
no_outliers_df = key_deduped_df.filter(
(col("word_count_int") >= lower_bound) & (col("word_count_int") <= upper_bound)
)
Convalida dei dati in base alle regole aziendali
Convalida i dati in modo programmatico in base alle regole aziendali e ai vincoli di qualità. Utilizza le funzioni integrate di Spark per eseguire le convalide. Ad esempio, utilizza un join per verificare l'integrità dei dati rispetto a un insieme noto di valori di riferimento. Un inner join conserva solo i record validi, filtrando quelli con valori non validi.
Arricchisci il set di dati
Dopo aver pulito e convalidato i dati, applica la logica di business. Questo passaggio spesso comporta l'arricchimento del set di dati combinandolo con altre origini dati ed eseguendo aggregazioni.
# Example: Enriching with a hypothetical corpus metadata DataFrame
corpus_metadata_data = [("Corpus A", 2010, "Fiction"), ("Corpus B", 1998, "News"), ("Corpus C", 2015, "Scientific")]
corpus_metadata_df = spark.createDataFrame(corpus_metadata_data, ["corpus_name", "est_year", "genre"])
# Join the main DataFrame with the metadata
enriched_df = validated_df.join(
corpus_metadata_df,
validated_df.corpus == corpus_metadata_df.corpus_name,
how="left"
)
Scrivi i dati elaborati in Cloud Storage
Salva il DataFrame trasformato nella fase dei dati elaborati in Cloud Storage. Utilizza un formato di archiviazione colonnare ottimizzato come Parquet per questo livello. Partiziona i dati in una colonna filtrata di frequente per migliorare le prestazioni delle query.
# --- Configuration ---
processed_path = f"gs://BUCKET_NAME/processed/processed_data"
# --- Write to Cloud Storage in Parquet format, partitioned by corpus ---
enriched_df.write \
.format("parquet") \
.partitionBy("corpus") \
.mode("overwrite") \
.save(processed_path)
print(f"Successfully wrote transformed data to {processed_path}")
L'opzione mode("overwrite") garantisce che ogni esecuzione della pipeline sostituisca
l'output precedente, il che rende il job idempotente.
Passaggi successivi
- Scopri come caricare i dati elaborati in un data warehouse come BigQuery.
- Scopri di più su Managed Service per Apache Spark.