Managed Service for Apache Spark 可选的 Iceberg 组件

使用可选组件功能创建 Managed Service for Apache Spark 集群时,您可以安装 Iceberg 等其他组件。本页面介绍了如何选择性地在 Managed Service for Apache Spark 集群上安装 Iceberg 组件。

概览

Apache Iceberg 是一种用于大型分析数据集的开放表格式。该格式为大数据提供了 SQL 表的可靠性和简单性,并且让 Spark、Trino、PrestoDB、Flink 和 Hive 等引擎能够同时安全地处理相同的表。

在 Managed Service for Apache Spark 集群上安装 Apache Iceberg 组件后,它会安装 Iceberg 库,并在集群中配置 Spark 和 Hive,以与 Iceberg 搭配使用。

主要 Iceberg 功能

Iceberg 具有以下特点:

  • 架构演变:添加、移除或重命名列,而无需重写整个表。
  • 时间旅行:查询历史表快照,以用于审核或回滚。
  • 隐藏分区:优化数据布局,可更快地进行查询,而无需向用户公开分区详细信息。
  • ACID 事务:确保数据一致性并防止冲突。

兼容的 Managed Service for Apache Spark 映像版本

您可以在使用 2.2.47 及更高映像版本创建的 Managed Service for Apache Spark 集群上安装 Iceberg 组件。该集群上安装的 Iceberg 版本列在 2.2 发布版本页面中。

创建具有 Iceberg 集群的 Managed Service for Apache Spark 时,系统会将以下 Spark 和 Hive 属性配置为与 Iceberg 搭配使用。

配置文件 属性 默认值
/etc/spark/conf/spark-defaults.conf spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.driver.extraClassPath /usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar
spark.executor.extraClassPath /usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/iceberg/lib/iceberg-hive-runtime.jar
iceberg.engine.hive.enabled true

安装 Iceberg 可选组件

创建 Managed Service for Apache Spark 集群时安装 Iceberg 组件。Managed Service for Apache Spark 集群映像版本列表页面会显示最新 Managed Service for Apache Spark 集群映像版本中包含的 Iceberg 组件版本。

Google Cloud 控制台

如需创建用于安装 Iceberg 组件的 Managed Service for Apache Spark 集群,请在 Google Cloud 控制台中完成以下步骤:

  1. 打开 Managed Service for Apache Spark 的创建集群页面。设置集群面板已处于选中状态。
  2. 组件部分的可选组件下,选择 Iceberg 组件。
  3. 确认或指定其他集群设置,然后点击创建

Google Cloud CLI

如需创建用于安装 Iceberg 组件的 Managed Service for Apache Spark 集群,请将 gcloud dataproc clusters create 命令与 --optional-components 标志结合使用。

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=ICEBERG \
     other flags ...

替换以下内容:

  • CLUSTER_NAME:新集群名称。
  • REGION集群区域

REST API

如需创建用于安装 Iceberg 可选组件的 Managed Service for Apache Spark 集群,请在 clusters.create 请求中指定 Iceberg SoftwareConfig.Component

将 Iceberg 表与 Spark 和 Hive 搭配使用

创建在集群上安装了 Iceberg 可选组件的 Managed Service for Apache Spark 集群后,您可以使用 Spark 和 Hive 读取和写入 Iceberg 表数据。

Spark

为 Iceberg 配置 Spark 会话

您可以在本地使用 gcloud CLI 命令,也可以使用在 dataproc 集群主节点上运行的 spark-shellpyspark REPL(读取-评估-输出循环),以启用 Iceberg 的 Spark 扩展程序,并设置 Spark 目录以使用 Iceberg 表。

gcloud

在本地终端窗口或 Cloud Shell 中运行以下 gcloud CLI 示例,以提交 Spark 作业并设置 Spark 属性,从而为 Iceberg 配置 Spark 会话。

gcloud dataproc jobs submit spark  \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --properties="spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --properties="spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --properties="spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER" \
     other flags ...

替换以下内容:

  • CLUSTER_NAME:集群名称。
  • REGIONCompute Engine 区域。
  • CATALOG_NAME:Iceberg 目录名称。
  • BUCKETFOLDER:Cloud Storage 中的 Iceberg 目录位置。

spark-shell

如需使用 Managed Service for Apache Spark 集群上的 spark-shell REPL 为 Iceberg 配置 Spark 会话,请完成以下步骤:

  1. 使用 SSH 连接到 Managed Service for Apache Spark 集群的主节点。

  2. 在 SSH 会话终端中运行以下命令,以配置 Iceberg 的 Spark 会话。

spark-shell \
    --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"

替换以下内容:

  • CLUSTER_NAME:集群名称。
  • REGIONCompute Engine 区域。
  • CATALOG_NAME:Iceberg 目录名称。
  • BUCKETFOLDER:Cloud Storage 中的 Iceberg 目录位置。

pyspark shell

如需使用 Managed Service for Apache Spark 集群上的 pyspark REPL 为 Iceberg 配置 Spark 会话,请完成以下步骤:

  1. 使用 SSH 连接到 Managed Service for Apache Spark 集群的主节点。

  2. 在 SSH 会话终端中运行以下命令,以配置 Iceberg 的 Spark 会话:

pyspark \
    --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"

替换以下内容:

  • CLUSTER_NAME:集群名称。
  • REGIONCompute Engine 区域。
  • CATALOG_NAME:Iceberg 目录名称。
  • BUCKETFOLDER:Cloud Storage 中的 Iceberg 目录位置。

将数据写入 Iceberg 表

您可以使用 Spark 将数据写入 Iceberg 表。以下代码段会创建一个包含示例数据的 DataFrame,在 Cloud Storage 中创建一个 Iceberg 表,然后将数据写入 Iceberg 表。

PySpark

# Create a DataFrame with sample data.
data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

# Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
    id integer,
    name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")

# Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()

Scala

// Create a DataFrame with sample data.
val data = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")

// Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
    id integer,
    name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")

// Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()

从 Iceberg 表中读取数据

您可以使用 Spark 从 Iceberg 表中读取数据。以下代码段会读取该表,然后显示其内容。

PySpark

# Read Iceberg table data into a DataFrame.
df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")
# Display the data.
df.show()

Scala

// Read Iceberg table data into a DataFrame.
val df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")

// Display the data.
df.show()

Spark SQL

SELECT * FROM CATALOG_NAME.NAMESPACE.TABLE_NAME

Hive

在 Hive 中创建 Iceberg 表

Managed Service for Apache Spark 集群预先配置了 Hive,以与 Iceberg 搭配使用。

如需运行本部分中的代码段,请完成以下步骤:

  1. 使用 SSH 连接到 Managed Service for Apache Spark 集群的主节点。

  2. 在 SSH 终端窗口中启动 beeline

    beeline -u jdbc:hive2://
    

您可以在 Hive 中创建未分区或分区 Iceberg 表。

未分区表

在 Hive 中创建未分区 Iceberg 表。

CREATE TABLE my_table (
  id INT,
  name STRING,
  created_at TIMESTAMP
) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

分区表

通过在 PARTITIONED BY 子句中指定分区列,可在 Hive 中创建分区 Iceberg 表。

CREATE TABLE my_partitioned_table (
  id INT,
  name STRING
) PARTITIONED BY (date_sk INT)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

将数据插入到 Hive 的 Iceberg 表中

您可以使用标准 Hive INSERT 语句将数据插入 Iceberg 表。

SET hive.execution.engine=mr;

INSERT INTO my_table
SELECT 1, 'Alice', current_timestamp();

限制

  • DML(数据操纵语言)操作仅支持 MR (MapReduce) 执行引擎。
  • Hive 3.1.3 中已弃用 MR 执行。

从 Hive 的 Iceberg 表中读取数据

如需从 Iceberg 表中读取数据,请使用 SELECT 语句。

SELECT * FROM my_table;

在 Hive 中删除 Iceberg 表。

如需在 Hive 中删除 Iceberg 表,请使用 DROP TABLE 语句。

DROP TABLE my_table;

后续步骤