Aceda à interface de linhas de comando do Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

O Apache Airflow tem uma interface de linhas de comando (CLI) que pode usar para realizar tarefas como acionar e gerir DAGs, obter informações sobre execuções de DAGs e tarefas, bem como adicionar e eliminar ligações e utilizadores.

Comandos da CLI do Airflow suportados

O Airflow usa a sintaxe da CLI do Airflow, que é descrita na documentação do Airflow.

Consulte a referência do comando gcloud composer environments run para ver a lista completa de comandos da CLI do Airflow suportados.

Antes de começar

Para executar comandos da CLI do Airflow através da CLI do Google Cloud:

  • A sua Conta Google tem de ter autorizações para usar a CLI do Google Cloud com o Cloud Composer e executar comandos da CLI do Airflow.

  • Os comandos da CLI do Airflow executados através da CLI do Google Cloud consomem a environments.executeAirflowCommand quota.

  • Nas versões do Cloud Composer anteriores à 2.4.0, precisa de acesso ao plano de controlo do cluster do seu ambiente para executar comandos da CLI do Airflow.

Execute comandos da CLI Airflow com a CLI gcloud

Para executar comandos da CLI do Airflow nos seus ambientes, use a CLI gcloud:

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    SUBCOMMAND \
    -- SUBCOMMAND_ARGUMENTS

Substitua o seguinte:

  • ENVIRONMENT_NAME: o nome do seu ambiente.
  • LOCATION: a região onde o ambiente está localizado.
  • SUBCOMMAND: um dos comandos da CLI do Airflow suportados.
  • SUBCOMMAND_ARGUMENTS com argumentos para o comando da CLI do Airflow.

Separador de argumentos de subcomando

Separe os argumentos do comando da CLI do Airflow especificado com --:

  • Especifique comandos da CLI compostos como um subcomando.
  • Especifique quaisquer argumentos para comandos compostos como argumentos de subcomando, após um separador --.

Exemplo:

gcloud composer environments run example-environment \
    dags list -- --output=json

Localização predefinida

A maioria dos comandos do gcloud composer requer uma localização. Pode especificar a localização com a flag --location ou definindo a localização predefinida.

Por exemplo, para acionar um DAG denominado sample_quickstart com o ID 5077 no seu ambiente do Cloud Composer:

gcloud composer environments run example-environment \
    --location us-central1 dags trigger -- sample_quickstart \
    --run-id=5077

Executar comandos num ambiente de IP privado

Nas versões do Cloud Composer anteriores à 2.4.0:

Para executar comandos da CLI do Airflow num ambiente de IP privado, execute-os numa máquina que possa aceder ao ponto final do plano de controlo do cluster do GKE. As suas opções podem variar consoante a configuração do cluster privado.

Se o acesso ao ponto final público estiver desativado no cluster do seu ambiente, não é possível usar comandos gcloud composer para executar a CLI do Airflow. Para poder executar comandos da CLI do Airflow, siga estes passos:

  1. Crie uma VM na sua rede VPC
  2. Adquirir credenciais do cluster. Execute o seguinte comando:

    gcloud container clusters get-credentials CLUSTER_NAME \
      --region REGION \
      --project PROJECT \
      --internal-ip
    

Use kubectl para executar o comando do Airflow. Por exemplo:

kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
  --container airflow-scheduler -- airflow dags list

Substitua COMPOSER_NAMESPACE por um espaço de nomes semelhante a: composer-2-0-28-airflow-2-3-394zxc12411. Pode encontrar o Cloud Composer na lista de cargas de trabalho ou usando o comando kubectl get namespaces.

Se o acesso ao ponto final público estiver ativado no cluster do seu ambiente, também pode executar comandos da CLI do Airflow a partir de uma máquina com um endereço IP externo que seja adicionado às redes autorizadas. Para ativar o acesso a partir do seu computador, adicione o endereço externo do computador à lista de redes autorizadas do seu ambiente.

Quando executar comandos gcloud composer environments run ou kubectl, pode encontrar o seguinte erro:

Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"

Sintoma: esta mensagem de erro indica que não existe conetividade de rede a partir de um computador onde executa estes comandos.

Solução: siga as diretrizes apresentadas na secção Executar comandos num ambiente de IP privado ou use as instruções disponíveis na secção O comando kubectlexcede o tempo limite.

Execute comandos da CLI do Airflow através da Cloud Composer API

A partir da versão 2.4.0 do Cloud Composer, pode executar comandos da CLI do Airflow através da API Cloud Composer.

Executar um comando

Construa um pedido de API environments.executeAirflowCommand:

{
  "environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
  "command": "AIRFLOW_COMMAND",
  "subcommand": "AIRFLOW_SUBCOMMAND",
  "parameters": [
    "SUBCOMMAND_PARAMETER"
  ]
}

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto.
  • LOCATION: a região onde o ambiente está localizado.
  • ENVIRONMENT_NAME: o nome do seu ambiente.
  • AIRFLOW_COMMAND: comando da CLI do Airflow que quer executar, como dags.
  • AIRFLOW_SUBCOMMAND: subcomando do comando da CLI do Airflow que quer executar, como list.
  • (opcional) SUBCOMMAND_PARAMETER: parâmetros para o subcomando. Se quiser usar mais do que um parâmetro, adicione mais itens à lista.

Exemplo:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
  "environment": "projects/example-project/locations/us-central1/environments/example-environment",
  "command": "dags",
  "subcommand": "list",
  "parameters": [
    "-o json",
    "--verbose"
  ]
}

Verifique o estado do comando

Depois de executar um comando da CLI do Airflow através da API Cloud Composer, verifique se o comando foi concluído com êxito fazendo um pedido PollAirflowCommand e inspecionando os campos em exitInfo quanto a erros e códigos de estado. O campo output contém linhas de registo.

Para obter o estado de execução do comando e obter registos, forneça os valores executionId, pod e podNamespace devolvidos por ExecuteAirflowCommandRequest:

Exemplo:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
  "executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
  "pod": "airflow-scheduler-1327d8cd68-hblpd",
  "podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
  "nextLineNumber": 1
}

O que se segue?