將環境遷移至 Airflow 3 (並排)

Managed Airflow (第 3 代) | Managed Airflow (第 2 代) | Managed Airflow (舊版第 1 代)

本頁面說明如何將 DAG、資料和設定,從現有的 Managed Airflow (第 3 代) 環境 (使用 Airflow 2) 移轉至 Managed Airflow (第 3 代) 環境 (使用 Airflow 3)。

寄件者 收件者 方法 指南
Managed Airflow (第 3 代)、Airflow 2 Managed Airflow (第 3 代)、Airflow 3 並排顯示,手動轉移 這篇指南
Managed Airflow (第 2 代) Managed Airflow (第 3 代) 並排使用遷移指令碼 指令碼遷移指南
Managed Airflow (第 2 代) Managed Airflow (第 3 代) 並排比較快照 快照遷移指南
Managed Airflow (舊版第 1 代)、Airflow 2 Managed Airflow (第 3 代) 並排比較快照 快照遷移指南
Managed Airflow (舊版第 1 代)、Airflow 2 Managed Airflow (第 2 代) 並排比較快照 快照遷移指南
Managed Airflow (舊版第 1 代)、Airflow 2 Managed Airflow (第 2 代) 並排顯示,手動轉移 手動遷移指南
Managed Airflow (舊版第 1 代)、Airflow 1 Managed Airflow (第 2 代)、Airflow 2 並排比較快照 快照遷移指南
Managed Airflow (舊版第 1 代)、Airflow 1 Managed Airflow (第 2 代)、Airflow 2 並排顯示,手動轉移 手動遷移指南
Managed Airflow (舊版第 1 代)、Airflow 1 Managed Airflow (舊版第 1 代)、Airflow 2 並排顯示,手動轉移 手動遷移指南

Airflow 3 的變更

開始在 Airflow 3 中使用 Managed Airflow 環境前,請先瞭解 Airflow 3 為 Managed Airflow (第 3 代) 環境帶來的變更。

如要瞭解 Airflow 3 社群版導入的變更,請參閱「Apache Airflow 3 正式發布!」一文。

DAG 版本管理

  • 在 Airflow 3 中,DAG 會根據開始時的版本執行至完成,即使在 DAG 執行期間上傳了新版本,也不會受到影響。

    • Airflow UI 中的所有 DAG 執行作業,現在都會與對應的 DAG 版本 (執行時的版本) 建立關聯。包括工作結構和 DAG 程式碼。

改善補充作業

Airflow 3 大幅修正了回填 (重新執行管道以取得歷來資料) 的處理方式。回填作業從手動程序改為完全可觀察的功能,並整合至核心 Airflow 引擎:

  • 現在可直接在 Airflow 排程器中管理回填作業,不必再將其視為獨立的手動程序。進而提升可擴充性,並提供更精確的控制權。
  • 除了 Airflow CLI,現在還可直接透過 Airflow UI 或 API 呼叫,觸發、停止及監控回填進度。
  • Airflow 排程器可提供歷來回填作業的狀態和健康狀態,讓您一目瞭然。
  • 機器學習社群非常需要這項功能 (用於根據舊資料重新訓練模型),但回填改善功能適用於所有 ETL/ELT 工作流程。

提升安全性和可靠性

  • 在 Airflow 3 中,工作只能透過 Task SDK 與中央 API 伺服器通訊 (在 Airflow 2 中,工作可以直接存取資料庫)。API 伺服器會有效率地集區管理這些連線。資料庫可避免連線尖峰,讓整個環境在負載量大的情況下更加穩定。

  • Airflow 3 採用全新工作執行介面,可進一步隔離工作,避免某項工作干擾或存取其他工作資料。

  • Airflow 3 的 CLI 不再直接存取資料庫,新的 airflowctl 指令列介面是專為透過 API 進行遠端存取設計的獨立套件。它不會直接存取資料庫,而是透過 API 與 Airflow 互動,安全性更高。

以事件為核心的排程和資料資產

  • 資料集已演變為資料資產。Airflow 可透過資料資產,更妥善地追蹤及回應 Airflow 外部系統建立或更新的資料。

  • Airflow 3 導入了稱為「監控程式」的新概念。這些元件會監控資料資產的變更,讓 Airflow 在資料抵達時立即觸發工作流程。現在 DAG 可以在訊息進入訊息佇列時立即觸發,不必再輪詢 (每分鐘檢查一次是否有檔案)。

  • Airflow 3 採用以資產為中心的全新語法,並使用 Python 裝飾器,讓開發人員編寫的程式碼更簡潔直覺。

新版 Airflow UI

  • Airflow UI 是使用 React (前端) 和 FastAPI (後端) 從頭重寫。
  • 新的 Airflow UI 會透過標準化 REST API 和專為 UI 作業設計的 API 執行作業。
  • 以 FastAPI 取代 Flask 實作項目後,Airflow UI 的回應速度大幅提升。
  • 我們統一了格線和圖表檢視畫面,讓工作流程更加順暢,方便您在 DAG 高階結構和特定工作記錄之間切換。

Airflow 3 的重大變更

Airflow 3 導入了幾項重大變更,其中有些是破壞性變更:

  • 我們無法保證 Airflow 2 的現有 DAG 能在 Airflow 3 中正常運作。您必須測試這些 DAG,並視需要變更匯入項目、DAG 參數和其他實作詳細資料。
  • 部分 Airflow 2 設定選項在 Airflow 3 中已重新命名或移除。 如要進一步瞭解參數,請參閱「Airflow 設定參考資料」。

  • 無法從工作程式碼直接存取 Airflow 資料庫:

    • 工作程式碼無法再直接匯入及使用 Airflow 資料庫工作階段或模型。
    • PostgresHookPostgresOperator 無法與 airflow_db 連線搭配使用。
  • 部分自訂 PyPI 套件可能與新版 Airflow 及其依附元件不相容。

  • REST API (/api/v1) 已由 /api/v2 取代。

  • TaskGroups、資產和資料感知排程會取代 SubDAG。

  • 服務等級協議已淘汰並移除。並由「截止日期快訊」取代。

  • CLI 指令中的 subdir 引數已移除。

  • 部分 Airflow 環境變數已移除。詳情請參閱 Airflow 說明文件中的「重大變更」。

  • catchup_by_default DAG 參數現在預設為 False

  • create_cron_data_intervals 設定現在預設為 False。 也就是說,系統預設會使用 CronTriggerTimetable,而非 CronDataIntervalTimetable

  • Airflow 3.0.0 的異動清單

  • Airflow 3.1.0 的異動清單

Airflow 3 和 Airflow 2 環境的差異

使用 Airflow 2 的 Managed Airflow 環境與使用 Airflow 3 的環境之間的主要差異如下:

  • Airflow 3 環境中的工作負載設定:

    • 所有 Airflow 元件的記憶體容量下限為 2 GB。
    • 與 Airflow 2 環境相比,環境預設集中的 Airflow 觸發器和工作站設定有所變更。
    • 觸發條件的預設 CPU 數量為 1。
    • 觸發器的預設記憶體容量為 2 GB。
  • 自動計算 [celery]worker_concurrency 設定選項的方式已變更,可因應 Airflow 3 元件的不同記憶體用量。

  • 在 Airflow 3 中,無法直接從工作程式碼存取 Airflow 資料庫

  • Airflow 3 使用 airflowctl 指令列公用程式執行 Airflow CLI 指令。

  • Airflow 3 環境中預先安裝的 PyPI 套件有所不同。如要查看預先安裝的 PyPI 套件清單,請參閱「預先安裝的套件變更記錄」。

並行遷移至 Airflow 3

並行遷移程序包含下列步驟:

  1. 確認與 Airflow 3 的相容性。
  2. 建立 Airflow 3 環境,並轉移設定覆寫和環境變數。
  3. 將 PyPI 套件安裝到 Airflow 3 環境。
  4. 將變數、連線和集區轉移至 Airflow 3。
  5. 從 Airflow 2.* 環境 bucket 轉移其他資料。
  6. 轉移使用者和角色。
  7. 確認 DAG 已準備好使用 Airflow 3。
  8. 將 DAG 轉移至 Airflow 3 環境。
  9. 監控 Airflow 3 環境。

步驟 1:確認與 Airflow 3 的相容性

如要檢查是否與 Airflow 3 相容,請按照下列步驟操作:

步驟 2:建立 Airflow 3 環境,轉移設定覆寫和環境變數

在這個步驟中,您會使用 Airflow 3 建立新的 Managed Airflow (第 3 代) 環境,並開始從 Airflow 2 環境轉移設定參數:

請按照建立 Managed Airflow (第 3 代) 環境的步驟操作,並完成下列事項:

  1. 選取 Airflow 建構版本時,請選取 Airflow 3 的建構版本。
  2. 從 Airflow 2 環境複製所有相容的 Airflow 設定選項覆寫。

  3. 從 Airflow 2 環境複製所有環境變數。

  4. 請繼續使用 Airflow 3 建立環境。

下表列出部分 Airflow 設定選項的變更。請注意,這份清單僅列舉部分內容,如要進一步瞭解 Airflow 設定選項的變更,請參閱 Airflow 說明文件中的「Airflow Configuration Reference」和「Airflow Release Notes」。

Airflow 2 選項 Airflow 3 選項
[scheduler]min_file_process_interval [dag_processor]min_file_process_interval
[webserver]rbac_user_registration_role [api]rbac_user_registration_role
[core]dag_file_processor_timeout [dag_processor]dag_file_processor_timeout
[scheduler]dag_dir_list_interval [dag_processor]refresh_interval
[scheduler]max_threads [dag_processor]parsing_processes
[scheduler]parsing_processes [dag_processor]parsing_processes
[webserver]instance_name [api]instance_name
[scheduler]scheduler_zombie_task_threshold [scheduler]task_instance_heartbeat_timeout
[webserver]rbac 已淘汰
[api]auth_backend=airflow.api.auth.backend.deny_all 已淘汰
[api]auth_backends=airflow.api.auth.backend.deny_all 已淘汰
[api]composer_auth_user_registration_role 已淘汰

步驟 3:將 PyPI 套件安裝到 Airflow 3 環境

建立 Airflow 3 環境後,請安裝 PyPI 套件

  1. 從 Airflow 2 環境複製 PyPI 套件需求條件。
  2. 啟動 PyPI 套件更新作業,並等待環境更新完成。

由於 Airflow 3 環境使用不同的預先安裝套件組合,更新作業期間可能會發生 PyPI 套件衝突。如要進一步瞭解如何排解 PyPI 套件衝突問題,請參閱「與預先安裝的 PyPI 套件發生衝突」。

步驟 4:從 Airflow 2 匯出變數、連線和集區

如果沒有變數或連線,請略過相關的匯出和匯入指令。

如果除了 default_pool 以外,您還有其他自訂集區,才需要轉移集區。否則請略過匯出及匯入集區的指令。

  1. 從 Airflow 2 環境匯出變數:

    gcloud composer environments run AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables -- export /home/airflow/gcs/data/variables.json
    

    更改下列內容:

    • AIRFLOW_2_ENV:Airflow 2 環境的名稱。
    • AIRFLOW_2_LOCATION:Airflow 2 環境所在的區域。
  2. 從 Airflow 2 環境匯出連線:

    gcloud composer environments run AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        connections -- export /home/airflow/gcs/data/connections.json
    

    更改下列內容:

    • AIRFLOW_2_ENV:Airflow 2 環境的名稱。
    • AIRFLOW_2_LOCATION:Airflow 2 環境所在的區域。
  3. 從 Airflow 2 環境匯出集區:

    gcloud composer environments run AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools -- export /home/airflow/gcs/data/pools.json
    

    更改下列內容:

    • AIRFLOW_2_ENV:Airflow 2 環境的名稱。
    • AIRFLOW_2_LOCATION:Airflow 2 環境所在的區域。
  4. 取得 Airflow 2 環境的 bucket 名稱:

    gcloud composer environments describe AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        --format="value(storageConfig.bucket)"
    

    更改下列內容:

    • AIRFLOW_2_ENV:Airflow 2 環境的名稱。
    • AIRFLOW_2_LOCATION:Airflow 2 環境所在的區域。
  5. 從 Airflow 2 環境值區的 /data 目錄,將 variables.jsonconnections.jsonpools.json 檔案下載到本機目錄:

    gcloud storage cp gs://AIRFLOW_2_BUCKET/data/variables.json ./variables.json
    gcloud storage cp gs://AIRFLOW_2_BUCKET/data/connections.json ./connections.json
    gcloud storage cp gs://AIRFLOW_2_BUCKET/data/pools.json ./pools.json
    

    更改下列內容:

    • AIRFLOW_2_BUCKET:Airflow 2 環境的 bucket 名稱,在上一個步驟中取得。

步驟 5:將變數、連線和集區匯入 Airflow 3

如果沒有變數或連線,請略過相關的匯出和匯入指令。

如果除了 default_pool 以外,您還有其他自訂集區,才需要轉移集區。否則請略過匯出及匯入集區的指令。

  1. 設定 airflowctl,以便為 Airflow 3 環境執行 Airflow CLI 指令。

  2. 使用 airflowctl 將變數、連線和集區匯入 Airflow 3 環境:

    airflowctl variables import ./variables.json
    airflowctl connections import ./connections.json
    airflowctl pools import ./pools.json
    
  3. 確認變數、連線和集區已匯入 Airflow 3 環境:

    airflowctl variables list
    airflowctl connections list
    airflowctl pools list
    
  4. 清理 JSON 檔案:

    gcloud storage rm gs://AIRFLOW_2_BUCKET/data/variables.json
    gcloud storage rm gs://AIRFLOW_2_BUCKET/data/connections.json
    gcloud storage rm gs://AIRFLOW_2_BUCKET/data/pools.json
    rm ./variables.json
    rm ./connections.json
    rm ./pools.json
    

    更改下列內容:

    • AIRFLOW_2_BUCKET:Airflow 2 環境的 bucket 名稱。

步驟 6:從 Airflow 2 環境的值區轉移其他資料

在這個步驟中,您要從 Airflow 2 環境的 bucket 轉移剩餘資料。

  1. 取得 Airflow 3 環境的 bucket 名稱:

    gcloud composer environments describe AIRFLOW_3_ENV \
        --location AIRFLOW_3_LOCATION \
        --format="value(storageConfig.bucket)"
    

    更改下列內容:

    • AIRFLOW_3_ENV:Airflow 3 環境的名稱。
    • AIRFLOW_3_LOCATION:Airflow 3 環境所在的區域。
  2. 將 Airflow 2 環境 bucket 中的外掛程式匯出至 Airflow 3 環境 bucket 的 /plugins 目錄:

    gcloud composer environments storage plugins export \
      --destination=AIRFLOW_3_BUCKET/plugins \
      --environment=AIRFLOW_2_ENV \
      --location=AIRFLOW_2_LOCATION
    

    更改下列內容:

    • AIRFLOW_3_BUCKET:Airflow 3 環境的 bucket 名稱,在上一個步驟中取得。
    • AIRFLOW_2_ENV:Airflow 2 環境的名稱。
    • AIRFLOW_2_LOCATION:Airflow 2 環境所在的區域。
  3. 確認已成功匯入 /plugins 目錄:

    gcloud composer environments storage plugins list \
      --environment=AIRFLOW_3_ENV \
      --location=AIRFLOW_3_LOCATION
    

    更改下列內容:

    • AIRFLOW_3_ENV:Airflow 3 環境的名稱。
    • AIRFLOW_3_LOCATION:Airflow 3 環境所在的區域。
  4. 將 Airflow 2 環境中的 /data 目錄匯出至 Airflow 3 環境:

    gcloud composer environments storage data export \
      --destination=AIRFLOW_3_BUCKET/data \
      --environment=AIRFLOW_2_ENV \
      --location=AIRFLOW_2_LOCATION
    

    更改下列內容:

    • AIRFLOW_3_BUCKET:Airflow 3 環境的 bucket 名稱,在上一個步驟中取得。
    • AIRFLOW_2_ENV:Airflow 2 環境的名稱。
    • AIRFLOW_2_LOCATION:Airflow 2 環境所在的區域。
  5. 確認 /data 資料夾已成功匯入:

    gcloud composer environments storage data list \
      --environment=AIRFLOW_3_ENV \
      --location=AIRFLOW_3_LOCATION
    

步驟 7:轉移使用者和角色

airflowctl 尚未支援 usersroles 指令,因此無法遷移使用者和角色。

步驟 8:確認 DAG 已準備好在 Airflow 3 中執行

  1. 調整 Airflow DAG,使其與 Airflow 3 相容

  2. 檢查直接存取 Airflow 資料庫的自訂撰寫工作:

    在 Airflow 3 中,運算子無法使用資料庫工作階段直接存取 Airflow 中繼資料庫。如果您有自訂運算子,請檢查程式碼,確保沒有直接的資料庫存取呼叫。

    您可以採用下列任一替代做法,在工作遷移時避免直接存取 Airflow 資料庫:

    • 透過將 Airflow 資料庫內容匯出至 Cloud SQL 執行個體,存取 Airflow 資料庫。

    • 使用 Airflow Python 用戶端。社群版 Airflow 提供的 Python 用戶端已為大多數資料表定義 API,例如 DagRunsTaskInstancesVariablesConnectionsXComsapache-airflow-client 套件已預先安裝在 Managed Airflow Airflow 3 建構版本中。

    • 從 DAG 執行 airflowctlBashOperator

    如果查詢匯出的 Airflow 資料庫不適用於您的用途,且 Airflow Python 用戶端和 airflowctl 都未提供所需功能,請考慮在 Airflow 社群版本中要求新的 API 端點或 Task SDK 功能。

  3. 如果您有 KubernetesExecutor 工作,請將 queue="kubernetes" 替換為 executor="KubernetesExecutor",調整運算子定義。

    Airflow 3 中的 KubernetesExecutor 工作範例:

    PythonOperator(
    task_id="airflow3_kubernetes_executor_task",
    dag=dag,
    python_callable=f,
    executor="KubernetesExecutor",
    )
    
  4. 如果您在工作程式碼中使用 AIRFLOW__WEBSERVER__BASE_URL 環境變數,請改用 [api]base_url Airflow 設定選項。

    在 Airflow 3 中取得這個值的範例如下:

    from airflow.configuration import conf
    
    webserver_base_url = conf.get("api", "base_url")
    

步驟 9:將 DAG 轉移至 Airflow 3 環境

在環境之間轉移 DAG 時,可能會發生下列潛在問題:

  • 如果兩個環境都啟用 DAG (未暫停),每個環境都會按照排程執行自己的 DAG 副本。這可能會導致相同資料和執行時間的 DAG 同時執行。

  • 由於 DAG 趕上進度,Airflow 會排定額外的 DAG 執行作業,從 DAG 中指定的開始日期開始。這是因為新的 Airflow 執行個體不會將 Airflow 2 環境的 DAG 執行記錄納入考量。這可能會導致系統從指定開始日期起,排定大量 DAG 執行作業。

防止 DAG 同時執行

在 Airflow 3 環境中,覆寫 dags_are_paused_at_creation Airflow 設定選項。變更後,所有新的 DAG 預設都會暫停。

區段
core dags_are_paused_at_creation True

避免 DAG 執行作業過多或過少

在轉移至 Airflow 3 環境的 DAG 中,指定新的靜態開始日期

為避免邏輯日期出現間隙和重疊,第一次 DAG 執行必須在 Airflow 3 環境中,於下一個排程間隔發生時進行。如要這麼做,請將 DAG 中的新開始日期設為 Airflow 2 環境中上次執行的日期之前。

舉例來說,假設您的 DAG 每天會在 Airflow 2 環境中於 15:00、17:00 和 21:00 執行,而您打算在 15:15 轉移 DAG,且最後一次 DAG 執行時間是 15:00,那麼 Airflow 3 環境的開始日期可以是今天的 14:45。在 Airflow 3 環境中啟用 DAG 後,Airflow 會排定在 17:00 執行 DAG。

再舉一例,假設您的 DAG 每天會在 Airflow 2 環境中於 00:00 執行,最後一次執行 DAG 是在 2026 年 3 月 26 日 00:00,而您打算在 2026 年 3 月 26 日 13:00 轉移 DAG,則 Airflow 3 環境的開始日期可以是 2026 年 3 月 25 日 23:45。在 Airflow 3 環境中啟用 DAG 後,Airflow 會排定在 2026 年 3 月 27 日 00:00 執行 DAG。

將 DAG 逐一轉移至 Airflow 3 環境

請針對每個 DAG 按照下列程序進行轉移:

  1. 請確認 DAG 中的新開始日期已如上一節所述設定。

  2. 將更新後的 DAG 上傳至 Airflow 3 環境。由於設定覆寫,這個 DAG 在 Airflow 3 環境中已暫停,因此目前尚未排定任何 DAG 執行作業。

  3. Airflow 網頁介面中,前往「DAGs」(DAG),檢查是否有回報的 DAG 語法錯誤。

  4. 在您打算轉移 DAG 時:

    1. 在 Airflow 2 環境中暫停 DAG。

    2. 在 Airflow 3 環境中取消暫停 DAG。

    3. 確認新的 DAG 執行作業是否已排定在正確的時間執行。

    4. 等待 Airflow 3 環境執行 DAG,並檢查執行是否成功。

  5. 視 DAG 執行作業是否成功而定:

    • 如果 DAG 執行作業成功,即可繼續操作,並在 Airflow 3 環境中使用 DAG。最後,請考慮刪除 Airflow 2 版本的 DAG。

    • 如果 DAG 執行作業失敗,請嘗試排解 DAG 問題,直到 DAG 在 Airflow 3 中成功執行為止。

      如有需要,您隨時可以改回使用 Airflow 2 版本的 DAG:

      1. 在 Airflow 3 環境中暫停 DAG。

      2. 在 Airflow 3 環境中取消暫停 DAG。系統會排定新的 DAG 執行作業,日期和時間與失敗的 DAG 執行作業相同。

      3. 準備好繼續使用 Airflow 3 版本的 DAG 時,請調整開始日期,將新版 DAG 上傳至 Airflow 3 環境,然後重複上述程序。

步驟 10:監控 Airflow 3 環境

將所有 DAG 和設定轉移至 Airflow 3 環境後,請監控潛在問題、失敗的 DAG 執行作業,以及整體環境健康狀態。如果 Airflow 3 環境順利運作一段時間,即可移除 Airflow 2 環境。

後續步驟