Preparar la tarea de recuento de palabras de Spark
Selecciona una de las pestañas de abajo para seguir los pasos necesarios para preparar un paquete de trabajo o un archivo que quieras enviar a tu clúster. Puedes preparar uno de los siguientes tipos de trabajo:
- Tarea de Spark en Java con Apache Maven para crear un paquete JAR
- Tarea de Spark en Scala con SBT para crear un paquete JAR
- Tarea de Spark en Python (PySpark)
Java
- Copia el archivo
pom.xml
en tu equipo local. El siguiente archivopom.xml
especifica las dependencias de la biblioteca de Scala y Spark, a las que se les asigna un ámbitoprovided
para indicar que el clúster de Dataproc proporcionará estas bibliotecas en el tiempo de ejecución. El archivopom.xml
no especifica una dependencia de Cloud Storage porque el conector implementa la interfaz HDFS estándar. Cuando una tarea de Spark accede a archivos de clúster de Cloud Storage (archivos con URIs que empiezan porgs://
), el sistema utiliza automáticamente el conector de Cloud Storage para acceder a los archivos de Cloud Storage.<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>dataproc.codelab</groupId> <artifactId>word-count</artifactId> <version>1.0</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>Scala version, for example,
2.11.8
</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_Scala major.minor.version, for example,2.11
</artifactId> <version>Spark version, for example,2.3.1
</version> <scope>provided</scope> </dependency> </dependencies> </project> - Copia el código
WordCount.java
que se indica más abajo en tu equipo local.- Crea un conjunto de directorios con la ruta
src/main/java/dataproc/codelab
:mkdir -p src/main/java/dataproc/codelab
- Copia
WordCount.java
en tu máquina local ensrc/main/java/dataproc/codelab
:cp WordCount.java src/main/java/dataproc/codelab
WordCount.java
es una tarea de Spark en Java que lee archivos de texto de Cloud Storage, realiza un recuento de palabras y, a continuación, escribe los resultados del archivo de texto en Cloud Storage.package dataproc.codelab; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; public class WordCount { public static void main(String[] args) { if (args.length != 2) { throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>"); } String inputPath = args[0]; String outputPath = args[1]; JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count")); JavaRDD<String> lines = sparkContext.textFile(inputPath); JavaRDD<String> words = lines.flatMap( (String line) -> Arrays.asList(line.split(" ")).iterator() ); JavaPairRDD<String, Integer> wordCounts = words.mapToPair( (String word) -> new Tuple2<>(word, 1) ).reduceByKey( (Integer count1, Integer count2) -> count1 + count2 ); wordCounts.saveAsTextFile(outputPath); } }
- Crea un conjunto de directorios con la ruta
- Crea el paquete.
Si la compilación se realiza correctamente, se crea unmvn clean package
target/word-count-1.0.jar
. - Transfiere el paquete a Cloud Storage.
gcloud storage cp target/word-count-1.0.jar \ gs://${BUCKET_NAME}/java/word-count-1.0.jar
Scala
- Copia el archivo
build.sbt
en tu equipo local. El siguiente archivobuild.sbt
especifica las dependencias de la biblioteca de Scala y Spark, a las que se les asigna un ámbitoprovided
para indicar que el clúster de Dataproc proporcionará estas bibliotecas en el tiempo de ejecución. El archivobuild.sbt
no especifica una dependencia de Cloud Storage porque el conector implementa la interfaz HDFS estándar. Cuando un trabajo de Spark accede a archivos de clúster de Cloud Storage (archivos con URIs que empiezan porgs://
), el sistema utiliza automáticamente el conector de Cloud Storage para acceder a los archivos de Cloud Storage.scalaVersion := "Scala version, for example,
2.11.8
" name := "word-count" organization := "dataproc.codelab" version := "1.0" libraryDependencies ++= Seq( "org.scala-lang" % "scala-library" % scalaVersion.value % "provided", "org.apache.spark" %% "spark-core" % "Spark version, for example,2.3.1
" % "provided" ) - Copia
word-count.scala
en tu equipo local. Se trata de un trabajo de Spark en Java que lee archivos de texto de Cloud Storage, realiza un recuento de palabras y, a continuación, escribe los resultados del archivo de texto en Cloud Storage.package dataproc.codelab import org.apache.spark.SparkContext import org.apache.spark.SparkConf object WordCount { def main(args: Array[String]) { if (args.length != 2) { throw new IllegalArgumentException( "Exactly 2 arguments are required: <inputPath> <outputPath>") } val inputPath = args(0) val outputPath = args(1) val sc = new SparkContext(new SparkConf().setAppName("Word Count")) val lines = sc.textFile(inputPath) val words = lines.flatMap(line => line.split(" ")) val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) wordCounts.saveAsTextFile(outputPath) } }
- Crea el paquete.
Si la compilación se realiza correctamente, se crea unsbt clean package
target/scala-2.11/word-count_2.11-1.0.jar
. - Transfiere el paquete a Cloud Storage.
gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \ gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
Python
- Copia
word-count.py
en tu equipo local. Se trata de un trabajo de Spark en Python que usa PySpark para leer archivos de texto de Cloud Storage, realizar un recuento de palabras y, a continuación, escribir los resultados del archivo de texto en Cloud Storage.#!/usr/bin/env python import pyspark import sys if len(sys.argv) != 3: raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>") inputUri=sys.argv[1] outputUri=sys.argv[2] sc = pyspark.SparkContext() lines = sc.textFile(sys.argv[1]) words = lines.flatMap(lambda line: line.split()) wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2) wordCounts.saveAsTextFile(sys.argv[2])
Enviar la tarea
Ejecuta el siguiente comando gcloud
para enviar la tarea de recuento de palabras a tu clúster de Dataproc.
Java
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Scala
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Python
gcloud dataproc jobs submit pyspark word-count.py \ --cluster=${CLUSTER} \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Ver el resultado
Una vez que haya finalizado el trabajo, ejecuta el siguiente comando de la CLI de gcloud para ver el resultado del recuento de palabras.
gcloud storage cat gs://${BUCKET_NAME}/output/*
El resultado del recuento de palabras debería ser similar al siguiente:
(a,2) (call,1) (What's,1) (sweet.,1) (we,1) (as,1) (name?,1) (any,1) (other,1) (rose,1) (smell,1) (name,1) (would,1) (in,1) (which,1) (That,1) (By,1)