将 Cloud Storage 连接器与 Apache Spark 搭配使用

本教程介绍了如何执行将 Cloud Storage 连接器Apache Spark 搭配使用的示例代码。

准备 Spark wordcount 作业

在下面选择一个标签页,按照步骤准备作业软件包或文件以提交到集群。您可以准备以下作业类型之一:

Java

  1. pom.xml 文件复制到本地机器。 以下 pom.xml 文件指定 Scala 和 Spark 库依赖项,它们具有 provided 范围,指示 Dataproc 集群将在运行时提供这些库。pom.xml 文件未指定 Cloud Storage 依赖项,因为连接器实现了标准 HDFS 接口。当 Spark 作业访问 Cloud Storage 集群文件时(URI 开头为 gs:// 的文件),系统会自动使用 Cloud Storage 连接器访问 Cloud Storage 中的文件
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
  2. 将下面列出的 WordCount.java 代码复制到您的本地机器
    1. 创建一组路径为 src/main/java/dataproc/codelab 的目录:
      mkdir -p src/main/java/dataproc/codelab
      
    2. WordCount.java 复制到您的本地机器的 src/main/java/dataproc/codelab
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java 是 Java 中一个 Spark 作业,可从 Cloud Storage 读取文本文件、执行字数统计,然后将文本文件结果写入 Cloud Storage。

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
  3. 构建软件包
    mvn clean package
    
    如果构建成功,则系统会创建 target/word-count-1.0.jar
  4. 将软件包暂存到 Cloud Storage
    gcloud storage cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. build.sbt 文件复制到本地机器。 以下 build.sbt 文件指定 Scala 和 Spark 库依赖项,它们具有 provided 范围,指示 Dataproc 集群将在运行时提供这些库。build.sbt 文件未指定 Cloud Storage 依赖项,因为连接器实现了标准 HDFS 接口。当 Spark 作业访问 Cloud Storage 集群文件时(URI 开头为 gs:// 的文件),系统会自动使用 Cloud Storage 连接器访问 Cloud Storage 中的文件
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
  2. word-count.scala 复制到本地机器。 这是 Java 中一个 Spark 作业,可从 Cloud Storage 读取文本文件、执行字数统计,然后将文本文件结果写入 Cloud Storage。
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
  3. 构建软件包
    sbt clean package
    
    如果构建成功,则系统会创建 target/scala-2.11/word-count_2.11-1.0.jar
  4. 将软件包暂存到 Cloud Storage
    gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. word-count.py 复制到本地机器。 这是 Python 中一个使用 PySpark 的 Spark 作业,可从 Cloud Storage 读取文本文件、执行字数统计,然后将文本文件结果写入 Cloud Storage。
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])

提交作业

运行以下 gcloud 命令,将 Wordcount 作业提交到 Dataproc 集群。

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

查看输出

作业完成后,运行以下 gcloud CLI 命令以查看字数统计输出。

gcloud storage cat gs://${BUCKET_NAME}/output/*

字数统计输出应类似于以下内容:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)