Spark WordCount 작업 준비
아래의 탭을 선택하여 단계에 따라 클러스터에 제출할 작업 패키지 또는 파일을 준비합니다. 다음 작업 유형 중 하나를 준비할 수 있습니다.
- 자바의 Spark 작업: Apache Maven을 사용하여 JAR 패키지 빌드
- Scala의 Spark 작업: SBT를 사용하여 JAR 패키지 빌드
- Python의 Spark 작업(PySpark)
자바
pom.xml
파일을 로컬 머신에 복사합니다. 다음pom.xml
파일은 Scala 및 Spark 라이브러리 종속 항목을 지정합니다. 이는 Dataproc 클러스터가 런타임 시 이러한 라이브러리를 제공함을 나타내기 위해provided
범위를 제공합니다. 커넥터가 표준 HDFS 인터페이스를 구현하므로pom.xml
파일은 Cloud Storage 종속 항목을 지정하지 않습니다. Spark 작업이 Cloud Storage 클러스터 파일(gs://
로 시작하는 URI가 있는 파일)에 액세스할 때, 시스템이 자동으로 Cloud Storage 커넥터를 사용하여 Cloud Storage의 파일에 액세스합니다.<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>dataproc.codelab</groupId> <artifactId>word-count</artifactId> <version>1.0</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>Scala version, for example,
2.11.8
</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_Scala major.minor.version, for example,2.11
</artifactId> <version>Spark version, for example,2.3.1
</version> <scope>provided</scope> </dependency> </dependencies> </project>- 아래 나열된
WordCount.java
코드를 로컬 머신에 복사합니다.src/main/java/dataproc/codelab
경로를 사용하여 디렉터리 집합을 만듭니다.mkdir -p src/main/java/dataproc/codelab
WordCount.java
를 로컬 머신의src/main/java/dataproc/codelab
에 복사합니다.cp WordCount.java src/main/java/dataproc/codelab
WordCount.java
는 Cloud Storage에서 텍스트 파일을 읽고 단어 수를 계산한 후 텍스트 파일 결과를 Cloud Storage에 쓰는 Java의 Spark 작업입니다.package dataproc.codelab; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; public class WordCount { public static void main(String[] args) { if (args.length != 2) { throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>"); } String inputPath = args[0]; String outputPath = args[1]; JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count")); JavaRDD<String> lines = sparkContext.textFile(inputPath); JavaRDD<String> words = lines.flatMap( (String line) -> Arrays.asList(line.split(" ")).iterator() ); JavaPairRDD<String, Integer> wordCounts = words.mapToPair( (String word) -> new Tuple2<>(word, 1) ).reduceByKey( (Integer count1, Integer count2) -> count1 + count2 ); wordCounts.saveAsTextFile(outputPath); } }
- 패키지를 빌드합니다.
성공적으로 빌드되면mvn clean package
target/word-count-1.0.jar
이 생성됩니다. - 패키지를 Cloud Storage로 스테이징합니다.
gcloud storage cp target/word-count-1.0.jar \ gs://${BUCKET_NAME}/java/word-count-1.0.jar
Scala
build.sbt
파일을 로컬 머신에 복사합니다. 다음build.sbt
파일은 Scala 및 Spark 라이브러리 종속 항목을 지정합니다. 이는 Dataproc 클러스터가 런타임 시 이러한 라이브러리를 제공함을 나타내기 위해provided
범위를 제공합니다. 커넥터가 표준 HDFS 인터페이스를 구현하므로build.sbt
파일은 Cloud Storage 종속 항목을 지정하지 않습니다. Spark 작업이 Cloud Storage 클러스터 파일(gs://
로 시작하는 URI가 있는 파일)에 액세스할 때, 시스템이 자동으로 Cloud Storage 커넥터를 사용하여 Cloud Storage의 파일에 액세스합니다.scalaVersion := "Scala version, for example,
2.11.8
" name := "word-count" organization := "dataproc.codelab" version := "1.0" libraryDependencies ++= Seq( "org.scala-lang" % "scala-library" % scalaVersion.value % "provided", "org.apache.spark" %% "spark-core" % "Spark version, for example,2.3.1
" % "provided" )word-count.scala
를 로컬 머신에 복사합니다. 이는 Cloud Storage에서 텍스트 파일을 읽고 단어 수를 계산한 후 텍스트 파일 결과를 Cloud Storage에 쓰는 Java의 Spark 작업입니다.package dataproc.codelab import org.apache.spark.SparkContext import org.apache.spark.SparkConf object WordCount { def main(args: Array[String]) { if (args.length != 2) { throw new IllegalArgumentException( "Exactly 2 arguments are required: <inputPath> <outputPath>") } val inputPath = args(0) val outputPath = args(1) val sc = new SparkContext(new SparkConf().setAppName("Word Count")) val lines = sc.textFile(inputPath) val words = lines.flatMap(line => line.split(" ")) val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) wordCounts.saveAsTextFile(outputPath) } }
- 패키지를 빌드합니다.
성공적으로 빌드되면sbt clean package
target/scala-2.11/word-count_2.11-1.0.jar
이 생성됩니다. - 패키지를 Cloud Storage로 스테이징합니다.
gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \ gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
Python
word-count.py
를 로컬 머신에 복사합니다. 이는 PySpark를 사용하여 Cloud Storage에서 텍스트 파일을 읽고 단어 수를 계산한 후 텍스트 파일 결과를 Cloud Storage에 쓰는 Python의 Spark 작업입니다.#!/usr/bin/env python import pyspark import sys if len(sys.argv) != 3: raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>") inputUri=sys.argv[1] outputUri=sys.argv[2] sc = pyspark.SparkContext() lines = sc.textFile(sys.argv[1]) words = lines.flatMap(lambda line: line.split()) wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2) wordCounts.saveAsTextFile(sys.argv[2])
작업 제출
다음 gcloud
명령어를 실행하여 WordCount 작업을 Dataproc 클러스터에 제출합니다.
자바
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Scala
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Python
gcloud dataproc jobs submit pyspark word-count.py \ --cluster=${CLUSTER} \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
출력 보기
작업이 완료되면 다음 gcloud CLI 명령어를 실행하여 WordCount 출력을 확인합니다.
gcloud storage cat gs://${BUCKET_NAME}/output/*
WordCount 출력은 다음과 비슷하게 표시됩니다.
(a,2) (call,1) (What's,1) (sweet.,1) (we,1) (as,1) (name?,1) (any,1) (other,1) (rose,1) (smell,1) (name,1) (would,1) (in,1) (which,1) (That,1) (By,1)