使用 Iceberg 1.10 為 Managed Service for Apache Spark 設定 Lakehouse 執行階段目錄

設定 BigQuery 的自訂 Apache Iceberg 目錄,即可將 Apache Spark 和 Apache Flink 引擎連結至 Lakehouse 執行階段目錄

Lakehouse for Apache Iceberg 建立這項整合功能後,您可以使用 Managed Service for Apache Spark 叢集或 Managed Service for Apache Spark,建立單一共用的中繼資料層,管理開放式資料表格式。

事前準備

  1. 為 Google Cloud 專案啟用計費功能。瞭解如何檢查專案是否已啟用計費功能
  2. 啟用 BigQuery 和 Managed Service for Apache Spark API。

    啟用 API

  3. 瞭解 Lakehouse 執行階段目錄

必要的角色

如要取得設定 Lakehouse 執行階段目錄所需的權限,請要求管理員授予您下列 IAM 角色:

  • 建立 Managed Service for Apache Spark 叢集: 專案中 Compute Engine 預設服務帳戶上的 Dataproc Worker (roles/dataproc.worker)
  • 建立 Lakehouse 執行階段目錄資料表:
    • Dataproc 工作者 (roles/dataproc.worker) 專案中 Managed Service for Apache Spark VM 服務帳戶的
    • 專案中 Managed Service for Apache Spark VM 服務帳戶的 BigQuery 資料編輯者 (roles/bigquery.dataEditor) 角色
    • 專案中 Managed Service for Apache Spark VM 服務帳戶的「Storage Object User」 (roles/storage.objectUser)
  • 查詢 Lakehouse 執行階段目錄資料表:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

您或許也能透過自訂角色或其他預先定義的角色,取得必要權限。

使用 Managed Service for Apache Spark 設定 Metastore

您可以透過 Spark 或 Flink,使用 Managed Service for Apache Spark 設定 Lakehouse 執行階段目錄:

Spark

  1. 設定新叢集。如要建立新的 Managed Service for Apache Spark 叢集,請執行下列 gcloud dataproc clusters create 指令,其中包含使用 Lakehouse 執行階段目錄所需的設定:

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=LOCATION \
        --single-node

    更改下列內容:

    • CLUSTER_NAME:Managed Service for Apache Spark 叢集的名稱。
    • PROJECT_ID:要建立叢集的 Google Cloud 專案 Google Cloud ID。
    • LOCATION:您要建立叢集的 Compute Engine 區域。
  2. 使用下列任一方法提交 Spark 工作:

    Google Cloud CLI

    gcloud dataproc jobs submit spark-sql \
        --project=PROJECT_ID \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar \
        --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,\
        spark.sql.catalog.CATALOG_NAME.type=bigquery,\
        spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id=PROJECT_ID,\
        spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location=LOCATION,\
        spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
        --execute="SPARK_SQL_COMMAND"

    更改下列內容:

    • PROJECT_ID:包含 Managed Service for Apache Spark 叢集的 Google Cloud 專案 ID。
    • CLUSTER_NAME:用於執行 Spark SQL 作業的 Managed Service for Apache Spark 叢集名稱。
    • REGION:叢集所在的 Compute Engine 區域
    • LOCATION:BigQuery 資源的位置。
    • CATALOG_NAME:要用於 SQL 工作的 Spark 目錄名稱。
    • WAREHOUSE_DIRECTORY:包含資料倉儲的 Cloud Storage 資料夾。這個值開頭為 gs://
    • SPARK_SQL_COMMAND:您要執行的 Spark SQL 查詢。這項查詢包含建立資源的指令。舉例來說,如要建立命名空間和資料表,

    spark-sql CLI

    1. 前往 Google Cloud 控制台的「VM Instances」(VM 執行個體) 頁面

      前往 VM 執行個體

    2. 如要連線至 Managed Service for Apache Spark VM 執行個體,請在列出 Managed Service for Apache Spark 叢集主要 VM 執行個體名稱的資料列中,按一下「SSH」SSH,該名稱是叢集名稱,後方加上 -m 後置字串。輸出結果會與下列內容相似:

      Connected, host fingerprint: ssh-rsa ...
      Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
      ...
      example-cluster@cluster-1-m:~$
      
    3. 在終端機中執行下列 Lakehouse 執行階段目錄初始化指令:

      spark-sql \
          --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar \
          --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.type=bigquery \
          --conf spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id=PROJECT_ID \
          --conf spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location=LOCATION \
          --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

      更改下列內容:

      • CATALOG_NAME:您在 SQL 工作中使用的 Spark 目錄名稱。
      • PROJECT_ID:Spark 目錄連結的 Lakehouse 執行階段目錄 Google Cloud 專案 ID。
      • LOCATION:Lakehouse 執行階段目錄的 Google Cloud 位置。
      • WAREHOUSE_DIRECTORY:包含資料倉儲的 Cloud Storage 資料夾。這個值開頭為 gs://

      成功連線至叢集後,Spark 終端機會顯示 spark-sql 提示,您可以使用這個提示提交 Spark 工作。

      spark-sql (default)>
      
  1. 建立 Managed Service for Apache Spark 叢集,並啟用選用的 Flink 元件,且請務必使用 Managed Service for Apache Spark 2.2 以上版本。
  2. 前往 Google Cloud 控制台的「VM instances」(VM 執行個體) 頁面

    前往 VM 執行個體

  3. 在虛擬機器執行個體清單中,按一下「SSH」SSH,連線至主要的 Managed Service for Apache Spark 叢集 VM 執行個體,該執行個體會列為叢集名稱,後方加上 -m 後置字串。

  4. 為 Lakehouse 執行階段目錄設定 Apache Iceberg 自訂目錄外掛程式:

    FLINK_VERSION=1.20
    ICEBERG_VERSION=1.10.0
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/${ICEBERG_VERSION}/iceberg-bigquery-${ICEBERG_VERSION}.jar -P lib
    
    sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/${ICEBERG_VERSION}/iceberg-gcp-bundle-${ICEBERG_VERSION}.jar -P lib
    
    sudo wget -c https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/${ICEBERG_VERSION}/iceberg-gcp-${ICEBERG_VERSION}.jar -P lib
  5. 在 YARN 上啟動 Flink 工作階段:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
    -s yarn-session
  6. 在 Flink 中建立目錄:

    CREATE CATALOG CATALOG_NAME WITH (
    'type'='iceberg',
    'warehouse'='WAREHOUSE_DIRECTORY',
    'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
    'gcp.bigquery.project-id'='PROJECT_ID',
    'gcp.bigquery.location'='LOCATION'
    );

    更改下列內容:

    • CATALOG_NAME:Flink 目錄 ID,連結至 Lakehouse 執行階段目錄。
    • WAREHOUSE_DIRECTORY:倉庫目錄的基本路徑 (Flink 建立檔案的 Cloud Storage 資料夾)。這個值開頭為 gs://
    • PROJECT_ID:Flink 目錄連結的 Lakehouse 執行階段目錄目錄專案 ID。
    • LOCATION:BigQuery 資源的位置

Flink 工作階段現已連線至 Lakehouse 執行階段目錄,您可以執行 Flink SQL 指令。

連線至 Lakehouse 執行階段目錄後,您就能根據儲存在 Lakehouse 執行階段目錄中的中繼資料,建立及查看資源。

舉例來說,您可以在互動式 Flink SQL 工作階段中執行下列指令,建立 Apache Iceberg 資料庫和資料表。

  1. 使用自訂 Apache Iceberg 目錄:

    USE CATALOG CATALOG_NAME;

    CATALOG_NAME 替換為 Flink 目錄 ID。

  2. 建立資料庫,這會在 BigQuery 中建立資料集:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    DATABASE_NAME 替換為新資料庫的名稱。

  3. 使用您建立的資料庫:

    USE DATABASE_NAME;
  4. 建立 Apache Iceberg 資料表。以下範例會建立銷售資料表:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
      order_number BIGINT,
      price        DECIMAL(32,2),
      buyer        ROW<first_name STRING, last_name STRING>,
      order_time   TIMESTAMP(3)
    );

    ICEBERG_TABLE_NAME 替換成新資料表的名稱。

  5. 查看資料表中繼資料:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. 列出資料庫中的資料表:

    SHOW TABLES;

將資料擷取至資料表

在上一節中建立 Apache Iceberg 資料表後,您可以使用 Flink DataGen 做為資料來源,將即時資料擷取到資料表中。以下步驟是這個工作流程的範例:

  1. 使用 DataGen 建立暫時表格:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.order_number.kind' = 'sequence',
      'fields.order_number.start' = '1',
      'fields.order_number.end' = '1000000',
      'fields.price.min' = '0',
      'fields.price.max' = '10000',
      'fields.buyer.first_name.length' = '10',
      'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    更改下列內容:

    • DATABASE_NAME:用於儲存臨時資料表的資料庫名稱。
    • TEMP_TABLE_NAME:臨時資料表的名稱。
    • ICEBERG_TABLE_NAME:您在上一個章節中建立的 Apache Iceberg 資料表名稱。
  2. 將平行處理設為 1:

    SET 'parallelism.default' = '1';
  3. 設定檢查點間隔:

    SET 'execution.checkpointing.interval' = '10second';
  4. 設定查核點:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. 啟動即時串流工作:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    輸出結果會與下列內容相似:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. 如要檢查串流工作的狀態,請執行下列操作:

    1. 前往 Google Cloud 控制台的「Clusters」(叢集) 頁面。

      前往叢集

    2. 選取您的叢集。

    3. 按一下「網頁介面」分頁標籤。

    4. 按一下「YARN ResourceManager」連結。

    5. YARN ResourceManager 介面中,找出 Flink 工作階段,然後按一下「Tracking UI」下方的「ApplicationMaster」連結。

    6. 在「狀態」欄中,確認工作狀態為「執行中」

  7. 在 Flink SQL 用戶端中查詢串流資料:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. 在 BigQuery 中查詢串流資料:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. 在 Flink SQL 用戶端終止串流工作:

    STOP JOB 'JOB_ID';

    JOB_ID 替換為您建立串流工作時,輸出內容中顯示的工作 ID。

使用 Managed Service for Apache Spark 設定 Metastore

您可以使用 Spark SQL 或 PySpark,透過 Managed Service for Apache Spark 設定 Lakehouse 執行階段目錄。

Spark SQL

  1. 建立 SQL 檔案,內含要在 Lakehouse 執行階段目錄中執行的 Spark SQL 指令。舉例來說,以下指令會建立命名空間和資料表:

    SET `spark.sql.catalog.CATALOG_NAME`=`org.apache.iceberg.spark.SparkCatalog`;
    SET `spark.sql.catalog.CATALOG_NAME.type`=`bigquery`;
    SET `spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id`=`PROJECT_ID`;
    SET `spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location`=`LOCATION`;
    SET `spark.sql.catalog.CATALOG_NAME.warehouse`=`WAREHOUSE_DIRECTORY`;
    
    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    更改下列內容:

    • CATALOG_NAME:參照 Spark 資料表的目錄名稱。
    • NAMESPACE_NAME:參照 Spark 資料表的命名空間名稱。
    • TABLE_NAME:Spark 資料表的資料表名稱。
    • WAREHOUSE_DIRECTORY:Cloud Storage 資料夾的 URI,其中儲存資料倉儲。
  2. 執行下列 gcloud dataproc batches submit spark-sql 指令,提交 Spark SQL 批次工作:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar

    更改下列內容:

    • SQL_SCRIPT_PATH:批次工作使用的 SQL 檔案路徑。
    • PROJECT_ID:要執行批次工作的 Google Cloud 專案 ID。
    • REGION:工作負載執行的區域。
    • SUBNET_NAME (選用):REGION 中符合工作階段子網路需求的虛擬私有雲子網路名稱。
    • BUCKET_PATH:用於上傳工作負載依附元件的 Cloud Storage bucket 位置。WAREHOUSE_DIRECTORY位於這個值區中。 不需要提供 bucket 的 gs:// URI 前置字串。您可以指定 bucket 路徑或 bucket 名稱,例如 mybucketname1
    • LOCATION:執行批次工作的位置。

    如要進一步瞭解如何提交 Spark 批次工作,請參閱「執行 Spark 批次工作負載」。

PySpark

  1. 建立 Python 檔案,其中包含要在 Lakehouse 執行階段目錄中執行的 PySpark 指令。

    舉例來說,下列指令會設定 Spark 環境,與儲存在 Lakehouse 執行階段目錄中的 Apache Iceberg 資料表互動。然後,這個指令會在該命名空間中建立新的命名空間和 Apache Iceberg 資料表。

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
    .appName("Lakehouse runtime catalog Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.type", "bigquery") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp.bigquery.project-id", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp.bigquery.location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    更改下列內容:

    • PROJECT_ID:要執行批次工作的 Google Cloud 專案 ID。
    • LOCATION:BigQuery 資源所在的位置
    • CATALOG_NAME:參照 Spark 資料表的目錄名稱。
    • TABLE_NAME:Spark 資料表的資料表名稱。
    • WAREHOUSE_DIRECTORY:Cloud Storage 資料夾的 URI,其中儲存資料倉儲。
    • NAMESPACE_NAME:參照 Spark 資料表的命名空間名稱。
  2. 使用下列 gcloud dataproc batches submit pyspark 指令提交批次工作:

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
        --version=2.2 \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.10.0/iceberg-spark-runtime-3.5_2.12-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-bigquery/1.10.0/iceberg-bigquery-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.10.0/iceberg-gcp-bundle-1.10.0.jar,https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-gcp/1.10.0/iceberg-gcp-1.10.0.jar

    更改下列內容:

    • PYTHON_SCRIPT_PATH:批次工作使用的 Python 指令碼路徑。
    • PROJECT_ID:要執行批次工作的 Google Cloud 專案 ID。
    • REGION:工作負載執行的區域。
    • BUCKET_PATH:用於上傳工作負載依附元件的 Cloud Storage bucket 位置。不需要提供 bucket 的 gs:// URI 前置字串。您可以指定 bucket 路徑或 bucket 名稱,例如 mybucketname1

    如要進一步瞭解如何提交 PySpark 批次工作,請參閱 PySpark gcloud 參考資料

後續步驟