Managed Airflow(第 3 世代) | Managed Airflow(第 2 世代) | Managed Airflow(レガシー第 1 世代)
このページでは、Managed Airflow でデータリネージ統合を有効にする方法について説明します。
データリネージ統合について
データリネージは Knowledge Catalogの機能で、システム内でのデータの移動(データの送信元、データの送信先、データに適用された変換)を追跡できます。
Managed Airflow は、apache-airflow-providers-openlineage パッケージを使用して、Data Lineage API に送信されるリネージイベントを生成します。
このパッケージは、Managed Airflow 環境にすでにインストールされています。このパッケージの別のバージョンをインストールすると、サポートされているオペレーターのリストが変更される可能性があります。必要な場合にのみインストールし、それ以外の場合はプリインストールされているバージョンのパッケージを保持することをおすすめします。
データリネージは、データリネージをサポートするKnowledge Catalog リージョンと同じリージョン内の環境で使用できます。
Managed Service for Apache Airflow 環境でデータリネージが有効になっている場合、 Managed Service for Apache Airflow は、 サポートされているオペレーターのいずれかを使用する DAG のリネージ情報を Data Lineage API に報告します。サポートされていないオペレーターのリネージを報告する場合は、カスタム リネージ イベントを送信することもできます。
リネージ情報には、次の方法でアクセスできます。
- データリネージ API
- Knowledge Catalog でサポートされているエントリのリネージグラフ。 詳細については、Knowledge Catalog ドキュメントの リネージグラフ をご覧ください。
環境を作成するときに、次の条件が満たされると、データリネージ統合が自動的に有効 になります。
プロジェクトで Data Lineage API が有効になっている。詳細については、 Knowledge Catalog ドキュメントの Data Lineage API の有効化をご覧ください。
カスタム リネージ バックエンドが Airflow で構成されていません。
データリネージ統合は、環境を作成する際に無効にできます。
既存の環境では、データリネージ統合をいつでも有効または 無効にできます。
Managed Service for Apache Airflow の機能に関する考慮事項
Managed Airflow は、次の場合に RPC 呼び出しを行ってリネージイベントを作成します。
- Airflow タスクが開始または終了したとき
- DAG の実行が開始または終了したとき
これらのエンティティの詳細については、Knowledge Catalog ドキュメントの リネージ情報モデル とリネージ API リファレンスをご覧ください。
出力されたリネージ トラフィックは、 Data Lineage API の割り当ての対象になります。 Managed Airflow は書き込み割り当てを消費します。
リネージデータの処理に関連する料金は、リネージの料金の対象となります。 データリネージに関する考慮事項をご覧ください。
Managed Service for Apache Airflow のパフォーマンスに関する考慮事項
データリネージは、Airflow タスクの実行の終了時に報告されます。 データリネージ レポートの平均的な所要時間は、約 1~2 秒です。
これは、タスク自体のパフォーマンスには影響しません。リネージが Lineage API に正常に報告されない場合でも、Airflow タスクは失敗しません。メインのオペレータ ロジックへの影響はありませんが、リネージデータの報告を考慮して、タスク インスタンス全体の実行時間が少し長くなります。
データリネージを報告する環境では、データリネージを報告するのに余分な時間が必要になるため、関連する費用が若干増加します。
コンプライアンス
データリネージは、VPC Service Controls などの機能向けにさまざまなサポートレベルを提供します。データリネージに関する考慮事項 を参照して、サポートレベルが環境要件を満たしていることを確認してください。
始める前に
この機能はコンプライアンスのサポートが異なります 。まず、 Managed Service for Apache Airflow に固有の機能に関する考慮事項と、 データリネージ機能に関する考慮事項の両方を確認してください。
データリネージに必要な IAM 権限はすべて、Composer ワークロード(
roles/composer.worker)ロールに含まれています。このロール は、環境のサービス アカウントに必要な ロールです。データリネージの権限の詳細については、 Knowledge Catalog ドキュメントのリネージのロールと権限をご覧ください。
オペレーターがサポートされているかどうかを確認する
データリネージのサポートは、オペレーターが配置されているプロバイダ パッケージによって提供されます。
オペレーターが配置されているプロバイダ パッケージの変更ログで、OpenLineage のサポートを追加するエントリを確認します。
たとえば、BigQueryToBigQueryOperator は、OpenLineage を
apache-airflow-providers-googleバージョン 11.0.0 以降でサポートしています。環境で使用されているプロバイダ パッケージのバージョンを確認します。これを行うには、環境で使用されている Airflow ビルドのプリインストール パッケージのリストをご覧ください。環境に別のバージョンのパッケージをインストールすることもできます。
また、サポートされているクラス
apache-airflow-providers-openlineage ドキュメントのページには、サポートされている最新のオペレーターが一覧表示されています。
データリネージ統合を構成する
Managed Service for Apache Airflow のデータリネージ統合は、環境ごとに管理されます。つまり、この機能を有効にするには、次の 2 つのステップを行う必要があります。
- プロジェクトで Data Lineage API を有効にする。
- 特定の Managed Service for Apache Airflow 環境でデータリネージ統合を有効にする。
Managed Service for Apache Airflow でデータリネージを有効にする
コンソール
コンソールで、[**環境**] ページに移動します。 Google Cloud
環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。
[環境の設定] タブを選択します。
[Knowledge Catalog リネージ統合] セクションで、[編集] をクリックします。
[Knowledge Catalog リネージ統合] パネルで、[Knowledge Catalog のリネージとの統合を有効にする] を選択します。
[保存] をクリックします。
gcloud
--enable-cloud-data-lineage-integration 引数を使用します。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
次のように置き換えます。
ENVIRONMENT_NAME: 環境の名前。LOCATION: 環境が配置されているリージョン。
例:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
Managed Service for Apache Airflow でデータリネージを無効にする
Managed Service for Apache Airflow 環境でリネージ統合を無効にしても、データリネージ API は無効になりません。プロジェクトのリネージ レポートを完全に無効にする必要がある場合は、Data Lineage API も無効にします。サービスの無効化をご覧ください。
コンソール
コンソールで、[**環境**] ページに移動します。 Google Cloud
環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。
[環境の設定] タブを選択します。
[Knowledge Catalog リネージ統合] セクションで、[編集] をクリックします。
[Knowledge Catalog リネージ統合] パネルで、[Knowledge Catalog のリネージとの統合を無効にする] を選択します。
[保存] をクリックします。
gcloud
--disable-cloud-data-lineage-integration 引数を使用します。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
次のように置き換えます。
ENVIRONMENT_NAME: 環境の名前。LOCATION: 環境が配置されているリージョン。
例:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
サポートされているオペレーターでリネージイベントを送信する
データリネージが有効になっている場合、サポートされているオペレーターはリネージイベントを自動的に送信します。DAG コードを変更する必要はありません。
たとえば、次のタスクを実行します。
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': 'example-project',
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
その結果、Knowledge Catalog UI で次のリネージグラフが作成されます。
カスタム リネージ イベントを送信する
自動リネージ レポートでサポートされていないオペレータのリネージを報告する場合は、カスタム リネージ イベントを送信できます。
たとえば、カスタム イベントの送信に使用するオペレータと、その際の手順は、次のとおりです。
- BashOperator: タスク定義の
inletsパラメータまたはoutletsパラメータを変更します。 - PythonOperator: タスク定義の
task.inletsパラメータまたはtask.outletsパラメータを変更します。 inletsパラメータにAUTOを使用できます。これにより、値はアップストリーム タスクのoutletsと同じ値に設定されます。
次の例は、インレットとアウトレットの使用方法を示しています。
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
...
bash_task = BashOperator(
task_id="bash_task",
dag=dag,
bash_command="sleep 0",
inlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table1",
)
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table2",
)
],
)
def _python_task(task):
print("Python task")
python_task = PythonOperator(
task_id="python_task",
dag=dag,
python_callable=_python_task,
inlets=[
AUTO,
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table3",
),
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table4",
)
],
)
bash_task >> python_task
その結果、Knowledge Catalog UI に次のリネージグラフが作成されます。
Managed Service for Apache Airflow でリネージログを表示する
[Knowledge Catalog リネージ統合] セクションの [環境の構成] ページのリンクを使用して、データリネージに関連するログを調べることができます。
トラブルシューティング
リネージデータが Lineage API に報告されない場合や、Knowledge Catalog で表示されない場合は、次のトラブルシューティング手順を試してください。
- Managed Service for Apache Airflow 環境のプロジェクトでデータリネージ API が有効になっていることを確認します。
- Managed Service for Apache Airflow 環境でデータリネージ統合が有効になっているかどうかを確認します。
- 使用する演算子が、自動リネージ レポートのサポートに含まれているかどうかを確認します。詳細については、サポートされる Airflow オペレーターをご覧ください。
- Managed Service for Apache Airflow のリネージログで、発生する可能性のある問題を確認します。