Composant Hudi facultatif de Managed Service pour Apache Spark

Vous pouvez installer des composants supplémentaires tels que Hudi lorsque vous créez un cluster Managed Service pour Apache Spark à l'aide de la fonctionnalité Composants facultatifs. Cette page explique comment installer le composant Hudi sur un cluster Managed Service pour Apache Spark.

Lorsqu'il est installé sur un cluster Managed Service pour Apache Spark, le composant Apache Hudi installe les bibliothèques Hudi et configure Spark et Hive dans le cluster pour qu'ils fonctionnent avec Hudi.

Versions d'image Managed Service pour Apache Spark compatibles

Vous pouvez installer le composant Hudi sur les clusters Managed Service pour Apache Spark créés avec les versions d'image Managed Service pour Apache Spark suivantes :

Lorsque vous créez un cluster Managed Service pour Apache Spark avec Hudi, les propriétés Spark et Hive suivantes sont configurées pour fonctionner avec Hudi.

Fichier de configuration Propriété Valeur par défaut
/etc/spark/conf/spark-defaults.conf spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
spark.executor.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-version.jar

Installer le composant

Installez le composant Hudi lorsque vous créez un cluster Managed Service pour Apache Spark.

Les pages des versions de Managed Service pour Apache Spark listent la version du composant Hudi incluse dans chaque version de Managed Service pour Apache Spark.

Console

  1. Activez le composant.
    • Dans la console Google Cloud , ouvrez la page Managed Service pour Apache Spark Créer un cluster. Le panneau Configurer un cluster est sélectionné.
    • Dans la section Composants :
      • Sous Composants facultatifs, sélectionnez le composant Hudi.

Commande gcloud

Pour créer un cluster Managed Service pour Apache Spark incluant le composant Hudi, utilisez la commande avec l'option --optional-components.

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=HUDI \
    --image-version=DATAPROC_VERSION \
    --properties=PROPERTIES

Remplacez les éléments suivants :

  • CLUSTER_NAME : valeur obligatoire. Nom du nouveau cluster.
  • REGION : valeur obligatoire. La région du cluster.
  • DATAPROC_IMAGE : facultatif. Vous pouvez utiliser cet indicateur facultatif pour spécifier une version d'image Managed Service pour Apache Spark non définie par défaut (voir Version d'image Managed Service pour Apache Spark par défaut).
  • PROPERTIES : facultatif. Vous pouvez utiliser cette option facultative pour définir les propriétés du composant Hudi, qui sont spécifiées avec le préfixe de fichier hudi:. Exemple : properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE).
    • Propriété de version du composant Hudi : vous pouvez éventuellement spécifier la propriété dataproc:hudi.version. Remarque : La version du composant Hudi est définie par Managed Service pour Apache Spark afin d'être compatible avec la version de l'image du cluster Managed Service pour Apache Spark. Si vous définissez cette propriété, la création du cluster peut échouer si la version spécifiée n'est pas compatible avec l'image du cluster.
    • Propriétés Spark et Hive : Managed Service pour Apache Spark définit les propriétés Spark et Hive liées à Hudi lors de la création du cluster. Vous n'avez pas besoin de les définir lorsque vous créez le cluster ou envoyez des jobs.

API REST

Le composant Hudi peut être installé via l'API Managed Service pour Apache Spark à l'aide de SoftwareConfig.Component dans le cadre d'une requête clusters.create.

Envoyer un job pour lire et écrire des tables Hudi

Après avoir créé un cluster avec le composant Hudi, vous pouvez envoyer des tâches Spark et Hive qui lisent et écrivent des tables Hudi.

Exemple gcloud CLI :

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    --region=region \
    JOB_FILE \
    -- JOB_ARGS

Exemple de tâche PySpark

Le fichier PySpark suivant crée, lit et écrit une table Hudi.

#!/usr/bin/env python
"""Pyspark Hudi test."""

import sys
from pyspark.sql import SparkSession


def create_hudi_table(spark, table_name, table_uri):
  """Creates Hudi table."""
  create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
      uuid string,
      begin_lat double,
      begin_lon double,
      end_lat double,
      end_lon double,
      driver string,
      rider string,
      fare double,
      partitionpath string,
      ts long
    ) USING hudi
    LOCATION '{table_uri}'
    TBLPROPERTIES (
      type = 'cow',
      primaryKey = 'uuid',
      preCombineField = 'ts'
    )
    PARTITIONED BY (partitionpath)
  """
  spark.sql(create_table_sql)


def generate_test_dataframe(spark, n_rows):
  """Generates test dataframe with Hudi's built-in data generator."""
  sc = spark.sparkContext
  utils = sc._jvm.org.apache.hudi.QuickstartUtils
  data_generator = utils.DataGenerator()
  inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
  return spark.read.json(sc.parallelize(inserts, 2))


def write_hudi_table(table_name, table_uri, df):
  """Writes Hudi table."""
  hudi_options = {
      'hoodie.table.name': table_name,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': table_name,
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2,
      'hoodie.insert.shuffle.parallelism': 2,
  }
  df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)


def query_commit_history(spark, table_name, table_uri):
  tmp_table = f'{table_name}_commit_history'
  spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT DISTINCT(_hoodie_commit_time)
    FROM {tmp_table}
    ORDER BY _hoodie_commit_time
    DESC
  """
  return spark.sql(query)


def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
  """Reads Hudi table at the given commit timestamp."""
  if commit_ts:
    options = {'as.of.instant': commit_ts}
  else:
    options = {}
  tmp_table = f'{table_name}_snapshot'
  spark.read.format('hudi').options(**options).load(
      table_uri
  ).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT _hoodie_commit_time, begin_lat, begin_lon,
        driver, end_lat, end_lon, fare, partitionpath,
        rider, ts, uuid
    FROM {tmp_table}
  """
  return spark.sql(query)


def main():
  """Test create write and read Hudi table."""
  if len(sys.argv) != 3:
    raise Exception('Expected arguments: <table_name> <table_uri>')

  table_name = sys.argv[1]
  table_uri = sys.argv[2]

  app_name = f'pyspark-hudi-test_{table_name}'
  print(f'Creating Spark session {app_name} ...')
  spark = SparkSession.builder.appName(app_name).getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  print(f'Creating Hudi table {table_name} at {table_uri} ...')
  create_hudi_table(spark, table_name, table_uri)

  print('Generating test data batch 1...')
  n_rows1 = 10
  input_df1 = generate_test_dataframe(spark, n_rows1)
  input_df1.show(truncate=False)

  print('Writing Hudi table, batch 1 ...')
  write_hudi_table(table_name, table_uri, input_df1)

  print('Generating test data batch 2...')
  n_rows2 = 10
  input_df2 = generate_test_dataframe(spark, n_rows2)
  input_df2.show(truncate=False)

  print('Writing Hudi table, batch 2 ...')
  write_hudi_table(table_name, table_uri, input_df2)

  print('Querying commit history ...')
  commits_df = query_commit_history(spark, table_name, table_uri)
  commits_df.show(truncate=False)
  previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time

  print('Reading the Hudi table snapshot at the latest commit ...')
  output_df1 = read_hudi_table(spark, table_name, table_uri)
  output_df1.show(truncate=False)

  print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
  output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
  output_df2.show(truncate=False)

  print('Stopping Spark session ...')
  spark.stop()

  print('All done')


main()

La commande gcloud CLI suivante envoie l'exemple de fichier PySpark à Managed Service pour Apache Spark.

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    gs://BUCKET_NAME/pyspark_hudi_example.py \
    -- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME

Utiliser la CLI Hudi

L'interface de ligne de commande Hudi se trouve à l'adresse /usr/lib/hudi/cli/hudi-cli.sh sur le nœud maître du cluster Managed Service pour Apache Spark. Vous pouvez utiliser la CLI Hudi pour afficher les schémas, les commits et les statistiques des tables Hudi, et pour effectuer manuellement des opérations administratives, comme planifier des compactages (voir Utiliser hudi-cli).

Pour démarrer la CLI Hudi et vous connecter à une table Hudi :

  1. Connectez-vous en SSH au nœud maître.
  2. Exécutez /usr/lib/hudi/cli/hudi-cli.sh. L'invite de commande devient hudi->.
  3. Exécutez connect --path gs://my-bucket/my-hudi-table.
  4. Exécutez des commandes telles que desc, qui décrit le schéma de la table, ou commits show, qui affiche l'historique des commits.
  5. Pour arrêter la session CLI, exécutez exit.

Étapes suivantes