Cloud Storage コネクタと Apache Spark の併用

このチュートリアルでは、 Cloud Storage コネクタApache Sparkを併用するサンプルコードの実行方法について説明します。

Lightning Engine は Cloud Storage への接続を強化し、ネイティブ エンジンの パフォーマンスを最適化します。改良された Cloud Storage コネクタにより、メタデータ オペレーションを最小限に抑えることで費用の削減が実現し、最適化されたファイル出力コミッターにより、Spark ワークロードのパフォーマンスと信頼性が高まります。この限定公開プレビュー機能への早期アクセスをリクエストするには、 早期アクセス フォームに記入してください。

目標

Java、Scala、または Python で簡単な wordcount Spark ジョブを作成し、Managed Service for Apache Spark クラスタでジョブを実行します。

費用

このドキュメントでは、課金対象である次の コンポーネントを使用します Google Cloud:

  • Compute Engine
  • Managed Service for Apache Spark
  • Cloud Storage

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。

新規の Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

以下の手順を実行して、このチュートリアルでコードを実行する準備をします。

  1. プロジェクトを設定します。必要に応じて、Managed Service for Apache Spark、Compute Engine、Cloud Storage API を有効にし、ローカルマシンに Google Cloud CLI をインストールしてプロジェクトを設定します。

    1. アカウントにログインします。 Google Cloud を初めて使用する場合は、 アカウントを作成して、実際のシナリオで Google プロダクトのパフォーマンスを評価してください。 Google Cloud新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    3. Verify that billing is enabled for your Google Cloud project.

    4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

      Roles required to enable APIs

      To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

      Enable the APIs

    5. Create a service account:

      1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles.
      2. In the Google Cloud console, go to the Create service account page.

        Go to Create service account
      3. Select your project.
      4. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      5. Click Create and continue.
      6. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      7. Click Continue.
      8. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    6. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
    7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

    8. Google Cloud CLI をインストールします。

    9. 外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。

    10. gcloud CLI を初期化するには、次のコマンドを実行します:

      gcloud init
    11. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    12. Verify that billing is enabled for your Google Cloud project.

    13. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

      Roles required to enable APIs

      To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

      Enable the APIs

    14. Create a service account:

      1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles.
      2. In the Google Cloud console, go to the Create service account page.

        Go to Create service account
      3. Select your project.
      4. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

        In the Service account description field, enter a description. For example, Service account for quickstart.

      5. Click Create and continue.
      6. Grant the Project > Owner role to the service account.

        To grant the role, find the Select a role list, then select Project > Owner.

      7. Click Continue.
      8. Click Done to finish creating the service account.

        Do not close your browser window. You will use it in the next step.

    15. Create a service account key:

      1. In the Google Cloud console, click the email address for the service account that you created.
      2. Click Keys.
      3. Click Add key, and then click Create new key.
      4. Click Create. A JSON key file is downloaded to your computer.
      5. Click Close.
    16. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

    17. Google Cloud CLI をインストールします。

    18. 外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。

    19. gcloud CLI を初期化するには、次のコマンドを実行します:

      gcloud init

  2. Cloud Storage バケットを作成します。Cloud Storage はチュートリアル データを保持するために必要です。使用できるバケットがない場合は、プロジェクトに新しいバケットを作成します。

    1. コンソールで Cloud Storage の Google Cloud [**バケット**] ページに移動します。

      [バケット] に移動

    2. [ [Create]] をクリックします。
    3. [バケットの作成] ページでユーザーのバケット情報を入力します。次のステップに進むには、[続行] をクリックします。
      1. [スタートガイド] セクションで、次の操作を行います。
      2. [データの保存場所の選択] セクションで、次の操作を行います。
        1. ロケーション タイプを選択してください。
        2. [Location type] プルダウン メニューから、バケットのデータが永続的に保存されるロケーションを選択します。
        3. クロスバケット レプリケーションを設定するには、 [Storage Transfer Service 経由でクロスバケット レプリケーションを追加する] を選択し、 次の手順を実施します:

          クロスバケット レプリケーションを設定する

          1. [バケット] メニューで、バケットを選択します。
          2. [レプリケーション設定] セクションで、[構成] をクリックして、レプリケーション ジョブの設定を構成します。

            [**クロスバケット レプリケーションを構成する**] ペインが表示されます。

            • オブジェクト名の接頭辞で複製するオブジェクトをフィルタするには、 オブジェクトを追加または除外する接頭辞を入力し、 [接頭辞を追加] をクリックします。
            • 複製されたオブジェクトのストレージ クラスを設定するには、 [Storage class] メニューからストレージ クラスを選択します。 この手順をスキップすると、複製されたオブジェクトはデフォルトで宛先バケットのストレージ クラスを使用します。
            • [完了] をクリックします。
      3. [データの保存場所の選択] セクションで、次の操作を行います。
        1. バケットのデフォルトのストレージ クラスを選択するか、バケットデータのストレージ クラスを自動的に管理するAutoclassを選択します。
        2. 階層名前空間を有効にするには、 [データ量が多いワークロード向けにストレージを最適化] セクションで、 [このバケットで階層的な名前空間を有効にする] を選択します。
      4. In the [オブジェクトへのアクセスを制御する方法を選択する] セクションで、バケットに 公開アクセスの防止 を適用するかどうかを選択し、バケットのオブジェクトに使用する アクセス制御方法 を選択します。
      5. [オブジェクト データを保護する方法を選択する] セクションで、次の操作を行います。
        • [**データ保護**] で、バケットに設定するオプションを選択します。
          • 削除(復元可能)を有効にするには、 [削除(復元可能)ポリシー(データ復旧用)] チェックボックスをオンにして、 削除後にオブジェクトを保持する日数を指定します。
          • オブジェクトのバージョニングを設定するには、 [オブジェクトのバージョニング(バージョン管理用)] チェックボックスをオンにして、 オブジェクトごとの最大バージョン数と、非現行バージョンが期限切れになるまでの日数を指定します。
          • オブジェクトとバケットの保持ポリシーを有効にするには、[保持(コンプライアンス用)] チェックボックスをオンにして、次の操作を行います。
            • [オブジェクト保持ロック]を有効にするには、 [オブジェクト保持を有効にする]チェックボックスをオンにします。
            • [Bucket Lock] を有効にするには、[バケット保持ポリシーを設定する] チェックボックスをオンにして、保持期間の単位と保持期間を選択します。
        • オブジェクト データの暗号化方法を選択するには、 [データ暗号化] セクション()を開き、 [データの暗号化] 方法を選択します
    4. [作成] をクリックします。

  3. ローカル環境変数を設定します。ローカルマシンで環境変数を設定します。プロジェクト ID と、このチュートリアルで使用する Cloud Storage バケットの名前を設定します。 Google Cloud 既存または新規の Managed Service for Apache Spark クラスタの 名前とリージョン も指定します。 次の手順で、このチュートリアルで使用するクラスタを作成できます。

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. Managed Service for Apache Spark クラスタを作成します。以下のコマンドを実行して、指定された Compute Engine ゾーン単一ノード の Managed Service for Apache Spark クラスタを作成します。

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. 一般公開データを Cloud Storage バケットにコピーします。一般公開データであるシェイクスピアのテキスト スニペットを Cloud Storage バケットの input フォルダにコピーします。

    gcloud storage cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. Java(Apache Maven)Scala(SBT)、または Python の開発環境を設定します。

Spark wordcount ジョブを準備する

以下のタブを選択し、手順に沿ってクラスタに送信するジョブ パッケージまたはファイルを準備します。次のいずれかのジョブタイプを準備できます。

Java

  1. pom.xml ファイルをローカルマシンにコピーします。 次の pom.xml ファイルで、Scala ライブラリと Spark ライブラリ の依存関係を指定します。この依存関係には、実行時に Managed Service for Apache Spark クラスタがこれらの ライブラリを提供することを示す provided スコープが指定されます。pom.xml ファイルでは、コネクタが標準の HDFS インターフェースを実装しているため、Cloud Storage の依存関係は指定されません。Spark ジョブが Cloud Storage クラスタ ファイル(gs:// で始まる URI を持つファイル)にアクセスすると、システムは自動的に Cloud Storage コネクタを使用して Cloud Storage のファイルにアクセスします。
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
  2. 以下の WordCount.java コードをローカルマシンにコピーします。
    1. パス src/main/java/dataproc/codelab を指定して一連のディレクトリを作成します。
      mkdir -p src/main/java/dataproc/codelab
      
    2. WordCount.java をローカルマシンの src/main/java/dataproc/codelab にコピーします。
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java は Java の Spark ジョブです。Cloud Storage からテキスト ファイルを読み込んで単語数を計算し、そのテキスト ファイルの結果を Cloud Storage に書き込みます。

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
  3. パッケージをビルドします。
    mvn clean package
    
    ビルドが成功すると、target/word-count-1.0.jar が作成されます。
  4. パッケージを Cloud Storage にステージングします。
    gcloud storage cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. build.sbt ファイルをローカルマシンにコピーします。 次の build.sbt ファイルで、Scala ライブラリと Spark ライブラリの依存関係を指定します。この依存関係には、実行時に Managed Service for Apache Spark クラスタがこれらのライブラリを提供することを示す provided スコープが指定されます。build.sbt ファイルでは、コネクタが標準の HDFS インターフェースを実装しているため、Cloud Storage の依存関係は指定されません。Spark ジョブが Cloud Storage クラスタ ファイル(gs:// で始まる URI を持つファイル)にアクセスすると、システムは自動的に Cloud Storage コネクタを使用して Cloud Storage のファイルにアクセスします。
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
  2. ローカルマシンに word-count.scala をコピーします。これは Java の Spark ジョブです。Cloud Storage からテキスト ファイルを読み込んで単語数を計算し、そのテキスト ファイルの結果を Cloud Storage に書き込みます。
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
  3. パッケージをビルドします。
    sbt clean package
    
    ビルドが成功すると、target/scala-2.11/word-count_2.11-1.0.jar が作成されます。
  4. パッケージを Cloud Storage にステージングします。
    gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. ローカルマシンに word-count.py をコピーします。これは PySpark を使用した Python の Spark ジョブです。Cloud Storage からテキスト ファイルを読み込んで単語数を計算し、そのテキスト ファイルの結果を Cloud Storage に書き込みます。
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])

ジョブを送信する

次の gcloud コマンドを実行して、wordcount ジョブを Managed Service for Apache Spark クラスタに送信します。

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

出力を表示する

ジョブが終了したら、次の gcloud CLI コマンドを実行して wordcount の出力を表示します。

gcloud storage cat gs://${BUCKET_NAME}/output/*

wordcount の出力は、次のようになります。

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

クリーンアップ

チュートリアルが終了したら、作成したリソースをクリーンアップして、割り当ての使用を停止し、課金されないようにできます。次のセクションで、リソースを削除または無効にする方法を説明します。

プロジェクトの削除

課金されないようにする最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには:

  1. コンソールで [**リソースの管理**] ページに移動します。 Google Cloud

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、 [Shut down] をクリックしてプロジェクトを削除します。

Managed Service for Apache Spark クラスタを削除する

プロジェクトを削除する代わりに、プロジェクト内のクラスタのみを削除することもできます。

Cloud Storage バケットを削除する

Google Cloud コンソール

  1. コンソールで Cloud Storage の [**バケット**] ページに移動します。 Google Cloud

    [バケット] に移動

  2. 削除するバケットのチェックボックスをクリックします。
  3. バケットを削除するには、 [Delete] をクリックして、 指示に沿って操作します。

コマンドライン

    バケットを削除します。
    gcloud storage buckets delete BUCKET_NAME

次のステップ