Utiliser le connecteur Spark BigQuery

Vous pouvez utiliser le connecteur spark-bigquery avec Managed Service pour Apache Spark pour lire et écrire des données depuis et vers BigQuery. Ce tutoriel présente une application PySpark qui utilise le spark-bigquery-connector.

Confirmer la version du connecteur

Consultez les versions d'exécution de Managed Service pour Apache Spark pour déterminer la version du connecteur BigQuery installée dans votre charge de travail par lot ou dans la version d'exécution de votre session interactive. Si le connecteur n'est pas listé, consultez Rendre le connecteur disponible pour les applications.

Rendre le connecteur disponible pour les applications (si nécessaire)

Le connecteur BigQuery est installé dans toutes les versions d'exécution compatibles de Managed Service pour Apache Spark. Si vous utilisez une version d'exécution non compatible qui n'installe pas le connecteur (Spark runtime 1.0), vous pouvez rendre le connecteur disponible pour une application de l'une des deux manières suivantes :

  • Utilisez le jars paramètre pour pointer vers un fichier JAR de connecteur lorsque vous envoyez une charge de travail par lot Managed Service pour Apache Spark ou exécutez une session interactive. L'exemple de charge de travail par lot suivant spécifie un fichier JAR de connecteur (consultez le dépôt GoogleCloudDataproc/spark-bigquery-connector sur GitHub pour obtenir la liste des fichiers JAR de connecteur disponibles).
    • Exemple de Google Cloud CLI :
      gcloud dataproc batches submit pyspark \
          --region=REGION \
          --jars=spark-3.5-bigquery-version.jar \
          ... other args
      

Calculer les coûts

Ce tutoriel fait appel à des composants payants de Google Cloud, y compris :

  • Managed Service pour Apache Spark
  • BigQuery
  • Cloud Storage

Utilisez le Simulateur de coût pour générer une estimation des coûts en fonction de votre utilisation prévue.

Les nouveaux utilisateurs de Cloud Platform peuvent bénéficier d'un essai sans frais.

Configurer la facturation

Par défaut, le projet qui est facturé pour l'utilisation de l'API est le projet associé aux identifiants ou au compte de service. Pour facturer un autre projet, définissez la propriété de configuration suivante : spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Vous pouvez également ajouter cette propriété à une opération de lecture ou d'écriture, comme suit : .option("parentProject", "<BILLED-GCP-PROJECT>").

Envoyer une charge de travail par lot PySpark wordcount

Cet exemple lit les données de BigQuery dans un DataFrame Spark pour effectuer un décompte de mots à l'aide de l'API de source de données standard.

Le connecteur écrit le résultat du décompte de mots dans BigQuery selon la séquence d'opérations suivante :

  1. Mise en mémoire tampon des données dans des fichiers temporaires de votre bucket Cloud Storage

  2. Copie des données en une seule opération de votre bucket Cloud Storage vers BigQuery

  3. Suppression des fichiers temporaires dans Cloud Storage une fois l'opération de chargement BigQuery terminée (les fichiers temporaires sont également supprimés une fois l'application Spark terminée). Si la suppression échoue, vous devrez supprimer tous les fichiers Cloud Storage temporaires indésirables, qui sont généralement placés dans gs://BUCKET_NAME/.spark-bigquery-JOB_ID-UUID.

Étapes à suivre pour exécuter la charge de travail wordcount

  1. Ouvrez un terminal local ou Cloud Shell.
  2. Créez le wordcount_dataset avec l'outil de ligne de commande bq dans un terminal local ou dans Cloud Shell.
    bq mk wordcount_dataset
    
  3. Créez un bucket Cloud Storage avec le Google Cloud CLI.
    gcloud storage buckets create gs://BUCKET_NAME
    
    Remplacez BUCKET_NAME par le nom du bucket Cloud Storage que vous avez créé.
  4. Créez le fichier wordcount.py en local dans un éditeur de texte et copiez-collez le code PySpark suivant.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Cloud Storage bucket used by the connector for temporary BigQuery
    # export data.
    bucket = "BUCKET_NAME"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .load('bigquery-public-data.samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .save('wordcount_dataset.wordcount_output')
  5. Envoyez la charge de travail par lot PySpark :
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=REGION \
        --deps-bucket=BUCKET_NAME
    
    Exemple de résultat dans le terminal :
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    Pour prévisualiser la table de sortie dans la Google Cloud console, ouvrez la BigQuery page, sélectionnez la wordcount_output table, puis cliquez Aperçu.
    Affichage de l&#39;aperçu de la table BigQuery
    Figure 1: Prévisualiser la table de sortie dans BigQuery

Pour en savoir plus