Configura el catálogo del entorno de ejecución de Lakehouse para Managed Service para Apache Spark con Iceberg 1.10

La configuración del catálogo personalizado de Apache Iceberg para BigQuery conecta tus motores de Apache Spark y Apache Flink al catálogo de entorno de ejecución de Lakehouse.

Con esta integración establecida para Lakehouse para Apache Iceberg, creas una sola capa de metadatos compartida para administrar tus formatos de tabla abierta con clústeres de Managed Service para Apache Spark o Managed Service para Apache Spark.

Antes de comenzar

  1. Habilita la facturación en tu Google Cloud proyecto. Obtén información sobre cómo verificar si la facturación está habilitada en un proyecto.
  2. Habilita las APIs de BigQuery y Managed Service para Apache Spark.

    Habilitar las API

  3. Comprende el catálogo de entorno de ejecución de Lakehouse.

Roles obligatorios

Para obtener los permisos que necesitas para configurar el catálogo de entorno de ejecución de Lakehouse, pídele a tu administrador que te otorgue los siguientes roles de IAM:

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios a través de roles personalizados o cualquier otro rol predefinido.

Configura tu almacén de metadatos con Managed Service para Apache Spark

Puedes configurar el catálogo de entorno de ejecución de Lakehouse con Managed Service para Apache Spark con Spark o Flink:

Spark

  1. Configura un clúster nuevo. Para crear un clúster nuevo de Managed Service para Apache Spark, ejecuta el siguiente comando gcloud dataproc clusters create, que contiene la configuración que necesitas para usar el catálogo de entorno de ejecución de Lakehouse:

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=LOCATION \
        --single-node

    Reemplaza lo siguiente:

    • CLUSTER_NAME: Un nombre para tu clúster de Managed Service para Apache Spark.
    • PROJECT_ID: El ID del proyecto en el que creas el clúster. Google Cloud
    • LOCATION: La región de Compute Engine en la que creas el clúster.
  2. Envía un trabajo de Spark con uno de los siguientes métodos:

    Google Cloud CLI

    gcloud dataproc jobs submit spark-sql \
        --project=PROJECT_ID \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar \
        --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,\
        spark.sql.catalog.CATALOG_NAME.type=bigquery,\
        spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id=PROJECT_ID,\
        spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location=LOCATION,\
        spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
        --execute="SPARK_SQL_COMMAND"

    Reemplaza lo siguiente:

    • PROJECT_ID: El ID del Google Cloud proyecto que contiene el clúster de Managed Service para Apache Spark.
    • CLUSTER_NAME: El nombre del clúster de Managed Service para Apache Spark que usas para ejecutar el trabajo de Spark SQL.
    • REGION: La región de Compute Engine en la que se encuentra tu clúster.
    • LOCATION: La ubicación de los recursos de BigQuery.
    • CATALOG_NAME: El nombre del catálogo de Spark que se usará con tu trabajo de SQL.
    • WAREHOUSE_DIRECTORY: La carpeta de Cloud Storage que contiene tu almacén de datos. Este valor comienza con gs://.
    • SPARK_SQL_COMMAND: La consulta en SQL de Spark que deseas ejecutar. Esta consulta incluye los comandos para crear tus recursos. Por ejemplo, para crear un espacio de nombres y una tabla.

    CLI de spark-sql

    1. En la Google Cloud consola de, ve a la páginaInstancias de VM.

      Ir a Instancias de VM

    2. Para conectarte a una instancia de VM de Managed Service para Apache Spark, haz clic en SSH en la fila que muestra el nombre de la instancia de VM principal del clúster de Managed Service para Apache Spark, que es el nombre del clúster seguido de un sufijo -m. El resultado es similar a lo siguiente:

      Connected, host fingerprint: ssh-rsa ...
      Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
      ...
      example-cluster@cluster-1-m:~$
      
    3. En la terminal, ejecuta el siguiente comando de inicialización del catálogo de entorno de ejecución de Lakehouse:

      spark-sql \
          --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar \
          --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.type=bigquery \
          --conf spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id=PROJECT_ID \
          --conf spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location=LOCATION \
          --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

      Reemplaza lo siguiente:

      • CATALOG_NAME: El nombre del catálogo de Spark que usas con tu trabajo de SQL.
      • PROJECT_ID: El Google Cloud ID del proyecto del catálogo de entorno de ejecución de Lakehouse con el que se vincula tu catálogo de Spark.
      • LOCATION: La Google Cloud ubicación del catálogo de entorno de ejecución de Lakehouse.
      • WAREHOUSE_DIRECTORY: La carpeta de Cloud Storage que contiene tu almacén de datos. Este valor comienza con gs://.

      Después de conectarte correctamente al clúster, tu terminal de Spark muestra el mensaje spark-sql, que puedes usar para enviar trabajos de Spark.

      spark-sql (default)>
      
  1. Crea un clúster de Managed Service para Apache Spark con el componente opcional de Flink habilitado, y asegúrate de usar Managed Service para Apache Spark 2.2 o una versión posterior.
  2. En la Google Cloud consola de, ve a la página Instancias de VM.

    Ir a Instancias de VM

  3. En la lista de instancias de máquina virtual, haz clic en SSH para conectarte a la instancia de VM principal del clúster de Managed Service para Apache Spark, que aparece como el nombre del clúster seguido de un sufijo -m.

  4. Configura el complemento de catálogo personalizado de Apache Iceberg para el catálogo de entorno de ejecución de Lakehouse:

    FLINK_VERSION=1.20
    ICEBERG_VERSION=1.10.0
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/${ICEBERG_VERSION}/iceberg-bigquery-${ICEBERG_VERSION}.jar -P lib
    
    sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/${ICEBERG_VERSION}/iceberg-gcp-bundle-${ICEBERG_VERSION}.jar -P lib
    
    sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/${ICEBERG_VERSION}/iceberg-gcp-${ICEBERG_VERSION}.jar -P lib
  5. Inicia la sesión de Flink en YARN:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
    -s yarn-session
  6. Crea un catálogo en Flink:

    CREATE CATALOG CATALOG_NAME WITH (
    'type'='iceberg',
    'warehouse'='WAREHOUSE_DIRECTORY',
    'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
    'gcp.bigquery.project-id'='PROJECT_ID',
    'gcp.bigquery.location'='LOCATION'
    );

    Reemplaza lo siguiente:

    • CATALOG_NAME: El identificador de catálogo de Flink, que está vinculado a un catálogo de entorno de ejecución de Lakehouse.
    • WAREHOUSE_DIRECTORY: La ruta de acceso base para el directorio del almacén (la carpeta de Cloud Storage en la que Flink crea archivos). Este valor comienza con gs://.
    • PROJECT_ID: El ID del proyecto del catálogo de entorno de ejecución de Lakehouse con el que se vincula el catálogo de Flink.
    • LOCATION: La ubicación de los recursos de BigQuery.

Tu sesión de Flink ahora está conectada al catálogo de entorno de ejecución de Lakehouse y puedes ejecutar comandos de Flink SQL.

Ahora que estás conectado al catálogo de entorno de ejecución de Lakehouse, puedes crear y ver recursos en función de los metadatos almacenados en el catálogo de entorno de ejecución de Lakehouse.

Por ejemplo, intenta ejecutar los siguientes comandos en tu sesión interactiva de Flink SQL para crear una base de datos y una tabla de Apache Iceberg.

  1. Usa el catálogo personalizado de Apache Iceberg:

    USE CATALOG CATALOG_NAME;

    Reemplaza CATALOG_NAME por el identificador de catálogo de Flink.

  2. Crea una base de datos, que crea un conjunto de datos en BigQuery:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    Reemplaza DATABASE_NAME por el nombre de tu nueva base de datos.

  3. Usa la base de datos que creaste:

    USE DATABASE_NAME;
  4. Crea una tabla de Apache Iceberg. A continuación, se crea una tabla de ventas de ejemplo:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
      order_number BIGINT,
      price        DECIMAL(32,2),
      buyer        ROW<first_name STRING, last_name STRING>,
      order_time   TIMESTAMP(3)
    );

    Reemplaza ICEBERG_TABLE_NAME por un nombre para tu tabla nueva.

  5. Visualiza los metadatos de tablas:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. Haz una lista de las tablas en la base de datos:

    SHOW TABLES;

Transfiere datos a tu tabla

Después de crear una tabla de Apache Iceberg en la sección anterior, puedes usar Flink DataGen como fuente de datos para transferir datos en tiempo real a tu tabla. Los siguientes pasos son un ejemplo de este flujo de trabajo:

  1. Crea una tabla temporal con DataGen:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.order_number.kind' = 'sequence',
      'fields.order_number.start' = '1',
      'fields.order_number.end' = '1000000',
      'fields.price.min' = '0',
      'fields.price.max' = '10000',
      'fields.buyer.first_name.length' = '10',
      'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    Reemplaza lo siguiente:

    • DATABASE_NAME: El nombre de la base de datos para almacenar tu tabla temporal.
    • TEMP_TABLE_NAME: Un nombre para tu tabla temporal.
    • ICEBERG_TABLE_NAME: El nombre de la tabla de Apache Iceberg que creaste en la sección anterior.
  2. Establece el paralelismo en 1:

    SET 'parallelism.default' = '1';
  3. Establece el intervalo de puntos de control:

    SET 'execution.checkpointing.interval' = '10second';
  4. Establece el punto de control:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. Inicia el trabajo de transmisión en tiempo real:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    El resultado es similar a lo siguiente:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. Para verificar el estado del trabajo de transmisión, haz lo siguiente:

    1. En la Google Cloud consola de, ve a la página Clústeres.

      Ir a los clústeres

    2. Selecciona tu clúster.

    3. Haz clic en la pestaña Interfaces web.

    4. Haz clic en el vínculo YARN ResourceManager.

    5. En la interfaz de YARN ResourceManager , busca tu sesión de Flink y haz clic en el vínculo ApplicationMaster en IU de seguimiento.

    6. En la columna Estado, confirma que el estado de tu trabajo sea En ejecución.

  7. Consulta datos de transmisión en el cliente de Flink SQL:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. Consulta datos de transmisión en BigQuery:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. Finaliza el trabajo de transmisión en el cliente de Flink SQL:

    STOP JOB 'JOB_ID';

    Reemplaza JOB_ID por el ID de trabajo que se mostró en el resultado cuando creaste el trabajo de transmisión.

Configura tu almacén de metadatos con Managed Service para Apache Spark

Puedes configurar el catálogo de entorno de ejecución de Lakehouse con Managed Service para Apache Spark con Spark SQL o PySpark.

Spark SQL

  1. Crea un archivo SQL con los comandos de Spark SQL que deseas ejecutar en el catálogo de entorno de ejecución de Lakehouse. Por ejemplo, este comando crea un espacio de nombres y una tabla:

    SET `spark.sql.catalog.CATALOG_NAME`=`org.apache.iceberg.spark.SparkCatalog`;
    SET `spark.sql.catalog.CATALOG_NAME.type`=`bigquery`;
    SET `spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id`=`PROJECT_ID`;
    SET `spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location`=`LOCATION`;
    SET `spark.sql.catalog.CATALOG_NAME.warehouse`=`WAREHOUSE_DIRECTORY`;
    
    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    Reemplaza lo siguiente:

    • CATALOG_NAME: El nombre del catálogo que hace referencia a tu tabla de Spark.
    • NAMESPACE_NAME: El nombre del espacio de nombres que hace referencia a tu tabla de Spark.
    • TABLE_NAME: Un nombre de tabla para tu tabla de Spark.
    • WAREHOUSE_DIRECTORY: El URI de la carpeta de Cloud Storage en la que se almacena tu almacén de datos.
  2. Para enviar un trabajo por lotes de Spark SQL, ejecuta el siguiente gcloud dataproc batches submit spark-sql comando:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar

    Reemplaza lo siguiente:

    • SQL_SCRIPT_PATH: La ruta de acceso al archivo SQL que usa el trabajo por lotes.
    • PROJECT_ID: El ID del Google Cloud proyecto en el que se ejecutará el trabajo por lotes.
    • REGION: La región en la que se ejecuta tu carga de trabajo.
    • SUBNET_NAME (opcional): El nombre de una subred de VPC en la REGION que cumple con los requisitos de subred de la sesión.
    • BUCKET_PATH: La ubicación del bucket de Cloud Storage para subir dependencias de la carga de trabajo. El WAREHOUSE_DIRECTORY se encuentra en este bucket. No se requiere el prefijo de URI gs:// del bucket. Puedes especificar la ruta de acceso o el nombre del bucket, por ejemplo, mybucketname1.
    • LOCATION: La ubicación en la que se ejecutará el trabajo por lotes.

    Para obtener más información sobre el envío de trabajos por lotes de Spark, consulta Ejecuta una carga de trabajo por lotes de Spark.

PySpark

  1. Crea un archivo de Python con los comandos de PySpark que deseas ejecutar en el catálogo de entorno de ejecución de Lakehouse.

    Por ejemplo, el siguiente comando configura un entorno de Spark para interactuar con las tablas de Apache Iceberg almacenadas en el catálogo de entorno de ejecución de Lakehouse. Luego, el comando crea un espacio de nombres nuevo y una tabla de Apache Iceberg dentro de ese espacio de nombres.

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
    .appName("Lakehouse runtime catalog Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.type", "bigquery") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    Reemplaza lo siguiente:

    • PROJECT_ID: El ID del Google Cloud proyecto en el que se ejecutará el trabajo por lotes.
    • LOCATION: la ubicación en la que se encuentran los recursos de BigQuery.
    • CATALOG_NAME: El nombre del catálogo que hace referencia a tu tabla de Spark.
    • TABLE_NAME: Un nombre de tabla para tu tabla de Spark.
    • WAREHOUSE_DIRECTORY: El URI de la carpeta de Cloud Storage en la que se almacena tu almacén de datos.
    • NAMESPACE_NAME: El nombre del espacio de nombres que hace referencia a tu tabla de Spark.
  2. Envía el trabajo por lotes con el siguiente gcloud dataproc batches submit pyspark comando:

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
        --version=2.2 \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar

    Reemplaza lo siguiente:

    • PYTHON_SCRIPT_PATH: La ruta de acceso a la secuencia de comandos de Python que usa el trabajo por lotes.
    • PROJECT_ID: El ID del Google Cloud proyecto en el que se ejecutará el trabajo por lotes.
    • REGION: La región en la que se ejecuta tu carga de trabajo.
    • BUCKET_PATH: La ubicación del bucket de Cloud Storage para subir dependencias de la carga de trabajo. No se requiere el prefijo de URI gs:// del bucket. Puedes especificar la ruta de acceso o el nombre del bucket, por ejemplo, mybucketname1.

    Para obtener más información sobre el envío de trabajos por lotes de PySpark, consulta la referencia de gcloud de PySpark.

¿Qué sigue?