Interroger des tables BigQuery

Ce document explique comment utiliser Spark SQL et l'API Spark DataFrame dans les charges de travail Managed Service pour Apache Spark afin d'interroger des tables BigQuery.

Avant de commencer

Activez les API et, si nécessaire, attribuez des rôles Identity and Access Management.

Activer les API

  1. Connectez-vous à votre Google Cloud compte. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits sans frais pour exécuter, tester et déployer des charges de travail.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc and BigQuery APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc and BigQuery APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

Attribuer des rôles Identity and Access Management

L'attribution de rôles Managed Service pour Apache Spark et BigQuery est requise pour exécuter les exemples de cette page. En fonction de la règle d'administration, ces rôles peuvent déjà avoir été attribués. Pour vérifier les attributions de rôles, consultez la section Devez-vous attribuer des rôles ?.

Rôles Managed Service pour Apache Spark

Par défaut, les lots et les sessions s'exécutent en tant que compte de service Compute Engine par défaut , sauf si un compte de service personnalisé est spécifié pour la charge de travail ou la session.

Rôle Utilisateur du compte de service

Pour obtenir les autorisations nécessaires pour envoyer une charge de travail par lot, demandez à votre administrateur de vous accorder le rôle IAM Utilisateur du compte de service (roles/iam.serviceAccountUser) sur le compte de service Compute Engine par défaut. Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Vous pouvez également obtenir les autorisations requises avec des rôles personnalisés ou d'autres rôles prédéfinis.

Rôle Nœud de calcul Dataproc

Pour vous assurer que le compte de service Compute Engine par défaut dispose des autorisations nécessaires pour envoyer une charge de travail par lot, demandez à votre administrateur d'accorder le rôle IAM Nœud de calcul Dataproc (roles/dataproc.worker) au compte de service Compute Engine par défaut sur le projet.

Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Votre administrateur peut également attribuer au compte de service Compute Engine par défaut les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.

Rôles BigQuery

Le compte de service utilisé pour exécuter une charge de travail par lot ou une session interactive Managed Service pour Apache Spark doit disposer des rôles IAM suivants sur les ressources suivantes :

  • Lecteur de données BigQuery (roles/bigquery.dataViewer) pour lire les données des tables, comme suit :

    • Lecture à partir de bigquery.DATASET_ID.SOURCE_TABLE dans les exemples Spark SQL SELECT et INSERT INTO.
    • Lecture à partir d'INFORMATION_SCHEMA dans l'exemple d'API DataFrame.
  • Utilisateur BigQuery (roles/bigquery.user) pour autoriser Spark à exécuter des tâches qui interagissent avec BigQuery.

  • Éditeur de données BigQuery (roles/bigquery.dataEditor) pour écrire des données ou des métadonnées, comme suit :

    • Pour l'exemple Spark SQL INSERT INTO, écriture dans bigquery.DATASET_ID.DESTINATION_TABLE.
    • Pour l'exemple d'API DataFrame interrogeant INFORMATION_SCHEMA, ce rôle est nécessaire sur le DATASET_ID fourni dans .option('materializationDataset', ...) pour permettre au connecteur de créer des tables temporaires pour les résultats.

Envoyer une charge de travail par lot Spark

Vous pouvez utiliser la Google Cloud console, Google Cloud CLI ou l' API Managed Service pour Apache Spark afin d' envoyer une charge de travail par lot Managed Service pour Apache Spark.

Utiliser Spark SQL

Vous pouvez utiliser le catalogue Spark BigQuery pour interroger directement les tables BigQuery standards à partir de charges de travail par lot ou de sessions interactives. Cette méthode vous permet d'utiliser la syntaxe GoogleSQL standard pour interagir avec les données BigQuery dans les tâches spark-sql sans écrire de code PySpark ni créer de vues temporaires à l'aide de l'API DataFrame.

Configurer le catalogue BigQuery

Pour activer le catalogue BigQuery, fournissez les propriétés Spark suivantes à votre charge de travail par lot ou à votre session interactive Spark SQL :

  • dataproc.sparkBqConnector.version=CONNECTOR_VERSION : spécifie la version du connecteur Spark BigQuery.
  • spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog : (facultatif) enregistre le catalogue bigquery en tant que catalogue Spark SQL.

Exemple Google Cloud CLI :

gcloud dataproc batches submit spark-sql \
    --project=PROJECT_ID \
    --region=REGION \
    --version=RUNTIME_VERSION \
    --subnet=SUBNET \
    --service-account=SERVICE_ACCOUNT \
    --properties="dataproc.sparkBqConnector.version=CONNECTOR_VERSION,spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog" \
    gs://BUCKET/my_query.sql

Remplacez les éléments suivants :

Interroger des tables BigQuery

Une fois le catalogue configuré, vous pouvez faire référence aux tables BigQuery dans un script SQL au format suivant : bigquery.DATASET_ID.TABLE_ID.

Exemple de requête SQL :

-- Query data from a BigQuery table.
SELECT
  column_a,
  SUM(column_b)
FROM
  bigquery.DATASET_ID.SOURCE_TABLE
WHERE
  partition_date = CURRENT_DATE()
GROUP BY column_a;

-- Insert results into another BigQuery table.
INSERT INTO bigquery.DATASET_ID.DESTINATION_TABLE
SELECT column_a, column_b
FROM bigquery.DATASET_ID.SOURCE_TABLE
WHERE column_c = 'some_value';

Remplacez les éléments suivants :

  • DATASET_ID : ID d'ensemble de données BigQuery.
  • SOURCE_TABLE : ID de la table à interroger.
  • DESTINATION_TABLE : ID de la table dans laquelle insérer des données.

Utiliser l'API DataFrame

L'API DataFrame est requise pour accéder aux vues INFORMATION_SCHEMA.

  • Pour interroger INFORMATION_SCHEMA :

    • Définissez spark.conf.set('viewsEnabled', 'true').
    • Fournissez .option('materializationDataset', 'DATASET_ID') pour que le connecteur écrive des résultats temporaires.

Exemple de requête PySpark :

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('BigQuery Info Schema Test').getOrCreate()

# Required for INFORMATION_SCHEMA.
spark.conf.set('viewsEnabled', 'true')

# Query INFORMATION_SCHEMA.TABLES.
info_schema_df = spark.read.format('bigquery') \
  .option('project', 'PROJECT_ID') \
  .option('materializationDataset', 'DATASET_ID') \
  .load(f'SELECT table_name, creation_time FROM `PROJECT_ID.DATASET_ID.INFORMATION_SCHEMA.TABLES`')
info_schema_df.show(5, truncate=False)

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet. Les ID de projet sont répertoriés dans la section Informations sur le projet du Google Cloud tableau de bord de la console.
  • DATASET_ID : ID d'ensemble de données BigQuery dans lequel le connecteur SparkvBigQuery peut écrire des données temporaires.

Consultez Envoyer une charge de travail par lot de comptage de mots PySpark pour obtenir un exemple PySpark qui lit les données d'une table BigQuery standard, puis écrit les résultats dans une table de sortie.

Étape suivante