Utiliser Apache Spark avec HBase sur Dataproc

Objectifs

Ce tutoriel vous explique comment :

  1. Créer un cluster Dataproc, en installant Apache HBase et Apache ZooKeeper sur le cluster
  2. Créez une table HBase à l'aide du shell HBase exécuté sur le nœud maître du cluster Dataproc.
  3. Utilisez Cloud Shell pour envoyer une tâche Spark Java ou PySpark au service Dataproc qui écrit des données dans la table HBase, puis les lit.

Coûts

Dans ce document, vous utilisez les composants facturables de Google Cloudsuivants :

Vous pouvez obtenir une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût.

Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai sans frais.

Avant de commencer

Si ce n'est pas déjà fait, créez un projet Google Cloud Platform.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc and Compute Engine APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc and Compute Engine APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. Créer un cluster Dataproc

    1. Exécutez la commande suivante dans un terminal de session Cloud Shell pour :

      • Installez les composants HBase et ZooKeeper.
      • Provisionnez trois nœuds de calcul (trois à cinq nœuds de calcul sont recommandés pour exécuter le code de ce tutoriel).
      • Activez la passerelle des composants.
      • Utiliser la version d'image 2.0
      • Utilisez l'indicateur --properties pour ajouter la configuration et la bibliothèque HBase aux chemins de classe du pilote et de l'exécuteur Spark.
    gcloud dataproc clusters create cluster-name \
        --region=region \
        --optional-components=HBASE,ZOOKEEPER \
        --num-workers=3 \
        --enable-component-gateway \
        --image-version=2.0 \
        --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
    

    Vérifier l'installation du connecteur

    1. Dans la console Google Cloud ou dans le terminal d'une session Cloud Shell, connectez-vous en SSH au nœud maître du cluster Dataproc.

    2. Vérifiez l'installation du connecteur Apache HBase Spark sur le nœud maître :

      ls -l /usr/lib/spark/jars | grep hbase-spark
      
      Exemple de résultat :
      -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
      

    3. Laissez le terminal de session SSH ouvert pour :

      1. Créer une table HBase
      2. (Utilisateurs Java) Exécutez des commandes sur le nœud maître du cluster pour déterminer les versions des composants installés sur le cluster.
      3. Analysez votre table HBase après avoir exécuté le code.

    Créer une table HBase

    Exécutez les commandes listées dans cette section dans le terminal de la session SSH du nœud maître que vous avez ouvert à l'étape précédente.

    1. Ouvrez l'interface système HBase :

      hbase shell
      

    2. Créez une table HBase "my-table" avec une famille de colonnes "cf" :

      create 'my_table','cf'
      

      1. Pour confirmer la création de la table, dans la console Google Cloud , cliquez sur HBase dans les liens de la passerelle des composants de la consoleGoogle Cloud pour ouvrir l'interface utilisateur Apache HBase. my-table est listé dans la section Tables de la page Accueil.

    Afficher le code Spark

    Java

    package hbase;
    
    import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    public class SparkHBaseMain {
        public static class SampleData implements Serializable {
            private String key;
            private String name;
    
    
            public SampleData(String key, String name) {
                this.key = key;
                this.name = name;
            }
    
            public SampleData() {
            }
    
            public String getName() {
                return name;
            }
    
            public void setName(String name) {
                this.name = name;
            }
    
            public String getKey() {
                return key;
            }
    
            public void setKey(String key) {
                this.key = key;
            }
        }
        public static void main(String[] args) {
            // Init SparkSession
            SparkSession spark = SparkSession
                    .builder()
                    .master("yarn")
                    .appName("spark-hbase-tutorial")
                    .getOrCreate();
    
            // Data Schema
            String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                    "\"rowkey\":\"key\"," +
                    "\"columns\":{" +
                    "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                    "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                    "}" +
                    "}";
    
            Map<String, String> optionsMap = new HashMap<String, String>();
            optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);
    
            Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                    new SampleData("key1", "foo"),
                    new SampleData("key2", "bar")), SampleData.class);
    
            // Write to HBase
            ds.write()
                    .format("org.apache.hadoop.hbase.spark")
                    .options(optionsMap)
                    .option("hbase.spark.use.hbasecontext", "false")
                    .mode("overwrite")
                    .save();
    
            // Read from HBase
            Dataset dataset = spark.read()
                    .format("org.apache.hadoop.hbase.spark")
                    .options(optionsMap)
                    .option("hbase.spark.use.hbasecontext", "false")
                    .load();
            dataset.show();
        }
    }
    

    Python

    from pyspark.sql import SparkSession
    
    # Initialize Spark Session
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-hbase-tutorial') \
      .getOrCreate()
    
    data_source_format = ''
    
    # Create some test data
    df = spark.createDataFrame(
        [
            ("key1", "foo"),
            ("key2", "bar"),
        ],
        ["key", "name"]
    )
    
    # Define the schema for catalog
    catalog = ''.join("""{
        "table":{"namespace":"default", "name":"my_table"},
        "rowkey":"key",
        "columns":{
            "key":{"cf":"rowkey", "col":"key", "type":"string"},
            "name":{"cf":"cf", "col":"name", "type":"string"}
        }
    }""".split())
    
    # Write to HBase
    df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()
    
    # Read from HBase
    result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
    result.show()

    Exécuter le code

    1. Ouvrez un terminal de session Cloud Shell.

    2. Clonez le dépôt GitHub GoogleCloudDataproc/cloud-dataproc dans le terminal de votre session Cloud Shell :

      git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
      

    3. Accédez au répertoire cloud-dataproc/spark-hbase :

      cd cloud-dataproc/spark-hbase
      
      Exemple de résultat :
      user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
      

    4. Envoyez la tâche Dataproc.

    Java

    1. Définissez les versions des composants dans le fichier pom.xml.
      1. La page des versions Dataproc 2.0.x liste les versions des composants Scala, Spark et HBase installés avec les quatre dernières versions mineures de l'image 2.0 et la plus récente.
        1. Pour trouver la version mineure de votre cluster de version d'image 2.0, cliquez sur le nom du cluster sur la page Clusters de la consoleGoogle Cloud pour ouvrir la page Détails du cluster, où la version de l'image du cluster est indiquée.
      2. Vous pouvez également exécuter les commandes suivantes dans un terminal de session SSH à partir du nœud maître de votre cluster pour déterminer les versions des composants :
        1. Vérifiez la version de Scala :
          scala -version
          
        2. Vérifiez la version de Spark (Ctrl+D pour quitter) :
          spark-shell
          
        3. Vérifiez la version de HBase :
          hbase version
          
        4. Identifiez les dépendances de version Spark, Scala et HBase dans le pom.xml Maven :
          <properties>
            <scala.version>scala full version (for example, 2.12.14)</scala.version>
            <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
            <spark.version>spark version (for example, 3.1.2)</spark.version>
            <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
            <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
          </properties>
          
          Remarque : hbase-spark.version correspond à la version actuelle du connecteur Spark HBase. Ne modifiez pas ce numéro de version.
      3. Modifiez le fichier pom.xml dans l'éditeur Cloud Shell pour insérer les numéros de version corrects de Scala, Spark et HBase. Cliquez sur Ouvrir le terminal lorsque vous avez terminé de modifier le fichier pour revenir à la ligne de commande du terminal Cloud Shell.
        cloudshell edit .
        
      4. Passez à Java 8 dans Cloud Shell. Cette version du JDK est nécessaire pour compiler le code (vous pouvez ignorer les messages d'avertissement des plug-ins) :
        sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
        
      5. Vérifiez l'installation de Java 8 :
        java -version
        
        Exemple de résultat :
        openjdk version "1.8..."
         
    2. Créez le fichier jar :
      mvn clean package
      
      Le fichier .jar est placé dans le sous-répertoire /target (par exemple, target/spark-hbase-1.0-SNAPSHOT.jar).
    3. Envoyez le job.

      gcloud dataproc jobs submit spark \
          --class=hbase.SparkHBaseMain  \
          --jars=target/filename.jar \
          --region=cluster-region \
          --cluster=cluster-name
      
      • --jars : insérez le nom de votre fichier .jar après "target/" et avant ".jar".
      • Si vous n'avez pas défini les chemins d'accès aux classes HBase du pilote et de l'exécuteur Spark lorsque vous avez créé votre cluster, vous devez les définir à chaque envoi de job en incluant l'indicateur ‑‑properties suivant dans votre commande d'envoi de job :
        --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
               

    4. Affichez la sortie de la table HBase dans la sortie du terminal de la session Cloud Shell :

      Waiting for job output...
      ...
      +----+----+
      | key|name|
      +----+----+
      |key1| foo|
      |key2| bar|
      +----+----+
      

    Python

    1. Envoyez le job.

      gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
          --region=cluster-region \
          --cluster=cluster-name
      
      • Si vous n'avez pas défini les chemins d'accès aux classes HBase du pilote et de l'exécuteur Spark lorsque vous avez créé votre cluster, vous devez les définir à chaque envoi de job en incluant l'indicateur ‑‑properties suivant dans votre commande d'envoi de job :
        --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
               

    2. Affichez la sortie de la table HBase dans la sortie du terminal de la session Cloud Shell :

      Waiting for job output...
      ...
      +----+----+
      | key|name|
      +----+----+
      |key1| foo|
      |key2| bar|
      +----+----+
      

    Analyser la table HBase

    Vous pouvez analyser le contenu de votre table HBase en exécutant les commandes suivantes dans le terminal de la session SSH du nœud maître que vous avez ouvert dans Vérifier l'installation du connecteur :

    1. Ouvrez l'interface système HBase :
      hbase shell
      
    2. Analyser "my-table" :
      scan 'my_table'
      
      Exemple de résultat :
      ROW               COLUMN+CELL
       key1             column=cf:name, timestamp=1647364013561, value=foo
       key2             column=cf:name, timestamp=1647364012817, value=bar
      2 row(s)
      Took 0.5009 seconds
      

      Effectuer un nettoyage

      Une fois le tutoriel terminé, vous pouvez procéder au nettoyage des ressources que vous avez créées afin qu'elles ne soient plus comptabilisées dans votre quota et qu'elles ne vous soient plus facturées. Dans les sections suivantes, nous allons voir comment supprimer ou désactiver ces ressources.

      Supprimer le projet

      Le moyen le plus simple d'empêcher la facturation est de supprimer le projet que vous avez créé pour ce tutoriel.

      Pour supprimer le projet :

      1. In the Google Cloud console, go to the Manage resources page.

        Go to Manage resources

      2. In the project list, select the project that you want to delete, and then click Delete.
      3. In the dialog, type the project ID, and then click Shut down to delete the project.

      Supprimer le cluster

      • Pour supprimer le cluster :
        gcloud dataproc clusters delete cluster-name \
            --region=${REGION}