クラスタに Delta Lake コンポーネントをインストールする

このドキュメントでは、Managed Service for Apache Spark クラスタに Delta Lakeオプション コンポーネントをインストールして構成する方法について説明します。有効にすると、Delta Lake コンポーネントは必要なライブラリをインストールし、Delta Lake と連携するように Spark と Hive を設定します。

Managed Service for Apache Spark で使用可能なその他のオプション コンポーネントの詳細については、 使用可能なオプション コンポーネントをご覧ください。

互換性のある Managed Service for Apache Spark イメージ バージョン

Delta Lake コンポーネントは、 Managed Service for Apache Spark イメージ バージョン 2.2.46 以降のイメージ バージョンで作成された Managed Service for Apache Spark クラスタにインストールできます。

Managed Service for Apache Spark イメージ リリースに含まれる Delta Lake コンポーネント バージョンについては、 サポートされている Managed Service for Apache Spark バージョン をご覧ください。

Delta Lake コンポーネントを有効にして Managed Service for Apache Spark クラスタを作成すると、次の Spark プロパティが Delta Lake と連携するように構成されます。

構成ファイル プロパティ デフォルト値
/etc/spark/conf/spark-defaults.conf spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
/etc/spark/conf/spark-defaults.conf spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog

コンポーネントをインストールする

コンソール、Google Cloud CLI、または Dataproc API を使用して Managed Service for Apache Spark クラスタを作成するときにコンポーネントをインストールします。 Google Cloud

コンソール

  1. コンソールで、Managed Service for Apache Spark の [クラスタを作成] ページに移動します。 Google Cloud

    [クラスタの作成] に移動

    [クラスターを設定] パネルが選択されています。

  2. [コンポーネント] セクションの [オプション コンポーネント] で、クラスタにインストールする Delta Lake やその他のオプション コンポーネントを選択します。

gcloud CLI

Delta Lake コンポーネントを含む Managed Service for Apache Spark クラスタを作成するには、 gcloud dataproc clusters create コマンドと --optional-components フラグを使用します。

gcloud dataproc clusters create CLUSTER_NAME \
    --optional-components=DELTA \
    --region=REGION \
    ... other flags

注:

REST API

Delta Lake コンポーネントは、clusters.create リクエストの一部として SoftwareConfig.Component を使用して Dataproc API で指定できます。

使用例

このセクションでは、Delta Lake テーブルを使用したデータの読み取りと書き込みの例を示します。

Delta Lake テーブル

Delta Lake テーブルに書き込む

Spark DataFrame を使用して、Delta Lake テーブルにデータを書き込むことができます。次の例では、サンプルデータを使用して DataFrame を作成し、Cloud Storage に my_delta_table Delta Lake テーブルを作り、データを Delta Lake テーブルに書き込みます。

PySpark

# Create a DataFrame with sample data.
data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

# Create a Delta Lake table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS my_delta_table (
    id integer,
    name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table'""")

# Write the DataFrame to the Delta Lake table in Cloud Storage.
data.writeTo("my_delta_table").append()

Scala

// Create a DataFrame with sample data.
val data = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")

// Create a Delta Lake table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS my_delta_table (
    id integer,
    name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table'""")

// Write the DataFrame to the Delta Lake table in Cloud Storage.
data.write.format("delta").mode("append").saveAsTable("my_delta_table")

Spark SQL

CREATE TABLE IF NOT EXISTS my_delta_table (
    id integer,
    name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table';

INSERT INTO my_delta_table VALUES ("1", "Alice"), ("2", "Bob");

Delta Lake テーブルから読み取る

次の例では、my_delta_table を読み取り、その内容を表示します。

PySpark

# Read the Delta Lake table into a DataFrame.
df = spark.table("my_delta_table")

# Display the data.
df.show()

Scala

// Read the Delta Lake table into a DataFrame.
val df = spark.table("my_delta_table")

// Display the data.
df.show()

Spark SQL

SELECT * FROM my_delta_table;

Delta Lake を使用した Hive

Hive の Delta テーブルに書き込みます。

Managed Service for Apache Spark Delta Lake オプション コンポーネントは、Hive 外部テーブルと連携するように事前構成されています。

詳細については、Hive コネクタをご覧ください。

beeline クライアントでサンプルを実行します。

beeline -u jdbc:hive2://

Spark Delta Lake テーブルを作成します。

Hive 外部テーブルが Delta Lake テーブルを参照するには、Spark を使用して Delta Lake テーブルを作成する必要があります。

CREATE TABLE IF NOT EXISTS my_delta_table (
    id integer,
    name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table';

INSERT INTO my_delta_table VALUES ("1", "Alice"), ("2", "Bob");

Hive 外部テーブルを作成します。

SET hive.input.format=io.delta.hive.HiveInputFormat;
SET hive.tez.input.format=io.delta.hive.HiveInputFormat;

CREATE EXTERNAL TABLE deltaTable(id INT, name STRING)
STORED BY 'io.delta.hive.DeltaStorageHandler'
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table';

注:

  • io.delta.hive.DeltaStorageHandler クラスは、Hive データソース API を実装します。Delta テーブルを読み込んでメタデータを抽出できます。CREATE TABLE ステートメントのテーブル スキーマが基盤となる Delta Lake メタデータと一致しない場合、エラーがスローされます。

Hive の Delta Lake テーブルから読み取ります。

Delta テーブルからデータを読み取るには、SELECT ステートメントを使用します。

SELECT * FROM deltaTable;

Delta Lake テーブルを削除します。

Delta テーブルを削除するには、DROP TABLE ステートメントを使用します。

DROP TABLE deltaTable;