Lakehouse Iceberg REST カタログを使用する

新しいワークフローでは、Lakehouse ランタイム カタログ内の Apache Iceberg REST カタログエンドポイントを使用することをおすすめします。

このエンドポイントは信頼できる唯一の情報源として機能し、クエリエンジン間のシームレスな相互運用を可能にします。これにより、Apache Spark などのエンジンは、Google Cloud Lakehouse テーブルを整合性をもって検出、読み取り、管理できます。

このアプローチは、オープンソース エンジンを使用して Cloud Storage のデータにアクセスし、BigQuery などの他のエンジンとの相互運用性が必要な場合に適しています。きめ細かいアクセス制御のための認証情報ベンダーや、クロスリージョン レプリケーションと障害復旧などの機能をサポートしています。

一方、BigQuery 用のカスタム Apache Iceberg カタログ エンドポイントは、以前のインテグレーションです。既存のワークフローでは引き続き使用できますが、REST カタログではより標準化された機能豊富なエクスペリエンスが提供されます。

始める前に

続行する前に、Lakehouse ランタイム カタログIceberg REST カタログ エンドポイントの概要をよく理解してください。

  1. Google Cloud プロジェクトに対して課金が有効になっていることを確認します

  2. BigLake API を有効にします。

    API を有効にするために必要なロール

    API を有効にするには、serviceusage.services.enable 権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。詳しくは、ロールを付与する方法をご覧ください。

    API の有効化

必要なロール

Lakehouse ランタイム カタログで Apache Iceberg REST カタログ エンドポイントを使用するために必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。

  • カタログ ユーザー アクセス、ストレージ アクセス、カタログの認証情報ベンダー モードの管理などの管理タスクを実行します。
  • 認証情報ベンディング モードでテーブルデータを読み取る: プロジェクトに対する BigLake 閲覧者 roles/biglake.viewer
  • 認証情報ベンディング モードでテーブルデータを書き込む: プロジェクトに対する BigLake 編集者 roles/biglake.editor
  • 非認証情報ベンディング モードでカタログ リソースとテーブルデータを読み取る:
  • カタログ リソースを管理し、非認証情報ベンディング モードでテーブルデータを書き込む:
  • BigQuery カタログ連携を使用してデータ操作言語(DML)オペレーションを実行します。
    • プロジェクトに対する BigQuery データ編集者roles/bigquery.dataEditor
    • Cloud Storage バケットに対するストレージ管理者 roles/storage.admin)。Managed Service for Apache Spark などのクエリエンジンを使用して DML オペレーションを実行する場合は、そのエンジンでジョブを実行するために使用するサービス アカウントにこれらのロールを付与します。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

制限事項

Apache Iceberg REST カタログ エンドポイントには次の制限があります。

一般的な制限事項

  • Trino は、Compute Engine 2.3 イメージ バージョン 2.3.16 以降で Managed Service for Apache Spark を使用する場合にのみ、BigQuery カタログ フェデレーションでサポートされます。
  • 認証情報ベンディング モードを使用する場合は、io-impl プロパティを org.apache.iceberg.gcp.gcs.GCSFileIO に設定する必要があります。デフォルトの org.apache.iceberg.hadoop.HadoopFileIO はサポートされていません。

テーブルの制限事項

  • Apache Iceberg REST Catalog エンドポイントで管理されるテーブルは、行レベルや列レベルのセキュリティなどのきめ細かいアクセス制御(FGAC)をサポートしていません。

データの上限

  • サポートされているのは Parquet ファイルのみです。BigQuery で Parquet ファイルを処理する方法の詳細については、Cloud Storage からの Parquet データの読み込みをご覧ください。
  • Iceberg metadata.json ファイルのサイズは 1 MB に制限されています。この上限の引き上げをリクエストするには、Google アカウント チームにお問い合わせください。

クエリの制限事項

  • Apache Iceberg REST カタログ エンドポイントで管理される Apache Iceberg テーブルのビューは、BigQuery で作成できません。
  • Apache Iceberg メタデータ テーブル(.snapshots.files など)は、5 つの部分からなる名前識別子を使用して BigQuery でクエリできません。これらのテーブルは Spark を使用してクエリできます。

Iceberg REST カタログ エンドポイントを設定する

カタログを設定する前に、 Apache Iceberg REST カタログ エンドポイントの概要を読んで、リソース階層、カタログ タイプ、命名構造を理解することをおすすめします。

Lakehouse ランタイム カタログで Apache Iceberg REST カタログ エンドポイントを使用する場合の一般的な手順は次のとおりです。

  1. Iceberg REST カタログ エンドポイントの概要に基づいて、カタログ ウェアハウスのロケーション(Cloud Storage または BigQuery)を選択します。
  2. Cloud Storage gs:// ウェアハウスを使用している場合は、ウェアハウスの場所を指すカタログを作成します。
  3. Apache Iceberg REST カタログ エンドポイントを使用するようにクライアント アプリケーションを構成します。
  4. テーブルを整理するための Namespace またはスキーマを作成します。
  5. 構成したクライアントを使用してテーブルを作成し、クエリを実行します。

カタログを作成

エンドユーザー認証情報または認証情報ベンディング モードを使用するカタログを作成できます。

  • エンドユーザー認証情報を使用すると、カタログはアクセスしているエンドユーザーの ID を Cloud Storage に渡して、認可チェックを行います。

  • 認証情報のベンディングは、Lakehouse ランタイム カタログ管理者が Lakehouse ランタイム カタログ リソースの権限を直接制御できるようにするストレージ アクセス委任メカニズムです。これにより、カタログ ユーザーが Cloud Storage バケットに直接アクセスする必要がなくなります。これにより、Google Cloud Lakehouse 管理者は特定のデータファイルに対する権限をユーザーに付与できます。

エンドユーザー認証情報

コンソール

  1. Google Cloud コンソールで [Lakehouse] ページを開きます。

    Google Cloud Lakehouse に移動

  2. [カタログを作成] をクリックします。

  3. [Cloud Storage バケットを選択] フィールドに、カタログで使用する Cloud Storage バケットの名前を入力します。または、[参照] をクリックして、既存のバケットを選択するか、新しいバケットを作成します。Cloud Storage バケットごとに 1 つのカタログのみを使用できます。

  4. [Authentication method] で [End-user credentials] を選択します。

  5. [作成] をクリックします。

gcloud

gcloud biglake iceberg catalogs create コマンドを使用します。

gcloud biglake iceberg catalogs create \
    CATALOG_NAME \
    --project PROJECT_ID \
    --catalog-type gcs-bucket \
    --credential-mode end-user \
    [--primary-location LOCATION]

次のように置き換えます。

  • CATALOG_NAME: カタログの名前。Apache Iceberg 用マネージド Lakehouse REST カタログ テーブルの場合、この名前は REST カタログで使用される Cloud Storage バケット ID と一致することがよくあります。たとえば、バケットが gs://bucket-id の場合、カタログ名は bucket-id になることがあります。この名前は、BigQuery からこれらのテーブルをクエリするときにカタログ識別子としても使用されます。
  • PROJECT_ID: 実際の Google Cloud プロジェクト ID。
  • LOCATION: (省略可)カタログのプライマリ リージョン。Cloud Storage の米国または EU のマルチリージョン バケットの場合は、US または EU を指定して、対応する BigQuery リージョンからカタログにアクセスできるようにします。詳細については、カタログのリージョンをご覧ください。

認証情報ベンディング モード

カタログ管理者は、カタログの作成時または更新時に認証情報ベンダーを有効にします。カタログ ユーザーは、Apache Iceberg REST カタログ エンドポイントを構成するときにアクセス委任を指定することで、ダウンスコープされたストレージ認証情報を返すように Apache Iceberg REST カタログ エンドポイントに指示できます。

コンソール

  1. Google Cloud コンソールで、[Lakehouse] ページを開きます。

    Google Cloud Lakehouse に移動

  2. [ カタログを作成] をクリックします。[カタログの作成] ページが開きます。

  3. [Cloud Storage バケットを選択] に、カタログで使用する Cloud Storage バケットの名前を入力します。または、[参照] をクリックして、既存のバケットのリストから選択するか、新しいバケットを作成します。Cloud Storage バケットごとに 1 つのカタログのみを使用できます。

  4. [認証方法] で、[認証情報ベンダーモード] を選択します。

  5. [作成] をクリックします。

    カタログが作成され、[カタログの詳細] ページが開きます。

  6. [認証方法] で、[バケットの権限を設定] をクリックします。

  7. ダイアログで [確認] をクリックします。

    これにより、カタログのサービス アカウントにストレージ バケットに対するストレージ オブジェクト ユーザー ロールがあることが確認されます。

クライアント アプリケーションを構成する

カタログを作成したら、それを使用するようにクライアント アプリケーションを構成します。これらの例は、認証情報ベンダーの有無にかかわらず構成する方法を示しています。

クラスタ

Managed Service for Apache Spark で Apache Iceberg REST カタログ エンドポイントを使用して Spark を使用するには、まず Apache Iceberg コンポーネントを含むクラスタを作成します。

gcloud dataproc clusters create CLUSTER_NAME \
    --enable-component-gateway \
    --project=PROJECT_ID \
    --region=REGION \
    --optional-components=ICEBERG \
    --image-version=DATAPROC_VERSION

次のように置き換えます。

  • CLUSTER_NAME: クラスタの名前。
  • PROJECT_ID: 実際の Google Cloud プロジェクト ID。
  • REGION: Managed Service for Apache Spark クラスタのリージョン。
  • DATAPROC_VERSION: Managed Service for Apache Spark イメージ バージョン(例: 2.2)。

クラスタを作成したら、Apache Iceberg REST カタログ エンドポイントを使用するように Spark セッションを構成します。

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"
spark = SparkSession.builder.appName("APP_NAME") \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/REST_API_VERSION/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'WAREHOUSE_PATH') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', 'CATALOG_NAME') \
  .getOrCreate()

次のように置き換えます。

  • CATALOG_NAME: Apache Iceberg REST カタログ エンドポイントの名前。
  • APP_NAME: Spark セッションの名前。
  • REST_API_VERSION: API の安定版の場合は v1 に設定します。データ リネージの生成に関する既知の問題を回避する必要がある場合は、v1beta に設定します。
  • WAREHOUSE_PATH: ウェアハウスのパス。gs://CLOUD_STORAGE_BUCKET_NAME を使用します。BigQuery カタログ フェデレーションを使用するには、BigQuery でカタログ フェデレーションを使用するをご覧ください。
  • PROJECT_ID: Apache Iceberg REST カタログ エンドポイントの使用に対して課金されるプロジェクト。Cloud Storage バケットを所有するプロジェクトとは異なる場合があります。REST API を使用する場合のプロジェクト構成の詳細については、システム パラメータをご覧ください。

認証情報ベンディングで構成する

認証情報ベンディングを使用するには、認証情報ベンディング モードのカタログを使用し、次の行を SparkSession ビルダーに追加して、Iceberg REST カタログ リクエストに X-Iceberg-Access-Delegation ヘッダーを追加し、値を vended-credentials に設定する必要があります。

.config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials')

認証情報ベンディングの例

次の例では、認証情報ベンダーを使用してクエリエンジンを構成します。

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"
spark = SparkSession.builder.appName("APP_NAME") \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/REST_API_VERSION/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://CLOUD_STORAGE_BUCKET_NAME') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', 'CATALOG_NAME') \
  .getOrCreate()

詳細については、Apache Iceberg ドキュメントの RESTCatalog のヘッダー セクションをご覧ください。

Managed Service for Apache Spark クラスタは、次のリリースで Apache Iceberg の Google 認証フローをサポートしています。

  • Managed Service for Apache Spark on Compute Engine 2.2 イメージ バージョン 2.2.65 以降。
  • Managed Service for Apache Spark on Compute Engine 2.3 イメージ バージョン 2.3.11 以降。

サーバーレス

次の構成で PySpark バッチ ワークロードを Managed Service for Apache Spark に送信します。

gcloud dataproc batches submit pyspark PYSPARK_FILE \
    --project=PROJECT_ID \
    --region=REGION \
    --version=RUNTIME_VERSION \
    --properties="\
    spark.sql.defaultCatalog=CATALOG_NAME,\
    spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,\
    spark.sql.catalog.CATALOG_NAME.type=rest,\
    spark.sql.catalog.CATALOG_NAME.uri=https://biglake.googleapis.com/iceberg/REST_API_VERSION/restcatalog,\
    spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_PATH,\
    spark.sql.catalog.CATALOG_NAME.io-impl=org.apache.iceberg.gcp.gcs.GCSFileIO,\
    spark.sql.catalog.CATALOG_NAME.header.x-goog-user-project=PROJECT_ID,\
    spark.sql.catalog.CATALOG_NAME.rest.auth.type=org.apache.iceberg.gcp.auth.GoogleAuthManager,\
    spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"

次のように置き換えます。

  • PYSPARK_FILE: PySpark アプリケーション ファイルへの gs:// Cloud Storage パス。
  • PROJECT_ID: 実際の Google Cloud プロジェクト ID。
  • REGION: Managed Service for Apache Spark バッチ ワークロードのリージョン。
  • RUNTIME_VERSION: Managed Service for Apache Spark のランタイム バージョン(例: 2.2)。
  • CATALOG_NAME: Apache Iceberg REST カタログ エンドポイントの名前。
  • REST_API_VERSION: API の安定版の場合は v1 に設定します。データ リネージの生成に関する既知の問題を回避する必要がある場合は、v1beta に設定します。
  • WAREHOUSE_PATH: ウェアハウスのパス。gs://CLOUD_STORAGE_BUCKET_NAME を使用します。BigQuery カタログ フェデレーションを使用するには、BigQuery でカタログ フェデレーションを使用するをご覧ください。

認証情報ベンディングで構成する

認証情報ベンダーを使用するには、認証情報ベンダー モードのカタログを使用し、次の行を Managed Service for Apache Spark 構成に追加して、値が vended-credentialsX-Iceberg-Access-Delegation ヘッダーを Apache Iceberg REST カタログ エンドポイント リクエストに追加する必要があります。

.config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials')

認証情報ベンディングの例

次の例では、認証情報ベンダーを使用してクエリエンジンを構成します。

gcloud dataproc batches submit pyspark PYSPARK_FILE \
    --project=PROJECT_ID \
    --region=REGION \
    --version=RUNTIME_VERSION \
    --properties="\
    spark.sql.defaultCatalog=CATALOG_NAME,\
    spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,\
    spark.sql.catalog.CATALOG_NAME.type=rest,\
    spark.sql.catalog.CATALOG_NAME.uri=https://biglake.googleapis.com/iceberg/REST_API_VERSION/restcatalog,\
    spark.sql.catalog.CATALOG_NAME.warehouse=gs://CLOUD_STORAGE_BUCKET_NAME,\
    spark.sql.catalog.CATALOG_NAME.header.x-goog-user-project=PROJECT_ID,\
    spark.sql.catalog.CATALOG_NAME.rest.auth.type=org.apache.iceberg.gcp.auth.GoogleAuthManager,\
    spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,\
    spark.sql.catalog.CATALOG_NAME.gcs.oauth2.refresh-credentials-endpoint=https://oauth2.googleapis.com/token, \
    spark.sql.catalog.CATALOG_NAME.header.X-Iceberg-Access-Delegation=vended-credentials"

詳細については、Apache Iceberg ドキュメントの RESTCatalog のヘッダー セクションをご覧ください。

Managed Service for Apache Spark は、次のランタイム バージョンで Apache Iceberg の Google 認可フローをサポートしています。

  • Managed Service for Apache Spark 2.2 ランタイム 2.2.60 以降
  • Managed Service for Apache Spark 2.3 ランタイム 2.3.10 以降

Trino

Apache Iceberg REST カタログ エンドポイントで Trino を使用するには、Trino コンポーネントを含む Managed Service for Apache Spark クラスタを作成し、gcloud dataproc clusters create --properties フラグを使用してカタログ プロパティを構成します。次の例では、CATALOG_NAME という名前の Trino カタログを作成します。

gcloud dataproc clusters create CLUSTER_NAME \
    --enable-component-gateway \
    --region=REGION \
    --image-version=DATAPROC_VERSION \
    --network=NETWORK_ID \
    --optional-components=TRINO \
    --properties="\
    trino-catalog:CATALOG_NAME.connector.name=iceberg,\
    trino-catalog:CATALOG_NAME.iceberg.catalog.type=rest,\
    trino-catalog:CATALOG_NAME.iceberg.rest-catalog.uri=https://biglake.googleapis.com/iceberg/REST_API_VERSION/restcatalog,\
    trino-catalog:CATALOG_NAME.iceberg.rest-catalog.warehouse=WAREHOUSE_PATH,\
    trino-catalog:CATALOG_NAME.iceberg.rest-catalog.biglake.project-id=PROJECT_ID,\
    trino-catalog:CATALOG_NAME.iceberg.rest-catalog.rest.auth.type=org.apache.iceberg.gcp.auth.GoogleAuthManager"

次のように置き換えます。

  • CLUSTER_NAME: クラスタの名前。
  • REGION: Managed Service for Apache Spark クラスタのリージョン。
  • DATAPROC_VERSION: Managed Service for Apache Spark のイメージ バージョン(例: 2.2)。
  • NETWORK_ID: クラスタ ネットワーク ID。詳細については、Managed Service for Apache Spark クラスタのネットワーク構成をご覧ください。
  • CATALOG_NAME: Apache Iceberg REST カタログ エンドポイントを使用する Trino カタログの名前。
  • REST_API_VERSION: API の安定版の場合は v1 に設定します。データ リネージの生成に関する既知の問題を回避する必要がある場合は、v1beta に設定します。
  • WAREHOUSE_PATH: ウェアハウスのパス。gs://CLOUD_STORAGE_BUCKET_NAME を使用します。
  • PROJECT_ID: Lakehouse ランタイム カタログに使用する Google Cloud プロジェクト ID。

クラスタの作成後、メイン VM インスタンスに接続し、Trino CLI を使用します。

trino --catalog=CATALOG_NAME

Managed Service for Apache Spark Trino は、次のリリースで Apache Iceberg の Google 認可フローをサポートしています。

  • Managed Service for Apache Spark on Compute Engine 2.2 ランタイム バージョン 2.2.65 以降
  • Managed Service for Apache Spark on Compute Engine 2.3 ランタイム バージョン 2.3.11 以降
  • Managed Service for Apache Spark on Compute Engine 3.0 はサポートされていません。

認証情報ベンディングで構成する

認証情報のベンディングは、Managed Service for Apache Spark Trino ではサポートされていません。

Apache Iceberg 1.10 以降

オープンソースの Apache Iceberg 1.10 以降のリリースには、GoogleAuthManager での Google 認証フローのサポートが組み込まれています。次の例は、Lakehouse ランタイム カタログの Apache Iceberg REST カタログ エンドポイントを使用するように Spark を構成する方法を示しています。

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"
spark = SparkSession.builder.appName("APP_NAME") \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/REST_API_VERSION/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'WAREHOUSE_PATH') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', 'CATALOG_NAME') \
  .getOrCreate()

次のように置き換えます。

  • CATALOG_NAME: Apache Iceberg REST カタログ エンドポイントの名前。
  • APP_NAME: Spark セッションの名前。
  • REST_API_VERSION: API の安定版の場合は v1 に設定します。データ リネージの生成に関する既知の問題を回避する必要がある場合は、v1beta に設定します。
  • WAREHOUSE_PATH: ウェアハウスのパス。gs://CLOUD_STORAGE_BUCKET_NAME を使用します。BigQuery カタログ フェデレーションを使用するには、BigQuery でカタログ フェデレーションを使用するをご覧ください。
  • PROJECT_ID: Apache Iceberg REST カタログ エンドポイントの使用に対して課金されるプロジェクト。Cloud Storage バケットを所有するプロジェクトとは異なる場合があります。REST API を使用する場合のプロジェクト構成の詳細については、システム パラメータをご覧ください。

認証情報ベンディングで構成する

上記の例では、認証情報のベンディングは使用されていません。認証情報ベンダーを使用するには、認証情報ベンダーモードのカタログを使用し、SparkSession ビルダーに次の行を追加して、値が vended-credentialsX-Iceberg-Access-Delegation ヘッダーを Apache Iceberg REST カタログ エンドポイント リクエストに追加する必要があります。

.config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials')

認証情報ベンディングの例

次の例では、認証情報ベンダーを使用してクエリエンジンを構成します。

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"
spark = SparkSession.builder.appName("APP_NAME") \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/REST_API_VERSION/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://CLOUD_STORAGE_BUCKET_NAME') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f'spark.sql.catalog.{catalog_name}.rest.auth.type', 'org.apache.iceberg.gcp.auth.GoogleAuthManager') \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', 'CATALOG_NAME') \
  .getOrCreate()

詳細については、Apache Iceberg ドキュメントの RESTCatalog のヘッダー セクションをご覧ください。

以前の Apache Iceberg リリース

1.10 より前のオープンソースの Apache Iceberg リリースでは、次の構成でセッションを構成することで、標準の OAuth 認証を構成できます。

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"
spark = SparkSession.builder.appName("APP_NAME") \
  .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1,org.apache.iceberg:iceberg-gcp-bundle:1.9.1') \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/REST_API_VERSION/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'WAREHOUSE_PATH') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f"spark.sql.catalog.{catalog_name}.token", "TOKEN") \
  .config(f"spark.sql.catalog.{catalog_name}.oauth2-server-uri", "https://oauth2.googleapis.com/token") \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', 'CATALOG_NAME') \
  .getOrCreate()

次のように置き換えます。

  • CATALOG_NAME: Apache Iceberg REST カタログ エンドポイントの名前。
  • APP_NAME: Spark セッションの名前。
  • REST_API_VERSION: API の安定版の場合は v1 に設定します。データ リネージの生成に関する既知の問題を回避する必要がある場合は、v1beta に設定します。
  • WAREHOUSE_PATH: ウェアハウスのパス。gs://CLOUD_STORAGE_BUCKET_NAME を使用します。BigQuery カタログ フェデレーションを使用するには、BigQuery でカタログ フェデレーションを使用するをご覧ください。
  • PROJECT_ID: Apache Iceberg REST カタログ エンドポイントの使用に対して課金されるプロジェクト。Cloud Storage バケットを所有するプロジェクトとは異なる場合があります。REST API を使用する場合のプロジェクト構成の詳細については、システム パラメータをご覧ください。
  • TOKEN: 1 時間有効な認証トークン(gcloud auth application-default print-access-token を使用して生成されたトークンなど)。

認証情報ベンディングで構成する

上記の例では、認証情報のベンディングは使用されていません。認証情報ベンダーを使用するには、認証情報ベンダーモードのカタログを使用し、SparkSession ビルダーに次の行を追加して、値が vended-credentialsX-Iceberg-Access-Delegation ヘッダーを Apache Iceberg REST カタログ エンドポイント リクエストに追加する必要があります。

.config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials')

認証情報ベンディングの例

次の例では、認証情報ベンダーを使用してクエリエンジンを構成します。

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

catalog_name = "CATALOG_NAME"
spark = SparkSession.builder.appName("APP_NAME") \
  .config(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.type', 'rest') \
  .config(f'spark.sql.catalog.{catalog_name}.uri', 'https://biglake.googleapis.com/iceberg/REST_API_VERSION/restcatalog') \
  .config(f'spark.sql.catalog.{catalog_name}.warehouse', 'gs://CLOUD_STORAGE_BUCKET_NAME') \
  .config(f'spark.sql.catalog.{catalog_name}.header.x-goog-user-project', 'PROJECT_ID') \
  .config(f"spark.sql.catalog.{catalog_name}.token", "TOKEN") \
  .config(f"spark.sql.catalog.{catalog_name}.oauth2-server-uri", "https://oauth2.googleapis.com/token") \
  .config(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.gcp.gcs.GCSFileIO') \
  .config(f'spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation','vended-credentials') \
  .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
  .config('spark.sql.defaultCatalog', 'CATALOG_NAME') \
  .getOrCreate()

詳細については、Apache Iceberg ドキュメントの RESTCatalog のヘッダー セクションをご覧ください。

Namespace またはスキーマを作成する

クライアントを構成したら、テーブルを整理するための Namespace またはスキーマを作成します。Namespace またはスキーマを作成する構文は、クエリエンジンによって異なります。次の例は、Spark と Trino を使用してこれらを作成する方法を示しています。

Spark

Cloud Storage ウェアハウス

spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
spark.sql("USE NAMESPACE_NAME;")

NAMESPACE_NAME は、Namespace の名前に置き換えます。

Trino

Cloud Storage ウェアハウス

CREATE SCHEMA IF NOT EXISTS  CATALOG_NAME.SCHEMA_NAME;
USE CATALOG_NAME.SCHEMA_NAME;

次のように置き換えます。

  • CATALOG_NAME: Apache Iceberg REST カタログ エンドポイントを使用する Trino カタログの名前。
  • SCHEMA_NAME: スキーマの名前。

BigQuery でテーブルをクエリする

BigQuery の Apache Iceberg REST カタログ エンドポイントを使用して作成したテーブルのクエリ方法は、Cloud Storage バケット ウェアハウスまたは BigQuery フェデレーションを使用しているかどうかによって異なります。

  • Cloud Storage バケット ウェアハウス: gs:// ウェアハウス パスを使用してクライアントを構成した場合は、4 部構成の名前(P.C.N.T)project.catalog.namespace.table を使用して BigQuery からテーブルをクエリします。P.C.N.T 構造の詳細については、Iceberg REST カタログのコンセプトをご覧ください。catalog コンポーネントは、Lakehouse ランタイム カタログ カタログ リソースの名前です。テーブルのクエリの詳細については、テーブルのクエリをご覧ください。
  • BigQuery 連携: bq:// ウェアハウス パスを使用してクライアントを構成した場合、作成したテーブルは BigQuery に表示され、標準の BigQuery SQL を使用して直接クエリできます。

    SELECT * FROM `NAMESPACE_NAME.TABLE_NAME`;

    次のように置き換えます。

    • NAMESPACE_NAME: 名前空間の名前。
    • TABLE_NAME: テーブルの名前。

BigQuery でカタログ連携を使用する

カタログ フェデレーションについては、Iceberg REST カタログのコンセプトをご覧ください。フェデレーションを有効にするには、クライアント アプリケーションを構成するのクライアント構成例の WAREHOUSE_PATH フィールドで、bq://projects/PROJECT_ID ウェアハウス形式を使用してクライアントを構成します。bq://projects/PROJECT_ID/locations/LOCATION 形式を使用して BigQuery のロケーションを含め、今後のリクエストを単一のロケーションに制限することもできます。

これらのリソースは BigQuery によって管理されるため、該当する必要な権限が必要です。

連携用にクライアントを構成したら、連携テーブルの Namespace を作成できます。

Spark

BigQuery カタログ連携を使用するには、LOCATION 句と DBPROPERTIES 句を含めます。

spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME LOCATION 'gs://BUCKET_NAME/NAMESPACE_NAME' WITH DBPROPERTIES ('gcp-region' = 'LOCATION');")
spark.sql("USE NAMESPACE_NAME;")

次のように置き換えます。

  • NAMESPACE_NAME: Namespace の名前。
  • BUCKET_NAME: カタログで使用している Cloud Storage バケット。
  • LOCATION: BigQuery のロケーション。デフォルト値は US マルチリージョンです。

Trino

BigQuery カタログ連携を使用するには、LOCATION プロパティと gcp-region プロパティを含めます。

CREATE SCHEMA IF NOT EXISTS  CATALOG_NAME.SCHEMA_NAME WITH ( LOCATION = 'gs://BUCKET_NAME/SCHEMA_NAME', "gcp-region" = 'LOCATION');
USE CATALOG_NAME.SCHEMA_NAME;

次のように置き換えます。

  • CATALOG_NAME: Apache Iceberg REST カタログ エンドポイントを使用する Trino カタログの名前。
  • SCHEMA_NAME: スキーマの名前。
  • BUCKET_NAME: カタログで使用している Cloud Storage バケット。
  • LOCATION: BigQuery のロケーション。デフォルト値は US マルチリージョンです。

料金

料金の詳細については、Google Cloud Lakehouse の料金をご覧ください。

次のステップ