Managed Airflow (3ª gen.) | Managed Airflow (2ª gen.) | Managed Airflow (1ª gen. heredada)
En esta página, se explica cómo funciona la programación y la activación de DAG en Airflow, cómo definir un programa para un DAG y cómo activar un DAG de forma manual o pausarlo.
Acerca de los DAG de Airflow en Managed Airflow
Los DAG de Airflow en Managed Airflow se ejecutan en uno o más entornos de Managed Airflow en tu proyecto. Subes los archivos de origen de tus DAG de Airflow a un bucket de Cloud Storage asociado con un entorno. Luego, la instancia de Airflow del entorno analiza estos archivos y programa las ejecuciones de DAG, según lo definido por el programa de cada DAG. Durante una ejecución de DAG, Airflow programa y ejecuta tareas individuales que componen un DAG en una secuencia definida por el DAG.
Para obtener más información sobre los conceptos básicos de Airflow, como los DAG de Airflow, las ejecuciones de DAG, las tareas o los operadores, consulta la página Conceptos básicos en la documentación de Airflow.
Acerca de la programación de DAG en Airflow
Airflow proporciona los siguientes conceptos para su mecanismo de programación:
- Fecha lógica
Representa una fecha para la que se ejecuta una ejecución de DAG en particular.
Esta no es la fecha real en la que Airflow ejecuta un DAG, sino un período que una ejecución de DAG en particular debe procesar. Por ejemplo, para un DAG que está programado para ejecutarse todos los días a las 12:00, la fecha lógica también sería las 12:00 de un día específico. Como se ejecuta dos veces al día, el período que debe procesar son las últimas 12 horas. Al mismo tiempo, es posible que la lógica definida en el DAG en sí no use la fecha lógica ni el intervalo de tiempo. Por ejemplo, un DAG puede ejecutar la misma secuencia de comandos una vez al día sin usar el valor de la fecha lógica.
En las versiones de Airflow anteriores a la 2.2, esta fecha se denomina fecha de ejecución.
- Fecha de la ejecución
Representa una fecha en la que se ejecuta una ejecución de DAG en particular.
Por ejemplo, para un DAG que está programado para ejecutarse todos los días a las 12:00, la ejecución real del DAG puede ocurrir a las 12:05, un tiempo después de que pase la fecha lógica.
- Intervalo de programas
Representa cuándo y con qué frecuencia se debe ejecutar un DAG, en términos de fechas lógicas.
Por ejemplo, un programa diario significa que un DAG se ejecuta una vez al día y que las fechas lógicas para sus ejecuciones de DAG tienen intervalos de 24 horas.
- Fecha de inicio
Especifica cuándo deseas que Airflow comience a programar el DAG.
Las tareas del DAG pueden tener fechas de inicio individuales, o bien puedes especificar una sola fecha de inicio para todas las tareas. En función de la fecha de inicio mínima para las tareas en tu DAG y en el intervalo de programación, Airflow programa las ejecuciones de DAG.
- Puesta al día, reabastecimiento de datos y reintentos
Mecanismos para ejecutar ejecuciones de DAG para fechas pasadas.
La puesta al día ejecuta las ejecuciones de DAG que aún no se ejecutaron, por ejemplo, si el DAG se pausó durante un período prolongado y, luego, se reanudó. Puedes usar el reabastecimiento de datos para ejecutar ejecuciones de DAG para un período determinado. Los reintentos especifican cuántos intentos debe realizar Airflow cuando ejecuta tareas desde un DAG.
La programación funciona de la siguiente manera:
Después de que pase la fecha de inicio, Airflow espera la siguiente ocurrencia del intervalo de programación.
Airflow programa la primera ejecución del DAG para que se realice al final de este intervalo de programación.
Por ejemplo, si un DAG está programado para ejecutarse a cada hora y la fecha de inicio es a las 12:00 p.m. de hoy, la primera ejecución del DAG se realiza hoy a las 13:00 p.m.
En la sección Programa un DAG de Airflow de este documento, se describe cómo configurar la programación para tus DAG con estos conceptos. Para obtener más información sobre las ejecuciones de DAG y la programación, consulta Ejecuciones de DAG en la documentación de Airflow.
Acerca de las formas de activar un DAG
Airflow proporciona las siguientes formas de activar un DAG:
Activación en función de un programa. Airflow activa el DAG de forma automática según el programa especificado en el archivo DAG.
Activación manual. Puedes activar un DAG de forma manual desde Google Cloud la consola, la IU de Airflow o ejecutando un comando de la CLI de Airflow desde Google Cloud CLI.
Activación en respuesta a eventos: La forma estándar de activar un DAG en respuesta a eventos es usar un sensor.
Otras formas de activar DAG:
Activación de forma programática: Puedes activar un DAG con la API de REST de Airflow. Por ejemplo, desde una secuencia de comandos de Python.
Activación de forma programática en respuesta a eventos: Puedes activar DAG en respuesta a eventos con funciones de Cloud Run y la API de REST de Airflow.
Antes de comenzar
- Asegúrate de que tu cuenta tenga una función que pueda administrar objetos en los buckets del entorno, y ver y activar DAG. Para obtener más información, consulta Guía de control de acceso.
Programa un DAG de Airflow
Defines un programa para un DAG en el archivo DAG. Edita la definición del DAG de la siguiente manera:
Busca y edita el archivo DAG en tu computadora. Si no tienes el archivo DAG, puedes descargar su copia del bucket del entorno. Para un DAG nuevo, puedes definir todos los parámetros cuando crees el archivo DAG.
En el parámetro
schedule, define el programa. Puedes usar una expresión de Cron, como0 0 * * *, o un valor preestablecido, como@daily. Para obtener más información, consulta Cron y los intervalos de tiempo en la documentación de Airflow.Airflow determina las fechas lógicas para las ejecuciones de DAG según el programa que establezcas.
En el parámetro
start_date, define la fecha de inicio.Airflow determina la fecha lógica de la primera ejecución del DAG con este parámetro.
(Opcional) En el parámetro
catchup, define si Airflow debe ejecutar todas las ejecuciones anteriores de este DAG desde la fecha de inicio hasta la fecha actual que aún no se ejecutaron.Las ejecuciones de DAG ejecutadas durante la puesta al día tendrán su fecha lógica en el pasado y su fecha de ejecución reflejará la hora en que se ejecutó realmente la ejecución del DAG.
(Opcional) En el parámetro
retries, define cuántas veces Airflow debe reintentar las tareas que fallaron (cada DAG consta de una o más tareas individuales). De forma predeterminada, las tareas en Managed Airflow se reintentan dos veces.Sube la nueva versión del DAG al bucket del entorno.
Espera hasta que Airflow analice el DAG correctamente. Por ejemplo, puedes consultar la lista de DAG en tu entorno en la Google Cloud consola o en la IU de Airflow.
La siguiente definición de DAG de ejemplo se ejecuta dos veces al día a las 00:00 y a las 12:00. Su fecha de inicio se establece en el 1 de enero de 2024, pero Airflow no la ejecuta para fechas pasadas después de que la subes o la pausas porque la puesta al día está inhabilitada.
El DAG contiene una tarea llamada insert_query_job, que inserta una fila en una tabla con el operador BigQueryInsertJobOperator. Este operador es uno de
Google Cloud operadores de BigQuery,
que puedes usar para administrar conjuntos de datos y tablas, ejecutar consultas y validar datos.
Si falla una ejecución en particular de esta tarea, Airflow la reintenta cuatro veces más con el intervalo de reintento predeterminado. La fecha lógica para estos reintentos sigue siendo la misma.
La consulta en SQL para esta fila usa plantillas de Airflow para escribir la fecha lógica del DAG y el nombre en la fila.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_example_scheduling_dag",
start_date=datetime.datetime(2024, 1, 1),
schedule='0 */12 * * *',
catchup=False
) as dag:
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
retries=4,
configuration={
"query": {
# schema: date (string), description (string)
# example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
"query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
"useLegacySql": False,
"priority": "BATCH",
}
},
location="us-central1"
)
insert_query_job
Para probar este DAG, puedes activarlo de forma manual y luego ver los registros de ejecución de la tarea.
Más ejemplos de parámetros de programación
A continuación, se muestran ejemplos de cómo funciona la programación con diferentes combinaciones de parámetros:
Si
start_dateesdatetime(2024, 4, 4, 16, 25)yschedulees30 16 * * *, la primera ejecución del DAG se realiza el 5 de abril de 2024 a las 16:30.Si
start_dateesdatetime(2024, 4, 4, 16, 35)yschedulees30 16 * * *, la primera ejecución del DAG se realizará el 6 de abril de 2024 a las 16:30. Debido a que la fecha de inicio es posterior al intervalo de programación el 4 de abril de 2024, la ejecución del DAG no se realiza el 5 de abril de 2024. En cambio, el intervalo de programación finaliza a las 16:35 del 5 de abril de 2024, por lo que la próxima ejecución de DAG está programada para el día siguiente a las 16:30.Si
start_dateesdatetime(2024, 4, 4)yschedulees@daily, entonces la primera ejecución del DAG se programa para las 00:00 del 5 de abril de 2024.Si
start_dateesdatetime(2024, 4, 4, 16, 30)yschedulees0 * * * *, entonces la primera ejecución del DAG está programada para el 4 de abril de 2024 a las 18:00. Después de que pase la fecha y hora especificadas, Airflow programa una ejecución de DAG en el minuto 0 de cada hora. El momento más cercano en que esto sucede es a las 17:00. En este momento, Airflow programa una ejecución de DAG para que se realice al final del intervalo de programación, es decir, a las 18:00.
Activa un DAG de forma manual
Cuando activas un DAG de Airflow de forma manual, Airflow ejecuta el DAG una vez, independientemente del programa especificado en el archivo DAG.
Console
La IU de DAG es compatible con Managed Airflow 1.17.8 y versiones posteriores.
Para activar un DAG desde la consola de Google Cloud , sigue estos pasos:
En la Google Cloud consola de, ve a la página Entornos.
Selecciona un entorno para ver sus detalles.
En la página Detalles del entorno, ve a la pestaña DAGs.
Haz clic en el nombre de un DAG.
En la página Detalles del DAG, haz clic en Activar DAG. Se crea una nueva ejecución de DAG.
IU de Airflow
Para activar un DAG desde la IU de Airflow, sigue estos pasos:
En la Google Cloud consola de, ve a la página Entornos.
En la columna Servidor web de Airflow, sigue el vínculo de Airflow para tu entorno.
Accede con la Cuenta de Google que tiene los permisos correspondientes.
En la IU de Airflow, en la página DAGs, haz clic en el botón Activar DAG para tu DAG.
gcloud
En Airflow 1.10.12 o versiones anteriores, ejecuta el comando trigger_dag de la CLI de Airflow:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
trigger_dag -- DAG_ID
En Airflow 1.10.14 o versiones posteriores, incluido Airflow 2, ejecuta el comando dags trigger de la CLI de Airflow:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags trigger -- DAG_ID
Reemplaza lo siguiente:
ENVIRONMENT_NAME: Es el nombre de tu entorno.LOCATION: Es la región en la que se encuentra el entorno.DAG_ID: Es el nombre del DAG.
Para obtener más información sobre la ejecución de comandos de la CLI de Airflow en entornos de Managed Airflow, consulta Ejecuta comandos de la CLI de Airflow.
Para obtener más información sobre los comandos disponibles de la CLI de Airflow, consulta
la referencia de comandos gcloud composer environments run.
Visualiza los registros y detalles de la ejecución del DAG
En la Google Cloud consola de, puedes hacer lo siguiente:
- Visualiza los estados de las ejecuciones de DAG anteriores y los detalles del DAG.
- Explora los registros detallados de todas las ejecuciones de DAG y todas las tareas de estos DAG.
- Visualiza las estadísticas del DAG.
Además, Managed Airflow proporciona acceso a la IU de Airflow, que es la propia interfaz web de Airflow.
Pausa un DAG
Console
La IU de DAG es compatible con Managed Airflow 1.17.8 y versiones posteriores.
Para pausar un DAG desde la consola de Google Cloud , sigue estos pasos:
En la Google Cloud consola de, ve a la página Entornos.
Selecciona un entorno para ver sus detalles.
En la página Detalles del entorno, ve a la pestaña DAGs.
Haz clic en el nombre de un DAG.
En la página Detalles del DAG, haz clic en Pausar DAG.
IU de Airflow
Para pausar un DAG desde la IU de Airflow, sigue estos pasos:
- En la Google Cloud consola de, ve a la página Entornos.
En la columna Servidor web de Airflow, sigue el vínculo de Airflow para tu entorno.
Accede con la Cuenta de Google que tiene los permisos correspondientes.
En la interfaz web de Airflow, en la página DAGs, haz clic en el botón de activación junto al nombre del DAG.
gcloud
En Airflow 1.10.12 o versiones anteriores, ejecuta el comando pause de la CLI de Airflow:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
pause -- DAG_ID
En Airflow 1.10.14 o versiones posteriores, incluido Airflow 2, ejecuta el comando dags pause de la CLI de Airflow:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags pause -- DAG_ID
Reemplaza lo siguiente:
ENVIRONMENT_NAME: Es el nombre de tu entorno.LOCATION: Es la región en la que se encuentra el entorno.DAG_ID: Es el nombre del DAG.
Para obtener más información sobre la ejecución de comandos de la CLI de Airflow en entornos de Managed Airflow, consulta Ejecuta comandos de la CLI de Airflow.
Para obtener más información sobre los comandos disponibles de la CLI de Airflow, consulta
la referencia de comandos gcloud composer environments run.