Funzionalità aggiuntive del catalogo runtime Lakehouse

Per personalizzare la configurazione del catalogo di runtime Lakehouse, puoi utilizzare le seguenti funzionalità aggiuntive:

  • Procedure Apache Spark Iceberg.
  • L'opzione di filtro per le tabelle non supportate.
  • Override della connessione BigQuery.
  • Criteri di controllo degli accessi per le tabelle Iceberg del catalogo di runtime Lakehouse.

Utilizza le procedure Apache Iceberg Spark

Per utilizzare le procedure Apache Spark, devi includere le estensioni SQL di Apache Iceberg nella configurazione di Apache Spark. Ad esempio, puoi creare una procedura per rollback uno stato precedente.

Utilizza Apache Spark-SQL interattivo per eseguire il rollback a uno stato precedente

Puoi utilizzare una procedura Apache Spark per creare, modificare ed eseguire il rollback di una tabella al suo stato precedente. Ad esempio:

  1. Crea una tabella Apache Spark:

    spark-sql \
       --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,BIGLAKE_ICEBERG_CATALOG_JAR \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
       --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    Sostituisci quanto segue:

    • BIGLAKE_ICEBERG_CATALOG_JAR: l'URI Cloud Storage del plug-in del catalogo personalizzato Apache Iceberg da utilizzare. A seconda del numero di versione di Apache Iceberg, seleziona una delle seguenti opzioni:
      • Iceberg 1.9.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.9.1-1.0.1.jar
      • Iceberg 1.6.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.2.jar
    • CATALOG_NAME: il nome del catalogo che fa riferimento alla tabella Apache Spark.
    • PROJECT_ID: l'ID del Google Cloud progetto.

    • WAREHOUSE_DIRECTORY: l'URI della cartella Cloud Storage in cui è archiviato il data warehouse.

    USE `CATALOG_NAME`;
    CREATE NAMESPACE NAMESPACE_NAME;
    USE NAMESPACE NAMESPACE_NAME;
    CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
    INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row");
    DESCRIBE EXTENDED TABLE_NAME;

    Sostituisci quanto segue:

    • NAMESPACE_NAME: il nome dello spazio dei nomi che fa riferimento alla tabella Apache Spark.
    • TABLE_NAME: un nome di tabella che fa riferimento alla tua tabella Apache Spark.

    L'output contiene i dettagli della configurazione della tabella:

    ...
    Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]
    ...
  2. Modifica di nuovo la tabella, quindi esegui il rollback allo snapshot 1659239298328512231 creato in precedenza:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
    INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5);
    SELECT * FROM TABLE_NAME;
    CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID);
    SELECT * FROM TABLE_NAME;

    Sostituisci quanto segue:

    • SNAPSHOT_ID: l'ID dello snapshot a cui stai eseguendo il rollback.

    L'output è simile al seguente:

    1 first row
    Time taken: 0.997 seconds, Fetched 1 row(s)

Filtrare le tabelle non supportate dalle funzioni di elenco delle tabelle

Quando utilizzi Apache Spark SQL con il catalogo del runtime Lakehouse, il comando SHOW TABLES mostra tutte le tabelle nello spazio dei nomi specificato, anche quelle non compatibili con Apache Spark.

Per visualizzare solo le tabelle supportate, attiva l'opzione filter_unsupported_tables:

spark-sql
  --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,BIGLAKE_ICEBERG_CATALOG_JAR \
  --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
  --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
  --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"

Sostituisci quanto segue:

  • BIGLAKE_ICEBERG_CATALOG_JAR: l'URI Cloud Storage del plug-in del catalogo personalizzato Apache Iceberg da utilizzare. A seconda del numero di versione di Apache Iceberg, seleziona una delle seguenti opzioni:
    • Iceberg 1.9.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.9.1-1.0.1.jar
    • Iceberg 1.6.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.2.jar
  • CATALOG_NAME: il nome del catalogo Apache Spark da utilizzare.
  • PROJECT_ID: l'ID del Google Cloud progetto da utilizzare.
  • LOCATION: la posizione delle risorse BigQuery.
  • WAREHOUSE_DIRECTORY: la cartella Cloud Storage da utilizzare come data warehouse.

Imposta una sostituzione della connessione BigQuery

Puoi utilizzare le connessioni BigQuery per accedere ai dati archiviati al di fuori di BigQuery, ad esempio in Cloud Storage.

Per impostare un override della connessione BigQuery che fornisca l'accesso a un bucket Cloud Storage, completa i seguenti passaggi:

  1. Nel progetto BigQuery, crea una nuova connessione alla risorsa Cloud Storage. Questa connessione definisce la modalità di accesso di BigQuery ai tuoi dati.

  2. Concedi all'utente o al account di servizio che accede ai dati il ruolo roles/bigquery.connectionUser sulla connessione.

    Assicurati che la risorsa di connessione condivida la stessa località delle risorse di destinazione in BigQuery. Per saperne di più, consulta Gestire le connessioni.

  3. Specifica la connessione nella tabella Apache Iceberg con la proprietà bq_connection:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');

    Sostituisci quanto segue:

    • TABLE_NAME: il nome di una tabella per la tabella Apache Spark.
    • WAREHOUSE_DIRECTORY: l'URI del bucket Cloud Storage in cui sono archiviati i dati.
    • PROJECT_ID: l'ID del Google Cloud progetto da utilizzare.
    • LOCATION: la posizione della connessione.
    • CONNECTION_ID: l'ID della connessione.

Imposta i criteri di controllo dell'accesso

Puoi attivare il controllo dell'accesso granulare (FGAC) nelle tabelle Apache Iceberg del catalogo del runtime Lakehouse configurando i critericontrollo dell'accessoo dell'accesso. Puoi impostare criteri di controllo dell'accesso solo sulle tabelle che utilizzano un override della connessione BigQuery. Puoi impostare questi criteri nei seguenti modi:

Dopo aver configurato le policy FGAC, puoi eseguire query sulla tabella da Apache Spark utilizzando il seguente esempio:

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
.appName("Lakehouse runtime catalog Iceberg") \
.config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
.config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
.getOrCreate()

spark.sql("USE `CATALOG_NAME`;")

# Configure spark for storing temp results
spark.conf.set("viewsEnabled","true")
spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")

spark.sql("USE NAMESPACE DATASET_NAME;")

sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
df = spark.read.format("bigquery").load(sql)
df.show()

Sostituisci quanto segue:

  • CATALOG_NAME: il nome del catalogo.
  • PROJECT_ID: l'ID del progetto che contiene le risorse BigQuery.
  • LOCATION: la posizione delle risorse BigQuery.
  • WAREHOUSE_DIRECTORY: l'URI della cartella Cloud Storage che contiene il data warehouse.
  • MATERIALIZATION_NAMESPACE: lo spazio dei nomi in cui vuoi archiviare i risultati temporanei.
  • DATASET_NAME: il nome del set di dati che contiene la tabella su cui stai eseguendo la query.
  • ICEBERG_TABLE_NAME: il nome della tabella che stai interrogando.

Passaggi successivi