Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
O Apache Airflow tem uma interface de API REST que pode usar para realizar tarefas como obter informações sobre execuções e tarefas de DAGs, atualizar DAGs, obter a configuração do Airflow, adicionar e eliminar associações e listar utilizadores.
Para ver um exemplo de utilização da API REST do Airflow com funções do Cloud Run, consulte o artigo Acionar DAGs com funções do Cloud Run.
Versões da API REST do Airflow
- O Airflow 3 usa a API REST v2 do Airflow.
- O Airflow 2 usa a API REST v1 do Airflow.
Configure a API REST do Airflow
Airflow 3
O Airflow 3 usa a API REST do Airflow v2.
O Cloud Composer usa o seu próprio backend de autenticação de API.A autorização funciona da forma padrão fornecida pelo Airflow 3.
Quando um novo utilizador concede autorização através da API, a conta do utilizador recebe a função Op
por predefinição.
A API REST Airflow no Airflow 3 está sempre ativada e não é possível desativá-la. Pode alterar a função de utilizador predefinida substituindo a seguinte opção de configuração do Airflow:
| Secção | Chave | Valor | Notas |
|---|---|---|---|
api
|
composer_auth_user_registration_role
|
Op
|
Pode especificar qualquer outra função. |
Airflow 2
A API REST Airflow v1 está ativada por predefinição no Airflow 2.
O Cloud Composer usa o seu próprio backend de autenticação de API.A autorização funciona da forma padrão fornecida pelo Airflow. Quando um novo utilizador autoriza através da API, a conta do utilizador recebe a função Op por predefinição.
Pode ativar ou desativar a API REST do Airflow ou alterar a função do utilizador predefinida substituindo as seguintes opções de configuração do Airflow:
| Secção | Chave | Valor | Notas |
|---|---|---|---|
api
|
auth_backends
|
airflow.composer.api.backend.composer_auth
|
Para desativar a API REST do Airflow, altere para:
airflow.api.auth.backend.deny_all |
api
|
composer_auth_user_registration_role
|
Op
|
Pode especificar qualquer outra função. |
Permitir chamadas à API REST do Airflow através do controlo de acesso do servidor Web
Consoante o método usado para chamar a API REST do Airflow, o método de chamador pode usar um endereço IPv4 ou IPv6. Lembre-se de desbloquear o tráfego de IP para a API REST do Airflow através do controlo de acesso do servidor Web.
Use a opção de configuração predefinida, que é
All IP addresses have access (default), se não tiver a certeza de que endereços IP
serão enviados os seus pedidos para a API REST do Airflow.
Faça chamadas para a API REST do Airflow
Esta secção fornece um exemplo de script Python que pode usar para acionar DAGs com a API REST do Airflow.
No script, defina as seguintes variáveis:
dag_id: nome de um DAG, conforme definido no ficheiro de origem do DAG.dag_config: configuração da execução do DAG.web_server_url: o URL do servidor Web do Airflow. O formato éhttps://<web-server-id>.composer.googleusercontent.com.(Airflow 3)
logical_date: A data lógica para a execução do DAG.
Airflow 3
"""
Trigger a DAG in Cloud Composer 3 environment with Airflow 3 using the Airflow REST API v2.
"""
from __future__ import annotations
from typing import Any
import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests
# Following best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])
def make_composer3_web_server_request(
url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
"""
Make a request to Cloud Composer 3 environment's web server with Airflow 3.
Args:
url: The URL to fetch.
method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
'PATCH', 'DELETE')
**kwargs: Any of the parameters defined for the request function:
https://github.com/requests/requests/blob/master/requests/api.py
If no timeout is provided, it is set to 90 by default.
"""
authed_session = AuthorizedSession(CREDENTIALS)
# Set the default timeout, if missing
if "timeout" not in kwargs:
kwargs["timeout"] = 90
return authed_session.request(method, url, **kwargs)
def trigger_dag(web_server_url: str, dag_id: str, data: dict, logical_date: str) -> str:
"""
Make a request to trigger a DAG using the Airflow REST API v2.
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
Args:
web_server_url: The URL of the Airflow 3 web server.
dag_id: The DAG ID.
data: Additional configuration parameters for the DAG run (json).
"""
endpoint = f"api/v2/dags/{dag_id}/dagRuns"
request_url = f"{web_server_url}/{endpoint}"
json_data = {"conf": data, "logical_date": logical_date}
response = make_composer3_web_server_request(
request_url, method="POST", json=json_data
)
if response.status_code == 403:
raise requests.HTTPError(
"You do not have a permission to perform this operation. "
"Check Airflow RBAC roles for your account."
f"{response.headers} / {response.text}"
)
elif response.status_code != 200:
response.raise_for_status()
else:
return response.text
if __name__ == "__main__":
# TODO(developer): replace with your values
dag_id = "airflow_monitoring" # Replace with the ID of the DAG that you want to run.
dag_config = {
"your-key": "your-value"
} # Replace with configuration parameters for the DAG run.
# Replace web_server_url with the Airflow web server address. To obtain this
# URL, run the following command for your environment:
# gcloud composer environments describe example-environment \
# --location=your-composer-region \
# --format="value(config.airflowUri)"
logical_date = "2025-01-01T14:00:00Z" # Replace with the data interval for which to run the DAG
web_server_url = (
"https://example-airflow-ui-url-dot-us-central1.composer-staging.googleusercontent.com"
)
response_text = trigger_dag(
web_server_url=web_server_url, dag_id=dag_id, data=dag_config, logical_date=logical_date
)
print(response_text)
Airflow 2
Aceda à API REST do Airflow através de uma conta de serviço
A base de dados do Airflow nas versões do Airflow anteriores à 2.3.0 limita o comprimento do campo de email a 64 carateres. Por vezes, as contas de serviço têm endereços de email com mais de 64 carateres. Não é possível criar utilizadores do Airflow para essas contas de serviço da forma habitual. Se não existir um utilizador do Airflow para essa conta de serviço, o acesso à API REST do Airflow resulta em erros HTTP 401 e 403.
Como solução alternativa, pode pré-registar um utilizador do Airflow para uma conta de serviço. Para
isso, use accounts.google.com:NUMERIC_USER_ID como nome de utilizador e qualquer
string única como email.
Para obter o
NUMERIC_USER_IDpara uma conta de serviço, execute o seguinte comando:gcloud iam service-accounts describe \ SA_NAME@PROJECT_ID.iam.gserviceaccount.com \ --format="value(oauth2ClientId)"Substituir:
SA_NAMEcom o nome da conta de serviço.PROJECT_IDcom o ID do projeto.
Crie um utilizador do Airflow com a função
Oppara a conta de serviço:IU do Airflow
Aceda a Segurança > Listar utilizadores e clique em Adicionar novo registo. O utilizador do Airflow tem de ter a função
Adminpara abrir esta página.Especifique
accounts.google.com:NUMERIC_USER_IDcomo nome de utilizador. SubstituaNUMERIC_USER_IDpelo ID de utilizador obtido no passo anterior.Especifique um identificador exclusivo como o email. Pode usar qualquer string única.
Especifique a função do utilizador. Por exemplo,
Op.Certifique-se de que a caixa de verificação Está ativo? está selecionada.
Especifique o nome próprio e o apelido do utilizador. Pode usar qualquer string.
Clique em Guardar.
gcloud
Execute o seguinte comando da CLI do Airflow:
gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ users create -- \ -u accounts.google.com:NUMERIC_USER_ID \ -e UNIQUE_ID \ -f UNIQUE_ID \ -l - -r Op --use-random-passwordSubstituir:
ENVIRONMENT_NAMEcom o nome do ambiente.LOCATIONcom a região onde o ambiente está localizado.NUMERIC_USER_IDcom o ID do utilizador obtido no passo anterior.UNIQUE_IDcom o identificador do utilizador do Airflow. Pode usar qualquer string única.
Depois de criar um utilizador do Airflow para uma conta de serviço, um autor da chamada autenticado como a conta de serviço é reconhecido como um utilizador pré-registado e tem sessão iniciada no Airflow.
Dimensionar o componente da API REST do Airflow
A API REST Airflow e os pontos finais da IU do Airflow são executados no servidor Web do Airflow. Se usar a API REST de forma intensiva, pondere aumentar a quantidade de CPU e memória disponíveis para o servidor Web do Airflow, com base na carga esperada.