Usa Apache Spark con HBase en el servicio administrado para Apache Spark

Objetivos

En este instructivo, se muestra cómo hacer lo siguiente:

  1. Crea un clúster de Managed Service for Apache Spark y, luego, instala Apache HBase y Apache ZooKeeper en el clúster.
  2. Crea una tabla de HBase con el shell de HBase que se ejecuta en el nodo principal del clúster del servicio administrado para Apache Spark
  3. Usa Cloud Shell para enviar un trabajo de Spark en Java o PySpark al servicio de Managed Service for Apache Spark que escribe datos en la tabla de HBase y, luego, los lee.

Costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.

Es posible que los usuarios nuevos de Google Cloud cumplan con los requisitos para acceder a una prueba gratuita.

Antes de comenzar

Si aún no lo hiciste, crea un proyecto de Google Cloud Platform.

  1. Accede a tu cuenta de Google Cloud . Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc and Compute Engine APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc and Compute Engine APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

Crea un clúster de Managed Service para Apache Spark

  1. Ejecuta el siguiente comando en una terminal de sesión de Cloud Shell para hacer lo siguiente:

    • Instala los componentes HBase y ZooKeeper.
    • Aprovisiona tres nodos trabajadores (se recomienda usar entre tres y cinco trabajadores para ejecutar el código de este instructivo).
    • Habilita la puerta de enlace de componentes.
    • Usar la versión de imagen 2.0
    • Usa la marca --properties para agregar la configuración y la biblioteca de HBase a las rutas de acceso de clase del controlador y el ejecutor de Spark.
gcloud dataproc clusters create cluster-name \
    --region=region \
    --optional-components=HBASE,ZOOKEEPER \
    --num-workers=3 \
    --enable-component-gateway \
    --image-version=2.0 \
    --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'

Verifica la instalación del conector

  1. Desde la consola de Google Cloud o una terminal de sesión de Cloud Shell, establece una conexión SSH con el instancia principal del clúster de Managed Service para Apache Spark.

  2. Verifica la instalación del conector de Apache HBase Spark en el nodo principal:

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    Resultado de muestra:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. Mantén abierta la terminal de la sesión SSH para hacer lo siguiente:

    1. Crea una tabla de HBase
    2. (Usuarios de Java): Ejecuta comandos en el nodo principal del clúster para determinar las versiones de los componentes instalados en el clúster.
    3. Analiza tu tabla de HBase después de ejecutar el código.

Crea una tabla de HBase

Ejecuta los comandos que se indican en esta sección en la terminal de la sesión de SSH del nodo principal que abriste en el paso anterior.

  1. Abre la shell de HBase:

    hbase shell
    

  2. Crea una tabla "my-table" de HBase con una familia de columnas "cf":

    create 'my_table','cf'
    

    1. Para confirmar la creación de la tabla, en la Google Cloud consola, haz clic en HBase en los vínculos de Component Gateway de la consolaGoogle Cloud para abrir la IU de Apache HBase. my-table aparece en la sección Tablas de la página Página principal.

Cómo ver el código de Spark

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;


        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

data_source_format = ''

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

# Read from HBase
result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
result.show()

Ejecuta el código

  1. Abre una terminal de sesión de Cloud Shell.

  2. Clona el repositorio de GitHub GoogleCloudDataproc/cloud-dataproc en la terminal de tu sesión de Cloud Shell:

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. Cambia al directorio cloud-dataproc/spark-hbase:

    cd cloud-dataproc/spark-hbase
    
    Resultado de muestra:
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
    

  4. Envía el trabajo del servicio administrado para Apache Spark.

Java

  1. Configura las versiones de los componentes en el archivo pom.xml.
    1. En la página de versiones de actualización 2.0.x del servicio administrado para Apache Spark, se enumeran las versiones de los componentes de Scala, Spark y HBase instalados con las cuatro versiones secundarias de la imagen 2.0 más recientes y las cuatro anteriores.
      1. Para encontrar la versión secundaria de tu clúster de versión de imagen 2.0, haz clic en el nombre del clúster en la página Clústeres de la consola deGoogle Cloud para abrir la página Detalles del clúster, en la que se muestra la versión de imagen del clúster.
    2. Como alternativa, puedes ejecutar los siguientes comandos en una terminal de sesión SSH desde el nodo principal de tu clúster para determinar las versiones de los componentes:
      1. Verifica la versión de Scala:
        scala -version
        
      2. Verifica la versión de Spark (presiona Ctrl + D para salir):
        spark-shell
        
      3. Verifica la versión de HBase:
        hbase version
        
      4. Identifica las dependencias de versión de Spark, Scala y HBase en pom.xml de Maven:
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
        </properties>
        
        Nota: hbase-spark.version es la versión actual del conector de HBase de Spark. No cambies este número de versión.
    3. Edita el archivo pom.xml en el editor de Cloud Shell para insertar los números de versión correctos de Scala, Spark y HBase. Cuando termines de editar, haz clic en Abrir terminal para volver a la línea de comandos de la terminal de Cloud Shell.
      cloudshell edit .
      
    4. Cambia a Java 8 en Cloud Shell. Esta versión del JDK es necesaria para compilar el código (puedes ignorar los mensajes de advertencia de los complementos):
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. Verifica la instalación de Java 8:
      java -version
      
      Resultado de muestra:
      openjdk version "1.8..."
       
  2. Compila el archivo jar:
    mvn clean package
    
    El archivo .jar se coloca en el subdirectorio /target (por ejemplo, target/spark-hbase-1.0-SNAPSHOT.jar).
  3. Envía el trabajo.

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars: Inserta el nombre de tu archivo .jar después de "target/" y antes de ".jar".
    • Si no estableciste las rutas de acceso de clase de HBase del controlador y el ejecutor de Spark cuando creaste tu clúster, debes establecerlas con cada envío de trabajo. Para ello, incluye la siguiente marca ‑‑properties en el comando de envío de trabajo:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. Visualiza el resultado de la tabla de HBase en el resultado de la terminal de la sesión de Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. Envía el trabajo.

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • Si no estableciste las rutas de acceso de clase de HBase del controlador y el ejecutor de Spark cuando creaste tu clúster, debes establecerlas con cada envío de trabajo. Para ello, incluye la siguiente marca ‑‑properties en el comando de envío de trabajo:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. Visualiza el resultado de la tabla de HBase en el resultado de la terminal de la sesión de Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Analiza la tabla de HBase

Para analizar el contenido de tu tabla de HBase, ejecuta los siguientes comandos en la terminal de la sesión de SSH del nodo principal que abriste en Verifica la instalación del conector:

  1. Abre la shell de HBase:
    hbase shell
    
  2. Analiza "my-table":
    scan 'my_table'
    
    Resultado de muestra:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

Realiza una limpieza

Una vez que completes el instructivo, puedes limpiar los recursos que creaste para que dejen de usar la cuota y generar cargos. En las siguientes secciones, se describe cómo borrar o desactivar estos recursos.

Borra el proyecto

La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo.

Para borrar el proyecto, sigue estos pasos:

  1. En la Google Cloud consola, ve a la página Administrar recursos.

    Ir a Administrar recursos

  2. En la lista de proyectos, elige el proyecto que quieres borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

Borra el clúster

  • Para borrar tu clúster, realiza los siguientes pasos:
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}