Optionale Hudi-Komponente für Managed Service for Apache Spark

Sie können zusätzliche Komponenten wie Hudi installieren, wenn Sie einen Managed Service for Apache Spark Cluster mit dem Feature Optionale Komponenten erstellen. Auf dieser Seite wird beschrieben, wie Sie die Hudi-Komponente optional in einem Managed Service for Apache Spark-Cluster installieren können.

Wenn die Apache Hudi Komponente in einem Managed Service for Apache Spark-Cluster installiert ist, werden Hudi-Bibliotheken installiert und Spark und Hive im Cluster so konfiguriert, dass sie mit Hudi funktionieren.

Kompatible Managed Service for Apache Spark-Imageversionen

Sie können die Hudi-Komponente in Managed Service for Apache Spark-Clustern installieren, die mit den folgenden Managed Service for Apache Spark-Imageversionen erstellt wurden:

Wenn Sie einen Managed Service for Apache Spark-Cluster mit Hudi erstellen, werden die folgenden Spark- und Hive-Attribute für die Verwendung mit Hudi konfiguriert.

Konfigurationsdatei Attribut Standardwert
/etc/spark/conf/spark-defaults.conf spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
spark.executor.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-version.jar

Komponente installieren

Installieren Sie die Hudi-Komponente, wenn Sie einen Managed Service for Apache Spark-Cluster erstellen.

Auf den Seiten zu den Managed Service for Apache Spark-Image-Releaseversionen ist die Hudi-Komponentenversion aufgeführt, die in jedem Managed Service for Apache Spark-Image-Release enthalten ist.

Console

  1. Aktivieren Sie die Komponente.
    • Öffnen Sie in der Google Cloud console die Seite „Managed Service for Apache Spark Cluster erstellen “. Der Bereich Cluster einrichten ist ausgewählt.
    • Im Bereich Komponenten :
      • Wählen Sie unter Optionale Komponenten die Komponente Hudi aus.

gcloud-Befehl

Verwenden Sie den Befehl mit dem Flag --optional-components, um einen Managed Service for Apache Spark-Cluster zu erstellen, der die Hudi-Komponente enthält.

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=HUDI \
    --image-version=DATAPROC_VERSION \
    --properties=PROPERTIES

Ersetzen Sie Folgendes:

  • CLUSTER_NAME: erforderlich. Der Name des neuen Clusters.
  • REGION: erforderlich. Die Clusterregion.
  • DATAPROC_IMAGE: optional. Mit diesem optionalen Flag können Sie eine andere als die Standard-Imageversion von Managed Service for Apache Spark angeben (siehe Standard-Imageversion von Managed Service for Apache Spark).
  • PROPERTIES: optional. Mit diesem optionalen Flag können Sie Eigenschaften der Hudi-Komponente festlegen, die mit dem hudi: Dateipräfix angegeben werden. Beispiel: properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE).
    • Eigenschaft der Hudi-Komponentenversion: Sie können optional die dataproc:hudi.version Eigenschaft angeben. Hinweis: Die Hudi-Komponentenversion wird von Managed Service for Apache Spark so festgelegt, dass sie mit der Imageversion des Managed Service for Apache Spark-Clusters kompatibel ist. Wenn Sie diese Eigenschaft festlegen, kann die Clustererstellung fehlschlagen, wenn die angegebene Version nicht mit dem Clusterimage kompatibel ist.
    • Spark- und Hive-Attribute: Managed Service for Apache Spark legt Hudi-bezogene Spark- und Hive Attribute fest, wenn der Cluster erstellt wird. Sie müssen sie nicht festlegen, wenn Sie den Cluster erstellen oder Jobs senden.

REST API

Die Hudi-Komponente kann über die Managed Service for Apache Spark API mit SoftwareConfig.Component als Teil einer clusters.create Anfrage installiert werden.

Job senden, um Hudi-Tabellen zu lesen und zu schreiben

Nachdem Sie einen Cluster mit der Hudi-Komponente erstellt haben, können Sie Spark- und Hive-Jobs senden, die Hudi-Tabellen lesen und schreiben.

Beispiel für die gcloud CLI:

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    --region=region \
    JOB_FILE \
    -- JOB_ARGS

Beispiel für einen PySpark-Job

Die folgende PySpark-Datei erstellt, liest und schreibt eine Hudi-Tabelle.

#!/usr/bin/env python
"""Pyspark Hudi test."""

import sys
from pyspark.sql import SparkSession


def create_hudi_table(spark, table_name, table_uri):
  """Creates Hudi table."""
  create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
      uuid string,
      begin_lat double,
      begin_lon double,
      end_lat double,
      end_lon double,
      driver string,
      rider string,
      fare double,
      partitionpath string,
      ts long
    ) USING hudi
    LOCATION '{table_uri}'
    TBLPROPERTIES (
      type = 'cow',
      primaryKey = 'uuid',
      preCombineField = 'ts'
    )
    PARTITIONED BY (partitionpath)
  """
  spark.sql(create_table_sql)


def generate_test_dataframe(spark, n_rows):
  """Generates test dataframe with Hudi's built-in data generator."""
  sc = spark.sparkContext
  utils = sc._jvm.org.apache.hudi.QuickstartUtils
  data_generator = utils.DataGenerator()
  inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
  return spark.read.json(sc.parallelize(inserts, 2))


def write_hudi_table(table_name, table_uri, df):
  """Writes Hudi table."""
  hudi_options = {
      'hoodie.table.name': table_name,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': table_name,
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2,
      'hoodie.insert.shuffle.parallelism': 2,
  }
  df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)


def query_commit_history(spark, table_name, table_uri):
  tmp_table = f'{table_name}_commit_history'
  spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT DISTINCT(_hoodie_commit_time)
    FROM {tmp_table}
    ORDER BY _hoodie_commit_time
    DESC
  """
  return spark.sql(query)


def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
  """Reads Hudi table at the given commit timestamp."""
  if commit_ts:
    options = {'as.of.instant': commit_ts}
  else:
    options = {}
  tmp_table = f'{table_name}_snapshot'
  spark.read.format('hudi').options(**options).load(
      table_uri
  ).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT _hoodie_commit_time, begin_lat, begin_lon,
        driver, end_lat, end_lon, fare, partitionpath,
        rider, ts, uuid
    FROM {tmp_table}
  """
  return spark.sql(query)


def main():
  """Test create write and read Hudi table."""
  if len(sys.argv) != 3:
    raise Exception('Expected arguments: <table_name> <table_uri>')

  table_name = sys.argv[1]
  table_uri = sys.argv[2]

  app_name = f'pyspark-hudi-test_{table_name}'
  print(f'Creating Spark session {app_name} ...')
  spark = SparkSession.builder.appName(app_name).getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  print(f'Creating Hudi table {table_name} at {table_uri} ...')
  create_hudi_table(spark, table_name, table_uri)

  print('Generating test data batch 1...')
  n_rows1 = 10
  input_df1 = generate_test_dataframe(spark, n_rows1)
  input_df1.show(truncate=False)

  print('Writing Hudi table, batch 1 ...')
  write_hudi_table(table_name, table_uri, input_df1)

  print('Generating test data batch 2...')
  n_rows2 = 10
  input_df2 = generate_test_dataframe(spark, n_rows2)
  input_df2.show(truncate=False)

  print('Writing Hudi table, batch 2 ...')
  write_hudi_table(table_name, table_uri, input_df2)

  print('Querying commit history ...')
  commits_df = query_commit_history(spark, table_name, table_uri)
  commits_df.show(truncate=False)
  previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time

  print('Reading the Hudi table snapshot at the latest commit ...')
  output_df1 = read_hudi_table(spark, table_name, table_uri)
  output_df1.show(truncate=False)

  print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
  output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
  output_df2.show(truncate=False)

  print('Stopping Spark session ...')
  spark.stop()

  print('All done')


main()

Mit dem folgenden gcloud CLI-Befehl wird die PySpark-Beispieldatei an Managed Service for Apache Spark gesendet.

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    gs://BUCKET_NAME/pyspark_hudi_example.py \
    -- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME

Hudi CLI verwenden

Die Hudi CLI befindet sich auf dem Masterknoten des Managed Service for Apache Spark-Clusters unter /usr/lib/hudi/cli/hudi-cli.sh. Mit der Hudi CLI können Sie Hudi-Tabellenschemas, Commits und Statistiken ansehen und administrative Vorgänge manuell ausführen, z. B. Komprimierungen planen (siehe Hudi CLI verwenden).

So starten Sie die Hudi CLI und stellen eine Verbindung zu einer Hudi-Tabelle her:

  1. Stellen Sie eine SSH-Verbindung zum Masterknoten her.
  2. Führen Sie /usr/lib/hudi/cli/hudi-cli.sh aus. Die Eingabeaufforderung ändert sich in hudi->.
  3. Führen Sie connect --path gs://my-bucket/my-hudi-table aus.
  4. Führen Sie Befehle wie desc aus, der das Tabellenschema beschreibt, oder commits show, der den Commitverlauf anzeigt.
  5. Führen Sie exit aus, um die CLI-Sitzung zu beenden.

Nächste Schritte

  • Kurzanleitung zu Hudi ansehen.