Eseguire query sulle tabelle BigQuery

Questo documento spiega come utilizzare Spark SQL e l'API DataFrame di Spark nei carichi di lavoro Managed Service for Apache Spark per eseguire query sulle tabelle BigQuery.

Prima di iniziare

Abilita le API e, se necessario, concedi i ruoli Identity and Access Management.

Abilita API

  1. Accedi al tuo Google Cloud account. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc and BigQuery APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc and BigQuery APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

Concedi i ruoli Identity and Access Management

Per eseguire gli esempi in questa pagina, sono necessarie le concessioni dei ruoli Managed Service for Apache Spark e BigQuery. A seconda delle norme dell'organizzazione, questi ruoli potrebbero essere già stati concessi. Per verificare le concessioni dei ruoli, consulta Devi concedere i ruoli?.

Ruoli Managed Service for Apache Spark

Per impostazione predefinita, i batch e le sessioni vengono eseguiti come il service account predefinito di Compute Engine , a meno che non venga specificato un account di servizio personalizzato per il carico di lavoro o la sessione.

Ruolo Utente service account

Per ottenere le autorizzazioni necessarie per inviare un carico di lavoro batch, chiedi all'amministratore di concederti il ruolo IAM Utente service account (roles/iam.serviceAccountUser) sul account di servizio predefinito di Compute Engine. Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Ruolo Worker Dataproc

Per assicurarti che il account di servizio predefinito di Compute Engine disponga delle autorizzazioni necessarie per inviare un carico di lavoro batch, chiedi all'amministratore di concedere il ruolo IAM Worker Dataproc (roles/dataproc.worker) al account di servizio predefinito di Compute Engine sul progetto.

Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

L'amministratore potrebbe anche essere in grado di concedere al account di servizio predefinito di Compute Engine le autorizzazioni richieste tramite ruoli personalizzati o altri ruoli predefiniti.

Ruoli BigQuery

Al account di servizio utilizzato per eseguire un carico di lavoro batch o una sessione interattiva di Managed Service for Apache Spark devono essere concessi i seguenti ruoli IAM sulle seguenti risorse:

  • Visualizzatore dati BigQuery (roles/bigquery.dataViewer) per leggere i dati dalle tabelle, come segue:

    • Lettura da bigquery.DATASET_ID.SOURCE_TABLE negli esempi di Spark SQL SELECT e INSERT INTO.
    • Lettura da INFORMATION_SCHEMA nell'esempio dell'API DataFrame.
  • Utente BigQuery (roles/bigquery.user) per consentire a Spark di eseguire job che interagiscono con BigQuery.

  • Editor dati BigQuery (roles/bigquery.dataEditor) per scrivere dati o metadati, come segue:

    • Per l'esempio di Spark SQL INSERT INTO, per scrivere in bigquery.DATASET_ID.DESTINATION_TABLE.
    • Per l'esempio dell'API DataFrame che esegue query su INFORMATION_SCHEMA, questo ruolo è necessario per DATASET_ID fornito in .option('materializationDataset', ...) per consentire al connettore di creare tabelle temporanee per i risultati.

Invia un carico di lavoro batch Spark

Puoi utilizzare la Google Cloud console, Google Cloud CLI o l' API Managed Service for Apache Spark per inviare un carico di lavoro batch Managed Service for Apache Spark.

Utilizza Spark SQL

Puoi utilizzare il catalogo Spark BigQuery per eseguire query sulle tabelle BigQuery standard direttamente da carichi di lavoro batch o sessioni interattive. Questo metodo ti consente di utilizzare la sintassi GoogleSQL standard per interagire con i dati BigQuery all'interno dei job spark-sql senza scrivere codice PySpark o creare visualizzazioni temporanee utilizzando l'API DataFrame.

Configura il catalogo BigQuery

Per abilitare il catalogo BigQuery, fornisci le seguenti proprietà Spark al carico di lavoro batch Spark SQL o alla sessione interattiva:

  • dataproc.sparkBqConnector.version=CONNECTOR_VERSION: specifica la versione del connettore Spark BigQuery.
  • spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog: (facoltativo) registra il catalogo bigquery come catalogo Spark SQL.

Esempio di Google Cloud CLI:

gcloud dataproc batches submit spark-sql \
    --project=PROJECT_ID \
    --region=REGION \
    --version=RUNTIME_VERSION \
    --subnet=SUBNET \
    --service-account=SERVICE_ACCOUNT \
    --properties="dataproc.sparkBqConnector.version=CONNECTOR_VERSION,spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog" \
    gs://BUCKET/my_query.sql

Sostituisci quanto segue:

Esegui query sulle tabelle BigQuery

Dopo aver configurato il catalogo, puoi fare riferimento alle tabelle BigQuery in uno script SQL utilizzando il seguente formato: bigquery.DATASET_ID.TABLE_ID.

Esempio di query SQL:

-- Query data from a BigQuery table.
SELECT
  column_a,
  SUM(column_b)
FROM
  bigquery.DATASET_ID.SOURCE_TABLE
WHERE
  partition_date = CURRENT_DATE()
GROUP BY column_a;

-- Insert results into another BigQuery table.
INSERT INTO bigquery.DATASET_ID.DESTINATION_TABLE
SELECT column_a, column_b
FROM bigquery.DATASET_ID.SOURCE_TABLE
WHERE column_c = 'some_value';

Sostituisci quanto segue:

  • DATASET_ID: ID set di dati BigQuery.
  • SOURCE_TABLE: ID della tabella su cui eseguire la query.
  • DESTINATION_TABLE: ID della tabella in cui inserire i dati.

Utilizza l'API DataFrame

L'API DataFrame è necessaria per accedere alle visualizzazioni INFORMATION_SCHEMA.

  • Per eseguire query su INFORMATION_SCHEMA:

    • Imposta spark.conf.set('viewsEnabled', 'true').
    • Fornisci .option('materializationDataset', 'DATASET_ID') al connettore per scrivere i risultati temporanei.

Esempio di query PySpark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('BigQuery Info Schema Test').getOrCreate()

# Required for INFORMATION_SCHEMA.
spark.conf.set('viewsEnabled', 'true')

# Query INFORMATION_SCHEMA.TABLES.
info_schema_df = spark.read.format('bigquery') \
  .option('project', 'PROJECT_ID') \
  .option('materializationDataset', 'DATASET_ID') \
  .load(f'SELECT table_name, creation_time FROM `PROJECT_ID.DATASET_ID.INFORMATION_SCHEMA.TABLES`')
info_schema_df.show(5, truncate=False)

Sostituisci quanto segue:

  • PROJECT_ID: ID progetto. Gli ID progetto sono elencati nella sezione Informazioni sul progetto nella Google Cloud dashboard della console.
  • DATASET_ID: ID set di dati BigQuery in cui il connettore SparkvBigQuery può scrivere dati temporanei.

Per un esempio di PySpark che legge i dati da una tabella BigQuery standard e poi scrive i risultati in una tabella di output, consulta Invia un carico di lavoro batch di conteggio parole PySpark.

Passaggi successivi