在 Airflow DAG 中使用可延期运算符

Managed Airflow(第 3 代) | Managed Airflow(第 2 代) | Managed Airflow(旧版第 1 代)

本页面介绍如何在您的 环境中启用对可延期运算符的支持,以及如何在您的 DAG 中使用可延期 Google Cloud 运算符。

关于 Managed Airflow 中的可延期运算符

如果您至少有一个触发器实例(或在 高度弹性环境中至少有两个),则可以在 DAG 中使用 可延期运算符和触发器

对于可延期运算符,Airflow 会将任务执行拆分为以下阶段:

  1. 启动操作。在此阶段,任务会占用一个 Airflow 工作器槽。任务会执行一项操作,将作业委托给其他服务。

    例如,运行 BigQuery 作业可能需要几秒到几小时不等的时间。创建作业后,操作会将工作标识符(BigQuery 作业 ID)传递给 Airflow 触发器。

  2. 触发器会监控作业,直到作业完成。在此阶段,工作器槽不会被占用。Airflow 触发器具有异步架构,能够处理数百个此类作业。当触发器检测到作业已完成时,它会发送一个事件来触发最后一个阶段。

  3. 在最后一个阶段,Airflow 工作器会执行回调。例如,此回调可以将任务标记为成功,或执行另一项操作并设置作业以再次由触发器监控。

触发器是无状态的,因此能够应对中断或重启。因此,长时间运行的作业能够应对 pod 重启,除非重启发生在最后一个阶段(预计该阶段很短)。

准备工作

  • 在 Managed Airflow(第 2 代)中,可延期运算符和传感器需要满足以下条件:

    • Managed Service for Apache Airflow 版本 2.0.31 及更高版本
    • Airflow 2.2.5、2.3.3 及更高版本

启用对可延期运算符的支持

一个名为“Airflow 触发器”的环境组件会异步监控环境中的所有延期任务。 此类任务的延期操作完成后,触发器会将任务传递给 Airflow 工作器。

您的环境中至少需要一个触发器实例(或在高度弹性环境中至少需要两个),才能在 DAG 中使用可延期模式。 您可以在创建环境时配置触发器 ,也可以 调整现有环境的触发器数量和性能参数

Google Cloud 支持可延期模式的运算符

只有部分 Airflow 运算符经过扩展,支持可延期模型。 以下列表是 apache-airflow-providers-google 软件包中支持可延期模式的运算符的参考。 包含所需最低 apache-airflow-providers-google 软件包版本的列表示该运算符支持可延期模式的最早软件包版本。

BigQuery 运算符

运算符名称 所需的 apache-airflow-providers-google 版本
BigQueryCheckOperator 8.4.0
BigQueryValueCheckOperator 8.4.0
BigQueryIntervalCheckOperator 8.4.0
BigQueryGetDataOperator 8.4.0
BigQueryInsertJobOperator 8.4.0

BigQuery Data Transfer Service 运算符

运算符名称 所需的 apache-airflow-providers-google 版本
BigQueryDataTransferServiceStartTransferRunsOperator 8.9.0

批量运算符

运算符名称 所需的 apache-airflow-providers-google 版本
CloudBatchSubmitJobOperator 10.7.0

Cloud Build 运算符

运算符名称 所需的 apache-airflow-providers-google 版本
CloudBuildCreateBuildOperator 8.7.0

Managed Service for Apache Airflow 运算符

运算符名称 所需的 apache-airflow-providers-google 版本
CloudComposerCreateEnvironmentOperator 6.4.0
CloudComposerDeleteEnvironmentOperator 6.4.0
CloudComposerUpdateEnvironmentOperator 6.4.0
CloudComposerRunAirflowCLICommandOperator 11.0.0

Cloud Run 运算符

运算符名称 所需的 apache-airflow-providers-google 版本
CloudRunExecuteJobOperator 10.7.0

Cloud SQL 运算符

运算符名称 所需的 apache-airflow-providers-google 版本
CloudSQLExportInstanceOperator 10.3.0

Storage Transfer Service 运算符

运算符名称 所需的 apache-airflow-providers-google 版本
CloudDataTransferServiceS3ToGCSOperator 14.0.0
CloudDataTransferServiceGCSToGCSOperator 14.0.0

Dataflow 运算符

运算符名称 所需的 apache-airflow-providers-google 版本
DataflowTemplatedJobStartOperator 8.9.0
DataflowStartFlexTemplateOperator 8.9.0
DataflowStartYamlJobOperator 11.0.0

Cloud Data Fusion 运算符

运算符名称 所需的 apache-airflow-providers-google 版本
CloudDataFusionStartPipelineOperator 8.9.0

Knowledge Catalog 运算符

运算符名称 所需的 apache-airflow-providers-google 版本
DataplexRunDataQualityScanOperator 10.8.0
DataplexGetDataQualityScanResultOperator 10.8.0
DataplexRunDataProfileScanOperator 11.0.0

Google Kubernetes Engine 运算符

运算符名称 所需的 apache-airflow-providers-google 版本
GKEDeleteClusterOperator 9.0.0
GKECreateClusterOperator 9.0.0
GKEStartPodOperator 12.0.0
GKEStartJobOperator 11.0.0

Pub/Sub 运算符

运算符名称 所需的 apache-airflow-providers-google 版本
PubSubPullOperator 14.0.0

AI Platform 运算符

运算符名称 所需的 apache-airflow-providers-google 版本
MLEngineStartTrainingJobOperator 8.9.0

在 DAG 中使用可延期运算符

所有 Google Cloud 运算符的常见惯例是使用 可延期模式和 deferrable 布尔值参数。如果运算符没有此参数,则无法在可延期模式下运行。 Google Cloud其他运算符可能有不同的惯例。例如,某些社区运算符有一个单独的类,其名称中带有 Async 后缀。

以下示例 DAG 在可延期模式下使用 DataprocSubmitJobOperator 运算符:

PYSPARK_JOB = {
    "reference": { "project_id": "PROJECT_ID" },
    "placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
    "pyspark_job": {
        "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
    },
}

DataprocSubmitJobOperator(
        task_id="dataproc-deferrable-example",
        job=PYSPARK_JOB,
        deferrable=True,
    )

查看触发器日志

触发器会生成日志,这些日志与其他环境组件的日志一起提供。如需详细了解如何查看环境 日志,请参阅查看日志

监控触发器

如需详细了解如何监控触发器组件,请参阅 Airflow 指标

除了监控触发器之外,您还可以在环境的 Monitoring 信息中心内查看未完成的任务 指标,以检查延期任务的数量。

后续步骤