Airflow Gerenciado (Geração 3) | Airflow Gerenciado (Geração 2) | Airflow Gerenciado (Geração 1 legada)
Nesta página, explicamos como a programação e o acionamento de DAGs funcionam no Airflow, como definir uma programação para um DAG e como acionar ou pausar um DAG manualmente.
Sobre os DAGs do Airflow no Airflow Gerenciado
Os DAGs do Airflow no Airflow Gerenciado são executados em um ou mais ambientes do Airflow Gerenciado no seu projeto. Faça upload dos arquivos de origem dos DAGs do Airflow para um bucket do Cloud Storage associado a um ambiente. A instância do Airflow do ambiente analisa esses arquivos e programa execuções de DAGs, conforme definido pela programação de cada DAG. Durante uma execução de DAG, o Airflow programa e executa tarefas individuais que compõem um DAG em uma sequência definida pelo DAG.
Para saber mais sobre os conceitos básicos do Airflow, como DAGs, execuções de DAGs, tarefas ou operadores, consulte a página Conceitos básicos na documentação do Airflow.
Sobre a programação de DAGs no Airflow
O Airflow oferece os seguintes conceitos para o mecanismo de programação:
- Data lógica
Representa uma data para a qual uma execução de DAG específica é executada.
Essa não é a data real em que o Airflow executa um DAG, mas um período de tempo que uma execução de DAG específica precisa processar. Por exemplo, para um DAG programado para ser executado todos os dias às 12h, a data lógica também seria 12h de um dia específico. Como ele é executado duas vezes por dia, o período de tempo que ele precisa processar é de 12 horas. Ao mesmo tempo, a lógica definida no próprio DAG pode não usar a data lógica ou o intervalo de tempo. Por exemplo, um DAG pode executar o mesmo script uma vez por dia sem usar o valor da data lógica.
Em versões do Airflow anteriores à 2.2, essa data é chamada de data de execução.
- Data de execução
Representa uma data em que uma execução de DAG específica é executada.
Por exemplo, para um DAG programado para ser executado todos os dias às 12h, a execução real do DAG pode acontecer às 12h05, algum tempo depois da data lógica.
- Intervalo da programação
Representa quando e com que frequência um DAG precisa ser executado, em termos de datas lógicas.
Por exemplo, uma programação diária significa que um DAG é executado uma vez por dia, e as datas lógicas para as execuções de DAG têm intervalos de 24 horas.
- Data de início
Especifica quando você quer que o Airflow comece a programar o DAG.
As tarefas no DAG podem ter datas de início individuais ou você pode especificar uma única data de início para todas as tarefas. Com base na data de início mínima das tarefas no DAG e no intervalo da programação, o Airflow programa as execuções de DAGs.
- Recuperação, preenchimento retroativo e novas tentativas
Mecanismos para executar DAGs para datas anteriores.
A recuperação executa DAGs que ainda não foram executados, por exemplo, se o DAG foi pausado por um longo período e depois retomado. É possível usar o preenchimento retroativo para executar DAGs para um determinado período. As novas tentativas especificam quantas tentativas o Airflow precisa fazer ao executar tarefas de um DAG.
A programação funciona da seguinte maneira:
Depois que a data de início passar, o Airflow vai aguardar a próxima ocorrência do intervalo da programação.
O Airflow programa a primeira execução do DAG para acontecer no final desse intervalo.
Por exemplo, se um DAG estiver programado para ser executado a cada hora e a data de início for às 12h de hoje, a primeira execução do DAG ocorrerá às 13h de hoje.
A seção Programar um DAG do Airflow neste documento descreve como configurar a programação dos DAGs usando esses conceitos. Para mais informações sobre execuções e programação de DAGs, consulte Execuções de DAGs na documentação do Airflow.
Sobre as maneiras de acionar um DAG
O Airflow fornece as seguintes maneiras de acionar um DAG:
Acionar com base em uma programação. O Airflow aciona o DAG automaticamente com base na programação especificada para ele no arquivo DAG.
Acionar manualmente. É possível acionar um DAG manualmente no Google Cloud console, na interface do Airflow ou executando um comando da CLI do Airflow na Google Cloud CLI.
Acionamento em resposta a eventos. A maneira padrão de acionar um DAG em resposta a eventos é usando um sensor.
Outras maneiras de acionar DAGs:
Acionar de maneira programática. É possível acionar um DAG usando a API REST do Airflow. Por exemplo, em um script em Python.
Acionar de maneira programática em resposta a eventos. É possível acionar DAGs em resposta a eventos usando funções do Cloud Run e a API REST do Airflow.
Antes de começar
- Verifique se sua conta tem um papel que possa gerenciar objetos nos buckets do ambiente e visualizar e acionar DAGs. Para mais informações, consulte Controle de acesso.
Programar um DAG do Airflow
Você define uma programação para um DAG no arquivo DAG. Edite a definição do DAG da seguinte maneira:
Localize e edite o arquivo DAG no computador. Se você não tiver o arquivo DAG, faça o download da cópia dele no bucket do ambiente. Para um novo DAG, é possível definir todos os parâmetros ao criar o arquivo DAG.
No parâmetro
schedule, defina a programação. É possível usar uma expressão Cron, como0 0 * * *, ou uma predefinição, como@daily. Para mais informações, consulte Cron e intervalos de tempo na documentação do Airflow.O Airflow determina as datas lógicas para execuções de DAGs com base na programação definida.
No parâmetro
start_date, defina a data de início.O Airflow determina a data lógica da primeira execução do DAG usando esse parâmetro.
(Opcional) No parâmetro
catchup, defina se o Airflow precisa executar todas as execuções anteriores desse DAG da data de início até a data atual que ainda não foram executadas.As execuções de DAGs realizadas durante a recuperação terão a data lógica no passado e a data de execução vai refletir o momento em que a execução do DAG foi realizada.
(Opcional) No parâmetro
retries, defina quantas vezes o Airflow precisa tentar novamente as tarefas com falha (cada DAG consiste em uma ou mais tarefas individuais). Por padrão, as tarefas no Airflow Gerenciado são repetidas duas vezes.Faça upload da nova versão do DAG para o bucket do ambiente.
Aguarde até que o Airflow analise o DAG. Por exemplo, é possível conferir a lista de DAGs no ambiente no Google Cloud console ou na interface do Airflow.
A definição de DAG de exemplo a seguir é executada duas vezes por dia, às 00h e às 12h. A data de início está definida como 1º de janeiro de 2024, mas o Airflow não a executa para datas anteriores depois que você faz upload ou pausa, porque a recuperação está desativada.
O DAG contém uma tarefa chamada insert_query_job, que insere uma linha em uma tabela com o operador BigQueryInsertJobOperator. Esse operador é um dos
Google Cloud operadores do BigQuery,
que podem ser usados para gerenciar conjuntos de dados e tabelas, executar consultas e validar dados.
Se uma execução específica dessa tarefa falhar, o Airflow vai tentar novamente mais quatro vezes com o intervalo de novas tentativas padrão. A data lógica dessas novas tentativas permanece a mesma.
A consulta SQL para essa linha usa modelos do Airflow para gravar a data lógica do DAG e o nome na linha.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_example_scheduling_dag",
start_date=datetime.datetime(2024, 1, 1),
schedule='0 */12 * * *',
catchup=False
) as dag:
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
retries=4,
configuration={
"query": {
# schema: date (string), description (string)
# example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
"query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
"useLegacySql": False,
"priority": "BATCH",
}
},
location="us-central1"
)
insert_query_job
Para testar esse DAG, é possível acioná-lo manualmente e então visualizar os registros de execução de tarefas.
Mais exemplos de parâmetros de programação
Os exemplos de parâmetros de programação a seguir ilustram como a programação funciona com diferentes combinações de parâmetros:
Se
start_datefordatetime(2024, 4, 4, 16, 25)eschedulefor30 16 * * *, a primeira execução do DAG acontecerá às 16h30 de 5 de abril de 2024.Se
start_datefordatetime(2024, 4, 4, 16, 35)eschedulefor30 16 * * *, a primeira execução do DAG acontecerá às 16h30 de 6 de abril de 2024. Como a data de início é após o intervalo da programação em 4 de abril de 2024, a execução do DAG não acontecerá no dia 5 de abril de 2024. Em vez disso, o intervalo da programação termina às 16h35 do dia 5 de abril de 2024. Portanto, a próxima execução do DAG é programada para às 16h30 do dia seguinte.Se
start_datefordatetime(2024, 4, 4)eschedulefor@daily, a primeira execução do DAG será programada para às 00h do dia 5 de abril de 2024.Se
start_datefordatetime(2024, 4, 4, 16, 30)eschedulefor0 * * * *, a primeira execução do DAG será programada para 18h do dia 4 de abril de 2024. Após a data e hora especificadas, o Airflow programa uma execução do DAG para ocorrer no minuto 0 de cada hora. O momento mais próximo em que isso acontece é 17h. Nesse momento, o Airflow programa uma execução do DAG para acontecer no final do intervalo da programação, ou seja, às 18h.
Acionar um DAG manualmente
Quando você aciona manualmente um DAG do Airflow, o Airflow executa o DAG uma vez, independente da programação especificada no arquivo DAG.
Console
Para acionar um DAG no Google Cloud console:
No Google Cloud console, acesse a página Ambientes.
Selecione um ambiente para ver os detalhes.
Na página Detalhes do ambiente, acesse a guia DAGs.
Clique no nome de um DAG.
Na página Detalhes do DAG, clique em Acionar DAG. Uma nova execução de DAG é criada.
IU do Airflow
Para acionar um DAG na interface do Airflow:
No Google Cloud console, acesse a página Ambientes.
Na coluna Servidor da Web do Airflow, siga o link Airflow do ambiente.
Faça login com a Conta do Google que tem as permissões apropriadas.
Na interface do Airflow, na página DAGs, clique no botão Acionar DAG do DAG.
gcloud
Execute o comando dags trigger da CLI do Airflow:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags trigger -- DAG_ID
Substitua:
ENVIRONMENT_NAME: o nome do ambiente;LOCATION: a região em que o ambiente está localizado;DAG_ID: o nome do DAG.
Para mais informações sobre como executar comandos da CLI do Airflow em ambientes do Airflow Gerenciado, consulte Como executar comandos da CLI do Airflow.
Para mais informações sobre os comandos da CLI do Airflow disponíveis, consulte
a referência do comando gcloud composer environments run.
Conferir registros e detalhes da execução do DAG
No Google Cloud console, é possível:
- Conferir os status das execuções de DAGs anteriores e os detalhes do DAG.
- Conferir registros detalhados de todas as execuções de DAGs e todas as tarefas desses DAGs.
- Conferir estatísticas do DAG.
Além disso, o Airflow Gerenciado oferece acesso à interface do Airflow, que é a interface da Web do Airflow.
Pausar um DAG
Console
Para pausar um DAG no Google Cloud console:
No Google Cloud console, acesse a página Ambientes.
Selecione um ambiente para ver os detalhes.
Na página Detalhes do ambiente, acesse a guia DAGs.
Clique no nome de um DAG.
Na página Detalhes do DAG, clique em Pausar DAG.
IU do Airflow
Para pausar um DAG na interface do Airflow:
- No Google Cloud console, acesse a página Ambientes.
Na coluna Servidor da Web do Airflow, siga o link Airflow do ambiente.
Faça login com a Conta do Google que tem as permissões apropriadas.
Na interface da Web do Airflow, na página DAGs, clique na alternância ao lado do nome do DAG.
gcloud
Execute o comando dags pause da CLI do Airflow:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags pause -- DAG_ID
Substitua:
ENVIRONMENT_NAME: o nome do ambiente;LOCATION: a região em que o ambiente está localizado;DAG_ID: o nome do DAG.
Para mais informações sobre como executar comandos da CLI do Airflow em ambientes do Airflow Gerenciado, consulte Como executar comandos da CLI do Airflow.
Para mais informações sobre os comandos da CLI do Airflow disponíveis, consulte
a referência do comando gcloud composer environments run.