Configurer BigLake Metastore pour Managed Service pour Apache Spark et Spark à l'aide d'Iceberg 1.9

Ce document explique comment configurer le catalogue Iceberg personnalisé pour BigQuery dans BigLake Metastore avec soitManaged Service pour Apache Spark ou Managed Service pour Apache Spark. Cela vous permet de créer un métastore unique et partagé qui fonctionne avec des moteurs Open Source, tels qu'Apache Spark ou Apache Flink.

Avant de commencer

  1. Activez la facturation pour votre Google Cloud projet. Découvrez comment vérifier si la facturation est activée sur un projet.
  2. Activez les API BigQuery et Managed Service pour Apache Spark.

    Activer les API

  3. Comprendre BigLake Metastore.

Rôles requis

Pour obtenir les autorisations nécessaires pour configurer BigLake Metastore, demandez à votre administrateur de vous accorder les rôles IAM suivants :

Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Vous pouvez également obtenir les autorisations requises avec des rôles personnalisés ou d'autres rôles prédéfinis.

Configurer votre métastore avec Managed Service pour Apache Spark

Vous pouvez configurer BigLake Metastore avec Managed Service pour Apache Spark à l'aide de Spark ou de Flink :

Spark

  1. Configurez un cluster. Pour créer un cluster Managed Service pour Apache Spark, exécutez la commande gcloud dataproc clusters create suivante, qui contient les paramètres dont vous avez besoin pour utiliser BigLake Metastore :

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=LOCATION \
        --single-node

    Remplacez les éléments suivants :

    • CLUSTER_NAME: nom de votre cluster Managed Service pour Apache Spark.
    • PROJECT_ID : ID du Google Cloud projet dans lequel vous créez le cluster.
    • LOCATION: région Compute Engine dans laquelle vous créez le cluster.
  2. Envoyez un job Spark à l'aide de l'une des méthodes suivantes :

    Google Cloud CLI

    gcloud dataproc jobs submit spark-sql \
        --project=PROJECT_ID \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,BIGLAKE_ICEBERG_CATALOG_JAR \
        --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,\
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,\
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,\
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,\
        spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
        --execute="SPARK_SQL_COMMAND"

    Remplacez les éléments suivants :

    • PROJECT_ID : ID du Google Cloud projet contenant le cluster Managed Service pour Apache Spark.
    • CLUSTER_NAME: nom du cluster Managed Service pour Apache Spark que vous utilisez pour exécuter le job Spark SQL.
    • REGION : région Compute Engine dans laquelle se trouve votre cluster .
    • BIGLAKE_ICEBERG_CATALOG_JAR: URI Cloud Storage du plug-in de catalogue personnalisé Iceberg à utiliser. En fonction du numéro de version d'Iceberg, sélectionnez l'une des options suivantes :
      • Iceberg 1.9.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.9.1-1.0.1.jar
      • Iceberg 1.6.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.2.jar
    • LOCATION: emplacement des ressources BigQuery.
    • CATALOG_NAME: nom du catalogue Spark à utiliser avec votre job SQL.
    • WAREHOUSE_DIRECTORY: dossier Cloud Storage contenant votre entrepôt de données. Cette valeur commence par gs://.
    • SPARK_SQL_COMMAND: requête Spark SQL que vous souhaitez exécuter. Cette requête inclut les commandes permettant de créer vos ressources. Par exemple, pour créer un espace de noms et une table.

    CLI spark-sql

    1. Dans la Google Cloud console, accédez à la page Instances de VM.

      Accéder à la page "Instances de VM"

    2. Pour vous connecter à une instance de VM Managed Service pour Apache Spark, cliquez sur SSH dans la ligne qui répertorie le nom de l'instance de VM principale du cluster Managed Service pour Apache Spark, qui correspond au nom du cluster suivi du suffixe -m. Le résultat ressemble à ce qui suit :

      Connected, host fingerprint: ssh-rsa ...
      Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
      ...
      example-cluster@cluster-1-m:~$
      
    3. Dans le terminal, exécutez la commande d'initialisation BigLake Metastore suivante :

      spark-sql \
          --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,BIGLAKE_ICEBERG_CATALOG_JAR \
          --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
          --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

      Remplacez les éléments suivants :

      • BIGLAKE_ICEBERG_CATALOG_JAR: URI Cloud Storage du plug-in de catalogue personnalisé Iceberg à utiliser. En fonction du numéro de version d'Iceberg, sélectionnez l'une des options suivantes :
        • Iceberg 1.9.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.9.1-1.0.1.jar
        • Iceberg 1.6.1: gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.2.jar
      • CATALOG_NAME: nom du catalogue Spark que vous utilisez avec votre job SQL.
      • PROJECT_ID : ID du Google Cloud projet du catalogue BigLake Metastore auquel votre catalogue Spark est associé.
      • LOCATION :emplacement Google Cloud de BigLake Metastore.
      • WAREHOUSE_DIRECTORY: dossier Cloud Storage contenant votre entrepôt de données. Cette valeur commence par gs://.

      Une fois que vous êtes connecté au cluster, votre terminal Spark affiche l'invite spark-sql, que vous pouvez utiliser pour envoyer des jobs Spark.

      spark-sql (default)>
      
  1. Créez un cluster Managed Service pour Apache Spark avec le composant Flink facultatif activé, et assurez-vous d'utiliser Managed Service pour Apache Spark 2.2 ou une version ultérieure.
  2. Dans la Google Cloud console, accédez à la page Instances de VM.

    Accéder à la page "Instances de VM"

  3. Dans la liste des instances de machine virtuelle, cliquez sur SSH pour vous connecter à l'instance de VM principale du cluster Managed Service pour Apache Spark, qui est répertoriée comme nom de cluster suivi du suffixe -m.

  4. Configurez le plug-in de catalogue personnalisé Iceberg pour BigLake Metastore :

    FLINK_VERSION=1.19
    ICEBERG_VERSION=1.6.1
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.2.jar lib/
  5. Démarrez la session Flink sur YARN :

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
    -s yarn-session
  6. Créez un catalogue dans Flink :

    CREATE CATALOG CATALOG_NAME WITH (
    'type'='iceberg',
    'warehouse'='WAREHOUSE_DIRECTORY',
    'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
    'gcp_project'='PROJECT_ID',
    'gcp_location'='LOCATION'
    );

    Remplacez les éléments suivants :

    • CATALOG_NAME: identifiant du catalogue Flink, associé à un catalogue BigLake Metastore.
    • WAREHOUSE_DIRECTORY: chemin d'accès de base au répertoire de l'entrepôt (dossier Cloud Storage dans lequel Flink crée des fichiers). Cette valeur commence par gs://.
    • PROJECT_ID: ID du projet du catalogue BigLake Metastore auquel le catalogue Flink est associé.
    • LOCATION: the location of the BigQuery resources.

Votre session Flink est désormais connectée à BigLake Metastore, et vous pouvez exécuter des commandes Flink SQL.

Maintenant que vous êtes connecté à BigLake Metastore, vous pouvez créer et afficher des ressources en fonction des métadonnées stockées dans BigLake Metastore.

Par exemple, essayez d'exécuter les commandes suivantes dans votre session Flink SQL interactive pour créer une base de données et une table Iceberg.

  1. Utilisez le catalogue Iceberg personnalisé :

    USE CATALOG CATALOG_NAME;

    Remplacez CATALOG_NAME par l'identifiant de votre catalogue Flink.

  2. Créez une base de données, ce qui crée un ensemble de données dans BigQuery :

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    Remplacez DATABASE_NAME par le nom de votre nouvelle base de données.

  3. Utilisez la base de données que vous avez créée :

    USE DATABASE_NAME;
  4. Créez une table Iceberg. La commande suivante crée un exemple de table de ventes :

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
      order_number BIGINT,
      price        DECIMAL(32,2),
      buyer        ROW<first_name STRING, last_name STRING>,
      order_time   TIMESTAMP(3)
    );

    Remplacez ICEBERG_TABLE_NAME par le nom de votre nouvelle table.

  5. Affichez les métadonnées de la table :

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. Répertoriez les tables de la base de données :

    SHOW TABLES;

Ingérer des données dans votre table

Après avoir créé une table Iceberg dans la section précédente, vous pouvez utiliser Flink DataGen comme source de données pour ingérer des données en temps réel dans votre table. Les étapes suivantes illustrent cet exemple de workflow :

  1. Créez une table temporaire à l'aide de DataGen :

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.order_number.kind' = 'sequence',
      'fields.order_number.start' = '1',
      'fields.order_number.end' = '1000000',
      'fields.price.min' = '0',
      'fields.price.max' = '10000',
      'fields.buyer.first_name.length' = '10',
      'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    Remplacez les éléments suivants :

    • DATABASE_NAME: nom de la base de données dans laquelle stocker votre table temporaire.
    • TEMP_TABLE_NAME : nom de votre table temporaire.
    • ICEBERG_TABLE_NAME: nom de la table Iceberg que vous avez créée dans la section précédente.
  2. Définissez le parallélisme sur 1 :

    SET 'parallelism.default' = '1';
  3. Définissez l'intervalle de point de contrôle :

    SET 'execution.checkpointing.interval' = '10second';
  4. Définissez le point de contrôle :

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. Démarrez le job de streaming en temps réel :

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    Le résultat ressemble à ce qui suit :

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. Pour vérifier l'état du job de streaming, procédez comme suit :

    1. Dans la Google Cloud console, accédez à la page Clusters.

      accéder aux clusters

    2. Sélectionnez votre cluster.

    3. Cliquez sur l'onglet Interfaces Web.

    4. Cliquez sur le lien YARN ResourceManager.

    5. Dans l'interface YARN ResourceManager, recherchez votre session Flink, puis cliquez sur le lien ApplicationMaster sous Tracking UI.

    6. Dans la colonne État, vérifiez que l'état de votre job est Running (En cours d'exécution).

  7. Interrogez les données de streaming dans le client Flink SQL :

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. Interrogez les données de streaming dans BigQuery :

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. Arrêtez le job de streaming dans le client Flink SQL :

    STOP JOB 'JOB_ID';

    Remplacez JOB_ID par l'ID de job qui s'est affiché dans le résultat lorsque vous avez créé le job de streaming.

Configurer votre métastore avec Managed Service pour Apache Spark

Vous pouvez configurer BigLake Metastore avec Managed Service pour Apache Spark à l'aide de Spark SQL ou de PySpark.

Spark SQL

  1. Créez un fichier SQL avec les commandes Spark SQL que vous souhaitez exécuter dans BigLake Metastore. Par exemple, cette commande crée un espace de noms et une table :

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    Remplacez les éléments suivants :

    • CATALOG_NAME: nom du catalogue qui référence votre table Spark.
    • NAMESPACE_NAME: nom de l'espace de noms qui référence votre table Spark.
    • TABLE_NAME: nom de table pour votre table Spark.
    • WAREHOUSE_DIRECTORY: URI du dossier Cloud Storage dans lequel votre entrepôt de données est stocké.
  2. Envoyez un job par lot Spark SQL en exécutant la commande gcloud dataproc batches submit spark-sql suivante :

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
        .sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Remplacez les éléments suivants :

    • SQL_SCRIPT_PATH: chemin d'accès au fichier SQL utilisé par le job par lot.
    • PROJECT_ID : ID du Google Cloud projet dans lequel exécuter le job par lot.
    • REGION : région dans laquelle votre charge de travail s'exécute.
    • SUBNET_NAME (facultatif) : nom d'un sous-réseau VPC dans le REGION qui répond aux exigences du sous-réseau de session.
    • BUCKET_PATH: emplacement du bucket Cloud Storage dans lequel importer les dépendances de la charge de travail. Le WAREHOUSE_DIRECTORY se trouve dans ce bucket. Le préfixe d'URI gs:// du bucket n'est pas obligatoire. Vous pouvez spécifier le chemin d'accès au bucket ou le nom du bucket, par exemple, mybucketname1.
    • LOCATION : emplacement dans lequel exécuter le job par lot.

    Pour en savoir plus sur l'envoi de jobs par lot Spark, consultez Exécuter une charge de travail par lot Spark.

PySpark

  1. Créez un fichier Python avec les commandes PySpark que vous souhaitez exécuter dans BigLake Metastore.

    Par exemple, la commande suivante configure un environnement Spark pour interagir avec les tables Iceberg stockées dans BigLake Metastore. La commande crée ensuite un espace de noms et une table Iceberg dans cet espace de noms.

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    Remplacez les éléments suivants :

    • PROJECT_ID : ID du Google Cloud projet dans lequel exécuter le job par lot.
    • LOCATION : emplacement des ressources BigQuery.
    • CATALOG_NAME: nom du catalogue qui référence votre table Spark.
    • TABLE_NAME: nom de table pour votre table Spark.
    • WAREHOUSE_DIRECTORY: URI du dossier Cloud Storage dans lequel votre entrepôt de données est stocké.
    • NAMESPACE_NAME: nom de l'espace de noms qui référence votre table Spark.
  2. Envoyez le job par lot à l'aide de la commande suivante gcloud dataproc batches submit pyspark:

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
        --version=2.2 \
        --project=PROJECT_ID \
        --region=REGION \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    Remplacez les éléments suivants :

    • PYTHON_SCRIPT_PATH: chemin d'accès au script Python utilisé par le job par lot.
    • PROJECT_ID : ID du Google Cloud projet dans lequel exécuter le job par lot.
    • REGION : région dans laquelle votre charge de travail s'exécute.
    • BUCKET_PATH: emplacement du bucket Cloud Storage dans lequel importer les dépendances de la charge de travail. Le préfixe d'URI gs:// du bucket n'est pas obligatoire. Vous pouvez spécifier le chemin d'accès au bucket ou le nom du bucket, par exemple, mybucketname1.

    Pour en savoir plus sur l'envoi de jobs par lot PySpark, consultez la documentation de référence de gcloud pour PySpark.

Étape suivante