Menjalankan DAG Apache Airflow di Managed Airflow (Gen 2) (Google Cloud CLI)

Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Gen 1 Lama)

Panduan memulai ini menunjukkan cara membuat lingkungan Managed Service untuk Apache Airflow dan menjalankan DAG Apache Airflow di Managed Airflow (Gen 2).

Sebelum memulai

  1. Login keakun Anda. Google Cloud Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. Instal Google Cloud CLI.

  3. Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.

  4. Untuk melakukan inisialisasi gcloud CLI, jalankan perintah berikut:

    gcloud init
  5. Buat atau pilih Google Cloud project.

    Peran yang diperlukan untuk memilih atau membuat project

    • Memilih project: Memilih project tidak memerlukan peran IAM tertentu Anda dapat memilih project mana pun yang telah diberi peran.
    • Membuat project: Untuk membuat project, Anda memerlukan peran Pembuat Project (roles/resourcemanager.projectCreator), yang berisi izin resourcemanager.projects.create. Pelajari cara memberikan peran.
    • Buat Google Cloud project:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk Google Cloud project yang Anda buat.

    • Pilih Google Cloud project yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama Google Cloud project Anda.

  6. Pastikan penagihan diaktifkan untuk Google Cloud project Anda.

  7. Instal Google Cloud CLI.

  8. Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.

  9. Untuk melakukan inisialisasi gcloud CLI, jalankan perintah berikut:

    gcloud init
  10. Buat atau pilih Google Cloud project.

    Peran yang diperlukan untuk memilih atau membuat project

    • Memilih project: Memilih project tidak memerlukan peran IAM tertentu Anda dapat memilih project mana pun yang telah diberi peran.
    • Membuat project: Untuk membuat project, Anda memerlukan peran Pembuat Project (roles/resourcemanager.projectCreator), yang berisi izin resourcemanager.projects.create. Pelajari cara memberikan peran.
    • Buat Google Cloud project:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk Google Cloud project yang Anda buat.

    • Pilih Google Cloud project yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama Google Cloud project Anda.

  11. Pastikan penagihan diaktifkan untuk Google Cloud project Anda.

  12. Aktifkan Managed Airflow API:

    Peran yang diperlukan untuk mengaktifkan API

    Untuk mengaktifkan API, Anda memerlukan peran IAM Service Usage Admin (roles/serviceusage.serviceUsageAdmin), yang berisi izin serviceusage.services.enable. Pelajari cara memberikan peran.

    gcloud services enable composer.googleapis.com
  13. Untuk mendapatkan izin yang Anda perlukan untuk menyelesaikan panduan memulai ini, minta administrator Anda untuk memberi Anda peran IAM berikut di project Anda:

    Untuk mengetahui informasi selengkapnya tentang pemberian peran, lihat Mengelola akses ke project, folder, dan organisasi.

    Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Membuat akun layanan lingkungan

Saat membuat lingkungan, Anda menentukan akun layanan. Akun layanan ini disebut akun layanan lingkungan. Lingkungan Anda menggunakan akun layanan ini untuk melakukan sebagian besar operasi.

Akun layanan untuk lingkungan Anda bukan akun pengguna. Akun layanan adalah jenis akun khusus yang digunakan oleh aplikasi atau instance virtual machine (VM), bukan orang.

Untuk membuat akun layanan untuk lingkungan Anda:

  1. Buat akun layanan baru, seperti yang dijelaskan dalam dokumentasi Identity and Access Management.

  2. Berikan peran ke akun tersebut, seperti yang dijelaskan dalam dokumentasi Identity and Access Management. Peran yang diperlukan adalah Composer Worker (composer.worker).

Membuat lingkungan

Jika ini adalah lingkungan pertama di project Anda, tambahkan akun Managed Airflow Service Agent sebagai pokok baru di akun layanan lingkungan Anda dan berikan peran roles/composer.ServiceAgentV2Ext ke akun tersebut.

Secara default, lingkungan Anda menggunakan akun layanan Compute Engine default, dan contoh berikut menunjukkan cara menambahkan izin ini ke akun tersebut.

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    ENVIRONMENT_SERVICE_ACCOUNT \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

Ganti ENVIRONMENT_SERVICE_ACCOUNT dengan akun layanan untuk lingkungan Anda yang telah Anda buat sebelumnya.

Buat lingkungan baru bernama example-environment di us-central1 region, dengan versi Managed Airflow (Gen 2) terbaru.

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-2.17.0-airflow-2.11.1 \
    --service-account ENVIRONMENT_SERVICE_ACCOUNT

Ganti ENVIRONMENT_SERVICE_ACCOUNT dengan akun layanan untuk lingkungan Anda yang telah Anda buat sebelumnya.

Membuat file DAG

DAG Airflow adalah kumpulan tugas terorganisir yang ingin Anda jadwalkan dan jalankan. DAG ditentukan dalam file Python standar.

Panduan ini menggunakan contoh DAG Airflow yang ditentukan dalam file quickstart.py. Kode Python dalam file ini melakukan hal berikut:

  1. Membuat DAG, composer_sample_dag. DAG ini berjalan setiap hari.
  2. Menjalankan satu tugas, print_dag_run_conf. Tugas ini mencetak konfigurasi operasi DAG menggunakan operator bash.

Simpan salinan file quickstart.py di komputer lokal Anda:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Mengupload file DAG ke bucket lingkungan Anda

Setiap lingkungan Managed Airflow memiliki bucket Cloud Storage yang terkait dengannya. Airflow di Managed Airflow hanya menjadwalkan DAG yang berada di folder /dags di bucket ini.

Untuk menjadwalkan DAG, upload quickstart.py dari komputer lokal Anda ke folder /dags lingkungan Anda:

Untuk mengupload quickstart.py dengan Google Cloud CLI, jalankan perintah berikut di folder tempat file quickstart.py berada:

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

Melihat DAG

Setelah Anda mengupload file DAG, Airflow akan melakukan hal berikut:

  1. Mengurai file DAG yang Anda upload. DAG mungkin memerlukan waktu beberapa menit agar tersedia untuk Airflow.
  2. Menambahkan DAG ke daftar DAG yang tersedia.
  3. Menjalankan DAG sesuai dengan jadwal yang Anda berikan dalam file DAG.

Pastikan DAG Anda diproses tanpa error dan tersedia di Airflow dengan melihatnya di UI DAG. UI DAG adalah antarmuka Managed Airflow untuk melihat informasi DAG di Google Cloud konsol. Managed Airflow juga menyediakan akses ke UI Airflow, yang merupakan antarmuka web Airflow native.

  1. Tunggu sekitar lima menit untuk memberi Airflow waktu memproses file DAG yang Anda upload sebelumnya, dan untuk menyelesaikan operasi DAG pertama (dijelaskan nanti).

  2. Jalankan perintah berikut di Google Cloud CLI. Perintah ini menjalankan perintah Airflow CLI dags list yang mencantumkan DAG di lingkungan Anda.

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. Pastikan DAG composer_quickstart tercantum dalam output perintah.

    Contoh output:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

Melihat detail operasi DAG

Satu kali eksekusi DAG disebut operasi DAG. Airflow segera menjalankan operasi DAG untuk contoh DAG karena tanggal mulai dalam file DAG ditetapkan ke kemarin. Dengan cara ini, Airflow akan mengejar jadwal DAG yang ditentukan.

Contoh DAG berisi satu tugas, print_dag_run_conf, yang menjalankan perintah echo di konsol. Perintah ini menampilkan informasi meta tentang DAG (ID numerik operasi DAG).

Jalankan perintah berikut di Google Cloud CLI. Perintah ini mencantumkan operasi DAG untuk DAG composer_quickstart:

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

Contoh output:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

Airflow CLI tidak menyediakan perintah untuk melihat log tugas. Anda dapat menggunakan metode lain untuk melihat log tugas Airflow: UI DAG Managed Airflow, UI Airflow, atau Cloud Logging. Panduan ini menunjukkan cara mengkueri Cloud Logging untuk mendapatkan log dari operasi DAG tertentu.

Jalankan perintah berikut di Google Cloud CLI. Perintah ini membaca log dari Cloud Logging untuk operasi DAG tertentu dari DAG composer_quickstart. Argumen --format memformat output sehingga hanya teks pesan log yang ditampilkan.

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

Ganti:

  • RUN_ID dengan nilai run_id dari output perintah tasks states-for-dag-run yang Anda jalankan sebelumnya. Misalnya, 2024-02-17T15:38:38.969307+00:00.

Contoh output:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

Pembersihan

Agar akun Anda tidak dikenai biaya untuk resource yang digunakan pada halaman ini, hapus project yang berisi resource tersebut. Google Cloud Google Cloud

Hapus resource yang digunakan dalam tutorial ini:

  1. Hapus lingkungan Managed Airflow:

    1. Di Google Cloud konsol, buka halaman Environments.

      Buka Environments

    2. Pilih example-environment, lalu klik Delete.

    3. Tunggu hingga lingkungan dihapus.

  2. Hapus bucket lingkungan Anda. Menghapus lingkungan Managed Airflow tidak akan menghapus bucket-nya.

    1. Di Google Cloud konsol, buka halaman Storage > Browser.

      Buka Storage > Browser

    2. Pilih bucket lingkungan, lalu klik Delete. Misalnya, bucket ini dapat diberi nama us-central1-example-environ-c1616fe8-bucket.

Langkah berikutnya