Consulta tablas de BigQuery

En este documento, se explica cómo puedes usar Spark SQL y la API de DataFrame de Spark en cargas de trabajo de Managed Service para Apache Spark para consultar tablas de BigQuery.

Antes de comenzar

Habilita las APIs y, si es necesario, otorga roles de Identity and Access Management.

Habilita las APIs

  1. Accede a tu cuenta de Google Cloud . Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc and BigQuery APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc and BigQuery APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

Otorga funciones de Identity and Access Management

Para ejecutar los ejemplos de esta página, se requieren los roles de Managed Service para Apache Spark y BigQuery. Según la política de la organización, es posible que estos roles ya se hayan otorgado. Para verificar las asignaciones de roles, consulta ¿Necesitas otorgar roles?.

Roles de Managed Service para Apache Spark

De forma predeterminada, los lotes y las sesiones se ejecutan como la cuenta de servicio predeterminada de Compute Engine, a menos que se especifique una cuenta de servicio personalizada para la carga de trabajo o la sesión.

Función de usuario de cuenta de servicio

Para obtener los permisos que necesitas para enviar una carga de trabajo por lotes, pídele a tu administrador que te otorgue el rol de IAM de Usuario de cuenta de servicio (roles/iam.serviceAccountUser) en la cuenta de servicio predeterminada de Compute Engine. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios a través de roles personalizados o cualquier otro rol predefinido.

Rol de trabajador de Dataproc

Para asegurarte de que la cuenta de servicio predeterminada de Compute Engine tenga los permisos necesarios para enviar una carga de trabajo por lotes, pídele a tu administrador que otorgue el rol de IAM de trabajador de Dataproc (roles/dataproc.worker) a la cuenta de servicio predeterminada de Compute Engine en el proyecto.

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Es posible que tu administrador también pueda otorgarle los permisos necesarios a la cuenta de servicio predeterminada de Compute Engine a través de roles personalizados o de otros roles predefinidos.

Funciones de BigQuery

La cuenta de servicio que se usa para ejecutar una carga de trabajo por lotes o una sesión interactiva de Managed Service para Apache Spark debe tener los siguientes roles de IAM en los siguientes recursos:

  • Visualizador de datos de BigQuery (roles/bigquery.dataViewer) para leer datos de las tablas, de la siguiente manera:

    • Lectura desde bigquery.DATASET_ID.SOURCE_TABLE en los ejemplos de SELECT y INSERT INTO de Spark SQL
    • Lectura de INFORMATION_SCHEMA en el ejemplo de la API de DataFrame
  • Usuario de BigQuery (roles/bigquery.user) para permitir que Spark ejecute trabajos que interactúen con BigQuery

  • Editor de datos de BigQuery (roles/bigquery.dataEditor) para escribir datos o metadatos, de la siguiente manera:

    • En el ejemplo de Spark SQL INSERT INTO, para escribir en bigquery.DATASET_ID.DESTINATION_TABLE.
    • En el ejemplo de la API de DataFrame que consulta INFORMATION_SCHEMA, este rol es necesario en el DATASET_ID proporcionado en .option('materializationDataset', ...) para permitir que el conector cree tablas temporales para los resultados.

Envía una carga de trabajo por lotes de Spark

Puedes usar la consola de Google Cloud , Google Cloud CLI o la API de Managed Service para Apache Spark para enviar una carga de trabajo por lotes de Managed Service para Apache Spark.

Usa Spark SQL

Puedes usar el catálogo de Spark BigQuery para consultar tablas estándar de BigQuery directamente desde cargas de trabajo por lotes o sesiones interactivas. Este método te permite usar la sintaxis estándar de GoogleSQL para interactuar con los datos de BigQuery en trabajos de spark-sql sin escribir código de PySpark ni crear vistas temporales con la API de DataFrame.

Configura el catálogo de BigQuery

Para habilitar el catálogo de BigQuery, proporciona las siguientes propiedades de Spark a tu carga de trabajo por lotes de Spark SQL o a tu sesión interactiva:

  • dataproc.sparkBqConnector.version=CONNECTOR_VERSION: Especifica la versión del conector de BigQuery para Spark.
  • spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog: (Opcional) Registra el catálogo bigquery como un catálogo de Spark SQL.

Ejemplo de Google Cloud CLI:

gcloud dataproc batches submit spark-sql \
    --project=PROJECT_ID \
    --region=REGION \
    --version=RUNTIME_VERSION \
    --subnet=SUBNET \
    --service-account=SERVICE_ACCOUNT \
    --properties="dataproc.sparkBqConnector.version=CONNECTOR_VERSION,spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog" \
    gs://BUCKET/my_query.sql

Reemplaza lo siguiente:

Consulta tablas de BigQuery

Después de configurar el catálogo, puedes hacer referencia a las tablas de BigQuery en una secuencia de comandos de SQL con el siguiente formato: bigquery.DATASET_ID.TABLE_ID.

Ejemplo de consulta en SQL:

-- Query data from a BigQuery table.
SELECT
  column_a,
  SUM(column_b)
FROM
  bigquery.DATASET_ID.SOURCE_TABLE
WHERE
  partition_date = CURRENT_DATE()
GROUP BY column_a;

-- Insert results into another BigQuery table.
INSERT INTO bigquery.DATASET_ID.DESTINATION_TABLE
SELECT column_a, column_b
FROM bigquery.DATASET_ID.SOURCE_TABLE
WHERE column_c = 'some_value';

Reemplaza lo siguiente:

  • DATASET_ID: ID del conjunto de datos de BigQuery.
  • SOURCE_TABLE: ID de la tabla que se consultará.
  • DESTINATION_TABLE: ID de la tabla en la que se insertarán los datos.

Usa la API de DataFrame

Se requiere la API de DataFrame para acceder a las vistas de INFORMATION_SCHEMA.

  • Para consultar INFORMATION_SCHEMA, haz lo siguiente:

    • Establece spark.conf.set('viewsEnabled', 'true').
    • Proporciona .option('materializationDataset', 'DATASET_ID') para que el conector escriba resultados temporales.

Ejemplo de consulta de PySpark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('BigQuery Info Schema Test').getOrCreate()

# Required for INFORMATION_SCHEMA.
spark.conf.set('viewsEnabled', 'true')

# Query INFORMATION_SCHEMA.TABLES.
info_schema_df = spark.read.format('bigquery') \
  .option('project', 'PROJECT_ID') \
  .option('materializationDataset', 'DATASET_ID') \
  .load(f'SELECT table_name, creation_time FROM `PROJECT_ID.DATASET_ID.INFORMATION_SCHEMA.TABLES`')
info_schema_df.show(5, truncate=False)

Reemplaza lo siguiente:

  • PROJECT_ID: ID del proyecto Los IDs de los proyectos se enumeran en la sección Información del proyecto del panel de la consola de Google Cloud .
  • DATASET_ID: ID del conjunto de datos de BigQuery en el que el conector de Spark a BigQuery puede escribir datos temporales.

Consulta Envía una carga de trabajo por lotes de recuento de palabras de PySpark para ver un ejemplo de PySpark que lee datos de una tabla estándar de BigQuery y, luego, escribe los resultados en una tabla de salida.

¿Qué sigue?