在 Managed Service for Apache Spark 上编写并运行 Spark Scala 作业

本教程介绍了创建 Spark Scala 作业并将该作业提交到 Managed Service for Apache Spark 集群的不同方法,其中包括如何执行以下操作:

  • 使用 Scala REPL(Read-Evaluate-Print-Loop 或交互式解释器)或 SBT构建工具,通过命令行在本地机器上编写和编译 Spark Scala“Hello World”应用
  • 将经过编译的 Scala 类打包成一个包含清单的 jar 文件
  • 将 Scala jar 提交到在 Managed Service for Apache Spark 集群上运行的 Spark 作业
  • 通过 Google Cloud 控制台检查 Scala 作业的输出

本教程还向您展示了如何执行以下操作:

  • 使用 spark-shell REPL 直接在 Managed Service for Apache Spark 集群上编写和运行 Spark Scala“WordCount”mapreduce 作业

  • 在集群上运行预先安装的 Apache Spark 和 Hadoop 示例

设置 Google Cloud Platform 项目

如果您尚未对项目进行设置,请按以下步骤进行操作:

  1. 设置项目
  2. 创建 Cloud Storage 存储桶
  3. 创建 Managed Service for Apache Spark 集群

在本地编写和编译 Scala 代码

作为本教程的一个简单练习,请在开发机器上本地使用 Scala REPLSBT 命令行界面编写“Hello World”Scala 应用。

使用 Scala

  1. Scala 安装页面下载 Scala 二进制文件
  2. Scala 安装说明中所示那样,解压缩该文件,设置 SCALA_HOME 环境变量,并将其添加到您的路径中。例如:

    export SCALA_HOME=/usr/local/share/scala
    export PATH=$PATH:$SCALA_HOME/
    

  3. 启动 Scala REPL

    $ scala
    Welcome to Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala>
    

  4. HelloWorld 代码复制并粘贴到 Scala REPL 中

    object HelloWorld {
      def main(args: Array[String]): Unit = {
        println("Hello, world!")
      }
    }

  5. 保存 HelloWorld.scala 并退出 REPL

    scala> :save HelloWorld.scala
    scala> :q
    

  6. 使用 scalac 进行编译

    $ scalac HelloWorld.scala
    

  7. 列出已编译的 .class 文件

    $ ls HelloWorld*.class
    HelloWorld$.class   HelloWorld.class
    

使用 SBT

  1. 下载 SBT

  2. 创建一个“HelloWorld”项目,如下所示

    $ mkdir hello
    $ cd hello
    $ echo \
    'object HelloWorld {def main(args: Array[String]) = println("Hello, world!")}' > \
    HelloWorld.scala
    

  3. 创建一个 sbt.build 配置文件,以将 artifactName(您将在下面生成的 jar 文件的名称)设置为“HelloWorld.jar”(请参阅修改默认工件

    echo \
    'artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
    "HelloWorld.jar" }' > \
    build.sbt
    

  4. 启动 SBT 并运行代码

    $ sbt
    [info] Set current project to hello ...
    > run
    ... Compiling 1 Scala source to .../hello/target/scala-.../classes...
    ... Running HelloWorld
    Hello, world!
    [success] Total time: 3 s ...
    

  5. 将代码打包成带有指定了主类入口点 (HelloWorld) 的清单的 jar 文件,然后退出

    > package
    ... Packaging .../hello/target/scala-.../HelloWorld.jar ...
    ... Done packaging.
    [success] Total time: ...
    > exit
    

创建 jar

使用 SBT 或使用 jar 命令创建 jar 文件

使用 SBT 创建 jar

SBT package 命令创建一个 jar 文件(请参阅使用 SBT)。

手动创建 jar

  1. 将目录 (cd) 更改为包含已编译的 HelloWorld*.class 文件的目录,然后运行以下命令将类文件打包到在清单指定了主类入口点 (HelloWorld) 的 jar 中。
    $ jar cvfe HelloWorld.jar HelloWorld HelloWorld*.class
    added manifest
    adding: HelloWorld$.class(in = 637) (out= 403)(deflated 36%)
    adding: HelloWorld.class(in = 586) (out= 482)(deflated 17%)
    

将 jar 复制到 Cloud Storage

  1. 使用 Google Cloud CLI 将 jar 复制到项目中的 Cloud Storage 存储桶
$ gcloud storage cp HelloWorld.jar gs://<bucket-name>/
Copying file://HelloWorld.jar [Content-Type=application/java-archive]...
Uploading   gs://bucket-name/HelloWorld.jar:         1.46 KiB/1.46 KiB

将 jar 提交到 Managed Service for Apache Spark 作业

  1. 使用 Google Cloud 控制台 将 jar 文件提交到您的 Managed Service for Apache Spark 作业。按如下所示,填写提交作业 页面上的字段:

    • 集群:从集群列表中选择集群名称
    • 作业类型:Spark
    • 主类或 jar:指定 HelloWorld jar (gs://your-bucket-name/HelloWorld.jar) 的 Cloud Storage URI 路径。

      如果 jar 不包含指定了代码入口点(“Main-Class: HelloWorld”)的清单,则“主类或 jar”字段应声明主类的名称(“HelloWorld”),而您应使用 jar 文件的 URI 路径 (gs://your-bucket-name/HelloWorld.jar) 填写“Jar 文件”字段。

  2. 点击提交以启动作业。作业启动后,会被添加到作业列表中。

  3. 点击“作业 ID”以打开作业页面,您可以在该页面中查看作业的驱动程序输出。

使用集群的 spark-shell REPL 编写并运行 Spark Scala 代码

建议您直接在自己的 Managed Service for Apache Spark 集群上开发 Scala 应用。Hadoop 和 Spark 预先安装在 Managed Service for Apache Spark 集群上,并且它们配置了 Cloud Storage 连接器,该连接器允许您的代码直接从 Cloud Storage 读取数据并将数据写入 Cloud Storage。

此示例展示了如何通过 SSH 连接到项目的 Managed Service for Apache Spark 集群主服务器节点,然后使用 spark-shell REPL 创建并运行 Scala wordcount MapReduce 应用。

  1. 通过 SSH 连接到 Managed Service for Apache Spark 集群的主节点

    1. 在 控制台中,前往项目的 Managed Service for Apache Spark 集群 页面,然后点击您的 集群名称。 Google Cloud

    2. 在集群详情页面上,选择虚拟机实例 标签页,然后点击集群名称行右侧显示的 SSH 选项。

      此时会在主节点上的主目录打开一个浏览器窗口

  2. 启动 spark-shell

    $ spark-shell
    ...
    Using Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    ...
    Spark context available as sc.
    ...
    SQL context available as sqlContext.
    scala>
    

  3. 从位于公共 Cloud Storage 中的莎士比亚文本片段创建 RDD(弹性分布式数据集)

    scala> val text_file = sc.textFile("gs://pub/shakespeare/rose.txt")
    

  4. 针对该文本运行字数统计 mapreduce,然后显示 wordcounts 结果

    scala> val wordCounts = text_file.flatMap(line => line.split(" ")).map(word =>
    (word, 1)).reduceByKey((a, b) => a + b)
    scala> wordCounts.collect
    ... Array((call,1), (What's,1), (sweet.,1), (we,1), (as,1), (name?,1), (any,1), (other,1),
    (rose,1), (smell,1), (name,1), (a,2), (would,1), (in,1), (which,1), (That,1), (By,1))
    

  5. 将计数保存到 Cloud Storage 中的 <bucket-name>/wordcounts-out,然后退出 scala-shell

    scala> wordCounts.saveAsTextFile("gs://<bucket-name>/wordcounts-out/")
    scala> exit
    

  6. 使用 gcloud CLI 列出输出文件并显示文件内容

    $ gcloud storage ls gs://bucket-name/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/_SUCCESS
    gs://spark-scala-demo-bucket/wordcounts-out/part-00000
    gs://spark-scala-demo-bucket/wordcounts-out/part-00001
    

  7. 检查 gs://<bucket-name>/wordcounts-out/part-00000 内容

    $ gcloud storage cat gs://bucket-name/wordcounts-out/part-00000
    (call,1)
    (What's,1)
    (sweet.,1)
    (we,1)
    (as,1)
    (name?,1)
    (any,1)
    (other,1)
    

运行预先安装的示例代码

Managed Service for Apache Spark 主节点包含具有标准 Apache Hadoop 和 Spark 示例的可运行 jar 文件。

Jar 类型 Master node /usr/lib/ location GitHub 来源 Apache 文档
Hadoop hadoop-mapreduce/hadoop-mapreduce-examples.jar 来源链接 MapReduce 教程
Spark spark/lib/spark-examples.jar 来源链接 Spark 示例

通过命令行向集群提交示例

您可以使用 Google Cloud CLI gcloud 命令行工具从本地开发机器提交示例(请参阅使用 Google Cloud 控制台从 Google Cloud 控制台提交作业)。

Hadoop WordCount 示例

gcloud dataproc jobs submit hadoop --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    --class=org.apache.hadoop.examples.WordCount \
    -- URI of input file URI of output file

Spark WordCount 示例

gcloud dataproc jobs submit spark --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.JavaWordCount \
    -- URI of input file

关闭集群

为避免不断产生费用,请关闭本教程使用的集群并删除 Cloud Storage 资源(Cloud Storage 存储分区和文件)。

要关闭集群,请运行以下命令:

gcloud dataproc clusters delete cluster-name \
    --region=region

要删除 Cloud Storage jar 文件,请运行以下命令:

gcloud storage rm gs://bucket-name/HelloWorld.jar

您可以使用以下命令删除存储分区及其所有文件夹和文件:

gcloud storage rm gs://bucket-name/ --recursive

后续步骤