Managed Service for Apache Spark 選用 Hudi 元件

使用選用元件功能建立 Managed Service for Apache Spark 叢集時,可以安裝 Hudi 等其他元件。本頁說明如何在 Managed Service for Apache Spark 叢集上選擇性地安裝 Hudi 元件。

Apache Hudi 元件安裝至 Managed Service for Apache Spark 叢集時,會一併安裝 Hudi 程式庫,同時設定叢集中的 Spark 和 Hive,以便與 Hudi 搭配運作。

相容的 Managed Service for Apache Spark 映像檔版本

Hudi 元件可安裝於使用下列 Managed Service for Apache Spark 映像檔版本建立的 Managed Service for Apache Spark 叢集:

建立搭載 Hudi 的 Managed Service for Apache Spark 叢集時,系統會設定下列 Spark 和 Hive 屬性,以便與 Hudi 搭配運作。

設定檔 屬性 預設值
/etc/spark/conf/spark-defaults.conf spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
spark.executor.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-version.jar

安裝元件

請在建立 Managed Service for Apache Spark 叢集時安裝 Hudi 元件。

Managed Service for Apache Spark 映像檔發布版本頁面會列出每個 Managed Service for Apache Spark 映像檔版本內含的 Hudi 元件版本。

控制台

  1. 啟用元件。
    • 在 Google Cloud 控制台中,開啟 Managed Service for Apache Spark 的「建立叢集」頁面。系統會選取「Set up cluster」(設定叢集) 面板。
    • 在「Components」(元件) 部分執行下列操作:
      • 在「Optional components」(選用元件) 下方選取「Hudi」元件。

gcloud 指令

如要建立包含 Hudi 元件的 Managed Service for Apache Spark 叢集,請在指令中使用 --optional-components 旗標。

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=HUDI \
    --image-version=DATAPROC_VERSION \
    --properties=PROPERTIES

更改下列內容:

  • CLUSTER_NAME:這是必要項目,用來設定新的叢集名稱。
  • REGION:這是必要項目,用來設定叢集區域
  • DATAPROC_IMAGE:選用。可選用此旗標來指定非預設的 Managed Service for Apache Spark 映像檔版本 (請參閱預設 Managed Service for Apache Spark 映像檔版本)。
  • PROPERTIES:選用。可選用此旗標設定 Hudi 元件屬性,而這些屬性需以 hudi: 檔案前置字元指定,例如:properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE)。
    • Hudi 元件版本屬性:使用者可視需要指定 dataproc:hudi.version 屬性注意:Managed Service for Apache Spark 會設定 Hudi 元件版本,確保與 Managed Service for Apache Spark 叢集映像檔版本相容。如果設定這項屬性,但指定版本與叢集映像檔不相容,則可能導致叢集建立作業失敗。
    • Spark 和 Hive 屬性:Managed Service for Apache Spark 會在建立叢集時設定與 Hudi 相關的 Spark 和 Hive 屬性。使用者建立叢集或提交工作時不需自行設定這些屬性。

REST API

Hudi 元件可以透過 Managed Service for Apache Spark API 安裝,方法是在 clusters.create 要求中使用 SoftwareConfig.Component

提交讀取及寫入 Hudi 資料表的工作

使用 Hudi 元件建立叢集後,即可提交讀取及寫入 Hudi 資料表的 Spark 和 Hive 工作。

gcloud CLI 範例:

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    --region=region \
    JOB_FILE \
    -- JOB_ARGS

PySpark 工作範例

下列 PySpark 檔案會建立、讀取並寫入 Hudi 資料表。

#!/usr/bin/env python
"""Pyspark Hudi test."""

import sys
from pyspark.sql import SparkSession


def create_hudi_table(spark, table_name, table_uri):
  """Creates Hudi table."""
  create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
      uuid string,
      begin_lat double,
      begin_lon double,
      end_lat double,
      end_lon double,
      driver string,
      rider string,
      fare double,
      partitionpath string,
      ts long
    ) USING hudi
    LOCATION '{table_uri}'
    TBLPROPERTIES (
      type = 'cow',
      primaryKey = 'uuid',
      preCombineField = 'ts'
    )
    PARTITIONED BY (partitionpath)
  """
  spark.sql(create_table_sql)


def generate_test_dataframe(spark, n_rows):
  """Generates test dataframe with Hudi's built-in data generator."""
  sc = spark.sparkContext
  utils = sc._jvm.org.apache.hudi.QuickstartUtils
  data_generator = utils.DataGenerator()
  inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
  return spark.read.json(sc.parallelize(inserts, 2))


def write_hudi_table(table_name, table_uri, df):
  """Writes Hudi table."""
  hudi_options = {
      'hoodie.table.name': table_name,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': table_name,
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2,
      'hoodie.insert.shuffle.parallelism': 2,
  }
  df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)


def query_commit_history(spark, table_name, table_uri):
  tmp_table = f'{table_name}_commit_history'
  spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT DISTINCT(_hoodie_commit_time)
    FROM {tmp_table}
    ORDER BY _hoodie_commit_time
    DESC
  """
  return spark.sql(query)


def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
  """Reads Hudi table at the given commit timestamp."""
  if commit_ts:
    options = {'as.of.instant': commit_ts}
  else:
    options = {}
  tmp_table = f'{table_name}_snapshot'
  spark.read.format('hudi').options(**options).load(
      table_uri
  ).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT _hoodie_commit_time, begin_lat, begin_lon,
        driver, end_lat, end_lon, fare, partitionpath,
        rider, ts, uuid
    FROM {tmp_table}
  """
  return spark.sql(query)


def main():
  """Test create write and read Hudi table."""
  if len(sys.argv) != 3:
    raise Exception('Expected arguments: <table_name> <table_uri>')

  table_name = sys.argv[1]
  table_uri = sys.argv[2]

  app_name = f'pyspark-hudi-test_{table_name}'
  print(f'Creating Spark session {app_name} ...')
  spark = SparkSession.builder.appName(app_name).getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  print(f'Creating Hudi table {table_name} at {table_uri} ...')
  create_hudi_table(spark, table_name, table_uri)

  print('Generating test data batch 1...')
  n_rows1 = 10
  input_df1 = generate_test_dataframe(spark, n_rows1)
  input_df1.show(truncate=False)

  print('Writing Hudi table, batch 1 ...')
  write_hudi_table(table_name, table_uri, input_df1)

  print('Generating test data batch 2...')
  n_rows2 = 10
  input_df2 = generate_test_dataframe(spark, n_rows2)
  input_df2.show(truncate=False)

  print('Writing Hudi table, batch 2 ...')
  write_hudi_table(table_name, table_uri, input_df2)

  print('Querying commit history ...')
  commits_df = query_commit_history(spark, table_name, table_uri)
  commits_df.show(truncate=False)
  previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time

  print('Reading the Hudi table snapshot at the latest commit ...')
  output_df1 = read_hudi_table(spark, table_name, table_uri)
  output_df1.show(truncate=False)

  print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
  output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
  output_df2.show(truncate=False)

  print('Stopping Spark session ...')
  spark.stop()

  print('All done')


main()

下列 gcloud CLI 指令會將範例 PySpark 檔案提交至 Managed Service for Apache Spark。

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    gs://BUCKET_NAME/pyspark_hudi_example.py \
    -- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME

使用 Hudi CLI

Hudi CLI 位於 Managed Service for Apache Spark 叢集主要執行個體節點的 /usr/lib/hudi/cli/hudi-cli.sh。您可以使用 Hudi CLI 查看 Hudi 資料表結構定義、提交和統計資料,以及手動執行管理作業,例如指派壓縮工作 (請參閱「使用 hudi-cli」)。

啟動 Hudi CLI 並連線至 Hudi 資料表的步驟如下:

  1. 透過 SSH 連線至主要節點
  2. 執行 /usr/lib/hudi/cli/hudi-cli.sh。命令提示字元會變更為 hudi->
  3. 執行 connect --path gs://my-bucket/my-hudi-table
  4. 執行指令,例如用於說明資料表結構定義的 desc,或用於顯示修訂歷史的 commits show
  5. 如要停止 CLI 工作階段,請執行 exit

後續步驟