使用 Managed Airflow 运行 Managed Service for Apache Spark 工作负载

Managed Airflow(第 3 代) | Managed Airflow(第 2 代) | Managed Airflow(旧版第 1 代)

本页面介绍如何使用 Managed Airflow(第 2 代)在 Managed Service for Apache Spark 工作负载上运行 Google Cloud。

以下部分中的示例展示了如何使用 运算符来管理 Managed Service for Apache Spark 批量工作负载。您可以在 DAG 中使用这些运算符来创建、删除、列出和获取 Managed Service for Apache Spark 批量工作负载:

准备工作

  1. 启用 Dataproc API:

    控制台

    启用 Managed Service for Apache Spark API。

    启用 API 所需的角色

    如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (roles/serviceusage.serviceUsageAdmin),该角色包含 serviceusage.services.enable 权限。了解如何授予角色

    启用 API

    gcloud

    启用 Managed Service for Apache Spark API:

    启用 API 所需的角色

    如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (roles/serviceusage.serviceUsageAdmin),该角色包含 serviceusage.services.enable 权限。了解如何授予 角色

    gcloud services enable dataproc.googleapis.com

  2. 为批量工作负载文件选择位置。您可以使用以下任一选项:

    • 创建一个 Cloud Storage 存储桶来 存储此文件。
    • 使用环境的存储桶。由于您无需将此文件与 Airflow 同步,因此可以在 /dags/data 文件夹之外创建一个单独的子文件夹。例如,/batches
    • 使用现有存储桶。

设置文件和 Airflow 变量

本部分演示了如何设置文件并为此教程配置 Airflow 变量。

将 Managed Service for Apache Spark ML 工作负载文件上传到存储桶

本教程中的工作负载运行 pyspark 脚本:

  1. 将任何 pyspark 脚本保存到名为 spark-job.py 的本地文件中。 例如,您可以使用 示例 pyspark 脚本

  2. 将文件上传到您在 准备工作中选择的位置。

设置 Airflow 变量

以下部分中的示例使用 Airflow 变量。您可以在 Airflow 中为这些变量设置值,然后 DAG 代码就可以访问这些值。

本教程中的示例使用以下 Airflow 变量。您可以根据所使用的示例,按需设置这些变量。

设置以下 Airflow 变量,以便在 DAG 代码中使用:

使用 Google Cloud 控制台和 Airflow 界面设置每个 Airflow 变量

  1. 在 Google Cloud 控制台中,前往环境 页面。

    前往“环境”

  2. 在环境列表中,点击环境对应的 Airflow 链接。Airflow 界面随即打开。

  3. 在 Airflow 界面中,依次选择 Admin > Variables

  4. 点击添加新记录

  5. Key 字段中指定变量的名称,并在 Val 字段中为其设置值。

  6. 点击保存

创建 Persistent History Server

使用 Persistent History Server (PHS) 查看批量工作负载的 Spark 历史记录文件:

  1. 创建 Persistent History Server
  2. 确保您在 phs_cluster Airflow 变量中指定了 PHS 集群的名称。

DataprocCreateBatchOperator

以下 DAG 会启动 Managed Service for Apache Spark 批量工作负载。

如需详细了解 DataprocCreateBatchOperator 实参,请参阅 运算符的源代码

如需详细了解您可以在 batch 形参中传递的属性,请参阅 Batch 类的说明DataprocCreateBatchOperator


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

将自定义容器映像与 DataprocCreateBatchOperator 搭配使用

以下示例展示了如何使用自定义容器映像来运行工作负载。例如,您可以使用自定义容器来添加默认容器映像未提供的 Python 依赖项。

如需使用自定义容器映像,请执行以下操作:

  1. 创建自定义容器映像并将其上传到 Container Registry

  2. image_name Airflow 变量中指定映像。

  3. 将 DataprocCreateBatchOperator 与自定义映像搭配使用:

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

将 Dataproc Metastore 服务与 DataprocCreateBatchOperator 搭配使用

如需从 DAG 中使用 Dataproc Metastore 服务 ,请执行以下操作:

  1. 检查 Metastore 服务是否已启动。

    如需了解如何启动 Metastore 服务,请参阅 启用和停用 Dataproc Metastore

    如需详细了解用于创建 配置的 Batch 运算符,请参阅 PeripheralsConfig

  2. Metastore 服务启动并运行后,在 metastore_cluster变量中指定其名称,并在region_name Airflow 变量中指定其区域。

  3. 在 DataprocCreateBatchOperator 中使用 Metastore 服务:

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

您可以使用 DataprocDeleteBatchOperator 根据工作负载的批次 ID 删除批次。

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator 会列出给定 project_id 和区域中存在的批次。

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

DataprocGetBatchOperator 会提取一个特定的批量工作负载。

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

后续步骤