Dataplex Universal Catalog を使用したデータリネージ

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

このページでは、Cloud Composer でデータ リネージ統合を有効にする方法について説明します。

データリネージ統合について

データリネージDataplex Universal Catalog の機能で、システム内でのデータの移動(データの送信元、データの送信先、データに適用される変換)を追跡します。

Cloud Composer は apache-airflow-providers-openlineage パッケージを使用して、Data Lineage API に送信されるリネージ イベントを生成します。

このパッケージは Cloud Composer 環境にすでにインストールされています。このパッケージの別のバージョンをインストールすると、サポートされる演算子のリストが変更される可能性があります。プリインストールされたパッケージのバージョンは、必要になった場合にのみ変更し、それ以外の場合はそのままにしておくことをおすすめします。

環境を作成するときに、次の条件が満たされると、データリネージ統合が自動的に有効になります。

  • プロジェクトで Data Lineage API が有効になっている。詳細については、Dataplex Universal Catalog のドキュメントの Data Lineage API の有効化をご覧ください。

  • カスタム リネージ バックエンドが Airflow で構成されていません。

データリネージ統合は、環境を作成する際に無効にできます。

既存の環境では、データリネージ統合をいつでも有効または無効にできます。

Cloud Composer の機能に関する考慮事項

Cloud Composer は、次の場合に RPC 呼び出しを行ってリネージ イベントを作成します。

  • Airflow タスクが開始または終了したとき
  • DAG の実行が開始または終了したとき

これらのエンティティの詳細については、Dataplex Universal Catalog のドキュメントのリネージ情報モデルリネージ API リファレンスをご覧ください。

出力されたリネージ トラフィックは、Data Lineage API の割り当ての対象になります。Cloud Composer は書き込み割り当てを消費します。

リネージデータの処理に関連する料金は、リネージの料金の対象となります。データリネージに関する考慮事項をご覧ください。

Cloud Composer のパフォーマンスに関する考慮事項

データリネージは、Airflow タスクの実行の終了時に報告されます。データリネージ レポートの平均的な所要時間は、約 1~2 秒です。

これは、タスク自体のパフォーマンスには影響しません。リネージが Lineage API に正常に報告されない場合でも、Airflow タスクは失敗しません。メインのオペレータ ロジックへの影響はありませんが、リネージデータの報告を考慮して、タスク インスタンス全体の実行時間が少し長くなります。

データリネージを報告する環境では、データリネージを報告するのに余分な時間が必要になるため、関連する費用が若干増加します。

コンプライアンス

データリネージは、VPC Service Controls などの機能向けにさまざまなサポートレベルを提供します。データリネージに関する考慮事項を参照して、サポートレベルが環境要件を満たしていることを確認してください。

始める前に

  • この機能は、さまざまなコンプライアンス サポートを提供します。まず、Cloud Composer 固有の機能に関する考慮事項データリネージ機能に関する考慮事項の両方を確認してください。

  • データリネージ統合は、Cloud Composer バージョン 2.1.2 以降と Airflow バージョン 2.2.5 以降でサポートされています。

  • データ リネージに必要な IAM 権限はすべて、Composer ワークフロー(roles/composer.worker)ロールに含まれています。このロールは、環境のサービス アカウントに必要なロールです。

    データリネージの権限の詳細については、Dataplex Universal Catalog ドキュメントのリネージのロールと権限をご覧ください。

演算子がサポートされているかどうかを確認する

データリネージのサポートは、オペレーターが配置されているプロバイダ パッケージによって提供されます。

  1. 演算子が存在するプロバイダ パッケージの変更ログで、OpenLineage のサポートを追加するエントリを確認します。

    たとえば、BigQueryToBigQueryOperator は apache-airflow-providers-google バージョン 11.0.0 以降で OpenLineage をサポートしています。

  2. 環境で使用されているプロバイダ パッケージのバージョンを確認します。これを行うには、環境で使用されている Cloud Composer のバージョンのプリインストールされたパッケージのリストをご覧ください。環境にパッケージの別のバージョンをインストールすることもできます。

また、apache-airflow-providers-openlineage ドキュメントのサポートされているクラスのページには、サポートされている最新の演算子が記載されています。

データリネージ統合を構成する

Cloud Composer のデータリネージ統合は、環境ごとに管理されます。つまり、この機能を有効にするには、次の 2 つのステップを行う必要があります。

  1. プロジェクトで Data Lineage API を有効にする。
  2. 特定の Cloud Composer 環境でデータリネージ統合を有効にする。

Cloud Composer でデータリネージを有効にする

コンソール

  1. Google Cloud コンソールで、[環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

  3. [環境の設定] タブを選択します。

  4. [Dataplex データリネージ統合] セクションで、[編集] をクリックします。

  5. [Dataplex データリネージ統合] パネルで、[Dataplex データリネージとの統合を有効にする] を選択して [保存] をクリックします。

gcloud

--enable-cloud-data-lineage-integration 引数を使用します。

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --enable-cloud-data-lineage-integration

次のように置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。

例:

gcloud composer environments update example-environment \
    --location us-central1 \
    --enable-cloud-data-lineage-integration

Cloud Composer でデータリネージを無効にする

Cloud Composer 環境でリネージ統合を無効にしても、データリネージ API は無効になりません。プロジェクトのリネージ レポートを完全に無効にする必要がある場合は、Data Lineage API も無効にします。サービスの無効化をご覧ください。

コンソール

  1. Google Cloud コンソールで、[環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

  3. [環境の設定] タブを選択します。

  4. [Dataplex データリネージ統合] セクションで、[編集] をクリックします。

  5. [Dataplex データリネージ統合] パネルで、[Dataplex データリネージとの統合を無効にする] を選択して [保存] をクリックします。

gcloud

--disable-cloud-data-lineage-integration 引数を使用します。

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --disable-cloud-data-lineage-integration

次のように置き換えます。

  • ENVIRONMENT_NAME: 環境の名前。
  • LOCATION: 環境が配置されているリージョン。

例:

gcloud composer environments update example-environment \
    --location us-central1 \
    --disable-cloud-data-lineage-integration

サポートされているオペレーターでリネージ イベントを送信する

データリネージが有効になっている場合、サポートされているオペレータはリネージ イベントを自動的に送信します。DAG コードを変更する必要はありません。

たとえば、次のタスクを実行します。

task = BigQueryInsertJobOperator(
    task_id='snapshot_task',
    dag=dag,
    location='<dataset-location>',
    configuration={
        'query': {
            'query': 'SELECT * FROM dataset.tableA',
            'useLegacySql': False,
            'destinationTable': {
                'project_id': 'example-project',
                'dataset_id': 'dataset',
                'table_id': 'tableB',
            },
        }
    },
)

その結果、Dataplex Universal Catalog UI で次のリネージグラフが作成されます。

Dataplex UI のリネージグラフの例。
図 1. Dataplex Universal Catalog UI での BigQuery テーブルのリネージグラフの例。

カスタム リネージ イベントを送信する

自動リネージ レポートでサポートされていないオペレータのリネージを報告する場合は、カスタム リネージ イベントを送信できます。

たとえば、カスタム イベントの送信に使用するオペレータと、その際の手順は、次のとおりです。

  • BashOperator: タスク定義の inlets パラメータまたは outlets パラメータを変更します。
  • PythonOperator: タスク定義の task.inlets パラメータまたは task.outlets パラメータを変更します。
  • inlets パラメータには AUTO を使用できます。これにより、値はアップストリーム タスクの outlets と同じ値に設定されます。

次の例は、インレットとアウトレットの使用方法を示しています。

from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO

...

bash_task = BashOperator(
    task_id="bash_task",
    dag=dag,
    bash_command="sleep 0",
    inlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table1",
        )
    ],
    outlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table2",
        )
    ],
)


def _python_task(task):
    print("Python task")


python_task = PythonOperator(
    task_id="python_task",
    dag=dag,
    python_callable=_python_task,
    inlets=[
        AUTO,
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table3",
        ),
    ],
    outlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table4",
        )
    ],
)

bash_task >> python_task

その結果、Dataplex Universal Catalog UI に次のリネージグラフが作成されます。

Dataplex UI でのカスタム イベントのリネージグラフの例。
図 2. Dataplex Universal Catalog UI で複数の BigQuery テーブルのリネージグラフを表示した例。

Cloud Composer でリネージログを表示する

[Dataplex Universal Catalog データリネージ統合] セクションの [環境の構成] ページのリンクを使用して、データリネージに関連するログを調べることができます。

トラブルシューティング

リネージデータが Lineage API に報告されない場合や、Dataplex Universal Catalog で表示されない場合は、次のトラブルシューティング手順を試してください。

  • Cloud Composer 環境のプロジェクトでデータリネージ API が有効になっていることを確認します。
  • Cloud Composer 環境でデータリネージ統合が有効になっているかどうかを確認します。
  • 使用する演算子が、自動リネージ レポートのサポートに含まれているかどうかを確認します。サポートされる Airflow オペレーターをご覧ください。
  • Cloud Composer のリネージログで、発生する可能性のある問題を確認します。

次のステップ