将 Apache Iceberg 表与 Dataproc Metastore 搭配使用

本页面介绍了如何将 Dataproc Metastore 服务附加到 Managed Service for Apache Spark 集群,以便使用 Apache Iceberg 表。Apache Iceberg 是一种用于大型分析数据集的开放表 格式。

兼容性

Iceberg 表支持以下功能。

推动因素 选择 插入 创建表
Spark
Hive
Presto

准备工作

将 Iceberg 表与 Spark 搭配使用

以下示例展示了如何将 Iceberg 表与 Spark 搭配使用。

Iceberg 表支持读取和写入操作。如需了解详情,请参阅 Apache Iceberg - Spark

Spark 配置

首先,启动 Spark shell 并使用 Cloud Storage 存储桶来存储数据。 如需在 Spark 安装中添加 Iceberg,请将 Iceberg Spark 运行时 JAR 文件添加到 Spark 的 JAR 文件夹中。如需下载 JAR 文件,请参阅 Apache Iceberg 下载。以下命令将启动支持 Apache Iceberg 的 Spark shell:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

使用 Hive Catalog 创建 Iceberg 表

  1. 设置 Hive Catalog 配置以在 Spark Scala 中创建 Iceberg 表:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. 创建表以插入和更新数据。下面给出了一个示例。

    1. default 数据库下创建名为 example 的表:

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. 插入示例数据:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. 根据列 id 指定分区策略:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. 创建表:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. 将 Iceberg Storage 处理程序和 SerDe 添加为表属性:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. 将数据写入表中:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. 读取数据:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. 更改表架构。下面给出了一个示例。

    1. 获取表并添加新列 grade

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. 检查新表架构:

      table.schema.toString;
      
  4. 插入更多数据并查看架构演变。下面给出了一个示例。

    1. 向表中添加新数据:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. 检查插入的新数据:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. 查看表历史记录:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. 查看快照:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. 查看清单文件:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. 查看数据文件:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. 假设您添加值为 id=6 的行并想返回以查看表的正确版本,但遇到错误:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      snapshot-id 替换为您要返回到的版本。

使用 Hadoop 表创建 Iceberg 表

  1. 设置 Hadoop 表配置以在 Spark Scala 中创建 Iceberg 表:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. 创建表以插入和更新数据。下面给出了一个示例。

    1. default 数据库下创建名为 example 的表:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. 插入示例数据:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. 根据列 id 指定分区策略:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. 创建表:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. 将数据写入表中:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. 读取数据:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. 更改表架构。下面给出了一个示例。

    1. 获取表并添加新列 grade

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. 检查新表架构:

      table.schema.toString;
      
  4. 插入更多数据并查看架构演变。下面给出了一个示例。

    1. 向表中添加新数据:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. 检查插入的新数据:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. 查看表历史记录:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. 查看快照:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. 查看清单文件:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. 查看数据文件:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. 返回以查看该表的特定版本:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      snapshot-id 替换为您要返回到的版本,并将 "L" 添加到末尾。例如 "3943776515926014142L"

在 Hive 上使用 Iceberg 表

Iceberg 支持使用 StorageHandler 通过 Hive 读取表。请注意,仅支持 Hive 2.x 和 3.1.2 版本。如需了解详情,请参阅 Apache Iceberg - Hive。此外,将 Iceberg Hive 运行时 JAR 文件添加到 Hive 类路径。如需 下载 JAR 文件,请参阅 Apache Iceberg 下载。 如需在 Iceberg 表之上叠加 Hive 表,您必须通过 Hive Catalog 或 Hadoop 表创建 Iceberg 表。此外,您还必须相应地配置 Hive 以便读取 Iceberg 表中的数据。

在 Hive 上读取 Iceberg 表 (Hive Catalog)

  1. 打开 Hive 客户端并设置相关配置,以在 Hive 客户端会话上读取 Iceberg 表:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. 读取表架构和数据。下面给出了一个示例。

    1. 检查表架构以及表格式是否为 Iceberg:

      describe formatted example;
      
    2. 从表中读取数据:

      select * from example;
      

在 Hive 上读取 Iceberg 表(Hadoop 表)

  1. 打开 Hive 客户端并设置相关配置,以在 Hive 客户端会话上读取 Iceberg 表:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. 读取表架构和数据。下面给出了一个示例。

    1. 创建外部表(在 Iceberg 表上叠加 Hive 表):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. 检查表架构以及表格式是否为 Iceberg:

      describe formatted hadoop_table;
      
    3. 从表中读取数据:

      select * from hadoop_table;
      

在 Presto 上使用 Iceberg 表

Presto 查询使用 Hive 连接器获取分区位置,因此您必须相应地配置 Presto 以在 Iceberg 表上读取和写入数据。如需了解详情,请参阅 Presto/Trino - Hive 连接器Presto/Trino - Iceberg 连接器

Presto 配置

  1. 在每个 Managed Service for Apache Spark 集群节点下,创建一个名为 iceberg.properties /etc/presto/conf/catalog/iceberg.properties 的文件,并按如下方式配置 hive.metastore.uri

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    example.net:9083 替换为 Hive Metastore Thrift 服务的正确主机和端口。

  2. 重启 Presto 服务以推送配置:

    sudo systemctl restart presto.service
    

在 Presto 上创建 Iceberg 表

  1. 打开 Presto 客户端并使用“Iceberg”连接器获取 Metastore:

    --catalog iceberg --schema default
    
  2. 创建表以插入和更新数据。下面给出了一个示例。

    1. default 数据库下创建名为 example 的表:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. 插入示例数据:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. 从表中读取数据:

      SELECT * FROM iceberg.default.example;
      
    4. 插入更多新数据以检查快照:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. 查看快照:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      通过添加 ORDER BY committed_at DESC LIMIT 1; 命令,您可以找到最新的快照 ID。

    6. 回滚到表的特定版本:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      snapshot-id 替换为您要返回到的版本。

后续步骤