Prepare the Spark wordcount job
Select a tab below to follow the steps to prepare a job package or file to submit to your cluster. You can prepare one of the following job types:
- Spark job in Java using Apache Maven to build a JAR package
- Spark job in Scala using SBT to build a JAR package
- Spark job in Python (PySpark)
Java
- Copy the pom.xml file listed below to your local machine. The pom.xml file specifies Scala and Spark library dependencies, which are given a provided scope to indicate that the Dataproc cluster provides these libraries at runtime. The pom.xml file does not specify a Cloud Storage dependency because the connector implements the standard HDFS interface. When a Spark job accesses Cloud Storage cluster files (files with URIs that start with gs://), the system automatically uses the Cloud Storage connector to access the files in Cloud Storage.

  ```xml
  <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
          http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>dataproc.codelab</groupId>
    <artifactId>word-count</artifactId>
    <version>1.0</version>

    <properties>
      <maven.compiler.source>1.8</maven.compiler.source>
      <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
      <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>Scala version, for example, 2.11.8</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_Scala major.minor version, for example, 2.11</artifactId>
        <version>Spark version, for example, 2.3.1</version>
        <scope>provided</scope>
      </dependency>
    </dependencies>
  </project>
  ```

- Copy the WordCount.java code listed below to your local machine.

  - Create a set of directories with the path src/main/java/dataproc/codelab:

    ```bash
    mkdir -p src/main/java/dataproc/codelab
    ```

  - Copy WordCount.java to your local machine into src/main/java/dataproc/codelab:

    ```bash
    cp WordCount.java src/main/java/dataproc/codelab
    ```

  WordCount.java is a Spark job in Java that reads text files from Cloud Storage, performs a word count, then writes the text file results to Cloud Storage.

  ```java
  package dataproc.codelab;

  import java.util.Arrays;
  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import scala.Tuple2;

  public class WordCount {
    public static void main(String[] args) {
      if (args.length != 2) {
        throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
      }

      String inputPath = args[0];
      String outputPath = args[1];

      JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
      JavaRDD<String> lines = sparkContext.textFile(inputPath);
      JavaRDD<String> words = lines.flatMap(
          (String line) -> Arrays.asList(line.split(" ")).iterator()
      );
      JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
          (String word) -> new Tuple2<>(word, 1)
      ).reduceByKey(
          (Integer count1, Integer count2) -> count1 + count2
      );
      wordCounts.saveAsTextFile(outputPath);
    }
  }
  ```
- Build the package:

  ```bash
  mvn clean package
  ```

  If the build is successful, target/word-count-1.0.jar is created.

- Stage the package to Cloud Storage:

  ```bash
  gcloud storage cp target/word-count-1.0.jar \
      gs://${BUCKET_NAME}/java/word-count-1.0.jar
  ```
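If you want to double-check the build before submitting the job, you can list the JAR contents and confirm that the compiled class was packaged. This is an optional check, not a tutorial step; it assumes the JDK's jar tool is on your PATH (it ships with the JDK that Maven already requires).

```bash
# Optional: list the JAR contents and confirm the main class is present.
jar tf target/word-count-1.0.jar | grep WordCount
# Expected to include: dataproc/codelab/WordCount.class
```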
Scala
- Copy the build.sbt file listed below to your local machine. The build.sbt file specifies Scala and Spark library dependencies, which are given a provided scope to indicate that the Dataproc cluster provides these libraries at runtime. The build.sbt file does not specify a Cloud Storage dependency because the connector implements the standard HDFS interface. When a Spark job accesses Cloud Storage cluster files (files with URIs that start with gs://), the system automatically uses the Cloud Storage connector to access the files in Cloud Storage.

  ```scala
  scalaVersion := "Scala version, for example, 2.11.8"

  name := "word-count"
  organization := "dataproc.codelab"
  version := "1.0"

  libraryDependencies ++= Seq(
    "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
    "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
  )
  ```

- Copy word-count.scala to your local machine. This is a Spark job in Scala that reads text files from Cloud Storage, performs a word count, then writes the text file results to Cloud Storage.

  ```scala
  package dataproc.codelab

  import org.apache.spark.SparkContext
  import org.apache.spark.SparkConf

  object WordCount {
    def main(args: Array[String]) {
      if (args.length != 2) {
        throw new IllegalArgumentException(
            "Exactly 2 arguments are required: <inputPath> <outputPath>")
      }

      val inputPath = args(0)
      val outputPath = args(1)

      val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
      val lines = sc.textFile(inputPath)
      val words = lines.flatMap(line => line.split(" "))
      val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
      wordCounts.saveAsTextFile(outputPath)
    }
  }
  ```
- Build the package:

  ```bash
  sbt clean package
  ```

  If the build is successful, target/scala-2.11/word-count_2.11-1.0.jar is created.

- Stage the package to Cloud Storage:

  ```bash
  gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
      gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
  ```
Python
- Copy word-count.py to your local machine. This is a Spark job in Python using PySpark that reads text files from Cloud Storage, performs a word count, then writes the text file results to Cloud Storage.

  ```python
  #!/usr/bin/env python

  import pyspark
  import sys

  if len(sys.argv) != 3:
    raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")

  inputUri = sys.argv[1]
  outputUri = sys.argv[2]

  sc = pyspark.SparkContext()
  lines = sc.textFile(inputUri)
  words = lines.flatMap(lambda line: line.split())
  wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
  wordCounts.saveAsTextFile(outputUri)
  ```
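If you have a local PySpark installation, you can optionally smoke-test the job on your machine before submitting it to the cluster. This sketch is not part of the tutorial steps; the local input and output paths are placeholders chosen for illustration.

```bash
# Optional local test; requires PySpark (for example, pip install pyspark).
# /tmp/wc-input and /tmp/wc-output are hypothetical paths;
# the output directory must not already exist.
mkdir -p /tmp/wc-input
echo "a rose is a rose" > /tmp/wc-input/rose.txt
spark-submit word-count.py file:///tmp/wc-input file:///tmp/wc-output
cat /tmp/wc-output/part-*
```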
Submit the job
Run the following gcloud command to submit the wordcount job to your Dataproc cluster.
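These commands assume that the CLUSTER, REGION, and BUCKET_NAME environment variables were exported during the earlier setup and cluster-creation steps. If you are working in a new shell session, export them again before submitting; the values below are placeholders for illustration only.

```bash
# Placeholder values -- replace with your own cluster name, region, and bucket.
export CLUSTER=my-cluster
export REGION=us-central1
export BUCKET_NAME=my-bucket
```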
Java
```bash
gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
```
Scala
```bash
gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
```
Python
```bash
gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
```
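The submit command streams the driver output and prints a job ID. If you want to check on a job later, one option (a sketch, not a tutorial step) is to list the jobs on the cluster and then describe a specific job by its ID.

```bash
# List recent jobs submitted to the cluster.
gcloud dataproc jobs list --cluster=${CLUSTER} --region=${REGION}

# Show details, including state, for one job; JOB_ID is a placeholder.
gcloud dataproc jobs describe JOB_ID --region=${REGION}
```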
View the output
After the job finishes, run the following gcloud CLI command to view the wordcount output.
```bash
gcloud storage cat gs://${BUCKET_NAME}/output/*
```
The wordcount output should be similar to the following:
```
(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)
```
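Spark's saveAsTextFile writes one part-* file per RDD partition under the output prefix, which is why the command above uses a wildcard. To see the individual output files, you can list the prefix (an optional extra step).

```bash
# List the files the job wrote under the output prefix.
gcloud storage ls gs://${BUCKET_NAME}/output/
```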