Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Legacy Gen 1)
Halaman ini menjelaskan cara mengelola DAG di lingkungan Managed Airflow Anda.
Managed Airflow menggunakan bucket Cloud Storage untuk menyimpan DAG lingkungan Managed Airflow Anda. Lingkungan Anda menyinkronkan DAG dari bucket ini ke komponen Airflow seperti pekerja Airflow dan penjadwal.
Sebelum memulai
- Karena Apache Airflow tidak memberikan isolasi DAG yang kuat, sebaiknya pertahankan lingkungan produksi dan pengujian terpisah untuk mencegah gangguan DAG. Untuk mengetahui informasi selengkapnya, lihat Menguji DAG.
- Pastikan akun Anda memiliki izin yang cukup untuk mengelola DAG.
- Perubahan pada DAG diterapkan ke Airflow dalam waktu 3-5 menit. Anda dapat melihat status tugas di UI Airflow.
Mengakses bucket lingkungan Anda
Untuk mengakses bucket yang terkait dengan lingkungan Anda:
Konsol
Di konsol Google Cloud , buka halaman Environments.
Dalam daftar lingkungan, temukan baris dengan nama lingkungan Anda dan di kolom DAGs folder , klik link DAGs. Halaman Bucket details akan terbuka. Halaman ini menampilkan konten folder
/dagsdi bucket lingkungan Anda.
gcloud
gcloud CLI memiliki perintah terpisah untuk menambahkan dan menghapus DAG di bucket lingkungan Anda.
Jika ingin berinteraksi dengan bucket lingkungan Anda, Anda juga dapat menggunakan Google Cloud CLI. Untuk mendapatkan alamat bucket lingkungan Anda, jalankan perintah gcloud CLI berikut:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format="get(config.dagGcsPrefix)"
Ganti:
ENVIRONMENT_NAMEdengan nama lingkungan.LOCATIONdengan region tempat lingkungan berada.
Contoh:
gcloud beta composer environments describe example-environment \
--location us-central1 \
--format="get(config.dagGcsPrefix)"
API
Buat permintaan API environments.get. Di
resource Environment, di
EnvironmentConfig, di
resource dagGcsPrefix adalah alamat bucket lingkungan Anda.
Contoh:
GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment
Python
Gunakan library google-auth untuk mendapatkan kredensial dan gunakan library requests untuk memanggil REST API.
Menambahkan atau memperbarui DAG
Untuk menambahkan atau memperbarui DAG, pindahkan file .py Python untuk DAG ke folder /dags di bucket lingkungan.
Konsol
Di konsol Google Cloud , buka halaman Environments.
Dalam daftar lingkungan, temukan baris dengan nama lingkungan Anda dan di kolom DAGs folder , klik link DAGs. Halaman Bucket details akan terbuka. Halaman ini menampilkan konten folder
/dagsdi bucket lingkungan Anda.Klik Upload files. Kemudian, pilih file
.pyPython untuk DAG menggunakan dialog browser dan konfirmasi.
gcloud
gcloud composer environments storage dags import \
--environment ENVIRONMENT_NAME \
--location LOCATION \
--source="LOCAL_FILE_TO_UPLOAD"
Ganti:
ENVIRONMENT_NAMEdengan nama lingkungan.LOCATIONdengan region tempat lingkungan berada.LOCAL_FILE_TO_UPLOADadalah file.pyPython untuk DAG.
Contoh:
gcloud composer environments storage dags import \
--environment example-environment \
--location us-central1 \
--source="example_dag.py"
Memperbarui DAG yang memiliki operasi DAG aktif
Jika Anda memperbarui DAG yang memiliki operasi DAG aktif:
- Semua tugas yang saat ini sedang dieksekusi akan selesai menggunakan file DAG asli.
- Semua tugas yang dijadwalkan tetapi saat ini tidak berjalan akan menggunakan file DAG yang diperbarui.
- Semua tugas yang tidak lagi ada dalam file DAG yang diperbarui akan ditandai sebagai dihapus.
Memperbarui DAG yang berjalan sesuai jadwal yang sering
Setelah Anda mengupload file DAG, Airflow memerlukan waktu untuk memuat file ini dan memperbarui DAG. Jika DAG Anda berjalan sesuai jadwal yang sering, sebaiknya pastikan DAG menggunakan versi file DAG yang diperbarui. Untuk melakukannya:
Jeda DAG di UI Airflow.
Upload file DAG yang diperbarui.
Tunggu hingga Anda melihat pembaruan di UI Airflow. Artinya, DAG diuraikan dengan benar oleh penjadwal dan diperbarui dalam database Airflow.
Jika UI Airflow menampilkan DAG yang diperbarui, hal ini tidak menjamin bahwa pekerja Airflow memiliki versi file DAG yang diperbarui. Hal ini terjadi karena file DAG disinkronkan secara terpisah untuk penjadwal dan pekerja.
Anda mungkin ingin memperpanjang waktu tunggu untuk memastikan file DAG disinkronkan dengan semua pekerja di lingkungan Anda. Sinkronisasi terjadi beberapa kali per menit. Di lingkungan yang sehat, menunggu sekitar 20-30 detik sudah cukup bagi semua pekerja untuk melakukan sinkronisasi.
(Opsional) Jika Anda ingin memastikan sepenuhnya bahwa semua pekerja memiliki versi baru file DAG, periksa log untuk setiap pekerja. Untuk melakukannya:
Buka tab Logs untuk lingkungan Anda di Google Cloud konsol.
Buka item Composer logs > Infrastructure > Cloud Storage sync dan periksa log untuk setiap pekerja di lingkungan Anda. Cari item log
Syncing dags directoryterbaru yang memiliki stempel waktu setelah Anda mengupload file DAG baru. Jika Anda melihat itemFinished syncingyang mengikutinya, DAG akan berhasil disinkronkan di pekerja ini.
Batalkan jeda DAG.
Menguraikan ulang DAG
Karena DAG disimpan di bucket lingkungan, setiap DAG disinkronkan ke prosesor DAG terlebih dahulu, lalu prosesor DAG menguraikannya dengan penundaan singkat. Jika Anda menguraikan ulang DAG secara manual, misalnya, melalui UI Airflow, prosesor DAG akan menguraikan ulang versi DAG saat ini yang tersedia untuknya, yang mungkin bukan versi DAG terbaru yang Anda upload ke bucket lingkungan.
Sebaiknya gunakan penguraian ulang sesuai permintaan hanya jika Anda mengalami waktu penguraian yang lama. Misalnya, hal ini dapat terjadi jika lingkungan Anda memiliki banyak file, atau jika interval penguraian DAG yang panjang dikonfigurasi dalam opsi konfigurasi Airflow.
Menghapus DAG di lingkungan Anda
Untuk menghapus DAG, hapus file .py Python untuk DAG dari folder /dags lingkungan di bucket lingkungan Anda.
Konsol
Di konsol Google Cloud , buka halaman Environments.
Dalam daftar lingkungan, temukan baris dengan nama lingkungan Anda dan di kolom DAGs folder , klik link DAGs. Halaman Bucket details akan terbuka. Halaman ini menampilkan konten folder
/dagsdi bucket lingkungan Anda.Pilih file DAG, klik Delete, lalu konfirmasi operasi.
gcloud
gcloud composer environments storage dags delete \
--environment ENVIRONMENT_NAME \
--location LOCATION \
DAG_FILE
Ganti:
ENVIRONMENT_NAMEdengan nama lingkungan.LOCATIONdengan region tempat lingkungan berada.DAG_FILEdengan file.pyPython untuk DAG.
Contoh:
gcloud composer environments storage dags delete \
--environment example-environment \
--location us-central1 \
example_dag.py
Menghapus DAG dari UI Airflow
Untuk menghapus metadata DAG dari UI Airflow:
UI Airflow
- Buka UI Airflow untuk lingkungan Anda.
- Untuk DAG, klik Delete DAG.
gcloud
Jalankan perintah berikut di gcloud CLI:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags delete -- DAG_NAME
Ganti:
ENVIRONMENT_NAMEdengan nama lingkungan.LOCATIONdengan region tempat lingkungan berada.DAG_NAMEadalah nama DAG yang akan dihapus.