Utilizar el conector de Cloud Storage con Apache Spark

En este tutorial se muestra cómo ejecutar un código de ejemplo que usa el conector de Cloud Storage con Apache Spark.

Preparar la tarea de recuento de palabras de Spark

Selecciona una de las pestañas de abajo para seguir los pasos necesarios para preparar un paquete de trabajo o un archivo que quieras enviar a tu clúster. Puedes preparar uno de los siguientes tipos de trabajo:

Java

  1. Copia el archivo pom.xml en tu equipo local. El siguiente archivo pom.xml especifica las dependencias de la biblioteca de Scala y Spark, a las que se les asigna un ámbito provided para indicar que el clúster de Dataproc proporcionará estas bibliotecas en el tiempo de ejecución. El archivo pom.xml no especifica una dependencia de Cloud Storage porque el conector implementa la interfaz HDFS estándar. Cuando una tarea de Spark accede a archivos de clúster de Cloud Storage (archivos con URIs que empiezan por gs://), el sistema utiliza automáticamente el conector de Cloud Storage para acceder a los archivos de Cloud Storage.
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
  2. Copia el código WordCount.java que se indica más abajo en tu equipo local.
    1. Crea un conjunto de directorios con la ruta src/main/java/dataproc/codelab:
      mkdir -p src/main/java/dataproc/codelab
      
    2. Copia WordCount.java en tu máquina local en src/main/java/dataproc/codelab:
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java es una tarea de Spark en Java que lee archivos de texto de Cloud Storage, realiza un recuento de palabras y, a continuación, escribe los resultados del archivo de texto en Cloud Storage.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
  3. Crea el paquete.
    mvn clean package
    
    Si la compilación se realiza correctamente, se crea un target/word-count-1.0.jar.
  4. Transfiere el paquete a Cloud Storage.
    gcloud storage cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. Copia el archivo build.sbt en tu equipo local. El siguiente archivo build.sbt especifica las dependencias de la biblioteca de Scala y Spark, a las que se les asigna un ámbito provided para indicar que el clúster de Dataproc proporcionará estas bibliotecas en el tiempo de ejecución. El archivo build.sbt no especifica una dependencia de Cloud Storage porque el conector implementa la interfaz HDFS estándar. Cuando un trabajo de Spark accede a archivos de clúster de Cloud Storage (archivos con URIs que empiezan por gs://), el sistema utiliza automáticamente el conector de Cloud Storage para acceder a los archivos de Cloud Storage.
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
  2. Copia word-count.scala en tu equipo local. Se trata de un trabajo de Spark en Java que lee archivos de texto de Cloud Storage, realiza un recuento de palabras y, a continuación, escribe los resultados del archivo de texto en Cloud Storage.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
  3. Crea el paquete.
    sbt clean package
    
    Si la compilación se realiza correctamente, se crea un target/scala-2.11/word-count_2.11-1.0.jar.
  4. Transfiere el paquete a Cloud Storage.
    gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. Copia word-count.py en tu equipo local. Se trata de un trabajo de Spark en Python que usa PySpark para leer archivos de texto de Cloud Storage, realizar un recuento de palabras y, a continuación, escribir los resultados del archivo de texto en Cloud Storage.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])

Enviar la tarea

Ejecuta el siguiente comando gcloud para enviar la tarea de recuento de palabras a tu clúster de Dataproc.

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Ver el resultado

Una vez que haya finalizado el trabajo, ejecuta el siguiente comando de la CLI de gcloud para ver el resultado del recuento de palabras.

gcloud storage cat gs://${BUCKET_NAME}/output/*

El resultado del recuento de palabras debería ser similar al siguiente:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)