Daten in BigQuery laden
In diesem Dokument wird beschrieben, wie Sie mit Managed Service for Apache Spark einen Spark-Job ausführen, der verarbeitete Daten aus Cloud Storage in eine BigQuery-Tabelle lädt. Managed Service for Apache Spark optimiert diesen Prozess, indem die Spark-Umgebung und die erforderlichen Connectors verwaltet werden.
Hinweis
- Melden Sie sich in Ihrem Google Cloud Konto an. Wenn Sie noch kein Google Cloud-Nutzer sind, erstellen Sie ein Konto, um zu sehen, wie sich unsere Produkte in realen Szenarien schlagen. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- Erstellen Sie einen Cloud Storage-Bucket.
- Erstellen Sie einen Managed Service for Apache Spark
Cluster mit Image-Version
2.1oder höher. - Erstellen Sie ein BigQuery-Dataset.
PySpark-Skript vorbereiten
Erstellen Sie eine lokale Python-Datei mit dem Namen
load_analytics_data.py.Fügen Sie folgenden Code in die Datei ein. Dieses Skript liest Daten aus einem Cloud Storage-Pfad, führt eine Aggregation durch und schreibt das Ergebnis in BigQuery.
from pyspark.sql import SparkSession from pyspark.sql.functions import sum as _sum import sys # --- Configuration --- gcs_bucket = "BUCKET_NAME" bq_project = "PROJECT_ID" bq_dataset = "DATASET" bq_table = "corpus_word_counts" # --- Paths --- processed_path = f"gs://{gcs_bucket}/processed/processed_data" temp_gcs_path = f"{gcs_bucket}" # --- Spark Session Initialization --- spark = SparkSession.builder \ .appName("Dataproc-BigQuery-Load") \ .config("spark.hadoop.google.cloud.bigdata.connector.temporary.gcs.bucket", temp_gcs_path) \ .getOrCreate() # --- Read Processed Data from Cloud Storage --- processed_df = spark.read.parquet(processed_path) # --- Final Aggregation for Analytics-Ready Stage --- analytics_df = processed_df.groupBy("corpus") \ .agg(_sum("word_count_int").alias("total_word_count")) \ .orderBy("corpus") print("Aggregated Analytics-Ready data:") analytics_df.show() # --- Write DataFrame to BigQuery --- print(f"Writing data to BigQuery table: {bq_dataset}.{bq_table}") analytics_df.write \ .format("bigquery") \ .option("table", f"{bq_project}.{bq_dataset}.{bq_table}") \ .mode("append") \ .save() print("Successfully wrote data to BigQuery.") # --- Stop Spark Session --- spark.stop()Ersetzen Sie die folgenden Platzhalter:
BUCKET_NAME: Der Name Ihres Cloud Storage-Bucket.PROJECT_ID: Ihre Google Cloud Projekt-ID.
Laden Sie das Skript
load_analytics_data.pyin Ihren Cloud Storage-Bucket hoch.
Managed Service for Apache Spark-Job senden
Senden Sie das PySpark-Skript als Job an Ihren Managed Service for Apache Spark-Cluster.
Führen Sie in einem Terminal den Befehl
gcloud dataproc jobs submit pysparkaus:gcloud dataproc jobs submit pyspark gs://YOUR_BUCKET_NAME/scripts/load_analytics_data.py \ --cluster=CLUSTER_NAME \ --region=REGIONErsetzen Sie die folgenden Platzhalter:
BUCKET_NAME: Der Name Ihres Cloud Storage-Bucket.CLUSTER_NAME: Der Name Ihres Managed Service for Apache Spark-Clusters.REGION: Die Region, in der sich der Cluster befindet.
Mit dem Befehl wird ein PySpark-Job an den Managed Service for Apache Spark-Dienst gesendet. Managed Service for Apache Spark-Worker rufen das Skript aus dem angegebenen Cloud Storage-Pfad ab und führen es im Cluster aus.
Datenlast überprüfen
Rufen Sie in der Google Cloud Console die Seite Jobs von Managed Service for Apache Spark auf, um die Ausführung des Jobs zu beobachten und die Ausgabeprotokolle des Treibers anzusehen.
Rufen Sie nach Abschluss des Jobs die Seite BigQuery auf.
Suchen Sie im Bereich „Explorer“ Ihr Projekt und Dataset und wählen Sie dann die Tabelle
corpus_word_countsaus.Klicken Sie auf den Tab Vorschau , um die geladenen Daten zu prüfen.
Funktionsweise des Spark-BigQuery-Connectors
Mit dem Spark-BigQuery-Connector können Spark-Anwendungen Daten aus BigQuery lesen und in BigQuery schreiben. In Managed Service for Apache Spark-Clustern mit Image-Version 2.1 oder höher ist der Connector vorinstalliert.
Der Connector verwendet eine indirekte Schreibmethode zum Laden von Daten. Bei dieser Methode werden sowohl Managed Service for Apache Spark als auch BigQuery für eine hohe Leistung genutzt.
Der Spark-Job im Managed Service for Apache Spark-Cluster schreibt das endgültige DataFrame an einen temporären Speicherort in einem Cloud Storage-Bucket.
Nachdem der Schreibvorgang in Cloud Storage abgeschlossen ist, löst der Connector einen BigQuery-Ladejob aus.
BigQuery nimmt die Daten aus dem temporären Cloud Storage-Speicherort in die Zieltabelle auf.
Bei diesem indirekten Ansatz wird die Spark-Berechnung von der BigQuery-Aufnahme entkoppelt. So kann jeder Dienst effizient arbeiten und ein hoher Durchsatz bei großen Datenlasten ist gewährleistet.
Nächste Schritte
- Weitere Informationen zum Spark-BigQuery-Connector.
- Informationen zum Abfragen und Visualisieren von Daten in BigQuery.
- Informationen zum Visualisieren von BigQuery-Daten mit Looker.