Lightning Engine は Cloud Storage への接続を強化し、ネイティブ エンジンの パフォーマンスを最適化します。改良された Cloud Storage コネクタにより、メタデータ オペレーションを最小限に抑えることで費用の削減が実現し、最適化されたファイル出力コミッターにより、Spark ワークロードのパフォーマンスと信頼性が高まります。この限定公開プレビュー機能への早期アクセスをリクエストするには、 早期アクセス フォームに記入してください。
目標
Java、Scala、または Python で簡単な wordcount Spark ジョブを作成し、Managed Service for Apache Spark クラスタでジョブを実行します。費用
このドキュメントでは、課金対象である次の コンポーネントを使用します Google Cloud:
- Compute Engine
- Managed Service for Apache Spark
- Cloud Storage
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
始める前に
以下の手順を実行して、このチュートリアルでコードを実行する準備をします。
プロジェクトを設定します。必要に応じて、Managed Service for Apache Spark、Compute Engine、Cloud Storage API を有効にし、ローカルマシンに Google Cloud CLI をインストールしてプロジェクトを設定します。
- アカウントにログインします。 Google Cloud を初めて使用する場合は、 アカウントを作成して、実際のシナリオで Google プロダクトのパフォーマンスを評価してください。 Google Cloud新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
Create a service account:
-
Ensure that you have the Create Service Accounts IAM role
(
roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles. -
In the Google Cloud console, go to the Create service account page.
Go to Create service account - Select your project.
-
In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.
In the Service account description field, enter a description. For example,
Service account for quickstart. - Click Create and continue.
-
Grant the Project > Owner role to the service account.
To grant the role, find the Select a role list, then select Project > Owner.
- Click Continue.
-
Click Done to finish creating the service account.
Do not close your browser window. You will use it in the next step.
-
Ensure that you have the Create Service Accounts IAM role
(
-
Create a service account key:
- In the Google Cloud console, click the email address for the service account that you created.
- Click Keys.
- Click Add key, and then click Create new key.
- Click Create. A JSON key file is downloaded to your computer.
- Click Close.
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALSto the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again. -
Google Cloud CLI をインストールします。
-
外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
-
gcloud CLI を初期化するには、次のコマンドを実行します:
gcloud init -
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
Create a service account:
-
Ensure that you have the Create Service Accounts IAM role
(
roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles. -
In the Google Cloud console, go to the Create service account page.
Go to Create service account - Select your project.
-
In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.
In the Service account description field, enter a description. For example,
Service account for quickstart. - Click Create and continue.
-
Grant the Project > Owner role to the service account.
To grant the role, find the Select a role list, then select Project > Owner.
- Click Continue.
-
Click Done to finish creating the service account.
Do not close your browser window. You will use it in the next step.
-
Ensure that you have the Create Service Accounts IAM role
(
-
Create a service account key:
- In the Google Cloud console, click the email address for the service account that you created.
- Click Keys.
- Click Add key, and then click Create new key.
- Click Create. A JSON key file is downloaded to your computer.
- Click Close.
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALSto the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again. -
Google Cloud CLI をインストールします。
-
外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。
-
gcloud CLI を初期化するには、次のコマンドを実行します:
gcloud init
Cloud Storage バケットを作成します。Cloud Storage はチュートリアル データを保持するために必要です。使用できるバケットがない場合は、プロジェクトに新しいバケットを作成します。
- コンソールで Cloud Storage の Google Cloud [**バケット**] ページに移動します。
- [ [Create]] をクリックします。
- [バケットの作成] ページでユーザーのバケット情報を入力します。次のステップに進むには、[続行] をクリックします。
- [スタートガイド] セクションで、次の操作を行います。
-
[データの保存場所の選択] セクションで、次の操作を行います。
- ロケーション タイプを選択してください。
- [Location type] プルダウン メニューから、バケットのデータが永続的に保存されるロケーションを選択します。
- ロケーション タイプとして [デュアルリージョン] を選択した場合は、関連するチェックボックスを使用して [ターボ レプリケーション] を有効にすることもできます。
- クロスバケット レプリケーションを設定するには、
[Storage Transfer Service 経由でクロスバケット レプリケーションを追加する] を選択し、
次の手順を実施します:
クロスバケット レプリケーションを設定する
- [バケット] メニューで、バケットを選択します。
[レプリケーション設定] セクションで、[構成] をクリックして、レプリケーション ジョブの設定を構成します。
[**クロスバケット レプリケーションを構成する**] ペインが表示されます。
- オブジェクト名の接頭辞で複製するオブジェクトをフィルタするには、 オブジェクトを追加または除外する接頭辞を入力し、 [接頭辞を追加] をクリックします。
- 複製されたオブジェクトのストレージ クラスを設定するには、 [Storage class] メニューからストレージ クラスを選択します。 この手順をスキップすると、複製されたオブジェクトはデフォルトで宛先バケットのストレージ クラスを使用します。
- [完了] をクリックします。
-
[データの保存場所の選択] セクションで、次の操作を行います。
- バケットのデフォルトのストレージ クラスを選択するか、バケットデータのストレージ クラスを自動的に管理するAutoclassを選択します。
- 階層名前空間を有効にするには、 [データ量が多いワークロード向けにストレージを最適化] セクションで、 [このバケットで階層的な名前空間を有効にする] を選択します。
- In the [オブジェクトへのアクセスを制御する方法を選択する] セクションで、バケットに 公開アクセスの防止 を適用するかどうかを選択し、バケットのオブジェクトに使用する アクセス制御方法 を選択します。
-
[オブジェクト データを保護する方法を選択する] セクションで、次の操作を行います。
- [**データ保護**] で、バケットに設定するオプションを選択します。
- 削除(復元可能)を有効にするには、 [削除(復元可能)ポリシー(データ復旧用)] チェックボックスをオンにして、 削除後にオブジェクトを保持する日数を指定します。
- オブジェクトのバージョニングを設定するには、 [オブジェクトのバージョニング(バージョン管理用)] チェックボックスをオンにして、 オブジェクトごとの最大バージョン数と、非現行バージョンが期限切れになるまでの日数を指定します。
- オブジェクトとバケットの保持ポリシーを有効にするには、[保持(コンプライアンス用)] チェックボックスをオンにして、次の操作を行います。
- [オブジェクト保持ロック]を有効にするには、 [オブジェクト保持を有効にする]チェックボックスをオンにします。
- [Bucket Lock] を有効にするには、[バケット保持ポリシーを設定する] チェックボックスをオンにして、保持期間の単位と保持期間を選択します。
- オブジェクト データの暗号化方法を選択するには、 [データ暗号化] セクション()を開き、 [データの暗号化] 方法を選択します。
- [**データ保護**] で、バケットに設定するオプションを選択します。
- [作成] をクリックします。
ローカル環境変数を設定します。ローカルマシンで環境変数を設定します。プロジェクト ID と、このチュートリアルで使用する Cloud Storage バケットの名前を設定します。 Google Cloud 既存または新規の Managed Service for Apache Spark クラスタの 名前とリージョン も指定します。 次の手順で、このチュートリアルで使用するクラスタを作成できます。
PROJECT=project-id
BUCKET_NAME=bucket-name
CLUSTER=cluster-name
REGION=cluster-region Example: "us-central1"
Managed Service for Apache Spark クラスタを作成します。以下のコマンドを実行して、指定された Compute Engine ゾーンに 単一ノード の Managed Service for Apache Spark クラスタを作成します。
gcloud dataproc clusters create ${CLUSTER} \ --project=${PROJECT} \ --region=${REGION} \ --single-node一般公開データを Cloud Storage バケットにコピーします。一般公開データであるシェイクスピアのテキスト スニペットを Cloud Storage バケットの
inputフォルダにコピーします。gcloud storage cp gs://pub/shakespeare/rose.txt \ gs://${BUCKET_NAME}/input/rose.txtJava(Apache Maven)、Scala(SBT)、または Python の開発環境を設定します。
Spark wordcount ジョブを準備する
以下のタブを選択し、手順に沿ってクラスタに送信するジョブ パッケージまたはファイルを準備します。次のいずれかのジョブタイプを準備できます。
- Java の Spark ジョブ。JAR パッケージを構築する際に Apache Maven を使用
- Scala の Spark ジョブ。JAR パッケージを構築する際に SBT を使用
- Python の Spark ジョブ(PySpark)
Java
pom.xmlファイルをローカルマシンにコピーします。 次のpom.xmlファイルで、Scala ライブラリと Spark ライブラリ の依存関係を指定します。この依存関係には、実行時に Managed Service for Apache Spark クラスタがこれらの ライブラリを提供することを示すprovidedスコープが指定されます。pom.xmlファイルでは、コネクタが標準の HDFS インターフェースを実装しているため、Cloud Storage の依存関係は指定されません。Spark ジョブが Cloud Storage クラスタ ファイル(gs://で始まる URI を持つファイル)にアクセスすると、システムは自動的に Cloud Storage コネクタを使用して Cloud Storage のファイルにアクセスします。<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>dataproc.codelab</groupId> <artifactId>word-count</artifactId> <version>1.0</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>Scala version, for example,
2.11.8</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_Scala major.minor.version, for example,2.11</artifactId> <version>Spark version, for example,2.3.1</version> <scope>provided</scope> </dependency> </dependencies> </project>- 以下の
WordCount.javaコードをローカルマシンにコピーします。- パス
src/main/java/dataproc/codelabを指定して一連のディレクトリを作成します。mkdir -p src/main/java/dataproc/codelab
WordCount.javaをローカルマシンのsrc/main/java/dataproc/codelabにコピーします。cp WordCount.java src/main/java/dataproc/codelab
WordCount.javaは Java の Spark ジョブです。Cloud Storage からテキスト ファイルを読み込んで単語数を計算し、そのテキスト ファイルの結果を Cloud Storage に書き込みます。package dataproc.codelab; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; public class WordCount { public static void main(String[] args) { if (args.length != 2) { throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>"); } String inputPath = args[0]; String outputPath = args[1]; JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count")); JavaRDD<String> lines = sparkContext.textFile(inputPath); JavaRDD<String> words = lines.flatMap( (String line) -> Arrays.asList(line.split(" ")).iterator() ); JavaPairRDD<String, Integer> wordCounts = words.mapToPair( (String word) -> new Tuple2<>(word, 1) ).reduceByKey( (Integer count1, Integer count2) -> count1 + count2 ); wordCounts.saveAsTextFile(outputPath); } }
- パス
- パッケージをビルドします。
ビルドが成功すると、mvn clean package
target/word-count-1.0.jarが作成されます。 - パッケージを Cloud Storage にステージングします。
gcloud storage cp target/word-count-1.0.jar \ gs://${BUCKET_NAME}/java/word-count-1.0.jar
Scala
build.sbtファイルをローカルマシンにコピーします。 次のbuild.sbtファイルで、Scala ライブラリと Spark ライブラリの依存関係を指定します。この依存関係には、実行時に Managed Service for Apache Spark クラスタがこれらのライブラリを提供することを示すprovidedスコープが指定されます。build.sbtファイルでは、コネクタが標準の HDFS インターフェースを実装しているため、Cloud Storage の依存関係は指定されません。Spark ジョブが Cloud Storage クラスタ ファイル(gs://で始まる URI を持つファイル)にアクセスすると、システムは自動的に Cloud Storage コネクタを使用して Cloud Storage のファイルにアクセスします。scalaVersion := "Scala version, for example,
2.11.8" name := "word-count" organization := "dataproc.codelab" version := "1.0" libraryDependencies ++= Seq( "org.scala-lang" % "scala-library" % scalaVersion.value % "provided", "org.apache.spark" %% "spark-core" % "Spark version, for example,2.3.1" % "provided" )- ローカルマシンに
word-count.scalaをコピーします。これは Java の Spark ジョブです。Cloud Storage からテキスト ファイルを読み込んで単語数を計算し、そのテキスト ファイルの結果を Cloud Storage に書き込みます。package dataproc.codelab import org.apache.spark.SparkContext import org.apache.spark.SparkConf object WordCount { def main(args: Array[String]) { if (args.length != 2) { throw new IllegalArgumentException( "Exactly 2 arguments are required: <inputPath> <outputPath>") } val inputPath = args(0) val outputPath = args(1) val sc = new SparkContext(new SparkConf().setAppName("Word Count")) val lines = sc.textFile(inputPath) val words = lines.flatMap(line => line.split(" ")) val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) wordCounts.saveAsTextFile(outputPath) } }
- パッケージをビルドします。
ビルドが成功すると、sbt clean package
target/scala-2.11/word-count_2.11-1.0.jarが作成されます。 - パッケージを Cloud Storage にステージングします。
gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \ gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
Python
- ローカルマシンに
word-count.pyをコピーします。これは PySpark を使用した Python の Spark ジョブです。Cloud Storage からテキスト ファイルを読み込んで単語数を計算し、そのテキスト ファイルの結果を Cloud Storage に書き込みます。#!/usr/bin/env python import pyspark import sys if len(sys.argv) != 3: raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>") inputUri=sys.argv[1] outputUri=sys.argv[2] sc = pyspark.SparkContext() lines = sc.textFile(sys.argv[1]) words = lines.flatMap(lambda line: line.split()) wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2) wordCounts.saveAsTextFile(sys.argv[2])
ジョブを送信する
次の gcloud コマンドを実行して、wordcount ジョブを Managed Service for Apache Spark クラスタに送信します。
Java
gcloud dataproc jobs submit spark \
--cluster=${CLUSTER} \
--class=dataproc.codelab.WordCount \
--jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
--region=${REGION} \
-- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Scala
gcloud dataproc jobs submit spark \
--cluster=${CLUSTER} \
--class=dataproc.codelab.WordCount \
--jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
--region=${REGION} \
-- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Python
gcloud dataproc jobs submit pyspark word-count.py \
--cluster=${CLUSTER} \
--region=${REGION} \
-- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
出力を表示する
ジョブが終了したら、次の gcloud CLI コマンドを実行して wordcount の出力を表示します。
gcloud storage cat gs://${BUCKET_NAME}/output/*
wordcount の出力は、次のようになります。
(a,2) (call,1) (What's,1) (sweet.,1) (we,1) (as,1) (name?,1) (any,1) (other,1) (rose,1) (smell,1) (name,1) (would,1) (in,1) (which,1) (That,1) (By,1)
クリーンアップ
チュートリアルが終了したら、作成したリソースをクリーンアップして、割り当ての使用を停止し、課金されないようにできます。次のセクションで、リソースを削除または無効にする方法を説明します。
プロジェクトの削除
課金されないようにする最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。
プロジェクトを削除するには:
- コンソールで [**リソースの管理**] ページに移動します。 Google Cloud
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ダイアログでプロジェクト ID を入力し、 [Shut down] をクリックしてプロジェクトを削除します。
Managed Service for Apache Spark クラスタを削除する
プロジェクトを削除する代わりに、プロジェクト内のクラスタのみを削除することもできます。
Cloud Storage バケットを削除する
Google Cloud コンソール
- コンソールで Cloud Storage の [**バケット**] ページに移動します。 Google Cloud
- 削除するバケットのチェックボックスをクリックします。
- バケットを削除するには、 [Delete] をクリックして、 指示に沿って操作します。
コマンドライン
-
バケットを削除します。
gcloud storage buckets delete BUCKET_NAME
次のステップ
- Spark ジョブ調整のヒントを見る。