Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
このページでは、Cloud Composer でデータ リネージ統合を有効にする方法について説明します。
データリネージ統合について
データリネージは Dataplex Universal Catalog の機能で、システム内でのデータの移動(データの送信元、データの送信先、データに適用される変換)を追跡します。
Cloud Composer は apache-airflow-providers-openlineage
パッケージを使用して、Data Lineage API に送信されるリネージ イベントを生成します。
このパッケージは Cloud Composer 環境にすでにインストールされています。このパッケージの別のバージョンをインストールすると、サポートされる演算子のリストが変更される可能性があります。プリインストールされたパッケージのバージョンは、必要になった場合にのみ変更し、それ以外の場合はそのままにしておくことをおすすめします。
データリネージは、データリネージをサポートする Dataplex Universal Catalog リージョンと同じリージョン内の環境で使用できます。
Cloud Composer 環境でデータリネージが有効になっている場合、Cloud Composer は、サポートされているオペレーターを利用する DAG のリネージ情報を Data Lineage API に報告します。サポートされていないオペレータのリネージを報告する場合は、カスタム リネージ イベントを送信することもできます。
リネージ情報には、次の方法でアクセスできます。
- データリネージ API
- Dataplex Universal Catalog でサポートされているエントリのリネージグラフ。詳細については、Dataplex Universal Catalog のドキュメントのリネージグラフをご覧ください。
環境を作成するときに、次の条件が満たされると、データリネージ統合が自動的に有効になります。
プロジェクトで Data Lineage API が有効になっている。詳細については、Dataplex Universal Catalog のドキュメントの Data Lineage API の有効化をご覧ください。
カスタム リネージ バックエンドが Airflow で構成されていません。
データリネージ統合は、環境を作成する際に無効にできます。
既存の環境では、データリネージ統合をいつでも有効または無効にできます。
Cloud Composer の機能に関する考慮事項
Cloud Composer は、次の場合に RPC 呼び出しを行ってリネージ イベントを作成します。
- Airflow タスクが開始または終了したとき
- DAG の実行が開始または終了したとき
これらのエンティティの詳細については、Dataplex Universal Catalog のドキュメントのリネージ情報モデルとリネージ API リファレンスをご覧ください。
出力されたリネージ トラフィックは、Data Lineage API の割り当ての対象になります。Cloud Composer は書き込み割り当てを消費します。
リネージデータの処理に関連する料金は、リネージの料金の対象となります。データリネージに関する考慮事項をご覧ください。
Cloud Composer のパフォーマンスに関する考慮事項
データリネージは、Airflow タスクの実行の終了時に報告されます。データリネージ レポートの平均的な所要時間は、約 1~2 秒です。
これは、タスク自体のパフォーマンスには影響しません。リネージが Lineage API に正常に報告されない場合でも、Airflow タスクは失敗しません。メインのオペレータ ロジックへの影響はありませんが、リネージデータの報告を考慮して、タスク インスタンス全体の実行時間が少し長くなります。
データリネージを報告する環境では、データリネージを報告するのに余分な時間が必要になるため、関連する費用が若干増加します。
コンプライアンス
データリネージは、VPC Service Controls などの機能向けにさまざまなサポートレベルを提供します。データリネージに関する考慮事項を参照して、サポートレベルが環境要件を満たしていることを確認してください。
始める前に
この機能は、さまざまなコンプライアンス サポートを提供します。まず、Cloud Composer 固有の機能に関する考慮事項とデータリネージ機能に関する考慮事項の両方を確認してください。
データ リネージに必要な IAM 権限はすべて、Composer ワークフロー(
roles/composer.worker
)ロールに含まれています。このロールは、環境のサービス アカウントに必要なロールです。データリネージの権限の詳細については、Dataplex Universal Catalog ドキュメントのリネージのロールと権限をご覧ください。
演算子がサポートされているかどうかを確認する
データリネージのサポートは、オペレーターが配置されているプロバイダ パッケージによって提供されます。
演算子が存在するプロバイダ パッケージの変更ログで、OpenLineage のサポートを追加するエントリを確認します。
たとえば、BigQueryToBigQueryOperator は
apache-airflow-providers-google
バージョン 11.0.0 以降で OpenLineage をサポートしています。環境で使用されているプロバイダ パッケージのバージョンを確認します。これを行うには、環境で使用されている Airflow ビルドのプリインストールされているパッケージのリストをご覧ください。環境にパッケージの別のバージョンをインストールすることもできます。
また、apache-airflow-providers-openlineage
ドキュメントのサポートされているクラスのページには、サポートされている最新の演算子が記載されています。
データリネージ統合を構成する
Cloud Composer のデータリネージ統合は、環境ごとに管理されます。つまり、この機能を有効にするには、次の 2 つのステップを行う必要があります。
- プロジェクトで Data Lineage API を有効にする。
- 特定の Cloud Composer 環境でデータリネージ統合を有効にする。
Cloud Composer でデータリネージを有効にする
コンソール
Google Cloud コンソールで、[環境] ページに移動します。
環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。
[環境の設定] タブを選択します。
[Dataplex データリネージ統合] セクションで、[編集] をクリックします。
[Dataplex データリネージ統合] パネルで、[Dataplex データリネージとの統合を有効にする] を選択して [保存] をクリックします。
gcloud
--enable-cloud-data-lineage-integration
引数を使用します。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
次のように置き換えます。
ENVIRONMENT_NAME
: 環境の名前。LOCATION
: 環境が配置されているリージョン。
例:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
Cloud Composer でデータリネージを無効にする
Cloud Composer 環境でリネージ統合を無効にしても、データリネージ API は無効になりません。プロジェクトのリネージ レポートを完全に無効にする必要がある場合は、Data Lineage API も無効にします。サービスの無効化をご覧ください。
コンソール
Google Cloud コンソールで、[環境] ページに移動します。
環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。
[環境の設定] タブを選択します。
[Dataplex データリネージ統合] セクションで、[編集] をクリックします。
[Dataplex データリネージ統合] パネルで、[Dataplex データリネージとの統合を無効にする] を選択して [保存] をクリックします。
gcloud
--disable-cloud-data-lineage-integration
引数を使用します。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
次のように置き換えます。
ENVIRONMENT_NAME
: 環境の名前。LOCATION
: 環境が配置されているリージョン。
例:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
サポートされているオペレーターでリネージ イベントを送信する
データリネージが有効になっている場合、サポートされているオペレータはリネージ イベントを自動的に送信します。DAG コードを変更する必要はありません。
たとえば、次のタスクを実行します。
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': 'example-project',
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
その結果、Dataplex Universal Catalog UI で次のリネージグラフが作成されます。

カスタム リネージ イベントを送信する
自動リネージ レポートでサポートされていないオペレータのリネージを報告する場合は、カスタム リネージ イベントを送信できます。
たとえば、カスタム イベントの送信に使用するオペレータと、その際の手順は、次のとおりです。
- BashOperator: タスク定義の
inlets
パラメータまたはoutlets
パラメータを変更します。 - PythonOperator: タスク定義の
task.inlets
パラメータまたはtask.outlets
パラメータを変更します。 inlets
パラメータにはAUTO
を使用できます。これにより、値はアップストリーム タスクのoutlets
と同じ値に設定されます。
次の例は、インレットとアウトレットの使用方法を示しています。
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
...
bash_task = BashOperator(
task_id="bash_task",
dag=dag,
bash_command="sleep 0",
inlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table1",
)
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table2",
)
],
)
def _python_task(task):
print("Python task")
python_task = PythonOperator(
task_id="python_task",
dag=dag,
python_callable=_python_task,
inlets=[
AUTO,
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table3",
),
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table4",
)
],
)
bash_task >> python_task
その結果、Dataplex Universal Catalog UI に次のリネージグラフが作成されます。

Cloud Composer でリネージログを表示する
[Dataplex Universal Catalog データリネージ統合] セクションの [環境の構成] ページのリンクを使用して、データリネージに関連するログを調べることができます。
トラブルシューティング
リネージデータが Lineage API に報告されない場合や、Dataplex Universal Catalog で表示されない場合は、次のトラブルシューティング手順を試してください。
- Cloud Composer 環境のプロジェクトでデータリネージ API が有効になっていることを確認します。
- Cloud Composer 環境でデータリネージ統合が有効になっているかどうかを確認します。
- 使用する演算子が、自動リネージ レポートのサポートに含まれているかどうかを確認します。サポートされる Airflow オペレーターをご覧ください。
- Cloud Composer のリネージログで、発生する可能性のある問題を確認します。