BigQuery 用のカスタム Apache Iceberg カタログを構成すると、Apache Spark エンジンと Apache Flink エンジンがレイクハウス ランタイム カタログに接続されます。
Lakehouse for Apache Iceberg のこの統合を確立すると、 Managed Service for Apache Spark クラスタまたは Managed Service for Apache Spark を使用してオープン テーブル形式を管理するための単一の共有メタデータ レイヤが作成されます。
始める前に
- Google Cloud プロジェクトに対する課金を有効にします。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
BigQuery API と Managed Service for Apache Spark API を有効にします。
Lakehouse ランタイム カタログについて理解する。
必要なロール
Lakehouse ランタイム カタログの構成に必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。
-
Managed Service for Apache Spark クラスタを作成する。プロジェクトの Compute Engine デフォルトのサービス アカウントに対する Dataproc ワーカー (
roles/dataproc.worker) -
Lakehouse ランタイム カタログ テーブルを作成します。
- プロジェクトの Managed Service for Apache Spark VM サービス アカウントに対する Dataproc ワーカー (
roles/dataproc.worker) - プロジェクトの Managed Service for Apache Spark VM サービス アカウントに対する BigQuery データ編集者 (
roles/bigquery.dataEditor) - プロジェクトの Managed Service for Apache Spark VM サービス アカウントに対するストレージ オブジェクト ユーザー (
roles/storage.objectUser)
- プロジェクトの Managed Service for Apache Spark VM サービス アカウントに対する Dataproc ワーカー (
-
Lakehouse ランタイム カタログ テーブルをクエリする:
- プロジェクトに対する BigQuery データ閲覧者(
roles/bigquery.dataViewer) - プロジェクトに対する BigQuery ユーザー(
roles/bigquery.user) - プロジェクトに対するストレージ オブジェクト閲覧者(
roles/storage.objectViewer)
- プロジェクトに対する BigQuery データ閲覧者(
ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。
必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。
Managed Service for Apache Spark でメタストアを構成する
Spark または Flink のいずれかを使用して、Managed Service for Apache Spark で Lakehouse ランタイム カタログを構成できます。
Spark
新しいクラスタを構成します。新しい Managed Service for Apache Spark クラスタを作成するには、次の
gcloud dataproc clusters createコマンドを実行します。このコマンドには、Lakehouse ランタイム カタログを使用するために必要な設定が含まれています。gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --single-node
次のように置き換えます。
CLUSTER_NAME: Managed Service for Apache Spark クラスタの名前。PROJECT_ID: クラスタを作成する Google Cloud プロジェクトの ID。LOCATION: クラスタを作成する Compute Engine リージョン。
次のいずれかの方法で Spark ジョブを送信します。
Google Cloud CLI
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,\ spark.sql.catalog.CATALOG_NAME.type=bigquery,\ spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id=PROJECT_ID,\ spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location=LOCATION,\ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
次のように置き換えます。
PROJECT_ID: Managed Service for Apache Spark クラスタを含む Google Cloud プロジェクトの ID。CLUSTER_NAME: Spark SQL ジョブの実行に使用している Managed Service for Apache Spark クラスタの名前。REGION: クラスタが配置されている Compute Engine リージョン。LOCATION: BigQuery リソースのロケーション。CATALOG_NAME: SQL ジョブで使用する Spark カタログの名前。WAREHOUSE_DIRECTORY: データ ウェアハウスを含む Cloud Storage フォルダ。この値はgs://で始まります。SPARK_SQL_COMMAND: 実行する Spark SQL クエリ。このクエリには、リソースを作成するコマンドを含めます。たとえば、名前空間とテーブルを作成します。
spark-sql CLI
Google Cloud コンソールで [VM インスタンス] ページに移動します。
Managed Service for Apache Spark VM インスタンスに接続するには、Managed Service for Apache Spark クラスタのメイン VM インスタンス名(クラスタ名に接尾辞
-mを付けたもの)が一覧表示されている行で [SSH] をクリックします。出力は次のようになります。Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$ターミナルで、次の Lakehouse ランタイム カタログの初期化コマンドを実行します。
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.type=bigquery \ --conf spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
次のように置き換えます。
CATALOG_NAME: SQL ジョブで使用している Spark カタログの名前。PROJECT_ID: Spark カタログがリンクされる Lakehouse ランタイム カタログの Google Cloud プロジェクト ID。LOCATION: Lakehouse ランタイム カタログの Google Cloud ロケーション。WAREHOUSE_DIRECTORY: データ ウェアハウスを含む Cloud Storage フォルダ。この値はgs://で始まります。
クラスタに正常に接続すると、Spark ターミナルに「
spark-sql」というプロンプトが表示されます。このプロンプトを使用して Spark ジョブを送信できます。spark-sql (default)>
Flink
- オプションの Flink コンポーネントを有効にして Managed Service for Apache Spark クラスタを作成し、Managed Service for Apache Spark
2.2以降を使用していることを確認します。 Google Cloud コンソールで、[VM インスタンス] ページに移動します。
仮想マシン インスタンスのリストで、[SSH] をクリックして、メインの Managed Service for Apache Spark クラスタ VM インスタンスに接続します。このインスタンスは、クラスタ名の後に
-m接尾辞が付いた名前でリストされます。Lakehouse ランタイム カタログ用に Apache Iceberg カスタム カタログ プラグインを構成します。
FLINK_VERSION=1.20 ICEBERG_VERSION=1.10.0 cd /usr/lib/flink sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/${ICEBERG_VERSION}/iceberg-bigquery-${ICEBERG_VERSION}.jar -P lib sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/${ICEBERG_VERSION}/iceberg-gcp-bundle-${ICEBERG_VERSION}.jar -P lib sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/${ICEBERG_VERSION}/iceberg-gcp-${ICEBERG_VERSION}.jar -P lib
YARN で Flink セッションを開始します。
HADOOP_CLASSPATH=`hadoop classpath` sudo bin/yarn-session.sh -nm flink-dataproc -d sudo bin/sql-client.sh embedded \ -s yarn-session
Flink でカタログを作成します。
CREATE CATALOG CATALOG_NAME WITH ( 'type'='iceberg', 'warehouse'='WAREHOUSE_DIRECTORY', 'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', 'gcp.bigquery.project-id'='PROJECT_ID', 'gcp.bigquery.location'='LOCATION' );
次のように置き換えます。
CATALOG_NAME: Lakehouse ランタイム カタログ カタログにリンクされている Flink カタログ識別子。WAREHOUSE_DIRECTORY: ウェアハウス ディレクトリのベースパス(Flink がファイルを作成する Cloud Storage フォルダ)。この値はgs://で始まります。PROJECT_ID: Flink カタログがリンクされる Lakehouse ランタイム カタログのプロジェクト ID。LOCATION: BigQuery リソースのロケーション。
これで、Flink セッションが Lakehouse ランタイム カタログに接続され、Flink SQL コマンドを実行できるようになりました。
Lakehouse ランタイム カタログ リソースを管理する
Lakehouse ランタイム カタログに接続したので、Lakehouse ランタイム カタログに保存されているメタデータに基づいてリソースを作成して表示できます。
たとえば、インタラクティブ Flink SQL セッションで次のコマンドを実行して、Apache Iceberg のデータベースとテーブルを作成してみます。
カスタムの Apache Iceberg カタログを使用する:
USE CATALOG CATALOG_NAME;
CATALOG_NAMEは、Flink カタログ ID に置き換えます。データベースを作成する。これにより、BigQuery にデータセットが作成されます。
CREATE DATABASE IF NOT EXISTS DATABASE_NAME;
DATABASE_NAMEは、新しいデータベースの名前に置き換えます。作成したデータベースを使用する。
USE DATABASE_NAME;
Apache Iceberg テーブルを作成します。次の例では、販売テーブルを作成します。
CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) );
ICEBERG_TABLE_NAMEは、新しいテーブルの名前に置き換えます。テーブルのメタデータを表示する:
DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
データベース内のテーブルを一覧表示する。
SHOW TABLES;
テーブルにデータを取り込む
前のセクションで Apache Iceberg テーブルを作成したら、Flink DataGen をデータソースとして使用して、リアルタイム データをテーブルに取り込むことができます。このワークフローの例を次に示します。
DataGen を使用して一時テーブルを作成します。
CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.order_number.kind' = 'sequence', 'fields.order_number.start' = '1', 'fields.order_number.end' = '1000000', 'fields.price.min' = '0', 'fields.price.max' = '10000', 'fields.buyer.first_name.length' = '10', 'fields.buyer.last_name.length' = '10' ) LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);
次のように置き換えます。
DATABASE_NAME: 一時テーブルを保存するデータベースの名前。TEMP_TABLE_NAME: 一時テーブルの名前。ICEBERG_TABLE_NAME: 前のセクションで作成した Apache Iceberg テーブルの名前。
並列処理を 1 に設定します。
SET 'parallelism.default' = '1';
チェックポイント間隔を設定します。
SET 'execution.checkpointing.interval' = '10second';
チェックポイントを設定します。
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
リアルタイム ストリーミング ジョブを開始します。
INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;
出力は次のようになります。
[INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 0de23327237ad8a811d37748acd9c10b
ストリーミング ジョブのステータスを確認する手順は次のとおりです。
Google Cloud コンソールで、[クラスタ] ページに移動します。
クラスタを選択します。
[ウェブ インターフェース] タブをクリックします。
[YARN ResourceManager] リンクをクリックします。
[YARN ResourceManager] インターフェースで、Flink セッションを見つけて、[Tracking UI] の [ApplicationMaster] リンクをクリックします。
[ステータス] 列で、ジョブのステータスが [実行中] であることを確認します。
Flink SQL クライアントでストリーミング データにクエリを実行します。
SELECT * FROM ICEBERG_TABLE_NAME /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ ORDER BY order_time desc LIMIT 20;
BigQuery でストリーミング データにクエリを実行します。
SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME` ORDER BY order_time desc LIMIT 20;
Flink SQL クライアントでストリーミング ジョブを終了します。
STOP JOB 'JOB_ID';
JOB_IDは、ストリーミング ジョブの作成時に出力に表示されたジョブ ID に置き換えます。
Managed Service for Apache Spark でメタストアを構成する
Spark SQL または PySpark を使用して、Managed Service for Apache Spark でレイクハウス ランタイム カタログを構成できます。
Spark SQL
Lakehouse ランタイム カタログで実行する Spark SQL コマンドを含む SQL ファイルを作成します。たとえば、次のコマンドは名前空間とテーブルを作成します。
SET `spark.sql.catalog.CATALOG_NAME`=`org.apache.iceberg.spark.SparkCatalog`; SET `spark.sql.catalog.CATALOG_NAME.type`=`bigquery`; SET `spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id`=`PROJECT_ID`; SET `spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location`=`LOCATION`; SET `spark.sql.catalog.CATALOG_NAME.warehouse`=`WAREHOUSE_DIRECTORY`; CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME; CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
次のように置き換えます。
CATALOG_NAME: Spark テーブルを参照するカタログ名。NAMESPACE_NAME: Spark テーブルを参照する名前空間名。TABLE_NAME: Spark テーブルのテーブル名。WAREHOUSE_DIRECTORY: データ ウェアハウスが保存されている Cloud Storage フォルダの URI。
次の
gcloud dataproc batches submit spark-sqlコマンドを実行して、Spark SQL バッチジョブを送信します。gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \ --project=PROJECT_ID \ --region=REGION \ --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --deps-bucket=BUCKET_PATH \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar
次のように置き換えます。
SQL_SCRIPT_PATH: バッチジョブが使用する SQL ファイルのパス。PROJECT_ID: バッチジョブを実行する Google Cloud プロジェクトの ID。REGION: ワークロードが実行されるリージョン。SUBNET_NAME(省略可): セッション サブネットの要件を満たすREGIONの VPC サブネットの名前。BUCKET_PATH: ワークロードの依存関係をアップロードする Cloud Storage バケットのロケーション。WAREHOUSE_DIRECTORYはこのバケットにあります。バケットのgs://URI 接頭辞は必要ありません。バケットパスまたはバケット名(mybucketname1など)を指定できます。LOCATION: バッチジョブを実行するロケーション。
Spark バッチジョブの送信の詳細については、Spark バッチ ワークロードを実行するをご覧ください。
PySpark
Lakehouse ランタイム カタログで実行する PySpark コマンドを含む Python ファイルを作成します。
たとえば、次のコマンドは、Lakehouse ランタイム カタログに保存されている Apache Iceberg テーブルを操作する Spark 環境を設定します。このコマンドは、新しい名前空間とその名前空間内の Apache Iceberg テーブルを作成します。
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("Lakehouse runtime catalog Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.type", "bigquery") \ .config("spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")
次のように置き換えます。
PROJECT_ID: バッチジョブを実行する Google Cloud プロジェクトの ID。LOCATION: BigQuery リソースが配置されているロケーション。CATALOG_NAME: Spark テーブルを参照するカタログ名。TABLE_NAME: Spark テーブルのテーブル名。WAREHOUSE_DIRECTORY: データ ウェアハウスが保存されている Cloud Storage フォルダの URI。NAMESPACE_NAME: Spark テーブルを参照する名前空間名。
次の
gcloud dataproc batches submit pysparkコマンドを使用して、バッチジョブを送信します。gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --deps-bucket=BUCKET_PATH \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar
次のように置き換えます。
PYTHON_SCRIPT_PATH: バッチジョブが使用する Python スクリプトのパス。PROJECT_ID: バッチジョブを実行する Google Cloud プロジェクトの ID。REGION: ワークロードが実行されるリージョン。BUCKET_PATH: ワークロードの依存関係をアップロードする Cloud Storage バケットのロケーション。バケットのgs://URI 接頭辞は必要ありません。バケットパスまたはバケット名(mybucketname1など)を指定できます。
PySpark バッチジョブの送信の詳細については、PySpark gcloud リファレンスをご覧ください。