Spark と Lakehouse ランタイム カタログでレイクハウスを作成する

レイクハウス アーキテクチャは、データレイクの柔軟性とデータ ウェアハウスのデータ管理機能を組み合わせたものです。このドキュメントでは、 でレイクハウスを設定する方法について説明します。 Google Cloudテーブル形式として Apache Iceberg、処理に Managed Service for Apache Spark、統合メタデータ管理に Lakehouse ランタイム カタログ Iceberg REST カタログを使用します。

このアーキテクチャでは、Iceberg などのオープン テーブル形式を使用して、トランザクションやスキーマの進化などのデータ ウェアハウス機能を Cloud Storage のデータに追加します。このアプローチでは、さまざまなエンジンからアクセスできるデータの信頼できる唯一の情報源が作成されます。

Managed Service for Apache Spark、Cloud Storage、Lakehouse REST Catalog などの Lakehouse アーキテクチャのコンポーネントを示す図。
レイクハウス アーキテクチャ図。

始める前に

  1. アカウントにログインします。 Google Cloud を初めて使用する場合は、 アカウントを作成して、実際のシナリオで Google プロダクトのパフォーマンスを評価してください。 Google Cloud新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  7. Verify that you have the permissions required to complete this guide.

  8. Verify that billing is enabled for your Google Cloud project.

  9. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  10. Iceberg データを保存する Cloud Storage バケットを作成します。

必要なロール

このページの例を実行するには、特定の Identity and Access Management(IAM)ロールが必要です。組織のポリシーによっては、これらのロールがすでに付与されている場合があります。ロールの付与を確認するには、 ロールを付与する必要がありますか?をご覧ください。

ロールの付与については、 プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

ユーザーロール

Managed Service for Apache Spark クラスタの作成に必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。

サービス アカウント ロール

Compute Engine のデフォルトのサービス アカウントに、Managed Service for Apache Spark クラスタの作成に必要な 権限を付与するには、 プロジェクトに対する Dataproc ワーカー roles/dataproc.worker)IAM ロールを Compute Engine のデフォルトのサービス アカウントに付与するよう管理者に依頼します。

Managed Service for Apache Spark クラスタを作成する

Iceberg と Jupyter のオプション コンポーネントを使用して、Managed Service for Apache Spark クラスタを作成します。

  1. クラスタを作成するには、次の gcloud コマンドを実行します。

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=REGION \
        --image-version=2.3-debian12 \
        --optional-components=ICEBERG,JUPYTER \
        --enable-component-gateway \
        --properties 'dataproc:dataproc.lineage.enabled=true'
    

    次のように置き換えます。

    • CLUSTER_NAME: クラスタの名前。
    • PROJECT_ID: 実際の Google Cloud プロジェクト ID。
    • REGION: クラスタのリージョン(例: us-central1)。 Google Cloud

    なお、Lakehouse ランタイム カタログ Iceberg REST カタログが正しく動作するために dataproc:dataproc.lineage.enabled=true を設定する必要はありません。これは、以下のデータリネージの例でリネージ トラッキングを行うために追加されています。

  2. Jupyter ノートブックを使用してクラスタに接続します。Vertex AI Workbench ノートブックを使用するか、クラスタでノートブックを直接起動できます。

Spark セッションを構成する

Jupyter ノートブックで、Lakehouse ランタイム カタログ Iceberg REST カタログを使用するように構成された Spark セッションを作成します。

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"

spark = SparkSession.builder.appName("APP_NAME") \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/v1beta/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://GCS_BUCKET') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config(f'spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled', 'false') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', catalog_name) \
  .getOrCreate()

次のように置き換えます。

  • CATALOG_NAME: Iceberg カタログの名前(例: iceberg_catalog)。
  • APP_NAME: Spark アプリケーションの名前。
  • GCS_BUCKET: Iceberg テーブルデータを保存する Cloud Storage バケット。
  • PROJECT_ID: 実際の Google Cloud プロジェクト ID。

Spark SQL でデータを管理する

Spark セッションを構成したら、Spark SQL を使用してデータ管理オペレーションを実行します。

  1. Namespace を作成します。Lakehouse ランタイム カタログ Iceberg REST カタログでは、Namespace は BigQuery データセットに対応します。

    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME")
    spark.sql("USE NAMESPACE_NAME")
    

    NAMESPACE_NAME は、Namespace の名前に置き換えます(例: spark_lakehouse)。

  2. Iceberg 形式でベーステーブルを作成し、データを挿入します。

    spark.sql("DROP TABLE IF EXISTS base_table PURGE")
    spark.sql("CREATE TABLE base_table (id LONG) USING iceberg")
    spark.sql("INSERT INTO base_table VALUES 0, 1, 2, 3, 4")
    spark.sql("SELECT * FROM base_table").show()
    

    出力は次のようになります。

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       +---+
    
  3. 新しいデータの 2 つ目のテーブルを作成します。

    spark.sql("DROP TABLE IF EXISTS newdata PURGE")
    spark.sql("CREATE TABLE newdata(id LONG) USING iceberg")
    spark.sql("INSERT INTO newdata VALUES 3, 4, 5, 6")
    spark.sql("SELECT * FROM newdata").show()
    

    出力は次のようになります。

       +---+
       | id|
       +---+
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    
  4. 新しいデータをベーステーブルにマージします。

    spark.sql("""MERGE INTO base_table USING newdata
               ON base_table.id = newdata.id
               WHEN MATCHED THEN
                 UPDATE SET base_table.id = newdata.id
               WHEN NOT MATCHED THEN INSERT * """)
    spark.sql("SELECT * FROM base_table").show()
    

    出力は次のようになります。

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    
  5. ベーステーブルのレコードを更新します。

    spark.sql(
         "UPDATE base_table SET id = (id + 100) WHERE (id % 2 == 0)"
     )
    spark.sql("SELECT * FROM base_table").show()
    

    出力は次のようになります。

       +---+
       | id|
       +---+
       |  3|
       |104|
       |  5|
       |106|
       |100|
       |102|
       |  1|
       +---+
    
  6. ベーステーブルからレコードを削除します。

    spark.sql("DELETE FROM base_table WHERE (id % 2 == 0)")
    spark.sql("SELECT * FROM base_table").show()
    

    出力は次のようになります。

       +---+
       | id|
       +---+
       |  3|
       |  5|
       |  1|
       +---+
    

過去のスナップショットをクエリする

特定のスナップショット ID をクエリして、テーブルの以前のバージョンを取得します。このオペレーションはタイムトラベルとも呼ばれます。

  1. MERGEUPDATEDELETE オペレーションの前のテーブル バージョンのスナップショット ID を取得します。

    snapshot_ids = spark.sql(
         "SELECT snapshot_id FROM `NAMESPACE_NAME`.`base_table`.snapshots"
    ).collect()
    oldest_snapshot_id = snapshot_ids[1]["snapshot_id"]
    

    NAMESPACE_NAME は、作成した Namespace に置き換えます。

  2. 取得したスナップショット ID を使用してテーブルをクエリします。

    df = (
         spark.read.format("iceberg")
         .option("versionAsOf", oldest_snapshot_id)
         .load("base_table")
     )
    df.show()
    

    出力には、MERGE オペレーションの後、UPDATE オペレーションまたは DELETE オペレーションの前のテーブルの状態が表示されます。

       +---+
       | id|
       +---+
       |  0|
       |  1|
       |  2|
       |  3|
       |  4|
       |  5|
       |  6|
       +---+
    

データリネージを検出する

Managed Service for Apache Spark 2.2 以降のイメージ バージョンで利用可能な データリネージを使用して、Lakehouse ランタイム カタログ Iceberg REST カタログ テーブル 間のデータの移動を追跡できます。

データリネージの例

  1. ソースとターゲットの Iceberg テーブルを作成し、データをコピーします。

    spark.sql("DROP TABLE IF EXISTS source_table PURGE")
    spark.sql("DROP TABLE IF EXISTS target_table PURGE")
    spark.sql("CREATE TABLE source_table (id LONG) USING iceberg")
    spark.sql("""CREATE TABLE target_table
      USING ICEBERG
      AS SELECT max(id) as top_id FROM source_table
      """)
    
  2. コンソールで、Knowledge Catalog の [Search] ページに移動します。 Google Cloud

    検索に移動

  3. いずれかのテーブルを検索し、[Lineage] タブをクリックします。

     Google Cloud コンソールの Knowledge Catalog ページのデータ リネージの例。
    コンソールの Knowledge Catalog ページにあるデータリネージ グラフの例。 Google Cloud

    データリネージは、Lakehouse ランタイム カタログ Iceberg REST カタログ テーブルの論理表現(Lakehouse ランタイム カタログ テーブル)と物理表現(Cloud Storage)の両方を認識します。

データリネージの既知の問題

一部の Managed Service for Apache Spark クラスタでは、完全なデータリネージが OpenLineageライブラリの問題により生成されないことがあります。 回避策: Spark セッション構成で、spark.sql.catalog.{catalog_name}.uri プロパティを https://biglake.googleapis.com/iceberg/v1beta/restcatalog に設定します。

次のステップ