Utiliser le connecteur Cloud Storage avec Apache Spark

Ce tutoriel vous montre comment exécuter un exemple de code utilisant le connecteur Cloud Storage avec Apache Spark.

Préparer la tâche de décompte Spark

Sélectionnez un onglet ci-dessous pour suivre la procédure de préparation d'un package ou d'un fichier de tâche à envoyer au cluster. Vous pouvez préparer l'un des types de tâches suivants :

Java

  1. Copiez le fichier pom.xml sur votre ordinateur local. Le fichier pom.xml suivant spécifie les dépendances des bibliothèques Scala et Spark, auxquelles un champ d'application provided est attribué pour indiquer que le cluster Dataproc fournira ces bibliothèques lors de l'exécution. Le fichier pom.xml ne spécifie pas de dépendance Cloud Storage, car le connecteur met en œuvre l'interface HDFS standard. Lorsqu'une tâche Spark accède à des fichiers de cluster Cloud Storage (fichiers dont les URI commencent par gs:// ), le système utilise automatiquement le connecteur Cloud Storage pour accéder aux fichiers dans Cloud Storage.
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
  2. Copiez le code WordCount.java indiqué ci-dessous sur votre ordinateur local.
    1. Créez un ensemble de répertoires avec le chemin d'accès src/main/java/dataproc/codelab :
      mkdir -p src/main/java/dataproc/codelab
      
    2. Copiez WordCount.java sur votre ordinateur local dans src/main/java/dataproc/codelab :
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java est une tâche Spark en Java qui lit les fichiers texte de Cloud Storage, effectue un décompte des mots, puis écrit les résultats des fichiers texte dans Cloud Storage.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
  3. Créez le package.
    mvn clean package
    
    Si la compilation réussit, un fichier target/word-count-1.0.jar est créé.
  4. Entreposez le package sur Cloud Storage.
    gcloud storage cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. Copiez le fichier build.sbt sur votre ordinateur local. Le fichier build.sbt suivant spécifie les dépendances des bibliothèques Scala et Spark, auxquelles un champ d'application provided est attribué pour indiquer que le cluster Dataproc fournira ces bibliothèques lors de l'exécution. Le fichier build.sbt ne spécifie pas de dépendance Cloud Storage, car le connecteur met en œuvre l'interface HDFS standard. Lorsqu'une tâche Spark accède à des fichiers de cluster Cloud Storage (fichiers dont les URI commencent par gs:// ), le système utilise automatiquement le connecteur Cloud Storage pour accéder aux fichiers dans Cloud Storage.
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
  2. Copiez word-count.scala sur votre ordinateur local. Il s'agit d'une tâche Spark en Java qui lit les fichiers texte de Cloud Storage, effectue un décompte des mots, puis écrit les résultats des fichiers texte dans Cloud Storage.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
  3. Créez le package.
    sbt clean package
    
    Si la compilation réussit, un fichier target/scala-2.11/word-count_2.11-1.0.jar est créé.
  4. Entreposez le package sur Cloud Storage.
    gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. Copiez word-count.py sur votre ordinateur local. Il s'agit d'une tâche Spark en Python qui lit les fichiers texte de Cloud Storage, effectue un décompte des mots, puis écrit les résultats des fichiers texte dans Cloud Storage.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])

Envoyer la tâche

Exécutez la commande gcloud suivante pour envoyer la tâche de décompte à votre cluster Dataproc.

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Consulter le résultat

Une fois le job terminé, exécutez la commande suivante de gcloud CLI pour afficher le résultat du décompte.

gcloud storage cat gs://${BUCKET_NAME}/output/*

Le résultat du décompte doit ressembler à celui-ci :

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)