Raggruppamento di attività all'interno dei DAG

Managed Airflow (terza generazione) | Managed Airflow (seconda generazione) | Managed Airflow (prima generazione legacy)

Questa pagina descrive come raggruppare le attività nelle pipeline Airflow utilizzando i seguenti pattern di progettazione:

  • Raggruppamento delle attività nel grafico DAG.
  • Attivazione dei DAG secondari da un DAG principale.
  • Raggruppamento delle attività con l'operatore TaskGroup.

Raggruppare le attività nel grafico DAG

Per raggruppare le attività in determinate fasi della pipeline, puoi utilizzare le relazioni tra le attività nel file DAG.

Considera il seguente esempio:

Il grafico delle attività Airflow che mostra le attività di ramificazione
Figura 1. Le attività possono essere raggruppate in un Airflow DAG (fai clic per ingrandire)

In questo flusso di lavoro, le attività op-1 e op-2 vengono eseguite insieme dopo l'attività iniziale start. Puoi ottenere questo risultato raggruppando le attività con l'istruzione start >> [task_1, task_2].

L'esempio seguente fornisce un'implementazione completa di questo DAG:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "all_tasks_in_one_dag"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    start = DummyOperator(task_id="start")

    task_1 = BashOperator(task_id="op-1", bash_command=":", dag=dag)

    task_2 = BashOperator(task_id="op-2", bash_command=":", dag=dag)

    some_other_task = DummyOperator(task_id="some-other-task")

    task_3 = BashOperator(task_id="op-3", bash_command=":", dag=dag)

    task_4 = BashOperator(task_id="op-4", bash_command=":", dag=dag)

    end = DummyOperator(task_id="end")

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

Attivare i DAG secondari da un DAG principale

Puoi attivare un DAG da un altro DAG con l' TriggerDagRunOperator operatore.

Considera il seguente esempio:

Il grafico delle attività di Airflow che mostra i DAG secondari attivati come parte di un grafico DAG
Figura 2. I DAG possono essere attivati dall'interno di un DAG con l'operatore TriggerDagRunOperator (fai clic per ingrandire)

In questo flusso di lavoro, i blocchi dag_1 e dag_2 rappresentano una serie di attività raggruppate in un DAG separato nell'ambiente Airflow gestito.

L'implementazione di questo flusso di lavoro richiede due file DAG separati. Il file DAG di controllo ha il seguente aspetto:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago


with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(task_id="start")

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

L'implementazione del DAG secondario, attivato dal DAG di controllo, ha il seguente aspetto:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "dag-to-trigger"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    dag_task = DummyOperator(task_id="dag-task")

Per il corretto funzionamento del DAG, devi caricare entrambi i file DAG nell'ambiente Airflow gestito.

Raggruppare le attività con l'operatore TaskGroup

Puoi utilizzare l' TaskGroupoperatore per raggruppare le attività nel DAG. Le attività definite all'interno di un blocco TaskGroup fanno comunque parte del DAG principale.

Considera il seguente esempio:

Il grafico delle attività Airflow che mostra due gruppi di attività
Figura 3. Le attività possono essere raggruppate visivamente nell' UI con l'operatore TaskGroup (fai clic per ingrandire)

Le attività op-1 e op-2 sono raggruppate in un blocco con ID taskgroup_1. Un'implementazione di questo flusso di lavoro ha il seguente aspetto:

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="task group #1") as section_1:
        task_1 = BashOperator(task_id="op-1", bash_command=":")
        task_2 = BashOperator(task_id="op-2", bash_command=":")

    with TaskGroup("taskgroup_2", tooltip="task group #2") as section_2:
        task_3 = BashOperator(task_id="op-3", bash_command=":")
        task_4 = BashOperator(task_id="op-4", bash_command=":")

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> section_1 >> some_other_task >> section_2 >> end

Passaggi successivi