Memecahkan masalah DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Halaman ini memberikan langkah-langkah dan informasi pemecahan masalah untuk masalah alur kerja umum.

Banyak masalah eksekusi DAG disebabkan oleh performa lingkungan yang tidak optimal. Anda dapat mengoptimalkan lingkungan dengan mengikuti panduan Mengoptimalkan performa dan biaya lingkungan.

Beberapa masalah eksekusi DAG mungkin disebabkan oleh penjadwal Airflow yang tidak berfungsi dengan benar atau optimal. Ikuti petunjuk pemecahan masalah Penjadwal untuk menyelesaikan masalah ini.

Memecahkan masalah alur kerja

Untuk memulai pemecahan masalah:

  1. Periksa log Airflow.

    Anda dapat meningkatkan tingkat logging Airflow dengan mengganti opsi konfigurasi Airflow berikut.

    Bagian Kunci Nilai
    logging logging_level Nilai defaultnya adalah INFO. Setel ke DEBUG untuk mendapatkan lebih banyak informasi dalam pesan log.
  2. Periksa Dasbor Monitoring.

  3. Tinjau Cloud Monitoring.

  4. Di konsol Google Cloud , periksa apakah ada error di halaman untuk komponen lingkungan Anda.

  5. Di antarmuka web Airflow, periksa Tampilan Grafik DAG untuk instance tugas yang gagal.

    Bagian Kunci Nilai
    webserver dag_orientation LR, TB, RL, atau BT

Menyelidiki kegagalan tugas Airflow dengan Gemini Cloud Assist

Investigasi Gemini Cloud Assist adalah alat analisis akar masalah (RCA) untuk memecahkan masalah infrastruktur dan aplikasi Anda di lingkungan cloud yang kompleks dan terdistribusi. Investigasi dapat membantu Anda memahami, mendiagnosis, dan menyelesaikan masalah di Google Cloud. Dengan investigasi, Anda dapat menyederhanakan respons insiden dengan mengurangi waktu penyelesaian dan meningkatkan ketersediaan secara keseluruhan, semuanya dengan lebih sedikit upaya.

Di Cloud Composer, Anda dapat memulai penyelidikan Gemini Cloud Assist untuk tugas Airflow yang gagal dari UI DAG. Cloud Composer akan otomatis mengisi detail seperti deskripsi masalah dan rentang waktu, serta menyertakan lingkungan Anda sebagai resource yang relevan.

Memulai dan melihat investigasi

Untuk memulai investigasi Gemini Cloud Assist baru untuk tugas Airflow yang gagal, atau untuk melihat investigasi yang ada:

  1. Di konsol Google Cloud , buka halaman Environments.

    Buka Environments

  2. Pilih lingkungan untuk melihat detailnya.

  3. Di halaman Environment details, buka tab DAGs.

  4. Klik nama DAG.

  5. Di halaman DAG details, buka tab Run history, lalu klik eksekusi DAG yang memiliki tugas yang gagal.

  6. Di kolom State tugas Airflow yang gagal, klik Investigate:

    1. Untuk tugas yang baru diselidiki, klik Selidiki.
    2. Jika tugas sudah memiliki investigasi, klik Lihat investigasi untuk meninjau investigasi yang ada. Sebagai alternatif, Anda dapat memulai investigasi lain dengan mengklik Investigasi baru.
  7. Lanjutkan ke membuat, menjalankan, dan meninjau investigasi dengan Gemini Cloud Assist.

Contoh penyelidikan

Contoh ini menunjukkan proses menyelidiki tugas yang gagal.

  1. Di dasbor Monitoring > Statistik DAG, amati eksekusi DAG yang gagal:

    Grafik Operasi DAG selesai di dasbor Monitoring menampilkan
    beberapa operasi DAG yang gagal
    Gambar 1. Grafik operasi DAG selesai (klik untuk memperbesar)
  2. Buka DAG. Kolom Failed runs (1h) menunjukkan bahwa DAG create_large_txt_file_print_logs mengalami beberapa operasi yang gagal dalam satu jam terakhir. Klik nama DAG.

    Daftar DAG menampilkan operasi DAG yang gagal untuk DAG
    create_large_txt_file_print_logs
    Gambar 2. Daftar DAG dengan statistik pengoperasian DAG (klik untuk memperbesar)
  3. Klik salah satu DAG yang gagal dijalankan, lalu klik Selidiki di samping entri tugas Airflow yang gagal dan mulai penyelidikan.

    Daftar tugas dalam DAG menampilkan tugas Airflow yang gagal dengan tombol
    selidiki di sampingnya
    Gambar 3. Daftar tugas di DAG yang gagal (klik untuk memperbesar)
  4. Tunggu hingga penyelidikan selesai.

  5. Daftar pengamatan yang relevan menjelaskan proses penyelidikan. Dalam contoh khusus ini, tugas gagal tanpa menghasilkan log, tetapi Gemini Cloud Assist dapat menemukan penyebab kegagalan di log penjadwal Airflow, tempat tugas dihentikan sebagai zombie.

    Pengamatan berjudul Zobmie Job Detected, dengan detail tentang temuan yang relevan dalam log scheduler Airflow
    Gambar 4. Pengamatan Zombie Job Detected (klik untuk memperbesar)
  6. Terakhir, Gemini Cloud Assist meringkas temuan dan memberikan hipotesis beserta rekomendasi untuk memperbaiki masalah. Dalam contoh ini, tugas gagal karena pekerja Airflow tidak memiliki cukup resource untuk memprosesnya. Hal ini didukung oleh pengamatan Pod pekerja yang dimulai ulang beberapa kali dengan error OOM, dan tugas yang kemudian dihentikan oleh penjadwal sebagai tugas zombie.

    Hipotesis berjudul Airflow Worker Pod Resource Exhaustion, dengan detail dan perbaikan yang direkomendasikan
    Gambar 5. Hipotesis Kehabisan Resource Pod Airflow Worker (klik untuk memperbesar)

Men-debug kegagalan operator

Untuk men-debug kegagalan operator:

  1. Periksa error khusus tugas.
  2. Periksa log Airflow.
  3. Tinjau Cloud Monitoring.
  4. Periksa log khusus operator.
  5. Perbaiki error.
  6. Upload DAG ke folder /dags.
  7. Di antarmuka web Airflow, hapus status sebelumnya untuk DAG.
  8. Lanjutkan atau jalankan DAG.

Memecahkan masalah eksekusi tugas

Airflow adalah sistem terdistribusi dengan banyak entitas seperti scheduler, executor, dan pekerja yang berkomunikasi satu sama lain melalui antrean tugas dan database Airflow serta mengirim sinyal (seperti SIGTERM). Diagram berikut menunjukkan ringkasan interkoneksi antara komponen Airflow.

Interaksi antara komponen Airflow
Gambar 6. Interaksi antara komponen Airflow (klik untuk memperbesar)

Dalam sistem terdistribusi seperti Airflow, mungkin ada beberapa masalah konektivitas jaringan, atau infrastruktur yang mendasarinya mungkin mengalami masalah sesekali; hal ini dapat menyebabkan situasi ketika tugas dapat gagal dan dijadwalkan ulang untuk dieksekusi, atau tugas mungkin tidak berhasil diselesaikan (misalnya, tugas Zombie, atau tugas yang macet dalam eksekusi). Airflow memiliki mekanisme untuk mengatasi situasi tersebut dan otomatis melanjutkan fungsi normal. Bagian berikut menjelaskan masalah umum yang terjadi selama eksekusi tugas oleh Airflow.

Memecahkan masalah tugas KubernetesExecutor

CeleryKubernetesExecutor adalah jenis eksekutor di Cloud Composer 3 yang dapat menggunakan CeleryExecutor dan KubernetesExecutor secara bersamaan.

Lihat halaman Menggunakan CeleryKubernetesExecutor untuk mengetahui informasi selengkapnya tentang cara memecahkan masalah tugas yang dijalankan dengan KubernetesExecutor.

Tugas gagal tanpa mengeluarkan log apa pun

Tugas gagal tanpa mengeluarkan log karena error penguraian DAG

Terkadang ada error DAG yang tidak terlihat jelas yang menyebabkan situasi di mana penjadwal Airflow dapat menjadwalkan tugas untuk dieksekusi, pemroses DAG dapat mem-parsing file DAG, tetapi kemudian pekerja Airflow gagal mengeksekusi tugas dari DAG karena ada error pemrograman dalam file DAG. Hal ini dapat menyebabkan tugas Airflow ditandai sebagai Failed dan tidak ada log dari eksekusinya.

Solusi:

  • Verifikasi di log pekerja Airflow bahwa tidak ada error yang dimunculkan oleh pekerja Airflow yang terkait dengan DAG yang tidak ada atau error parsing DAG.

  • Tingkatkan parameter yang terkait dengan penguraian DAG:

    • Tingkatkan [dagbag-import-timeout][ext-airflow-dagrun-import-timeout] menjadi setidaknya 120 detik (atau lebih, jika diperlukan).

    • Tingkatkan dag-file-processor-timeout menjadi setidaknya 180 detik (atau lebih, jika diperlukan). Nilai ini harus lebih tinggi dari dagbag-import-timeout.

  • Lihat juga Memecahkan masalah DAG Processor.

Tugas terganggu secara tiba-tiba

Selama eksekusi tugas, pekerja Airflow dapat berhenti secara tiba-tiba karena masalah yang tidak secara khusus terkait dengan tugas itu sendiri. Lihat Penyebab masalah umum untuk mengetahui daftar skenario tersebut dan kemungkinan solusinya. Bagian berikut mencakup beberapa gejala tambahan yang dapat berasal dari akar penyebab tersebut:

Tugas zombie

Airflow mendeteksi dua jenis ketidakcocokan antara tugas dan proses yang menjalankan tugas:

  • Tugas zombie adalah tugas yang seharusnya berjalan, tetapi tidak berjalan. Hal ini dapat terjadi jika proses tugas dihentikan atau tidak merespons, jika pekerja Airflow tidak melaporkan status tugas tepat waktu karena kelebihan beban, atau jika VM tempat tugas dijalankan dimatikan. Airflow menemukan tugas tersebut secara berkala, dan gagal atau mencoba ulang tugas, bergantung pada setelan tugas.

    Menemukan tugas zombie

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-scheduler")
    textPayload:"Detected zombie job"
  • Tugas yang tidak aktif adalah tugas yang seharusnya tidak berjalan. Airflow menemukan tugas tersebut secara berkala dan menghentikannya.

Lihat Penyebab utama umum untuk mengetahui informasi tambahan tentang cara memecahkan masalah tugas Zombie.

Sinyal SIGTERM

Sinyal SIGTERM digunakan oleh Linux, Kubernetes, penjadwal Airflow, dan Celery untuk menghentikan proses yang bertanggung jawab untuk menjalankan tugas Airflow atau pekerja Airflow.

Ada beberapa alasan mengapa sinyal SIGTERM dikirim di suatu lingkungan:

  • Tugas menjadi tugas Zombie dan harus dihentikan.

  • Scheduler menemukan duplikat tugas dan mengirimkan sinyal Terminating instance dan SIGTERM ke tugas untuk menghentikannya.

  • Dalam Horizontal Pod Autoscaling, Bidang Kontrol GKE mengirimkan sinyal SIGTERM untuk menghapus Pod yang tidak lagi diperlukan.

  • Penjadwal dapat mengirim sinyal SIGTERM ke proses DagFileProcessorManager. Sinyal SIGTERM tersebut digunakan oleh Penjadwal untuk mengelola siklus proses DagFileProcessorManager dan dapat diabaikan dengan aman.

    Contoh:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • Kondisi persaingan antara callback detak jantung dan callback keluar di local_task_job, yang memantau eksekusi tugas. Jika detak jantung mendeteksi bahwa tugas ditandai sebagai berhasil, detak jantung tidak dapat membedakan apakah tugas itu sendiri berhasil atau Airflow diberi tahu untuk menganggap tugas tersebut berhasil. Namun, perintah ini akan menghentikan peluncur tugas, tanpa menunggu hingga keluar.

    Sinyal SIGTERM tersebut dapat diabaikan dengan aman. Tugas sudah dalam status berhasil dan eksekusi proses DAG secara keseluruhan tidak akan terpengaruh.

    Entri log Received SIGTERM. adalah satu-satunya perbedaan antara keluar normal dan penghentian tugas dalam status berhasil.

    Kondisi persaingan antara callback keluar dan detak jantung
    Gambar 7. Kondisi persaingan antara heartbeat dan callback keluar (klik untuk memperbesar)
  • Komponen Airflow menggunakan lebih banyak resource (CPU, memori) daripada yang diizinkan oleh node cluster.

  • Layanan GKE melakukan operasi pemeliharaan dan mengirim sinyal SIGTERM ke Pod yang berjalan di node yang akan diupgrade.

    Saat instance tugas dihentikan dengan SIGTERM, Anda dapat melihat entri log berikut di log worker Airflow yang menjalankan tugas:

    {local_task_job.py:211} WARNING - State of this instance has been externally
    set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
    SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
    with exception
    

Kemungkinan solusi:

Masalah ini terjadi saat VM yang menjalankan tugas kehabisan memori. Hal ini tidak terkait dengan konfigurasi Airflow, tetapi dengan jumlah memori yang tersedia untuk VM.

  • Di Cloud Composer 3, Anda dapat menetapkan lebih banyak resource CPU dan memori ke worker Airflow.

  • Anda dapat menurunkan nilai opsi konfigurasi Airflow serentak [celery]worker_concurrency. Opsi ini menentukan jumlah tugas yang dijalankan secara serentak oleh worker Airflow tertentu.

Untuk mengetahui informasi selengkapnya tentang cara mengoptimalkan lingkungan Anda, lihat Mengoptimalkan performa dan biaya lingkungan.

Tugas Airflow terganggu oleh Negsignal.SIGKILL

Terkadang, tugas Anda mungkin menggunakan lebih banyak memori daripada yang dialokasikan untuk pekerja Airflow. Dalam situasi seperti itu, operasi mungkin terganggu oleh Negsignal.SIGKILL. Sistem mengirimkan sinyal ini untuk menghindari konsumsi memori lebih lanjut yang dapat memengaruhi eksekusi tugas Airflow lainnya. Di log pekerja Airflow, Anda mungkin melihat entri log berikut:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Negsignal.SIGKILL juga dapat muncul sebagai kode -9.

Kemungkinan solusi:

  • worker_concurrency pekerja Airflow yang lebih rendah.

  • Tingkatkan jumlah memori yang tersedia untuk pekerja Airflow.

  • Kelola tugas yang memerlukan banyak resource di Cloud Composer menggunakan KubernetesPodOperator atau GKEStartPodOperator untuk pemisahan tugas dan alokasi resource yang disesuaikan.

  • Optimalkan tugas Anda agar menggunakan lebih sedikit memori.

Tugas gagal karena tekanan resource

Gejala: selama eksekusi tugas, subproses pekerja Airflow yang bertanggung jawab untuk eksekusi tugas Airflow terganggu secara tiba-tiba. Error yang terlihat di log pekerja Airflow mungkin terlihat mirip dengan yang di bawah ini:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Solusi:

Tugas gagal karena pengusiran Pod

Pod Google Kubernetes Engine tunduk pada Siklus Proses Pod Kubernetes dan pengusiran Pod. Lonjakan tugas adalah penyebab paling umum pengusiran Pod di Cloud Composer.

Pengusiran Pod dapat terjadi saat Pod tertentu menggunakan resource node secara berlebihan, dibandingkan dengan ekspektasi konsumsi resource yang dikonfigurasi untuk node tersebut. Misalnya, pengusiran dapat terjadi saat beberapa tugas yang menggunakan banyak memori berjalan di Pod, dan beban gabungannya menyebabkan node tempat Pod ini berjalan melebihi batas konsumsi memori.

Jika Pod pekerja Airflow dikeluarkan, semua instance tugas yang berjalan di Pod tersebut akan terganggu, dan kemudian ditandai sebagai gagal oleh Airflow.

Log di-buffer. Jika Pod pekerja dikeluarkan sebelum buffer dikosongkan, log tidak akan dikeluarkan. Kegagalan tugas tanpa log menunjukkan bahwa pekerja Airflow dimulai ulang karena kehabisan memori (OOM). Beberapa log mungkin ada di Cloud Logging meskipun log Airflow tidak dipancarkan.

Untuk melihat log:

  1. Di konsol Google Cloud , buka halaman Environments.

    Buka Environments

  2. Dalam daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.

  3. Buka tab Log.

  4. Lihat log setiap pekerja Airflow di bagian Semua log > Log Airflow > Pekerja.

Solusi:

  • Meningkatkan batas memori untuk pekerja Airflow.

  • Pastikan tugas dalam DAG bersifat idempoten dan dapat dicoba ulang.

  • Hindari mendownload file yang tidak perlu ke sistem file lokal pekerja Airflow.

    Worker Airflow memiliki kapasitas sistem file lokal yang terbatas. Pekerja Airflow dapat memiliki penyimpanan dari 1 GB hingga 10 GB. Jika ruang penyimpanan habis, Pod pekerja Airflow akan dikeluarkan oleh Bidang Kontrol GKE. Tindakan ini akan membuat semua tugas yang sedang dieksekusi oleh pekerja yang dikeluarkan gagal.

    Contoh operasi yang bermasalah:

    • Mendownload file atau objek dan menyimpannya secara lokal di pekerja Airflow. Sebagai gantinya, simpan objek ini langsung di layanan yang sesuai seperti bucket Cloud Storage.
    • Mengakses objek besar di folder /data dari pekerja Airflow. Pekerja Airflow mendownload objek ke sistem file lokalnya. Sebagai gantinya, terapkan DAG Anda sehingga file besar diproses di luar Pod pekerja Airflow.

Penyebab utama yang umum

Worker Airflow kehabisan memori

Setiap pekerja Airflow dapat menjalankan hingga [celery]worker_concurrency instance tugas secara bersamaan. Jika konsumsi memori kumulatif dari instance tugas tersebut melebihi batas memori untuk pekerja Airflow, proses acak di instance tersebut akan dihentikan untuk membebaskan resource.

Terkadang, kekurangan memori pada pekerja Airflow dapat menyebabkan paket yang salah bentuk dikirim selama sesi SQL Alchemy ke database, ke server DNS, atau ke layanan lain yang dipanggil oleh DAG. Dalam hal ini, ujung koneksi lainnya mungkin menolak atau menghentikan koneksi dari pekerja Airflow. Contoh:

"UNKNOWN:Error received from peer
{created_time:"2024-11-31T10:09:52.217738071+00:00", grpc_status:14,
grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4:<ip address>:443: handshaker shutdown"}"

Solusi:

Pekerja Airflow dikeluarkan

Penghapusan pod adalah bagian yang normal saat menjalankan workload di Kubernetes. GKE mengeluarkan pod jika kehabisan penyimpanan atau untuk membebaskan resource bagi workload dengan prioritas yang lebih tinggi.

Solusi:

Worker Airflow dihentikan

Worker Airflow mungkin dihapus secara eksternal. Jika tugas yang sedang berjalan tidak selesai selama periode penghentian yang benar, tugas tersebut akan terganggu dan mungkin terdeteksi sebagai zombie.

Kemungkinan skenario dan solusi:

  • Worker Airflow dimulai ulang selama modifikasi lingkungan, seperti upgrade atau penginstalan paket:

    Menemukan modifikasi lingkungan Composer

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("cloudaudit.googleapis.com%2Factivity")

    Anda dapat melakukan operasi tersebut saat tidak ada tugas penting yang sedang berjalan atau mengaktifkan percobaan ulang tugas.

  • Berbagai komponen mungkin tidak tersedia untuk sementara selama operasi pemeliharaan.

    Anda dapat menentukan masa pemeliharaan untuk meminimalkan

    tumpang-tindih dengan eksekusi tugas penting.

Worker Airflow mengalami beban berat

Jumlah resource CPU dan memori yang tersedia untuk pekerja Airflow dibatasi oleh konfigurasi lingkungan. Jika pemanfaatan resource mendekati batas, hal ini dapat menyebabkan perebutan resource dan penundaan yang tidak perlu selama eksekusi tugas. Dalam situasi ekstrem, jika kekurangan sumber daya selama jangka waktu yang lebih lama, hal ini dapat menyebabkan tugas zombie.

Solusi:

Database Airflow mengalami beban berat

Database digunakan oleh berbagai komponen Airflow untuk berkomunikasi satu sama lain dan, khususnya, untuk menyimpan detak jantung instance tugas. Kekurangan resource di database menyebabkan waktu kueri yang lebih lama dan dapat memengaruhi eksekusi tugas.

Terkadang, error berikut ada di log pekerja Airflow:

(psycopg2.OperationalError) connection to server at <IP address>,
port 3306 failed: server closed the connection unexpectedly

This probably means the server terminated abnormally before or while
processing the request.

Solusi:

Database Airflow tidak tersedia untuk sementara

Worker Airflow mungkin memerlukan waktu untuk mendeteksi dan menangani error intermiten dengan baik, seperti masalah konektivitas sementara. Mungkin melebihi nilai minimum deteksi zombie default.

Menemukan waktu tunggu heartbeat Airflow

resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"

Solusi:

  • Tingkatkan waktu tunggu untuk tugas zombie dan ganti nilai opsi konfigurasi Airflow [scheduler]scheduler_zombie_task_threshold:

    Bagian Kunci Nilai Catatan
    scheduler scheduler_zombie_task_threshold New timeout (in seconds) Nilai defaultnya adalah 300

Tugas gagal karena terjadi error selama eksekusi

Menghentikan instance

Airflow menggunakan mekanisme menghentikan instance untuk mematikan tugas Airflow. Mekanisme ini digunakan dalam situasi berikut:

  • Saat penjadwal menghentikan tugas yang tidak selesai tepat waktu.
  • Saat tugas mengalami waktu tunggu atau dieksekusi terlalu lama.

Saat Airflow menghentikan instance tugas, Anda dapat melihat entri log berikut dalam log worker Airflow yang menjalankan tugas:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Terminating instance.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

Kemungkinan solusi:

  • Periksa kode tugas untuk mengetahui apakah ada error yang dapat menyebabkan tugas berjalan terlalu lama.

  • Tingkatkan CPU dan memori untuk pekerja Airflow, sehingga tugas dapat dieksekusi lebih cepat.

  • Tingkatkan nilai opsi konfigurasi Airflow [celery_broker_transport_options]visibility_timeout.

    Akibatnya, penjadwal menunggu lebih lama hingga tugas selesai, sebelum menganggap tugas tersebut sebagai tugas Zombie. Opsi ini sangat berguna untuk tugas yang memakan waktu berjam-jam. Jika nilainya terlalu rendah (misalnya, 3 jam), penjadwal akan menganggap tugas yang berjalan selama 5 atau 6 jam sebagai "tergantung" (tugas Zombie).

  • Tingkatkan nilai opsi konfigurasi Airflow [core]killed_task_cleanup_time.

    Nilai yang lebih panjang memberi pekerja Airflow lebih banyak waktu untuk menyelesaikan tugas mereka dengan baik. Jika nilainya terlalu rendah, tugas Airflow dapat terganggu secara tiba-tiba, tanpa cukup waktu untuk menyelesaikan tugasnya dengan baik.

Eksekusi DAG tidak berakhir dalam waktu yang diharapkan

Gejala:

Terkadang, eksekusi DAG tidak berakhir karena tugas Airflow macet dan eksekusi DAG berlangsung lebih lama dari yang diharapkan. Dalam kondisi normal, tugas Airflow tidak akan berada dalam status antrean atau berjalan tanpa batas waktu, karena Airflow memiliki prosedur waktu tunggu dan pembersihan yang membantu menghindari situasi ini.

Perbaikan:

  • Gunakan parameter dagrun_timeout untuk DAG. Misalnya: dagrun_timeout=timedelta(minutes=120) Akibatnya, setiap proses DAG harus diselesaikan dalam waktu tunggu proses DAG. Untuk mengetahui informasi selengkapnya tentang status tugas Airflow, lihat dokumentasi Apache Airflow.

  • Gunakan parameter task execution timeout untuk menentukan waktu tunggu default bagi tugas yang berjalan berdasarkan operator Apache Airflow.

Koneksi ke server Postgres terputus selama pengecualian kueri terjadi selama eksekusi tugas atau tepat setelahnya

Pengecualian Lost connection to Postgres server during query sering terjadi jika kondisi berikut terpenuhi:

  • DAG Anda menggunakan PythonOperator atau operator kustom.
  • DAG Anda membuat kueri ke database Airflow.

Jika beberapa kueri dibuat dari fungsi yang dapat dipanggil, traceback mungkin salah menunjuk ke baris self.refresh_from_db(lock_for_update=True) dalam kode Airflow; ini adalah kueri database pertama setelah eksekusi tugas. Penyebab sebenarnya pengecualian terjadi sebelum ini, saat sesi SQLAlchemy tidak ditutup dengan benar.

Sesi SQLAlchemy dicakup ke thread dan dibuat dalam fungsi yang dapat dipanggil. Sesi dapat dilanjutkan nanti di dalam kode Airflow. Jika ada penundaan yang signifikan antara kueri dalam satu sesi, koneksi mungkin sudah ditutup oleh server Postgres. Waktu tunggu koneksi di lingkungan Cloud Composer ditetapkan sekitar 10 menit.

Solusi:

  • Gunakan dekorator airflow.utils.db.provide_session. Dekorator ini menyediakan sesi yang valid ke database Airflow dalam parameter session dan menutup sesi dengan benar di akhir fungsi.
  • Jangan gunakan satu fungsi yang berjalan lama. Sebagai gantinya, pindahkan semua kueri database ke fungsi terpisah, sehingga ada beberapa fungsi dengan dekorator airflow.utils.db.provide_session. Dalam hal ini, sesi ditutup secara otomatis setelah mengambil hasil kueri.

Gangguan sementara saat menghubungkan ke DB Metadata Airflow

Cloud Composer berjalan di atas infrastruktur terdistribusi. Artinya, dari waktu ke waktu, beberapa masalah sementara dapat muncul dan dapat mengganggu eksekusi tugas Airflow Anda.

Dalam situasi seperti itu, Anda mungkin melihat pesan error berikut di log pekerja Airflow:

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

atau

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Masalah terputus-putus seperti ini juga dapat disebabkan oleh operasi pemeliharaan yang dilakukan untuk lingkungan Cloud Composer Anda.

Biasanya, error tersebut bersifat sementara dan jika tugas Airflow Anda bersifat idempoten dan Anda telah mengonfigurasi percobaan ulang, error tersebut tidak akan memengaruhi Anda. Anda juga dapat mempertimbangkan menentukan masa pemeliharaan.

Alasan tambahan untuk error tersebut mungkin adalah kurangnya resource di cluster lingkungan Anda. Dalam kasus seperti itu, Anda dapat menskalakan atau mengoptimalkan lingkungan seperti yang dijelaskan dalam petunjuk Menskalakan lingkungan atau Mengoptimalkan lingkungan.

Operasi DAG ditandai sebagai berhasil, tetapi tidak memiliki tugas yang dijalankan

Jika eksekusi DAG execution_date lebih awal dari start_date DAG, Anda mungkin melihat eksekusi DAG yang tidak memiliki eksekusi tugas, tetapi masih ditandai sebagai berhasil.

Operasi DAG berhasil tanpa tugas yang dijalankan
Gambar 8. Operasi DAG berhasil tanpa tugas yang dijalankan (klik untuk memperbesar)

Penyebab

Situasi ini mungkin terjadi dalam salah satu kasus berikut:

  • Ketidakcocokan disebabkan oleh perbedaan zona waktu antara execution_date dan start_date DAG. Misalnya, hal ini dapat terjadi saat menggunakan pendulum.parse(...) untuk menetapkan start_date.

  • start_date DAG ditetapkan ke nilai dinamis, misalnya airflow.utils.dates.days_ago(1)

Solusi

  • Pastikan execution_date dan start_date menggunakan zona waktu yang sama.

  • Tentukan start_date statis dan gabungkan dengan catchup=False untuk menghindari DAG yang berjalan dengan tanggal mulai di masa lalu.

Praktik Terbaik

Dampak operasi update atau upgrade pada eksekusi tugas Airflow

Operasi update atau upgrade mengganggu tugas Airflow yang sedang dijalankan, kecuali jika tugas dijalankan dalam mode yang dapat ditangguhkan.

Sebaiknya lakukan operasi ini saat Anda memperkirakan dampak minimal pada eksekusi tugas Airflow dan siapkan mekanisme percobaan ulang yang sesuai di DAG dan tugas Anda.

Jangan menjadwalkan DAG yang dibuat secara terprogram pada waktu yang sama

Membuat objek DAG secara terprogram dari file DAG adalah metode yang efisien untuk membuat banyak DAG serupa yang hanya memiliki sedikit perbedaan.

Penting untuk tidak menjadwalkan semua DAG tersebut untuk segera dieksekusi. Ada kemungkinan besar pekerja Airflow tidak memiliki resource CPU dan memori yang cukup untuk menjalankan semua tugas yang dijadwalkan pada waktu yang sama.

Untuk menghindari masalah saat menjadwalkan DAG terprogram:

  • Tingkatkan konkurensi pekerja dan tingkatkan skala lingkungan Anda, sehingga lingkungan tersebut dapat mengeksekusi lebih banyak tugas secara bersamaan.
  • Buat DAG dengan cara mendistribusikan jadwalnya secara merata dari waktu ke waktu, untuk menghindari penjadwalan ratusan tugas secara bersamaan, sehingga pekerja Airflow memiliki waktu untuk menjalankan semua tugas terjadwal.

Mengontrol waktu eksekusi DAG, tugas, dan eksekusi paralel DAG yang sama

Jika Anda ingin mengontrol durasi eksekusi DAG tunggal untuk DAG tertentu, Anda dapat menggunakan parameter DAG dagrun_timeout untuk melakukannya. Misalnya, jika Anda memperkirakan bahwa satu kali eksekusi DAG (terlepas dari apakah eksekusi selesai dengan berhasil atau gagal) tidak boleh berlangsung lebih dari 1 jam, tetapkan parameter ini ke 3.600 detik.

Anda juga dapat mengontrol durasi yang diizinkan untuk satu tugas Airflow. Untuk melakukannya, Anda dapat menggunakan execution_timeout.

Jika ingin mengontrol jumlah run DAG aktif yang Anda inginkan untuk DAG tertentu, Anda dapat menggunakan opsi konfigurasi Airflow [core]max-active-runs-per-dag untuk melakukannya.

Jika Anda hanya ingin menjalankan satu instance DAG dalam waktu tertentu, tetapkan parameter max-active-runs-per-dag ke 1.

Menghindari peningkatan traffic jaringan ke dan dari database Airflow

Jumlah traffic jaringan antara cluster GKE lingkungan Anda dan database Airflow bergantung pada jumlah DAG, jumlah tugas dalam DAG, dan cara DAG mengakses data dalam database Airflow. Faktor-faktor berikut dapat memengaruhi penggunaan jaringan:

  • Kueri ke database Airflow. Jika DAG Anda melakukan banyak kueri, DAG tersebut akan menghasilkan traffic dalam jumlah besar. Contoh: memeriksa status tugas sebelum melanjutkan tugas lain, membuat kueri tabel XCom, membuang konten database Airflow.

  • Jumlah tugas yang besar. Makin banyak tugas yang dijadwalkan, makin banyak traffic jaringan yang dihasilkan. Pertimbangan ini berlaku untuk jumlah total tugas dalam DAG dan frekuensi penjadwalan. Saat penjadwal Airflow menjadwalkan eksekusi DAG, penjadwal akan membuat kueri ke database Airflow dan menghasilkan traffic.

  • Antarmuka web Airflow menghasilkan traffic jaringan karena membuat kueri ke database Airflow. Penggunaan halaman dengan grafik, tugas, dan diagram secara intensif dapat menghasilkan volume traffic jaringan yang besar.

Langkah berikutnya