Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
O Apache Airflow tem uma interface da API REST que pode ser usada para executar tarefas como ver informações sobre execuções e tarefas do DAG, atualizar DAGs, configurar o Airflow, adicionar e excluir conexões e listar usuários.
Para um exemplo de como usar a API REST do Airflow com as funções do Cloud Run, consulte Como acionar DAGs com as funções do Cloud Run.
Versões da API REST do Airflow
- O Airflow 3 usa a API REST do Airflow v2.
- O Airflow 2 usa a API REST v1 do Airflow.
Configurar a API REST do Airflow
Airflow 3
O Airflow 3 usa a API REST v2 do Airflow.
O Cloud Composer usa o próprio back-end de autenticação da API.A autorização funciona da maneira padrão fornecida pelo Airflow 3.
Quando um novo usuário autoriza por meio da API, a conta dele recebe o papel Op
por padrão.
A API REST do Airflow 3 está sempre ativada e não é possível desativá-la. É possível mudar o função do usuário padrão substituindo a seguinte opção de configuração do Airflow:
| Seção | Chave | Valor | Observações |
|---|---|---|---|
api
|
composer_auth_user_registration_role
|
Op
|
É possível especificar qualquer outro papel. |
Airflow 2
A API REST v1 do Airflow está ativada por padrão no Airflow 2.
O Cloud Composer usa o próprio back-end de autenticação da API.A autorização funciona da maneira padrão fornecida pelo Airflow. Quando um novo usuário
autoriza por meio da API, a conta do usuário recebe o papel Op por padrão.
É possível ativar ou desativar a API REST do Airflow ou mudar o papel de usuário padrão substituindo as seguintes opções de configuração do Airflow:
| Seção | Chave | Valor | Observações |
|---|---|---|---|
api
|
auth_backends
|
airflow.composer.api.backend.composer_auth
|
Para desativar a API REST do Airflow, mude para
airflow.api.auth.backend.deny_all |
api
|
composer_auth_user_registration_role
|
Op
|
É possível especificar qualquer outro papel. |
Permitir chamadas de API para a API REST do Airflow usando o controle de acesso do servidor da Web
Dependendo do método usado para chamar a API REST do Airflow, o método de chamada pode usar um endereço IPv4 ou IPv6. Não se esqueça de desbloquear o tráfego IP para a API REST do Airflow usando o controle de acesso ao servidor da Web.
Use a opção de configuração padrão, que é
All IP addresses have access (default), se você não tiver certeza de quais endereços IP
serão usados para enviar as chamadas à API REST do Airflow.
Fazer chamadas para a API REST do Airflow
Nesta seção, fornecemos um exemplo de script Python que pode ser usado para acionar DAGs com a API REST do Airflow.
No script, defina as seguintes variáveis:
dag_id: nome de um DAG, conforme definido no arquivo de origem do DAG.dag_config: configuração da execução do DAG.web_server_url: o URL do servidor da Web do Airflow. O formato éhttps://<web-server-id>.composer.googleusercontent.com.(Airflow 3)
logical_date: a data lógica da execução do DAG.
Airflow 3
"""
Trigger a DAG in Cloud Composer 3 environment with Airflow 3 using the Airflow REST API v2.
"""
from __future__ import annotations
from typing import Any
import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests
# Following best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])
def make_composer3_web_server_request(
url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
"""
Make a request to Cloud Composer 3 environment's web server with Airflow 3.
Args:
url: The URL to fetch.
method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
'PATCH', 'DELETE')
**kwargs: Any of the parameters defined for the request function:
https://github.com/requests/requests/blob/master/requests/api.py
If no timeout is provided, it is set to 90 by default.
"""
authed_session = AuthorizedSession(CREDENTIALS)
# Set the default timeout, if missing
if "timeout" not in kwargs:
kwargs["timeout"] = 90
return authed_session.request(method, url, **kwargs)
def trigger_dag(web_server_url: str, dag_id: str, data: dict, logical_date: str) -> str:
"""
Make a request to trigger a DAG using the Airflow REST API v2.
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
Args:
web_server_url: The URL of the Airflow 3 web server.
dag_id: The DAG ID.
data: Additional configuration parameters for the DAG run (json).
"""
endpoint = f"api/v2/dags/{dag_id}/dagRuns"
request_url = f"{web_server_url}/{endpoint}"
json_data = {"conf": data, "logical_date": logical_date}
response = make_composer3_web_server_request(
request_url, method="POST", json=json_data
)
if response.status_code == 403:
raise requests.HTTPError(
"You do not have a permission to perform this operation. "
"Check Airflow RBAC roles for your account."
f"{response.headers} / {response.text}"
)
elif response.status_code != 200:
response.raise_for_status()
else:
return response.text
if __name__ == "__main__":
# TODO(developer): replace with your values
dag_id = "airflow_monitoring" # Replace with the ID of the DAG that you want to run.
dag_config = {
"your-key": "your-value"
} # Replace with configuration parameters for the DAG run.
# Replace web_server_url with the Airflow web server address. To obtain this
# URL, run the following command for your environment:
# gcloud composer environments describe example-environment \
# --location=your-composer-region \
# --format="value(config.airflowUri)"
logical_date = "2025-01-01T14:00:00Z" # Replace with the data interval for which to run the DAG
web_server_url = (
"https://example-airflow-ui-url-dot-us-central1.composer-staging.googleusercontent.com"
)
response_text = trigger_dag(
web_server_url=web_server_url, dag_id=dag_id, data=dag_config, logical_date=logical_date
)
print(response_text)
Airflow 2
Acessar a API REST do Airflow usando uma conta de serviço
O banco de dados do Airflow em versões anteriores à 2.3.0 limita o comprimento do campo de e-mail a 64 caracteres. Às vezes, as contas de serviço têm endereços de e-mail com mais de 64 caracteres. No momento, não é possível criar usuários do Airflow para essas contas de serviço. Se não houver um usuário do Airflow para essa conta de serviço, o acesso à API REST do Airflow vai resultar em erros HTTP 401 e 403.
Como solução alternativa, você pode fazer o pré-registro de um usuário do Airflow para uma conta de serviço. Para
fazer isso, use accounts.google.com:NUMERIC_USER_ID como o nome de usuário e qualquer
string exclusiva como o e-mail.
Para acessar
NUMERIC_USER_IDem uma conta de serviço, execute:gcloud iam service-accounts describe \ SA_NAME@PROJECT_ID.iam.gserviceaccount.com \ --format="value(oauth2ClientId)"Substitua:
SA_NAMEpelo nome da conta de serviço.PROJECT_IDpelo ID do projeto;
Crie um usuário do Airflow com o papel
Oppara a conta de serviço:IU do Airflow
Acesse a IU do Airflow.
Acesse Segurança > Listar usuários e clique em Adicionar um novo registro. O usuário do Airflow precisa ter o papel
Adminpara abrir essa página.Especifique
accounts.google.com:NUMERIC_USER_IDcomo o nome do usuário. SubstituaNUMERIC_USER_IDpelo ID do usuário da etapa anterior.Especifique um identificador exclusivo como o e-mail. Você pode usar qualquer string exclusiva.
Especifique o papel do usuário. Por exemplo,
OpVerifique se a caixa de seleção Está ativo? está marcada.
Especifique o nome e o sobrenome do usuário. Você pode usar qualquer string.
Clique em Save.
gcloud
Execute o seguinte comando da CLI do Airflow:
gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ users create -- \ -u accounts.google.com:NUMERIC_USER_ID \ -e UNIQUE_ID \ -f UNIQUE_ID \ -l - -r Op --use-random-passwordSubstitua:
ENVIRONMENT_NAMEpelo nome do ambienteLOCATIONpela região em que o ambiente está localizado.NUMERIC_USER_IDpelo ID do usuário da etapa anterior.UNIQUE_IDpelo identificador do usuário do Airflow. Você pode usar qualquer string exclusiva.
Depois de criar um usuário do Airflow para uma conta de serviço, um autor da chamada é autenticado porque a conta de serviço é reconhecida como pré-registrada e conectada ao Airflow.
Como escalonar o componente da API REST do Airflow
Os endpoints da API REST do Airflow e da interface do Airflow são executados no servidor da Web do Airflow. Se você usa a API REST intensamente, considere aumentar a quantidade de CPU e memória disponíveis para o servidor da Web do Airflow, com base na carga esperada.