Consultar tabelas do BigQuery
Este documento explica como usar o Spark SQL e a API DataFrame do Spark em cargas de trabalho do Serviço gerenciado para Apache Spark para consultar tabelas do BigQuery.
Antes de começar
Ative as APIs e, se necessário, conceda papéis do Identity and Access Management.
Ativar APIs
- Faça login na sua conta do Google Cloud . Se você começou a usar o Google Cloud, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.
Conceder papéis de gerenciamento de identidade e acesso
As concessões de função do Serviço Gerenciado para Apache Spark e do BigQuery são necessárias para executar os exemplos nesta página. Dependendo da política da organização, essas funções podem já ter sido concedidas. Para verificar as concessões de papéis, consulte Você precisa conceder papéis?.
Funções do Serviço Gerenciado para Apache Spark
Por padrão, os lotes e as sessões são executados como a conta de serviço padrão do Compute Engine a menos que uma conta de serviço personalizada seja especificada para a carga de trabalho ou sessão.
Papel de usuário da conta de serviço
Para receber as permissões necessárias
para enviar uma carga de trabalho em lote,
peça ao administrador para conceder a você o
papel do IAM de Usuário da conta de serviço (roles/iam.serviceAccountUser)
na conta de serviço padrão do Compute Engine.
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Também é possível conseguir as permissões necessárias usando papéis personalizados ou outros papéis predefinidos.
Papel de worker do Dataproc
Para garantir que a conta de serviço padrão do Compute Engine tenha as permissões
necessárias para enviar uma carga de trabalho em lote,
peça ao administrador para conceder o
papel do IAM de worker do Dataproc (roles/dataproc.worker)
à conta de serviço padrão do Compute Engine no projeto.
O administrador também pode conceder à conta de serviço padrão do Compute Engine as permissões necessárias por meio de papéis personalizados ou outros papéis predefinidos.
Papéis do BigQuery
A conta de serviço usada para executar uma carga de trabalho em lote ou uma sessão interativa do Serviço Gerenciado para Apache Spark precisa receber os seguintes papéis do IAM nos recursos abaixo:
Leitor de dados do BigQuery (
roles/bigquery.dataViewer) para ler dados de tabelas, da seguinte forma:- Leitura de bigquery.DATASET_ID.SOURCE_TABLE nos exemplos de SELECT e INSERT INTO do Spark SQL.
- Leitura de INFORMATION_SCHEMA no exemplo da API DataFrame.
Usuário do BigQuery (
roles/bigquery.user) para permitir que o Spark execute jobs que interagem com o BigQuery.Editor de dados do BigQuery (
roles/bigquery.dataEditor) para gravar dados ou metadados, da seguinte forma:- No exemplo de INSERT INTO do Spark SQL, para gravar em bigquery.DATASET_ID.DESTINATION_TABLE.
- Para o exemplo da API DataFrame que consulta INFORMATION_SCHEMA, essa função é necessária no DATASET_ID fornecido em
.option('materializationDataset', ...)para permitir que o conector crie tabelas temporárias para os resultados.
Enviar uma carga de trabalho em lote do Spark
Use o console do Google Cloud , a Google Cloud CLI ou a API do Serviço gerenciado para Apache Spark para enviar uma carga de trabalho em lote do Serviço gerenciado para Apache Spark.
Usar o Spark SQL
É possível usar o catálogo do BigQuery para Spark e consultar tabelas padrão do BigQuery diretamente de cargas de trabalho em lote ou sessões interativas. Esse método
permite usar a sintaxe padrão do GoogleSQL para interagir com
dados do BigQuery em jobs spark-sql sem
escrever código PySpark ou criar visualizações temporárias usando a API DataFrame.
Configurar o catálogo do BigQuery
Para ativar o catálogo do BigQuery, forneça as seguintes propriedades do Spark à sua carga de trabalho em lote ou sessão interativa do Spark SQL:
dataproc.sparkBqConnector.version=CONNECTOR_VERSION: especifica a versão do conector do Spark BigQuery.spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog: (Opcional) Registra o catálogobigquerycomo um catálogo do Spark SQL.
Exemplo da Google Cloud CLI:
gcloud dataproc batches submit spark-sql \
--project=PROJECT_ID \
--region=REGION \
--version=RUNTIME_VERSION \
--subnet=SUBNET \
--service-account=SERVICE_ACCOUNT \
--properties="dataproc.sparkBqConnector.version=CONNECTOR_VERSION,spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog" \
gs://BUCKET/my_query.sql
Substitua:
PROJECT_ID: ID do projeto. Os IDs do projeto estão listados na seção Informações do projeto no painel do Google Cloud console.REGION: região em que o lote será executado.RUNTIME_VERSION: opcional. Versão do ambiente de execução do Serviço Gerenciado para Apache Spark. Se não for especificada, a versão padrão atual do ambiente de execução será selecionada.CONNECTOR_VERSION: versão do conector do Spark BigQuery. Para encontrar uma versão do conector compatível com oRUNTIME_VERSION, consulte Versões do ambiente de execução do Serviço Gerenciado para Apache Spark. Se o conector não estiver pré-instalado, você poderá encontrar as versões disponíveis na página de versões do GitHub.SUBNET: opcional. Sub-rede a ser usada para a carga de trabalho em lote. Se não for especificado, a sub-rededefaultserá usada.SERVICE_ACCOUNT: opcional. Conta de serviço em que o job em lote será executado. Se não for especificada, a conta de serviço padrão do Compute Engine será usada.BUCKET: bucket do Cloud Storage que contém o arquivo SQL.
Consultar tabelas do BigQuery
Depois de configurar o catálogo, você pode referenciar tabelas do BigQuery em um script SQL usando o seguinte formato: bigquery.DATASET_ID.TABLE_ID.
Exemplo de consulta SQL:
-- Query data from a BigQuery table.
SELECT
column_a,
SUM(column_b)
FROM
bigquery.DATASET_ID.SOURCE_TABLE
WHERE
partition_date = CURRENT_DATE()
GROUP BY column_a;
-- Insert results into another BigQuery table.
INSERT INTO bigquery.DATASET_ID.DESTINATION_TABLE
SELECT column_a, column_b
FROM bigquery.DATASET_ID.SOURCE_TABLE
WHERE column_c = 'some_value';
Substitua:
DATASET_ID: ID do conjunto de dados do BigQuery.SOURCE_TABLE: ID da tabela a ser consultada.DESTINATION_TABLE: ID da tabela em que os dados serão inseridos.
Usar a API DataFrame
A API DataFrame é necessária para acessar as visualizações INFORMATION_SCHEMA.
Para consultar
INFORMATION_SCHEMA:- Defina
spark.conf.set('viewsEnabled', 'true'). - Forneça
.option('materializationDataset', 'DATASET_ID')para que o conector grave resultados temporários.
- Defina
Exemplo de consulta do PySpark:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('BigQuery Info Schema Test').getOrCreate()
# Required for INFORMATION_SCHEMA.
spark.conf.set('viewsEnabled', 'true')
# Query INFORMATION_SCHEMA.TABLES.
info_schema_df = spark.read.format('bigquery') \
.option('project', 'PROJECT_ID') \
.option('materializationDataset', 'DATASET_ID') \
.load(f'SELECT table_name, creation_time FROM `PROJECT_ID.DATASET_ID.INFORMATION_SCHEMA.TABLES`')
info_schema_df.show(5, truncate=False)
Substitua:
PROJECT_ID: ID do projeto. Os IDs do projeto estão listados na seção Informações do projeto no painel do Google Cloud console.DATASET_ID: ID do conjunto de dados do BigQuery em que o conector SparkvBigQuery pode gravar dados temporários.
Consulte Enviar uma carga de trabalho em lote de contagem de palavras do PySpark para um exemplo do PySpark que lê dados de uma tabela padrão do BigQuery e grava os resultados em uma tabela de saída.
A seguir
- Saiba mais sobre o conector do Spark BigQuery.
- Revise as cotas do Serviço Gerenciado para Apache Spark.