Neste documento, explicamos como configurar o catálogo personalizado do Apache Iceberg para BigQuery no catálogo de ambiente de execução do Lakehouse.
É possível configurar isso usando um cluster do Serviço Gerenciado para Apache Spark ou o Serviço Gerenciado para Apache Spark. Isso cria um catálogo único e compartilhado no Google Cloud Lakehouse que funciona perfeitamente com mecanismos de código aberto, como Apache Spark e Apache Flink.
Antes de começar
- Ative o faturamento no seu Google Cloud projeto. Saiba como verificar se o faturamento está ativado em um projeto.
Ative as APIs BigQuery e Serviço Gerenciado para Apache Spark.
Funções exigidas
Para receber as permissões necessárias para configurar o catálogo de ambiente de execução do Lakehouse, peça ao administrador para conceder a você os seguintes papéis do IAM:
-
Criar um cluster do Serviço Gerenciado para Apache Spark:
Worker do Dataproc (
roles/dataproc.worker) na conta de serviço padrão do Compute Engine no projeto -
Criar tabelas de catálogo de ambiente de execução do Lakehouse:
-
Worker do Dataproc (
roles/dataproc.worker) na conta de serviço da VM do Serviço Gerenciado para Apache Spark no projeto -
Editor de dados do BigQuery (
roles/bigquery.dataEditor) na conta de serviço da VM do Serviço Gerenciado para Apache Spark no projeto -
Usuário de objetos do Storage (
roles/storage.objectUser) na conta de serviço da VM do Serviço Gerenciado para Apache Spark no projeto
-
Worker do Dataproc (
-
Consultar tabelas de catálogo de ambiente de execução do Lakehouse:
-
Visualizador de dados do BigQuery (
roles/bigquery.dataViewer) no projeto -
Usuário do BigQuery (
roles/bigquery.user) no projeto -
Leitor de objetos do Storage (
roles/storage.objectViewer) no projeto
-
Visualizador de dados do BigQuery (
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Também é possível conseguir as permissões necessárias usando personalizados papéis ou outros predefinidos papéis.
Configurar o metastore com o Serviço Gerenciado para Apache Spark
É possível configurar o catálogo de ambiente de execução do Lakehouse com o Serviço Gerenciado para Apache Spark usando o Spark ou o Flink:
Spark
Configure um novo cluster. Para criar um cluster do Serviço Gerenciado para Apache Spark, execute o comando
gcloud dataproc clusters createa seguir, que contém as configurações necessárias para usar o catálogo de ambiente de execução do Lakehouse:gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --single-node
Substitua:
CLUSTER_NAME: um nome para o cluster do Serviço Gerenciado para Apache Spark.PROJECT_ID: o ID do Google Cloud projeto em que você está criando o cluster.LOCATION: a região do Compute Engine em que você está criando o cluster.
Envie um job do Spark usando um dos seguintes métodos:
Google Cloud CLI
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,\ spark.sql.catalog.CATALOG_NAME.type=bigquery,\ spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id=PROJECT_ID,\ spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location=LOCATION,\ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
Substitua:
PROJECT_ID: o ID do Google Cloud projeto que contém o cluster do Serviço Gerenciado para Apache Spark.CLUSTER_NAME: o nome do cluster do Serviço Gerenciado para Apache Spark que você está usando para executar o job do Spark SQL.REGION: a região do Compute Engine em que o cluster está localizado.LOCATION: o local dos recursos do BigQuery.CATALOG_NAME: o nome do catálogo do Spark a ser usado com o job SQL.WAREHOUSE_DIRECTORY: a pasta do Cloud Storage que contém seu data warehouse. Esse valor começa comgs://.SPARK_SQL_COMMAND: a consulta SQL do Spark que você quer executar. Essa consulta inclui os comandos para criar seus recursos. Por exemplo, para criar um namespace e uma tabela.
CLI do spark-sql
Noconsole, acesse a página Instâncias de VM. Google Cloud
Para se conectar a uma instância de VM do Serviço Gerenciado para Apache Spark, clique em SSH na linha que lista o nome da instância de VM principal do cluster do Serviço Gerenciado para Apache Spark, que é o nome do cluster seguido por um sufixo
-m. O resultado será o seguinte:Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$No terminal, execute o seguinte comando de inicialização do catálogo de ambiente de execução do Lakehouse:
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.type=bigquery \ --conf spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
Substitua:
CATALOG_NAME: o nome do catálogo do Spark que você está usando com o job SQL.PROJECT_ID: o Google Cloud ID do projeto do catálogo de ambiente de execução do Lakehouse ao qual o catálogo do Spark está vinculado.LOCATION: o Google Cloud local do catálogo de ambiente de execução do Lakehouse.WAREHOUSE_DIRECTORY: a pasta do Cloud Storage que contém seu data warehouse. Esse valor começa comgs://.
Depois de se conectar ao cluster, o terminal do Spark mostra o comando
spark-sql, que pode ser usado para enviar jobs do Spark.spark-sql (default)>
Flink
- Crie um cluster do Serviço Gerenciado para Apache Spark com o componente Flink opcional ativado,
e verifique se você está usando o Serviço Gerenciado para Apache Spark
2.2ou mais recente. No Google Cloud console do, acesse a página Instâncias de VM.
Na lista de instâncias de máquina virtual, clique em SSH para se conectar à instância de VM principal do cluster do Serviço Gerenciado para Apache Spark, que é listada como o nome do cluster seguido por um sufixo
-m.Configure o plug-in do catálogo personalizado do Apache Iceberg para o catálogo de ambiente de execução do Lakehouse:
FLINK_VERSION=1.20 ICEBERG_VERSION=1.10.0 cd /usr/lib/flink sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/${ICEBERG_VERSION}/iceberg-bigquery-${ICEBERG_VERSION}.jar -P lib sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/${ICEBERG_VERSION}/iceberg-gcp-bundle-${ICEBERG_VERSION}.jar -P lib sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/${ICEBERG_VERSION}/iceberg-gcp-${ICEBERG_VERSION}.jar -P lib
Inicie a sessão do Flink no YARN:
HADOOP_CLASSPATH=`hadoop classpath` sudo bin/yarn-session.sh -nm flink-dataproc -d sudo bin/sql-client.sh embedded \ -s yarn-session
Crie um catálogo no Flink:
CREATE CATALOG CATALOG_NAME WITH ( 'type'='iceberg', 'warehouse'='WAREHOUSE_DIRECTORY', 'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', 'gcp.bigquery.project-id'='PROJECT_ID', 'gcp.bigquery.location'='LOCATION' );
Substitua:
CATALOG_NAME: o identificador do catálogo do Flink, que está vinculado a um catálogo de ambiente de execução do Lakehouse.WAREHOUSE_DIRECTORY: o caminho base para o diretório do data warehouse (a pasta do Cloud Storage em que o Flink cria arquivos). Esse valor começa comgs://.PROJECT_ID: o ID do projeto do catálogo de ambiente de execução do Lakehouse ao qual o catálogo do Flink está vinculado.LOCATION: o local dos recursos do BigQuery.
A sessão do Flink agora está conectada ao catálogo de ambiente de execução do Lakehouse, e você pode executar comandos SQL do Flink.
Gerenciar recursos do catálogo de ambiente de execução do Lakehouse
Agora que você está conectado ao catálogo de ambiente de execução do Lakehouse, é possível criar e visualizar recursos com base nos metadados armazenados no catálogo de ambiente de execução do Lakehouse.
Por exemplo, tente executar os comandos a seguir na sessão interativa do Flink SQL para criar um banco de dados e uma tabela do Apache Iceberg.
Use o catálogo personalizado do Apache Iceberg:
USE CATALOG CATALOG_NAME;
Substitua
CATALOG_NAMEpelo identificador do catálogo do Flink.Crie um banco de dados, que cria um conjunto de dados no BigQuery:
CREATE DATABASE IF NOT EXISTS DATABASE_NAME;
Substitua
DATABASE_NAMEpelo nome do novo banco de dados.Use o banco de dados que você criou:
USE DATABASE_NAME;
Crie uma tabela do Apache Iceberg. O comando a seguir cria uma tabela de vendas de exemplo:
CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) );
Substitua
ICEBERG_TABLE_NAMEpor um nome para a nova tabela.Ver metadados da tabela:
DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
Liste as tabelas no banco de dados:
SHOW TABLES;
Ingerir dados na tabela
Depois de criar uma tabela do Apache Iceberg na seção anterior, é possível usar o Flink DataGen como uma fonte de dados para ingerir dados em tempo real na tabela. As etapas a seguir são um exemplo desse fluxo de trabalho:
Crie uma tabela temporária usando o DataGen:
CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.order_number.kind' = 'sequence', 'fields.order_number.start' = '1', 'fields.order_number.end' = '1000000', 'fields.price.min' = '0', 'fields.price.max' = '10000', 'fields.buyer.first_name.length' = '10', 'fields.buyer.last_name.length' = '10' ) LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);
Substitua:
DATABASE_NAME: o nome do banco de dados para armazenar a tabela temporária.TEMP_TABLE_NAME: um nome para a tabela temporária.ICEBERG_TABLE_NAME: o nome da tabela do Apache Iceberg que você criou na seção anterior.
Defina o paralelismo como 1:
SET 'parallelism.default' = '1';
Defina o intervalo de checkpoint:
SET 'execution.checkpointing.interval' = '10second';
Defina o checkpoint:
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
Inicie o job de streaming em tempo real:
INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;
O resultado será o seguinte:
[INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 0de23327237ad8a811d37748acd9c10b
Para verificar o status do job de streaming, faça o seguinte:
Noconsole, acesse a página Clusters. Google Cloud
Selecione o cluster.
Clique na guia Interfaces da Web.
Clique no link YARN ResourceManager.
Na interface do YARN ResourceManager , encontre sua sessão do Flink e clique no link ApplicationMaster em Tracking UI.
Na coluna Status, confirme se o status do job é Em execução.
Consulte dados de streaming no cliente Flink SQL:
SELECT * FROM ICEBERG_TABLE_NAME /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ ORDER BY order_time desc LIMIT 20;
Consulte dados de streaming no BigQuery:
SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME` ORDER BY order_time desc LIMIT 20;
Encerre o job de streaming no cliente Flink SQL:
STOP JOB 'JOB_ID';
Substitua
JOB_IDpelo ID do job que foi mostrado na saída quando você criou o job de streaming.
Configurar o metastore com o Serviço Gerenciado para Apache Spark
É possível configurar o catálogo de ambiente de execução do Lakehouse com o Serviço Gerenciado para Apache Spark usando o Spark SQL ou o PySpark.
Spark SQL
Crie um arquivo SQL com os comandos do Spark SQL que você quer executar no catálogo de ambiente de execução do Lakehouse. Por exemplo, esse comando cria um namespace e uma tabela:
SET `spark.sql.catalog.CATALOG_NAME`=`org.apache.iceberg.spark.SparkCatalog`; SET `spark.sql.catalog.CATALOG_NAME.type`=`bigquery`; SET `spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id`=`PROJECT_ID`; SET `spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location`=`LOCATION`; SET `spark.sql.catalog.CATALOG_NAME.warehouse`=`WAREHOUSE_DIRECTORY`; CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME; CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
Substitua:
CATALOG_NAME: o nome do catálogo que faz referência à tabela do Spark.NAMESPACE_NAME: o nome do namespace que faz referência à tabela do Spark.TABLE_NAME: um nome para a tabela do Spark.WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage em que o data warehouse está armazenado.
Envie um job em lote do Spark SQL executando o comando
gcloud dataproc batches submit spark-sqla seguir:gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \ --project=PROJECT_ID \ --region=REGION \ --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --deps-bucket=BUCKET_PATH \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar
Substitua:
SQL_SCRIPT_PATH: o caminho para o arquivo SQL que o job em lote usa.PROJECT_ID: o ID do Google Cloud projeto em que o job em lote será executado.REGION: a região em que a carga de trabalho é executada.SUBNET_NAME(opcional): o nome de uma sub-rede VPC naREGIONque atende aos requisitos de sub-rede da sessão.BUCKET_PATH: o local do bucket do Cloud Storage para fazer upload das dependências da carga de trabalho. OWAREHOUSE_DIRECTORYestá localizado nesse bucket. O prefixo de URIgs://do bucket não é obrigatório. É possível especificar o caminho ou o nome do bucket, por exemplo,mybucketname1.LOCATION: o local para executar o job em lote.
Para mais informações sobre como enviar jobs em lote do Spark, consulte Executar uma carga de trabalho em lote do Spark.
PySpark
Crie um arquivo Python com os comandos do PySpark que você quer executar no catálogo de ambiente de execução do Lakehouse.
Por exemplo, o comando a seguir configura um ambiente do Spark para interagir com tabelas do Apache Iceberg armazenadas no catálogo de ambiente de execução do Lakehouse. Em seguida, o comando cria um novo namespace e uma tabela do Apache Iceberg nesse namespace.
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("Lakehouse runtime catalog Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.type", "bigquery") \ .config("spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")
Substitua:
PROJECT_ID: o ID do Google Cloud projeto em que o job em lote será executado.LOCATION: o local em que os recursos do BigQuery estão localizados.CATALOG_NAME: o nome do catálogo que faz referência à tabela do Spark.TABLE_NAME: um nome para a tabela do Spark.WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage em que o data warehouse está armazenado.NAMESPACE_NAME: o nome do namespace que faz referência à tabela do Spark.
Envie o job em lote usando o seguinte
gcloud dataproc batches submit pysparkcomando:gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --deps-bucket=BUCKET_PATH \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar
Substitua:
PYTHON_SCRIPT_PATH: o caminho para o script Python que o job em lote usa.PROJECT_ID: o ID do Google Cloud projeto em que o job em lote será executado.REGION: a região em que a carga de trabalho é executada.BUCKET_PATH: o local do bucket do Cloud Storage para fazer upload das dependências da carga de trabalho. O prefixo de URIgs://do bucket não é obrigatório. É possível especificar o caminho ou o nome do bucket, por exemplo,mybucketname1.
Para mais informações sobre como enviar jobs em lote do PySpark, consulte a referência do PySpark gcloud.
A seguir
- Criar e gerenciar recursos do metastore.
- Configure recursos opcionais do catálogo de ambiente de execução do Lakehouse.