Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
このページでは、Cloud Composer でデータリネージ統合を有効にする方法について説明します。
データリネージ統合について
データリネージは Knowledge Catalogの機能で、システム内でのデータの移動(データの送信元、データの送信先、データに適用された変換)を追跡できます。
Cloud Composer は、apache-airflow-providers-openlineage パッケージを使用して、Data Lineage API に送信されるリネージイベントを生成します。
このパッケージは、Cloud Composer 環境にすでにインストールされています。このパッケージの別のバージョンをインストールすると、サポートされているオペレーターのリストが変更される可能性があります。必要な場合にのみインストールし、それ以外の場合はプリインストールされているバージョンのパッケージを保持することをおすすめします。
データリネージは、データリネージをサポートするKnowledge Catalog リージョンと同じリージョン内の環境で使用できます。
Cloud Composer 環境でデータリネージが有効になっている場合、 Cloud Composer は、 サポートされているオペレーターを使用する DAG のリネージ情報を Data Lineage API に報告します。サポートされていないオペレーターのリネージを報告する場合は、カスタム リネージ イベントを送信することもできます。
リネージ情報には、次の方法でアクセスできます。
- データリネージ API
- Knowledge Catalog でサポートされているエントリのリネージグラフ。 詳細については、Knowledge Catalog ドキュメントの リネージグラフ をご覧ください。
環境を作成するときに、次の条件が満たされると、データリネージ統合が自動的に有効 になります。
プロジェクトで Data Lineage API が有効になっている。詳細については、 Knowledge Catalog ドキュメントの Data Lineage API の有効化をご覧ください。
カスタム リネージ バックエンドが Airflow で構成されていません。
データリネージ統合は、環境を作成する際に無効にできます。
既存の環境では、データリネージ統合をいつでも有効または 無効にできます。
Cloud Composer の機能に関する考慮事項
Cloud Composer は、次の場合に RPC 呼び出しを行ってリネージイベントを作成します。
- Airflow タスクが開始または終了したとき
- DAG の実行が開始または終了したとき
これらのエンティティの詳細については、Knowledge Catalog ドキュメントの リネージ情報モデル とリネージ API リファレンスをご覧ください。
出力されたリネージ トラフィックは、 Data Lineage API の割り当ての対象になります。 Cloud Composer は書き込み割り当てを消費します。
リネージデータの処理に関連する料金は、リネージの料金の対象となります。データリネージに関する考慮事項をご覧ください。
Cloud Composer のパフォーマンスに関する考慮事項
データリネージは、Airflow タスクの実行の終了時に報告されます。 データリネージ レポートの平均的な所要時間は、約 1~2 秒です。
これは、タスク自体のパフォーマンスには影響しません。リネージが Lineage API に正常に報告されない場合でも、Airflow タスクは失敗しません。メインのオペレータ ロジックへの影響はありませんが、リネージデータの報告を考慮して、タスク インスタンス全体の実行時間が少し長くなります。
データリネージを報告する環境では、データリネージを報告するのに余分な時間が必要になるため、関連する費用が若干増加します。
コンプライアンス
データリネージは、VPC Service Controls などの機能向けにさまざまなサポートレベルを提供します。データリネージに関する考慮事項 を参照して、サポートレベルが環境要件を満たしていることを確認してください。
始める前に
この機能はさまざまなコンプライアンス サポート を提供します。まず、 Cloud Composer 固有の 機能に関する考慮事項と データリネージ機能に関する考慮事項をご覧ください。
データリネージに必要な IAM 権限はすべて、Composer Worker(
roles/composer.worker)ロールに含まれています。このロール は、環境のサービス アカウントに必要な ロールです。データリネージの権限の詳細については、 Knowledge Catalog ドキュメントのリネージのロールと権限をご覧ください。
オペレーターがサポートされているかどうかを確認する
データリネージのサポートは、オペレーターが配置されているプロバイダ パッケージによって提供されます。
オペレーターが配置されているプロバイダ パッケージの変更ログで、OpenLineage のサポートを追加するエントリを確認します。
たとえば、BigQueryToBigQueryOperator は、OpenLineage を
apache-airflow-providers-googleバージョン 11.0.0 以降でサポートしています。環境で使用されているプロバイダ パッケージのバージョンを確認します。これを行うには、環境で使用されている Airflow ビルドのプリインストール パッケージのリストをご覧ください。環境に別のバージョンのパッケージをインストールすることもできます。
また、サポートされているクラス
apache-airflow-providers-openlineage ドキュメントのページには、サポートされている最新のオペレーターが一覧表示されています。
データリネージ統合を構成する
Cloud Composer のデータリネージ統合は、環境ごとに管理されます。つまり、この機能を有効にするには、次の 2 つのステップを行う必要があります。
- プロジェクトで Data Lineage API を有効にする。
- 特定の Cloud Composer 環境でデータリネージ統合を有効にする。
Cloud Composer でデータリネージを有効にする
コンソール
コンソールで、[**環境**] ページに移動します。 Google Cloud
環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。
[環境の設定] タブを選択します。
[Dataplex データリネージ統合] セクションで、[編集] をクリックします。
[Dataplex データリネージ統合] パネルで、[Dataplex データリネージとの統合を有効にする] をオンにして、[保存] をクリックします。
gcloud
--enable-cloud-data-lineage-integration 引数を使用します。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
次のように置き換えます。
ENVIRONMENT_NAME: 環境の名前。LOCATION: 環境が配置されているリージョン。
例:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
Cloud Composer でデータリネージを無効にする
Cloud Composer 環境でリネージ統合を無効にしても、データリネージ API は無効になりません。プロジェクトのリネージ レポートを完全に無効にする必要がある場合は、Data Lineage API も無効にします。サービスの無効化をご覧ください。
コンソール
コンソールで、[**環境**] ページに移動します。 Google Cloud
環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。
[環境の設定] タブを選択します。
[Dataplex データリネージ統合] セクションで、[編集] をクリックします。
[Dataplex データリネージ統合] パネルで、[Dataplex データリネージとの統合を無効にする] を選択して [保存] をクリックします。
gcloud
--disable-cloud-data-lineage-integration 引数を使用します。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
次のように置き換えます。
ENVIRONMENT_NAME: 環境の名前。LOCATION: 環境が配置されているリージョン。
例:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
サポートされているオペレーターでリネージイベントを送信する
データリネージが有効になっている場合、サポートされているオペレーターはリネージイベントを自動的に送信します。DAG コードを変更する必要はありません。
たとえば、次のタスクを実行します。
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': 'example-project',
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
その結果、Knowledge Catalog UI で次のリネージグラフが作成されます。
カスタム リネージ イベントを送信する
自動リネージ レポートでサポートされていないオペレータのリネージを報告する場合は、カスタム リネージ イベントを送信できます。
たとえば、カスタム イベントの送信に使用するオペレータと、その際の手順は、次のとおりです。
- BashOperator: タスク定義の
inletsパラメータまたはoutletsパラメータを変更します。 - PythonOperator: タスク定義の
task.inletsパラメータまたはtask.outletsパラメータを変更します。 inletsパラメータにAUTOを使用できます。これにより、値はアップストリーム タスクのoutletsと同じ値に設定されます。
次の例は、インレットとアウトレットの使用方法を示しています。
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
...
bash_task = BashOperator(
task_id="bash_task",
dag=dag,
bash_command="sleep 0",
inlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table1",
)
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table2",
)
],
)
def _python_task(task):
print("Python task")
python_task = PythonOperator(
task_id="python_task",
dag=dag,
python_callable=_python_task,
inlets=[
AUTO,
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table3",
),
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table4",
)
],
)
bash_task >> python_task
その結果、Knowledge Catalog UI に次のリネージグラフが作成されます。
Cloud Composer でリネージログを表示する
[Knowledge Catalog データリネージ統合] セクションの [環境の構成] ページのリンクを使用して、データリネージに関連するログを調べることができます。
トラブルシューティング
リネージデータが Lineage API に報告されない場合や、Knowledge Catalog で表示されない場合は、次のトラブルシューティング手順を試してください。
- Cloud Composer 環境のプロジェクトでデータリネージ API が有効になっていることを確認します。
- Cloud Composer 環境でデータリネージ統合が有効になっているかどうかを確認します。
- 使用する演算子が、自動リネージ レポートのサポートに含まれているかどうかを確認します。サポートされる Airflow オペレーターをご覧ください。
- Cloud Composer のリネージログで、発生する可能性のある問題を確認します。