Charger des données dans BigQuery
Ce document explique comment utiliser le service géré pour Apache Spark afin d'exécuter un job Spark qui charge des données traitées depuis Cloud Storage dans une table BigQuery. Managed Service pour Apache Spark simplifie ce processus en gérant l'environnement Spark et les connecteurs nécessaires.
Avant de commencer
- Connectez-vous à votre compte Google Cloud . Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $de crédits sans frais pour exécuter, tester et déployer des charges de travail.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- Créez un bucket Cloud Storage.
- Créez un cluster Managed Service pour Apache Spark qui utilise la version d'image
2.1ou ultérieure. - Créez un ensemble de données BigQuery.
Préparer le script PySpark
Créez un fichier Python local nommé
load_analytics_data.py.Ajoutez le code suivant au fichier. Ce script lit les données d'un chemin Cloud Storage, effectue une agrégation et écrit le résultat dans BigQuery.
from pyspark.sql import SparkSession from pyspark.sql.functions import sum as _sum import sys # --- Configuration --- gcs_bucket = "BUCKET_NAME" bq_project = "PROJECT_ID" bq_dataset = "DATASET" bq_table = "corpus_word_counts" # --- Paths --- processed_path = f"gs://{gcs_bucket}/processed/processed_data" temp_gcs_path = f"{gcs_bucket}" # --- Spark Session Initialization --- spark = SparkSession.builder \ .appName("Dataproc-BigQuery-Load") \ .config("spark.hadoop.google.cloud.bigdata.connector.temporary.gcs.bucket", temp_gcs_path) \ .getOrCreate() # --- Read Processed Data from Cloud Storage --- processed_df = spark.read.parquet(processed_path) # --- Final Aggregation for Analytics-Ready Stage --- analytics_df = processed_df.groupBy("corpus") \ .agg(_sum("word_count_int").alias("total_word_count")) \ .orderBy("corpus") print("Aggregated Analytics-Ready data:") analytics_df.show() # --- Write DataFrame to BigQuery --- print(f"Writing data to BigQuery table: {bq_dataset}.{bq_table}") analytics_df.write \ .format("bigquery") \ .option("table", f"{bq_project}.{bq_dataset}.{bq_table}") \ .mode("append") \ .save() print("Successfully wrote data to BigQuery.") # --- Stop Spark Session --- spark.stop()Remplacez les espaces réservés suivants :
BUCKET_NAME: nom du bucket Cloud Storage.PROJECT_ID: ID de votre projet Google Cloud.
Importez le script
load_analytics_data.pydans votre bucket Cloud Storage.
Envoyer le job Managed Service pour Apache Spark
Envoyez le script PySpark en tant que tâche à votre cluster Managed Service pour Apache Spark.
Dans un terminal, exécutez la commande
gcloud dataproc jobs submit pyspark:gcloud dataproc jobs submit pyspark gs://YOUR_BUCKET_NAME/scripts/load_analytics_data.py \ --cluster=CLUSTER_NAME \ --region=REGIONRemplacez les espaces réservés suivants :
BUCKET_NAME: nom du bucket Cloud Storage.CLUSTER_NAME: nom de votre cluster Managed Service pour Apache Spark.REGION: région dans laquelle se trouve votre cluster.
La commande envoie un job PySpark au service Managed Service pour Apache Spark. Les nœuds de calcul Managed Service pour Apache Spark récupèrent le script à partir du chemin Cloud Storage spécifié et l'exécutent sur le cluster.
Vérifier le chargement des données
Dans la console Google Cloud , accédez à la page Jobs (Tâches) du service géré pour Apache Spark afin de surveiller l'exécution de la tâche et d'afficher les journaux de sortie du pilote.
Une fois le job terminé, accédez à la page BigQuery.
Dans le panneau "Explorateur", recherchez votre projet et votre ensemble de données, puis sélectionnez la table
corpus_word_counts.Cliquez sur l'onglet Aperçu pour inspecter les données chargées.
Fonctionnement du connecteur Spark-BigQuery
Le connecteur Spark-BigQuery permet aux applications Spark de lire et d'écrire des données dans BigQuery. Sur les clusters Managed Service pour Apache Spark avec des versions d'image 2.1 ou ultérieures, le connecteur est préinstallé.
Le connecteur utilise une méthode d'écriture indirecte pour charger les données. Cette méthode exploite à la fois Managed Service pour Apache Spark et BigQuery pour des performances élevées.
Le job Spark sur le cluster Managed Service pour Apache Spark écrit le DataFrame final dans un emplacement temporaire d'un bucket Cloud Storage.
Une fois l'écriture dans Cloud Storage terminée, le connecteur déclenche une tâche de chargement BigQuery.
BigQuery ingère les données de l'emplacement Cloud Storage temporaire dans la table cible.
Cette approche indirecte découple le calcul Spark de l'ingestion BigQuery. Ce découplage permet à chaque service de fonctionner efficacement et assure un débit élevé pour les grandes charges de données.
Étapes suivantes
- En savoir plus sur le connecteur Spark-BigQuery
- Découvrez comment interroger et visualiser des données dans BigQuery.
- Découvrez comment visualiser les données BigQuery à l'aide de Looker.