使用 Iceberg 1.10 为 Managed Service for Apache Spark 配置 Lakehouse 运行时目录

本文档介绍了如何在湖仓一体运行时目录中配置 BigQuery 的自定义 Apache Iceberg 目录

您可以使用 Managed Service for Apache Spark 集群或 Managed Service for Apache Spark 来设置此功能。这样一来,您就可以在 Google Cloud 湖仓一体中创建单个共享目录,该目录可与 Apache Spark 和 Apache Flink 等开源引擎无缝协作。

准备工作

  1. 为您的 Google Cloud 项目启用结算功能。了解如何检查项目是否已启用结算功能
  2. 启用 BigQuery API 和 Managed Service for Apache Spark API。

    启用 API

  3. 了解 Lakehouse 运行时目录

所需的角色

如需获得配置 Lakehouse 运行时目录所需的权限,请让管理员为您授予以下 IAM 角色:

  • 创建 Managed Service for Apache Spark 集群:针对项目中 Compute Engine 默认服务账号的 Dataproc Worker (roles/dataproc.worker)
  • 创建 Lakehouse 运行时目录表:
    • 项目中 Managed Service for Apache Spark 虚拟机服务账号的 Dataproc Worker (roles/dataproc.worker)
    • 针对项目中 Managed Service for Apache Spark 虚拟机服务账号的 BigQuery Data Editor (roles/bigquery.dataEditor) 角色
    • 针对项目中 Managed Service for Apache Spark 虚拟机服务账号的 Storage Object User (roles/storage.objectUser) 角色
  • 查询 Lakehouse 运行时目录表:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

使用 Managed Service for Apache Spark 配置 Metastore

您可以使用 Spark 或 Flink 通过 Managed Service for Apache Spark 配置 Lakehouse 运行时目录:

Spark

  1. 配置新集群。如需创建新的 Managed Service for Apache Spark 集群,请运行以下 gcloud dataproc clusters create 命令,其中包含您需要使用的 Lakehouse 运行时目录设置:

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=LOCATION \
        --single-node

    替换以下内容:

    • CLUSTER_NAME:Managed Service for Apache Spark 集群的名称。
    • PROJECT_ID:您要在其中创建集群的 Google Cloud 项目的 ID。
    • LOCATION:您要创建集群的 Compute Engine 区域。
  2. 使用以下方法之一提交 Spark 作业:

    Google Cloud CLI

    gcloud dataproc jobs submit spark-sql \
        --project=PROJECT_ID \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar \
        --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,\
        spark.sql.catalog.CATALOG_NAME.type=bigquery,\
        spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id=PROJECT_ID,\
        spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location=LOCATION,\
        spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
        --execute="SPARK_SQL_COMMAND"

    替换以下内容:

    • PROJECT_ID:包含 Managed Service for Apache Spark 集群的 Google Cloud 项目的 ID。
    • CLUSTER_NAME:您用于运行 Spark SQL 作业的 Managed Service for Apache Spark 集群的名称。
    • REGION:您的集群所在的 Compute Engine 区域
    • LOCATION:BigQuery 资源的位置。
    • CATALOG_NAME:要为 SQL 作业使用的 Spark 目录的名称。
    • WAREHOUSE_DIRECTORY:包含数据仓库的 Cloud Storage 文件夹。此值以 gs:// 开头。
    • SPARK_SQL_COMMAND:您要运行的 Spark SQL 查询。此查询包含用于创建资源的命令。例如,创建命名空间和表。

    spark-sql CLI

    1. 在 Google Cloud 控制台中,前往虚拟机实例页面。

      转到“虚拟机实例”

    2. 如需连接到 Managed Service for Apache Spark 虚拟机实例,请点击列出 Managed Service for Apache Spark 集群主虚拟机实例名称(即集群名称后跟 -m 后缀)的行中的 SSH。输出类似于以下内容:

      Connected, host fingerprint: ssh-rsa ...
      Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
      ...
      example-cluster@cluster-1-m:~$
      
    3. 在终端中,运行以下 Lakehouse 运行时目录初始化命令:

      spark-sql \
          --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar \
          --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.type=bigquery \
          --conf spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id=PROJECT_ID \
          --conf spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location=LOCATION \
          --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

      替换以下内容:

      • CATALOG_NAME:您在 SQL 作业中使用的 Spark 目录的名称。
      • PROJECT_ID:Spark 目录关联的 Lakehouse 运行时目录的 Google Cloud 项目 ID。
      • LOCATION:Lakehouse 运行时目录的 Google Cloud 位置。
      • WAREHOUSE_DIRECTORY:包含数据仓库的 Cloud Storage 文件夹。此值以 gs:// 开头。

      成功连接到集群后,Spark 终端会显示 spark-sql 提示,您可以使用该提示提交 Spark 作业。

      spark-sql (default)>
      
  1. 创建启用了可选 Flink 组件的 Managed Service for Apache Spark 集群,并确保您使用的是 Managed Service for Apache Spark 2.2 或更高版本。
  2. 在 Google Cloud 控制台中,转到虚拟机实例页面。

    转到虚拟机实例

  3. 在虚拟机实例列表中,点击 SSH 以连接到主要的 Managed Service for Apache Spark 集群虚拟机实例,该实例的名称为集群名称后跟 -m 后缀。

  4. 为 Lakehouse 运行时目录配置 Apache Iceberg 自定义目录插件:

    FLINK_VERSION=1.20
    ICEBERG_VERSION=1.10.0
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/${ICEBERG_VERSION}/iceberg-bigquery-${ICEBERG_VERSION}.jar -P lib
    
    sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/${ICEBERG_VERSION}/iceberg-gcp-bundle-${ICEBERG_VERSION}.jar -P lib
    
    sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/${ICEBERG_VERSION}/iceberg-gcp-${ICEBERG_VERSION}.jar -P lib
  5. 在 YARN 上启动 Flink 会话:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
    -s yarn-session
  6. 在 Flink 中创建目录:

    CREATE CATALOG CATALOG_NAME WITH (
    'type'='iceberg',
    'warehouse'='WAREHOUSE_DIRECTORY',
    'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
    'gcp.bigquery.project-id'='PROJECT_ID',
    'gcp.bigquery.location'='LOCATION'
    );

    替换以下内容:

    • CATALOG_NAME:Flink 目录标识符,与 Lakehouse 运行时目录相关联。
    • WAREHOUSE_DIRECTORY:仓库目录(Flink 在其中创建文件的 Cloud Storage 文件夹)的基本路径。此值以 gs:// 开头。
    • PROJECT_ID:Flink 目录关联的 Lakehouse 运行时目录的项目 ID。
    • LOCATION:BigQuery 资源的位置

您的 Flink 会话现已连接到 Lakehouse 运行时目录,您可以运行 Flink SQL 命令。

现在,您已连接到 Lakehouse 运行时目录,可以根据存储在 Lakehouse 运行时目录中的元数据创建和查看资源。

例如,尝试在交互式 Flink SQL 会话中运行以下命令,以创建 Apache Iceberg 数据库和表。

  1. 使用自定义 Apache Iceberg 目录:

    USE CATALOG CATALOG_NAME;

    CATALOG_NAME 替换为 Flink 目录标识符。

  2. 创建一个数据库,以在 BigQuery 中创建一个数据集:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    DATABASE_NAME 替换为新数据库的名称。

  3. 使用您创建的数据库:

    USE DATABASE_NAME;
  4. 创建 Apache Iceberg 表。以下命令会创建一个示例销售表:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
      order_number BIGINT,
      price        DECIMAL(32,2),
      buyer        ROW<first_name STRING, last_name STRING>,
      order_time   TIMESTAMP(3)
    );

    ICEBERG_TABLE_NAME 替换为新表的名称。

  5. 查看表元数据:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. 列出数据库中的表:

    SHOW TABLES;

将数据提取到表中

在上一部分中创建 Apache Iceberg 表后,您可以使用 Flink DataGen 作为数据源,将实时数据注入到表中。以下步骤是此工作流的一个示例:

  1. 使用 DataGen 创建临时表:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.order_number.kind' = 'sequence',
      'fields.order_number.start' = '1',
      'fields.order_number.end' = '1000000',
      'fields.price.min' = '0',
      'fields.price.max' = '10000',
      'fields.buyer.first_name.length' = '10',
      'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    替换以下内容:

    • DATABASE_NAME:用于存储临时表的数据库的名称。
    • TEMP_TABLE_NAME:临时表的名称。
    • ICEBERG_TABLE_NAME:您在上一部分中创建的 Apache Iceberg 表的名称。
  2. 将并行性设置为 1:

    SET 'parallelism.default' = '1';
  3. 设置检查点间隔:

    SET 'execution.checkpointing.interval' = '10second';
  4. 设置检查点:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. 启动实时流式作业:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    输出类似于以下内容:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. 如需检查流式作业的状态,请执行以下操作:

    1. 在 Google Cloud 控制台中,前往集群页面。

      转到集群

    2. 选择您的集群。

    3. 点击网页界面标签页。

    4. 点击 YARN ResourceManager 链接。

    5. YARN ResourceManager 界面中,找到您的 Flink 会话,然后点击跟踪界面下的 ApplicationMaster 链接。

    6. 状态列中,确认作业状态为正在运行

  7. 在 Flink SQL 客户端中查询流式数据:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. 在 BigQuery 中查询流式数据:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. 在 Flink SQL 客户端中终止流式作业:

    STOP JOB 'JOB_ID';

    JOB_ID 替换为创建流式作业时输出中显示的作业 ID。

使用 Managed Service for Apache Spark 配置 Metastore

您可以使用 Spark SQL 或 PySpark 通过 Managed Service for Apache Spark 配置 Lakehouse 运行时目录。

Spark SQL

  1. 创建一个 SQL 文件,其中包含要在 Lakehouse 运行时目录中运行的 Spark SQL 命令。例如,以下命令会创建一个命名空间和一个表:

    SET `spark.sql.catalog.CATALOG_NAME`=`org.apache.iceberg.spark.SparkCatalog`;
    SET `spark.sql.catalog.CATALOG_NAME.type`=`bigquery`;
    SET `spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id`=`PROJECT_ID`;
    SET `spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location`=`LOCATION`;
    SET `spark.sql.catalog.CATALOG_NAME.warehouse`=`WAREHOUSE_DIRECTORY`;
    
    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    替换以下内容:

    • CATALOG_NAME:引用 Spark 表的目录名称。
    • NAMESPACE_NAME:引用 Spark 表的命名空间名称。
    • TABLE_NAME:Spark 表的表名。
    • WAREHOUSE_DIRECTORY:存储数据仓库的 Cloud Storage 文件夹的 URI。
  2. 运行以下 gcloud dataproc batches submit spark-sql 命令,提交 Spark SQL 批量作业:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar

    替换以下内容:

    • SQL_SCRIPT_PATH:批量作业使用的 SQL 文件的路径。
    • PROJECT_ID:要运行批量作业的 Google Cloud 项目的 ID。
    • REGION:工作负载运行所在的区域。
    • SUBNET_NAME(可选):REGION 中满足会话子网要求的 VPC 子网的名称。
    • BUCKET_PATH:用于上传工作负载依赖项的 Cloud Storage 存储桶的位置。WAREHOUSE_DIRECTORY 位于此存储桶中。存储桶的 gs:// URI 前缀不是必需的。您可以指定存储桶路径或存储桶名称,例如 mybucketname1
    • LOCATION:运行批量作业的位置。

    如需详细了解如何提交 Spark 批处理作业,请参阅运行 Spark 批处理工作负载

PySpark

  1. 创建一个 Python 文件,其中包含要在 Lakehouse 运行时目录中运行的 PySpark 命令。

    例如,以下命令会设置一个 Spark 环境,以便与存储在 Lakehouse 运行时目录中的 Apache Iceberg 表进行交互。然后,该命令会创建一个新的命名空间,并在该命名空间内创建一个 Apache Iceberg 表。

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
    .appName("Lakehouse runtime catalog Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.type", "bigquery") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    替换以下内容:

    • PROJECT_ID:要运行批量作业的 Google Cloud 项目的 ID。
    • LOCATION:BigQuery 资源所在的位置
    • CATALOG_NAME:引用 Spark 表的目录名称。
    • TABLE_NAME:Spark 表的表名。
    • WAREHOUSE_DIRECTORY:存储数据仓库的 Cloud Storage 文件夹的 URI。
    • NAMESPACE_NAME:引用 Spark 表的命名空间名称。
  2. 使用以下 gcloud dataproc batches submit pyspark 命令提交批量作业:

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
        --version=2.2 \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar

    替换以下内容:

    • PYTHON_SCRIPT_PATH:批量作业使用的Python 脚本的路径。
    • PROJECT_ID:要运行批量作业的 Google Cloud 项目的 ID。
    • REGION:工作负载运行所在的区域。
    • BUCKET_PATH:用于上传工作负载依赖项的 Cloud Storage 存储桶的位置。存储桶的 gs:// URI 前缀不是必需的。您可以指定存储桶路径或存储桶名称,例如 mybucketname1

    如需详细了解如何提交 PySpark 批处理作业,请参阅 PySpark gcloud 参考文档

后续步骤