Cloud Storage-Connector mit Apache Spark verwenden

In dieser Anleitung erfahren Sie, wie Sie Beispielcode ausführen, der den Cloud Storage-Connector mit Apache Spark verwendet.

Spark-WordCount-Job vorbereiten

Wählen Sie unten einen Tab aus, um die Schritte zum Vorbereiten eines Jobpakets oder einer Datei zum Senden an den Cluster auszuführen. Sie können einen der folgenden Jobtypen vorbereiten:

Java

  1. pom.xml-Datei auf Ihren lokalen Computer kopieren. Die folgende pom.xml-Datei gibt Scala- und Spark-Bibliotheksabhängigkeiten an, denen ein provided-Bereich zugewiesen wird, um anzugeben, dass der Dataproc-Cluster diese Bibliotheken zur Laufzeit bereitstellt. Die pom.xml-Datei gibt keine Cloud Storage-Abhängigkeit an, da der Connector die standardmäßige HDFS-Schnittstelle implementiert. Wenn ein Spark-Job auf Cloud Storage-Clusterdateien zugreift (Dateien mit URIs, die mit gs:// beginnen), verwendet das System automatisch den Cloud Storage-Connector, um auf die Dateien in Cloud Storage zuzugreifen
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
  2. Den unten aufgeführten WordCount.java-Code auf Ihren lokalen Computer kopieren.
    1. Erstellen Sie einen Satz von Verzeichnissen mit dem Pfad src/main/java/dataproc/codelab:
      mkdir -p src/main/java/dataproc/codelab
      
    2. Kopieren Sie WordCount.java auf Ihren lokalen Computer in src/main/java/dataproc/codelab:
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java ist ein Spark-Job in Java, der Textdateien aus Cloud Storage liest, eine Wortzählung durchführt und dann die Ergebnisse der Textdatei in Cloud Storage schreibt.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
  3. Erstellen Sie das Paket.
    mvn clean package
    
    Ist dies erfolgreich, wird eine target/word-count-1.0.jar-Datei erstellt.
  4. Das Paket in Cloud Storage bereitstellen.
    gcloud storage cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. build.sbt-Datei auf Ihren lokalen Computer kopieren. Die folgende build.sbt-Datei gibt Scala- und Spark-Bibliotheksabhängigkeiten an, denen ein provided-Bereich zugewiesen wird, um anzugeben, dass der Dataproc-Cluster diese Bibliotheken zur Laufzeit bereitstellt. Die build.sbt-Datei gibt keine Cloud Storage-Abhängigkeit an, da der Connector die standardmäßige HDFS-Schnittstelle implementiert. Wenn ein Spark-Job auf Cloud Storage-Clusterdateien zugreift (Dateien mit URIs, die mit gs:// beginnen), verwendet das System automatisch den Cloud Storage-Connector, um auf die Dateien in Cloud Storage zuzugreifen
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
  2. word-count.scala auf Ihren lokalen Computer kopieren. Dies ist ein Spark-Job in Java, der Textdateien aus Cloud Storage liest, eine Wortzählung ausführt und die Ergebnisse der Textdatei anschließend in Cloud Storage schreibt.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
  3. Erstellen Sie das Paket.
    sbt clean package
    
    Ist dies erfolgreich, wird eine target/scala-2.11/word-count_2.11-1.0.jar-Datei erstellt.
  4. Das Paket in Cloud Storage bereitstellen.
    gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. word-count.py auf Ihren lokalen Computer kopieren. Dies ist ein Spark-Job in Python mit PySpark, der Textdateien aus Cloud Storage liest, eine Wortzählung ausführt und die Ergebnisse der Textdatei anschließend in Cloud Storage schreibt.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])

Job senden

Führen Sie den folgenden gcloud-Befehl aus, um den Wordcount-Job an Ihren Dataproc-Cluster zu senden.

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Ausgabe ansehen

Führen Sie nach Abschluss des Jobs den folgenden gcloud CLI-Befehl aus, um die Wordcount-Ausgabe aufzurufen.

gcloud storage cat gs://${BUCKET_NAME}/output/*

Es sollte etwa diese Ausgabe angezeigt werden:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)