Trasformare i dati

Dopo l'importazione dei dati non elaborati, la fase successiva consiste nel creare una fase di dati elaborati. In questa fase, i dati grezzi vengono perfezionati in un set di dati convalidato, pulito e conforme. Questo documento fornisce una guida per creare questa fase di dati elaborati utilizzando PySpark su Managed Service per Apache Spark.

Prima di iniziare

  1. Accedi al tuo account Google Cloud . Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  8. Assicurati di avere a disposizione i dati non elaborati in un bucket Cloud Storage.
  9. Crea un cluster Managed Service per Apache Spark per eseguire i job PySpark.

Pulire e standardizzare i dati

Il primo passo è risolvere i problemi comuni relativi alla qualità dei dati. L'API PySpark DataFrame fornisce funzioni per eseguire attività di pulizia di base in modo distribuito.

Gestire i valori mancanti

Gestisci i valori nulli eliminando le righe che li contengono o riempiendole con un valore predefinito.

  1. Per eliminare le righe in cui colonne specifiche sono nulle, utilizza il metodo dropna().

  2. Per riempire i valori null in colonne specifiche con un valore predefinito, utilizza il metodo fillna().

    # Assuming 'df' is the DataFrame read from the raw data stage
    # Drop rows where 'word' or 'corpus' is null
    cleaned_df = df.dropna(subset=["word", "corpus"])
    
    # Fill nulls in 'word_count' with 0
    cleaned_df = cleaned_df.fillna(0, subset=["word_count"])
    

Tipi di dati corretti

Applica uno schema corretto e coerente per l'integrità e le prestazioni dei dati. Utilizza la trasformazione withColumn() con il metodo cast() per modificare il tipo di dati di una colonna.

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Cast the 'word_count' column to integer
typed_df = cleaned_df.withColumn("word_count_int", col("word_count").cast(IntegerType()))

Rimuovere i record duplicati

I record duplicati possono falsare l'analisi. Utilizza il metodo dropDuplicates() per rimuovere le righe duplicate da un DataFrame. Per definire l'unicità in base a un insieme specifico di colonne, trasmetti un elenco di nomi di colonne al parametro subset.

# Remove rows that are complete duplicates across all columns
deduped_df = typed_df.dropDuplicates()

# Remove rows with duplicate 'word' and 'corpus' combinations
key_deduped_df = typed_df.dropDuplicates(subset=["word", "corpus"])

Filtra i valori anomali

I valori anomali possono influire sulle aggregazioni. Un metodo comune per identificare gli outlier è l'intervallo interquartile (IQR). Filtra i valori che non rientrano nell'intervallo definito dall'IQR.

from pyspark.sql.functions import col

# Calculate Q1, Q3, and IQR for the 'word_count_int' column
quantiles = key_deduped_df.approxQuantile("word_count_int", [0.25, 0.75], 0.05)
q1 = quantiles[0]
q3 = quantiles[1]
iqr = q3 - q1

lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr

# Filter out the outliers
no_outliers_df = key_deduped_df.filter(
    (col("word_count_int") >= lower_bound) & (col("word_count_int") <= upper_bound)
)

Convalida dei dati in base alle regole aziendali

Convalida i dati in modo programmatico in base alle regole aziendali e ai vincoli di qualità. Utilizza le funzioni integrate di Spark per eseguire le convalide. Ad esempio, utilizza un join per verificare l'integrità dei dati rispetto a un insieme noto di valori di riferimento. Un inner join conserva solo i record validi, filtrando quelli con valori non validi.

Arricchisci il set di dati

Dopo aver pulito e convalidato i dati, applica la logica di business. Questo passaggio spesso comporta l'arricchimento del set di dati combinandolo con altre origini dati ed eseguendo aggregazioni.

# Example: Enriching with a hypothetical corpus metadata DataFrame
corpus_metadata_data = [("Corpus A", 2010, "Fiction"), ("Corpus B", 1998, "News"), ("Corpus C", 2015, "Scientific")]
corpus_metadata_df = spark.createDataFrame(corpus_metadata_data, ["corpus_name", "est_year", "genre"])

# Join the main DataFrame with the metadata
enriched_df = validated_df.join(
    corpus_metadata_df,
    validated_df.corpus == corpus_metadata_df.corpus_name,
    how="left"
)

Scrivi i dati elaborati in Cloud Storage

Salva il DataFrame trasformato nella fase dei dati elaborati in Cloud Storage. Utilizza un formato di archiviazione colonnare ottimizzato come Parquet per questo livello. Partiziona i dati in una colonna filtrata di frequente per migliorare le prestazioni delle query.

# --- Configuration ---
processed_path = f"gs://BUCKET_NAME/processed/processed_data"

# --- Write to Cloud Storage in Parquet format, partitioned by corpus ---
enriched_df.write \
  .format("parquet") \
  .partitionBy("corpus") \
  .mode("overwrite") \
  .save(processed_path)

print(f"Successfully wrote transformed data to {processed_path}")

L'opzione mode("overwrite") garantisce che ogni esecuzione della pipeline sostituisca l'output precedente, il che rende il job idempotente.

Passaggi successivi