Criar um lakehouse com o Spark e o catálogo de ambiente de execução do Lakehouse

Uma arquitetura de lakehouse combina a flexibilidade de um data lake com os recursos de gerenciamento de dados de um data warehouse. Este documento mostra como configurar um lakehouse no Google Cloud. Você usa o Apache Iceberg como formato de tabela, o Serviço Gerenciado para Apache Spark para processamento e o catálogo de ambiente de execução do Lakehouse Iceberg REST Catalog para gerenciamento unificado de metadados.

Essa arquitetura usa formatos de tabela abertos, como o Iceberg, para adicionar recursos de armazenamento em data warehouse, como transações e evolução de esquema, aos dados no Cloud Storage. Essa abordagem cria uma única fonte de verdade para seus dados que pode ser acessada por vários mecanismos.

Diagrama mostrando os componentes de uma arquitetura de lakehouse, incluindo o Serviço Gerenciado para Apache Spark, o Cloud Storage e o catálogo REST do lakehouse.
Diagrama da arquitetura do lakehouse.

Antes de começar

  1. Faça login na sua Google Cloud conta do. Se você começou a usar o Google Cloud, crie uma conta para avaliar o desempenho dos nossos produtos em situações reais. Clientes novos também recebem US $300 em créditos para executar, testar e implantar cargas de trabalho.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  7. Verify that you have the permissions required to complete this guide.

  8. Verify that billing is enabled for your Google Cloud project.

  9. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  10. Crie um bucket do Cloud Storage para armazenar dados do Iceberg.

Funções exigidas

Algumas funções do Identity and Access Management (IAM) são necessárias para executar os exemplos nesta página. Dependendo das políticas da organização, essas funções já podem ter sido concedidas. Para verificar as concessões de papéis, consulte Você precisa conceder papéis?.

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos,pastas e organizações.

Papéis do usuário

Para receber as permissões necessárias para criar um cluster do Serviço Gerenciado para Apache Spark, peça ao administrador para conceder a você os seguintes papéis do IAM:

Papel de conta de serviço

Para garantir que a conta de serviço padrão do Compute Engine tenha as permissões necessárias para criar um cluster do Serviço Gerenciado para Apache Spark, peça ao administrador para conceder o papel do IAM de Worker do Dataproc (roles/dataproc.worker) à conta de serviço padrão do Compute Engine no projeto.

Criar um cluster do Serviço Gerenciado para Apache Spark

Crie um cluster do Serviço Gerenciado para Apache Spark com os componentes opcionais do Iceberg e do Jupyter.

  1. Para criar o cluster, execute o seguinte comando gcloud:

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=REGION \
        --image-version=2.3-debian12 \
        --optional-components=ICEBERG,JUPYTER \
        --enable-component-gateway \
        --properties 'dataproc:dataproc.lineage.enabled=true'
    

    Substitua:

    • CLUSTER_NAME: um nome para o cluster.
    • PROJECT_ID: o ID do Google Cloud projeto.
    • REGION: aregião do Google Cloud cluster, por exemplo, us-central1.

    A definição de dataproc:dataproc.lineage.enabled=true não é necessária para que o catálogo REST do Iceberg do catálogo de ambiente de execução do Lakehouse funcione corretamente. Ele é adicionado para rastreamento de linhagem no exemplo de linhagem de dados abaixo.

  2. Conecte-se ao cluster usando um notebook do Jupyter. É possível usar um notebook do Vertex AI Workbench ou iniciar um notebook diretamente no cluster.

Configurar uma sessão do Spark

No notebook do Jupyter, crie uma sessão do Spark configurada para usar o catálogo REST do Iceberg do catálogo de ambiente de execução do Lakehouse.

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"

spark = SparkSession.builder.appName("APP_NAME") \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1beta/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://GCS_BUCKET') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', catalog_name) \
  .getOrCreate()

Substitua:

  • CATALOG_NAME: um nome para o catálogo do Iceberg, por exemplo, iceberg_catalog.
  • APP_NAME: o nome do aplicativo Spark.
  • GCS_BUCKET: o bucket do Cloud Storage para armazenar os dados da tabela do Iceberg.
  • PROJECT_ID: o ID do Google Cloud projeto.

Gerenciar dados com o Spark SQL

Depois de configurar a sessão do Spark, use o Spark SQL para realizar operações de gerenciamento de dados.

  1. Crie um namespace. No catálogo REST do Iceberg do catálogo de ambiente de execução do Lakehouse, um namespace corresponde a um conjunto de dados do BigQuery.

    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME")
    spark.sql("USE NAMESPACE_NAME")
    

    Substitua NAMESPACE_NAME pelo nome do namespace, por exemplo, spark_lakehouse.

  2. Crie uma tabela base no formato Iceberg e insira dados.

    spark.sql("DROP TABLE IF EXISTS base_table PURGE")
    spark.sql("CREATE TABLE base_table (id LONG) USING iceberg")
    spark.sql("INSERT INTO base_table VALUES 0, 1, 2, 3, 4")
    spark.sql("SELECT * FROM base_table").show()
    

    O resultado será assim:

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       +---+
    
  3. Crie uma segunda tabela para novos dados.

    spark.sql("DROP TABLE IF EXISTS newdata PURGE")
    spark.sql("CREATE TABLE newdata(id LONG) USING iceberg")
    spark.sql("INSERT INTO newdata VALUES 3, 4, 5, 6")
    spark.sql("SELECT * FROM newdata").show()
    

    O resultado será assim:

       +---+
       | id|
       +---+
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    
  4. Mescle os novos dados na tabela base.

    spark.sql("""MERGE INTO base_table USING newdata
               ON base_table.id = newdata.id
               WHEN MATCHED THEN
                 UPDATE SET base_table.id = newdata.id
               WHEN NOT MATCHED THEN INSERT * """)
    spark.sql("SELECT * FROM base_table").show()
    

    O resultado será assim:

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    
  5. Atualize os registros na tabela base.

    spark.sql(
         "UPDATE base_table SET id = (id + 100) WHERE (id % 2 == 0)"
     )
    spark.sql("SELECT * FROM base_table").show()
    

    O resultado será assim:

       +---+
       | id|
       +---+
       |  3|
       |104|
       |  5|
       |106|
       |100|
       |102|
       |  1|
       +---+
    
  6. Exclua registros da tabela base.

    spark.sql("DELETE FROM base_table WHERE (id % 2 == 0)")
    spark.sql("SELECT * FROM base_table").show()
    

    O resultado será assim:

       +---+
       | id|
       +---+
       |  3|
       |  5|
       |  1|
       +---+
    

Consultar um snapshot histórico

Recupere uma versão anterior de uma tabela consultando um ID de snapshot específico. Essa operação também é conhecida como viagem no tempo.

  1. Recupere o ID do snapshot da versão da tabela antes das operações MERGE, UPDATE e DELETE.

    snapshot_ids = spark.sql(
         "SELECT snapshot_id FROM `NAMESPACE_NAME`.`base_table`.snapshots"
    ).collect()
    oldest_snapshot_id = snapshot_ids[1]["snapshot_id"]
    

    Substitua NAMESPACE_NAME pelo namespace que você criou.

  2. Consulte a tabela usando o ID do snapshot recuperado.

    df = (
         spark.read.format("iceberg")
         .option("versionAsOf", oldest_snapshot_id)
         .load("base_table")
     )
    df.show()
    

    A saída mostra o estado da tabela após a operação MERGE, mas antes de qualquer operação UPDATE ou DELETE.

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    

Descobrir a linhagem de dados

É possível rastrear a movimentação de dados entre as tabelas do catálogo REST do Iceberg do catálogo de ambiente de execução do Lakehouse com a linhagem de dados, que está disponível no Serviço Gerenciado para Apache Spark 2.2 e versões de imagem mais recentes.

Exemplo de linhagem de dados

  1. Crie tabelas de origem e destino do Iceberg e copie os dados.

    spark.sql("DROP TABLE IF EXISTS source_table PURGE")
    spark.sql("DROP TABLE IF EXISTS target_table PURGE")
    spark.sql("CREATE TABLE source_table (id LONG) USING iceberg")
    spark.sql("""CREATE TABLE target_table
      USING ICEBERG
      AS SELECT max(id) as top_id FROM source_table
      """)
    
  2. No Google Cloud console, acesse a página Pesquisa do Knowledge Catalog.

    Acesse Pesquisar

  3. Pesquise uma das tabelas e clique na guia Lineage:

    Exemplo de linhagem de dados na página do Knowledge Catalog no console Google Cloud .
    Exemplo de gráfico de linhagem de dados na página do Knowledge Catalog no Google Cloud console.

    A linhagem de dados reconhece as representações lógicas (tabela do catálogo de ambiente de execução do Lakehouse) e físicas (Cloud Storage) das tabelas do catálogo REST do Iceberg do catálogo de ambiente de execução do Lakehouse.

Problema conhecido de linhagem de dados

Em alguns clusters do Serviço Gerenciado para Apache Spark, a linhagem de dados completa pode não ser gerada devido a um OpenLineage. Solução alternativa: na configuração da sessão do Spark, defina a propriedade spark.sql.catalog.{catalog_name}.uri como https://biglake.googleapis.com/iceberg/v1beta/restcatalog.

A seguir