Zusätzliche Katalogfunktionen für die Lakehouse-Laufzeit

Sie können die Konfiguration Ihres Lakehouse-Laufzeitkatalogs mit den folgenden zusätzlichen Funktionen anpassen:

  • Apache Spark Iceberg-Prozeduren.
  • Die Filteroption für nicht unterstützte Tabellen.
  • Überschreibungen für BigQuery-Verbindungen.
  • Richtlinien zur Zugriffssteuerung für Iceberg-Tabellen im Lakehouse-Laufzeitkatalog.

Apache Iceberg-Spark-Prozeduren verwenden

Wenn Sie Apache Spark-Prozeduren verwenden möchten, müssen Sie Apache Iceberg-SQL-Erweiterungen in Ihre Apache Spark-Konfiguration aufnehmen. Sie können beispielsweise eine Prozedur erstellen, um ein Rollback auf einen vorherigen Zustand durchzuführen.

Interaktives Apache Spark SQL verwenden, um einen vorherigen Zustand wiederherzustellen

Sie können eine Apache Spark-Prozedur verwenden, um eine Tabelle zu erstellen, zu ändern und ein Rollback auf ihren vorherigen Zustand durchzuführen. Beispiel:

  1. Apache Spark-Tabelle erstellen:

    spark-sql \
       --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,BIGLAKE_ICEBERG_CATALOG_JAR \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
       --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    Ersetzen Sie Folgendes:

    • BIGLAKE_ICEBERG_CATALOG_JAR: Der Cloud Storage-URI des benutzerdefinierten Apache Iceberg-Katalog-Plug-ins, das verwendet werden soll. Wählen Sie je nach Apache Iceberg-Versionsnummer eine der folgenden Optionen aus:
      • Iceberg 1.9.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.9.1-1.0.1.jar
      • Iceberg 1.6.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.2.jar
    • CATALOG_NAME: Der Katalogname, der auf Ihre Apache Spark-Tabelle verweist.
    • PROJECT_ID: die ID des Google Cloud -Projekts.

    • WAREHOUSE_DIRECTORY: Der URI des Cloud Storage-Ordners, in dem Ihr Data Warehouse gespeichert ist.

    USE `CATALOG_NAME`;
    CREATE NAMESPACE NAMESPACE_NAME;
    USE NAMESPACE NAMESPACE_NAME;
    CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
    INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row");
    DESCRIBE EXTENDED TABLE_NAME;

    Ersetzen Sie Folgendes:

    • NAMESPACE_NAME: Der Namespace-Name, der auf Ihre Apache Spark-Tabelle verweist.
    • TABLE_NAME: Ein Tabellenname, der auf Ihre Apache Spark-Tabelle verweist.

    Die Ausgabe enthält Details zur Tabellenkonfiguration:

    ...
    Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]
    ...
  2. Ändern Sie die Tabelle noch einmal und führen Sie dann ein Rollback zum zuvor erstellten Snapshot 1659239298328512231 durch:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
    INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5);
    SELECT * FROM TABLE_NAME;
    CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID);
    SELECT * FROM TABLE_NAME;

    Ersetzen Sie Folgendes:

    • SNAPSHOT_ID: die ID des Snapshots, zu dem Sie ein Rollback durchführen.

    Die Ausgabe sieht etwa so aus:

    1 first row
    Time taken: 0.997 seconds, Fetched 1 row(s)

Nicht unterstützte Tabellen aus Tabellenauflistungsfunktionen filtern

Wenn Sie Apache Spark SQL mit dem Lakehouse-Laufzeitkatalog verwenden, werden mit dem Befehl SHOW TABLES alle Tabellen im angegebenen Namespace angezeigt, auch solche, die nicht mit Apache Spark kompatibel sind.

Wenn Sie nur unterstützte Tabellen anzeigen möchten, aktivieren Sie die Option filter_unsupported_tables:

spark-sql
  --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,BIGLAKE_ICEBERG_CATALOG_JAR \
  --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
  --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
  --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"

Ersetzen Sie Folgendes:

  • BIGLAKE_ICEBERG_CATALOG_JAR: Der Cloud Storage-URI des benutzerdefinierten Apache Iceberg-Katalog-Plug-ins, das verwendet werden soll. Wählen Sie je nach Apache Iceberg-Versionsnummer eine der folgenden Optionen aus:
    • Iceberg 1.9.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.9.1-1.0.1.jar
    • Iceberg 1.6.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.2.jar
  • CATALOG_NAME: Der Name des zu verwendenden Apache Spark-Katalogs.
  • PROJECT_ID: die ID des zu verwendenden Google Cloud -Projekts.
  • LOCATION: Der Standort der BigQuery-Ressourcen.
  • WAREHOUSE_DIRECTORY: Der Cloud Storage-Ordner, der als Data Warehouse verwendet werden soll.

BigQuery-Verbindungsüberschreibung festlegen

Mit BigQuery-Verbindungen können Sie auf Daten zugreifen, die außerhalb von BigQuery gespeichert sind, z. B. in Cloud Storage.

So legen Sie eine BigQuery-Verbindungsüberschreibung fest, die Zugriff auf einen Cloud Storage-Bucket bietet:

  1. Erstellen Sie in Ihrem BigQuery-Projekt eine neue Verbindung zu Ihrer Cloud Storage-Ressource. Über diese Verbindung wird definiert, wie BigQuery auf Ihre Daten zugreift.

  2. Weisen Sie dem Nutzer oder Dienstkonto, das auf die Daten zugreift, die Rolle roles/bigquery.connectionUser für die Verbindung zu.

    Die Verbindungsressource muss denselben Standort wie die Zielressourcen in BigQuery haben. Weitere Informationen finden Sie unter Verbindungen verwalten.

  3. Geben Sie die Verbindung in Ihrer Apache Iceberg-Tabelle mit dem Attribut bq_connection an:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');

    Ersetzen Sie Folgendes:

    • TABLE_NAME: Ein Tabellenname für Ihre Apache Spark-Tabelle.
    • WAREHOUSE_DIRECTORY: Der URI des Cloud Storage-Bucket, in dem Ihre Daten gespeichert sind.
    • PROJECT_ID: die ID des zu verwendenden Google Cloud -Projekts.
    • LOCATION: der Standort der Verbindung.
    • CONNECTION_ID: die ID der Verbindung.

Richtlinien zur Zugriffssteuerung festlegen

Sie können die detaillierte Zugriffssteuerung für Apache Iceberg-Tabellen im Lakehouse-Laufzeitkatalog aktivieren, indem Sie Zugriffssteuerungsrichtlinien konfigurieren. Sie können Zugriffssteuerungsrichtlinien nur für Tabellen festlegen, für die ein BigQuery-Verbindungs-Override verwendet wird. Sie können diese Richtlinien auf folgende Weise festlegen:

Nachdem Sie Ihre FGAC-Richtlinien konfiguriert haben, können Sie die Tabelle mit dem folgenden Beispiel über Apache Spark abfragen:

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
.appName("Lakehouse runtime catalog Iceberg") \
.config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
.config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
.getOrCreate()

spark.sql("USE `CATALOG_NAME`;")

# Configure spark for storing temp results
spark.conf.set("viewsEnabled","true")
spark.sql("CREATE NAMESPACE IF NOT EXISTS MATERIALIZATION_NAMESPACE");
spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")

spark.sql("USE NAMESPACE DATASET_NAME;")

sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
df = spark.read.format("bigquery").load(sql)
df.show()

Ersetzen Sie Folgendes:

  • CATALOG_NAME: Der Name Ihres Katalogs.
  • PROJECT_ID: die ID des Projekts, das Ihre BigQuery-Ressourcen enthält.
  • LOCATION: Der Standort der BigQuery-Ressourcen.
  • WAREHOUSE_DIRECTORY: Der URI des Cloud Storage-Ordners, der Ihr Data Warehouse enthält.
  • MATERIALIZATION_NAMESPACE: Der Namespace, in dem Sie temporäre Ergebnisse speichern möchten.
  • DATASET_NAME: der Name des Datasets, das die Tabelle enthält, die Sie abfragen.
  • ICEBERG_TABLE_NAME: der Name der Tabelle, die Sie abfragen.

Nächste Schritte