搭配 Apache Spark 使用 Cloud Storage 連接器

本教學課程說明如何執行程式碼範例,搭配使用 Apache SparkCloud Storage 連接器

準備 Spark 字數統計工作

選取下方分頁標籤,按照步驟準備要提交至叢集的工作套件或檔案。你可以準備下列其中一種工作類型:

Java

  1. pom.xml 檔案複製到本機電腦。 下列 pom.xml 檔案會指定 Scala 和 Spark 程式庫依附元件,並提供 provided 範圍,指出 Dataproc 叢集會在執行階段提供這些程式庫。pom.xml 檔案不會指定 Cloud Storage 依附元件,因為連接器會實作標準 HDFS 介面。當 Spark 工作存取 Cloud Storage 叢集檔案 (URI 開頭為 gs:// 的檔案) 時,系統會自動使用 Cloud Storage 連接器存取 Cloud Storage 中的檔案。
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
  2. 將下方列出的 WordCount.java 程式碼複製到本機電腦。
    1. 使用路徑建立一組目錄: src/main/java/dataproc/codelab
      mkdir -p src/main/java/dataproc/codelab
      
    2. WordCount.java 複製到本機電腦的 src/main/java/dataproc/codelab
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java 是以 Java 執行的 Spark 工作,可從 Cloud Storage 讀取文字檔案、執行字數統計,然後將文字檔案結果寫入 Cloud Storage。

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
  3. 建構套件。
    mvn clean package
    
    如果建構成功,系統會建立 target/word-count-1.0.jar
  4. 將套件暫存至 Cloud Storage。
    gcloud storage cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. build.sbt 檔案複製到本機電腦。 下列 build.sbt 檔案會指定 Scala 和 Spark 程式庫依附元件,並提供 provided 範圍,指出 Dataproc 叢集會在執行階段提供這些程式庫。build.sbt 檔案不會指定 Cloud Storage 依附元件,因為連接器會實作標準 HDFS 介面。當 Spark 工作存取 Cloud Storage 叢集檔案 (URI 開頭為 gs:// 的檔案) 時,系統會自動使用 Cloud Storage 連接器存取 Cloud Storage 中的檔案。
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
  2. word-count.scala 複製到本機電腦。 這是以 Java 執行的 Spark 工作,可從 Cloud Storage 讀取文字檔案、執行字數統計,然後將文字檔案結果寫入 Cloud Storage。
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
  3. 建構套件。
    sbt clean package
    
    如果建構成功,系統會建立 target/scala-2.11/word-count_2.11-1.0.jar
  4. 將套件暫存至 Cloud Storage。
    gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. word-count.py 複製到本機電腦。 這是使用 PySpark 的 Python Spark 工作,可從 Cloud Storage 讀取文字檔案、執行字數統計,然後將文字檔案結果寫入 Cloud Storage。
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])

提交工作

執行下列 gcloud 指令,將字數統計工作提交至 Dataproc 叢集。

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

查看輸出內容

工作完成後,請執行下列 gcloud CLI 指令,查看字數統計輸出內容。

gcloud storage cat gs://${BUCKET_NAME}/output/*

字數統計輸出內容應如下所示:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)