Componente Hudi facoltativo di Managed Service per Apache Spark

Puoi installare componenti aggiuntivi come Hudi quando crei un cluster Managed Service per Apache Spark utilizzando la funzionalità Componenti facoltativi. Questa pagina descrive come installare facoltativamente il componente Hudi su un cluster Managed Service per Apache Spark.

Quando viene installato su un cluster Managed Service per Apache Spark, il componente Apache Hudi installa le librerie Hudi e configura Spark e Hive nel cluster in modo che funzionino con Hudi.

Versioni delle immagini di Managed Service per Apache Spark compatibili

Puoi installare il componente Hudi sui cluster Managed Service per Apache Spark creati con le seguenti versioni delle immagini di Managed Service per Apache Spark:

Quando crei un cluster Managed Service per Apache Spark con Hudi, le seguenti proprietà di Spark e Hive vengono configurate per funzionare con Hudi.

File di configurazione Proprietà Valore predefinito
/etc/spark/conf/spark-defaults.conf spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
spark.executor.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-version.jar

Installare il componente

Installa il componente Hudi quando crei un cluster Managed Service per Apache Spark.

Le pagine delle versioni di rilascio delle immagini di Managed Service per Apache Spark elencano la versione del componente Hudi inclusa in ogni release di immagini di Managed Service per Apache Spark.

Console

  1. Attiva il componente.
    • Nella Google Cloud console, apri la pagina Crea un cluster di Managed Service per Apache Spark. Viene selezionato il riquadro Configura cluster.
    • Nella sezione Componenti :
      • In Componenti facoltativi, seleziona il il componente Hudi.

Comando g-cloud

Per creare un cluster Managed Service per Apache Spark che includa il componente Hudi, utilizza il comando con il flag --optional-components.

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=HUDI \
    --image-version=DATAPROC_VERSION \
    --properties=PROPERTIES

Sostituisci quanto segue:

  • CLUSTER_NAME: obbligatorio. Il nome del nuovo cluster.
  • REGION: obbligatorio. La regione del cluster.
  • DATAPROC_IMAGE: facoltativo. Puoi utilizzare questo flag facoltativo per specificare una versione dell'immagine di Managed Service per Apache Spark non predefinita (vedi Versione predefinita dell'immagine di Managed Service per Apache Spark).
  • PROPERTIES: facoltativo. Puoi utilizzare questo flag facoltativo per impostare le proprietà del componente Hudi, che vengono specificate con il prefisso del file hudi: Esempio: properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE).
    • Proprietà della versione del componente Hudi: puoi specificare facoltativamente la dataproc:hudi.version proprietà. Nota: la versione del componente Hudi è impostata da Managed Service per Apache Spark in modo che sia compatibile con la versione dell'immagine del cluster Managed Service per Apache Spark. Se imposti questa proprietà, la creazione del cluster può non riuscire se la versione specificata non è compatibile con l'immagine del cluster.
    • Proprietà di Spark e Hive: Managed Service per Apache Spark imposta le proprietà di Spark e Hive correlate a Hudi quando viene creato il cluster. Non devi impostarle quando crei il cluster o invii i job.

API REST

Il componente Hudi può essere installato tramite l'API Managed Service per Apache Spark utilizzando SoftwareConfig.Component come parte di una clusters.create richiesta.

Inviare un job per leggere e scrivere tabelle Hudi

Dopo aver creato un cluster con il componente Hudi, puoi inviare job Spark e Hive che leggono e scrivono tabelle Hudi.

Esempio di gcloud CLI:

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    --region=region \
    JOB_FILE \
    -- JOB_ARGS

Job PySpark di esempio

Il seguente file PySpark crea, legge e scrive una tabella Hudi.

#!/usr/bin/env python
"""Pyspark Hudi test."""

import sys
from pyspark.sql import SparkSession


def create_hudi_table(spark, table_name, table_uri):
  """Creates Hudi table."""
  create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
      uuid string,
      begin_lat double,
      begin_lon double,
      end_lat double,
      end_lon double,
      driver string,
      rider string,
      fare double,
      partitionpath string,
      ts long
    ) USING hudi
    LOCATION '{table_uri}'
    TBLPROPERTIES (
      type = 'cow',
      primaryKey = 'uuid',
      preCombineField = 'ts'
    )
    PARTITIONED BY (partitionpath)
  """
  spark.sql(create_table_sql)


def generate_test_dataframe(spark, n_rows):
  """Generates test dataframe with Hudi's built-in data generator."""
  sc = spark.sparkContext
  utils = sc._jvm.org.apache.hudi.QuickstartUtils
  data_generator = utils.DataGenerator()
  inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
  return spark.read.json(sc.parallelize(inserts, 2))


def write_hudi_table(table_name, table_uri, df):
  """Writes Hudi table."""
  hudi_options = {
      'hoodie.table.name': table_name,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': table_name,
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2,
      'hoodie.insert.shuffle.parallelism': 2,
  }
  df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)


def query_commit_history(spark, table_name, table_uri):
  tmp_table = f'{table_name}_commit_history'
  spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT DISTINCT(_hoodie_commit_time)
    FROM {tmp_table}
    ORDER BY _hoodie_commit_time
    DESC
  """
  return spark.sql(query)


def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
  """Reads Hudi table at the given commit timestamp."""
  if commit_ts:
    options = {'as.of.instant': commit_ts}
  else:
    options = {}
  tmp_table = f'{table_name}_snapshot'
  spark.read.format('hudi').options(**options).load(
      table_uri
  ).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT _hoodie_commit_time, begin_lat, begin_lon,
        driver, end_lat, end_lon, fare, partitionpath,
        rider, ts, uuid
    FROM {tmp_table}
  """
  return spark.sql(query)


def main():
  """Test create write and read Hudi table."""
  if len(sys.argv) != 3:
    raise Exception('Expected arguments: <table_name> <table_uri>')

  table_name = sys.argv[1]
  table_uri = sys.argv[2]

  app_name = f'pyspark-hudi-test_{table_name}'
  print(f'Creating Spark session {app_name} ...')
  spark = SparkSession.builder.appName(app_name).getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  print(f'Creating Hudi table {table_name} at {table_uri} ...')
  create_hudi_table(spark, table_name, table_uri)

  print('Generating test data batch 1...')
  n_rows1 = 10
  input_df1 = generate_test_dataframe(spark, n_rows1)
  input_df1.show(truncate=False)

  print('Writing Hudi table, batch 1 ...')
  write_hudi_table(table_name, table_uri, input_df1)

  print('Generating test data batch 2...')
  n_rows2 = 10
  input_df2 = generate_test_dataframe(spark, n_rows2)
  input_df2.show(truncate=False)

  print('Writing Hudi table, batch 2 ...')
  write_hudi_table(table_name, table_uri, input_df2)

  print('Querying commit history ...')
  commits_df = query_commit_history(spark, table_name, table_uri)
  commits_df.show(truncate=False)
  previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time

  print('Reading the Hudi table snapshot at the latest commit ...')
  output_df1 = read_hudi_table(spark, table_name, table_uri)
  output_df1.show(truncate=False)

  print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
  output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
  output_df2.show(truncate=False)

  print('Stopping Spark session ...')
  spark.stop()

  print('All done')


main()

Il seguente comando gcloud CLI invia il file PySpark di esempio a Managed Service per Apache Spark.

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    gs://BUCKET_NAME/pyspark_hudi_example.py \
    -- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME

Utilizzare l'interfaccia a riga di comando Hudi

L'interfaccia a riga di comando Hudi si trova in /usr/lib/hudi/cli/hudi-cli.sh sul nodo master del cluster Managed Service per Apache Spark. Puoi utilizzare l'interfaccia a riga di comando Hudi per visualizzare gli schemi, i commit e le statistiche delle tabelle Hudi e per eseguire manualmente operazioni amministrative, come la pianificazione delle compattazioni (vedi Utilizzo di hudi-cli).

Per avviare l'interfaccia a riga di comando Hudi e connetterti a una tabella Hudi:

  1. Accedi al nodo master tramite SSH.
  2. Esegui /usr/lib/hudi/cli/hudi-cli.sh. Il prompt dei comandi cambia in hudi->.
  3. Esegui connect --path gs://my-bucket/my-hudi-table.
  4. Esegui comandi come desc, che descrive lo schema della tabella, o commits show, che mostra la cronologia dei commit.
  5. Per interrompere la sessione dell'interfaccia a riga di comando, esegui exit.

Passaggi successivi