Menggunakan konektor Cloud Storage dengan Apache Spark

Tutorial ini menunjukkan cara menjalankan contoh kode yang menggunakan konektor Cloud Storage dengan Apache Spark.

Siapkan tugas jumlah kata Spark

Pilih tab di bawah untuk mengikuti langkah-langkah menyiapkan paket atau file tugas untuk dikirimkan ke cluster Anda. Anda dapat menyiapkan salah satu jenis tugas berikut;

Java

  1. Salin file pom.xml ke komputer lokal Anda. File pom.xml berikut menentukan dependensi library Scala dan Spark, yang diberi cakupan provided untuk menunjukkan bahwa cluster Dataproc akan menyediakan library ini saat runtime. File pom.xml tidak menentukan dependensi Cloud Storage karena konektor menerapkan antarmuka HDFS standar. Saat tugas Spark mengakses file cluster Cloud Storage (file dengan URI yang dimulai dengan gs://), sistem secara otomatis menggunakan konektor Cloud Storage untuk mengakses file di Cloud Storage
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
  2. Salin kode WordCount.java yang tercantum di bawah, ke komputer lokal Anda.
    1. Buat serangkaian direktori dengan jalur src/main/java/dataproc/codelab:
      mkdir -p src/main/java/dataproc/codelab
      
    2. Salin WordCount.java ke komputer lokal Anda ke dalam src/main/java/dataproc/codelab:
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java adalah tugas Spark di Java yang membaca file teks dari Cloud Storage, melakukan penghitungan kata, lalu menulis hasil file teks ke Cloud Storage.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
  3. Bangun paket.
    mvn clean package
    
    Jika build berhasil, target/word-count-1.0.jar akan dibuat.
  4. Lakukan staging paket ke Cloud Storage.
    gcloud storage cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. Salin file build.sbt ke komputer lokal Anda. File build.sbt berikut menentukan dependensi library Scala dan Spark, yang diberi cakupan provided untuk menunjukkan bahwa cluster Dataproc akan menyediakan library ini saat runtime. File build.sbt tidak menentukan dependensi Cloud Storage karena konektor menerapkan antarmuka HDFS standar. Saat tugas Spark mengakses file cluster Cloud Storage (file dengan URI yang diawali dengan gs://), sistem akan otomatis menggunakan konektor Cloud Storage untuk mengakses file di Cloud Storage
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
  2. Salin word-count.scala ke komputer lokal Anda. Ini adalah tugas Spark di Java yang membaca file teks dari Cloud Storage, melakukan penghitungan kata, lalu menulis hasil file teks ke Cloud Storage.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
  3. Bangun paket.
    sbt clean package
    
    Jika build berhasil, target/scala-2.11/word-count_2.11-1.0.jar akan dibuat.
  4. Lakukan staging paket ke Cloud Storage.
    gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. Salin word-count.py ke komputer lokal Anda. Ini adalah tugas Spark di Python menggunakan PySpark yang membaca file teks dari Cloud Storage, melakukan penghitungan kata, lalu menulis hasil file teks ke Cloud Storage.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])

Kirim tugas

Jalankan perintah gcloud berikut untuk mengirimkan tugas wordcount ke cluster Dataproc Anda.

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Melihat output

Setelah tugas selesai, jalankan perintah gcloud CLI berikut untuk melihat output jumlah kata.

gcloud storage cat gs://${BUCKET_NAME}/output/*

Output jumlah kata akan mirip dengan berikut:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)