Dataproc Metastore で Apache Iceberg テーブルを使用する

このページでは、Managed Service for Apache Spark クラスタに接続された Dataproc Metastore サービスで Apache Iceberg テーブルを使用する方法について説明します。Apache Iceberg は、大規模な分析データセットに対応したオープン テーブル 形式です。

互換性

Iceberg テーブルは、次の機能をサポートしています。

ドライバ 選択 挿入 テーブルを作成
Spark
Hive
Presto

準備

Spark で Iceberg テーブルを使用する

次の例は、Spark で Iceberg テーブルを使用する方法を示しています。

Iceberg テーブルは、読み取りと書き込みのオペレーションをサポートしています。詳細については、Apache Iceberg - Spark をご覧ください。

Spark 構成

まず、Spark シェルを起動し、Cloud Storage バケットを使用してデータを保存します。 Iceberg を Spark のインストールに含めるには、Iceberg Spark ランタイム JAR ファイルを Spark の JAR フォルダに追加します。JAR ファイルをダウンロードするには、Apache Iceberg のダウンロードをご覧ください。次のコマンドを実行すると、Apache Iceberg をサポートする Spark シェルが起動します。

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Hive カタログを使用して Iceberg テーブルを作成する

  1. Spark Scala に Iceberg テーブルを作成するように Hive カタログ構成を設定します。

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. データの挿入と更新を行うテーブルを作成します。次に例を示します。

    1. default データベースの下に example という名前のテーブルを作成します。

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. サンプルデータを挿入する

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. id 列に基づいてパーティション戦略を指定します。

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. テーブルを作成します。

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Iceberg ストレージ ハンドラと SerDe をテーブル プロパティとして追加します。

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. テーブルにデータを書き込みます。

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. データを読み取ります。

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. テーブル スキーマを変更します。次に例を示します。

    1. テーブルを取得して、新しい列 grade を追加します。

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. 新しいテーブル スキーマを確認します。

      table.schema.toString;
      
  4. より多くのデータを挿入し、スキーマの進化を表示する。次に例を示します。

    1. テーブルに新しいデータを追加します。

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. 挿入された新しいデータを確認します。

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. テーブルの履歴を表示します。

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. スナップショットを表示します。

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. マニフェスト ファイルを表示します。

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. データファイルを表示します。

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. id=6 の値を含む行を追加し、誤った表を表示する必要があるとします。

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      snapshot-id は、復元するバージョンに置き換えます。

Hadoop テーブルを使用して Iceberg テーブルを作成する

  1. Hadoop テーブル構成を設定して、Spark Scala で Iceberg テーブルを作成します。

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. データの挿入と更新を行うテーブルを作成します。次に例を示します。

    1. default データベースの下に example という名前のテーブルを作成します。

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. サンプルデータを挿入する

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. id 列に基づいてパーティション戦略を指定します。

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. テーブルを作成します。

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. テーブルにデータを書き込みます。

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. データを読み取ります。

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. テーブル スキーマを変更します。次に例を示します。

    1. テーブルを取得して、新しい列 grade を追加します。

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. 新しいテーブル スキーマを確認します。

      table.schema.toString;
      
  4. より多くのデータを挿入し、スキーマの進化を表示する。次に例を示します。

    1. テーブルに新しいデータを追加します。

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. 挿入された新しいデータを確認します。

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. テーブルの履歴を表示します。

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. スナップショットを表示します。

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. マニフェスト ファイルを表示します。

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. データファイルを表示します。

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. 戻って特別なバージョンのテーブルを閲覧します。

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      snapshot-id は、復元するバージョンに置き換えて "L" を末尾に追加します。例: "3943776515926014142L"

Hive で Iceberg テーブルを使用する

Iceberg は、StorageHandler を使用して Hive で読み取られるテーブルをサポートしています。Hive 2.x と 3.1.2 のバージョンのみがサポートされています。詳しくは、 Apache Iceberg - Hiveをご覧ください。さらに、Iceberg Hive ランタイム JAR ファイルを Hive クラスパスに追加します。JAR ファイルをダウンロードするには、Apache Iceberg のダウンロードをご覧ください。Hive テーブルを Iceberg のテーブルの上にオーバーレイするには、Hive カタログまたは Hadoop テーブルを使用して Iceberg テーブルを作成する必要があります。また、Iceberg テーブルからデータを読み取るように Hive を構成する必要があります。

Hive の Iceberg テーブル(Hive カタログ)を読み取る

  1. Hive クライアントを開き、Hive クライアント セッションで Iceberg テーブルを読み取るように構成を行います。

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. テーブルのスキーマとデータを読み取ります。次に例を示します。

    1. テーブルのスキーマとテーブル形式が Iceberg かどうかを確認します。

      describe formatted example;
      
    2. テーブルからデータを読み取ります。

      select * from example;
      

Hive の Iceberg テーブル(Hadoop テーブル)を読み取る

  1. Hive クライアントを開き、Hive クライアント セッションで Iceberg テーブルを読み取るように構成を行います。

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. テーブルのスキーマとデータを読み取ります。次に例を示します。

    1. 外部テーブルを作成します(Iceberg テーブルの上に Hive テーブルをオーバーレイします)。

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. テーブルのスキーマとテーブル形式が Iceberg かどうかを確認します。

      describe formatted hadoop_table;
      
    3. テーブルからデータを読み取ります。

      select * from hadoop_table;
      

Presto で Iceberg テーブルを使用する

Presto クエリは Hive コネクタを使用してパーティションの場所を取得します。このため、Iceberg テーブルに対してデータの読み取りと書き込みを行うように Presto を構成する必要があります。詳しくは、Presto/Trino - Hive コネクタPresto/Trino - Iceberg コネクタをご覧ください。

Presto の設定

  1. Managed Service for Apache Spark クラスタノードごとに、 iceberg.properties /etc/presto/conf/catalog/iceberg.properties という名前のファイルを作成し、 を次のように構成します。hive.metastore.uri

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    example.net:9083 は、Hive メタストアの Thrift サービスの正しいホストとポートに置き換えます。

  2. Presto サービスを再起動して構成をプッシュします。

    sudo systemctl restart presto.service
    

Presto に Iceberg テーブルを作成する

  1. Presto クライアントを開き、Iceberg コネクタを使用してメタストアを取得します。

    --catalog iceberg --schema default
    
  2. データの挿入と更新を行うテーブルを作成します。次に例を示します。

    1. default データベースの下に example という名前のテーブルを作成します。

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. サンプルデータを挿入する

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. テーブルからデータを読み取ります。

      SELECT * FROM iceberg.default.example;
      
    4. 新しいデータを挿入してスナップショットを確認します。

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. スナップショットを表示します。

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      コマンド ORDER BY committed_at DESC LIMIT 1; を追加すると、最新のスナップショット ID を検索できます。

    6. 特定のバージョンのテーブルへのロールバック:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      snapshot-id は、復元するバージョンに置き換えます。

次のステップ