Consultar tabelas do BigQuery
Neste documento, explicamos como usar o Spark SQL e a API DataFrame do Spark em cargas de trabalho do Serviço Gerenciado para Apache Spark para consultar tabelas do BigQuery.
Antes de começar
Ative as APIs e, se necessário, conceda papéis do Identity and Access Management.
Ativar APIs
- Faça login na sua Google Cloud conta do. Se você não conhece o Google Cloud, crie uma conta para avaliar a performance dos nossos produtos em cenários reais. Clientes novos também recebem US $300 em créditos para executar, testar e implantar cargas de trabalho.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc and BigQuery APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.
Conceder papéis do Identity and Access Management
As concessões de papéis do Serviço Gerenciado para Apache Spark e do BigQuery são necessárias para executar os exemplos nesta página. Dependendo da política da organização, esses papéis já podem ter sido concedidos. Para verificar as concessões de papéis, consulte Você precisa conceder papéis?.
Papéis do Serviço Gerenciado para Apache Spark
Por padrão, os lotes e as sessões são executados como a conta de serviço padrão do Compute Engine , a menos que uma conta de serviço personalizada seja especificada para a carga de trabalho ou sessão.
Papel de usuário da conta de serviço
Para receber as permissões necessárias para enviar uma carga de trabalho em lote, peça ao administrador para conceder a você o papel do IAM de usuário da conta de serviço (roles/iam.serviceAccountUser) na conta de serviço padrão do Compute Engine.
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Também é possível conseguir as permissões necessárias usando personalizados papéis ou outros predefinidos papéis.
Papel de worker do Dataproc
Para garantir que a conta de serviço padrão do Compute Engine tenha as permissões necessárias para enviar uma carga de trabalho em lote, peça ao administrador para conceder o papel do IAM de worker do Dataproc (roles/dataproc.worker) à conta de serviço padrão do Compute Engine no projeto.
O administrador também pode conceder à conta de serviço padrão do Compute Engine as permissões necessárias por meio de papéis personalizados ou outros papéis predefinidos.
Papéis do BigQuery
A conta de serviço usada para executar uma carga de trabalho em lote ou uma sessão interativa do Serviço Gerenciado para Apache Spark precisa receber os seguintes papéis do IAM nos seguintes recursos:
Leitor de dados do BigQuery (
roles/bigquery.dataViewer) para ler dados de tabelas, da seguinte maneira:- Leitura de bigquery.DATASET_ID.SOURCE_TABLE nos exemplos de SELECT e INSERT INTO do Spark SQL.
- Leitura de INFORMATION_SCHEMA no exemplo da API DataFrame.
Usuário do BigQuery (
roles/bigquery.user) para permitir que o Spark execute jobs que interagem com o BigQuery.Editor de dados do BigQuery (
roles/bigquery.dataEditor) para gravar dados ou metadados, da seguinte maneira:- Para o exemplo de INSERT INTO do Spark SQL, para gravar em bigquery.DATASET_ID.DESTINATION_TABLE.
- Para o exemplo da API DataFrame que consulta INFORMATION_SCHEMA, esse papel é
necessário no DATASET_ID fornecido em
.option('materializationDataset', ...)para permitir que o conector crie tabelas temporárias para os resultados.
Enviar uma carga de trabalho em lote do Spark
É possível usar o Google Cloud console, a Google Cloud CLI ou a API do Serviço Gerenciado para Apache Spark para enviar uma carga de trabalho em lote do Serviço Gerenciado para Apache Spark.
Usar o Spark SQL
É possível usar o catálogo do Spark BigQuery para consultar tabelas padrão do BigQuery diretamente de cargas de trabalho em lote ou sessões interativas. Esse método permite usar a sintaxe padrão do GoogleSQL para interagir com os dados do BigQuery em jobs spark-sql sem escrever código PySpark ou criar visualizações temporárias usando a API DataFrame.
Configurar o catálogo do BigQuery
Para ativar o catálogo do BigQuery, forneça as seguintes propriedades do Spark para sua carga de trabalho em lote do Spark SQL ou sessão interativa:
dataproc.sparkBqConnector.version=CONNECTOR_VERSION: especifica a versão do conector do Spark BigQuery.spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog: (opcional) registra o catálogobigquerycomo um catálogo do Spark SQL.
Exemplo da Google Cloud CLI:
gcloud dataproc batches submit spark-sql \
--project=PROJECT_ID \
--region=REGION \
--version=RUNTIME_VERSION \
--subnet=SUBNET \
--service-account=SERVICE_ACCOUNT \
--properties="dataproc.sparkBqConnector.version=CONNECTOR_VERSION,spark.sql.catalog.bigquery=com.google.cloud.spark.bigquery.BigQueryCatalog" \
gs://BUCKET/my_query.sql
Substitua:
PROJECT_ID: ID do projeto. Os IDs do projeto estão listados em a seção Informações do projeto no Google Cloud painel doconsole .REGION: região em que o lote será executadoRUNTIME_VERSION: opcional. Versão do ambiente de execução do Serviço Gerenciado para Apache Spark. Se não for especificada, a versão padrão atual do ambiente de execução será selecionada.CONNECTOR_VERSION: versão do conector do Spark BigQuery. Para encontrar uma versão do conector compatível com aRUNTIME_VERSION, consulte Versões do ambiente de execução do Serviço Gerenciado para Apache Spark. Se o conector não estiver pré-instalado, você poderá encontrar as versões disponíveis na página de versões do GitHub.SUBNET: opcional. Sub-rede a ser usada para a carga de trabalho em lote. Se não for especificada, a sub-rededefaultserá usada.SERVICE_ACCOUNT: opcional. Conta de serviço em que o job em lote será executado. Se não for especificada, a conta de serviço padrão do Compute Engine será usada.BUCKET: bucket do Cloud Storage que contém o arquivo SQL.
Consultar tabelas do BigQuery
Depois de configurar o catálogo, é possível referenciar
tabelas do BigQuery em um script SQL usando o seguinte
formato: bigquery.DATASET_ID.TABLE_ID.
Exemplo de consulta SQL:
-- Query data from a BigQuery table.
SELECT
column_a,
SUM(column_b)
FROM
bigquery.DATASET_ID.SOURCE_TABLE
WHERE
partition_date = CURRENT_DATE()
GROUP BY column_a;
-- Insert results into another BigQuery table.
INSERT INTO bigquery.DATASET_ID.DESTINATION_TABLE
SELECT column_a, column_b
FROM bigquery.DATASET_ID.SOURCE_TABLE
WHERE column_c = 'some_value';
Substitua:
DATASET_ID: ID do conjunto de dados do BigQuery.SOURCE_TABLE: ID da tabela a ser consultada.DESTINATION_TABLE: ID da tabela em que os dados serão inseridos.
Usar a API DataFrame
A API DataFrame é necessária para acessar visualizações INFORMATION_SCHEMA.
Para consultar
INFORMATION_SCHEMA:- Defina
spark.conf.set('viewsEnabled', 'true'). - Forneça
.option('materializationDataset', 'DATASET_ID')para que o conector grave resultados temporários.
- Defina
Exemplo de consulta do PySpark:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('BigQuery Info Schema Test').getOrCreate()
# Required for INFORMATION_SCHEMA.
spark.conf.set('viewsEnabled', 'true')
# Query INFORMATION_SCHEMA.TABLES.
info_schema_df = spark.read.format('bigquery') \
.option('project', 'PROJECT_ID') \
.option('materializationDataset', 'DATASET_ID') \
.load(f'SELECT table_name, creation_time FROM `PROJECT_ID.DATASET_ID.INFORMATION_SCHEMA.TABLES`')
info_schema_df.show(5, truncate=False)
Substitua:
PROJECT_ID: ID do projeto. Os IDs do projeto estão listados em a seção Informações do projeto no Google Cloud painel doconsole .DATASET_ID: ID do conjunto de dados do BigQuery em que o conector SparkvBigQuery pode gravar dados temporários.
Consulte Enviar uma carga de trabalho em lote de contagem de palavras do PySpark para um exemplo do PySpark que lê dados de uma tabela padrão do BigQuery e grava os resultados em uma tabela de saída.
A seguir
- Saiba mais sobre o conector do Spark BigQuery.
- Analise as cotas do Serviço Gerenciado para Apache Spark.