Criar um lakehouse com o Spark e o catálogo de ambientes de execução do Lakehouse
Uma arquitetura de lakehouse combina a flexibilidade de um data lake com os recursos de gerenciamento de dados de um data warehouse. Neste documento, mostramos como configurar um lakehouse no Google Cloud. Você usa o Apache Iceberg como formato de tabela, o Serviço Gerenciado para Apache Spark para processamento e o catálogo de tempo de execução do Lakehouse, o catálogo REST do Iceberg, para gerenciamento unificado de metadados.
Essa arquitetura usa formatos de tabela abertos, como o Iceberg, para adicionar recursos de armazenamento em data warehouse, como transações e evolução de esquema, aos dados no Cloud Storage. Essa abordagem cria uma única fonte de verdade para seus dados, que pode ser acessada por vários mecanismos.
Antes de começar
- Faça login na sua conta do Google Cloud . Se você começou a usar o Google Cloud, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that you have the permissions required to complete this guide.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.- Crie um bucket do Cloud Storage para armazenar dados do Iceberg.
Funções exigidas
Alguns papéis do Identity and Access Management (IAM) são necessários para executar os exemplos nesta página. Dependendo das políticas da organização, essas funções já podem ter sido concedidas. Para verificar as concessões de papéis, consulte Você precisa conceder papéis?.
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Papéis do usuário
Para receber as permissões necessárias para criar um cluster do Managed Service for Apache Spark, peça ao administrador para conceder a você os seguintes papéis do IAM:
- Editor do Dataproc (
roles/dataproc.editor) no projeto - Usuário da conta de serviço (
roles/iam.serviceAccountUser) na conta de serviço padrão do Compute Engine
Papel de conta de serviço
Para garantir que a conta de serviço padrão do Compute Engine tenha as permissões necessárias para criar um cluster do Managed Service para Apache Spark, peça ao administrador para conceder o papel do IAM Worker do Dataproc (roles/dataproc.worker) à conta de serviço padrão do Compute Engine no projeto.
Criar um cluster do Serviço Gerenciado para Apache Spark
Crie um cluster do Serviço Gerenciado para Apache Spark com os componentes opcionais Iceberg e Jupyter.
Para criar o cluster, execute o seguinte comando
gcloud:gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=REGION \ --image-version=2.3-debian12 \ --optional-components=ICEBERG,JUPYTER \ --enable-component-gateway \ --properties 'dataproc:dataproc.lineage.enabled=true'Substitua:
CLUSTER_NAME: um nome para o cluster.PROJECT_ID: o ID do projeto Google Cloud .REGION: a região Google Cloud do cluster, por exemplo,us-central1.
Observação: definir
dataproc:dataproc.lineage.enabled=truenão é necessário para que o catálogo REST do Iceberg do catálogo de tempo de execução do Lakehouse funcione corretamente. Ele é adicionado para rastreamento de linhagem no exemplo de linhagem de dados abaixo.Conecte-se ao cluster usando um notebook do Jupyter. É possível usar um notebook do Vertex AI Workbench ou iniciar um notebook diretamente no cluster.
Configurar uma sessão do Spark
No notebook do Jupyter, crie uma sessão do Spark configurada para usar o catálogo de tempo de execução do Lakehouse Catálogo REST do Iceberg.
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
catalog_name = "CATALOG_NAME"
spark = SparkSession.builder.appName("APP_NAME") \
.config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
.config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
.config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1beta/restcatalog') \
.config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://GCS_BUCKET') \
.config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
.config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \
.config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
.config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.config('spark.sql.defaultCatalog', catalog_name) \
.getOrCreate()
Substitua:
CATALOG_NAME: um nome para o catálogo do Iceberg, por exemplo,iceberg_catalog.APP_NAME: o nome do seu aplicativo Spark.GCS_BUCKET: o bucket do Cloud Storage para armazenar os dados da tabela do Iceberg.PROJECT_ID: o ID do projeto Google Cloud .
Gerenciar dados com o Spark SQL
Depois de configurar a sessão do Spark, use o Spark SQL para realizar operações de gerenciamento de dados.
Crie um namespace No catálogo REST do Iceberg do catálogo de ambientes de execução do Lakehouse, um namespace corresponde a um conjunto de dados do BigQuery.
spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME") spark.sql("USE NAMESPACE_NAME")Substitua
NAMESPACE_NAMEpelo nome do namespace, por exemplo,spark_lakehouse.Crie uma tabela de base no formato Iceberg e insira dados.
spark.sql("DROP TABLE IF EXISTS base_table PURGE") spark.sql("CREATE TABLE base_table (id LONG) USING iceberg") spark.sql("INSERT INTO base_table VALUES 0, 1, 2, 3, 4") spark.sql("SELECT * FROM base_table").show()O resultado será o seguinte:
+---+ | id| +---+ | 0| | 1| | 2| | 3| | 4| +---+Crie uma segunda tabela para novos dados.
spark.sql("DROP TABLE IF EXISTS newdata PURGE") spark.sql("CREATE TABLE newdata(id LONG) USING iceberg") spark.sql("INSERT INTO newdata VALUES 3, 4, 5, 6") spark.sql("SELECT * FROM newdata").show()O resultado será o seguinte:
+---+ | id| +---+ | 3| | 4| | 5| | 6| +---+Mescle os novos dados na tabela de base.
spark.sql("""MERGE INTO base_table USING newdata ON base_table.id = newdata.id WHEN MATCHED THEN UPDATE SET base_table.id = newdata.id WHEN NOT MATCHED THEN INSERT * """) spark.sql("SELECT * FROM base_table").show()O resultado será o seguinte:
+---+ | id| +---+ | 0| | 1| | 2| | 3| | 4| | 5| | 6| +---+Atualize os registros na tabela de base.
spark.sql( "UPDATE base_table SET id = (id + 100) WHERE (id % 2 == 0)" ) spark.sql("SELECT * FROM base_table").show()O resultado será o seguinte:
+---+ | id| +---+ | 3| |104| | 5| |106| |100| |102| | 1| +---+Exclua registros da tabela de base.
spark.sql("DELETE FROM base_table WHERE (id % 2 == 0)") spark.sql("SELECT * FROM base_table").show()O resultado será o seguinte:
+---+ | id| +---+ | 3| | 5| | 1| +---+
Consultar um snapshot histórico
Recupere uma versão anterior de uma tabela consultando um ID de snapshot específico. Essa operação também é conhecida como viagem no tempo.
Recupere o ID do snapshot da versão da tabela antes das operações
MERGE,UPDATEeDELETE.snapshot_ids = spark.sql( "SELECT snapshot_id FROM `NAMESPACE_NAME`.`base_table`.snapshots" ).collect() oldest_snapshot_id = snapshot_ids[1]["snapshot_id"]Substitua
NAMESPACE_NAMEpelo namespace que você criou.Consulte a tabela usando o ID do snapshot recuperado.
df = ( spark.read.format("iceberg") .option("versionAsOf", oldest_snapshot_id) .load("base_table") ) df.show()A saída mostra o estado da tabela após a operação
MERGE, mas antes de qualquer operaçãoUPDATEouDELETE.+---+ | id| +---+ | 0| | 1| | 2| | 3| | 4| | 5| | 6| +---+
Descobrir a linhagem de dados
É possível rastrear a movimentação de dados entre tabelas do catálogo de ambientes de execução do Lakehouse e do catálogo REST do Iceberg com a linhagem de dados, que está disponível no Serviço Gerenciado para Apache Spark 2.2 e em versões de imagem mais recentes.
Exemplo de linhagem de dados
Crie tabelas Iceberg de origem e de destino e copie os dados.
spark.sql("DROP TABLE IF EXISTS source_table PURGE") spark.sql("DROP TABLE IF EXISTS target_table PURGE") spark.sql("CREATE TABLE source_table (id LONG) USING iceberg") spark.sql("""CREATE TABLE target_table USING ICEBERG AS SELECT max(id) as top_id FROM source_table """)No console Google Cloud , acesse a página Pesquisa do Knowledge Catalog.
Pesquise uma das tabelas e clique na guia
Lineage:
Exemplo de um gráfico de linhagem de dados na página do Knowledge Catalog no console Google Cloud . A linhagem de dados reconhece as representações lógica (tabela do catálogo de ambiente de execução do Lakehouse) e física (Cloud Storage) das tabelas do catálogo REST do Iceberg do catálogo de ambiente de execução do Lakehouse.
Problema conhecido da linhagem de dados
Em alguns clusters do Serviço Gerenciado para Apache Spark, talvez não seja possível gerar a linhagem de dados completa devido a um problema na biblioteca OpenLineage.
Solução alternativa: na configuração da sessão do Spark, defina a propriedade spark.sql.catalog.{catalog_name}.uri como https://biglake.googleapis.com/iceberg/v1beta/restcatalog.
A seguir
- Saiba mais sobre o catálogo REST do Iceberg do catálogo de ambientes de execução do Lakehouse.
- Conheça os recursos do Apache Iceberg.
- Saiba como consultar dados do Iceberg no catálogo de ambientes de execução do Lakehouse.
- Saiba mais sobre a linhagem de dados e o Serviço Gerenciado para Apache Spark.