使用 CeleryKubernetesExecutor

Managed Airflow (第 3 代) | Managed Airflow (第 2 代) | Managed Airflow (舊版第 1 代)

本頁說明如何在 Managed Airflow 中啟用 CeleryKubernetesExecutor,以及如何在 DAG 中使用 KubernetesExecutor。

關於 CeleryKubernetesExecutor

CeleryKubernetesExecutor 是一種執行器,可同時使用 CeleryExecutor 和 KubernetesExecutor。Airflow 會根據您為工作定義的佇列選取執行器。在一個 DAG 中,您可以透過 CeleryExecutor 執行部分工作,並透過 KubernetesExecutor 執行其他工作:

  • CeleryExecutor 經過最佳化調整,可快速且大規模地執行工作。
  • KubernetesExecutor 專為執行耗用大量資源的工作而設計,且會獨立執行工作。

Managed Airflow 中的 CeleryKubernetesExecutor

在 Managed Airflow 中,CeleryKubernetesExecutor 可讓您為工作使用 KubernetesExecutor。您無法在 Managed Airflow 中,將 KubernetesExecutor 與 CeleryKubernetesExecutor 分開使用。

在環境的叢集中,Managed Airflow 會在與 Airflow 工作站相同的命名空間中,執行您透過 KubernetesExecutor 執行的工作。這類工作與 Airflow 工作人員具有相同的繫結,且可存取專案中的資源。

使用 KubernetesExecutor 執行的工作會採用Managed Airflow 定價模式,因為這些工作的 Pod 會在環境的叢集中執行。這些 Pod 適用於 Managed Airflow 運算 SKU (適用於 CPU、記憶體和儲存空間)。

建議在下列情況下使用 CeleryExecutor 執行工作:

  • 工作啟動時間非常重要。
  • 工作不需要執行階段隔離,也不會耗用大量資源。

建議在下列情況下使用 KubernetesExecutor 執行工作:

  • 工作需要執行階段隔離。舉例來說,由於工作會在自己的 Pod 中執行,因此不會爭用記憶體和 CPU。
  • 工作會耗用大量資源,您想控管可用的 CPU 和記憶體資源。

KubernetesExecutor 與 KubernetesPodOperator 的比較

使用 KubernetesExecutor 執行工作,與使用 KubernetesPodOperator 執行工作類似。工作會在 Pod 中執行,因此可提供 Pod 層級的工作隔離,並提升資源管理成效。

不過,兩者之間還是有一些主要差異:

  • KubernetesExecutor 只會在環境的已版本化 Managed Airflow 命名空間中執行工作。您無法在 Managed Airflow 中變更這個命名空間。您可以指定 KubernetesPodOperator 執行 Pod 工作的命名空間。
  • KubernetesExecutor 可以使用任何內建的 Airflow 運算子。KubernetesPodOperator 只會執行容器的進入點所定義的指令碼。
  • KubernetesExecutor 會使用預設的 Managed Airflow Docker 映像檔,其中包含與 Managed Airflow 環境中定義的 Python、Airflow 設定選項覆寫、環境變數和 PyPI 套件相同的項目。

關於 Docker 映像檔

根據預設,KubernetesExecutor 會使用與 Managed Airflow 用於 Celery 工作站的 Docker 映像檔,啟動工作。這是環境的Managed Airflow 映像檔,其中包含您為環境指定的所有變更,例如自訂 PyPI 套件或環境變數。

事前準備

  • 您可以在 Managed Airflow (第 3 代) 中使用 CeleryKubernetesExecutor。

  • 在 Managed Airflow (第 3 代) 中,無法使用 CeleryKubernetesExecutor 以外的任何執行器。也就是說,您可以在一個 DAG 中使用 CeleryExecutor、KubernetesExecutor 或兩者,但無法將環境設為只使用 KubernetesExecutor 或 CeleryExecutor。

設定 CeleryKubernetesExecutor

您可能想覆寫與 KubernetesExecutor 相關的現有 Airflow 設定選項:

  • [kubernetes]worker_pods_creation_batch_size

    這個選項會定義每個排程器迴圈的 Kubernetes 工作站 Pod 建立呼叫次數。預設值為 1,因此每個排程器心跳只會啟動一個 Pod。如果大量使用 KubernetesExecutor,建議增加這個值。

  • [kubernetes]worker_pods_pending_timeout

    這個選項會以秒為單位定義工作人員可停留在 Pending 狀態 (正在建立 Pod) 的時間長度,超過這個時間就會視為失敗。預設值為 5 分鐘。

使用 KubernetesExecutor 或 CeleryExecutor 執行工作

您可以在一個 DAG 中使用 CeleryExecutor、KubernetesExecutor 或兩者執行工作:

  • 如要使用 KubernetesExecutor 執行工作,請在工作的 queue 參數中指定 kubernetes 值。
  • 如要使用 CeleryExecutor 執行工作,請省略 queue 參數。

下列範例會使用 KubernetesExecutor 執行 task-kubernetes 工作,並使用 CeleryExecutor 執行 task-celery 工作:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

執行與 KubernetesExecutor 相關的 Airflow CLI 指令

您可以使用 gcloud 執行多個與 KubernetesExecutor 相關的 Airflow CLI 指令

自訂工作站 Pod 規格

您可以透過工作中的 executor_config 參數傳遞工作站 Pod 規格,藉此自訂規格。您可以使用這項功能定義自訂 CPU 和記憶體需求。

您可以覆寫用於執行工作的整個工作站 Pod 規格。如要擷取 KubernetesExecutor 使用的工作的 Pod 規格,可以執行 kubernetes generate-dag-yaml Airflow CLI 指令。

如要進一步瞭解如何自訂工作站 Pod 規格,請參閱 Airflow 說明文件

代管 Airflow (第 3 代) 支援下列資源需求值:

資源 下限 上限 步驟
CPU 0.25 32 間距值:0.25、0.5、1、2、4、6、8、10、...、32。系統會將要求的值無條件進位至最接近的支援步階值 (例如 5 進位至 6)。
記憶體 2G (GB) 128G (GB) 間距值:2、3、4、5、...、128。系統會將要求的值無條件進位至最接近的支援步數值 (例如 3.5G 會進位至 4G)。
儲存空間 - 100G (GB) 任何值。如果要求的容量超過 100 GB,系統只會提供 100 GB。

如要進一步瞭解 Kubernetes 中的資源單位,請參閱「Kubernetes 中的資源單位」。

以下範例顯示使用自訂工作站 Pod 規格的工作:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '0.5',
                            'memory': '2G',
                        })
                    ),
                ],
            ),
        )
    },
)

查看工作記錄

KubernetesExecutor 執行的工作記錄會顯示在「記錄」分頁中,與 CeleryExecutor 執行的工作記錄一起顯示:

  1. 前往 Google Cloud 控制台的「Environments」(環境) 頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。

  3. 前往「記錄」分頁。

  4. 依序前往「所有記錄」>「Airflow 記錄」 >「工作人員」

  5. 名為 airflow-k8s-worker 的工作站會執行 KubernetesExecutor 工作。如要尋找特定工作的記錄,可以在搜尋時使用 DAG ID 或工作 ID 做為關鍵字。

後續步驟