Spark と BigLake metastore を使用してレイクハウスを作成する

レイクハウス アーキテクチャは、データレイクの柔軟性とデータ ウェアハウスのデータ管理機能を組み合わせたものです。このドキュメントでは、 Google Cloudにレイクハウスを設定する方法について説明します。テーブル形式として Apache Iceberg、処理に Managed Service for Apache Spark、統合メタデータ管理に BigLake Metastore の Iceberg REST カタログを使用します。

このアーキテクチャでは、Iceberg などのオープン テーブル形式を使用して、トランザクションやスキーマの進化などのデータ ウェアハウス機能を Cloud Storage のデータに追加します。このアプローチでは、さまざまなエンジンからアクセスできるデータの信頼できる唯一の情報源が作成されます。

始める前に

  1. Google Cloud アカウントにログインします。 Google Cloudを初めて使用する場合は、 アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  7. Verify that you have the permissions required to complete this guide.

  8. Verify that billing is enabled for your Google Cloud project.

  9. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  10. Iceberg データを保存する Cloud Storage バケットを作成します。

必要なロール

このページの例を実行するには、特定の IAM ロールが必要です。組織のポリシーによっては、これらのロールがすでに付与されている場合があります。ロール付与を確認するには、ロールを付与する必要がありますか?をご覧ください。

ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

ユーザーロール

Managed Service for Apache Spark クラスタの作成に必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。

サービス アカウント ロール

Compute Engine のデフォルト サービス アカウントに Managed Service for Apache Spark クラスタを作成するために必要な権限を付与するには、プロジェクトに対する Dataproc ワーカー roles/dataproc.worker)IAM ロールを Compute Engine のデフォルト サービス アカウントに付与するよう管理者に依頼してください。

Managed Service for Apache Spark クラスタを作成する

Iceberg と Jupyter のオプション コンポーネントを使用して、Managed Service for Apache Spark クラスタを作成します。

  1. クラスタを作成するには、次の gcloud コマンドを実行します。

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=REGION \
        --image-version=2.3-debian12 \
        --optional-components=ICEBERG,JUPYTER \
        --enable-component-gateway
    

    次のように置き換えます。

    • CLUSTER_NAME: クラスタの名前。
    • PROJECT_ID: 実際の Google Cloud プロジェクト ID。
    • REGION: クラスタの Google Cloud リージョン(例: us-central1)。
  2. Jupyter Notebook を使用してクラスタに接続します。Vertex AI Workbench ノートブックを使用するか、クラスタでノートブックを直接起動できます。

Spark セッションを構成する

Jupyter Notebook で、BigLake metastore Iceberg REST カタログを使用するように構成された Spark セッションを作成します。

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"

spark = SparkSession.builder.appName("APP_NAME") \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1beta/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://GCS_BUCKET') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', catalog_name) \
  .getOrCreate()

次のように置き換えます。

  • CATALOG_NAME: Iceberg カタログの名前(iceberg_catalog など)。
  • APP_NAME: Spark アプリケーションの名前。
  • GCS_BUCKET: Iceberg テーブルデータを保存する Cloud Storage バケット。
  • PROJECT_ID: 実際の Google Cloud プロジェクト ID。

Spark SQL でデータを管理する

Spark セッションを構成したら、Spark SQL を使用してデータ管理オペレーションを実行します。

  1. Namespace を作成します。BigLake metastore Iceberg REST カタログでは、名前空間は BigQuery データセットに対応します。

    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME")
    spark.sql("USE NAMESPACE_NAME")
    

    NAMESPACE_NAME は、Namespace の名前に置き換えます(例: spark_lakehouse)。

  2. Iceberg 形式でベーステーブルを作成し、データを挿入します。

    spark.sql("DROP TABLE IF EXISTS base_table PURGE")
    spark.sql("CREATE TABLE base_table (id LONG) USING iceberg")
    spark.sql("INSERT INTO base_table VALUES 0, 1, 2, 3, 4")
    spark.sql("SELECT * FROM base_table").show()
    

    出力は次のようになります。

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       +---+
    
  3. 新しいデータ用の 2 つ目のテーブルを作成します。

    spark.sql("DROP TABLE IF EXISTS newdata PURGE")
    spark.sql("CREATE TABLE newdata(id LONG) USING iceberg")
    spark.sql("INSERT INTO newdata VALUES 3, 4, 5, 6")
    spark.sql("SELECT * FROM newdata").show()
    

    出力は次のようになります。

       +---+
       | id|
       +---+
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    
  4. 新しいデータをベーステーブルに統合します。

    spark.sql("""MERGE INTO base_table USING newdata
               ON base_table.id = newdata.id
               WHEN MATCHED THEN
                 UPDATE SET base_table.id = newdata.id
               WHEN NOT MATCHED THEN INSERT * """)
    spark.sql("SELECT * FROM base_table").show()
    

    出力は次のようになります。

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    
  5. ベーステーブルのレコードを更新します。

    spark.sql(
         "UPDATE base_table SET id = (id + 100) WHERE (id % 2 == 0)"
     )
    spark.sql("SELECT * FROM base_table").show()
    

    出力は次のようになります。

       +---+
       | id|
       +---+
       |  3|
       |104|
       |  5|
       |106|
       |100|
       |102|
       |  1|
       +---+
    
  6. ベーステーブルからレコードを削除します。

    spark.sql("DELETE FROM base_table WHERE (id % 2 == 0)")
    spark.sql("SELECT * FROM base_table").show()
    

    出力は次のようになります。

       +---+
       | id|
       +---+
       |  3|
       |  5|
       |  1|
       +---+
    

過去のスナップショットをクエリする

特定のスナップショット ID をクエリして、テーブルの以前のバージョンを取得します。このオペレーションは、タイム トラベルとも呼ばれます。

  1. MERGEUPDATEDELETE オペレーションの前に、テーブル バージョンのスナップショット ID を取得します。

    snapshot_ids = spark.sql(
         "SELECT snapshot_id FROM `NAMESPACE_NAME`.`base_table`.snapshots"
    ).collect()
    oldest_snapshot_id = snapshot_ids[1]["snapshot_id"]
    

    NAMESPACE_NAME は、作成した Namespace に置き換えます。

  2. 取得したスナップショット ID を使用してテーブルをクエリします。

    df = (
         spark.read.format("iceberg")
         .option("versionAsOf", oldest_snapshot_id)
         .load("base_table")
     )
    df.show()
    

    出力には、MERGE オペレーションの後、UPDATE オペレーションまたは DELETE オペレーションの前のテーブルの状態が表示されます。

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    

次のステップ