Urutan data dengan Katalog Universal Dataplex

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Halaman ini menjelaskan cara mengaktifkan integrasi silsilah data di Cloud Composer.

Tentang integrasi silsilah data

Silsilah data adalah fitur Dataplex Universal Catalog yang melacak cara data berpindah melalui sistem Anda: dari mana data berasal, ke mana data diteruskan, dan transformasi apa yang diterapkan padanya.

Cloud Composer menggunakan paket apache-airflow-providers-openlineage untuk membuat peristiwa silsilah yang dikirim ke Data Lineage API.

Paket ini sudah diinstal di lingkungan Cloud Composer. Jika Anda menginstal versi lain paket ini, daftar operator yang didukung dapat berubah. Sebaiknya lakukan hal ini hanya jika diperlukan. Jika tidak, pertahankan versi paket yang sudah diinstal sebelumnya.

  • Silsilah data tersedia untuk lingkungan di region yang sama dengan region Dataplex Universal Catalog yang mendukung silsilah data.

  • Jika silsilah data diaktifkan di lingkungan Cloud Composer Anda, Cloud Composer akan melaporkan informasi silsilah ke Data Lineage API untuk DAG yang menggunakan salah satu operator yang didukung. Anda juga dapat mengirim peristiwa silsilah kustom jika ingin melaporkan silsilah untuk operator yang tidak didukung.

  • Anda dapat mengakses informasi silsilah dengan:

    • Data Lineage API
    • Grafik silsilah untuk entri yang didukung di Dataplex Universal Catalog. Untuk mengetahui informasi selengkapnya, lihat Grafik silsilah data dalam dokumentasi Dataplex Universal Catalog.

Saat Anda membuat lingkungan, integrasi silsilah data akan diaktifkan secara otomatis jika kondisi berikut terpenuhi:

  • Data Lineage API diaktifkan di project Anda. Untuk mengetahui informasi selengkapnya, baca bagian Mengaktifkan Data Lineage API dalam dokumentasi Dataplex Universal Catalog.

  • Backend Silsilah kustom tidak dikonfigurasi di Airflow.

Anda dapat menonaktifkan integrasi silsilah data saat membuat lingkungan.

Untuk lingkungan yang sudah ada, Anda dapat mengaktifkan atau menonaktifkan integrasi silsilah data kapan saja.

Pertimbangan fitur di Cloud Composer

Cloud Composer melakukan panggilan RPC untuk membuat peristiwa silsilah dalam kasus berikut:

  • Saat tugas Airflow dimulai atau selesai
  • Saat operasi DAG dimulai atau selesai

Untuk mengetahui detail tentang entity ini, baca artikel model informasi silsilah dan referensi Lineage API dalam dokumentasi Dataplex Universal Catalog.

Traffic silsilah yang dihasilkan akan dibatasi oleh kuota di Data Lineage API. Cloud Composer menggunakan kuota Penulisan.

Harga yang terkait dengan penanganan data silsilah akan bergantung pada harga silsilah. Baca artikel pertimbangan silsilah data.

Pertimbangan performa di Cloud Composer

Silsilah data dilaporkan di akhir eksekusi tugas Airflow. Rata-rata, pelaporan silsilah data memerlukan waktu sekitar 1-2 detik.

Hal ini tidak memengaruhi performa tugas itu sendiri: Tugas Airflow tidak akan gagal jika silsilah tidak berhasil dilaporkan ke Lineage API. Tidak ada dampak pada logika operator utama, tetapi seluruh instance tugas akan dieksekusi sedikit lebih lama untuk mengantisipasi pelaporan silsilah data.

Lingkungan yang melaporkan silsilah data akan mengalami sedikit peningkatan biaya yang relevan, karena diperlukan waktu tambahan untuk melaporkan silsilah data.

Kepatuhan

Silsilah data menawarkan berbagai tingkat dukungan untuk fitur seperti Kontrol Layanan VPC. Baca artikel pertimbangan silsilah data untuk memastikan kesesuaian tingkat dukungan dengan persyaratan lingkungan Anda.

Sebelum memulai

Memeriksa operator yang didukung

Dukungan silsilah data disediakan oleh paket penyedia tempat operator berada:

  1. Periksa log perubahan paket penyedia tempat operator berada untuk mengetahui entri yang menambahkan dukungan OpenLineage.

    Misalnya, BigQueryToBigQueryOperator mendukung OpenLineage mulai dari apache-airflow-providers-google versi 11.0.0.

  2. Periksa versi paket penyedia yang digunakan oleh lingkungan Anda. Untuk melakukannya, lihat daftar paket yang diprainstal untuk versi Cloud Composer yang digunakan di lingkungan Anda. Anda juga dapat menginstal versi lain paket di lingkungan Anda.

Selain itu, halaman Supported classes dalam dokumentasi apache-airflow-providers-openlineage memberikan daftar terbaru semua operator yang didukung.

Mengonfigurasi integrasi silsilah data

Integrasi silsilah data untuk Cloud Composer dikelola per lingkungan. Artinya, diperlukan dua langkah untuk mengaktifkan fitur ini:

  1. Aktifkan Data Lineage API di project Anda.
  2. Aktifkan integrasi silsilah data di lingkungan Cloud Composer tertentu.

Mengaktifkan silsilah data di Cloud Composer

Konsol

  1. Di konsol Google Cloud , buka halaman Environments.

    Buka Environments

  2. Dalam daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.

  3. Pilih tab Environment configuration.

  4. Di bagian Dataplex data lineage integration, klik Edit.

  5. Pada panel Dataplex data lineage integration, pilih Enable integration with Dataplex data lineage, lalu klik Save.

gcloud

Gunakan argumen --enable-cloud-data-lineage-integration.

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --enable-cloud-data-lineage-integration

Ganti kode berikut:

  • ENVIRONMENT_NAME: nama lingkungan Anda.
  • LOCATION: region tempat lingkungan berada.

Contoh:

gcloud composer environments update example-environment \
    --location us-central1 \
    --enable-cloud-data-lineage-integration

Menonaktifkan silsilah data di Cloud Composer

Menonaktifkan integrasi silsilah data di lingkungan Cloud Composer tidak akan menonaktifkan Data Lineage API. Jika Anda ingin menonaktifkan pelaporan silsilah data sepenuhnya untuk project Anda, nonaktifkan juga Data Lineage API. Baca artikel Menonaktifkan layanan.

Konsol

  1. Di konsol Google Cloud , buka halaman Environments.

    Buka Environments

  2. Dalam daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.

  3. Pilih tab Environment configuration.

  4. Di bagian Dataplex data lineage integration, klik Edit.

  5. Pada panel Dataplex data lineage integration, pilih Disable integration with Dataplex data lineage, lalu klik Save.

gcloud

Gunakan argumen --disable-cloud-data-lineage-integration.

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --disable-cloud-data-lineage-integration

Ganti kode berikut:

  • ENVIRONMENT_NAME: nama lingkungan Anda.
  • LOCATION: region tempat lingkungan berada.

Contoh:

gcloud composer environments update example-environment \
    --location us-central1 \
    --disable-cloud-data-lineage-integration

Mengirim peristiwa silsilah di operator yang didukung

Jika silsilah data diaktifkan, operator yang didukung akan mengirim peristiwa silsilah secara otomatis. Anda tidak perlu mengubah kode DAG.

Misalnya, menjalankan tugas berikut:

task = BigQueryInsertJobOperator(
    task_id='snapshot_task',
    dag=dag,
    location='<dataset-location>',
    configuration={
        'query': {
            'query': 'SELECT * FROM dataset.tableA',
            'useLegacySql': False,
            'destinationTable': {
                'project_id': 'example-project',
                'dataset_id': 'dataset',
                'table_id': 'tableB',
            },
        }
    },
)

Akan menghasilkan grafik silsilah berikut di UI Dataplex Universal Catalog:

Contoh grafik silsilah di UI Dataplex.
Gambar 1. Contoh grafik silsilah untuk tabel BigQuery di UI Dataplex Universal Catalog.

Mengirim peristiwa silsilah kustom

Anda dapat mengirim peristiwa silsilah kustom jika ingin melaporkan silsilah untuk operator yang tidak didukung untuk pelaporan silsilah otomatis.

Misalnya, untuk mengirim peristiwa kustom dengan:

  • BashOperator: ubah parameter inlets atau outlets dalam definisi tugas.
  • PythonOperator: ubah parameter task.inlets atau task.outlets dalam definisi tugas.
  • Anda dapat menggunakan AUTO untuk parameter inlets. Tindakan ini menetapkan nilainya sama dengan outlets untuk tugas upstream-nya.

Contoh berikut menunjukkan penggunaan inlet dan outlet:

from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO

...

bash_task = BashOperator(
    task_id="bash_task",
    dag=dag,
    bash_command="sleep 0",
    inlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table1",
        )
    ],
    outlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table2",
        )
    ],
)


def _python_task(task):
    print("Python task")


python_task = PythonOperator(
    task_id="python_task",
    dag=dag,
    python_callable=_python_task,
    inlets=[
        AUTO,
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table3",
        ),
    ],
    outlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table4",
        )
    ],
)

bash_task >> python_task

Ini menghasilkan grafik silsilah berikut di UI Dataplex Universal Catalog:

Contoh grafik silsilah untuk peristiwa kustom di UI Dataplex.
Gambar 2. Contoh grafik silsilah untuk beberapa tabel BigQuery di UI Dataplex Universal Catalog.

Melihat log silsilah di Cloud Composer

Anda dapat memeriksa log yang terkait dengan silsilah data menggunakan link di halaman Environment configuration di bagian Dataplex Universal Catalog data lineage integration.

Pemecahan masalah

Jika data silsilah tidak dilaporkan ke Lineage API, atau Anda tidak dapat melihatnya di Dataplex Universal Catalog, coba langkah-langkah pemecahan masalah berikut:

  • Pastikan Data Lineage API diaktifkan di project lingkungan Cloud Composer Anda.
  • Periksa apakah integrasi silsilah data diaktifkan di lingkungan Cloud Composer.
  • Periksa apakah operator yang Anda gunakan disertakan dalam dukungan pelaporan silsilah otomatis. Lihat Operator Airflow yang didukung.
  • Lihat log silsilah di Cloud Composer untuk mengetahui masalah yang mungkin timbul.

Langkah berikutnya