Transformar dados

Depois de ingerir dados brutos, a próxima fase é criar um estágio de dados processados. Esse estágio refina os dados brutos em um conjunto de dados validados, limpos e conformados. Este documento fornece um guia para criar esse estágio de dados processados usando o PySpark no Managed Service for Apache Spark.

Antes de começar

  1. Faça login na sua Google Cloud conta do. Se você não conhece o Google Cloud, crie uma conta para avaliar o desempenho dos nossos produtos em cenários reais. Clientes novos também recebem US $300 em créditos para executar, testar e implantar cargas de trabalho.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  8. Verifique se você tem dados brutos disponíveis em um bucket do Cloud Storage.
  9. Crie um cluster do Managed Service for Apache Spark para executar jobs do PySpark.

Limpar e padronizar dados

A primeira etapa é resolver problemas comuns de qualidade de dados. A API do DataFrame do PySpark fornece funções para realizar tarefas básicas de limpeza de maneira distribuída.

Processar valores ausentes

Para processar valores nulos, descarte as linhas que os contêm ou preencha-as com um valor padrão.

  1. Para descartar linhas em que colunas específicas são nulas, use o método dropna().

  2. Para preencher valores nulos em colunas específicas com um padrão, use o método fillna().

    # Assuming 'df' is the DataFrame read from the raw data stage
    # Drop rows where 'word' or 'corpus' is null
    cleaned_df = df.dropna(subset=["word", "corpus"])
    
    # Fill nulls in 'word_count' with 0
    cleaned_df = cleaned_df.fillna(0, subset=["word_count"])
    

Corrigir tipos de dados

Aplique um esquema correto e consistente para integridade de dados e desempenho. Use a transformação withColumn() com o método cast() para mudar o tipo de dados de uma coluna.

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Cast the 'word_count' column to integer
typed_df = cleaned_df.withColumn("word_count_int", col("word_count").cast(IntegerType()))

Remover registros duplicados

Registros duplicados podem distorcer as análises. Use o método dropDuplicates() para remover linhas duplicadas de um DataFrame. Para definir a exclusividade com base em um conjunto específico de colunas, transmita uma lista de nomes de colunas para o parâmetro subset.

# Remove rows that are complete duplicates across all columns
deduped_df = typed_df.dropDuplicates()

# Remove rows with duplicate 'word' and 'corpus' combinations
key_deduped_df = typed_df.dropDuplicates(subset=["word", "corpus"])

Filtrar outliers

Os outliers podem afetar as agregações. Um método comum para identificar outliers é o intervalo interquartil (IQR, na sigla em inglês). Filtre os valores que estão fora do intervalo definido pelo IQR.

from pyspark.sql.functions import col

# Calculate Q1, Q3, and IQR for the 'word_count_int' column
quantiles = key_deduped_df.approxQuantile("word_count_int", [0.25, 0.75], 0.05)
q1 = quantiles[0]
q3 = quantiles[1]
iqr = q3 - q1

lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr

# Filter out the outliers
no_outliers_df = key_deduped_df.filter(
    (col("word_count_int") >= lower_bound) & (col("word_count_int") <= upper_bound)
)

Validar dados com base nas regras de negócios

Valide dados de maneira programática com base nas regras de negócios e nas restrições de qualidade. Use as funções integradas do Spark para realizar validações. Por exemplo, use uma mesclagem para verificar a integridade dos dados em relação a um conjunto conhecido de valores de referência. Uma mesclagem interna retém apenas registros válidos, filtrando aqueles com valores inválidos.

Enriquecer o conjunto de dados

Depois de limpar e validar os dados, aplique a lógica de negócios. Essa etapa geralmente envolve o enriquecimento do conjunto de dados combinando-o com outras fontes de dados e realizando agregações.

# Example: Enriching with a hypothetical corpus metadata DataFrame
corpus_metadata_data = [("Corpus A", 2010, "Fiction"), ("Corpus B", 1998, "News"), ("Corpus C", 2015, "Scientific")]
corpus_metadata_df = spark.createDataFrame(corpus_metadata_data, ["corpus_name", "est_year", "genre"])

# Join the main DataFrame with the metadata
enriched_df = validated_df.join(
    corpus_metadata_df,
    validated_df.corpus == corpus_metadata_df.corpus_name,
    how="left"
)

Gravar os dados processados no Cloud Storage

Mantenha o DataFrame transformado no estágio de dados processados no Cloud Storage. Use um formato de armazenamento colunar otimizado, como o Parquet, para essa camada. Particione os dados em uma coluna filtrada com frequência para melhorar o desempenho da consulta.

# --- Configuration ---
processed_path = f"gs://BUCKET_NAME/processed/processed_data"

# --- Write to Cloud Storage in Parquet format, partitioned by corpus ---
enriched_df.write \
  .format("parquet") \
  .partitionBy("corpus") \
  .mode("overwrite") \
  .save(processed_path)

print(f"Successfully wrote transformed data to {processed_path}")

A opção mode("overwrite") garante que cada execução do pipeline substitua a saída anterior, o que torna o job idempotente.

A seguir