Use o conetor do Cloud Storage com o Apache Spark

Este tutorial mostra como executar um código de exemplo que usa o conetor do Cloud Storage com o Apache Spark.

Prepare a tarefa de contagem de palavras do Spark

Selecione um separador abaixo para seguir os passos de preparação de um pacote de tarefas ou um ficheiro a enviar para o cluster. Pode preparar um dos seguintes tipos de tarefas:

Java

  1. Copie o ficheiro pom.xml para a sua máquina local. O ficheiro pom.xml seguinte especifica as dependências da biblioteca Scala e Spark, às quais é atribuído um âmbito provided para indicar que o cluster do Dataproc vai fornecer estas bibliotecas no tempo de execução. O ficheiro pom.xml não especifica uma dependência do Cloud Storage porque o conetor implementa a interface HDFS padrão. Quando uma tarefa do Spark acede a ficheiros do cluster do Cloud Storage (ficheiros com URIs que começam por gs://), o sistema usa automaticamente o conetor do Cloud Storage para aceder aos ficheiros no Cloud Storage .major.minor
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
  2. Copie o código WordCount.java apresentado abaixo para o seu computador local.
    1. Crie um conjunto de diretórios com o caminho src/main/java/dataproc/codelab:
      mkdir -p src/main/java/dataproc/codelab
      
    2. Copie WordCount.java para a sua máquina local para src/main/java/dataproc/codelab:
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java é uma tarefa do Spark em Java que lê ficheiros de texto do Cloud Storage, faz uma contagem de palavras e, em seguida, escreve os resultados do ficheiro de texto no Cloud Storage.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
  3. Crie o pacote.
    mvn clean package
    
    Se a compilação for bem-sucedida, é criado um target/word-count-1.0.jar.
  4. Prepare o pacote para o Cloud Storage.
    gcloud storage cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. Copie o ficheiro build.sbt para a sua máquina local. O ficheiro build.sbt seguinte especifica as dependências da biblioteca Scala e Spark, às quais é atribuído um âmbito provided para indicar que o cluster do Dataproc vai fornecer estas bibliotecas no tempo de execução. O ficheiro build.sbt não especifica uma dependência do Cloud Storage porque o conetor implementa a interface HDFS padrão. Quando uma tarefa do Spark acede a ficheiros do cluster do Cloud Storage (ficheiros com URIs que começam por gs://), o sistema usa automaticamente o conetor do Cloud Storage para aceder aos ficheiros no Cloud Storage
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
  2. Copie word-count.scala para a sua máquina local. Este é um trabalho do Spark em Java que lê ficheiros de texto do Cloud Storage, faz uma contagem de palavras e, em seguida, escreve os resultados do ficheiro de texto no Cloud Storage.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
  3. Crie o pacote.
    sbt clean package
    
    Se a compilação for bem-sucedida, é criado um target/scala-2.11/word-count_2.11-1.0.jar.
  4. Prepare o pacote para o Cloud Storage.
    gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. Copie word-count.py para a sua máquina local. Este é um trabalho do Spark em Python que usa o PySpark para ler ficheiros de texto do Cloud Storage, faz uma contagem de palavras e, em seguida, escreve os resultados do ficheiro de texto no Cloud Storage.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])

Envie o trabalho

Execute o seguinte comando gcloud para enviar a tarefa wordcount para o seu cluster do Dataproc.

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Veja a saída

Após a conclusão da tarefa, execute o seguinte comando da CLI gcloud para ver o resultado da contagem de palavras.

gcloud storage cat gs://${BUCKET_NAME}/output/*

O resultado da contagem de palavras deve ser semelhante ao seguinte:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)