Prepara il job Spark di conteggio parole
Seleziona una scheda di seguito per seguire i passaggi per preparare un pacchetto o un file di job da inviare al cluster. Puoi preparare uno dei seguenti tipi di lavoro:
- Job Spark in Java utilizzando Apache Maven per creare un pacchetto JAR
- Job Spark in Scala utilizzando SBT per creare un pacchetto JAR
- Job Spark in Python (PySpark)
Java
- Copia il file
pom.xml
nella macchina locale. Il seguente filepom.xml
specifica le dipendenze delle librerie Scala e Spark, a cui viene assegnato un ambitoprovided
per indicare che il cluster Dataproc fornirà queste librerie in fase di runtime. Il filepom.xml
non specifica una dipendenza da Cloud Storage perché il connettore implementa l'interfaccia HDFS standard. Quando un job Spark accede ai file del cluster Cloud Storage (file con URI che iniziano congs://
), il sistema utilizza automaticamente il connettore Cloud Storage per accedere ai file in Cloud Storage<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>dataproc.codelab</groupId> <artifactId>word-count</artifactId> <version>1.0</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>Scala version, for example,
2.11.8
</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_Scala major.minor.version, for example,2.11
</artifactId> <version>Spark version, for example,2.3.1
</version> <scope>provided</scope> </dependency> </dependencies> </project> - Copia il codice
WordCount.java
elencato di seguito nel computer locale.- Crea un insieme di directory con il percorso
src/main/java/dataproc/codelab
:mkdir -p src/main/java/dataproc/codelab
- Copia
WordCount.java
sulla tua macchina locale insrc/main/java/dataproc/codelab
:cp WordCount.java src/main/java/dataproc/codelab
WordCount.java
è un job Spark in Java che legge file di testo da Cloud Storage, esegue un conteggio delle parole e poi scrive i risultati del file di testo in Cloud Storage.package dataproc.codelab; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; public class WordCount { public static void main(String[] args) { if (args.length != 2) { throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>"); } String inputPath = args[0]; String outputPath = args[1]; JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count")); JavaRDD<String> lines = sparkContext.textFile(inputPath); JavaRDD<String> words = lines.flatMap( (String line) -> Arrays.asList(line.split(" ")).iterator() ); JavaPairRDD<String, Integer> wordCounts = words.mapToPair( (String word) -> new Tuple2<>(word, 1) ).reduceByKey( (Integer count1, Integer count2) -> count1 + count2 ); wordCounts.saveAsTextFile(outputPath); } }
- Crea un insieme di directory con il percorso
- Crea il pacchetto.
Se la build ha esito positivo, viene creato unmvn clean package
target/word-count-1.0.jar
. - Trasferisci il pacchetto a Cloud Storage.
gcloud storage cp target/word-count-1.0.jar \ gs://${BUCKET_NAME}/java/word-count-1.0.jar
Scala
- Copia il file
build.sbt
nella macchina locale. Il seguente filebuild.sbt
specifica le dipendenze delle librerie Scala e Spark, a cui viene assegnato un ambitoprovided
per indicare che il cluster Dataproc fornirà queste librerie in fase di runtime. Il filebuild.sbt
non specifica una dipendenza da Cloud Storage perché il connettore implementa l'interfaccia HDFS standard. Quando un job Spark accede ai file del cluster Cloud Storage (file con URI che iniziano congs://
), il sistema utilizza automaticamente il connettore Cloud Storage per accedere ai file in Cloud StoragescalaVersion := "Scala version, for example,
2.11.8
" name := "word-count" organization := "dataproc.codelab" version := "1.0" libraryDependencies ++= Seq( "org.scala-lang" % "scala-library" % scalaVersion.value % "provided", "org.apache.spark" %% "spark-core" % "Spark version, for example,2.3.1
" % "provided" ) - Copia
word-count.scala
sulla tua macchina locale. Si tratta di un job Spark in Java che legge file di testo da Cloud Storage, esegue un conteggio delle parole e poi scrive i risultati del file di testo in Cloud Storage.package dataproc.codelab import org.apache.spark.SparkContext import org.apache.spark.SparkConf object WordCount { def main(args: Array[String]) { if (args.length != 2) { throw new IllegalArgumentException( "Exactly 2 arguments are required: <inputPath> <outputPath>") } val inputPath = args(0) val outputPath = args(1) val sc = new SparkContext(new SparkConf().setAppName("Word Count")) val lines = sc.textFile(inputPath) val words = lines.flatMap(line => line.split(" ")) val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) wordCounts.saveAsTextFile(outputPath) } }
- Crea il pacchetto.
Se la build ha esito positivo, viene creato unsbt clean package
target/scala-2.11/word-count_2.11-1.0.jar
. - Trasferisci il pacchetto a Cloud Storage.
gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \ gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
Python
- Copia
word-count.py
sulla tua macchina locale. Si tratta di un job Spark in Python che utilizza PySpark per leggere i file di testo da Cloud Storage, esegue un conteggio delle parole e poi scrive i risultati del file di testo in Cloud Storage.#!/usr/bin/env python import pyspark import sys if len(sys.argv) != 3: raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>") inputUri=sys.argv[1] outputUri=sys.argv[2] sc = pyspark.SparkContext() lines = sc.textFile(sys.argv[1]) words = lines.flatMap(lambda line: line.split()) wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2) wordCounts.saveAsTextFile(sys.argv[2])
Invia il job
Esegui il seguente comando gcloud
per inviare il job di conteggio parole al cluster Dataproc.
Java
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Scala
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Python
gcloud dataproc jobs submit pyspark word-count.py \ --cluster=${CLUSTER} \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Visualizzare l'output
Al termine del job, esegui questo comando gcloud CLI per visualizzare l'output del conteggio delle parole.
gcloud storage cat gs://${BUCKET_NAME}/output/*
L'output del conteggio delle parole dovrebbe essere simile al seguente:
(a,2) (call,1) (What's,1) (sweet.,1) (we,1) (as,1) (name?,1) (any,1) (other,1) (rose,1) (smell,1) (name,1) (would,1) (in,1) (which,1) (That,1) (By,1)