Para personalizar a configuração do catálogo de ambientes de execução do Lakehouse, use os seguintes recursos adicionais:
- Procedimentos do Apache Spark Iceberg.
- A opção de filtro para tabelas sem suporte.
- Substituições de conexão do BigQuery.
- Políticas de controle de acesso para tabelas do Iceberg do catálogo de ambientes de execução do Lakehouse.
Usar procedimentos do Apache Iceberg Spark
Para usar procedimentos do Apache Spark, inclua extensões SQL do Apache Iceberg na configuração do Apache Spark. Por exemplo, você pode criar um procedimento para reverter para um estado anterior.
Usar o SQL interativo do Apache Spark para reverter a um estado anterior
É possível usar um procedimento do Apache Spark para criar, modificar e reverter uma tabela ao estado anterior. Exemplo:
Crie uma tabela do Apache Spark:
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,BIGLAKE_ICEBERG_CATALOG_JAR \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
Substitua:
BIGLAKE_ICEBERG_CATALOG_JAR: o URI do Cloud Storage do plug-in do catálogo personalizado do Apache Iceberg a ser usado. Dependendo do número da versão do Apache Iceberg, selecione uma das seguintes opções:- Iceberg 1.9.1:
gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.9.1-1.0.1.jar - Iceberg 1.6.1:
gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.2.jar
- Iceberg 1.9.1:
CATALOG_NAME: o nome do catálogo que faz referência à sua tabela do Apache Spark.PROJECT_ID: o ID do projeto Google Cloud .WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage em que seu data warehouse está armazenado.
USE `CATALOG_NAME`; CREATE NAMESPACE NAMESPACE_NAME; USE NAMESPACE NAMESPACE_NAME; CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'; INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row"); DESCRIBE EXTENDED TABLE_NAME;
Substitua:
NAMESPACE_NAME: o nome do namespace que referencia sua tabela do Apache Spark.TABLE_NAME: um nome de tabela que faz referência à sua tabela do Apache Spark.
A saída contém detalhes sobre a configuração da tabela:
... Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd] ...
Altere a tabela novamente e reverta para o snapshot
1659239298328512231criado anteriormente:ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double); INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5); SELECT * FROM TABLE_NAME; CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID); SELECT * FROM TABLE_NAME;
Substitua:
SNAPSHOT_ID: o ID do snapshot para o qual você está revertendo.
O resultado será o seguinte:
1 first row Time taken: 0.997 seconds, Fetched 1 row(s)
Filtrar tabelas não compatíveis das funções de listagem de tabelas
Ao usar o Apache Spark SQL com o catálogo do ambiente de execução do Lakehouse, o comando SHOW TABLES mostra todas as tabelas no namespace especificado, mesmo aquelas que não são compatíveis com o Apache Spark.
Para mostrar apenas as tabelas compatíveis, ative a opção filter_unsupported_tables:
spark-sql --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,BIGLAKE_ICEBERG_CATALOG_JAR \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"
Substitua:
BIGLAKE_ICEBERG_CATALOG_JAR: o URI do Cloud Storage do plug-in do catálogo personalizado do Apache Iceberg a ser usado. Dependendo do número da versão do Apache Iceberg, selecione uma das seguintes opções:- Iceberg 1.9.1:
gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.9.1-1.0.1.jar - Iceberg 1.6.1:
gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.2.jar
- Iceberg 1.9.1:
CATALOG_NAME: o nome do catálogo do Apache Spark a ser usado.PROJECT_ID: o ID do projeto Google Cloud a ser usado.LOCATION: o local dos recursos do BigQuery.WAREHOUSE_DIRECTORY: a pasta do Cloud Storage a ser usada como data warehouse.
Definir uma substituição de conexão do BigQuery
É possível usar conexões do BigQuery para acessar dados armazenados fora do BigQuery, como no Cloud Storage.
Para definir uma substituição de conexão do BigQuery que forneça acesso a um bucket do Cloud Storage, siga estas etapas:
No projeto do BigQuery, crie uma conexão com o recurso do Cloud Storage. Essa conexão define como o BigQuery acessa seus dados.
Conceda à conta de usuário ou de serviço que acessa os dados a função
roles/bigquery.connectionUserna conexão.Verifique se o recurso de conexão compartilha o mesmo local que os recursos de destino no BigQuery. Para mais informações, consulte Gerenciar conexões.
Especifique a conexão na sua tabela do Apache Iceberg com a propriedade
bq_connection:CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');
Substitua:
TABLE_NAME: um nome para sua tabela do Apache Spark.WAREHOUSE_DIRECTORY: o URI do bucket do Cloud Storage que armazena seus dados.PROJECT_ID: o ID do projeto Google Cloud a ser usado.LOCATION: o local da conexão.CONNECTION_ID: o ID da conexão.
Definir políticas de controle de acesso
É possível ativar o controle de acesso refinado (FGAC) nas tabelas do Apache Iceberg do catálogo do tempo de execução do Lakehouse configurando políticas de controle de acesso. Só é possível definir políticas de controle de acesso em tabelas que usam uma substituição de conexão do BigQuery. É possível definir essas políticas das seguintes maneiras:
Depois de configurar as políticas de FGAC, é possível consultar a tabela do Apache Spark usando o exemplo a seguir:
from pyspark.sql import SparkSession # Create a Spark session spark = SparkSession.builder \ .appName("Lakehouse runtime catalog Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("USE `CATALOG_NAME`;") # Configure spark for storing temp results spark.conf.set("viewsEnabled","true") spark.sql("CREATE NAMESPACE IF NOT EXISTS MATERIALIZATION_NAMESPACE"); spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE") spark.sql("USE NAMESPACE DATASET_NAME;") sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show()
Substitua:
CATALOG_NAME: o nome do catálogo.PROJECT_ID: o ID do projeto que contém seus recursos do BigQuery.LOCATION: o local dos recursos do BigQuery.WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage que contém seu data warehouse.MATERIALIZATION_NAMESPACE: o namespace em que você quer armazenar resultados temporários.DATASET_NAME: o nome do conjunto de dados que contém a tabela que você está consultando.ICEBERG_TABLE_NAME: o nome da tabela que você está consultando.