Menjadwalkan dan memicu DAG Airflow

Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Legacy Gen 1)

Halaman ini menjelaskan cara kerja penjadwalan dan pemicuan DAG di Airflow, cara menentukan jadwal untuk DAG, dan cara memicu DAG secara manual atau menjedanya.

Tentang DAG Airflow di Managed Airflow

DAG Airflow di Managed Airflow dijalankan di satu atau beberapa lingkungan Managed Airflow di project Anda. Anda mengupload file sumber DAG Airflow ke bucket Cloud Storage yang terkait dengan lingkungan. Instance Airflow lingkungan kemudian menguraikan file ini dan menjadwalkan operasi DAG, seperti yang ditentukan oleh jadwal setiap DAG. Selama operasi DAG, Airflow menjadwalkan dan menjalankan tugas individual yang membentuk DAG dalam urutan yang ditentukan oleh DAG.

Untuk mempelajari lebih lanjut konsep inti Airflow seperti DAG Airflow, operasi DAG, tugas, atau operator, lihat halaman Konsep Inti di dokumentasi Airflow.

Tentang penjadwalan DAG di Airflow

Airflow menyediakan konsep berikut untuk mekanisme penjadwalannya:

Tanggal logis

Menunjukkan tanggal saat operasi DAG tertentu dijalankan.

Ini bukan tanggal sebenarnya saat Airflow menjalankan DAG, tetapi jangka waktu yang harus diproses oleh operasi DAG tertentu. Misalnya, untuk DAG yang dijadwalkan berjalan setiap hari pukul 12.00, tanggal logisnya juga akan pukul 12.00 pada hari tertentu. Karena berjalan dua kali sehari, jangka waktu yang harus diproses adalah 12 jam terakhir. Pada saat yang sama, logika yang ditentukan dalam DAG itu sendiri mungkin tidak menggunakan tanggal logis atau interval waktu sama sekali. Misalnya, DAG dapat menjalankan skrip yang sama sekali sehari tanpa menggunakan nilai tanggal logis.

Di versi Airflow sebelum 2.2, tanggal ini disebut tanggal eksekusi.

Tanggal operasi

Menunjukkan tanggal saat operasi DAG tertentu dijalankan.

Misalnya, untuk DAG yang dijadwalkan berjalan setiap hari pukul 12.00, eksekusi DAG yang sebenarnya mungkin terjadi pada pukul 12.05, beberapa saat setelah tanggal logis berlalu.

Interval jadwal

Menunjukkan kapan dan seberapa sering DAG harus dijalankan, dalam hal tanggal logis.

Misalnya, jadwal harian berarti DAG dijalankan sekali sehari, dan tanggal logis untuk operasi DAG-nya memiliki interval 24 jam.

Tanggal mulai

Menentukan kapan Anda ingin Airflow mulai menjadwalkan DAG.

Tugas di DAG Anda dapat memiliki tanggal mulai individual, atau Anda dapat menentukan satu tanggal mulai untuk semua tugas. Berdasarkan tanggal mulai minimum untuk tugas di DAG Anda dan pada interval jadwal, Airflow menjadwalkan operasi DAG.

Catchup, backfill, dan percobaan ulang

Mekanisme untuk menjalankan operasi DAG untuk tanggal sebelumnya.

Catchup menjalankan operasi DAG yang belum berjalan, misalnya, jika DAG dijeda untuk jangka waktu yang lama, lalu tidak dijeda. Anda dapat menggunakan backfill untuk menjalankan operasi DAG untuk rentang tanggal tertentu. Percobaan ulang menentukan berapa kali Airflow harus mencoba saat menjalankan tugas dari DAG.

Penjadwalan berfungsi dengan cara berikut:

  1. Setelah tanggal mulai berlalu, Airflow menunggu kemunculan berikutnya dari interval jadwal.

  2. Airflow menjadwalkan operasi DAG pertama yang akan terjadi di akhir interval jadwal ini.

    Misalnya, jika DAG dijadwalkan berjalan setiap jam dan tanggal mulainya adalah pukul 12.00 hari ini, operasi DAG pertama akan terjadi pada pukul 13.00 hari ini.

Bagian Menjadwalkan DAG Airflow dalam dokumen ini menjelaskan cara menyiapkan penjadwalan untuk DAG Anda menggunakan konsep ini. Untuk mengetahui informasi selengkapnya tentang operasi DAG dan penjadwalan, lihat Operasi DAG di dokumentasi Airflow.

Tentang cara memicu DAG

Airflow menyediakan cara berikut untuk memicu DAG:

Cara lain untuk memicu DAG:

Sebelum memulai

  • Pastikan akun Anda memiliki peran yang dapat mengelola objek di bucket lingkungan serta melihat dan memicu DAG. Untuk mengetahui informasi selengkapnya, lihat Kontrol akses.

Menjadwalkan DAG Airflow

Anda menentukan jadwal untuk DAG dalam file DAG. Edit definisi DAG dengan cara berikut:

  1. Temukan dan edit file DAG di komputer Anda. Jika tidak memiliki file DAG, Anda dapat mendownload salinannya dari bucket lingkungan. Untuk DAG baru, Anda dapat menentukan semua parameter saat membuat file DAG.

  2. Di parameter schedule, tentukan jadwalnya. Anda dapat menggunakan ekspresi Cron, seperti 0 0 * * *, atau preset, seperti @daily. Untuk mengetahui informasi selengkapnya, lihat Cron dan Interval Waktu di dokumentasi Airflow.

    Airflow menentukan tanggal logis untuk operasi DAG berdasarkan jadwal yang Anda tetapkan.

  3. Di parameter start_date, tentukan tanggal mulai.

    Airflow menentukan tanggal logis operasi DAG pertama menggunakan parameter ini.

  4. (Opsional) Di parameter catchup, tentukan apakah Airflow harus menjalankan semua operasi DAG sebelumnya dari tanggal mulai hingga tanggal saat ini yang belum dijalankan.

    Operasi DAG yang dijalankan selama catchup akan memiliki tanggal logis di masa lalu dan tanggal operasinya akan mencerminkan waktu saat operasi DAG benar-benar dijalankan.

  5. (Opsional) Di parameter retries, tentukan berapa kali Airflow harus mencoba ulang tugas yang gagal (setiap DAG terdiri dari satu atau beberapa tugas individual). Secara default, tugas di Managed Airflow dicoba ulang dua kali.

  6. Unggah versi baru DAG ke bucket lingkungan.

  7. Tunggu hingga Airflow berhasil menguraikan DAG. Misalnya, Anda dapat memeriksa daftar DAG di lingkungan Anda di Google Cloud konsol atau di UI Airflow.

Contoh definisi DAG berikut berjalan dua kali sehari pada pukul 00.00 dan 12.00. Tanggal mulainya ditetapkan ke 1 Januari 2024, tetapi Airflow tidak menjalankannya untuk tanggal sebelumnya setelah Anda mengupload atau menjedanya karena catchup dinonaktifkan.

DAG berisi satu tugas bernama insert_query_job, yang menyisipkan baris ke dalam tabel dengan operator BigQueryInsertJobOperator. Operator ini adalah salah satu Google Cloud Operator BigQuery, yang dapat Anda gunakan untuk mengelola set data dan tabel, menjalankan kueri, serta memvalidasi data. Jika eksekusi tugas ini gagal, Airflow akan mencoba ulang empat kali lagi dengan interval percobaan ulang default. Tanggal logis untuk percobaan ulang ini tetap sama.

Kueri SQL untuk baris ini menggunakan template Airflow untuk menulis tanggal logis DAG dan nama ke baris.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_example_scheduling_dag",
  start_date=datetime.datetime(2024, 1, 1),
  schedule='0 */12 * * *',
  catchup=False
  ) as dag:

  insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    retries=4,
    configuration={
        "query": {
            # schema: date (string), description (string)
            # example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
            "query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location="us-central1"
  )

  insert_query_job

Untuk menguji DAG ini, Anda dapat memicunya secara manual dan lalu melihat log eksekusi tugas.

Contoh parameter penjadwalan lainnya

Contoh parameter penjadwalan berikut mengilustrasikan cara kerja penjadwalan dengan berbagai kombinasi parameter:

  • Jika start_date adalah datetime(2024, 4, 4, 16, 25) dan schedule adalah 30 16 * * *, operasi DAG pertama akan terjadi pada pukul 16.30 pada 5 April 2024.

  • Jika start_date adalah datetime(2024, 4, 4, 16, 35) dan schedule adalah 30 16 * * *, operasi DAG pertama akan terjadi pada pukul 16.30 pada 6 April 2024. Karena tanggal mulai setelah interval jadwal pada 4 April 2024, operasi DAG tidak terjadi pada 5 April 2024. Sebagai gantinya, interval jadwal berakhir pada pukul 16.35 pada 5 April 2024, sehingga operasi DAG berikutnya dijadwalkan pada pukul 16.30 pada hari berikutnya.

  • Jika start_date adalah datetime(2024, 4, 4), dan schedule adalah @daily, operasi DAG pertama dijadwalkan pada pukul 00.00 pada 5 April 2024.

  • Jika start_date adalah datetime(2024, 4, 4, 16, 30), dan schedule adalah 0 * * * *, operasi DAG pertama dijadwalkan pada pukul 18.00 pada 4 April 2024. Setelah tanggal dan waktu yang ditentukan berlalu, Airflow menjadwalkan operasi DAG yang akan terjadi pada menit ke-0 setiap jam. Titik waktu terdekat saat hal ini terjadi adalah pukul 17.00. Pada saat ini, Airflow menjadwalkan operasi DAG yang akan terjadi di akhir interval jadwal, yaitu pada pukul 18.00.

Memicu DAG secara manual

Saat Anda memicu DAG Airflow secara manual, Airflow akan menjalankan DAG sekali, secara independen dari jadwal yang ditentukan dalam file DAG.

Konsol

Untuk memicu DAG dari Google Cloud konsol:

  1. Di Google Cloud konsol, buka halaman Environments.

    Buka Environments

  2. Pilih lingkungan untuk melihat detailnya.

  3. Di halaman Environment details, buka tab DAGs.

  4. Klik nama DAG.

  5. Di halaman DAG details, klik Trigger DAG. Operasi DAG baru akan dibuat.

UI Airflow

Untuk memicu DAG dari UI Airflow:

  1. Di Google Cloud konsol, buka halaman Environments.

    Buka Environments

  2. Di kolom Airflow webserver, ikuti link Airflow untuk lingkungan Anda.

  3. Login dengan Akun Google yang memiliki izin yang sesuai.

  4. Di UI Airflow, di halaman DAGs, klik tombol Trigger DAG untuk DAG Anda.

gcloud

Jalankan perintah Airflow CLI dags trigger:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

Ganti kode berikut:

  • ENVIRONMENT_NAME: nama lingkungan Anda.
  • LOCATION: region tempat lingkungan berada.
  • DAG_ID: nama DAG.

Untuk mengetahui informasi selengkapnya tentang cara menjalankan perintah Airflow CLI di lingkungan Managed Airflow, lihat Menjalankan perintah Airflow CLI.

Untuk mengetahui informasi selengkapnya tentang perintah Airflow CLI yang tersedia, lihat referensi perintah gcloud composer environments run.

Melihat log dan detail operasi DAG

Di Google Cloud konsol, Anda dapat:

Selain itu, Managed Airflow menyediakan akses ke UI Airflow, yang merupakan antarmuka web Airflow sendiri.

Menjeda DAG

Konsol

Untuk menjeda DAG dari Google Cloud konsol:

  1. Di Google Cloud konsol, buka halaman Environments.

    Buka Environments

  2. Pilih lingkungan untuk melihat detailnya.

  3. Di halaman Environment details, buka tab DAGs.

  4. Klik nama DAG.

  5. Di halaman DAG details, klik Pause DAG.

UI Airflow

Untuk menjeda DAG dari UI Airflow:

  1. Di Google Cloud konsol, buka halaman Environments.

Buka Environments

  1. Di kolom Airflow webserver, ikuti link Airflow untuk lingkungan Anda.

  2. Login dengan Akun Google yang memiliki izin yang sesuai.

  3. Di antarmuka web Airflow, di halaman DAGs, klik tombol di samping nama DAG.

gcloud

Jalankan perintah Airflow CLI dags pause:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags pause -- DAG_ID

Ganti kode berikut:

  • ENVIRONMENT_NAME: nama lingkungan Anda.
  • LOCATION: region tempat lingkungan berada.
  • DAG_ID: nama DAG.

Untuk mengetahui informasi selengkapnya tentang cara menjalankan perintah Airflow CLI di lingkungan Managed Airflow, lihat Menjalankan perintah Airflow CLI.

Untuk mengetahui informasi selengkapnya tentang perintah Airflow CLI yang tersedia, lihat referensi perintah gcloud composer environments run.

Langkah berikutnya