Prepara el trabajo de conteo de palabras de Spark
Selecciona una pestaña, a continuación, para seguir los pasos a fin de preparar un paquete o un archivo de trabajo a fin de enviarlo a tu clúster. Puedes preparar uno de los siguientes tipos de trabajos;
- Trabajo Spark en Java con Apache Maven para compilar un paquete de JAR
- Spark job en Scala con SBT para compilar un paquete de JAR
- Trabajo de Spark en Python (PySpark)
Java
- Copia el archivo
pom.xml
en tu máquina local. En el siguiente archivopom.xml
, se especifican las dependencias de la biblioteca de Scala y Spark, que tienen un permisoprovided
para indicar que el clúster de Dataproc proporcionará estas bibliotecas en el entorno de ejecución. El archivopom.xml
no especifica una dependencia de Cloud Storage porque el conector implementa la interfaz de HDFS estándar. Cuando un trabajo de Spark accede a los archivos del clúster de Cloud Storage (archivos con URI que comienzan congs://
), el sistema usa de forma automática el conector de Cloud Storage para acceder a los archivos en Cloud Storage<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>dataproc.codelab</groupId> <artifactId>word-count</artifactId> <version>1.0</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>Scala version, for example,
2.11.8
</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_Scala major.minor.version, for example,2.11
</artifactId> <version>Spark version, for example,2.3.1
</version> <scope>provided</scope> </dependency> </dependencies> </project> - Copia el código
WordCount.java
que aparece a continuación en tu máquina local.- Crea un conjunto de directorios con la ruta
src/main/java/dataproc/codelab
:mkdir -p src/main/java/dataproc/codelab
- Copia
WordCount.java
en tu máquina local ensrc/main/java/dataproc/codelab
:cp WordCount.java src/main/java/dataproc/codelab
WordCount.java
es un trabajo de Spark en Java que lee archivos de texto de Cloud Storage, realiza un conteo de palabras y, luego, escribe los resultados del archivo de texto en Cloud Storage.package dataproc.codelab; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; public class WordCount { public static void main(String[] args) { if (args.length != 2) { throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>"); } String inputPath = args[0]; String outputPath = args[1]; JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count")); JavaRDD<String> lines = sparkContext.textFile(inputPath); JavaRDD<String> words = lines.flatMap( (String line) -> Arrays.asList(line.split(" ")).iterator() ); JavaPairRDD<String, Integer> wordCounts = words.mapToPair( (String word) -> new Tuple2<>(word, 1) ).reduceByKey( (Integer count1, Integer count2) -> count1 + count2 ); wordCounts.saveAsTextFile(outputPath); } }
- Crea un conjunto de directorios con la ruta
- Compila el paquete.
Si la compilación se realiza de forma correcta, se crea unmvn clean package
target/word-count-1.0.jar
. - Almacena el paquete en etapa intermedia en Cloud Storage.
gcloud storage cp target/word-count-1.0.jar \ gs://${BUCKET_NAME}/java/word-count-1.0.jar
Scala
- Copia el archivo
build.sbt
en tu máquina local. En el siguiente archivobuild.sbt
, se especifican las dependencias de la biblioteca de Scala y Spark, que tienen un permisoprovided
para indicar que el clúster de Dataproc proporcionará estas bibliotecas en el entorno de ejecución. El archivobuild.sbt
no especifica una dependencia de Cloud Storage porque el conector implementa la interfaz de HDFS estándar. Cuando un trabajo de Spark accede a los archivos del clúster de Cloud Storage (archivos con URI que comienzan congs://
), el sistema usa de forma automática el conector de Cloud Storage para acceder a los archivos en Cloud StoragescalaVersion := "Scala version, for example,
2.11.8
" name := "word-count" organization := "dataproc.codelab" version := "1.0" libraryDependencies ++= Seq( "org.scala-lang" % "scala-library" % scalaVersion.value % "provided", "org.apache.spark" %% "spark-core" % "Spark version, for example,2.3.1
" % "provided" ) - Copia
word-count.scala
a tu máquina local. Este es un trabajo de Spark en Java que lee archivos de texto de Cloud Storage, realiza un conteo de palabras y, luego, escribe los resultados del archivo de texto en Cloud Storage.package dataproc.codelab import org.apache.spark.SparkContext import org.apache.spark.SparkConf object WordCount { def main(args: Array[String]) { if (args.length != 2) { throw new IllegalArgumentException( "Exactly 2 arguments are required: <inputPath> <outputPath>") } val inputPath = args(0) val outputPath = args(1) val sc = new SparkContext(new SparkConf().setAppName("Word Count")) val lines = sc.textFile(inputPath) val words = lines.flatMap(line => line.split(" ")) val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) wordCounts.saveAsTextFile(outputPath) } }
- Compila el paquete.
Si la compilación se realiza de forma correcta, se crea unsbt clean package
target/scala-2.11/word-count_2.11-1.0.jar
. - Almacena el paquete en etapa intermedia en Cloud Storage.
gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \ gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
Python
- Copia
word-count.py
a tu máquina local. Este es un trabajo de Spark en Python con PySpark que lee archivos de texto de Cloud Storage, realiza un conteo de palabras y, luego, escribe los resultados del archivo de texto en Cloud Storage.#!/usr/bin/env python import pyspark import sys if len(sys.argv) != 3: raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>") inputUri=sys.argv[1] outputUri=sys.argv[2] sc = pyspark.SparkContext() lines = sc.textFile(sys.argv[1]) words = lines.flatMap(lambda line: line.split()) wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2) wordCounts.saveAsTextFile(sys.argv[2])
Envía el trabajo
Ejecuta el siguiente comando de gcloud
para enviar el trabajo de conteo de palabras a tu clúster de Dataproc.
Java
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Scala
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Python
gcloud dataproc jobs submit pyspark word-count.py \ --cluster=${CLUSTER} \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Revise el resultado.
Una vez que finalice el trabajo, ejecuta el siguiente comando de gcloud CLI para ver el resultado del recuento de palabras.
gcloud storage cat gs://${BUCKET_NAME}/output/*
El resultado del conteo de palabras debe ser similar al siguiente:
(a,2) (call,1) (What's,1) (sweet.,1) (we,1) (as,1) (name?,1) (any,1) (other,1) (rose,1) (smell,1) (name,1) (would,1) (in,1) (which,1) (That,1) (By,1)