Managed Airflow (第 3 代) | Managed Airflow (第 2 代) | Managed Airflow (舊版第 1 代)
本頁說明如何在 Managed Airflow 中啟用資料沿襲整合功能。
關於資料歷程整合功能
資料歷程是 Knowledge Catalog 的功能,可追蹤資料在系統中的移動情形,包括來源、傳遞目的地和採用的轉換作業。
Managed Airflow 會使用 apache-airflow-providers-openlineage
套件產生歷程事件,並傳送至 Data Lineage API。
這個套件已安裝在 Managed Airflow 環境中。如果您安裝其他版本的套件,支援的運算子清單可能會有所變更。建議您只在必要時這麼做,否則請保留預先安裝的套件版本。
資料沿襲適用於與支援資料沿襲的 Knowledge Catalog 區域相同的環境。
如果 Managed Service for Apache Airflow 環境已啟用資料歷程,Managed Service for Apache Airflow 會將使用任何支援運算子的 DAG 歷程資訊,回報給 Data Lineage API。如要為不受支援的運算子回報沿襲,也可以傳送自訂沿襲事件。
您可以透過下列方式存取沿襲資訊:
建立環境時,如果符合下列條件,系統會自動啟用資料沿襲整合功能:
專案已啟用 Data Lineage API。詳情請參閱 Knowledge Catalog 說明文件中的「啟用資料沿襲 API」。
Airflow 中未設定自訂 Lineage Backend。
您可以在建立環境時停用資料歷程整合功能。
Managed Service for Apache Airflow 的功能考量
在下列情況下,Managed Airflow 會發出 RPC 呼叫,以建立沿襲事件:
- Airflow 工作開始或完成時
- DAG 執行作業開始或結束時
如要進一步瞭解這些實體,請參閱 Knowledge Catalog 說明文件中的歷程資訊模型和 Lineage API 參考資料。
發出的歷程流量須遵守 Data Lineage API 的配額。Managed Airflow 會耗用寫入配額。
處理沿襲資料的相關費用適用於沿襲定價。 請參閱資料歷程考量事項。
Managed Service for Apache Airflow 的效能考量
Airflow 工作執行完畢後,系統會回報資料歷程。 平均而言,資料歷程報表約需 1 到 2 秒即可產生。
這不會影響工作本身的效能:如果系統未成功向 Lineage API 報告沿襲資訊,Airflow 工作不會失敗。主要運算子邏輯不會受到影響,但整個工作執行個體會執行較長的時間,以納入報表沿襲資料。
如果環境會回報資料沿襲,相關費用會稍微增加,因為回報資料沿襲需要額外時間。
法規遵循
資料沿襲功能對 VPC Service Controls 等功能提供不同支援層級。請參閱資料沿襲注意事項,確保支援等級符合環境需求。
事前準備
這項功能提供不同的法規遵循支援。請務必先查看Managed Service for Apache Airflow 專屬的功能考量,以及資料沿襲功能考量。
在 Airflow 2.2.5 以上版本中,Managed Airflow 2.1.2 以上版本支援資料歷程整合功能。
Composer Worker (
roles/composer.worker) 角色已包含資料歷程功能的所有必要 IAM 權限。這是環境服務帳戶的必要角色。如要進一步瞭解資料沿襲權限,請參閱 Knowledge Catalog 說明文件中的沿襲角色和權限。
確認是否支援電信業者
資料沿襲支援是由運算子所在供應商套件提供:
檢查運算子所在供應器套件的變更記錄,找出新增 OpenLineage 支援的項目。
舉例來說,BigQueryToBigQueryOperator 從
apache-airflow-providers-google11.0.0 版開始支援 OpenLineage。檢查環境使用的供應商套件版本。如要這麼做,請參閱環境中使用的 Managed Airflow 版本預先安裝的套件清單。您也可以在環境中安裝其他版本的套件。
此外,apache-airflow-providers-openlineage 說明文件中的「支援的類別」頁面列出了最新支援的運算子。
設定資料歷程整合功能
Managed Service for Apache Airflow 的資料沿襲整合功能是以環境為單位進行管理。也就是說,啟用這項功能需要兩個步驟:
- 在專案中啟用 Data Lineage API。
- 在特定 Managed Service for Apache Airflow 環境中啟用資料沿襲整合功能。
在 Managed Service for Apache Airflow 中啟用資料沿襲功能
控制台
前往 Google Cloud 控制台的「Environments」(環境) 頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
選取「環境設定」分頁標籤。
在「Knowledge Catalog Lineage integration」(Knowledge Catalog 歷程整合功能) 部分中,按一下「Edit」(編輯)。
在「Knowledge Catalog Lineage integration」(Knowledge Catalog 歷程整合功能) 面板中,選取「Enable integration with Knowledge Catalog Lineage」(啟用 Knowledge Catalog 歷程整合功能)。
按一下 [儲存]。
gcloud
使用 --enable-cloud-data-lineage-integration 引數。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
更改下列內容:
ENVIRONMENT_NAME:環境名稱。LOCATION:環境所在的區域。
範例:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
在 Managed Service for Apache Airflow 中停用資料沿襲
在 Managed Service for Apache Airflow 環境中停用歷程整合功能,不會停用 Data Lineage API。如要徹底停用專案的歷程報表功能,請一併停用 Data Lineage API。詳情請參閱停用服務的相關說明。
控制台
前往 Google Cloud 控制台的「Environments」(環境) 頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
選取「環境設定」分頁標籤。
在「Knowledge Catalog Lineage integration」(Knowledge Catalog 歷程整合功能) 部分中,按一下「Edit」(編輯)。
在「Knowledge Catalog 歷程整合」面板中,選取「停用 Knowledge Catalog 歷程整合功能」。
按一下 [儲存]。
gcloud
使用 --disable-cloud-data-lineage-integration 引數。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
更改下列內容:
ENVIRONMENT_NAME:環境名稱。LOCATION:環境所在的區域。
範例:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
在支援的運算子中傳送沿襲事件
如果啟用資料歷程,支援的運算子會自動傳送歷程事件。您不需要變更 DAG 程式碼。
舉例來說,執行下列工作:
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': 'example-project',
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
結果是在 Knowledge Catalog UI 中建立下列沿襲圖:
傳送自訂沿襲事件
如要為不支援自動沿襲報告的運算子回報沿襲,可以傳送自訂沿襲事件。
舉例來說,如要傳送自訂事件,並使用:
- BashOperator:修改工作定義中的
inlets或outlets參數。 - PythonOperator:修改工作定義中的
task.inlets或task.outlets參數。 - 您可以將
AUTO用於inlets參數。這會將值設為上游工作的outlets。
以下範例說明如何使用入口和出口:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
...
bash_task = BashOperator(
task_id="bash_task",
dag=dag,
bash_command="sleep 0",
inlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table1",
)
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table2",
)
],
)
def _python_task(task):
print("Python task")
python_task = PythonOperator(
task_id="python_task",
dag=dag,
python_callable=_python_task,
inlets=[
AUTO,
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table3",
),
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table4",
)
],
)
bash_task >> python_task
因此,系統會在 Knowledge Catalog UI 中建立下列沿襲圖:
在 Managed Service for Apache Airflow 中查看沿襲記錄
您可以使用「環境設定」頁面上的連結,檢查與資料歷程相關的記錄。這個連結位於「Knowledge Catalog 歷程整合」部分。
疑難排解
如果系統未將沿襲資料回報給 Lineage API,或您無法在 Knowledge Catalog 中查看沿襲資料,請嘗試下列疑難排解步驟:
- 確認已在 Managed Service for Apache Airflow 環境的專案中啟用 Data Lineage API。
- 檢查 Managed Service for Apache Airflow 環境是否已啟用資料沿襲整合功能。
- 確認您使用的運算子是否包含在自動沿襲報表支援中。詳情請參閱「支援的 Airflow 運算子」。
- 在 Managed Service for Apache Airflow 中檢查沿襲記錄,找出可能的問題。