Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Legacy Gen 1)
Halaman ini menjelaskan cara menggunakan Cloud Run Functions untuk memicu DAG Managed Service for Apache Airflow sebagai respons terhadap peristiwa.
Apache Airflow dirancang untuk menjalankan DAG sesuai jadwal reguler, tetapi Anda juga dapat memicu DAG sebagai respons terhadap peristiwa. Salah satu caranya adalah menggunakan Cloud Run Functions untuk memicu DAG Managed Airflow saat peristiwa tertentu terjadi.
Contoh dalam panduan ini menjalankan DAG setiap kali terjadi perubahan di bucket Cloud Storage. Perubahan pada objek apa pun dalam bucket akan memicu fungsi. Fungsi ini membuat permintaan ke Airflow REST API dari lingkungan Managed Airflow Anda. Airflow memproses permintaan ini dan menjalankan DAG. DAG menampilkan informasi tentang perubahan tersebut.
Sebelum memulai
Memeriksa konfigurasi jaringan lingkungan Anda
Solusi ini tidak berfungsi dalam konfigurasi Kontrol Layanan VPC dan IP Pribadi karena konektivitas dari Cloud Run Functions ke server web Airflow tidak dapat dikonfigurasi dalam konfigurasi ini.
Di Managed Airflow (Gen 2), Anda dapat menggunakan pendekatan lain: Memicu DAG menggunakan Cloud Run Functions dan Pesan Pub/Sub
Mengaktifkan API untuk project Anda
Konsol
Aktifkan Managed Airflow dan Cloud Run Functions API.
Peran yang diperlukan untuk mengaktifkan API
Untuk mengaktifkan API, Anda memerlukan peran IAM Service Usage Admin
(roles/serviceusage.serviceUsageAdmin),
yang berisi izin serviceusage.services.enable. Pelajari cara memberikan
peran.
gcloud
Aktifkan Managed Airflow dan Cloud Run Functions API:
Peran yang diperlukan untuk mengaktifkan API
Untuk mengaktifkan API, Anda memerlukan peran IAM Service Usage Admin (roles/serviceusage.serviceUsageAdmin), yang berisi izin
serviceusage.services.enable. Pelajari cara memberikan
peran.
gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com
Mengaktifkan Airflow REST API
Bergantung pada versi Airflow Anda:
- Untuk Airflow 2, REST API stabil sudah diaktifkan secara default. Jika lingkungan Anda menonaktifkan API stabil, aktifkan REST API stabil.
- Untuk Airflow 1, aktifkan REST API eksperimental.
Mengizinkan panggilan API ke Airflow REST API menggunakan Kontrol Akses Server Web
Cloud Run Functions dapat menjangkau Airflow REST API menggunakan alamat IPv4 atau IPv6.
Jika Anda tidak yakin rentang IP panggilan yang akan digunakan, gunakan opsi konfigurasi default di Kontrol Akses Server Web, yaitu All IP addresses have access (default) agar tidak memblokir Cloud Run Functions Anda secara tidak sengaja.
Membuat bucket Cloud Storage
Contoh ini memicu DAG sebagai respons terhadap perubahan dalam bucket Cloud Storage. Buat bucket baru untuk digunakan dalam contoh ini.
Mendapatkan URL server web Airflow
Contoh ini membuat permintaan REST API ke endpoint server web Airflow.
Anda menggunakan bagian URL antarmuka web Airflow sebelum .appspot.com dalam kode Cloud Function Anda.
Konsol
Di Google Cloud konsol, buka halaman Environments.
Klik nama lingkungan Anda.
Di halaman Environment details, buka tab Environment configuration.
URL server web Airflow tercantum dalam item Airflow web UI.
gcloud
Jalankan perintah berikut:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format='value(config.airflowUri)'
Ganti:
ENVIRONMENT_NAMEdengan nama lingkungan.LOCATIONdengan region tempat lingkungan berada.
Mendapatkan client_id proxy IAM
Untuk membuat permintaan ke endpoint Airflow REST API, fungsi ini memerlukan client ID proxy Identity and Access Management yang melindungi server web Airflow.
Managed Airflow tidak memberikan informasi ini secara langsung. Sebagai gantinya, buat permintaan yang tidak diautentikasi ke server web Airflow dan ambil client ID dari URL pengalihan:
cURL
curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"
Ganti AIRFLOW_URL dengan URL antarmuka web Airflow.
Dalam output, cari string yang mengikuti client_id. Contoh:
client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com
Python
Simpan kode berikut dalam file bernama get_client_id.py. Isi nilai Anda untuk project_id, location, dan composer_environment, lalu jalankan kode di Cloud Shell atau lingkungan lokal Anda.
Mengupload DAG ke lingkungan Anda
Upload DAG ke lingkungan Anda. DAG contoh berikut menampilkan konfigurasi eksekusi DAG yang diterima. Anda memicu DAG ini dari fungsi, yang akan Anda buat nanti dalam panduan ini.
Men-deploy Cloud Function yang memicu DAG
Anda dapat men-deploy Cloud Function menggunakan bahasa pilihan yang didukung oleh Cloud Run Functions atau Cloud Run. Tutorial ini menunjukkan a Cloud Function yang diimplementasikan di Python dan Java.
Menentukan parameter konfigurasi Cloud Function
Pemicu. Untuk contoh ini, pilih pemicu yang berfungsi saat objek baru dibuat dalam bucket, atau objek yang ada ditimpa.
Jenis Pemicu. Cloud Storage.
Jenis Peristiwa. Selesaikan / Buat.
Bucket. Pilih bucket yang harus memicu fungsi ini.
Coba lagi jika gagal. Sebaiknya nonaktifkan opsi ini untuk tujuan contoh ini. Jika Anda menggunakan fungsi sendiri di lingkungan produksi, aktifkan opsi ini untuk menangani error sementara.
Akun layanan runtime, di bagian Runtime, build, koneksi, dan setelan keamanan. Gunakan salah satu opsi berikut, bergantung pada preferensi Anda:
Pilih Akun layanan default Compute Engine. Dengan izin IAM default, akun ini dapat menjalankan fungsi yang mengakses lingkungan Managed Airflow.
Buat akun layanan kustom yang memiliki peran Pengguna Composer dan tentukan sebagai akun layanan runtime untuk fungsi ini. Opsi ini mengikuti prinsip hak istimewa minimum.
Runtime dan titik entri, pada langkah Code. Saat menambahkan kode untuk contoh ini, pilih runtime Python 3.7 atau yang lebih baru dan tentukan
trigger_dagsebagai titik entri.
Menambahkan persyaratan
Tentukan dependensi dalam file requirements.txt:
Masukkan kode berikut ke file main.py dan lakukan penggantian berikut:
Ganti nilai variabel
client_iddengan nilaiclient_idyang Anda peroleh sebelumnya.Ganti nilai variabel
webserver_iddengan project ID tenant Anda, yang merupakan bagian dari URL antarmuka web Airflow sebelum.appspot.com. Anda telah memperoleh URL antarmuka web Airflow sebelumnya.Tentukan versi Airflow REST API yang Anda gunakan:
- Jika Anda menggunakan Airflow REST API stabil, tetapkan variabel
USE_EXPERIMENTAL_APIkeFalse. - Jika Anda menggunakan Airflow REST API eksperimental, tidak ada perubahan yang diperlukan. Variabel
USE_EXPERIMENTAL_APIsudah ditetapkan keTrue.
- Jika Anda menggunakan Airflow REST API stabil, tetapkan variabel
Menguji fungsi
Untuk memeriksa apakah fungsi dan DAG Anda berfungsi sesuai harapan:
- Tunggu hingga fungsi Anda di-deploy.
- Upload file ke bucket Cloud Storage Anda. Sebagai alternatif, Anda dapat memicu fungsi secara manual dengan memilih tindakan Test the function untuk fungsi tersebut di Google Cloud konsol.
- Periksa halaman DAG di antarmuka web Airflow. DAG harus memiliki satu eksekusi DAG yang aktif atau sudah selesai.
- Di UI Airflow, periksa log tugas untuk eksekusi ini. Anda akan melihat bahwa tugas
print_gcs_infomenampilkan data yang diterima dari fungsi ke log:
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
{bucket: example-storage-for-gcf-triggers, contentType: text/plain,
crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
return code 0h
Langkah berikutnya
- Mengakses UI Airflow
- Mengakses Airflow REST API
- Menulis DAG
- Menulis Cloud Run Functions
- Pemicu Cloud Storage