Pour personnaliser la configuration de votre catalogue d'environnements d'exécution Lakehouse, vous pouvez utiliser les fonctionnalités supplémentaires suivantes :
- Procédures Apache Spark Iceberg
- Option de filtre pour les tables non compatibles
- Remplacements de connexion BigQuery
- Stratégies de contrôle des accès pour les tables Iceberg du catalogue d'environnements d'exécution Lakehouse
Utiliser des procédures Apache Iceberg Spark
Pour utiliser des procédures Apache Spark, vous devez inclure des extensions SQL Apache Iceberg dans votre configuration Apache Spark. Par exemple, vous pouvez créer une procédure pour revenir à un état antérieur.
Utiliser Apache Spark SQL interactif pour effectuer un rollback vers un état antérieur
Vous pouvez utiliser une procédure Apache Spark pour créer, modifier et rétablir l'état antérieur d'une table. Exemple :
Créer une table Apache Spark :
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,BIGLAKE_ICEBERG_CATALOG_JAR \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
Remplacez les éléments suivants :
BIGLAKE_ICEBERG_CATALOG_JAR: URI Cloud Storage du plug-in de catalogue personnalisé Apache Iceberg à utiliser. En fonction du numéro de version d'Apache Iceberg, sélectionnez l'une des options suivantes :- Iceberg 1.9.1:
gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.9.1-1.0.1.jar - Iceberg 1.6.1:
gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.2.jar
- Iceberg 1.9.1:
CATALOG_NAME: nom du catalogue qui fait référence à votre table Apache Spark.PROJECT_ID: ID du Google Cloud projet.WAREHOUSE_DIRECTORY: URI du dossier Cloud Storage dans lequel votre entrepôt de données est stocké.
USE `CATALOG_NAME`; CREATE NAMESPACE NAMESPACE_NAME; USE NAMESPACE NAMESPACE_NAME; CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'; INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row"); DESCRIBE EXTENDED TABLE_NAME;
Remplacez les éléments suivants :
NAMESPACE_NAME: nom de l'espace de noms qui fait référence à votre table Apache Spark.TABLE_NAME: nom de table qui fait référence à votre table Apache Spark.
Le résultat contient des informations sur la configuration de la table :
... Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd] ...
Modifiez à nouveau la table, puis rétablissez l'instantané créé précédemment
1659239298328512231:ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double); INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5); SELECT * FROM TABLE_NAME; CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID); SELECT * FROM TABLE_NAME;
Remplacez les éléments suivants :
SNAPSHOT_ID: ID de l'instantané que vous rétablissez.
Le résultat ressemble à ce qui suit :
1 first row Time taken: 0.997 seconds, Fetched 1 row(s)
Filtrer les tables non compatibles à partir des fonctions de liste de tables
Lorsque vous utilisez Apache Spark SQL avec le catalogue d'environnements d'exécution Lakehouse, la commande SHOW TABLES affiche toutes les tables de l'espace de noms spécifié, même celles qui ne sont pas compatibles avec Apache Spark.
Pour n'afficher que les tables compatibles, activez l'option filter_unsupported_tables :
spark-sql --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,BIGLAKE_ICEBERG_CATALOG_JAR \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"
Remplacez les éléments suivants :
BIGLAKE_ICEBERG_CATALOG_JAR: URI Cloud Storage du plug-in de catalogue personnalisé Apache Iceberg à utiliser. En fonction du numéro de version d'Apache Iceberg, sélectionnez l'une des options suivantes :- Iceberg 1.9.1:
gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.9.1-1.0.1.jar - Iceberg 1.6.1:
gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.2.jar
- Iceberg 1.9.1:
CATALOG_NAME: nom du catalogue Apache Spark à utiliser.PROJECT_ID: ID du Google Cloud projet à utiliser.LOCATION: emplacement des ressources BigQuery.WAREHOUSE_DIRECTORY: dossier Cloud Storage à utiliser comme entrepôt de données.
Définir un remplacement de connexion BigQuery
Vous pouvez utiliser des connexions BigQuery pour accéder à des données stockées en dehors de BigQuery, par exemple dans Cloud Storage.
Pour définir un remplacement de connexion BigQuery qui donne accès à un bucket Cloud Storage, procédez comme suit :
Dans votre projet BigQuery, créez une connexion à votre ressource Cloud Storage. Cette connexion définit la manière dont BigQuery accède à vos données.
Accordez le rôle
roles/bigquery.connectionUsersur la connexion à l'utilisateur ou au compte de service qui accède aux données.Assurez-vous que la ressource de connexion partage le même emplacement que les ressources cibles dans BigQuery. Pour en savoir plus, consultez la page Gérer les connexions.
Spécifiez la connexion dans votre table Apache Iceberg avec la propriété
bq_connection:CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');
Remplacez les éléments suivants :
TABLE_NAME: nom de table pour votre table Apache Spark.WAREHOUSE_DIRECTORY: URI du bucket Cloud Storage qui stocke vos données.PROJECT_ID: ID du Google Cloud projet à utiliser.LOCATION: l'emplacement de la connexion.CONNECTION_ID: ID de la connexion.
Définir des stratégies de contrôle des accès
Vous pouvez activer le contrôle précis des accès (FGAC, Fine-Grained Access Control) sur les tables Apache Iceberg du catalogue d'environnements d'exécution Lakehouse en configurant des stratégies de contrôle des accès. Vous ne pouvez définir des stratégies de contrôle des accès que sur les tables qui utilisent un remplacement de connexion BigQuery. Vous pouvez définir ces stratégies de différentes manières :
Une fois que vous avez configuré vos stratégies FGAC, vous pouvez interroger la table à partir d'Apache Spark à l'aide de l'exemple suivant :
from pyspark.sql import SparkSession # Create a Spark session spark = SparkSession.builder \ .appName("Lakehouse runtime catalog Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("USE `CATALOG_NAME`;") # Configure spark for storing temp results spark.conf.set("viewsEnabled","true") spark.sql("CREATE NAMESPACE IF NOT EXISTS MATERIALIZATION_NAMESPACE"); spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE") spark.sql("USE NAMESPACE DATASET_NAME;") sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show()
Remplacez les éléments suivants :
CATALOG_NAME: nom de votre catalogue.PROJECT_ID: ID du projet contenant vos ressources BigQuery.LOCATION: the emplacement des ressources BigQuery.WAREHOUSE_DIRECTORY: URI du dossier Cloud Storage contenant votre entrepôt de données.MATERIALIZATION_NAMESPACE: espace de noms dans lequel vous souhaitez stocker les résultats temporaires.DATASET_NAME: nom de l'ensemble de données contenant la table que vous interrogez.ICEBERG_TABLE_NAME: nom de la table que vous interrogez.