רכיב Hudi אופציונלי של Managed Service for Apache Spark

אפשר להתקין רכיבים נוספים כמו Hudi כשיוצרים אשכול של Managed Service for Apache Spark באמצעות התכונה Optional components. בדף הזה מוסבר איך להתקין את רכיב Hudi באשכול של Managed Service for Apache Spark.

כשמתקינים את רכיב Apache Hudi באשכול Managed Service for Apache Spark, הוא מתקין ספריות של Hudi ומגדיר את Spark ו-Hive באשכול כך שיפעלו עם Hudi.

גרסאות תמונה תואמות של Managed Service for Apache Spark

אפשר להתקין את רכיב Hudi באשכולות של Managed Service for Apache Spark שנוצרו באמצעות גרסאות התמונה הבאות של Managed Service for Apache Spark:

כשיוצרים Managed Service for Apache Spark עם אשכול Hudi, המאפיינים הבאים של Spark ו-Hive מוגדרים לעבודה עם Hudi.

קובץ תצורה מאפיין (property) ערך ברירת המחדל
/etc/spark/conf/spark-defaults.conf spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
spark.executor.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-version.jar

התקנת הרכיב

מתקינים את רכיב Hudi כשיוצרים אשכול Managed Service for Apache Spark.

בדפים של גרסאות הפצה של תמונות Managed Service for Apache Spark מפורטת גרסת רכיב Hudi שכלולה בכל גרסת הפצה של תמונה של Managed Service for Apache Spark.

מסוףGoogle Cloud

  1. במסוף Google Cloud , פותחים את הדף Create cluster.
  2. לוחצים על הגדרה נוספת כדי להרחיב את הקטע.
  3. עורכים את הרכיבים האופציונליים.
  4. בחלונית שנפתחת, מסמנים את התיבה לצד Hudi.
  5. לוחצים על Save.

‫CLI של gcloud

כדי ליצור אשכול של Managed Service for Apache Spark שכולל את רכיב Hudi, משתמשים בפקודה עם הדגל --optional-components.

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=HUDI \
    --image-version=DATAPROC_VERSION \
    --properties=PROPERTIES

מחליפים את מה שכתוב בשדות הבאים:

  • CLUSTER_NAME: שדה חובה. השם החדש של האשכול.
  • REGION: שדה חובה. האזור של האשכול.
  • DATAPROC_IMAGE: אופציונלי. אפשר להשתמש בדגל האופציונלי הזה כדי לציין גרסה של תמונה של Managed Service for Apache Spark שאינה ברירת המחדל (ראו גרסת תמונה של Managed Service for Apache Spark שמוגדרת כברירת מחדל).
  • PROPERTIES: אופציונלי. אפשר להשתמש בדגל האופציונלי הזה כדי להגדיר מאפיינים של רכיב Hudi, שמצוינים באמצעות hudi: קידומת הקובץ (לדוגמה: properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE).
    • מאפיין גרסת רכיב Hudi: אפשר לציין את המאפיין dataproc:hudi.version. הערה: הגרסה של רכיב Hudi מוגדרת על ידי Managed Service for Apache Spark כך שתהיה תואמת לגרסת תמונת האשכול של Managed Service for Apache Spark. אם מגדירים את המאפיין הזה, יכול להיות שיצירת האשכול תיכשל אם הגרסה שצוינה לא תואמת לתמונת האשכול.
    • מאפייני Spark ו-Hive: ‏Managed Service for Apache Spark מגדיר מאפייני Spark ו-Hive שקשורים ל-Hudi כשיוצרים את האשכול. לא צריך להגדיר אותם כשיוצרים את האשכול או כששולחים משימות.

‫API בארכיטקטורת REST

אפשר להתקין את רכיב Hudi דרך Dataproc API באמצעות SoftwareConfig.Component כחלק מבקשת clusters.create.

שליחת משימה לקריאה ולכתיבה של טבלאות Hudi

אחרי שיוצרים אשכול עם רכיב Hudi, אפשר לשלוח משימות Spark ו-Hive שקוראות וכותבות טבלאות Hudi.

gcloud CLI דוגמה:

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    --region=region \
    JOB_FILE \
    -- JOB_ARGS

דוגמה למשימת PySpark

קובץ PySpark הבא יוצר, קורא וכותב טבלת Hudi.

#!/usr/bin/env python
"""Pyspark Hudi test."""

import sys
from pyspark.sql import SparkSession


def create_hudi_table(spark, table_name, table_uri):
  """Creates Hudi table."""
  create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
      uuid string,
      begin_lat double,
      begin_lon double,
      end_lat double,
      end_lon double,
      driver string,
      rider string,
      fare double,
      partitionpath string,
      ts long
    ) USING hudi
    LOCATION '{table_uri}'
    TBLPROPERTIES (
      type = 'cow',
      primaryKey = 'uuid',
      preCombineField = 'ts'
    )
    PARTITIONED BY (partitionpath)
  """
  spark.sql(create_table_sql)


def generate_test_dataframe(spark, n_rows):
  """Generates test dataframe with Hudi's built-in data generator."""
  sc = spark.sparkContext
  utils = sc._jvm.org.apache.hudi.QuickstartUtils
  data_generator = utils.DataGenerator()
  inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
  return spark.read.json(sc.parallelize(inserts, 2))


def write_hudi_table(table_name, table_uri, df):
  """Writes Hudi table."""
  hudi_options = {
      'hoodie.table.name': table_name,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': table_name,
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2,
      'hoodie.insert.shuffle.parallelism': 2,
  }
  df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)


def query_commit_history(spark, table_name, table_uri):
  tmp_table = f'{table_name}_commit_history'
  spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT DISTINCT(_hoodie_commit_time)
    FROM {tmp_table}
    ORDER BY _hoodie_commit_time
    DESC
  """
  return spark.sql(query)


def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
  """Reads Hudi table at the given commit timestamp."""
  if commit_ts:
    options = {'as.of.instant': commit_ts}
  else:
    options = {}
  tmp_table = f'{table_name}_snapshot'
  spark.read.format('hudi').options(**options).load(
      table_uri
  ).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT _hoodie_commit_time, begin_lat, begin_lon,
        driver, end_lat, end_lon, fare, partitionpath,
        rider, ts, uuid
    FROM {tmp_table}
  """
  return spark.sql(query)


def main():
  """Test create write and read Hudi table."""
  if len(sys.argv) != 3:
    raise Exception('Expected arguments: <table_name> <table_uri>')

  table_name = sys.argv[1]
  table_uri = sys.argv[2]

  app_name = f'pyspark-hudi-test_{table_name}'
  print(f'Creating Spark session {app_name} ...')
  spark = SparkSession.builder.appName(app_name).getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  print(f'Creating Hudi table {table_name} at {table_uri} ...')
  create_hudi_table(spark, table_name, table_uri)

  print('Generating test data batch 1...')
  n_rows1 = 10
  input_df1 = generate_test_dataframe(spark, n_rows1)
  input_df1.show(truncate=False)

  print('Writing Hudi table, batch 1 ...')
  write_hudi_table(table_name, table_uri, input_df1)

  print('Generating test data batch 2...')
  n_rows2 = 10
  input_df2 = generate_test_dataframe(spark, n_rows2)
  input_df2.show(truncate=False)

  print('Writing Hudi table, batch 2 ...')
  write_hudi_table(table_name, table_uri, input_df2)

  print('Querying commit history ...')
  commits_df = query_commit_history(spark, table_name, table_uri)
  commits_df.show(truncate=False)
  previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time

  print('Reading the Hudi table snapshot at the latest commit ...')
  output_df1 = read_hudi_table(spark, table_name, table_uri)
  output_df1.show(truncate=False)

  print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
  output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
  output_df2.show(truncate=False)

  print('Stopping Spark session ...')
  spark.stop()

  print('All done')


main()

הפקודה הבאה ב-CLI של gcloud שולחת את קובץ PySpark לדוגמה אל Managed Service for Apache Spark.

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    gs://BUCKET_NAME/pyspark_hudi_example.py \
    -- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME

שימוש ב-Hudi CLI

‫Hudi CLI נמצא ב-/usr/lib/hudi/cli/hudi-cli.sh בצומת הראשי של אשכול Managed Service for Apache Spark. אפשר להשתמש ב-CLI של Hudi כדי לראות את הסכימות, הקומיטים והנתונים הסטטיסטיים של טבלאות Hudi, וכדי לבצע באופן ידני פעולות ניהוליות, כמו תזמון של פעולות דחיסה (ראו שימוש ב-hudi-cli).

כדי להפעיל את Hudi CLI ולהתחבר לטבלת Hudi:

  1. מתחברים ב-SSH לצומת הראשי.
  2. מריצים את /usr/lib/hudi/cli/hudi-cli.sh. שורת הפקודה משתנה ל-hudi->.
  3. מריצים את connect --path gs://my-bucket/my-hudi-table.
  4. להריץ פקודות כמו desc, שמתארת את סכימת הטבלה, או commits show, שמציגה את היסטוריית השמירה.
  5. כדי להפסיק את סשן ה-CLI, מריצים את הפקודה exit.

המאמרים הבאים