Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Apache Airflow tiene una interfaz de la API de REST que puedes usar para realizar tareas como obtener información sobre ejecuciones y tareas de DAG, actualizar DAG, obtener la configuración de Airflow, agregar y borrar conexiones, y enumerar usuarios.
Para ver un ejemplo del uso de la API de REST de Airflow con Cloud Run Functions, consulta Activa los DAG con Cloud Run Functions.
Versiones de la API de REST de Airflow
- Airflow 3 usa la API de REST de Airflow v2.
- Airflow 2 usa la API de REST de Airflow v1.
Configura la API de REST de Airflow
Airflow 3
Airflow 3 usa la versión 2 de la API de REST de Airflow.
Cloud Composer usa su propio backend de autenticación de API.La autorización funciona de la manera estándar que proporciona Airflow 3.
Cuando un usuario nuevo se autoriza a través de la API, la cuenta del usuario obtiene el rol Op de forma predeterminada.
La API de REST de Airflow en Airflow 3 siempre está habilitada y no es posible inhabilitarla. Puedes cambiar el rol de usuario predeterminado si anulas la siguiente opción de configuración de Airflow:
| Sección | Clave | Valor | Notas |
|---|---|---|---|
api
|
composer_auth_user_registration_role
|
Op
|
Puedes especificar cualquier otra función. |
Airflow 2
La API de REST v1 de Airflow está habilitada de forma predeterminada en Airflow 2.
Cloud Composer usa su propio backend de autenticación de API.La autorización funciona de la manera estándar que proporciona Airflow. Cuando un usuario nuevo se autoriza a través de la API, la cuenta del usuario obtiene la función Op de forma predeterminada.
Puedes habilitar o inhabilitar la API de REST de Airflow, o cambiar el rol de usuario predeterminado si anulas las siguientes opciones de configuración de Airflow:
| Sección | Clave | Valor | Notas |
|---|---|---|---|
api
|
auth_backends
|
airflow.composer.api.backend.composer_auth
|
Para inhabilitar la API de REST de Airflow, cambia a airflow.api.auth.backend.deny_all. |
api
|
composer_auth_user_registration_role
|
Op
|
Puedes especificar cualquier otra función. |
Permite llamadas a la API de REST de Airflow con el control de acceso del servidor web
Según el método que se use para llamar a la API de REST de Airflow, el método de llamada puede usar una dirección IPv4 o IPv6. Recuerda desbloquear el tráfico de IP a la API de REST de Airflow con el control de acceso al servidor web.
Usa la opción de configuración predeterminada, que es All IP addresses have access (default), si no sabes desde qué direcciones IP se enviarán tus llamadas a la API de REST de Airflow.
Realiza llamadas a la API de REST de Airflow
En esta sección, se proporciona una secuencia de comandos de Python de ejemplo que puedes usar para activar los DAG con la API de REST de Airflow.
En la secuencia de comandos, configura las siguientes variables:
dag_id: Nombre de un DAG, como se define en el archivo fuente del DAG.dag_config: Es la configuración de la ejecución del DAG.web_server_url: Es la URL de tu servidor web de Airflow. El formato eshttps://<web-server-id>.composer.googleusercontent.com.(Airflow 3)
logical_date: Es la fecha lógica de la ejecución del DAG.
Airflow 3
"""
Trigger a DAG in Cloud Composer 3 environment with Airflow 3 using the Airflow REST API v2.
"""
from __future__ import annotations
from typing import Any
import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests
# Following best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])
def make_composer3_web_server_request(
url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
"""
Make a request to Cloud Composer 3 environment's web server with Airflow 3.
Args:
url: The URL to fetch.
method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
'PATCH', 'DELETE')
**kwargs: Any of the parameters defined for the request function:
https://github.com/requests/requests/blob/master/requests/api.py
If no timeout is provided, it is set to 90 by default.
"""
authed_session = AuthorizedSession(CREDENTIALS)
# Set the default timeout, if missing
if "timeout" not in kwargs:
kwargs["timeout"] = 90
return authed_session.request(method, url, **kwargs)
def trigger_dag(web_server_url: str, dag_id: str, data: dict, logical_date: str) -> str:
"""
Make a request to trigger a DAG using the Airflow REST API v2.
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
Args:
web_server_url: The URL of the Airflow 3 web server.
dag_id: The DAG ID.
data: Additional configuration parameters for the DAG run (json).
"""
endpoint = f"api/v2/dags/{dag_id}/dagRuns"
request_url = f"{web_server_url}/{endpoint}"
json_data = {"conf": data, "logical_date": logical_date}
response = make_composer3_web_server_request(
request_url, method="POST", json=json_data
)
if response.status_code == 403:
raise requests.HTTPError(
"You do not have a permission to perform this operation. "
"Check Airflow RBAC roles for your account."
f"{response.headers} / {response.text}"
)
elif response.status_code != 200:
response.raise_for_status()
else:
return response.text
if __name__ == "__main__":
# TODO(developer): replace with your values
dag_id = "airflow_monitoring" # Replace with the ID of the DAG that you want to run.
dag_config = {
"your-key": "your-value"
} # Replace with configuration parameters for the DAG run.
# Replace web_server_url with the Airflow web server address. To obtain this
# URL, run the following command for your environment:
# gcloud composer environments describe example-environment \
# --location=your-composer-region \
# --format="value(config.airflowUri)"
logical_date = "2025-01-01T14:00:00Z" # Replace with the data interval for which to run the DAG
web_server_url = (
"https://example-airflow-ui-url-dot-us-central1.composer-staging.googleusercontent.com"
)
response_text = trigger_dag(
web_server_url=web_server_url, dag_id=dag_id, data=dag_config, logical_date=logical_date
)
print(response_text)
Airflow 2
Accede a la API de REST de Airflow con una cuenta de servicio
La base de datos de Airflow en las versiones anteriores a la 2.3.0 limita la longitud del campo de correo electrónico a 64 caracteres. A veces, las cuentas de servicio tienen direcciones de correo electrónico de más de 64 caracteres. No es posible crear usuarios de Airflow para esas cuentas de servicio de la manera habitual. Si no hay un usuario de Airflow para esa cuenta de servicio, acceder a la API de REST de Airflow genera errores HTTP 401 y 403.
Como solución alternativa, puedes preregistrar un usuario de Airflow para una cuenta de servicio. Para ello, usa accounts.google.com:NUMERIC_USER_ID como nombre de usuario y cualquier cadena única como correo electrónico.
Para obtener
NUMERIC_USER_IDpara una cuenta de servicio, ejecuta el siguiente comando:gcloud iam service-accounts describe \ SA_NAME@PROJECT_ID.iam.gserviceaccount.com \ --format="value(oauth2ClientId)"Reemplaza lo siguiente:
SA_NAMEpor el nombre de la cuenta de servicio.PROJECT_IDpor el ID del proyecto.
Crea un usuario de Airflow con el rol
Oppara la cuenta de servicio:IU de Airflow
Ve a Security > List Users y haz clic en Add a new record. Tu usuario de Airflow debe tener el rol
Adminpara abrir esta página.Especifica
accounts.google.com:NUMERIC_USER_IDcomo el nombre de usuario. ReemplazaNUMERIC_USER_IDpor el ID de usuario que obtuviste en el paso anterior.Especifica un identificador único como el correo electrónico. Puedes usar cualquier cadena única.
Especifica el rol del usuario. Por ejemplo,
Op.Asegúrate de que esté seleccionada la casilla de verificación ¿Está activo?.
Especifica el nombre y el apellido del usuario. Puedes usar cualquier cadena.
Haz clic en Guardar.
gcloud
Ejecuta el siguiente comando de la CLI de Airflow:
gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ users create -- \ -u accounts.google.com:NUMERIC_USER_ID \ -e UNIQUE_ID \ -f UNIQUE_ID \ -l - -r Op --use-random-passwordReemplaza lo siguiente:
ENVIRONMENT_NAMEpor el nombre del entorno.LOCATIONpor la región en la que se encuentra el entorno.NUMERIC_USER_IDcon el ID de usuario que obtuviste en el paso anteriorUNIQUE_IDcon el identificador del usuario de Airflow. Puedes usar cualquier cadena única.
Después de crear un usuario de Airflow para una cuenta de servicio, se reconoce a un llamador autenticado como la cuenta de servicio como un usuario pre registrado y se accede a Airflow.
Cómo escalar el componente de la API de REST de Airflow
Los extremos de la API de REST de Airflow y de la IU de Airflow se ejecutan dentro del servidor web de Airflow. Si usas la API de REST de forma intensiva, considera aumentar la cantidad de CPU y memoria disponibles para el servidor web de Airflow, según la carga esperada.