Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Legacy Gen 1)
Halaman ini menjelaskan cara kerja penjadwalan dan pemicuan DAG di Airflow, cara menentukan jadwal untuk DAG, dan cara memicu DAG secara manual atau menjedanya.
Tentang DAG Airflow di Managed Airflow
DAG Airflow di Managed Airflow dijalankan di satu atau beberapa lingkungan Managed Airflow di project Anda. Anda mengupload file sumber DAG Airflow ke bucket Cloud Storage yang terkait dengan lingkungan. Instance Airflow lingkungan kemudian menguraikan file ini dan menjadwalkan operasi DAG, seperti yang ditentukan oleh jadwal setiap DAG. Selama operasi DAG, Airflow menjadwalkan dan menjalankan tugas individual yang membentuk DAG dalam urutan yang ditentukan oleh DAG.
Untuk mempelajari lebih lanjut konsep inti Airflow seperti DAG Airflow, operasi DAG, tugas, atau operator, lihat halaman Konsep Inti di dokumentasi Airflow.
Tentang penjadwalan DAG di Airflow
Airflow menyediakan konsep berikut untuk mekanisme penjadwalannya:
- Tanggal logis
Menunjukkan tanggal saat operasi DAG tertentu dijalankan.
Ini bukan tanggal sebenarnya saat Airflow menjalankan DAG, tetapi jangka waktu yang harus diproses oleh operasi DAG tertentu. Misalnya, untuk DAG yang dijadwalkan berjalan setiap hari pukul 12.00, tanggal logisnya juga akan pukul 12.00 pada hari tertentu. Karena berjalan dua kali sehari, jangka waktu yang harus diproses adalah 12 jam terakhir. Pada saat yang sama, logika yang ditentukan dalam DAG itu sendiri mungkin tidak menggunakan tanggal logis atau interval waktu sama sekali. Misalnya, DAG dapat menjalankan skrip yang sama sekali sehari tanpa menggunakan nilai tanggal logis.
Di versi Airflow sebelum 2.2, tanggal ini disebut tanggal eksekusi.
- Tanggal operasi
Menunjukkan tanggal saat operasi DAG tertentu dijalankan.
Misalnya, untuk DAG yang dijadwalkan berjalan setiap hari pukul 12.00, eksekusi DAG yang sebenarnya mungkin terjadi pada pukul 12.05, beberapa saat setelah tanggal logis berlalu.
- Interval jadwal
Menunjukkan kapan dan seberapa sering DAG harus dijalankan, dalam hal tanggal logis.
Misalnya, jadwal harian berarti DAG dijalankan sekali sehari, dan tanggal logis untuk operasi DAG-nya memiliki interval 24 jam.
- Tanggal mulai
Menentukan kapan Anda ingin Airflow mulai menjadwalkan DAG.
Tugas di DAG Anda dapat memiliki tanggal mulai individual, atau Anda dapat menentukan satu tanggal mulai untuk semua tugas. Berdasarkan tanggal mulai minimum untuk tugas di DAG Anda dan pada interval jadwal, Airflow menjadwalkan operasi DAG.
- Catchup, backfill, dan percobaan ulang
Mekanisme untuk menjalankan operasi DAG untuk tanggal sebelumnya.
Catchup menjalankan operasi DAG yang belum berjalan, misalnya, jika DAG dijeda untuk jangka waktu yang lama, lalu tidak dijeda. Anda dapat menggunakan backfill untuk menjalankan operasi DAG untuk rentang tanggal tertentu. Percobaan ulang menentukan berapa kali Airflow harus mencoba saat menjalankan tugas dari DAG.
Penjadwalan berfungsi dengan cara berikut:
Setelah tanggal mulai berlalu, Airflow menunggu kemunculan berikutnya dari interval jadwal.
Airflow menjadwalkan operasi DAG pertama yang akan terjadi di akhir interval jadwal ini.
Misalnya, jika DAG dijadwalkan berjalan setiap jam dan tanggal mulainya adalah pukul 12.00 hari ini, operasi DAG pertama akan terjadi pada pukul 13.00 hari ini.
Bagian Menjadwalkan DAG Airflow dalam dokumen ini menjelaskan cara menyiapkan penjadwalan untuk DAG Anda menggunakan konsep ini. Untuk mengetahui informasi selengkapnya tentang operasi DAG dan penjadwalan, lihat Operasi DAG di dokumentasi Airflow.
Tentang cara memicu DAG
Airflow menyediakan cara berikut untuk memicu DAG:
Memicu sesuai jadwal. Airflow memicu DAG secara otomatis berdasarkan jadwal yang ditentukan untuknya dalam file DAG.
Memicu secara manual. Anda dapat memicu DAG secara manual dari Google Cloud konsol, UI Airflow, atau dengan menjalankan perintah Airflow CLI dari Google Cloud CLI.
Memicu sebagai respons terhadap peristiwa. Cara standar untuk memicu DAG sebagai respons terhadap peristiwa adalah menggunakan sensor.
Cara lain untuk memicu DAG:
Memicu secara terprogram. Anda dapat memicu DAG menggunakan Airflow REST API. Misalnya, dari skrip Python.
Memicu secara terprogram sebagai respons terhadap peristiwa. Anda dapat memicu DAG sebagai respons terhadap peristiwa dengan menggunakan Cloud Run Functions dan Airflow REST API.
Sebelum memulai
- Pastikan akun Anda memiliki peran yang dapat mengelola objek di bucket lingkungan serta melihat dan memicu DAG. Untuk mengetahui informasi selengkapnya, lihat Kontrol akses.
Menjadwalkan DAG Airflow
Anda menentukan jadwal untuk DAG dalam file DAG. Edit definisi DAG dengan cara berikut:
Temukan dan edit file DAG di komputer Anda. Jika tidak memiliki file DAG, Anda dapat mendownload salinannya dari bucket lingkungan. Untuk DAG baru, Anda dapat menentukan semua parameter saat membuat file DAG.
Di parameter
schedule, tentukan jadwalnya. Anda dapat menggunakan ekspresi Cron, seperti0 0 * * *, atau preset, seperti@daily. Untuk mengetahui informasi selengkapnya, lihat Cron dan Interval Waktu di dokumentasi Airflow.Airflow menentukan tanggal logis untuk operasi DAG berdasarkan jadwal yang Anda tetapkan.
Di parameter
start_date, tentukan tanggal mulai.Airflow menentukan tanggal logis operasi DAG pertama menggunakan parameter ini.
(Opsional) Di parameter
catchup, tentukan apakah Airflow harus menjalankan semua operasi DAG sebelumnya dari tanggal mulai hingga tanggal saat ini yang belum dijalankan.Operasi DAG yang dijalankan selama catchup akan memiliki tanggal logis di masa lalu dan tanggal operasinya akan mencerminkan waktu saat operasi DAG benar-benar dijalankan.
(Opsional) Di parameter
retries, tentukan berapa kali Airflow harus mencoba ulang tugas yang gagal (setiap DAG terdiri dari satu atau beberapa tugas individual). Secara default, tugas di Managed Airflow dicoba ulang dua kali.Unggah versi baru DAG ke bucket lingkungan.
Tunggu hingga Airflow berhasil menguraikan DAG. Misalnya, Anda dapat memeriksa daftar DAG di lingkungan Anda di Google Cloud konsol atau di UI Airflow.
Contoh definisi DAG berikut berjalan dua kali sehari pada pukul 00.00 dan 12.00. Tanggal mulainya ditetapkan ke 1 Januari 2024, tetapi Airflow tidak menjalankannya untuk tanggal sebelumnya setelah Anda mengupload atau menjedanya karena catchup dinonaktifkan.
DAG berisi satu tugas bernama insert_query_job, yang menyisipkan baris ke dalam tabel dengan operator BigQueryInsertJobOperator. Operator ini adalah salah satu
Google Cloud Operator BigQuery,
yang dapat Anda gunakan untuk mengelola set data dan tabel, menjalankan kueri, serta memvalidasi data.
Jika eksekusi tugas ini gagal, Airflow akan mencoba ulang empat kali lagi dengan interval percobaan ulang default. Tanggal logis untuk percobaan ulang ini tetap sama.
Kueri SQL untuk baris ini menggunakan template Airflow untuk menulis tanggal logis DAG dan nama ke baris.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_example_scheduling_dag",
start_date=datetime.datetime(2024, 1, 1),
schedule='0 */12 * * *',
catchup=False
) as dag:
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
retries=4,
configuration={
"query": {
# schema: date (string), description (string)
# example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
"query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
"useLegacySql": False,
"priority": "BATCH",
}
},
location="us-central1"
)
insert_query_job
Untuk menguji DAG ini, Anda dapat memicunya secara manual dan lalu melihat log eksekusi tugas.
Contoh parameter penjadwalan lainnya
Contoh parameter penjadwalan berikut mengilustrasikan cara kerja penjadwalan dengan berbagai kombinasi parameter:
Jika
start_dateadalahdatetime(2024, 4, 4, 16, 25)danscheduleadalah30 16 * * *, operasi DAG pertama akan terjadi pada pukul 16.30 pada 5 April 2024.Jika
start_dateadalahdatetime(2024, 4, 4, 16, 35)danscheduleadalah30 16 * * *, operasi DAG pertama akan terjadi pada pukul 16.30 pada 6 April 2024. Karena tanggal mulai setelah interval jadwal pada 4 April 2024, operasi DAG tidak terjadi pada 5 April 2024. Sebagai gantinya, interval jadwal berakhir pada pukul 16.35 pada 5 April 2024, sehingga operasi DAG berikutnya dijadwalkan pada pukul 16.30 pada hari berikutnya.Jika
start_dateadalahdatetime(2024, 4, 4), danscheduleadalah@daily, operasi DAG pertama dijadwalkan pada pukul 00.00 pada 5 April 2024.Jika
start_dateadalahdatetime(2024, 4, 4, 16, 30), danscheduleadalah0 * * * *, operasi DAG pertama dijadwalkan pada pukul 18.00 pada 4 April 2024. Setelah tanggal dan waktu yang ditentukan berlalu, Airflow menjadwalkan operasi DAG yang akan terjadi pada menit ke-0 setiap jam. Titik waktu terdekat saat hal ini terjadi adalah pukul 17.00. Pada saat ini, Airflow menjadwalkan operasi DAG yang akan terjadi di akhir interval jadwal, yaitu pada pukul 18.00.
Memicu DAG secara manual
Saat Anda memicu DAG Airflow secara manual, Airflow akan menjalankan DAG sekali, secara independen dari jadwal yang ditentukan dalam file DAG.
Konsol
Untuk memicu DAG dari Google Cloud konsol:
Di Google Cloud konsol, buka halaman Environments.
Pilih lingkungan untuk melihat detailnya.
Di halaman Environment details, buka tab DAGs.
Klik nama DAG.
Di halaman DAG details, klik Trigger DAG. Operasi DAG baru akan dibuat.
UI Airflow
Untuk memicu DAG dari UI Airflow:
Di Google Cloud konsol, buka halaman Environments.
Di kolom Airflow webserver, ikuti link Airflow untuk lingkungan Anda.
Login dengan Akun Google yang memiliki izin yang sesuai.
Di UI Airflow, di halaman DAGs, klik tombol Trigger DAG untuk DAG Anda.
gcloud
Jalankan perintah Airflow CLI dags trigger:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags trigger -- DAG_ID
Ganti kode berikut:
ENVIRONMENT_NAME: nama lingkungan Anda.LOCATION: region tempat lingkungan berada.DAG_ID: nama DAG.
Untuk mengetahui informasi selengkapnya tentang cara menjalankan perintah Airflow CLI di lingkungan Managed Airflow, lihat Menjalankan perintah Airflow CLI.
Untuk mengetahui informasi selengkapnya tentang perintah Airflow CLI yang tersedia, lihat
referensi perintah gcloud composer environments run.
Melihat log dan detail operasi DAG
Di Google Cloud konsol, Anda dapat:
- Melihat status operasi DAG sebelumnya dan detail DAG.
- Menjelajahi log mendetail dari semua operasi DAG dan semua tugas dari DAG ini.
- Melihat statistik DAG.
Selain itu, Managed Airflow menyediakan akses ke UI Airflow, yang merupakan antarmuka web Airflow sendiri.
Menjeda DAG
Konsol
Untuk menjeda DAG dari Google Cloud konsol:
Di Google Cloud konsol, buka halaman Environments.
Pilih lingkungan untuk melihat detailnya.
Di halaman Environment details, buka tab DAGs.
Klik nama DAG.
Di halaman DAG details, klik Pause DAG.
UI Airflow
Untuk menjeda DAG dari UI Airflow:
- Di Google Cloud konsol, buka halaman Environments.
Di kolom Airflow webserver, ikuti link Airflow untuk lingkungan Anda.
Login dengan Akun Google yang memiliki izin yang sesuai.
Di antarmuka web Airflow, di halaman DAGs, klik tombol di samping nama DAG.
gcloud
Jalankan perintah Airflow CLI dags pause:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags pause -- DAG_ID
Ganti kode berikut:
ENVIRONMENT_NAME: nama lingkungan Anda.LOCATION: region tempat lingkungan berada.DAG_ID: nama DAG.
Untuk mengetahui informasi selengkapnya tentang cara menjalankan perintah Airflow CLI di lingkungan Managed Airflow, lihat Menjalankan perintah Airflow CLI.
Untuk mengetahui informasi selengkapnya tentang perintah Airflow CLI yang tersedia, lihat
referensi perintah gcloud composer environments run.