Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
このページでは、Cloud Composer 2 を使用して Serverless for Apache Spark ワークロードを Google Cloudで実行する方法について説明します。
次のセクションの例では、Serverless for Apache Spark のバッチワークロードを管理する 演算子を使用する方法を示しています。これらの演算子は、Serverless for Apache Spark バッチ ワークロードの作成、削除、一覧表示、取得を行う DAG で使用します。
Serverless for Apache Spark バッチ ワークロードと連携する 演算子 用の DAG を作成します。
カスタム コンテナと Dataproc Metastore を使用する DAG を作成します。
これらの DAG の永続履歴サーバーを構成します。
始める前に
Dataproc API を有効にします。
コンソール
Dataproc API を有効にします。
API を有効にするために必要なロール
API を有効にするには、
serviceusage.services.enable権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。詳しくは、ロールを付与する方法をご覧ください。gcloud
Dataproc API を有効にします。
API を有効にするために必要なロール
API を有効にするには、 権限を含む Service Usage 管理者 IAM ロール(
roles/serviceusage.serviceUsageAdmin)が必要です。serviceusage.services.enable詳しくは、ロールを付与する方法をご覧ください。gcloud services enable dataproc.googleapis.com
バッチ ワークロード ファイルの場所を選択します。次のいずれかのオプションを使用できます。
- このファイルを格納する Cloud Storage バケットを作成する。
- 環境のバケットを使用する。このファイルを Airflow と同期する必要がないため、
/dagsフォルダまたは/dataフォルダの外に別のサブフォルダを作成できます。例:/batches - 既存のバケットを使用します。
ファイルと Airflow 変数を設定する
このセクションでは、このチュートリアルで使用するファイルを設定し、Airflow 変数を構成する方法について説明します。
Serverless for Apache Spark ML ワークロード ファイルをバケットにアップロードする
このチュートリアルのワークロードは、pyspark スクリプトを実行します。
任意の pyspark スクリプトを
spark-job.pyという名前のローカル ファイルに保存します。 たとえば、サンプルの pyspark スクリプトを使用できます。ファイルをアップロードしますを、選択した場所に 始める前にで。
Airflow 変数を設定する
次のセクションの例では、Airflow 変数を使用しています。これらの変数の値を Airflow で設定すると、DAG コードでこれらの値にアクセスできます。
このチュートリアルの例では、次の Airflow 変数を使用します。使用する例によっては、必要に応じて設定できます。
DAG コードで使用する次の Airflow 変数を設定します。
project_id: プロジェクト ID.bucket_name: ワークロードのメインの Python ファイル(spark-job.py)が配置されているバケットの URI。このロケーションは、始める前にで選択しました。phs_cluster: 永続的履歴サーバーのクラスタ名。この変数は、永続履歴サーバーを作成するときに設定します。image_name: カスタム コンテナ イメージの名前とタグ(image:tag)。この変数は、DataprocCreateBatchOperator でカスタム コンテナ イメージを使用するときに設定します。metastore_cluster: Dataproc Metastore サービス名。 この変数は、DataprocCreateBatchOperator で Dataproc Metastore サービスを使用するときに設定します。region_name: Dataproc Metastore サービスが配置されているリージョン。この変数は、DataprocCreateBatchOperator で Dataproc Metastore サービスを使用するときに設定します。
コンソールと Airflow UI を使用して各 Airflow 変数を設定する Google Cloud
コンソールで、[**環境**] ページに移動します。 Google Cloud
環境のリストで、使用中の環境の [Airflow] リンクをクリックします。Airflow UI が開きます。
Airflow UI で、[管理者] [>] [変数] を選択します。
[新しいレコードの追加] をクリックします。
[キー] フィールドに変数の名前を指定し、[値] フィールドにその変数の値を設定します。
[保存] をクリックします。
永続履歴サーバーを作成する
永続的履歴サーバー(PHS)を使用して、バッチ ワークロードの Spark 履歴ファイルを表示します。
- 永続履歴サーバーを作成します。
phs_clusterAirflow 変数で PHS クラスタの名前が指定されていることを確認します。
DataprocCreateBatchOperator
次の DAG は、Serverless for Apache Spark バッチ ワークロードを開始します。
DataprocCreateBatchOperator 引数の詳細については、
演算子のソースコードをご覧ください。
DataprocCreateBatchOperator の batch
パラメータに渡すことができる属性の詳細については、
Batch クラスの説明をご覧ください。
DataprocCreateBatchOperator でカスタム コンテナ イメージを使用する
次の例は、カスタム コンテナ イメージを使用してワークロードを実行する方法を示しています。カスタム コンテナを使用すると、デフォルトのコンテナ イメージで提供されていない Python 依存関係を追加できます。
カスタム コンテナ イメージを使用するには:
image_nameAirflow 変数でイメージを指定します。カスタム イメージで DataprocCreateBatchOperator を使用します。
DataprocCreateBatchOperator で Dataproc Metastore サービスを使用する
DAG から Dataproc Metastore サービス を使用するには:
Metastore サービスがすでに開始されていることを確認します。
Metastore サービスの起動については、 Dataproc Metastore の有効化と無効化をご覧ください。
構成を作成するための Batch 演算子の詳細については、 PeripheralsConfigをご覧ください。
メタストア サービスが稼働したら、 その名前を
metastore_cluster変数で指定し、そのリージョンをregion_nameAirflow 変数で指定します。DataprocCreateBatchOperator で Metastore サービスを使用する。
DataprocDeleteBatchOperator
DataprocDeleteBatchOperator を使用して、ワークロードのバッチ ID に基づいてバッチを削除できます。
DataprocListBatchesOperator
DataprocDeleteBatchOperator は、指定された project_id とリージョン内に存在するバッチを一覧表示します。
DataprocGetBatchOperator
DataprocGetBatchOperator は、特定のバッチ ワークロードを取得します。