Prepare a tarefa de contagem de palavras do Spark
Selecione um separador abaixo para seguir os passos de preparação de um pacote de tarefas ou um ficheiro a enviar para o cluster. Pode preparar um dos seguintes tipos de tarefas:
- Tarefa do Spark em Java usando o Apache Maven para criar um pacote JAR
- Tarefa do Spark em Scala com o SBT para criar um pacote JAR
- Tarefa do Spark em Python (PySpark)
Java
- Copie o ficheiro
pom.xml
para a sua máquina local. O ficheiropom.xml
seguinte especifica as dependências da biblioteca Scala e Spark, às quais é atribuído um âmbitoprovided
para indicar que o cluster do Dataproc vai fornecer estas bibliotecas no tempo de execução. O ficheiropom.xml
não especifica uma dependência do Cloud Storage porque o conetor implementa a interface HDFS padrão. Quando uma tarefa do Spark acede a ficheiros do cluster do Cloud Storage (ficheiros com URIs que começam porgs://
), o sistema usa automaticamente o conetor do Cloud Storage para aceder aos ficheiros no Cloud Storage .major.minor
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>dataproc.codelab</groupId> <artifactId>word-count</artifactId> <version>1.0</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>Scala version, for example,
2.11.8
</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_Scala major.minor.version, for example,2.11
</artifactId> <version>Spark version, for example,2.3.1
</version> <scope>provided</scope> </dependency> </dependencies> </project> - Copie o código
WordCount.java
apresentado abaixo para o seu computador local.- Crie um conjunto de diretórios com o caminho
src/main/java/dataproc/codelab
:mkdir -p src/main/java/dataproc/codelab
- Copie
WordCount.java
para a sua máquina local parasrc/main/java/dataproc/codelab
:cp WordCount.java src/main/java/dataproc/codelab
WordCount.java
é uma tarefa do Spark em Java que lê ficheiros de texto do Cloud Storage, faz uma contagem de palavras e, em seguida, escreve os resultados do ficheiro de texto no Cloud Storage.package dataproc.codelab; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; public class WordCount { public static void main(String[] args) { if (args.length != 2) { throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>"); } String inputPath = args[0]; String outputPath = args[1]; JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count")); JavaRDD<String> lines = sparkContext.textFile(inputPath); JavaRDD<String> words = lines.flatMap( (String line) -> Arrays.asList(line.split(" ")).iterator() ); JavaPairRDD<String, Integer> wordCounts = words.mapToPair( (String word) -> new Tuple2<>(word, 1) ).reduceByKey( (Integer count1, Integer count2) -> count1 + count2 ); wordCounts.saveAsTextFile(outputPath); } }
- Crie um conjunto de diretórios com o caminho
- Crie o pacote.
Se a compilação for bem-sucedida, é criado ummvn clean package
target/word-count-1.0.jar
. - Prepare o pacote para o Cloud Storage.
gcloud storage cp target/word-count-1.0.jar \ gs://${BUCKET_NAME}/java/word-count-1.0.jar
Scala
- Copie o ficheiro
build.sbt
para a sua máquina local. O ficheirobuild.sbt
seguinte especifica as dependências da biblioteca Scala e Spark, às quais é atribuído um âmbitoprovided
para indicar que o cluster do Dataproc vai fornecer estas bibliotecas no tempo de execução. O ficheirobuild.sbt
não especifica uma dependência do Cloud Storage porque o conetor implementa a interface HDFS padrão. Quando uma tarefa do Spark acede a ficheiros do cluster do Cloud Storage (ficheiros com URIs que começam porgs://
), o sistema usa automaticamente o conetor do Cloud Storage para aceder aos ficheiros no Cloud StoragescalaVersion := "Scala version, for example,
2.11.8
" name := "word-count" organization := "dataproc.codelab" version := "1.0" libraryDependencies ++= Seq( "org.scala-lang" % "scala-library" % scalaVersion.value % "provided", "org.apache.spark" %% "spark-core" % "Spark version, for example,2.3.1
" % "provided" ) - Copie
word-count.scala
para a sua máquina local. Este é um trabalho do Spark em Java que lê ficheiros de texto do Cloud Storage, faz uma contagem de palavras e, em seguida, escreve os resultados do ficheiro de texto no Cloud Storage.package dataproc.codelab import org.apache.spark.SparkContext import org.apache.spark.SparkConf object WordCount { def main(args: Array[String]) { if (args.length != 2) { throw new IllegalArgumentException( "Exactly 2 arguments are required: <inputPath> <outputPath>") } val inputPath = args(0) val outputPath = args(1) val sc = new SparkContext(new SparkConf().setAppName("Word Count")) val lines = sc.textFile(inputPath) val words = lines.flatMap(line => line.split(" ")) val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) wordCounts.saveAsTextFile(outputPath) } }
- Crie o pacote.
Se a compilação for bem-sucedida, é criado umsbt clean package
target/scala-2.11/word-count_2.11-1.0.jar
. - Prepare o pacote para o Cloud Storage.
gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \ gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
Python
- Copie
word-count.py
para a sua máquina local. Este é um trabalho do Spark em Python que usa o PySpark para ler ficheiros de texto do Cloud Storage, faz uma contagem de palavras e, em seguida, escreve os resultados do ficheiro de texto no Cloud Storage.#!/usr/bin/env python import pyspark import sys if len(sys.argv) != 3: raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>") inputUri=sys.argv[1] outputUri=sys.argv[2] sc = pyspark.SparkContext() lines = sc.textFile(sys.argv[1]) words = lines.flatMap(lambda line: line.split()) wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2) wordCounts.saveAsTextFile(sys.argv[2])
Envie o trabalho
Execute o seguinte comando gcloud
para enviar a tarefa wordcount para o seu cluster do Dataproc.
Java
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Scala
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Python
gcloud dataproc jobs submit pyspark word-count.py \ --cluster=${CLUSTER} \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Veja a saída
Após a conclusão da tarefa, execute o seguinte comando da CLI gcloud para ver o resultado da contagem de palavras.
gcloud storage cat gs://${BUCKET_NAME}/output/*
O resultado da contagem de palavras deve ser semelhante ao seguinte:
(a,2) (call,1) (What's,1) (sweet.,1) (we,1) (as,1) (name?,1) (any,1) (other,1) (rose,1) (smell,1) (name,1) (would,1) (in,1) (which,1) (That,1) (By,1)