Crea un lakehouse con Spark e BigLake Metastore

Un'architettura lakehouse combina la flessibilità di un data lake con le funzionalità di gestione dei dati di un data warehouse. Questo documento mostra come configurare un lakehouse su Google Cloud. Utilizzi Apache Iceberg come formato della tabella, Managed Service for Apache Spark per l'elaborazione e il catalogo REST di BigLake metastore Iceberg per la gestione unificata dei metadati.

Questa architettura utilizza formati di tabella aperti come Iceberg per aggiungere funzionalità di data warehousing, come transazioni ed evoluzione dello schema, ai dati in Cloud Storage. Questo approccio crea un'unica fonte attendibile per i tuoi dati, accessibile a vari motori.

Prima di iniziare

  1. Accedi al tuo account Google Cloud . Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti senza costi per l'esecuzione, il test e il deployment dei workload.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  7. Verify that you have the permissions required to complete this guide.

  8. Verify that billing is enabled for your Google Cloud project.

  9. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  10. Crea un bucket Cloud Storage per archiviare i dati Iceberg.

Ruoli obbligatori

Per eseguire gli esempi riportati in questa pagina sono necessari determinati ruoli IAM. A seconda delle policy dell'organizzazione, questi ruoli potrebbero essere già stati concessi. Per controllare le concessioni dei ruoli, consulta Devi concedere i ruoli?.

Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Ruoli utente

Per ottenere le autorizzazioni necessarie per creare un cluster Managed Service for Apache Spark, chiedi all'amministratore di concederti i seguenti ruoli IAM:

Ruolo account di servizio

Per assicurarti che il account di servizio predefinito di Compute Engine disponga delle autorizzazioni necessarie per creare un cluster Managed Service for Apache Spark, chiedi all'amministratore di concedere il ruolo IAM Dataproc Worker (roles/dataproc.worker) al account di servizio predefinito di Compute Engine sul progetto.

Crea un cluster Managed Service per Apache Spark

Crea un cluster Managed Service for Apache Spark con i componenti facoltativi Iceberg e Jupyter.

  1. Per creare il cluster, esegui questo comando gcloud:

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=REGION \
        --image-version=2.3-debian12 \
        --optional-components=ICEBERG,JUPYTER \
        --enable-component-gateway
    

    Sostituisci quanto segue:

    • CLUSTER_NAME: un nome per il cluster.
    • PROJECT_ID: il tuo ID progetto Google Cloud .
    • REGION: la Google Cloud regione del cluster, ad esempio us-central1.
  2. Connettiti al cluster utilizzando un blocco note Jupyter. Puoi utilizzare un notebook Vertex AI Workbench o avviare un notebook direttamente sul cluster.

Configura una sessione Spark

Nel notebook Jupyter, crea una sessione Spark configurata per utilizzare il catalogo REST BigLake metastore Iceberg.

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"

spark = SparkSession.builder.appName("APP_NAME") \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1beta/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://GCS_BUCKET') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', catalog_name) \
  .getOrCreate()

Sostituisci quanto segue:

  • CATALOG_NAME: un nome per il catalogo Iceberg, ad esempio iceberg_catalog.
  • APP_NAME: il nome dell'applicazione Spark.
  • GCS_BUCKET: il bucket Cloud Storage in cui archiviare i dati della tabella Iceberg.
  • PROJECT_ID: il tuo ID progetto Google Cloud .

Gestire i dati con Spark SQL

Dopo aver configurato la sessione Spark, utilizza Spark SQL per eseguire operazioni di gestione dei dati.

  1. Crea uno spazio dei nomi. Nel catalogo REST di Iceberg di BigLake Metastore, uno spazio dei nomi corrisponde a un set di dati BigQuery.

    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME")
    spark.sql("USE NAMESPACE_NAME")
    

    Sostituisci NAMESPACE_NAME con il nome del tuo spazio dei nomi, ad esempio spark_lakehouse.

  2. Crea una tabella di base in formato Iceberg e inserisci i dati.

    spark.sql("DROP TABLE IF EXISTS base_table PURGE")
    spark.sql("CREATE TABLE base_table (id LONG) USING iceberg")
    spark.sql("INSERT INTO base_table VALUES 0, 1, 2, 3, 4")
    spark.sql("SELECT * FROM base_table").show()
    

    L'output è simile al seguente:

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       +---+
    
  3. Crea una seconda tabella per i nuovi dati.

    spark.sql("DROP TABLE IF EXISTS newdata PURGE")
    spark.sql("CREATE TABLE newdata(id LONG) USING iceberg")
    spark.sql("INSERT INTO newdata VALUES 3, 4, 5, 6")
    spark.sql("SELECT * FROM newdata").show()
    

    L'output è simile al seguente:

       +---+
       | id|
       +---+
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    
  4. Unisci i nuovi dati nella tabella di base.

    spark.sql("""MERGE INTO base_table USING newdata
               ON base_table.id = newdata.id
               WHEN MATCHED THEN
                 UPDATE SET base_table.id = newdata.id
               WHEN NOT MATCHED THEN INSERT * """)
    spark.sql("SELECT * FROM base_table").show()
    

    L'output è simile al seguente:

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    
  5. Aggiorna i record nella tabella di base.

    spark.sql(
         "UPDATE base_table SET id = (id + 100) WHERE (id % 2 == 0)"
     )
    spark.sql("SELECT * FROM base_table").show()
    

    L'output è simile al seguente:

       +---+
       | id|
       +---+
       |  3|
       |104|
       |  5|
       |106|
       |100|
       |102|
       |  1|
       +---+
    
  6. Elimina i record dalla tabella di base.

    spark.sql("DELETE FROM base_table WHERE (id % 2 == 0)")
    spark.sql("SELECT * FROM base_table").show()
    

    L'output è simile al seguente:

       +---+
       | id|
       +---+
       |  3|
       |  5|
       |  1|
       +---+
    

Eseguire una query su un'istantanea storica

Recupera una versione precedente di una tabella eseguendo una query su un ID snapshot specifico. Questa operazione è nota anche come viaggio nel tempo.

  1. Recupera l'ID snapshot della versione della tabella prima delle operazioni MERGE, UPDATE e DELETE.

    snapshot_ids = spark.sql(
         "SELECT snapshot_id FROM `NAMESPACE_NAME`.`base_table`.snapshots"
    ).collect()
    oldest_snapshot_id = snapshot_ids[1]["snapshot_id"]
    

    Sostituisci NAMESPACE_NAME con lo spazio dei nomi che hai creato.

  2. Esegui una query sulla tabella utilizzando l'ID snapshot recuperato.

    df = (
         spark.read.format("iceberg")
         .option("versionAsOf", oldest_snapshot_id)
         .load("base_table")
     )
    df.show()
    

    L'output mostra lo stato della tabella dopo l'operazione MERGE, ma prima di qualsiasi operazione UPDATE o DELETE.

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    

Passaggi successivi