Este documento mostra um exemplo de como usar o catálogo de tempo de execução do Lakehouse com tabelas do BigQuery e o Serviço Gerenciado para Apache Spark.
Com o catálogo de tempo de execução do Lakehouse, é possível criar e usar tabelas padrão (integradas), tabelas do Apache Iceberg gerenciadas pelo BigQuery e tabelas externas do Apache Iceberg no BigQuery.
Formatos de tabela compatíveis
Somente tabelas do Apache Iceberg V2 são aceitas. As tabelas do Iceberg V1 não são compatíveis. Se você tiver tabelas do Iceberg V1, faça upgrade para a V2 (por exemplo, executando ALTER TABLE catalog.schema.table SET TBLPROPERTIES ('format-version'='2'); ou usando operações de mecanismo semelhantes) antes de usá-las com o catálogo de tempo de execução do Lakehouse.
Antes de começar
- Ative o faturamento para o projeto do Google Cloud . Saiba como verificar se o faturamento está ativado em um projeto.
Ative as APIs BigQuery e Dataproc.
Funções exigidas
Para receber as permissões necessárias para usar o Serviço gerenciado para Apache Spark com o catálogo de tempo de execução do Lakehouse como um repositório de metadados, peça ao administrador para conceder a você os seguintes papéis do IAM:
-
Crie tabelas de catálogo de tempo de execução do Lakehouse no Apache Spark:
- Worker do Dataproc (
roles/dataproc.worker) na conta de serviço do Serviço Gerenciado para Apache Spark no projeto - Editor de dados do BigQuery (
roles/bigquery.dataEditor) na conta de serviço do Managed Service for Apache Spark no projeto - Usuário de objetos do Storage (
roles/storage.objectUser) na conta de serviço do Serviço Gerenciado para Apache Spark no projeto
- Worker do Dataproc (
-
Consulte as tabelas do catálogo de ambientes de execução do Lakehouse no BigQuery:
- Leitor de dados do BigQuery (
roles/bigquery.dataViewer) no projeto - Usuário do BigQuery (
roles/bigquery.user) no projeto - Leitor de objetos do Storage (
roles/storage.objectViewer) no projeto
- Leitor de dados do BigQuery (
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Também é possível conseguir as permissões necessárias usando papéis personalizados ou outros papéis predefinidos.
Conectar-se a uma tabela
Crie um conjunto de dados no console do Google Cloud .
CREATE SCHEMA `
PROJECT_ID`.DATASET_NAME;Substitua:
PROJECT_ID: o ID do projeto Google Cloud para criar o conjunto de dados.DATASET_NAME: um nome para o conjunto de dados.
Crie uma conexão de recursos do Cloud.
Crie uma tabela padrão do BigQuery.
CREATE TABLE `
PROJECT_ID`.DATASET_NAME.TABLE_NAME (name STRING,id INT64);Substitua:
TABLE_NAME: um nome para a tabela.
Insira dados na tabela padrão do BigQuery.
INSERT INTO `
PROJECT_ID`.DATASET_NAME.TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);Crie uma tabela do Apache Iceberg gerenciada pelo BigQuery.
Por exemplo, para criar uma tabela, execute a seguinte instrução
CREATE.CREATE TABLE `
PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME( name STRING,id INT64 ) WITH CONNECTION `CONNECTION_NAME` OPTIONS ( file_format = 'PARQUET', table_format = 'ICEBERG', storage_uri = 'STORAGE_URI');Substitua:
ICEBERG_TABLE_NAME: um nome para sua tabela gerenciada do Apache Iceberg. Por exemplo,iceberg_managed_table.CONNECTION_NAME: o nome da conexão. Você criou isso na etapa anterior. Por exemplo,myproject.us.myconnection.STORAGE_URI: um URI do Cloud Storage totalmente qualificado. Por exemplo,gs://mybucket/table.
Insira dados na tabela gerenciada do Apache Iceberg do BigQuery.
INSERT INTO `
PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);Crie uma tabela externa do Apache Iceberg.
Por exemplo, para criar uma tabela externa do Apache Iceberg, execute a seguinte instrução
CREATE.CREATE OR REPLACE EXTERNAL TABLE `
PROJECT_ID`.DATASET_NAME.READONLY_ICEBERG_TABLE_NAME WITH CONNECTION `CONNECTION_NAME` OPTIONS ( format = 'ICEBERG', uris = ['BUCKET_PATH'], require_partition_filter = FALSE);Substitua:
READONLY_ICEBERG_TABLE_NAME: um nome para sua tabela somente leitura.BUCKET_PATH: o caminho para o bucket do Cloud Storage que contém os dados da tabela externa, no formato['gs://bucket_name/[folder_name/]file_name'].
No Apache Spark, consulte a tabela padrão, a tabela do Apache Iceberg gerenciada pelo BigQuery e a tabela externa do Apache Iceberg.
from pyspark.sql import SparkSession # Create a spark session spark = SparkSession.builder \ .appName("Lakehouse runtime catalog Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.conf.set("viewsEnabled","true") # Use the Lakehouse runtime catalog spark.sql("USE `CATALOG_NAME`;") spark.sql("USE NAMESPACE DATASET_NAME;") # Configure spark for temp results spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE"); spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE") # List the tables in the dataset df = spark.sql("SHOW TABLES;") df.show(); # Query the tables sql = """SELECT * FROM DATASET_NAME.TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show() sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show() sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show()
Substitua:
WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage conectada à tabela gerenciada do Apache Iceberg do BigQuery e à tabela externa do Apache Iceberg.CATALOG_NAME: o nome do catálogo que você está usando.MATERIALIZATION_NAMESPACE: o namespace para armazenar resultados temporários.
Execute o script do Apache Spark usando o Serviço Gerenciado para Apache Spark.
gcloud dataproc batches submit pyspark SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=YOUR_BUCKET \
Substitua:
SCRIPT_PATH: o caminho para o script usado pelo job em lote.PROJECT_ID: o ID do Google Cloud projeto em que o job em lote será executado.REGION: a região em que sua carga de trabalho é executada.YOUR_BUCKET: o local do bucket do Cloud Storage para fazer upload das dependências da carga de trabalho. O prefixo de URIgs://do bucket não é necessário. É possível especificar o caminho ou o nome do bucket, por exemplo,mybucketname1.