Daten transformieren
Nachdem Sie Rohdaten aufgenommen haben, erstellen Sie als Nächstes eine Staging-Phase für verarbeitete Daten. In dieser Phase werden Rohdaten in ein validiertes, bereinigtes und angepasstes Dataset umgewandelt. In diesem Dokument wird beschrieben, wie Sie diese Phase der Verarbeitung von Daten mit PySpark in Managed Service for Apache Spark erstellen.
Hinweis
- Melden Sie sich in Ihrem Google Cloud -Konto an. Wenn Sie mit Google Cloudnoch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- Rohdaten müssen in einem Cloud Storage-Bucket verfügbar sein.
- Erstellen Sie einen Managed Service for Apache Spark-Cluster, um PySpark-Jobs auszuführen.
Daten bereinigen und standardisieren
Im ersten Schritt müssen Sie häufige Probleme mit der Datenqualität beheben. Die PySpark DataFrame API bietet Funktionen zum Ausführen grundlegender Bereinigungsaufgaben auf verteilte Weise.
Fehlende Werte verarbeiten
Behandeln Sie Nullwerte, indem Sie entweder Zeilen mit Nullwerten entfernen oder sie mit einem Standardwert füllen.
Wenn Sie Zeilen entfernen möchten, in denen bestimmte Spalten null sind, verwenden Sie die Methode
dropna().Verwenden Sie die Methode
fillna(), um Nullwerte in bestimmten Spalten durch einen Standardwert zu ersetzen.# Assuming 'df' is the DataFrame read from the raw data stage # Drop rows where 'word' or 'corpus' is null cleaned_df = df.dropna(subset=["word", "corpus"]) # Fill nulls in 'word_count' with 0 cleaned_df = cleaned_df.fillna(0, subset=["word_count"])
Datentypen korrigieren
Erzwingen Sie ein korrektes und konsistentes Schema für Datenintegrität und Leistung. Verwenden Sie die withColumn()-Transformation mit der cast()-Methode, um den Datentyp einer Spalte zu ändern.
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
# Cast the 'word_count' column to integer
typed_df = cleaned_df.withColumn("word_count_int", col("word_count").cast(IntegerType()))
Doppelte Datensätze entfernen
Doppelte Datensätze können Analysen verfälschen. Mit der Methode dropDuplicates() lassen sich doppelte Zeilen aus einem DataFrame entfernen. Wenn Sie die Eindeutigkeit anhand einer bestimmten Gruppe von Spalten definieren möchten, übergeben Sie eine Liste mit Spaltennamen an den Parameter subset.
# Remove rows that are complete duplicates across all columns
deduped_df = typed_df.dropDuplicates()
# Remove rows with duplicate 'word' and 'corpus' combinations
key_deduped_df = typed_df.dropDuplicates(subset=["word", "corpus"])
Ausreißer herausfiltern
Ausreißer können sich auf Aggregationen auswirken. Eine gängige Methode zum Identifizieren von Ausreißern ist der Interquartilbereich (IQR). Filtern Sie Werte heraus, die außerhalb des durch den IQR definierten Bereichs liegen.
from pyspark.sql.functions import col
# Calculate Q1, Q3, and IQR for the 'word_count_int' column
quantiles = key_deduped_df.approxQuantile("word_count_int", [0.25, 0.75], 0.05)
q1 = quantiles[0]
q3 = quantiles[1]
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
# Filter out the outliers
no_outliers_df = key_deduped_df.filter(
(col("word_count_int") >= lower_bound) & (col("word_count_int") <= upper_bound)
)
Daten anhand von Geschäftsregeln validieren
Daten programmatisch anhand von Geschäftsregeln und Qualitätsbeschränkungen validieren Verwenden Sie die integrierten Funktionen von Spark, um Validierungen durchzuführen. Sie können beispielsweise einen Join verwenden, um die Integrität von Daten anhand einer bekannten Gruppe von Referenzwerten zu prüfen. Bei einem Inner Join werden nur gültige Datensätze beibehalten und alle mit ungültigen Werten herausgefiltert.
Dataset anreichern
Nachdem Sie die Daten bereinigt und validiert haben, wenden Sie die Geschäftslogik an. Bei diesem Schritt wird das Dataset häufig angereichert, indem es mit anderen Datenquellen kombiniert und aggregiert wird.
# Example: Enriching with a hypothetical corpus metadata DataFrame
corpus_metadata_data = [("Corpus A", 2010, "Fiction"), ("Corpus B", 1998, "News"), ("Corpus C", 2015, "Scientific")]
corpus_metadata_df = spark.createDataFrame(corpus_metadata_data, ["corpus_name", "est_year", "genre"])
# Join the main DataFrame with the metadata
enriched_df = validated_df.join(
corpus_metadata_df,
validated_df.corpus == corpus_metadata_df.corpus_name,
how="left"
)
Verarbeitete Daten in Cloud Storage schreiben
Speichern Sie den transformierten DataFrame in der Phase „Verarbeitete Daten“ in Cloud Storage. Verwenden Sie für diese Ebene ein optimiertes, spaltenorientiertes Speicherformat wie Parquet. Partitionieren Sie die Daten nach einer Spalte, die häufig gefiltert wird, um die Abfrageleistung zu verbessern.
# --- Configuration ---
processed_path = f"gs://BUCKET_NAME/processed/processed_data"
# --- Write to Cloud Storage in Parquet format, partitioned by corpus ---
enriched_df.write \
.format("parquet") \
.partitionBy("corpus") \
.mode("overwrite") \
.save(processed_path)
print(f"Successfully wrote transformed data to {processed_path}")
Die Option mode("overwrite") sorgt dafür, dass bei jeder Ausführung der Pipeline die vorherige Ausgabe ersetzt wird. Dadurch wird der Job idempotent.