Apache Spark mit HBase in Dataproc verwenden

Ziele

In dieser Anleitung wird Folgendes erläutert:

  1. Dataproc-Cluster erstellen und Apache HBase und Apache ZooKeeper auf dem Cluster installieren
  2. HBase-Tabelle mit der HBase-Shell erstellen, die auf dem Masternode des Dataproc-Clusters ausgeführt wird
  3. Mit Cloud Shell einen Java- oder PySpark-Spark-Job an den Dataproc-Dienst senden, der Daten in die HBase-Tabelle schreibt und dann Daten daraus liest

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.

Neuen Nutzern von Google Cloud steht möglicherweise eine kostenlose Testversion zur Verfügung.

Hinweise

Erstellen Sie ein Google Cloud Platform-Projekt, falls dies noch nicht geschehen ist.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc and Compute Engine APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc and Compute Engine APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. Dataproc-Cluster erstellen

    1. Führen Sie den folgenden Befehl in einem Cloud Shell-Sitzungsterminal aus, um:

      • Installieren Sie die Komponenten HBase und ZooKeeper.
      • Stellen Sie drei Worker-Knoten bereit. Für den Code in dieser Anleitung werden drei bis fünf Worker empfohlen.
      • Aktivieren Sie das Component Gateway.
      • Image-Version 2.0 verwenden
      • Verwenden Sie das Flag --properties, um die HBase-Konfiguration und die HBase-Bibliothek den Klassenpfaden von Spark-Treiber und -Executor hinzuzufügen.
    gcloud dataproc clusters create cluster-name \
        --region=region \
        --optional-components=HBASE,ZOOKEEPER \
        --num-workers=3 \
        --enable-component-gateway \
        --image-version=2.0 \
        --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
    

    Connector-Installation prüfen

    1. Stellen Sie über die Google Cloud -Konsole oder ein Cloud Shell-Sitzungsterminal eine SSH-Verbindung zum Masterknoten des Dataproc-Clusters her.

    2. Prüfen Sie die Installation des Apache HBase Spark-Connectors auf dem Masterknoten:

      ls -l /usr/lib/spark/jars | grep hbase-spark
      
      Beispielausgabe:
      -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
      

    3. Lassen Sie das Terminal der SSH-Sitzung geöffnet, um:

      1. HBase-Tabelle erstellen
      2. (Java-Nutzer): Befehle auf dem Masterknoten des Clusters ausführen, um die Versionen der auf dem Cluster installierten Komponenten zu ermitteln
      3. HBase-Tabelle scannen, nachdem Sie den Code ausgeführt haben

    HBase-Tabelle erstellen

    Führen Sie die in diesem Abschnitt aufgeführten Befehle im SSH-Sitzungsterminal des Masterknotens aus, das Sie im vorherigen Schritt geöffnet haben.

    1. HBase-Shell öffnen:

      hbase shell
      

    2. Erstellen Sie eine HBase-Tabelle namens „my-table“ mit der Spaltenfamilie „cf“:

      create 'my_table','cf'
      

      1. Klicken Sie in der Google Cloud Console in den Google Cloud Links zum Component Gateway in der Console auf HBase, um die Apache HBase-Benutzeroberfläche zu öffnen und die Tabellenerstellung zu bestätigen. my-table wird auf der Seite Startseite im Bereich Tabellen aufgeführt.

    Spark-Code ansehen

    Java

    package hbase;
    
    import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    public class SparkHBaseMain {
        public static class SampleData implements Serializable {
            private String key;
            private String name;
    
    
            public SampleData(String key, String name) {
                this.key = key;
                this.name = name;
            }
    
            public SampleData() {
            }
    
            public String getName() {
                return name;
            }
    
            public void setName(String name) {
                this.name = name;
            }
    
            public String getKey() {
                return key;
            }
    
            public void setKey(String key) {
                this.key = key;
            }
        }
        public static void main(String[] args) {
            // Init SparkSession
            SparkSession spark = SparkSession
                    .builder()
                    .master("yarn")
                    .appName("spark-hbase-tutorial")
                    .getOrCreate();
    
            // Data Schema
            String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                    "\"rowkey\":\"key\"," +
                    "\"columns\":{" +
                    "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                    "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                    "}" +
                    "}";
    
            Map<String, String> optionsMap = new HashMap<String, String>();
            optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);
    
            Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                    new SampleData("key1", "foo"),
                    new SampleData("key2", "bar")), SampleData.class);
    
            // Write to HBase
            ds.write()
                    .format("org.apache.hadoop.hbase.spark")
                    .options(optionsMap)
                    .option("hbase.spark.use.hbasecontext", "false")
                    .mode("overwrite")
                    .save();
    
            // Read from HBase
            Dataset dataset = spark.read()
                    .format("org.apache.hadoop.hbase.spark")
                    .options(optionsMap)
                    .option("hbase.spark.use.hbasecontext", "false")
                    .load();
            dataset.show();
        }
    }
    

    Python

    from pyspark.sql import SparkSession
    
    # Initialize Spark Session
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-hbase-tutorial') \
      .getOrCreate()
    
    data_source_format = ''
    
    # Create some test data
    df = spark.createDataFrame(
        [
            ("key1", "foo"),
            ("key2", "bar"),
        ],
        ["key", "name"]
    )
    
    # Define the schema for catalog
    catalog = ''.join("""{
        "table":{"namespace":"default", "name":"my_table"},
        "rowkey":"key",
        "columns":{
            "key":{"cf":"rowkey", "col":"key", "type":"string"},
            "name":{"cf":"cf", "col":"name", "type":"string"}
        }
    }""".split())
    
    # Write to HBase
    df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()
    
    # Read from HBase
    result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
    result.show()

    Code ausführen

    1. Öffnen Sie ein Cloud Shell-Sitzungsterminal.

    2. Klonen Sie das GitHub-Repository GoogleCloudDataproc/cloud-dataproc in das Terminal Ihrer Cloud Shell-Sitzung:

      git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
      

    3. Wechseln Sie in das Verzeichnis cloud-dataproc/spark-hbase:

      cd cloud-dataproc/spark-hbase
      
      Beispielausgabe:
      user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
      

    4. Senden Sie den Dataproc-Job.

    Java

    1. Legen Sie die Komponentenversionen in der Datei pom.xml fest.
      1. Auf der Seite Dataproc-Releaseversionen 2.0.x sind die Scala-, Spark- und HBase-Komponentenversionen aufgeführt, die mit der neuesten und den letzten vier untergeordneten Image-Versionen 2.0 installiert werden.
        1. Wenn Sie die untergeordnete Version Ihres Clusters mit Imageversion 2.0 aufrufen möchten, klicken Sie in derGoogle Cloud -Konsole auf der Seite Cluster auf den Clusternamen, um die Seite Clusterdetails zu öffnen. Dort wird die Imageversion des Clusters aufgeführt.
      2. Alternativ können Sie die folgenden Befehle in einem SSH-Sitzungsterminal vom Masterknoten Ihres Clusters aus ausführen, um die Komponentenversionen zu ermitteln:
        1. Scala-Version prüfen:
          scala -version
          
        2. Spark-Version prüfen (mit Strg+D beenden):
          spark-shell
          
        3. HBase-Version prüfen:
          hbase version
          
        4. Ermitteln Sie die Versionsabhängigkeiten von Spark, Scala und HBase in der Maven-Datei pom.xml:
          <properties>
            <scala.version>scala full version (for example, 2.12.14)</scala.version>
            <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
            <spark.version>spark version (for example, 3.1.2)</spark.version>
            <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
            <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
          </properties>
          
          Hinweis: hbase-spark.version ist die aktuelle Version des Spark HBase-Connectors. Lassen Sie diese Versionsnummer unverändert.
      3. Bearbeiten Sie die Datei pom.xml im Cloud Shell-Editor, um die richtigen Versionsnummern für Scala, Spark und HBase einzufügen. Klicken Sie nach Abschluss der Bearbeitung auf Terminal öffnen, um zur Befehlszeile des Cloud Shell-Terminals zurückzukehren.
        cloudshell edit .
        
      4. Wechseln Sie in Cloud Shell zu Java 8. Diese JDK-Version ist zum Erstellen des Codes erforderlich. Sie können alle Plug-in-Warnmeldungen ignorieren:
        sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
        
      5. Java 8-Installation prüfen:
        java -version
        
        Beispielausgabe:
        openjdk version "1.8..."
         
    2. Erstellen Sie die Datei jar:
      mvn clean package
      
      Die Datei .jar wird im Unterverzeichnis /target abgelegt (z. B. target/spark-hbase-1.0-SNAPSHOT.jar).
    3. Senden Sie den Job.

      gcloud dataproc jobs submit spark \
          --class=hbase.SparkHBaseMain  \
          --jars=target/filename.jar \
          --region=cluster-region \
          --cluster=cluster-name
      
      • --jars: Fügen Sie den Namen Ihrer .jar-Datei nach „target/“ und vor „.jar“ ein.
      • Wenn Sie die HBase-Klassenpfade für Spark-Treiber und -Executor beim Erstellen des Clusters nicht festgelegt haben, müssen Sie sie bei jeder Jobübergabe festlegen, indem Sie das folgende ‑‑properties-Flag in den Befehl zum Übergeben des Jobs einfügen:
        --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
               

    4. HBase-Tabellenausgabe in der Terminalausgabe der Cloud Shell-Sitzung ansehen:

      Waiting for job output...
      ...
      +----+----+
      | key|name|
      +----+----+
      |key1| foo|
      |key2| bar|
      +----+----+
      

    Python

    1. Senden Sie den Job.

      gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
          --region=cluster-region \
          --cluster=cluster-name
      
      • Wenn Sie die HBase-Klassenpfade für Spark-Treiber und -Executor beim Erstellen des Clusters nicht festgelegt haben, müssen Sie sie bei jeder Jobübergabe festlegen, indem Sie das folgende ‑‑properties-Flag in den Befehl zum Übergeben des Jobs einfügen:
        --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
               

    2. HBase-Tabellenausgabe in der Terminalausgabe der Cloud Shell-Sitzung ansehen:

      Waiting for job output...
      ...
      +----+----+
      | key|name|
      +----+----+
      |key1| foo|
      |key2| bar|
      +----+----+
      

    HBase-Tabelle scannen

    Sie können den Inhalt Ihrer HBase-Tabelle scannen, indem Sie die folgenden Befehle im Terminal der SSH-Sitzung des Masterknotens ausführen, die Sie unter Connector-Installation überprüfen geöffnet haben:

    1. HBase-Shell öffnen:
      hbase shell
      
    2. Tabelle „my-table“ scannen:
      scan 'my_table'
      
      Beispielausgabe:
      ROW               COLUMN+CELL
       key1             column=cf:name, timestamp=1647364013561, value=foo
       key2             column=cf:name, timestamp=1647364012817, value=bar
      2 row(s)
      Took 0.5009 seconds
      

      Bereinigen

      Nachdem Sie die Anleitung abgeschlossen haben, können Sie die erstellten Ressourcen bereinigen, damit sie keine Kontingente mehr nutzen und keine Gebühren mehr anfallen. In den folgenden Abschnitten erfahren Sie, wie Sie diese Ressourcen löschen oder deaktivieren.

      Projekt löschen

      Am einfachsten vermeiden Sie weitere Kosten durch Löschen des für die Anleitung erstellten Projekts.

      So löschen Sie das Projekt:

      1. In the Google Cloud console, go to the Manage resources page.

        Go to Manage resources

      2. In the project list, select the project that you want to delete, and then click Delete.
      3. In the dialog, type the project ID, and then click Shut down to delete the project.

      Cluster löschen

      • So löschen Sie den Cluster:
        gcloud dataproc clusters delete cluster-name \
            --region=${REGION}