Créer un lakehouse avec Spark et BigLake Metastore

Une architecture de lakehouse combine la flexibilité d'un lac de données avec les fonctionnalités de gestion des données d'un entrepôt de données. Ce document vous explique comment configurer un lakehouse sur Google Cloud. Vous utilisez Apache Iceberg comme format de table, Managed Service for Apache Spark pour le traitement et le catalogue REST Iceberg BigLake Metastore pour la gestion unifiée des métadonnées.

Cette architecture utilise des formats de table ouverts tels qu'Iceberg pour ajouter des fonctionnalités d'entreposage de données, telles que les transactions et l'évolution des schémas, aux données dans Cloud Storage. Cette approche crée une source unique de vérité pour vos données, accessible par différents moteurs.

Avant de commencer

  1. Connectez-vous à votre compte Google Cloud . Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $de crédits sans frais pour exécuter, tester et déployer des charges de travail.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  7. Verify that you have the permissions required to complete this guide.

  8. Verify that billing is enabled for your Google Cloud project.

  9. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  10. Créez un bucket Cloud Storage pour stocker les données Iceberg.

Rôles requis

Certains rôles IAM sont requis pour exécuter les exemples de cette page. En fonction des règles d'administration, ces rôles peuvent déjà avoir été accordés. Pour vérifier les attributions de rôles, consultez Do you need to grant roles? (Devez-vous attribuer des rôles ?).

Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Rôles utilisateur

Pour obtenir les autorisations nécessaires pour créer un cluster Managed Service for Apache Spark, demandez à votre administrateur de vous accorder les rôles IAM suivants :

Rôle du compte de service

Pour vous assurer que le compte de service Compute Engine par défaut dispose des autorisations nécessaires pour créer un cluster Managed Service for Apache Spark, demandez à votre administrateur d'accorder au compte de service Compute Engine par défaut le rôle IAM Nœud de calcul Dataproc (roles/dataproc.worker) sur le projet.

Créer un cluster Managed Service pour Apache Spark

Créez un cluster Managed Service for Apache Spark avec les composants facultatifs Iceberg et Jupyter.

  1. Pour créer le cluster, exécutez la commande gcloud suivante :

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=REGION \
        --image-version=2.3-debian12 \
        --optional-components=ICEBERG,JUPYTER \
        --enable-component-gateway
    

    Remplacez les éléments suivants :

    • CLUSTER_NAME : nom de votre cluster.
    • PROJECT_ID : ID de votre projet Google Cloud .
    • REGION : région Google Cloud du cluster, par exemple us-central1.
  2. Connectez-vous au cluster à l'aide d'un notebook Jupyter. Vous pouvez utiliser un notebook Vertex AI Workbench ou lancer un notebook directement sur le cluster.

Configurer une session Spark

Dans votre notebook Jupyter, créez une session Spark configurée pour utiliser le catalogue REST Iceberg BigLake Metastore.

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"

spark = SparkSession.builder.appName("APP_NAME") \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1beta/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://GCS_BUCKET') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', catalog_name) \
  .getOrCreate()

Remplacez les éléments suivants :

  • CATALOG_NAME : nom de votre catalogue Iceberg, par exemple iceberg_catalog.
  • APP_NAME : nom de votre application Spark.
  • GCS_BUCKET : bucket Cloud Storage dans lequel stocker les données de votre table Iceberg.
  • PROJECT_ID : ID de votre projet Google Cloud .

Gérer les données avec Spark SQL

Après avoir configuré la session Spark, utilisez Spark SQL pour effectuer des opérations de gestion des données.

  1. Créez un espace de noms. Dans le catalogue Iceberg REST du metastore BigLake, un espace de noms correspond à un ensemble de données BigQuery.

    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME")
    spark.sql("USE NAMESPACE_NAME")
    

    Remplacez NAMESPACE_NAME par le nom de votre espace de noms, par exemple spark_lakehouse.

  2. Créez une table de base au format Iceberg et insérez des données.

    spark.sql("DROP TABLE IF EXISTS base_table PURGE")
    spark.sql("CREATE TABLE base_table (id LONG) USING iceberg")
    spark.sql("INSERT INTO base_table VALUES 0, 1, 2, 3, 4")
    spark.sql("SELECT * FROM base_table").show()
    

    Le résultat ressemble à ce qui suit :

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       +---+
    
  3. Créez une deuxième table pour les nouvelles données.

    spark.sql("DROP TABLE IF EXISTS newdata PURGE")
    spark.sql("CREATE TABLE newdata(id LONG) USING iceberg")
    spark.sql("INSERT INTO newdata VALUES 3, 4, 5, 6")
    spark.sql("SELECT * FROM newdata").show()
    

    Le résultat ressemble à ce qui suit :

       +---+
       | id|
       +---+
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    
  4. Fusionnez les nouvelles données dans la table de base.

    spark.sql("""MERGE INTO base_table USING newdata
               ON base_table.id = newdata.id
               WHEN MATCHED THEN
                 UPDATE SET base_table.id = newdata.id
               WHEN NOT MATCHED THEN INSERT * """)
    spark.sql("SELECT * FROM base_table").show()
    

    Le résultat ressemble à ce qui suit :

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    
  5. Mettre à jour les enregistrements dans la table de base.

    spark.sql(
         "UPDATE base_table SET id = (id + 100) WHERE (id % 2 == 0)"
     )
    spark.sql("SELECT * FROM base_table").show()
    

    Le résultat ressemble à ce qui suit :

       +---+
       | id|
       +---+
       |  3|
       |104|
       |  5|
       |106|
       |100|
       |102|
       |  1|
       +---+
    
  6. Supprimez des enregistrements de la table de base.

    spark.sql("DELETE FROM base_table WHERE (id % 2 == 0)")
    spark.sql("SELECT * FROM base_table").show()
    

    Le résultat ressemble à ce qui suit :

       +---+
       | id|
       +---+
       |  3|
       |  5|
       |  1|
       +---+
    

Interroger un instantané historique

Récupérez une version précédente d'une table en interrogeant un ID d'instantané spécifique. Cette opération est également appelée "voyage dans le temps".

  1. Récupérez l'ID d'instantané de la version de la table avant les opérations MERGE, UPDATE et DELETE.

    snapshot_ids = spark.sql(
         "SELECT snapshot_id FROM `NAMESPACE_NAME`.`base_table`.snapshots"
    ).collect()
    oldest_snapshot_id = snapshot_ids[1]["snapshot_id"]
    

    Remplacez NAMESPACE_NAME par l'espace de noms que vous avez créé.

  2. Interrogez la table à l'aide de l'ID d'instantané récupéré.

    df = (
         spark.read.format("iceberg")
         .option("versionAsOf", oldest_snapshot_id)
         .load("base_table")
     )
    df.show()
    

    Le résultat affiche l'état de la table après l'opération MERGE, mais avant toute opération UPDATE ou DELETE.

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    

Étapes suivantes