Transforma los datos

Después de transferir los datos sin procesar, la siguiente fase es crear una etapa de datos procesados. En esta etapa, se refinan los datos sin procesar en un conjunto de datos validados, limpios y ajustados. En este documento, se proporciona una guía para compilar esta etapa de datos procesados con PySpark en Managed Service for Apache Spark.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud . Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  8. Asegúrate de tener datos sin procesar disponibles en un bucket de Cloud Storage.
  9. Crea un clúster de Managed Service for Apache Spark para ejecutar trabajos de PySpark.

Limpia y estandariza los datos

El primer paso es abordar los problemas comunes de calidad de los datos. La API de DataFrame de PySpark proporciona funciones para realizar tareas de limpieza básicas de forma distribuida.

Cómo controlar los valores faltantes

Controla los valores nulos descartando las filas que los contienen o completándolos con un valor predeterminado.

  1. Para descartar filas en las que columnas específicas son nulas, usa el método dropna().

  2. Para completar los valores nulos en columnas específicas con un valor predeterminado, usa el método fillna().

    # Assuming 'df' is the DataFrame read from the raw data stage
    # Drop rows where 'word' or 'corpus' is null
    cleaned_df = df.dropna(subset=["word", "corpus"])
    
    # Fill nulls in 'word_count' with 0
    cleaned_df = cleaned_df.fillna(0, subset=["word_count"])
    

Tipos de datos correctos

Aplicar un esquema correcto y coherente para la integridad y el rendimiento de los datos Usa la transformación withColumn() con el método cast() para cambiar el tipo de datos de una columna.

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Cast the 'word_count' column to integer
typed_df = cleaned_df.withColumn("word_count_int", col("word_count").cast(IntegerType()))

Cómo quitar registros duplicados

Los registros duplicados pueden sesgar las estadísticas. Usa el método dropDuplicates() para quitar las filas duplicadas de un DataFrame. Para definir la unicidad en función de un conjunto específico de columnas, pasa una lista de nombres de columna al parámetro subset.

# Remove rows that are complete duplicates across all columns
deduped_df = typed_df.dropDuplicates()

# Remove rows with duplicate 'word' and 'corpus' combinations
key_deduped_df = typed_df.dropDuplicates(subset=["word", "corpus"])

Filtrar valores atípicos

Los valores atípicos pueden afectar las agregaciones. Un método común para identificar valores atípicos es el rango intercuartílico (IQR). Filtra los valores que se encuentran fuera del rango definido por el IQR.

from pyspark.sql.functions import col

# Calculate Q1, Q3, and IQR for the 'word_count_int' column
quantiles = key_deduped_df.approxQuantile("word_count_int", [0.25, 0.75], 0.05)
q1 = quantiles[0]
q3 = quantiles[1]
iqr = q3 - q1

lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr

# Filter out the outliers
no_outliers_df = key_deduped_df.filter(
    (col("word_count_int") >= lower_bound) & (col("word_count_int") <= upper_bound)
)

Validar los datos según las reglas de negocio

Validar datos de forma programática en función de reglas comerciales y restricciones de calidad Usa las funciones integradas de Spark para realizar validaciones. Por ejemplo, usa una unión para verificar la integridad de los datos en comparación con un conjunto conocido de valores de referencia. Una combinación interna conserva solo los registros válidos y filtra los que tienen valores no válidos.

Enriquece el conjunto de datos

Después de limpiar y validar los datos, aplica la lógica empresarial. Este paso suele implicar enriquecer el conjunto de datos combinándolo con otras fuentes de datos y realizando agregaciones.

# Example: Enriching with a hypothetical corpus metadata DataFrame
corpus_metadata_data = [("Corpus A", 2010, "Fiction"), ("Corpus B", 1998, "News"), ("Corpus C", 2015, "Scientific")]
corpus_metadata_df = spark.createDataFrame(corpus_metadata_data, ["corpus_name", "est_year", "genre"])

# Join the main DataFrame with the metadata
enriched_df = validated_df.join(
    corpus_metadata_df,
    validated_df.corpus == corpus_metadata_df.corpus_name,
    how="left"
)

Escribe los datos procesados en Cloud Storage

Persiste el DataFrame transformado en la etapa de datos procesados en Cloud Storage. Usa un formato de almacenamiento en columnas optimizado, como Parquet, para esta capa. Particiona los datos en una columna que se filtra con frecuencia para mejorar el rendimiento de las consultas.

# --- Configuration ---
processed_path = f"gs://BUCKET_NAME/processed/processed_data"

# --- Write to Cloud Storage in Parquet format, partitioned by corpus ---
enriched_df.write \
  .format("parquet") \
  .partitionBy("corpus") \
  .mode("overwrite") \
  .save(processed_path)

print(f"Successfully wrote transformed data to {processed_path}")

La opción mode("overwrite") garantiza que cada ejecución de la canalización reemplace el resultado anterior, lo que hace que el trabajo sea idempotente.

¿Qué sigue?