Managed Airflow(第 3 代) | Managed Airflow(第 2 代) | Managed Airflow(旧版第 1 代)
本页面介绍如何使用 Managed Airflow(第 2 代)在 Managed Service for Apache Spark 工作负载上运行 Google Cloud。
以下部分中的示例展示了如何使用 运算符来管理 Managed Service for Apache Spark 批量工作负载。您可以在 DAG 中使用这些运算符来创建、删除、列出和获取 Managed Service for Apache Spark 批量工作负载:
为使用 Managed Service for Apache Spark 批量工作负载的 运算符 创建 DAG:
创建使用 自定义容器和 Dataproc Metastore的 DAG。
为这些 DAG 配置 Persistent History Server。
准备工作
启用 Dataproc API:
控制台
启用 Managed Service for Apache Spark API。
启用 API 所需的角色
如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (
roles/serviceusage.serviceUsageAdmin),该角色包含serviceusage.services.enable权限。了解如何授予角色。gcloud
启用 Managed Service for Apache Spark API:
启用 API 所需的角色
如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (
roles/serviceusage.serviceUsageAdmin),该角色包含serviceusage.services.enable权限。了解如何授予 角色。gcloud services enable dataproc.googleapis.com
为批量工作负载文件选择位置。您可以使用以下任一选项:
- 创建一个 Cloud Storage 存储桶来 存储此文件。
- 使用环境的存储桶。由于您无需将此文件与 Airflow 同步,因此可以在
/dags或/data文件夹之外创建一个单独的子文件夹。例如,/batches。 - 使用现有存储桶。
设置文件和 Airflow 变量
本部分演示了如何设置文件并为此教程配置 Airflow 变量。
将 Managed Service for Apache Spark ML 工作负载文件上传到存储桶
本教程中的工作负载运行 pyspark 脚本:
将任何 pyspark 脚本保存到名为
spark-job.py的本地文件中。 例如,您可以使用 示例 pyspark 脚本。
设置 Airflow 变量
以下部分中的示例使用 Airflow 变量。您可以在 Airflow 中为这些变量设置值,然后 DAG 代码就可以访问这些值。
本教程中的示例使用以下 Airflow 变量。您可以根据所使用的示例,按需设置这些变量。
设置以下 Airflow 变量,以便在 DAG 代码中使用:
project_id:项目 ID。bucket_name:工作负载的主 Python 文件 (spark-job.py) 所在的存储桶的 URI。您在 准备工作中选择了此位置。phs_cluster:Persistent History Server 集群名称。您在设置此变量 时创建 Persistent History Server。image_name:自定义容器映像的名称和标记 (image:tag)。当您 将自定义容器映像与 DataprocCreateBatchOperator 搭配使用时,可以 设置此变量。metastore_cluster:Dataproc Metastore 服务名称。 当您将 Dataproc Metastore 服务与 DataprocCreateBatchOperator 搭配使用时,可以设置此变量。region_name:Dataproc Metastore 服务所在的区域。当您将 Dataproc Metastore 服务与 DataprocCreateBatchOperator 搭配使用时,可以设置此变量。
使用 Google Cloud 控制台和 Airflow 界面设置每个 Airflow 变量
在 Google Cloud 控制台中,前往环境 页面。
在环境列表中,点击环境对应的 Airflow 链接。Airflow 界面随即打开。
在 Airflow 界面中,依次选择 Admin > Variables。
点击添加新记录 。
在 Key 字段中指定变量的名称,并在 Val 字段中为其设置值。
点击保存 。
创建 Persistent History Server
使用 Persistent History Server (PHS) 查看批量工作负载的 Spark 历史记录文件:
- 创建 Persistent History Server。
- 确保您在
phs_clusterAirflow 变量中指定了 PHS 集群的名称。
DataprocCreateBatchOperator
以下 DAG 会启动 Managed Service for Apache Spark 批量工作负载。
如需详细了解 DataprocCreateBatchOperator 实参,请参阅
运算符的源代码。
如需详细了解您可以在 batch 形参中传递的属性,请参阅 Batch 类的说明。DataprocCreateBatchOperator
将自定义容器映像与 DataprocCreateBatchOperator 搭配使用
以下示例展示了如何使用自定义容器映像来运行工作负载。例如,您可以使用自定义容器来添加默认容器映像未提供的 Python 依赖项。
如需使用自定义容器映像,请执行以下操作:
在
image_nameAirflow 变量中指定映像。将 DataprocCreateBatchOperator 与自定义映像搭配使用:
将 Dataproc Metastore 服务与 DataprocCreateBatchOperator 搭配使用
如需从 DAG 中使用 Dataproc Metastore 服务 ,请执行以下操作:
检查 Metastore 服务是否已启动。
如需了解如何启动 Metastore 服务,请参阅 启用和停用 Dataproc Metastore。
如需详细了解用于创建 配置的 Batch 运算符,请参阅 PeripheralsConfig。
Metastore 服务启动并运行后,在
metastore_cluster变量中指定其名称,并在region_nameAirflow 变量中指定其区域。在 DataprocCreateBatchOperator 中使用 Metastore 服务:
DataprocDeleteBatchOperator
您可以使用 DataprocDeleteBatchOperator 根据工作负载的批次 ID 删除批次。
DataprocListBatchesOperator
DataprocDeleteBatchOperator 会列出给定 project_id 和区域中存在的批次。
DataprocGetBatchOperator
DataprocGetBatchOperator 会提取一个特定的批量工作负载。