Managed Service for Apache Spark クラスタを作成する際には、オプション コンポーネント機能を使用して、Iceberg などの追加コンポーネントをインストールできます。このページでは、必要に応じて、Managed Service for Apache Spark クラスタに Iceberg コンポーネントをインストールする方法について説明します。
概要
Apache Iceberg は、大規模な分析データセットに対応したオープン テーブル形式です。SQL テーブルの信頼性とシンプルさをビッグデータにもたらし、Spark、Trino、PrestoDB、Flink、Hive などのエンジンが同じテーブルを同時に安全に操作できるようにします。
Managed Service for Apache Spark クラスタにインストールすると、Apache Iceberg コンポーネントは Iceberg ライブラリをインストールし、クラスタで Iceberg と連携するように Spark と Hive を構成します。
Iceberg の主な機能
Iceberg の機能は次のとおりです。
- スキーマの進化: テーブル全体を書き換えることなく、列の追加、削除、名前の変更を行います。
- タイムトラベル: 監査またはロールバックの目的で、過去のテーブル スナップショットをクエリします。
- 非表示パーティショニング: パーティションの詳細をユーザーに公開せずに、クエリを高速化するためにデータ レイアウトを最適化します。
- ACID トランザクション: データの整合性を確保し、競合を防ぎます。
互換性のある Managed Service for Apache Spark イメージ バージョン
Iceberg コンポーネントは、2.2.47 以降のイメージ バージョンで作成された Managed Service for Apache Spark クラスタにインストールできます。クラスタにインストールされている Iceberg のバージョンは、2.2 リリース バージョンのページに記載されています。
Iceberg 関連のプロパティ
Iceberg クラスタで Managed Service for Apache Spark を作成すると、次の Spark プロパティと Hive プロパティが Iceberg と連携するように構成されます。
| 構成ファイル | プロパティ | デフォルト値 |
|---|---|---|
/etc/spark/conf/spark-defaults.conf |
spark.sql.extensions |
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions |
spark.driver.extraClassPath |
/usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar |
|
spark.executor.extraClassPath |
/usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar |
|
/etc/hive/conf/hive-site.xml |
hive.aux.jars.path |
file:///usr/lib/iceberg/lib/iceberg-hive-runtime.jar |
iceberg.engine.hive.enabled |
true |
Iceberg オプション コンポーネントをインストールする
Managed Service for Apache Spark クラスタを作成するときに、Iceberg コンポーネントをインストールします。Managed Service for Apache Spark クラスタ イメージ バージョン リスト ページには、最新の Managed Service for Apache Spark クラスタ イメージ バージョンに含まれる Iceberg コンポーネント バージョンが表示されます。
Google Cloud コンソール
Iceberg コンポーネントをインストールする Managed Service for Apache Spark クラスタを作成するには、 Google Cloud コンソールで次の操作を行います。
- Managed Service for Apache Spark の [クラスタの作成] ページを開きます。[クラスタの設定] パネルが選択されています。
- [コンポーネント] セクションの [オプション コンポーネント] で、[Iceberg] コンポーネントを選択します。
- 他のクラスタ設定を確認または指定して、[作成] をクリックします。
Google Cloud CLI
Iceberg コンポーネントをインストールする Managed Service for Apache Spark クラスタを作成するには、--optional-components フラグを指定した gcloud dataproc clusters create コマンドを使用します。
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --optional-components=ICEBERG \ other flags ...
次のように置き換えます。
- CLUSTER_NAME: 新しいクラスタ名。
- REGION: クラスタ リージョン。
REST API
Iceberg オプション コンポーネントをインストールする Managed Service for Apache Spark クラスタを作成するには、clusters.create リクエストの一部として Iceberg SoftwareConfig.Component を指定します。
Spark と Hive で Iceberg テーブルを使用する
Iceberg のオプション コンポーネントがクラスタにインストールされている Managed Service for Apache Spark クラスタを作成したら、Spark と Hive を使用して Iceberg テーブルのデータを読み書きできます。
Spark
Iceberg 用の Spark セッションを構成する
gcloud CLI コマンドをローカルで使用するか、Dataproc クラスタのマスターノードで実行されている spark-shell または pyspark REPL(Read-Eval-Print Loop)を使用して、Iceberg の Spark 拡張機能を有効にし、Iceberg テーブルを使用するように Spark カタログを設定できます。
gcloud
次の gcloud CLI の例をローカル ターミナル ウィンドウまたは Cloud Shell で実行して、Spark ジョブを送信し、Spark プロパティを設定して Iceberg の Spark セッションを構成します。
gcloud dataproc jobs submit spark \ --cluster=CLUSTER_NAME \ --region=REGION \ --properties="spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \ --properties="spark.sql.catalog.CATALOG_NAME.type=hadoop" \ --properties="spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER" \ other flags ...
次のように置き換えます。
- CLUSTER_NAME: クラスタ名。
- REGION: Compute Engine のリージョン。
- CATALOG_NAME: Iceberg カタログ名。
- BUCKET と FOLDER: Cloud Storage 内の Iceberg カタログのロケーション。
spark-shell
Managed Service for Apache Spark クラスタの spark-shell REPL を使用して Iceberg の Spark セッションを構成するには、次の操作を行います。
SSH を使用して、Managed Service for Apache Spark クラスタのマスターノードに接続します。
SSH セッション ターミナルで次のコマンドを実行して、Iceberg 用に Spark セッションを構成します。
spark-shell \
--conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
--conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
--conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
--conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"
次のように置き換えます。
- CLUSTER_NAME: クラスタ名。
- REGION: Compute Engine のリージョン。
- CATALOG_NAME: Iceberg カタログ名。
- BUCKET と FOLDER: Cloud Storage 内の Iceberg カタログのロケーション。
pyspark シェル
Managed Service for Apache Spark クラスタの pyspark REPL を使用して Iceberg の Spark セッションを構成するには、次の操作を行います。
SSH を使用して、Managed Service for Apache Spark クラスタのマスターノードに接続します。
SSH セッション ターミナルで次のコマンドを実行して、Iceberg 用に Spark セッションを構成します。
pyspark \
--conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
--conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
--conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
--conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"
次のように置き換えます。
- CLUSTER_NAME: クラスタ名。
- REGION: Compute Engine のリージョン。
- CATALOG_NAME: Iceberg カタログ名。
- BUCKET と FOLDER: Cloud Storage 内の Iceberg カタログのロケーション。
Iceberg テーブルにデータを書き込む
Spark を使用して Iceberg テーブルにデータを書き込むことができます。次のコード スニペットは、サンプルデータを使用して DataFrame を作成し、Cloud Storage に Iceberg テーブルを作成して、データを Iceberg テーブルに書き込みます。
PySpark
# Create a DataFrame with sample data.
data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
# Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
id integer,
name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")
# Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()
Scala
// Create a DataFrame with sample data.
val data = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")
// Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
id integer,
name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")
// Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()
Iceberg テーブルからデータを読み取る
Spark を使用して Iceberg テーブルからデータを読み取ることができます。次のコード スニペットは、テーブルを読み取り、その内容を表示します。
PySpark
# Read Iceberg table data into a DataFrame.
df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")
# Display the data.
df.show()
Scala
// Read Iceberg table data into a DataFrame.
val df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")
// Display the data.
df.show()
Spark SQL
SELECT * FROM CATALOG_NAME.NAMESPACE.TABLE_NAME
Hive
Hive で Iceberg テーブルを作成する
Managed Service for Apache Spark クラスタは、Iceberg と連携するように Hive を事前に構成します。
このセクションのコード スニペットを実行するには、次の操作を行います。
SSH を使用して、Managed Service for Apache Spark クラスタのマスターノードに接続します。
SSH ターミナル ウィンドウで
beelineを起動します。beeline -u jdbc:hive2://
Hive でパーティション分割されていない Iceberg テーブルまたはパーティション分割された Iceberg テーブルを作成できます。
パーティション分割されていないテーブル
Hive にパーティション分割されていない Iceberg テーブルを作成します。
CREATE TABLE my_table ( id INT, name STRING, created_at TIMESTAMP ) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
パーティション分割テーブル
PARTITIONED BY 句でパーティション列を指定して、Hive にパーティション分割された Iceberg テーブルを作成します。
CREATE TABLE my_partitioned_table ( id INT, name STRING ) PARTITIONED BY (date_sk INT) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
Hive の Iceberg テーブルにデータを挿入する
標準の Hive INSERT ステートメントを使用して、Iceberg テーブルにデータを挿入できます。
SET hive.execution.engine=mr; INSERT INTO my_table SELECT 1, 'Alice', current_timestamp();
制限事項
- DML(データ操作言語)オペレーションでは、MR(MapReduce)実行エンジンのみがサポートされます。
- Hive
3.1.3では MR 実行は非推奨になりました。
Hive の Iceberg テーブルからデータを読み取る
Iceberg テーブルからデータを読み取るには、SELECT ステートメントを使用します。
SELECT * FROM my_table;
Hive で Iceberg テーブルを削除する
Hive で Iceberg テーブルを削除するには、DROP TABLE ステートメントを使用します。
DROP TABLE my_table;
次のステップ
- Iceberg クイック スタートガイドをご覧ください。