Apache Spark로 Cloud Storage 커넥터 사용

이 튜토리얼은 Apache Spark에서 Cloud Storage 커넥터를 사용하는 예시 코드를 실행하는 방법을 보여줍니다.

Spark WordCount 작업 준비

아래의 탭을 선택하여 단계에 따라 클러스터에 제출할 작업 패키지 또는 파일을 준비합니다. 다음 작업 유형 중 하나를 준비할 수 있습니다.

자바

  1. pom.xml 파일을 로컬 머신에 복사합니다. 다음 pom.xml 파일은 Scala 및 Spark 라이브러리 종속 항목을 지정합니다. 이는 Dataproc 클러스터가 런타임 시 이러한 라이브러리를 제공함을 나타내기 위해 provided 범위를 제공합니다. 커넥터가 표준 HDFS 인터페이스를 구현하므로 pom.xml 파일은 Cloud Storage 종속 항목을 지정하지 않습니다. Spark 작업이 Cloud Storage 클러스터 파일(gs://로 시작하는 URI가 있는 파일)에 액세스할 때, 시스템이 자동으로 Cloud Storage 커넥터를 사용하여 Cloud Storage의 파일에 액세스합니다.
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
  2. 아래 나열된 WordCount.java 코드를 로컬 머신에 복사합니다.
    1. src/main/java/dataproc/codelab 경로를 사용하여 디렉터리 집합을 만듭니다.
      mkdir -p src/main/java/dataproc/codelab
      
    2. WordCount.java를 로컬 머신의 src/main/java/dataproc/codelab에 복사합니다.
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java는 Cloud Storage에서 텍스트 파일을 읽고 단어 수를 계산한 후 텍스트 파일 결과를 Cloud Storage에 쓰는 Java의 Spark 작업입니다.

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
  3. 패키지를 빌드합니다.
    mvn clean package
    
    성공적으로 빌드되면 target/word-count-1.0.jar이 생성됩니다.
  4. 패키지를 Cloud Storage로 스테이징합니다.
    gcloud storage cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. build.sbt 파일을 로컬 머신에 복사합니다. 다음 build.sbt 파일은 Scala 및 Spark 라이브러리 종속 항목을 지정합니다. 이는 Dataproc 클러스터가 런타임 시 이러한 라이브러리를 제공함을 나타내기 위해 provided 범위를 제공합니다. 커넥터가 표준 HDFS 인터페이스를 구현하므로 build.sbt 파일은 Cloud Storage 종속 항목을 지정하지 않습니다. Spark 작업이 Cloud Storage 클러스터 파일(gs://로 시작하는 URI가 있는 파일)에 액세스할 때, 시스템이 자동으로 Cloud Storage 커넥터를 사용하여 Cloud Storage의 파일에 액세스합니다.
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
  2. word-count.scala를 로컬 머신에 복사합니다. 이는 Cloud Storage에서 텍스트 파일을 읽고 단어 수를 계산한 후 텍스트 파일 결과를 Cloud Storage에 쓰는 Java의 Spark 작업입니다.
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
  3. 패키지를 빌드합니다.
    sbt clean package
    
    성공적으로 빌드되면 target/scala-2.11/word-count_2.11-1.0.jar이 생성됩니다.
  4. 패키지를 Cloud Storage로 스테이징합니다.
    gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. word-count.py를 로컬 머신에 복사합니다. 이는 PySpark를 사용하여 Cloud Storage에서 텍스트 파일을 읽고 단어 수를 계산한 후 텍스트 파일 결과를 Cloud Storage에 쓰는 Python의 Spark 작업입니다.
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])

작업 제출

다음 gcloud 명령어를 실행하여 WordCount 작업을 Dataproc 클러스터에 제출합니다.

자바

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

출력 보기

작업이 완료되면 다음 gcloud CLI 명령어를 실행하여 WordCount 출력을 확인합니다.

gcloud storage cat gs://${BUCKET_NAME}/output/*

WordCount 출력은 다음과 비슷하게 표시됩니다.

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)