Acceder a la API REST de Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Apache Airflow tiene una interfaz de API REST que puedes usar para realizar tareas como obtener información sobre ejecuciones de DAGs y tareas, actualizar DAGs, obtener la configuración de Airflow, añadir y eliminar conexiones, y listar usuarios.

Para ver un ejemplo de cómo usar la API REST de Airflow con funciones de Cloud Run, consulta Activar DAGs con funciones de Cloud Run.

Versiones de la API REST de Airflow

Configurar la API REST de Airflow

Airflow 3

Airflow 3 usa la API REST de Airflow v2.

Cloud Composer usa su propio backend de autenticación de API.

La autorización funciona de la forma estándar que proporciona Airflow 3. Cuando un usuario nuevo autoriza a través de la API, su cuenta obtiene el rol Op de forma predeterminada.

La API REST de Airflow en Airflow 3 siempre está habilitada y no se puede inhabilitar. Puedes cambiar el rol de usuario predeterminado sustituyendo la siguiente opción de configuración de Airflow:

Sección Clave Valor Notas
api composer_auth_user_registration_role Op Puedes especificar cualquier otro rol.

Airflow 2

La API REST de Airflow v1 está habilitada de forma predeterminada en Airflow 2.

Cloud Composer usa su propio backend de autenticación de API.

La autorización funciona de la forma estándar que proporciona Airflow. Cuando un usuario nuevo autoriza el acceso a través de la API, su cuenta recibe el rol Op de forma predeterminada.

Puedes habilitar o inhabilitar la API REST de Airflow, o cambiar el rol de usuario predeterminado anulando las siguientes opciones de configuración de Airflow:

Sección Clave Valor Notas
api auth_backends airflow.composer.api.backend.composer_auth Para inhabilitar la API REST de Airflow, cambia a airflow.api.auth.backend.deny_all
api composer_auth_user_registration_role Op Puedes especificar cualquier otro rol.

Permitir llamadas a la API REST de Airflow mediante el control de acceso al servidor web

En función del método utilizado para llamar a la API REST de Airflow, el método de llamada puede usar una dirección IPv4 o IPv6. Recuerda desbloquear el tráfico IP a la API REST de Airflow mediante el control de acceso al servidor web.

Usa la opción de configuración predeterminada, que es All IP addresses have access (default), si no sabes desde qué direcciones IP se enviarán tus llamadas a la API REST de Airflow.

Hacer llamadas a la API REST de Airflow

En esta sección se proporciona un ejemplo de secuencia de comandos de Python que puede usar para activar DAGs con la API REST de Airflow.

En la secuencia de comandos, define las siguientes variables:

  • dag_id: nombre de un DAG, tal como se define en el archivo de origen del DAG.
  • dag_config: configuración de la ejecución del DAG.
  • web_server_url: URL de tu servidor web de Airflow. El formato es https://<web-server-id>.composer.googleusercontent.com.

  • (Airflow 3) logical_date: la fecha lógica de la ejecución del DAG.

Airflow 3

"""
Trigger a DAG in Cloud Composer 3 environment with Airflow 3 using the Airflow REST API v2.
"""

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests

# Following best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])

def make_composer3_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 3 environment's web server with Airflow 3.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)

def trigger_dag(web_server_url: str, dag_id: str, data: dict, logical_date: str) -> str:
    """
    Make a request to trigger a DAG using the Airflow REST API v2.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 3 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v2/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data, "logical_date": logical_date}

    response = make_composer3_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text

if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "airflow_monitoring"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    logical_date = "2025-01-01T14:00:00Z" # Replace with the data interval for which to run the DAG
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer-staging.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config, logical_date=logical_date
    )

    print(response_text)

Airflow 2

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text




if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )

    print(response_text)

Acceder a la API REST de Airflow mediante una cuenta de servicio

La base de datos de Airflow en las versiones anteriores a la 2.3.0 limita la longitud del campo de correo a 64 caracteres. A veces, las cuentas de servicio tienen direcciones de correo electrónico de más de 64 caracteres. No es posible crear usuarios de Airflow para estas cuentas de servicio de la forma habitual. Si no hay ningún usuario de Airflow para esa cuenta de servicio, al acceder a la API REST de Airflow se producirán errores HTTP 401 y 403.

Como solución alternativa, puedes preregistrar un usuario de Airflow para una cuenta de servicio. Para ello, usa accounts.google.com:NUMERIC_USER_ID como nombre de usuario y cualquier cadena única como correo.

  1. Para obtener NUMERIC_USER_ID de una cuenta de servicio, ejecuta el siguiente comando:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    Sustituye:

    • SA_NAME con el nombre de la cuenta de servicio.
    • PROJECT_ID con el ID del proyecto.
  2. Crea un usuario de Airflow con el rol Op para la cuenta de servicio:

    Interfaz de usuario de Airflow

    1. Ve a la interfaz de usuario de Airflow.

    2. Ve a Seguridad > Lista de usuarios y haz clic en Añadir un nuevo registro. Tu usuario de Airflow debe tener el rol Admin para abrir esta página.

    3. Especifica accounts.google.com:NUMERIC_USER_ID como nombre de usuario. Sustituye NUMERIC_USER_ID por el ID de usuario obtenido en el paso anterior.

    4. Especifica un identificador único como correo electrónico. Puedes usar cualquier cadena única.

    5. Especifica el rol del usuario. Por ejemplo, Op.

    6. Asegúrese de que la casilla ¿Está activo? esté marcada.

    7. Especifica el nombre y los apellidos del usuario. Puedes usar cualquier cadena.

    8. Haz clic en Guardar.

    gcloud

    Ejecuta el siguiente comando de la CLI de Airflow:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    Sustituye:

    • ENVIRONMENT_NAME con el nombre del entorno.
    • LOCATION con la región en la que se encuentra el entorno.
    • NUMERIC_USER_ID con el ID de usuario obtenido en el paso anterior.
    • UNIQUE_ID con el identificador del usuario de Airflow. Puedes usar cualquier cadena única.
  3. Después de crear un usuario de Airflow para una cuenta de servicio, se reconoce a la persona que llama autenticada como la cuenta de servicio como un usuario pre-registrado y se inicia sesión en Airflow.

Escalar el componente de la API REST de Airflow

Los endpoints de la API REST de Airflow y de la interfaz de usuario de Airflow se ejecutan en el servidor web de Airflow. Si usas la API REST de forma intensiva, considera la posibilidad de aumentar la cantidad de CPU y memoria disponibles para el servidor web de Airflow en función de la carga prevista.

Siguientes pasos