Zusätzliche Katalogfunktionen für die Lakehouse-Laufzeit

Zum Anpassen der Konfiguration Ihres Lakehouse-Laufzeitkatalogs können Sie die folgenden zusätzlichen Funktionen verwenden:

  • Apache Spark Iceberg-Prozeduren
  • Die Filteroption für nicht unterstützte Tabellen
  • Überschreibungen für BigQuery-Verbindungen
  • Richtlinien zur Zugriffssteuerung für Iceberg-Tabellen im Lakehouse-Laufzeitkatalog

Apache Iceberg Spark-Prozeduren verwenden

Wenn Sie Apache Spark-Prozeduren verwenden möchten, müssen Sie Apache Iceberg SQL Erweiterungen in Ihre Apache Spark-Konfiguration einbeziehen. Sie können beispielsweise eine Prozedur erstellen, um ein Rollback auf einen vorherigen Zustand durchzuführen.

Interaktives Apache Spark-SQL verwenden, um zu einem vorherigen Zustand zurückzukehren

Mit einer Apache Spark-Prozedur können Sie eine Tabelle erstellen, ändern und für die Tabelle ein Rollback zu ihrem vorherigen Zustand durchführen. Beispiel:

  1. Apache Spark-Tabelle erstellen:

    spark-sql \
       --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,BIGLAKE_ICEBERG_CATALOG_JAR \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
       --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    Ersetzen Sie Folgendes:

    • BIGLAKE_ICEBERG_CATALOG_JAR: die Cloud Storage-URI des benutzerdefinierten Apache Iceberg-Katalog-Plug-ins, das verwendet werden soll. Wählen Sie je nach Apache Iceberg-Versionsnummer eine der folgenden Optionen aus:
      • Iceberg 1.9.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.9.1-1.0.1.jar
      • Iceberg 1.6.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.2.jar
    • CATALOG_NAME: der Katalogname, der auf Ihre Apache Spark-Tabelle verweist.
    • PROJECT_ID: die ID des Google Cloud Projekts.

    • WAREHOUSE_DIRECTORY: die URI des Cloud Storage-Ordners, in dem sich Ihr Data Warehouse befindet.

    USE `CATALOG_NAME`;
    CREATE NAMESPACE NAMESPACE_NAME;
    USE NAMESPACE NAMESPACE_NAME;
    CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
    INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row");
    DESCRIBE EXTENDED TABLE_NAME;

    Ersetzen Sie Folgendes:

    • NAMESPACE_NAME: der Namespace-Name, der auf Ihre Apache Spark-Tabelle verweist.
    • TABLE_NAME: ein Tabellenname, der auf Ihre Apache Spark-Tabelle verweist.

    Die Ausgabe enthält Details zur Tabellenkonfiguration:

    ...
    Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]
    ...
  2. Ändern Sie die Tabelle noch einmal und kehren Sie dann zum zuvor erstellten Snapshot 1659239298328512231 zurück:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
    INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5);
    SELECT * FROM TABLE_NAME;
    CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID);
    SELECT * FROM TABLE_NAME;

    Ersetzen Sie Folgendes:

    • SNAPSHOT_ID: die ID des Snapshots, zu dem Sie zurückkehren.

    Die Ausgabe sieht etwa so aus:

    1 first row
    Time taken: 0.997 seconds, Fetched 1 row(s)

Nicht unterstützte Tabellen aus Tabellenauflistungsfunktionen filtern

Wenn Sie Apache Spark SQL mit dem Lakehouse-Laufzeitkatalog verwenden, werden mit dem Befehl SHOW TABLES alle Tabellen im angegebenen Namespace angezeigt, auch solche, die nicht mit Apache Spark kompatibel sind.

Wenn Sie nur unterstützte Tabellen anzeigen möchten, aktivieren Sie die Option filter_unsupported_tables:

spark-sql
  --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,BIGLAKE_ICEBERG_CATALOG_JAR \
  --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
  --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
  --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"

Ersetzen Sie Folgendes:

  • BIGLAKE_ICEBERG_CATALOG_JAR: die Cloud Storage-URI des benutzerdefinierten Apache Iceberg-Katalog-Plug-ins, das verwendet werden soll. Wählen Sie je nach Apache Iceberg-Versionsnummer eine der folgenden Optionen aus:
    • Iceberg 1.9.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.9.1-1.0.1.jar
    • Iceberg 1.6.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.2.jar
  • CATALOG_NAME: der Name des zu verwendenden Apache Spark-Katalogs.
  • PROJECT_ID: die ID des zu verwendenden Projekts. Google Cloud
  • LOCATION: der Standort der BigQuery-Ressourcen.
  • WAREHOUSE_DIRECTORY: der Cloud Storage-Ordner, der als Data Warehouse verwendet werden soll.

Überschreibung für BigQuery-Verbindung festlegen

Sie können BigQuery-Verbindungen verwenden, um auf Daten zuzugreifen, die außerhalb von BigQuery gespeichert sind, z. B. in Cloud Storage.

Führen Sie die folgenden Schritte aus, um eine Überschreibung für eine BigQuery-Verbindung festzulegen, die Zugriff auf einen Cloud Storage-Bucket bietet:

  1. Erstellen Sie in Ihrem BigQuery-Projekt eine neue Verbindung zu Ihrer Cloud Storage-Ressource. Diese Verbindung definiert, wie BigQuery auf Ihre Daten zugreift.

  2. Weisen Sie dem Nutzer oder Dienstkonto, das auf die Daten zugreift, die Rolle roles/bigquery.connectionUser für die Verbindung zu.

    Die Verbindungsressource muss sich am selben Standort wie die Zielressourcen in BigQuery befinden. Weitere Informationen finden Sie unter Verbindungen verwalten.

  3. Geben Sie die Verbindung in Ihrer Apache Iceberg-Tabelle mit der Eigenschaft bq_connection an:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');

    Ersetzen Sie Folgendes:

    • TABLE_NAME: ein Tabellenname für Ihre Apache Spark-Tabelle.
    • WAREHOUSE_DIRECTORY: die URI des Cloud Storage-Bucket, in dem Ihre Daten gespeichert sind.
    • PROJECT_ID: die ID des zu verwendenden Projekts. Google Cloud
    • LOCATION: der Standort der Verbindung.
    • CONNECTION_ID: die ID der Verbindung.

Richtlinien zur Zugriffssteuerung festlegen

Sie können die detaillierte Zugriffssteuerung für Apache Iceberg-Tabellen im Lakehouse-Laufzeitkatalog aktivieren, indem Sie Richtlinien zur Zugriffssteuerung konfigurieren. Sie können Richtlinien zur Zugriffssteuerung nur für Tabellen festlegen, die eine Überschreibung für eine BigQuery-Verbindung verwenden. Sie können diese Richtlinien auf folgende Weise festlegen:

Nachdem Sie Ihre Richtlinien für die detaillierte Zugriffssteuerung konfiguriert haben, können Sie die Tabelle mit dem folgenden Beispiel aus Apache Spark abfragen:

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
.appName("Lakehouse runtime catalog Iceberg") \
.config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
.config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
.getOrCreate()

spark.sql("USE `CATALOG_NAME`;")

# Configure spark for storing temp results
spark.conf.set("viewsEnabled","true")
spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")

spark.sql("USE NAMESPACE DATASET_NAME;")

sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
df = spark.read.format("bigquery").load(sql)
df.show()

Ersetzen Sie Folgendes:

  • CATALOG_NAME: der Name Ihres Katalogs.
  • PROJECT_ID: die ID des Projekts, das Ihre BigQuery-Ressourcen enthält.
  • LOCATION: der Standort der BigQuery-Ressourcen.
  • WAREHOUSE_DIRECTORY: die URI des Cloud Storage-Ordners, der Ihr Data Warehouse enthält.
  • MATERIALIZATION_NAMESPACE: der Namespace, in dem Sie temporäre Ergebnisse speichern möchten.
  • DATASET_NAME: der Name des Datasets, das die Tabelle enthält, die Sie abfragen.
  • ICEBERG_TABLE_NAME: der Name der Tabelle, die Sie abfragen.

Nächste Schritte