Iceberg 1.10 を使用して Managed Service for Apache Spark の Lakehouse ランタイム カタログを構成する

このドキュメントでは、 Lakehouse ランタイム カタログ内で BigQuery 用のカスタム Apache Iceberg カタログを構成する方法について説明します。

これは、Managed Service for Apache Spark クラスタまたはManaged Service for Apache Sparkのいずれかを使用して設定できます。これにより、Apache Spark や Apache Flink などのオープンソース エンジンとシームレスに連携する単一の共有カタログが Google Cloud Lakehouse 全体に作成されます。

始める前に

  1. Google Cloud プロジェクトに対する課金を有効にします。詳しくは、プロジェクトで課金が有効になっているかどうかを 確認する方法をご覧ください。
  2. BigQuery API と Managed Service for Apache Spark API を有効にします。

    API を有効にする

  3. Lakehouse ランタイム カタログについて理解する。

必要なロール

Lakehouse ランタイム カタログを構成するために必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

必要な権限は、カスタム ロールや他の事前定義 ロールから取得することもできます。

Managed Service for Apache Spark でメタストアを構成する

Spark または Flink を使用して、Managed Service for Apache Spark で Lakehouse ランタイム カタログを構成できます。

Spark

  1. 新しいクラスタを構成します。新しい Managed Service for Apache Spark クラスタを作成するには、 Lakehouse ランタイム カタログを使用するために必要な設定を含む次の gcloud dataproc clusters create コマンドを実行します。

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=LOCATION \
        --single-node

    次のように置き換えます。

    • CLUSTER_NAME: Managed Service for Apache Spark クラスタの名前。
    • PROJECT_ID: クラスタを作成する Google Cloud プロジェクト の ID。
    • LOCATION: クラスタを作成する Compute Engine リージョン。
  2. 次のいずれかの方法で Spark ジョブを送信します。

    Google Cloud CLI

    gcloud dataproc jobs submit spark-sql \
        --project=PROJECT_ID \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar \
        --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,\
        spark.sql.catalog.CATALOG_NAME.type=bigquery,\
        spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id=PROJECT_ID,\
        spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location=LOCATION,\
        spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
        --execute="SPARK_SQL_COMMAND"

    次のように置き換えます。

    • PROJECT_ID: Managed Service for Apache Spark クラスタを含む Google Cloud プロジェクト の ID。
    • CLUSTER_NAME: Spark SQL ジョブの実行に使用している Managed Service for Apache Spark クラスタの名前。
    • REGION: クラスタが配置されている Compute Engine リージョン
    • LOCATION: BigQuery リソースのロケーション。
    • CATALOG_NAME: SQL ジョブで使用する Spark カタログの名前。
    • WAREHOUSE_DIRECTORY: データ ウェアハウスを含む Cloud Storage フォルダ。 この値は gs:// で始まります。
    • SPARK_SQL_COMMAND: 実行する Spark SQL クエリ。このクエリには、リソースを作成するコマンドを含めます。たとえば、名前空間とテーブルを作成します。

    spark-sql CLI

    1. コンソールで [VM インスタンス] ページに移動します。 Google Cloud

      [VM インスタンス] に移動

    2. Managed Service for Apache Spark VM インスタンスに接続するには、Managed Service for Apache Spark クラスタのメイン VM インスタンス名(クラスタ名の後に -m サフィックスが付いたもの)が一覧表示されている行の [SSH] をクリックします。出力は次のようになります。

      Connected, host fingerprint: ssh-rsa ...
      Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
      ...
      example-cluster@cluster-1-m:~$
      
    3. ターミナルで、次の Lakehouse ランタイム カタログの初期化コマンドを実行します。

      spark-sql \
          --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar \
          --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.type=bigquery \
          --conf spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id=PROJECT_ID \
          --conf spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location=LOCATION \
          --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

      次のように置き換えます。

      • CATALOG_NAME: SQL ジョブで使用している Spark カタログの名前。
      • PROJECT_ID: Spark カタログがリンクされる Lakehouse ランタイム カタログ カタログの Google Cloud プロジェクト ID。
      • LOCATION: Lakehouse ランタイム カタログの Google Cloud ロケーション 。
      • WAREHOUSE_DIRECTORY: データ ウェアハウスを含む Cloud Storage フォルダ。 この値は gs:// で始まります。

      クラスタに正常に接続すると、Spark ターミナルに spark-sql プロンプトが表示されます。これを使用して Spark ジョブを送信できます。

      spark-sql (default)>
      
  1. オプションの Flink コンポーネントが有効になっている Managed Service for Apache Spark クラスタを作成し 、Managed Service for Apache Spark 2.2 以降を使用していることを確認します。
  2. コンソールで、[VM インスタンス] ページに移動します。 Google Cloud

    [VM インスタンス] に移動

  3. 仮想マシン インスタンスのリストで、[SSH] をクリックして、Managed Service for Apache Spark クラスタのメイン VM インスタンス(クラスタ名の後に -m サフィックスが付いたもの)に接続します。

  4. Lakehouse ランタイム カタログ用に Apache Iceberg カスタム カタログ プラグインを構成します。

    FLINK_VERSION=1.20
    ICEBERG_VERSION=1.10.0
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/${ICEBERG_VERSION}/iceberg-bigquery-${ICEBERG_VERSION}.jar -P lib
    
    sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/${ICEBERG_VERSION}/iceberg-gcp-bundle-${ICEBERG_VERSION}.jar -P lib
    
    sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/${ICEBERG_VERSION}/iceberg-gcp-${ICEBERG_VERSION}.jar -P lib
  5. YARN で Flink セッションを開始します。

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
    -s yarn-session
  6. Flink でカタログを作成します。

    CREATE CATALOG CATALOG_NAME WITH (
    'type'='iceberg',
    'warehouse'='WAREHOUSE_DIRECTORY',
    'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
    'gcp.bigquery.project-id'='PROJECT_ID',
    'gcp.bigquery.location'='LOCATION'
    );

    次のように置き換えます。

    • CATALOG_NAME: Lakehouse ランタイム カタログ カタログにリンクされている Flink カタログ識別子。
    • WAREHOUSE_DIRECTORY: ウェアハウス ディレクトリのベースパス(Flink がファイルを作成する Cloud Storage フォルダ)。この値は gs:// で始まります。
    • PROJECT_ID: Flink カタログがリンクされる Lakehouse ランタイム カタログ カタログのプロジェクト ID。
    • LOCATION: BigQuery リソースの ロケーション

これで、Flink セッションが Lakehouse ランタイム カタログに接続され、Flink SQL コマンドを実行できるようになりました。

Lakehouse ランタイム カタログに接続したので、Lakehouse ランタイム カタログに保存されているメタデータに基づいてリソースを作成して表示できます。

たとえば、インタラクティブ Flink SQL セッションで次のコマンドを実行して、Apache Iceberg のデータベースとテーブルを作成してみます。

  1. カスタムの Apache Iceberg カタログを使用する:

    USE CATALOG CATALOG_NAME;

    CATALOG_NAME は、Flink カタログ ID に置き換えます。

  2. データベースを作成する。これにより、BigQuery にデータセットが作成されます。

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    DATABASE_NAME は、新しいデータベースの名前に置き換えます。

  3. 作成したデータベースを使用する。

    USE DATABASE_NAME;
  4. Apache Iceberg テーブルを作成します。次の例では、販売テーブルを作成します。

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
      order_number BIGINT,
      price        DECIMAL(32,2),
      buyer        ROW<first_name STRING, last_name STRING>,
      order_time   TIMESTAMP(3)
    );

    ICEBERG_TABLE_NAME は、新しいテーブルの名前に置き換えます。

  5. テーブルのメタデータを表示する:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. データベース内のテーブルを一覧表示する。

    SHOW TABLES;

テーブルにデータを取り込む

前のセクションで Apache Iceberg テーブルを作成したら、Flink DataGen をデータソースとして使用して、リアルタイム データをテーブルに取り込むことができます。このワークフローの例を次に示します。

  1. DataGen を使用して一時テーブルを作成します。

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.order_number.kind' = 'sequence',
      'fields.order_number.start' = '1',
      'fields.order_number.end' = '1000000',
      'fields.price.min' = '0',
      'fields.price.max' = '10000',
      'fields.buyer.first_name.length' = '10',
      'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    次のように置き換えます。

    • DATABASE_NAME: 一時テーブルを保存するデータベースの名前。
    • TEMP_TABLE_NAME: 一時テーブルの名前。
    • ICEBERG_TABLE_NAME: 前のセクションで作成した Apache Iceberg テーブルの名前。
  2. 並列処理を 1 に設定します。

    SET 'parallelism.default' = '1';
  3. チェックポイント間隔を設定します。

    SET 'execution.checkpointing.interval' = '10second';
  4. チェックポイントを設定します。

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. リアルタイム ストリーミング ジョブを開始します。

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    出力は次のようになります。

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. ストリーミング ジョブのステータスを確認する手順は次のとおりです。

    1. Google Cloud コンソールで、[クラスタ] ページに移動します。

      [クラスタ] に移動

    2. クラスタを選択します。

    3. [ウェブ インターフェース] タブをクリックします。

    4. [YARN ResourceManager] リンクをクリックします。

    5. [YARN ResourceManager] インターフェースで、Flink セッションを見つけて、[Tracking UI] の [ApplicationMaster] リンクをクリックします。

    6. [ステータス] 列で、ジョブのステータスが [実行中] であることを確認します。

  7. Flink SQL クライアントでストリーミング データにクエリを実行します。

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. BigQuery でストリーミング データにクエリを実行します。

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. Flink SQL クライアントでストリーミング ジョブを終了します。

    STOP JOB 'JOB_ID';

    JOB_ID は、ストリーミング ジョブの作成時に出力に表示されたジョブ ID に置き換えます。

Managed Service for Apache Spark でメタストアを構成する

Spark SQL または PySpark を使用して、Managed Service for Apache Spark で Lakehouse ランタイム カタログを構成できます。

Spark SQL

  1. Lakehouse ランタイム カタログで実行する Spark SQL コマンドを含む SQL ファイルを作成します。たとえば、次のコマンドは名前空間とテーブルを作成します。

    SET `spark.sql.catalog.CATALOG_NAME`=`org.apache.iceberg.spark.SparkCatalog`;
    SET `spark.sql.catalog.CATALOG_NAME.type`=`bigquery`;
    SET `spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id`=`PROJECT_ID`;
    SET `spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location`=`LOCATION`;
    SET `spark.sql.catalog.CATALOG_NAME.warehouse`=`WAREHOUSE_DIRECTORY`;
    
    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    次のように置き換えます。

    • CATALOG_NAME: Spark テーブルを参照するカタログ名。
    • NAMESPACE_NAME: Spark テーブルを参照する名前空間名。
    • TABLE_NAME: Spark テーブルのテーブル名。
    • WAREHOUSE_DIRECTORY: データ ウェアハウスが保存されている Cloud Storage フォルダの URI。
  2. 次の コマンド gcloud dataproc batches submit spark-sqlを実行して、Spark SQL バッチジョブを送信します

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar

    次のように置き換えます。

    • SQL_SCRIPT_PATH: バッチジョブが使用する SQL ファイルのパス。
    • PROJECT_ID: バッチジョブを実行する Google Cloud プロジェクトの ID。
    • REGION: ワークロードが実行されるリージョン。
    • SUBNET_NAME(省略可): セッション サブネットの要件を満たす REGION 内の VPC サブネットの名前
    • BUCKET_PATH: ワークロードの依存関係をアップロードする Cloud Storage バケットのロケーション。 WAREHOUSE_DIRECTORY はこのバケットにあります。バケットの gs:// URI 接頭辞は必要ありません。バケットパスまたはバケット名(mybucketname1 など)を指定できます。
    • LOCATION: バッチジョブを実行するロケーション。

    Spark バッチジョブの送信の詳細については、 Spark バッチ ワークロードを実行するをご覧ください。

PySpark

  1. Lakehouse ランタイム カタログで実行する PySpark コマンドを含む Python ファイルを作成します。

    たとえば、次のコマンドは、Lakehouse ランタイム カタログに保存されている Apache Iceberg テーブルを操作する Spark 環境を設定します。このコマンドは、新しい名前空間とその名前空間内の Apache Iceberg テーブルを作成します。

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
    .appName("Lakehouse runtime catalog Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.type", "bigquery") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    次のように置き換えます。

    • PROJECT_ID: バッチジョブを実行する Google Cloud プロジェクトの ID。
    • LOCATION: BigQuery リソースが配置されているロケーション
    • CATALOG_NAME: Spark テーブルを参照するカタログ名。
    • TABLE_NAME: Spark テーブルのテーブル名。
    • WAREHOUSE_DIRECTORY: データ ウェアハウスが保存されている Cloud Storage フォルダの URI。
    • NAMESPACE_NAME: Spark テーブルを参照する名前空間名。
  2. 次の gcloud dataproc batches submit pyspark コマンドを使用して、バッチジョブを送信します。

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
        --version=2.2 \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar

    次のように置き換えます。

    • PYTHON_SCRIPT_PATH: バッチジョブが使用する Python スクリプトのパス。
    • PROJECT_ID: バッチジョブを実行する Google Cloud プロジェクトの ID。
    • REGION: ワークロードが実行されるリージョン。
    • BUCKET_PATH: ワークロードの依存関係をアップロードする Cloud Storage バケットのロケーション。バケットの gs:// URI 接頭辞は必要ありません。バケットパスまたはバケット名(mybucketname1 など)を指定できます。

    PySpark バッチジョブの送信の詳細については、PySpark gcloud リファレンスをご覧ください。

次のステップ