Managed Airflow (第 3 代) | Managed Airflow (第 2 代) | Managed Airflow (舊版第 1 代)
本頁面說明如何將 DAG、資料和設定,從現有的 Managed Airflow (第 3 代) 環境 (使用 Airflow 2) 移轉至 Managed Airflow (第 3 代) 環境 (使用 Airflow 3)。
其他遷移指南
| 寄件者 | 收件者 | 方法 | 指南 |
|---|---|---|---|
| Managed Airflow (第 3 代)、Airflow 2 | Managed Airflow (第 3 代)、Airflow 3 | 並排顯示,手動轉移 | 這篇指南 |
| Managed Airflow (第 2 代) | Managed Airflow (第 3 代) | 並排使用遷移指令碼 | 指令碼遷移指南 |
| Managed Airflow (第 2 代) | Managed Airflow (第 3 代) | 並排比較快照 | 快照遷移指南 |
| Managed Airflow (舊版第 1 代)、Airflow 2 | Managed Airflow (第 3 代) | 並排比較快照 | 快照遷移指南 |
| Managed Airflow (舊版第 1 代)、Airflow 2 | Managed Airflow (第 2 代) | 並排比較快照 | 快照遷移指南 |
| Managed Airflow (舊版第 1 代)、Airflow 2 | Managed Airflow (第 2 代) | 並排顯示,手動轉移 | 手動遷移指南 |
| Managed Airflow (舊版第 1 代)、Airflow 1 | Managed Airflow (第 2 代)、Airflow 2 | 並排比較快照 | 快照遷移指南 |
| Managed Airflow (舊版第 1 代)、Airflow 1 | Managed Airflow (第 2 代)、Airflow 2 | 並排顯示,手動轉移 | 手動遷移指南 |
| Managed Airflow (舊版第 1 代)、Airflow 1 | Managed Airflow (舊版第 1 代)、Airflow 2 | 並排顯示,手動轉移 | 手動遷移指南 |
Airflow 3 的變更
開始在 Airflow 3 中使用 Managed Airflow 環境前,請先瞭解 Airflow 3 為 Managed Airflow (第 3 代) 環境帶來的變更。
如要瞭解 Airflow 3 社群版導入的變更,請參閱「Apache Airflow 3 正式發布!」一文。
DAG 版本管理
在 Airflow 3 中,DAG 會根據開始時的版本執行至完成,即使在 DAG 執行期間上傳了新版本,也不會受到影響。
- Airflow UI 中的所有 DAG 執行作業,現在都會與對應的 DAG 版本 (執行時的版本) 建立關聯。包括工作結構和 DAG 程式碼。
改善補充作業
Airflow 3 大幅修正了回填 (重新執行管道以取得歷來資料) 的處理方式。回填作業從手動程序改為完全可觀察的功能,並整合至核心 Airflow 引擎:
- 現在可直接在 Airflow 排程器中管理回填作業,不必再將其視為獨立的手動程序。進而提升可擴充性,並提供更精確的控制權。
- 除了 Airflow CLI,現在還可直接透過 Airflow UI 或 API 呼叫,觸發、停止及監控回填進度。
- Airflow 排程器可提供歷來回填作業的狀態和健康狀態,讓您一目瞭然。
- 機器學習社群非常需要這項功能 (用於根據舊資料重新訓練模型),但回填改善功能適用於所有 ETL/ELT 工作流程。
提升安全性和可靠性
在 Airflow 3 中,工作只能透過 Task SDK 與中央 API 伺服器通訊 (在 Airflow 2 中,工作可以直接存取資料庫)。API 伺服器會有效率地集區管理這些連線。資料庫可避免連線尖峰,讓整個環境在負載量大的情況下更加穩定。
Airflow 3 採用全新工作執行介面,可進一步隔離工作,避免某項工作干擾或存取其他工作資料。
Airflow 3 的 CLI 不再直接存取資料庫,新的
airflowctl指令列介面是專為透過 API 進行遠端存取設計的獨立套件。它不會直接存取資料庫,而是透過 API 與 Airflow 互動,安全性更高。
以事件為核心的排程和資料資產
Airflow 3 導入了稱為「監控程式」的新概念。這些元件會監控資料資產的變更,讓 Airflow 在資料抵達時立即觸發工作流程。現在 DAG 可以在訊息進入訊息佇列時立即觸發,不必再輪詢 (每分鐘檢查一次是否有檔案)。
Airflow 3 採用以資產為中心的全新語法,並使用 Python 裝飾器,讓開發人員編寫的程式碼更簡潔直覺。
新版 Airflow UI
- Airflow UI 是使用 React (前端) 和 FastAPI (後端) 從頭重寫。
- 新的 Airflow UI 會透過標準化 REST API 和專為 UI 作業設計的 API 執行作業。
- 以 FastAPI 取代 Flask 實作項目後,Airflow UI 的回應速度大幅提升。
- 我們統一了格線和圖表檢視畫面,讓工作流程更加順暢,方便您在 DAG 高階結構和特定工作記錄之間切換。
Airflow 3 的重大變更
Airflow 3 導入了幾項重大變更,其中有些是破壞性變更:
- 我們無法保證 Airflow 2 的現有 DAG 能在 Airflow 3 中正常運作。您必須測試這些 DAG,並視需要變更匯入項目、DAG 參數和其他實作詳細資料。
部分 Airflow 2 設定選項在 Airflow 3 中已重新命名或移除。 如要進一步瞭解參數,請參閱「Airflow 設定參考資料」。
無法從工作程式碼直接存取 Airflow 資料庫:
- 工作程式碼無法再直接匯入及使用 Airflow 資料庫工作階段或模型。
PostgresHook和PostgresOperator無法與airflow_db連線搭配使用。
部分自訂 PyPI 套件可能與新版 Airflow 及其依附元件不相容。
REST API (
/api/v1) 已由/api/v2取代。TaskGroups、資產和資料感知排程會取代 SubDAG。
CLI 指令中的 subdir 引數已移除。
部分 Airflow 環境變數已移除。詳情請參閱 Airflow 說明文件中的「重大變更」。
catchup_by_defaultDAG 參數現在預設為False。create_cron_data_intervals設定現在預設為False。 也就是說,系統預設會使用CronTriggerTimetable,而非CronDataIntervalTimetable。
Airflow 3 和 Airflow 2 環境的差異
使用 Airflow 2 的 Managed Airflow 環境與使用 Airflow 3 的環境之間的主要差異如下:
Airflow 3 環境中的工作負載設定:
自動計算
[celery]worker_concurrency設定選項的方式已變更,可因應 Airflow 3 元件的不同記憶體用量。在 Airflow 3 中,無法直接從工作程式碼存取 Airflow 資料庫。
Airflow 3 使用
airflowctl指令列公用程式執行 Airflow CLI 指令。Airflow 3 環境中預先安裝的 PyPI 套件有所不同。如要查看預先安裝的 PyPI 套件清單,請參閱「預先安裝的套件變更記錄」。
並行遷移至 Airflow 3
並行遷移程序包含下列步驟:
- 確認與 Airflow 3 的相容性。
- 建立 Airflow 3 環境,並轉移設定覆寫和環境變數。
- 將 PyPI 套件安裝到 Airflow 3 環境。
- 將變數、連線和集區轉移至 Airflow 3。
- 從 Airflow 2.* 環境 bucket 轉移其他資料。
- 轉移使用者和角色。
- 確認 DAG 已準備好使用 Airflow 3。
- 將 DAG 轉移至 Airflow 3 環境。
- 監控 Airflow 3 環境。
步驟 1:確認與 Airflow 3 的相容性
如要檢查是否與 Airflow 3 相容,請按照下列步驟操作:
- 確認環境使用的是 Airflow 2.7 以上版本。建議您先升級至最新版 Airflow 2,再遷移至 Airflow 3。
- 確認環境運作正常,且已順利執行一段時間。
- 請確認 DAG 和 Airflow 設定未採用Airflow 3 已移除的任何功能。
- 請參閱將 DAG 改為與 Airflow 3 相容的操作說明,瞭解在遷移過程中是否需要變更 DAG。
- 使用 Airflow 社群版提供的
ruff工具,檢查 Airflow DAG 的相容性。如需操作說明,請參閱 Airflow 說明文件中的「檢查 Airflow DAG 是否相容」一節。
步驟 2:建立 Airflow 3 環境,轉移設定覆寫和環境變數
在這個步驟中,您會使用 Airflow 3 建立新的 Managed Airflow (第 3 代) 環境,並開始從 Airflow 2 環境轉移設定參數:
請按照建立 Managed Airflow (第 3 代) 環境的步驟操作,並完成下列事項:
- 選取 Airflow 建構版本時,請選取 Airflow 3 的建構版本。
從 Airflow 2 環境複製所有相容的 Airflow 設定選項覆寫。
從 Airflow 2 環境複製所有環境變數。
請繼續使用 Airflow 3 建立環境。
下表列出部分 Airflow 設定選項的變更。請注意,這份清單僅列舉部分內容,如要進一步瞭解 Airflow 設定選項的變更,請參閱 Airflow 說明文件中的「Airflow Configuration Reference」和「Airflow Release Notes」。
| Airflow 2 選項 | Airflow 3 選項 |
|---|---|
[scheduler]min_file_process_interval
|
[dag_processor]min_file_process_interval
|
[webserver]rbac_user_registration_role
|
[api]rbac_user_registration_role
|
[core]dag_file_processor_timeout
|
[dag_processor]dag_file_processor_timeout
|
[scheduler]dag_dir_list_interval
|
[dag_processor]refresh_interval
|
[scheduler]max_threads
|
[dag_processor]parsing_processes
|
[scheduler]parsing_processes
|
[dag_processor]parsing_processes
|
[webserver]instance_name
|
[api]instance_name
|
[scheduler]scheduler_zombie_task_threshold
|
[scheduler]task_instance_heartbeat_timeout
|
[webserver]rbac
|
已淘汰 |
[api]auth_backend=airflow.api.auth.backend.deny_all
|
已淘汰 |
[api]auth_backends=airflow.api.auth.backend.deny_all
|
已淘汰 |
[api]composer_auth_user_registration_role
|
已淘汰 |
步驟 3:將 PyPI 套件安裝到 Airflow 3 環境
建立 Airflow 3 環境後,請安裝 PyPI 套件:
- 從 Airflow 2 環境複製 PyPI 套件需求條件。
- 啟動 PyPI 套件更新作業,並等待環境更新完成。
由於 Airflow 3 環境使用不同的預先安裝套件組合,更新作業期間可能會發生 PyPI 套件衝突。如要進一步瞭解如何排解 PyPI 套件衝突問題,請參閱「與預先安裝的 PyPI 套件發生衝突」。
步驟 4:從 Airflow 2 匯出變數、連線和集區
如果沒有變數或連線,請略過相關的匯出和匯入指令。
如果除了 default_pool 以外,您還有其他自訂集區,才需要轉移集區。否則請略過匯出及匯入集區的指令。
從 Airflow 2 環境匯出變數:
gcloud composer environments run AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ variables -- export /home/airflow/gcs/data/variables.json更改下列內容:
AIRFLOW_2_ENV:Airflow 2 環境的名稱。AIRFLOW_2_LOCATION:Airflow 2 環境所在的區域。
從 Airflow 2 環境匯出連線:
gcloud composer environments run AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ connections -- export /home/airflow/gcs/data/connections.json更改下列內容:
AIRFLOW_2_ENV:Airflow 2 環境的名稱。AIRFLOW_2_LOCATION:Airflow 2 環境所在的區域。
從 Airflow 2 環境匯出集區:
gcloud composer environments run AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ pools -- export /home/airflow/gcs/data/pools.json更改下列內容:
AIRFLOW_2_ENV:Airflow 2 環境的名稱。AIRFLOW_2_LOCATION:Airflow 2 環境所在的區域。
取得 Airflow 2 環境的 bucket 名稱:
gcloud composer environments describe AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ --format="value(storageConfig.bucket)"更改下列內容:
AIRFLOW_2_ENV:Airflow 2 環境的名稱。AIRFLOW_2_LOCATION:Airflow 2 環境所在的區域。
從 Airflow 2 環境值區的
/data目錄,將variables.json、connections.json和pools.json檔案下載到本機目錄:gcloud storage cp gs://AIRFLOW_2_BUCKET/data/variables.json ./variables.json gcloud storage cp gs://AIRFLOW_2_BUCKET/data/connections.json ./connections.json gcloud storage cp gs://AIRFLOW_2_BUCKET/data/pools.json ./pools.json更改下列內容:
AIRFLOW_2_BUCKET:Airflow 2 環境的 bucket 名稱,在上一個步驟中取得。
步驟 5:將變數、連線和集區匯入 Airflow 3
如果沒有變數或連線,請略過相關的匯出和匯入指令。
如果除了 default_pool 以外,您還有其他自訂集區,才需要轉移集區。否則請略過匯出及匯入集區的指令。
設定
airflowctl,以便為 Airflow 3 環境執行 Airflow CLI 指令。使用
airflowctl將變數、連線和集區匯入 Airflow 3 環境:airflowctl variables import ./variables.json airflowctl connections import ./connections.json airflowctl pools import ./pools.json確認變數、連線和集區已匯入 Airflow 3 環境:
airflowctl variables list airflowctl connections list airflowctl pools list清理 JSON 檔案:
gcloud storage rm gs://AIRFLOW_2_BUCKET/data/variables.json gcloud storage rm gs://AIRFLOW_2_BUCKET/data/connections.json gcloud storage rm gs://AIRFLOW_2_BUCKET/data/pools.json rm ./variables.json rm ./connections.json rm ./pools.json更改下列內容:
AIRFLOW_2_BUCKET:Airflow 2 環境的 bucket 名稱。
步驟 6:從 Airflow 2 環境的值區轉移其他資料
在這個步驟中,您要從 Airflow 2 環境的 bucket 轉移剩餘資料。
取得 Airflow 3 環境的 bucket 名稱:
gcloud composer environments describe AIRFLOW_3_ENV \ --location AIRFLOW_3_LOCATION \ --format="value(storageConfig.bucket)"更改下列內容:
AIRFLOW_3_ENV:Airflow 3 環境的名稱。AIRFLOW_3_LOCATION:Airflow 3 環境所在的區域。
將 Airflow 2 環境 bucket 中的外掛程式匯出至 Airflow 3 環境 bucket 的
/plugins目錄:gcloud composer environments storage plugins export \ --destination=AIRFLOW_3_BUCKET/plugins \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION更改下列內容:
AIRFLOW_3_BUCKET:Airflow 3 環境的 bucket 名稱,在上一個步驟中取得。AIRFLOW_2_ENV:Airflow 2 環境的名稱。AIRFLOW_2_LOCATION:Airflow 2 環境所在的區域。
確認已成功匯入
/plugins目錄:gcloud composer environments storage plugins list \ --environment=AIRFLOW_3_ENV \ --location=AIRFLOW_3_LOCATION更改下列內容:
AIRFLOW_3_ENV:Airflow 3 環境的名稱。AIRFLOW_3_LOCATION:Airflow 3 環境所在的區域。
將 Airflow 2 環境中的
/data目錄匯出至 Airflow 3 環境:gcloud composer environments storage data export \ --destination=AIRFLOW_3_BUCKET/data \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION更改下列內容:
AIRFLOW_3_BUCKET:Airflow 3 環境的 bucket 名稱,在上一個步驟中取得。AIRFLOW_2_ENV:Airflow 2 環境的名稱。AIRFLOW_2_LOCATION:Airflow 2 環境所在的區域。
確認
/data資料夾已成功匯入:gcloud composer environments storage data list \ --environment=AIRFLOW_3_ENV \ --location=AIRFLOW_3_LOCATION
步驟 7:轉移使用者和角色
airflowctl 尚未支援 users 和 roles 指令,因此無法遷移使用者和角色。
步驟 8:確認 DAG 已準備好在 Airflow 3 中執行
調整 Airflow DAG,使其與 Airflow 3 相容。
檢查直接存取 Airflow 資料庫的自訂撰寫工作:
在 Airflow 3 中,運算子無法使用資料庫工作階段直接存取 Airflow 中繼資料庫。如果您有自訂運算子,請檢查程式碼,確保沒有直接的資料庫存取呼叫。
您可以採用下列任一替代做法,在工作遷移時避免直接存取 Airflow 資料庫:
透過將 Airflow 資料庫內容匯出至 Cloud SQL 執行個體,存取 Airflow 資料庫。
使用 Airflow Python 用戶端。社群版 Airflow 提供的 Python 用戶端已為大多數資料表定義 API,例如
DagRuns、TaskInstances、Variables、Connections、XComs。apache-airflow-client套件已預先安裝在 Managed Airflow Airflow 3 建構版本中。從 DAG 執行
airflowctl到BashOperator。
如果查詢匯出的 Airflow 資料庫不適用於您的用途,且 Airflow Python 用戶端和
airflowctl都未提供所需功能,請考慮在 Airflow 社群版本中要求新的 API 端點或 Task SDK 功能。如果您有
KubernetesExecutor工作,請將queue="kubernetes"替換為executor="KubernetesExecutor",調整運算子定義。Airflow 3 中的
KubernetesExecutor工作範例:PythonOperator( task_id="airflow3_kubernetes_executor_task", dag=dag, python_callable=f, executor="KubernetesExecutor", )如果您在工作程式碼中使用
AIRFLOW__WEBSERVER__BASE_URL環境變數,請改用[api]base_urlAirflow 設定選項。在 Airflow 3 中取得這個值的範例如下:
from airflow.configuration import conf webserver_base_url = conf.get("api", "base_url")
步驟 9:將 DAG 轉移至 Airflow 3 環境
在環境之間轉移 DAG 時,可能會發生下列潛在問題:
如果兩個環境都啟用 DAG (未暫停),每個環境都會按照排程執行自己的 DAG 副本。這可能會導致相同資料和執行時間的 DAG 同時執行。
由於 DAG 趕上進度,Airflow 會排定額外的 DAG 執行作業,從 DAG 中指定的開始日期開始。這是因為新的 Airflow 執行個體不會將 Airflow 2 環境的 DAG 執行記錄納入考量。這可能會導致系統從指定開始日期起,排定大量 DAG 執行作業。
防止 DAG 同時執行
在 Airflow 3 環境中,覆寫 dags_are_paused_at_creation Airflow 設定選項。變更後,所有新的 DAG 預設都會暫停。
| 區段 | 鍵 | 值 |
|---|---|---|
core |
dags_are_paused_at_creation |
True |
避免 DAG 執行作業過多或過少
在轉移至 Airflow 3 環境的 DAG 中,指定新的靜態開始日期。
為避免邏輯日期出現間隙和重疊,第一次 DAG 執行必須在 Airflow 3 環境中,於下一個排程間隔發生時進行。如要這麼做,請將 DAG 中的新開始日期設為 Airflow 2 環境中上次執行的日期之前。
舉例來說,假設您的 DAG 每天會在 Airflow 2 環境中於 15:00、17:00 和 21:00 執行,而您打算在 15:15 轉移 DAG,且最後一次 DAG 執行時間是 15:00,那麼 Airflow 3 環境的開始日期可以是今天的 14:45。在 Airflow 3 環境中啟用 DAG 後,Airflow 會排定在 17:00 執行 DAG。
再舉一例,假設您的 DAG 每天會在 Airflow 2 環境中於 00:00 執行,最後一次執行 DAG 是在 2026 年 3 月 26 日 00:00,而您打算在 2026 年 3 月 26 日 13:00 轉移 DAG,則 Airflow 3 環境的開始日期可以是 2026 年 3 月 25 日 23:45。在 Airflow 3 環境中啟用 DAG 後,Airflow 會排定在 2026 年 3 月 27 日 00:00 執行 DAG。
將 DAG 逐一轉移至 Airflow 3 環境
請針對每個 DAG 按照下列程序進行轉移:
請確認 DAG 中的新開始日期已如上一節所述設定。
將更新後的 DAG 上傳至 Airflow 3 環境。由於設定覆寫,這個 DAG 在 Airflow 3 環境中已暫停,因此目前尚未排定任何 DAG 執行作業。
在 Airflow 網頁介面中,前往「DAGs」(DAG),檢查是否有回報的 DAG 語法錯誤。
在您打算轉移 DAG 時:
在 Airflow 2 環境中暫停 DAG。
在 Airflow 3 環境中取消暫停 DAG。
確認新的 DAG 執行作業是否已排定在正確的時間執行。
等待 Airflow 3 環境執行 DAG,並檢查執行是否成功。
視 DAG 執行作業是否成功而定:
如果 DAG 執行作業成功,即可繼續操作,並在 Airflow 3 環境中使用 DAG。最後,請考慮刪除 Airflow 2 版本的 DAG。
如果 DAG 執行作業失敗,請嘗試排解 DAG 問題,直到 DAG 在 Airflow 3 中成功執行為止。
如有需要,您隨時可以改回使用 Airflow 2 版本的 DAG:
在 Airflow 3 環境中暫停 DAG。
在 Airflow 3 環境中取消暫停 DAG。系統會排定新的 DAG 執行作業,日期和時間與失敗的 DAG 執行作業相同。
準備好繼續使用 Airflow 3 版本的 DAG 時,請調整開始日期,將新版 DAG 上傳至 Airflow 3 環境,然後重複上述程序。
步驟 10:監控 Airflow 3 環境
將所有 DAG 和設定轉移至 Airflow 3 環境後,請監控潛在問題、失敗的 DAG 執行作業,以及整體環境健康狀態。如果 Airflow 3 環境順利運作一段時間,即可移除 Airflow 2 環境。