Executar um DAG do Apache Airflow no Airflow Gerenciado (Geração 2) (Google Cloud CLI)

Airflow Gerenciado (Geração 3) | Airflow Gerenciado (Geração 2) | Airflow Gerenciado (Geração 1 legada)

Este guia de início rápido mostra como criar um ambiente do Serviço Gerenciado para Apache Airflow e executar um DAG do Apache Airflow no Airflow Gerenciado (Geração 2).

Antes de começar

  1. Faça login na sua Google Cloud conta do. Se você começou a usar o Google Cloud, crie uma conta para avaliar o desempenho dos nossos produtos em situações reais. Clientes novos também recebem US $300 em créditos para executar, testar e implantar cargas de trabalho.
  2. Instale a Google Cloud CLI.

  3. Ao usar um provedor de identidade (IdP) externo, primeiro faça login na CLI gcloud com sua identidade federada.

  4. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  5. Crie ou selecione um Google Cloud projeto.

    Papéis necessários para selecionar ou criar um projeto

    • Selecionar um projeto: a seleção de um projeto não exige um papel específico do IAM. Você pode selecionar qualquer projeto em que tenha recebido um papel.
    • Criar um projeto: para criar um projeto, você precisa do papel Criador de projetos (roles/resourcemanager.projectCreator), que contém a resourcemanager.projects.create permissão. Saiba como conceder papéis.
    • Crie um Google Cloud projeto do:

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o Google Cloud projeto do que você está criando.

    • Selecione o Google Cloud projeto do que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do Google Cloud projeto do.

  6. Verifique se o faturamento está ativado para o Google Cloud projeto.

  7. Instale a Google Cloud CLI.

  8. Ao usar um provedor de identidade (IdP) externo, primeiro faça login na CLI gcloud com sua identidade federada.

  9. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  10. Crie ou selecione um Google Cloud projeto.

    Papéis necessários para selecionar ou criar um projeto

    • Selecionar um projeto: a seleção de um projeto não exige um papel específico do IAM. Você pode selecionar qualquer projeto em que tenha recebido um papel.
    • Criar um projeto: para criar um projeto, você precisa do papel Criador de projetos (roles/resourcemanager.projectCreator), que contém a resourcemanager.projects.create permissão. Saiba como conceder papéis.
    • Crie um Google Cloud projeto do:

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o Google Cloud projeto do que você está criando.

    • Selecione o Google Cloud projeto do que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do Google Cloud projeto do.

  11. Verifique se o faturamento está ativado para o Google Cloud projeto.

  12. Ative a API do Airflow Gerenciado:

    Funções necessárias para ativar APIs

    Para ativar as APIs, é necessário ter o papel do IAM de administrador de Service Usage role (roles/serviceusage.serviceUsageAdmin), que contém a serviceusage.services.enable permissão. Saiba como conceder papéis.

    gcloud services enable composer.googleapis.com
  13. Para conseguir as permissões necessárias a fim de concluir o guia de início rápido, peça ao administrador para conceder a você os seguintes papéis do IAM no projeto:

    Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

    Também é possível conseguir as permissões necessárias usando personalizados papéis ou outros predefinidos papéis.

Criar uma conta de serviço do ambiente

Ao criar um ambiente, você especifica uma conta de serviço. Essa conta de serviço é chamada de conta de serviço do ambiente. O ambiente usa essa conta de serviço para realizar a maioria das operações.

A conta de serviço do ambiente não é uma conta de usuário. Uma conta de serviço é um tipo especial de conta usada por um aplicativo ou uma instância de máquina virtual (VM), não uma pessoa.

Para criar uma conta de serviço para o ambiente:

  1. Crie uma nova conta de serviço, conforme descrito em a documentação do Identity and Access Management.

  2. Conceda um papel a ela, conforme descrito na documentação do Identity and Access Management. O papel necessário é Worker do Composer (composer.worker).

Criar um ambiente

Se este for o primeiro ambiente no seu projeto, adicione a conta do agente de serviço do Airflow Gerenciado como um novo principal na conta de serviço do ambiente e conceda o papel roles/composer.ServiceAgentV2Ext a ela.

Por padrão, o ambiente usa a conta de serviço padrão do Compute Engine, e o exemplo a seguir mostra como adicionar essa permissão a ela.

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    ENVIRONMENT_SERVICE_ACCOUNT \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

Substitua ENVIRONMENT_SERVICE_ACCOUNT pela conta de serviço do seu ambiente que você criou anteriormente.

Crie um novo ambiente chamado example-environment na us-central1 região, com a versão mais recente do Airflow Gerenciado (Geração 2) versão.

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-2.17.2-airflow-2.11.1 \
    --service-account ENVIRONMENT_SERVICE_ACCOUNT

Substitua ENVIRONMENT_SERVICE_ACCOUNT pela conta de serviço do seu ambiente que você criou anteriormente.

Criar um arquivo DAG

Um Airflow DAG é uma coleção de tarefas organizadas que você quer programar e executar. Os DAGs são definidos em arquivos Python padrão.

Este guia usa um exemplo de DAG do Airflow definido no arquivo quickstart.py. O código Python nesse arquivo faz o seguinte:

  1. Cria um DAG, composer_sample_dag. Esse DAG é executado todos os dias.
  2. Executa uma tarefa, print_dag_run_conf. A tarefa imprime a configuração da execução do DAG usando o operador bash.

Salve uma cópia do arquivo quickstart.py na máquina local:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Fazer upload do arquivo DAG para o bucket do ambiente

Cada ambiente do Airflow Gerenciado tem um bucket do Cloud Storage associado a ele. O Airflow no Airflow Gerenciado programa apenas DAGs localizados na pasta /dags nesse bucket.

Para programar o DAG, faça upload de quickstart.py da máquina local para a pasta /dags do ambiente:

Para fazer upload de quickstart.py com a Google Cloud CLI, execute o comando a seguir na pasta em que o arquivo quickstart.py está localizado:

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

Conferir o DAG

Depois de fazer o upload do arquivo DAG, o Airflow faz o seguinte:

  1. Analisa o arquivo DAG que você fez upload. Pode levar alguns minutos para que o DAG fique disponível para o Airflow.
  2. Adiciona o DAG à lista de DAGs disponíveis.
  3. Executa o DAG de acordo com a programação fornecida no arquivo DAG.

Verifique se o DAG é processado sem erros e está disponível no Airflow visualizando-o na interface do DAG. A interface do DAG é a interface do Airflow Gerenciado para visualizar informações do DAG no Google Cloud console. O Airflow Gerenciado também fornece acesso à interface do Airflow, que é uma interface da Web nativa do Airflow.

  1. Aguarde cerca de cinco minutos para que o Airflow processe o arquivo DAG que você fez upload anteriormente e conclua a primeira execução do DAG (explicada mais adiante).

  2. Execute o comando a seguir na Google Cloud CLI. Esse comando executa o dags list comando da CLI do Airflow que lista os DAGs no seu ambiente.

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. Verifique se o DAG composer_quickstart está listado na saída do comando.

    Exemplo de saída:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

Conferir detalhes da execução do DAG

Uma única execução de um DAG é chamada de execução de DAG. O Airflow executa imediatamente uma execução de DAG para o DAG de exemplo porque a data de início no arquivo DAG está definida como ontem. Dessa forma, o Airflow acompanha a programação do DAG especificado.

O DAG de exemplo contém uma tarefa, print_dag_run_conf, que executa o comando echo no console. Esse comando gera informações de metadados sobre o DAG (identificador numérico da execução do DAG).

Execute o comando a seguir na Google Cloud CLI. Esse comando lista as execuções de DAG para o DAG composer_quickstart:

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

Exemplo de saída:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

A CLI do Airflow não fornece um comando para visualizar registros de tarefas. É possível usar outros métodos para visualizar registros de tarefas do Airflow: interface do DAG do Airflow Gerenciado, interface do Airflow ou Cloud Logging. Este guia mostra uma maneira de consultar o Cloud Logging para registros de uma execução de DAG específica.

Execute o comando a seguir na Google Cloud CLI. Esse comando lê registros do Cloud Logging para uma execução de DAG específica do DAG composer_quickstart. O argumento --format formata a saída para que apenas o texto da mensagem de registro seja exibido.

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

Substitua:

  • RUN_ID pelo valor run_id da saída do comando tasks states-for-dag-run que você executou anteriormente. Por exemplo, 2024-02-17T15:38:38.969307+00:00.

Exemplo de saída:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

Limpar

Para evitar cobranças na conta do Google Cloud pelos recursos usados nesta página, exclua o Google Cloud projeto do e os recursos.

Exclua os recursos usados neste tutorial:

  1. Exclua o ambiente do Airflow Gerenciado:

    1. Noconsole, acesse a página Ambientes. Google Cloud

      Acessar "Ambientes"

    2. Selecione example-environment e clique em Excluir.

    3. Aguarde até o ambiente ser excluído.

  2. Exclua o bucket do ambiente. A exclusão do ambiente do Airflow Gerenciado não exclui o bucket.

    1. Noconsole, acesse a página Armazenamento > Navegador. Google Cloud

      Acesse Armazenamento > Navegador

    2. Selecione o bucket do ambiente e clique em Excluir. Por exemplo, esse bucket pode ser chamado de us-central1-example-environ-c1616fe8-bucket.

A seguir