Lakehouse mit Spark und Lakehouse-Laufzeitkatalog erstellen

Eine Lakehouse-Architektur kombiniert die Flexibilität eines Data Lake mit den Datenverwaltungsfunktionen eines Data Warehouse. In diesem Dokument wird beschrieben, wie Sie ein Lakehouse in erstellen Google Cloud. Sie verwenden Apache Iceberg als Tabellenformat, Managed Service for Apache Spark für die Verarbeitung und den Iceberg-REST-Katalog des Lakehouse-Laufzeitkatalogs für die einheitliche Metadatenverwaltung.

Diese Architektur verwendet offene Tabellenformate wie Iceberg, um Daten in Cloud Storage Data-Warehouse-Funktionen wie Transaktionen und Schema-Weiterentwicklung hinzuzufügen. Dieser Ansatz schafft eine einzige Quelle der Wahrheit für Ihre Daten, auf die verschiedene Engines zugreifen können.

Diagramm mit den Komponenten einer Lakehouse-Architektur, einschließlich Managed Service for Apache Spark, Cloud Storage und Lakehouse REST-Katalog.
Diagramm der Lakehouse-Architektur.

Hinweis

  1. Melden Sie sich in Ihrem Google Cloud Konto an. Wenn Sie noch kein Konto haben Google Cloud, erstellen Sie ein Konto, um die Leistung unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  7. Verify that you have the permissions required to complete this guide.

  8. Verify that billing is enabled for your Google Cloud project.

  9. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  10. Erstellen Sie einen Cloud Storage-Bucket zum Speichern von Iceberg-Daten.

Erforderliche Rollen

Bestimmte IAM-Rollen (Identity and Access Management) sind erforderlich, um die Beispiele auf dieser Seite auszuführen. Je nach Organisationsrichtlinien wurden diese Rollen möglicherweise bereits gewährt. Informationen zum Prüfen von Rollenzuweisungen finden Sie unter Müssen Sie Rollen zuweisen?.

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Nutzerrollen

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Erstellen eines Managed Service for Apache Spark-Clusters benötigen:

Dienstkontorolle

Bitten Sie Ihren Administrator, dem Compute Engine-Standarddienstkonto die IAM-Rolle Dataproc Worker (roles/dataproc.worker) für das Projekt zuzuweisen, damit das Compute Engine-Standarddienstkonto die erforderlichen Berechtigungen zum Erstellen eines Managed Service for Apache Spark-Clusters hat.

Managed Service for Apache Spark-Cluster erstellen

Erstellen Sie einen Managed Service for Apache Spark-Cluster mit den optionalen Komponenten Iceberg und Jupyter.

  1. Führen Sie den folgenden gcloud-Befehl aus, um den Cluster zu erstellen:

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=REGION \
        --image-version=2.3-debian12 \
        --optional-components=ICEBERG,JUPYTER \
        --enable-component-gateway \
        --properties 'dataproc:dataproc.lineage.enabled=true'
    

    Ersetzen Sie Folgendes:

    • CLUSTER_NAME: ein Name für Ihren Cluster.
    • PROJECT_ID: Ihre Google Cloud Projekt-ID.
    • REGION: die Google Cloud Region für den Cluster, z. B. us-central1.

    Das Festlegen von dataproc:dataproc.lineage.enabled=true ist nicht erforderlich, damit der Iceberg-REST-Katalog des Lakehouse-Laufzeitkatalogs ordnungsgemäß funktioniert. Er wird für die Lineage-Verfolgung im folgenden Beispiel für die Datenherkunft hinzugefügt.

  2. Stellen Sie über ein Jupyter-Notebook eine Verbindung zum Cluster her. Sie können ein Vertex AI Workbench-Notebook verwenden oder ein Notebook direkt im Cluster starten.

Spark-Sitzung konfigurieren

Erstellen Sie in Ihrem Jupyter-Notebook eine Spark-Sitzung, die für die Verwendung des Iceberg-REST-Katalogs des Lakehouse-Laufzeitkatalogs konfiguriert ist.

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"

spark = SparkSession.builder.appName("APP_NAME") \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1beta/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://GCS_BUCKET') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', catalog_name) \
  .getOrCreate()

Ersetzen Sie Folgendes:

  • CATALOG_NAME: ein Name für Ihren Iceberg-Katalog, z. B. iceberg_catalog.
  • APP_NAME: der Name Ihrer Spark-Anwendung.
  • GCS_BUCKET: der Cloud Storage-Bucket zum Speichern Ihrer Iceberg-Tabellendaten.
  • PROJECT_ID: Ihre Google Cloud Projekt-ID.

Daten mit Spark SQL verwalten

Nachdem Sie die Spark-Sitzung konfiguriert haben, können Sie mit Spark SQL Datenverwaltungsaufgaben ausführen.

  1. Namespace erstellen Im Iceberg-REST-Katalog des Lakehouse-Laufzeitkatalogs entspricht ein Namespace einem BigQuery-Dataset.

    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME")
    spark.sql("USE NAMESPACE_NAME")
    

    Ersetzen Sie NAMESPACE_NAME durch den Namen für Ihren Namespace, z. B. spark_lakehouse.

  2. Erstellen Sie eine Basistabelle im Iceberg-Format und fügen Sie Daten ein.

    spark.sql("DROP TABLE IF EXISTS base_table PURGE")
    spark.sql("CREATE TABLE base_table (id LONG) USING iceberg")
    spark.sql("INSERT INTO base_table VALUES 0, 1, 2, 3, 4")
    spark.sql("SELECT * FROM base_table").show()
    

    Die Ausgabe sieht etwa so aus:

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       +---+
    
  3. Erstellen Sie eine zweite Tabelle für neue Daten.

    spark.sql("DROP TABLE IF EXISTS newdata PURGE")
    spark.sql("CREATE TABLE newdata(id LONG) USING iceberg")
    spark.sql("INSERT INTO newdata VALUES 3, 4, 5, 6")
    spark.sql("SELECT * FROM newdata").show()
    

    Die Ausgabe sieht etwa so aus:

       +---+
       | id|
       +---+
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    
  4. Führen Sie die neuen Daten in die Basistabelle zusammen.

    spark.sql("""MERGE INTO base_table USING newdata
               ON base_table.id = newdata.id
               WHEN MATCHED THEN
                 UPDATE SET base_table.id = newdata.id
               WHEN NOT MATCHED THEN INSERT * """)
    spark.sql("SELECT * FROM base_table").show()
    

    Die Ausgabe sieht etwa so aus:

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    
  5. Aktualisieren Sie Datensätze in der Basistabelle.

    spark.sql(
         "UPDATE base_table SET id = (id + 100) WHERE (id % 2 == 0)"
     )
    spark.sql("SELECT * FROM base_table").show()
    

    Die Ausgabe sieht etwa so aus:

       +---+
       | id|
       +---+
       |  3|
       |104|
       |  5|
       |106|
       |100|
       |102|
       |  1|
       +---+
    
  6. Löschen Sie Datensätze aus der Basistabelle.

    spark.sql("DELETE FROM base_table WHERE (id % 2 == 0)")
    spark.sql("SELECT * FROM base_table").show()
    

    Die Ausgabe sieht etwa so aus:

       +---+
       | id|
       +---+
       |  3|
       |  5|
       |  1|
       +---+
    

Verlaufs-Snapshot abfragen

Rufen Sie eine frühere Version einer Tabelle ab, indem Sie eine bestimmte Snapshot-ID abfragen. Diese Operation wird auch als Zeitreise bezeichnet.

  1. Rufen Sie die Snapshot-ID der Tabellenversion vor den Vorgängen MERGE, UPDATE und DELETE ab.

    snapshot_ids = spark.sql(
         "SELECT snapshot_id FROM `NAMESPACE_NAME`.`base_table`.snapshots"
    ).collect()
    oldest_snapshot_id = snapshot_ids[1]["snapshot_id"]
    

    Ersetzen Sie NAMESPACE_NAME durch den von Ihnen erstellten Namespace.

  2. Fragen Sie die Tabelle mit der abgerufenen Snapshot-ID ab.

    df = (
         spark.read.format("iceberg")
         .option("versionAsOf", oldest_snapshot_id)
         .load("base_table")
     )
    df.show()
    

    Die Ausgabe zeigt den Status der Tabelle nach dem Vorgang MERGE, aber vor allen Vorgängen UPDATE oder DELETE.

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    

Datenherkunft ermitteln

Mit der Datenherkunft können Sie die Datenbewegung zwischen Tabellen des Lakehouse-Laufzeitkatalogs Iceberg-REST-Katalogs verfolgen. Diese Funktion ist in Managed Service for Apache Spark-Imageversionen ab 2.2 verfügbar.

Beispiel für Datenherkunft

  1. Erstellen Sie Iceberg-Quell- und -Zieltabellen und kopieren Sie dann Daten.

    spark.sql("DROP TABLE IF EXISTS source_table PURGE")
    spark.sql("DROP TABLE IF EXISTS target_table PURGE")
    spark.sql("CREATE TABLE source_table (id LONG) USING iceberg")
    spark.sql("""CREATE TABLE target_table
      USING ICEBERG
      AS SELECT max(id) as top_id FROM source_table
      """)
    
  2. Rufen Sie in der Google Cloud Console die Seite Suchen des Knowledge Catalog auf.

    Zur Suche

  3. Suchen Sie nach einer der Tabellen und klicken Sie dann auf den Tab Lineage:

    Beispiel für die Datenherkunft auf der Seite „Knowledge Catalog“ in der Google Cloud Console.
    Beispiel für ein Diagramm zur Datenherkunft auf der Knowledge Catalog-Seite in der Google Cloud Console.

    Die Datenherkunft erkennt sowohl die logische (Tabelle des Lakehouse-Laufzeitkatalogs) als auch die physische (Cloud Storage) Darstellung von Tabellen des Iceberg-REST-Katalogs des Lakehouse-Laufzeitkatalogs.

Bekanntes Problem mit der Datenherkunft

In einigen Managed Service for Apache Spark-Clustern wird die vollständige Datenherkunft möglicherweise nicht generiert aufgrund eines OpenLineage Bibliotheksproblems. Problemumgehung: Legen Sie in der Spark-Sitzungskonfiguration das Attribut spark.sql.catalog.{catalog_name}.uri auf https://biglake.googleapis.com/iceberg/v1beta/restcatalog fest.

Nächste Schritte