Usa el conector de Cloud Storage con Apache Spark

En este instructivo, se muestra cómo ejecutar código de ejemplo que usa el conector de Cloud Storage con Apache Spark.

Lightning Engine mejora la conectividad a Cloud Storage para optimizar el rendimiento de su motor nativo. El conector mejorado de Cloud Storage minimiza las operaciones de metadatos para reducir los costos, mientras que un committer de salida de archivos optimizado desbloquea el rendimiento y la confiabilidad para las cargas de trabajo de Spark. Completa el formulario de acceso anticipado para solicitar acceso anticipado a esta función de versión preliminar privada.

Objetivos

Escribe un trabajo simple de Spark de recuento de palabras en Java, Scala o Python y, luego, ejecuta el trabajo en un clúster de Managed Service for Apache Spark.

Costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

  • Compute Engine
  • Managed Service for Apache Spark
  • Cloud Storage

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.

Es posible que los usuarios nuevos de Google Cloud cumplan con los requisitos para acceder a una prueba gratuita.

Antes de comenzar

Ejecuta los siguientes pasos para prepararte para ejecutar el código en este instructivo.

  1. Configura tu proyecto. Si es necesario, configura un proyecto con las APIs de Managed Service for Apache Spark, Compute Engine y Cloud Storage habilitadas, y Google Cloud CLI instalada en tu máquina local.

    1. Accede a tu cuenta de Google Cloud . Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    3. Verify that billing is enabled for your Google Cloud project.

    4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

      Roles required to enable APIs

      To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

      Enable the APIs

    5. Create a service account:

      1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles.
      2. In the Google Cloud console, go to the Create service account page.

        Go to Create service account
      3. Select your project.
      4. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      5. Click Create and continue.
      6. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      7. Click Continue.
      8. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    6. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
    7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

    8. Instala Google Cloud CLI.

    9. Si usas un proveedor de identidad externo (IdP), primero debes acceder a la gcloud CLI con tu identidad federada.

    10. Para inicializar gcloud CLI, ejecuta el siguiente comando:

      gcloud init
    11. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    12. Verify that billing is enabled for your Google Cloud project.

    13. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

      Roles required to enable APIs

      To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

      Enable the APIs

    14. Create a service account:

      1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles.
      2. In the Google Cloud console, go to the Create service account page.

        Go to Create service account
      3. Select your project.
      4. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      5. Click Create and continue.
      6. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      7. Click Continue.
      8. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    15. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
    16. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

    17. Instala Google Cloud CLI.

    18. Si usas un proveedor de identidad externo (IdP), primero debes acceder a la gcloud CLI con tu identidad federada.

    19. Para inicializar gcloud CLI, ejecuta el siguiente comando:

      gcloud init

  2. Crea un bucket de Cloud Storage. Necesitas Cloud Storage para guardar los datos del instructivo. Si no tienes uno listo para usar, crea un bucket nuevo en tu proyecto.

    1. En la consola de Google Cloud , ve a la página Buckets de Cloud Storage.

      Ir a Buckets

    2. Haz clic en Crear.
    3. En la página Crear un bucket, ingresa la información de tu bucket. Para ir al paso siguiente, haz clic en Continuar.
      1. En la sección Primeros pasos, haz lo siguiente:
        • Ingresa un nombre único a nivel global que cumpla con los requisitos de nombre de los buckets.
        • Para agregar una etiqueta de bucket, expande la sección Etiquetas (), haz clic en Agregar etiqueta y especifica una key y un value para tu etiqueta.
      2. En la sección Elige dónde almacenar tus datos, haz lo siguiente:
        1. Selecciona un tipo de ubicación
        2. Elige una ubicación en la que se almacenen de forma permanente los datos de tu bucket en el menú desplegable Tipo de ubicación.
          • Si seleccionas el tipo de ubicación birregional, también puedes habilitar la replicación turbo con la casilla de verificación correspondiente.
        3. Para configurar la replicación bucket buckets, selecciona Agregar replicación entre bucket a través del Servicio de transferencia de almacenamiento y sigue estos pasos:

          Configura la replicación entre buckets

          1. En el menú Bucket, selecciona un bucket.
          2. En la sección Configuración de replicación, haz clic en Configurar para configurar los parámetros del trabajo de replicación.

            Aparecerá el panel Configurar la replicación entre buckets.

            • Para filtrar los objetos que se replicarán por prefijo de nombre de objeto, ingresa un prefijo con el que quieras incluir o excluir objetos y, luego, haz clic en Agregar un prefijo.
            • Para establecer una clase de almacenamiento para los objetos replicados, selecciona una clase de almacenamiento en el menú Clase de almacenamiento. Si omites este paso, los objetos replicados usarán la clase de almacenamiento del bucket de destino de forma predeterminada.
            • Haz clic en Listo.
      3. En la sección Elige cómo almacenar tus datos, haz lo siguiente:
        1. Selecciona una clase de almacenamiento predeterminada para el bucket o Autoclass para la administración automática de clases de almacenamiento de los datos de tu bucket.
        2. Para habilitar el espacio de nombres jerárquico, en la sección Optimizar el almacenamiento para cargas de trabajo con uso intensivo de datos, selecciona Habilitar el espacio de nombres jerárquico en este bucket.
      4. En la sección Elige cómo controlar el acceso a los objetos, selecciona si tu bucket aplica o no la prevención del acceso público y elige un método de control de acceso para los objetos del bucket.
      5. En la sección Elige cómo proteger los datos de objetos, haz lo siguiente:
        • Selecciona cualquiera de las opciones de Protección de datos que desees configurar para tu bucket.
          • Para habilitar la eliminación no definitiva, haz clic en la casilla de verificación Política de eliminación no definitiva (para la recuperación de datos) y especifica la cantidad de días que deseas conservar los objetos después de la eliminación.
          • Para configurar el control de versiones de objetos, haz clic en la casilla de verificación Control de versiones de objetos (para el control de versión) y especifica la cantidad máxima de versiones por objeto y la cantidad de días después de los cuales vencen las versiones no actuales.
          • Para habilitar la política de retención en objetos y buckets, haz clic en la casilla de verificación Retención (para cumplimiento) y, luego, haz lo siguiente:
            • Para habilitar el bloqueo de retención de objetos, haz clic en la casilla de verificación Habilitar la retención de objetos.
            • Para habilitar el Bloqueo del bucket, haz clic en la casilla de verificación Establecer política de retención del bucket y elige una unidad de tiempo y una duración para tu período de retención.
        • Para elegir cómo se encriptarán los datos de tus objetos, expande la sección Encriptación de datos () y selecciona un método de encriptación de datos.
    4. Haz clic en Crear.

  3. Configura las variables de entorno local. Configura las variables de entorno en tu máquina local. Configura tu Google Cloud project-id y el nombre del bucket de Cloud Storage que usarás para este instructivo. También proporciona el nombre y la región de un clúster de Managed Service para Apache Spark nuevo o existente. Puedes crear un clúster para usar en este instructivo en el siguiente paso.

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. Crea un clúster de Managed Service para Apache Spark. Ejecuta el siguiente comando para crear un clúster de nodo único del servicio administrado para Apache Spark en la zona de Compute Engine especificada.

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. Copia los datos públicos en tu bucket de Cloud Storage Copia un fragmento de texto público de un fragmento de texto de Shakespeare en la carpeta input de tu bucket de Cloud Storage:

    gcloud storage cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. Configura un entorno de desarrollo de Java (Apache Maven), Scala (SBT) o Python.

Prepara el trabajo de conteo de palabras de Spark

Selecciona una pestaña, a continuación, para seguir los pasos a fin de preparar un paquete o un archivo de trabajo a fin de enviarlo a tu clúster. Puedes preparar uno de los siguientes tipos de trabajos;

Java

  1. Copia el archivo pom.xml en tu máquina local. En el siguiente archivo pom.xml, se especifican las dependencias de la biblioteca de Scala y Spark, que tienen un permiso provided para indicar que el clúster de Managed Service para Apache Spark proporcionará estas bibliotecas en el entorno de ejecución. El archivo pom.xml no especifica una dependencia de Cloud Storage porque el conector implementa la interfaz de HDFS estándar. Cuando un trabajo de Spark accede a los archivos del clúster de Cloud Storage (archivos con URI que comienzan con gs://), el sistema usa de forma automática el conector de Cloud Storage para acceder a los archivos en Cloud Storage
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
  2. Copia el código WordCount.java que aparece a continuación en tu máquina local.
    1. Crea un conjunto de directorios con la ruta src/main/java/dataproc/codelab:
      mkdir -p src/main/java/dataproc/codelab
      
    2. Copia WordCount.java en tu máquina local en src/main/java/dataproc/codelab:
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java es un trabajo de Spark en Java que lee archivos de texto de Cloud Storage, realiza un conteo de palabras y, luego, escribe los resultados del archivo de texto en Cloud Storage.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
  3. Compila el paquete.
    mvn clean package
    
    Si la compilación se realiza de forma correcta, se crea un target/word-count-1.0.jar.
  4. Almacena el paquete en etapa intermedia en Cloud Storage.
    gcloud storage cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. Copia el archivo build.sbt en tu máquina local. En el siguiente archivo build.sbt, se especifican las dependencias de la biblioteca de Scala y Spark, que tienen un permiso provided para indicar que el clúster de Managed Service para Apache Spark proporcionará estas bibliotecas en el entorno de ejecución. El archivo build.sbt no especifica una dependencia de Cloud Storage porque el conector implementa la interfaz de HDFS estándar. Cuando un trabajo de Spark accede a los archivos del clúster de Cloud Storage (archivos con URI que comienzan con gs://), el sistema usa de forma automática el conector de Cloud Storage para acceder a los archivos en Cloud Storage
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
  2. Copia word-count.scala a tu máquina local. Este es un trabajo de Spark en Java que lee archivos de texto de Cloud Storage, realiza un conteo de palabras y, luego, escribe los resultados del archivo de texto en Cloud Storage.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
  3. Compila el paquete.
    sbt clean package
    
    Si la compilación se realiza de forma correcta, se crea un target/scala-2.11/word-count_2.11-1.0.jar.
  4. Almacena el paquete en etapa intermedia en Cloud Storage.
    gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. Copia word-count.py a tu máquina local. Este es un trabajo de Spark en Python con PySpark que lee archivos de texto de Cloud Storage, realiza un conteo de palabras y, luego, escribe los resultados del archivo de texto en Cloud Storage.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])

Envía el trabajo

Ejecuta el siguiente comando de gcloud para enviar el trabajo de conteo de palabras a tu clúster de Managed Service for Apache Spark.

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Revise el resultado.

Una vez que finalice el trabajo, ejecuta el siguiente comando de gcloud CLI para ver el resultado del recuento de palabras.

gcloud storage cat gs://${BUCKET_NAME}/output/*

El resultado del conteo de palabras debe ser similar al siguiente:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

Limpia

Una vez que completes el instructivo, puedes limpiar los recursos que creaste para que dejen de usar la cuota y generar cargos. En las siguientes secciones, se describe cómo borrar o desactivar estos recursos.

Borra el proyecto

La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo.

Para borrar el proyecto, sigue estos pasos:

  1. En la Google Cloud consola, ve a la página Administrar recursos.

    Ir a Administrar recursos

  2. En la lista de proyectos, elige el proyecto que quieres borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

Borra el clúster de Managed Service for Apache Spark

En lugar de borrar tu proyecto, es posible que solo quieras borrar tu clúster dentro del proyecto.

Borra el bucket de Cloud Storage

Consola deGoogle Cloud

  1. En la Google Cloud consola, ve a la página Buckets de Cloud Storage.

    Ir a Buckets

  2. Haz clic en la casilla de verificación del bucket que deseas borrar.
  3. Para borrar el bucket, haz clic en Borrar y sigue las instrucciones.

Línea de comandos

    Borra el bucket:
    gcloud storage buckets delete BUCKET_NAME

¿Qué sigue?