Cloud Composer を使用して Apache Spark 向け Serverless ワークロードを実行する

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

このページでは、Cloud Composer 2 を使用して Serverless for Apache Spark ワークロードを Google Cloudで実行する方法について説明します。

次のセクションの例では、Serverless for Apache Spark のバッチワークロードを管理する 演算子を使用する方法を示しています。これらの演算子は、Serverless for Apache Spark バッチ ワークロードの作成、削除、一覧表示、取得を行う DAG で使用します。

始める前に

  1. Dataproc API を有効にします。

    コンソール

    Dataproc API を有効にします。

    API を有効にするために必要なロール

    API を有効にするには、serviceusage.services.enable 権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。詳しくは、ロールを付与する方法をご覧ください。

    API の有効化

    gcloud

    Dataproc API を有効にします。

    API を有効にするために必要なロール

    API を有効にするには、 権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。serviceusage.services.enable詳しくは、ロールを付与する方法をご覧ください。

    gcloud services enable dataproc.googleapis.com

  2. バッチ ワークロード ファイルの場所を選択します。次のいずれかのオプションを使用できます。

    • このファイルを格納する Cloud Storage バケットを作成する。
    • 環境のバケットを使用する。このファイルを Airflow と同期する必要がないため、/dags フォルダまたは /data フォルダの外に別のサブフォルダを作成できます。例: /batches
    • 既存のバケットを使用します。

ファイルと Airflow 変数を設定する

このセクションでは、このチュートリアルで使用するファイルを設定し、Airflow 変数を構成する方法について説明します。

Serverless for Apache Spark ML ワークロード ファイルをバケットにアップロードする

このチュートリアルのワークロードは、pyspark スクリプトを実行します。

  1. 任意の pyspark スクリプトを spark-job.py という名前のローカル ファイルに保存します。 たとえば、サンプルの pyspark スクリプトを使用できます。

  2. ファイルをアップロードしますを、選択した場所に 始める前にで。

Airflow 変数を設定する

次のセクションの例では、Airflow 変数を使用しています。これらの変数の値を Airflow で設定すると、DAG コードでこれらの値にアクセスできます。

このチュートリアルの例では、次の Airflow 変数を使用します。使用する例によっては、必要に応じて設定できます。

DAG コードで使用する次の Airflow 変数を設定します。

コンソールと Airflow UI を使用して各 Airflow 変数を設定する Google Cloud

  1. コンソールで、[**環境**] ページに移動します。 Google Cloud

    [環境] に移動

  2. 環境のリストで、使用中の環境の [Airflow] リンクをクリックします。Airflow UI が開きます。

  3. Airflow UI で、[管理者] [>] [変数] を選択します。

  4. [新しいレコードの追加] をクリックします。

  5. [キー] フィールドに変数の名前を指定し、[] フィールドにその変数の値を設定します。

  6. [保存] をクリックします。

永続履歴サーバーを作成する

永続的履歴サーバー(PHS)を使用して、バッチ ワークロードの Spark 履歴ファイルを表示します。

  1. 永続履歴サーバーを作成します
  2. phs_cluster Airflow 変数で PHS クラスタの名前が指定されていることを確認します。

DataprocCreateBatchOperator

次の DAG は、Serverless for Apache Spark バッチ ワークロードを開始します。

DataprocCreateBatchOperator 引数の詳細については、 演算子のソースコードをご覧ください

DataprocCreateBatchOperatorbatch パラメータに渡すことができる属性の詳細については、 Batch クラスの説明をご覧ください


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

DataprocCreateBatchOperator でカスタム コンテナ イメージを使用する

次の例は、カスタム コンテナ イメージを使用してワークロードを実行する方法を示しています。カスタム コンテナを使用すると、デフォルトのコンテナ イメージで提供されていない Python 依存関係を追加できます。

カスタム コンテナ イメージを使用するには:

  1. カスタム コンテナ イメージを作成して Container Registry にアップロードします。

  2. image_name Airflow 変数でイメージを指定します。

  3. カスタム イメージで DataprocCreateBatchOperator を使用します。

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

DataprocCreateBatchOperator で Dataproc Metastore サービスを使用する

DAG から Dataproc Metastore サービス を使用するには:

  1. Metastore サービスがすでに開始されていることを確認します。

    Metastore サービスの起動については、 Dataproc Metastore の有効化と無効化をご覧ください。

    構成を作成するための Batch 演算子の詳細については、 PeripheralsConfigをご覧ください。

  2. メタストア サービスが稼働したら、 その名前を metastore_cluster 変数で指定し、そのリージョンを region_name Airflow 変数で指定します。

  3. DataprocCreateBatchOperator で Metastore サービスを使用する。

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

DataprocDeleteBatchOperator を使用して、ワークロードのバッチ ID に基づいてバッチを削除できます。

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator は、指定された project_id とリージョン内に存在するバッチを一覧表示します。

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

DataprocGetBatchOperator は、特定のバッチ ワークロードを取得します。

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

次のステップ