Ce document explique comment configurer le catalogue Apache Iceberg personnalisé pour BigQuery dans le catalogue d'exécution Lakehouse.
Vous pouvez effectuer cette configuration à l'aide d'un cluster Managed Service pour Apache Spark ou de Managed Service pour Apache Spark. Cela crée un catalogue unique et partagé dans Google Cloud Lakehouse qui fonctionne de manière transparente avec des moteurs Open Source tels qu'Apache Spark et Apache Flink.
Avant de commencer
- Activez la facturation pour votre Google Cloud projet. Découvrez comment vérifier si la facturation est activée sur un projet.
Activez les API BigQuery et Managed Service pour Apache Spark.
Comprendre le catalogue d'exécution Lakehouse.
Rôles requis
Pour obtenir les autorisations nécessaires pour configurer le catalogue d'exécution Lakehouse, demandez à votre administrateur de vous accorder les rôles IAM suivants :
-
Créer un cluster Managed Service pour Apache Spark:
Nœud de calcul Dataproc (
roles/dataproc.worker) sur le compte de service Compute Engine par défaut dans le projet -
Créer des tables de catalogue d'exécution Lakehouse :
-
Nœud de calcul Dataproc (
roles/dataproc.worker) sur le compte de service de VM Managed Service pour Apache Spark dans le projet -
Éditeur de données BigQuery (
roles/bigquery.dataEditor) sur le compte de service de VM Managed Service pour Apache Spark dans le projet -
Utilisateur d'objets Storage (
roles/storage.objectUser) sur le compte de service de VM Managed Service pour Apache Spark dans le projet
-
Nœud de calcul Dataproc (
-
Interroger les tables de catalogue d'exécution Lakehouse :
-
Lecteur de données BigQuery (
roles/bigquery.dataViewer) sur le projet -
Utilisateur BigQuery (
roles/bigquery.user) sur le projet -
Lecteur d'objets Storage (
roles/storage.objectViewer) sur le projet
-
Lecteur de données BigQuery (
Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.
Vous pouvez également obtenir les autorisations requises avec des rôles personnalisés ou d'autres rôles prédéfinis.
Configurer votre metastore avec Managed Service pour Apache Spark
Vous pouvez configurer le catalogue d'exécution Lakehouse avec Managed Service pour Apache Spark à l'aide de Spark ou de Flink :
Spark
Configurez un nouveau cluster. Pour créer un cluster Managed Service pour Apache Spark, exécutez la commande
gcloud dataproc clusters createsuivante, qui contient les paramètres dont vous avez besoin pour utiliser le catalogue d'exécution Lakehouse :gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --single-node
Remplacez les éléments suivants :
CLUSTER_NAME: nom de votre cluster Managed Service pour Apache Spark.PROJECT_ID: ID du Google Cloud projet dans lequel vous créez le cluster.LOCATION: région Compute Engine dans laquelle vous créez le cluster.
Envoyez un job Spark à l'aide de l'une des méthodes suivantes :
Google Cloud CLI
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,\ spark.sql.catalog.CATALOG_NAME.type=bigquery,\ spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id=PROJECT_ID,\ spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location=LOCATION,\ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
Remplacez les éléments suivants :
PROJECT_ID: ID du Google Cloud projet contenant le cluster Managed Service pour Apache Spark.CLUSTER_NAME: nom du cluster Managed Service pour Apache Spark que vous utilisez pour exécuter le job Spark SQL.REGION: région Compute Engine dans laquelle se trouve votre cluster .LOCATION: emplacement des ressources BigQuery.CATALOG_NAME: nom du catalogue Spark à utiliser avec votre job SQL.WAREHOUSE_DIRECTORY: dossier Cloud Storage contenant votre entrepôt de données. Cette valeur commence pargs://.SPARK_SQL_COMMAND: requête Spark SQL que vous souhaitez exécuter. Cette requête inclut les commandes permettant de créer vos ressources. Par exemple, pour créer un espace de noms et une table.
CLI spark-sql
Dans la Google Cloud console, accédez à la page Instances de VM.
Pour vous connecter à une instance de VM Managed Service pour Apache Spark, cliquez sur SSH dans la ligne qui répertorie le nom de l'instance de VM principale du cluster Managed Service pour Apache Spark, qui correspond au nom du cluster suivi du suffixe
-m. Le résultat ressemble à ce qui suit :Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$Dans le terminal, exécutez la commande d'initialisation du catalogue d'exécution Lakehouse suivante :
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.type=bigquery \ --conf spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
Remplacez les éléments suivants :
CATALOG_NAME: nom du catalogue Spark que vous utilisez avec votre job SQL.PROJECT_ID: ID de projet du catalogue d'exécution Lakehouse auquel votre catalogue Spark est associé. Google CloudLOCATION:emplacement Google Cloud du catalogue d'exécution Lakehouse.WAREHOUSE_DIRECTORY: dossier Cloud Storage contenant votre entrepôt de données. Cette valeur commence pargs://.
Une fois que vous vous êtes connecté au cluster, l'invite
spark-sqls'affiche dans votre terminal Spark. Vous pouvez l'utiliser pour envoyer des jobs Spark.spark-sql (default)>
Flink
- Créez un cluster Managed Service pour Apache Spark avec le composant Flink facultatif activé,
et assurez-vous d'utiliser Managed Service pour Apache Spark
2.2ou une version ultérieure. Dans la Google Cloud console, accédez à la page Instances de VM.
Dans la liste des instances de machine virtuelle, cliquez sur SSH pour vous connecter à l'instance de VM principale du cluster Managed Service pour Apache Spark, qui est répertoriée comme le nom du cluster suivi du suffixe
-m.Configurez le plug-in de catalogue personnalisé Apache Iceberg pour le catalogue d'exécution Lakehouse :
FLINK_VERSION=1.20 ICEBERG_VERSION=1.10.0 cd /usr/lib/flink sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/${ICEBERG_VERSION}/iceberg-bigquery-${ICEBERG_VERSION}.jar -P lib sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/${ICEBERG_VERSION}/iceberg-gcp-bundle-${ICEBERG_VERSION}.jar -P lib sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/${ICEBERG_VERSION}/iceberg-gcp-${ICEBERG_VERSION}.jar -P lib
Démarrez la session Flink sur YARN :
HADOOP_CLASSPATH=`hadoop classpath` sudo bin/yarn-session.sh -nm flink-dataproc -d sudo bin/sql-client.sh embedded \ -s yarn-session
Créez un catalogue dans Flink :
CREATE CATALOG CATALOG_NAME WITH ( 'type'='iceberg', 'warehouse'='WAREHOUSE_DIRECTORY', 'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', 'gcp.bigquery.project-id'='PROJECT_ID', 'gcp.bigquery.location'='LOCATION' );
Remplacez les éléments suivants :
CATALOG_NAME: identifiant du catalogue Flink, qui est associé à un catalogue d'exécution Lakehouse.WAREHOUSE_DIRECTORY: chemin d'accès de base au répertoire de l'entrepôt (dossier Cloud Storage dans lequel Flink crée des fichiers). Cette valeur commence pargs://.PROJECT_ID: ID de projet du catalogue d'exécution Lakehouse auquel le catalogue Flink est associé.LOCATION: the location of the BigQuery resources.
Votre session Flink est désormais connectée au catalogue d'exécution Lakehouse, et vous pouvez exécuter des commandes Flink SQL.
Gérer les ressources du catalogue d'exécution Lakehouse
Maintenant que vous êtes connecté au catalogue d'exécution Lakehouse, vous pouvez créer et afficher des ressources en fonction des métadonnées stockées dans le catalogue d'exécution Lakehouse.
Par exemple, essayez d'exécuter les commandes suivantes dans votre session Flink SQL interactive pour créer une base de données et une table Apache Iceberg.
Utilisez le catalogue Apache Iceberg personnalisé :
USE CATALOG CATALOG_NAME;
Remplacez
CATALOG_NAMEpar l'identifiant de votre catalogue Flink.Créez une base de données, ce qui crée un ensemble de données dans BigQuery :
CREATE DATABASE IF NOT EXISTS DATABASE_NAME;
Remplacez
DATABASE_NAMEpar le nom de votre nouvelle base de données.Utilisez la base de données que vous avez créée :
USE DATABASE_NAME;
Créez une table Apache Iceberg. La commande suivante crée un exemple de table des ventes :
CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) );
Remplacez
ICEBERG_TABLE_NAMEpar le nom de votre nouvelle table.Affichez les métadonnées de la table :
DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
Répertoriez les tables de la base de données :
SHOW TABLES;
Ingérer des données dans votre table
Après avoir créé une table Apache Iceberg dans la section précédente, vous pouvez utiliser Flink DataGen comme source de données pour ingérer des données en temps réel dans votre table. Les étapes suivantes illustrent cet exemple de workflow :
Créez une table temporaire à l'aide de DataGen :
CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.order_number.kind' = 'sequence', 'fields.order_number.start' = '1', 'fields.order_number.end' = '1000000', 'fields.price.min' = '0', 'fields.price.max' = '10000', 'fields.buyer.first_name.length' = '10', 'fields.buyer.last_name.length' = '10' ) LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);
Remplacez les éléments suivants :
DATABASE_NAME: nom de la base de données dans laquelle stocker votre table temporaire.TEMP_TABLE_NAME: nom de votre table temporaire.ICEBERG_TABLE_NAME: nom de la table Apache Iceberg que vous avez créée dans la section précédente.
Définissez le parallélisme sur 1 :
SET 'parallelism.default' = '1';
Définissez l'intervalle de point de contrôle :
SET 'execution.checkpointing.interval' = '10second';
Définissez le point de contrôle :
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
Démarrez le job de streaming en temps réel :
INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;
Le résultat ressemble à ce qui suit :
[INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 0de23327237ad8a811d37748acd9c10b
Pour vérifier l'état du job de streaming, procédez comme suit :
Dans la Google Cloud console, accédez à la page Clusters.
Sélectionnez votre cluster.
Cliquez sur l'onglet Interfaces Web.
Cliquez sur le lien YARN ResourceManager.
Dans l'interface YARN ResourceManager, recherchez votre session Flink, puis cliquez sur le lien ApplicationMaster sous Tracking UI.
Dans la colonne État, vérifiez que l'état de votre job est En cours d'exécution.
Interrogez les données de streaming dans le client Flink SQL :
SELECT * FROM ICEBERG_TABLE_NAME /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ ORDER BY order_time desc LIMIT 20;
Interrogez les données de streaming dans BigQuery :
SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME` ORDER BY order_time desc LIMIT 20;
Arrêtez le job de streaming dans le client Flink SQL :
STOP JOB 'JOB_ID';
Remplacez
JOB_IDpar l'ID de job qui s'est affiché dans le résultat lorsque vous avez créé le job de streaming.
Configurer votre metastore avec Managed Service pour Apache Spark
Vous pouvez configurer le catalogue d'exécution Lakehouse avec Managed Service pour Apache Spark à l'aide de Spark SQL ou de PySpark.
Spark SQL
Créez un fichier SQL avec les commandes Spark SQL que vous souhaitez exécuter dans le catalogue d'exécution Lakehouse. Par exemple, cette commande crée un espace de noms et une table :
SET `spark.sql.catalog.CATALOG_NAME`=`org.apache.iceberg.spark.SparkCatalog`; SET `spark.sql.catalog.CATALOG_NAME.type`=`bigquery`; SET `spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id`=`PROJECT_ID`; SET `spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location`=`LOCATION`; SET `spark.sql.catalog.CATALOG_NAME.warehouse`=`WAREHOUSE_DIRECTORY`; CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME; CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
Remplacez les éléments suivants :
CATALOG_NAME: nom du catalogue qui fait référence à votre table Spark.NAMESPACE_NAME: nom de l'espace de noms qui fait référence à votre table Spark.TABLE_NAME: nom de votre table Spark.WAREHOUSE_DIRECTORY: URI du dossier Cloud Storage dans lequel votre entrepôt de données est stocké.
Envoyez un job par lot Spark SQL en exécutant la commande
gcloud dataproc batches submit spark-sqlsuivante :gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \ --project=PROJECT_ID \ --region=REGION \ --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --deps-bucket=BUCKET_PATH \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar
Remplacez les éléments suivants :
SQL_SCRIPT_PATH: chemin d'accès au fichier SQL utilisé par le job par lot.PROJECT_ID: ID du Google Cloud projet dans lequel exécuter le job par lot.REGION: région dans laquelle votre charge de travail s'exécute.SUBNET_NAME(facultatif) : nom d'un sous-réseau VPC dans leREGIONqui répond aux exigences du sous-réseau de session.BUCKET_PATH: emplacement du bucket Cloud Storage dans lequel importer les dépendances de la charge de travail. LeWAREHOUSE_DIRECTORYse trouve dans ce bucket. Le préfixe d'URIgs://du bucket n'est pas obligatoire. Vous pouvez spécifier le chemin d'accès au bucket ou le nom du bucket, par exemple,mybucketname1.LOCATION: emplacement dans lequel exécuter le job par lot.
Pour en savoir plus sur l'envoi de jobs par lot Spark, consultez Exécuter une charge de travail par lot Spark.
PySpark
Créez un fichier Python avec les commandes PySpark que vous souhaitez exécuter dans le catalogue d'exécution Lakehouse.
Par exemple, la commande suivante configure un environnement Spark pour interagir avec les tables Apache Iceberg stockées dans le catalogue d'exécution Lakehouse. La commande crée ensuite un espace de noms et une table Apache Iceberg dans cet espace de noms.
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("Lakehouse runtime catalog Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.type", "bigquery") \ .config("spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")
Remplacez les éléments suivants :
PROJECT_ID: ID du Google Cloud projet dans lequel exécuter le job par lot.LOCATION: emplacement des ressources BigQuery.CATALOG_NAME: nom du catalogue qui fait référence à votre table Spark.TABLE_NAME: nom de votre table Spark.WAREHOUSE_DIRECTORY: URI du dossier Cloud Storage dans lequel votre entrepôt de données est stocké.NAMESPACE_NAME: nom de l'espace de noms qui fait référence à votre table Spark.
Envoyez le job par lot à l'aide de la commande suivante
gcloud dataproc batches submit pyspark:gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --deps-bucket=BUCKET_PATH \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar
Remplacez les éléments suivants :
PYTHON_SCRIPT_PATH: chemin d'accès au script Python utilisé par le job par lot.PROJECT_ID: ID du Google Cloud projet dans lequel exécuter le job par lot.REGION: région dans laquelle votre charge de travail s'exécute.BUCKET_PATH: emplacement du bucket Cloud Storage dans lequel importer les dépendances de la charge de travail. Le préfixe d'URIgs://du bucket n'est pas obligatoire. Vous pouvez spécifier le chemin d'accès au bucket ou le nom du bucket, par exemple,mybucketname1.
Pour en savoir plus sur l'envoi de jobs par lot PySpark, consultez la documentation de référence de gcloud pour PySpark.
Étape suivante
- Créer et gérer des ressources de metastore.
- Configurer les fonctionnalités facultatives du catalogue d'exécution Lakehouse.