Siapkan tugas jumlah kata Spark
Pilih tab di bawah untuk mengikuti langkah-langkah menyiapkan paket atau file tugas untuk dikirimkan ke cluster Anda. Anda dapat menyiapkan salah satu jenis tugas berikut;
- Tugas Spark di Java menggunakan Apache Maven untuk membuat paket JAR
- Tugas Spark di Scala menggunakan SBT untuk membuat paket JAR
- Tugas Spark di Python (PySpark)
Java
- Salin file
pom.xml
ke komputer lokal Anda. Filepom.xml
berikut menentukan dependensi library Scala dan Spark, yang diberi cakupanprovided
untuk menunjukkan bahwa cluster Dataproc akan menyediakan library ini saat runtime. Filepom.xml
tidak menentukan dependensi Cloud Storage karena konektor menerapkan antarmuka HDFS standar. Saat tugas Spark mengakses file cluster Cloud Storage (file dengan URI yang dimulai dengangs://
), sistem secara otomatis menggunakan konektor Cloud Storage untuk mengakses file di Cloud Storage<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>dataproc.codelab</groupId> <artifactId>word-count</artifactId> <version>1.0</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>Scala version, for example,
2.11.8
</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_Scala major.minor.version, for example,2.11
</artifactId> <version>Spark version, for example,2.3.1
</version> <scope>provided</scope> </dependency> </dependencies> </project> - Salin kode
WordCount.java
yang tercantum di bawah, ke komputer lokal Anda.- Buat serangkaian direktori dengan jalur
src/main/java/dataproc/codelab
:mkdir -p src/main/java/dataproc/codelab
- Salin
WordCount.java
ke komputer lokal Anda ke dalamsrc/main/java/dataproc/codelab
:cp WordCount.java src/main/java/dataproc/codelab
WordCount.java
adalah tugas Spark di Java yang membaca file teks dari Cloud Storage, melakukan penghitungan kata, lalu menulis hasil file teks ke Cloud Storage.package dataproc.codelab; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; public class WordCount { public static void main(String[] args) { if (args.length != 2) { throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>"); } String inputPath = args[0]; String outputPath = args[1]; JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count")); JavaRDD<String> lines = sparkContext.textFile(inputPath); JavaRDD<String> words = lines.flatMap( (String line) -> Arrays.asList(line.split(" ")).iterator() ); JavaPairRDD<String, Integer> wordCounts = words.mapToPair( (String word) -> new Tuple2<>(word, 1) ).reduceByKey( (Integer count1, Integer count2) -> count1 + count2 ); wordCounts.saveAsTextFile(outputPath); } }
- Buat serangkaian direktori dengan jalur
- Bangun paket.
Jika build berhasil,mvn clean package
target/word-count-1.0.jar
akan dibuat. - Lakukan staging paket ke Cloud Storage.
gcloud storage cp target/word-count-1.0.jar \ gs://${BUCKET_NAME}/java/word-count-1.0.jar
Scala
- Salin file
build.sbt
ke komputer lokal Anda. Filebuild.sbt
berikut menentukan dependensi library Scala dan Spark, yang diberi cakupanprovided
untuk menunjukkan bahwa cluster Dataproc akan menyediakan library ini saat runtime. Filebuild.sbt
tidak menentukan dependensi Cloud Storage karena konektor menerapkan antarmuka HDFS standar. Saat tugas Spark mengakses file cluster Cloud Storage (file dengan URI yang diawali dengangs://
), sistem akan otomatis menggunakan konektor Cloud Storage untuk mengakses file di Cloud StoragescalaVersion := "Scala version, for example,
2.11.8
" name := "word-count" organization := "dataproc.codelab" version := "1.0" libraryDependencies ++= Seq( "org.scala-lang" % "scala-library" % scalaVersion.value % "provided", "org.apache.spark" %% "spark-core" % "Spark version, for example,2.3.1
" % "provided" ) - Salin
word-count.scala
ke komputer lokal Anda. Ini adalah tugas Spark di Java yang membaca file teks dari Cloud Storage, melakukan penghitungan kata, lalu menulis hasil file teks ke Cloud Storage.package dataproc.codelab import org.apache.spark.SparkContext import org.apache.spark.SparkConf object WordCount { def main(args: Array[String]) { if (args.length != 2) { throw new IllegalArgumentException( "Exactly 2 arguments are required: <inputPath> <outputPath>") } val inputPath = args(0) val outputPath = args(1) val sc = new SparkContext(new SparkConf().setAppName("Word Count")) val lines = sc.textFile(inputPath) val words = lines.flatMap(line => line.split(" ")) val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) wordCounts.saveAsTextFile(outputPath) } }
- Bangun paket.
Jika build berhasil,sbt clean package
target/scala-2.11/word-count_2.11-1.0.jar
akan dibuat. - Lakukan staging paket ke Cloud Storage.
gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \ gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
Python
- Salin
word-count.py
ke komputer lokal Anda. Ini adalah tugas Spark di Python menggunakan PySpark yang membaca file teks dari Cloud Storage, melakukan penghitungan kata, lalu menulis hasil file teks ke Cloud Storage.#!/usr/bin/env python import pyspark import sys if len(sys.argv) != 3: raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>") inputUri=sys.argv[1] outputUri=sys.argv[2] sc = pyspark.SparkContext() lines = sc.textFile(sys.argv[1]) words = lines.flatMap(lambda line: line.split()) wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2) wordCounts.saveAsTextFile(sys.argv[2])
Kirim tugas
Jalankan perintah gcloud
berikut untuk mengirimkan tugas wordcount ke cluster Dataproc Anda.
Java
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Scala
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Python
gcloud dataproc jobs submit pyspark word-count.py \ --cluster=${CLUSTER} \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Melihat output
Setelah tugas selesai, jalankan perintah gcloud CLI berikut untuk melihat output jumlah kata.
gcloud storage cat gs://${BUCKET_NAME}/output/*
Output jumlah kata akan mirip dengan berikut:
(a,2) (call,1) (What's,1) (sweet.,1) (we,1) (as,1) (name?,1) (any,1) (other,1) (rose,1) (smell,1) (name,1) (would,1) (in,1) (which,1) (That,1) (By,1)