Ejecuta un DAG de Apache Airflow en Managed Airflow (gen. 2) (Google Cloud CLI)

Airflow administrado (gen. 3) | Airflow administrado (gen. 2) | Airflow administrado (gen. 1 heredada)

En esta guía de inicio rápido, se muestra cómo crear un entorno de Managed Service para Apache Airflow y ejecutar un DAG de Apache Airflow en Managed Airflow (Gen 2).

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud . Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. Instala Google Cloud CLI.

  3. Si usas un proveedor de identidad externo (IdP), primero debes acceder a la gcloud CLI con tu identidad federada.

  4. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  5. Crea o selecciona un Google Cloud proyecto.

    Roles necesarios para seleccionar o crear un proyecto

    • Selecciona un proyecto: Para seleccionar un proyecto, no se requiere un rol de IAM específico. Puedes seleccionar cualquier proyecto en el que se te haya otorgado un rol.
    • Crear un proyecto: Para crear un proyecto, necesitas el rol de Creador de proyectos (roles/resourcemanager.projectCreator), que contiene el permiso resourcemanager.projects.create. Obtén más información para otorgar roles.
    • Crea un proyecto de Google Cloud :

      gcloud projects create PROJECT_ID

      Reemplaza PROJECT_ID por un nombre para el proyecto Google Cloud que estás creando.

    • Selecciona el proyecto Google Cloud que creaste:

      gcloud config set project PROJECT_ID

      Reemplaza PROJECT_ID por el nombre de tu Google Cloud proyecto.

  6. Verifica que la facturación esté habilitada para tu proyecto de Google Cloud .

  7. Instala Google Cloud CLI.

  8. Si usas un proveedor de identidad externo (IdP), primero debes acceder a la gcloud CLI con tu identidad federada.

  9. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  10. Crea o selecciona un Google Cloud proyecto.

    Roles necesarios para seleccionar o crear un proyecto

    • Selecciona un proyecto: Para seleccionar un proyecto, no se requiere un rol de IAM específico. Puedes seleccionar cualquier proyecto en el que se te haya otorgado un rol.
    • Crear un proyecto: Para crear un proyecto, necesitas el rol de Creador de proyectos (roles/resourcemanager.projectCreator), que contiene el permiso resourcemanager.projects.create. Obtén más información para otorgar roles.
    • Crea un proyecto de Google Cloud :

      gcloud projects create PROJECT_ID

      Reemplaza PROJECT_ID por un nombre para el proyecto Google Cloud que estás creando.

    • Selecciona el proyecto Google Cloud que creaste:

      gcloud config set project PROJECT_ID

      Reemplaza PROJECT_ID por el nombre de tu Google Cloud proyecto.

  11. Verifica que la facturación esté habilitada para tu proyecto de Google Cloud .

  12. Habilita la API de Managed Airflow:

    Roles necesarios para habilitar las APIs

    Para habilitar las APIs, necesitas el rol de IAM de administrador de Service Usage (roles/serviceusage.serviceUsageAdmin), que contiene el permiso serviceusage.services.enable. Obtén más información para otorgar roles.

    gcloud services enable composer.googleapis.com
  13. Si quieres obtener los permisos que necesitas para completar esta guía de inicio rápido, pídele a tu administrador que te otorgue los siguientes roles de IAM en tu proyecto:

    Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

    También puedes obtener los permisos necesarios a través de roles personalizados o cualquier otro rol predefinido.

Crea una cuenta de servicio del entorno

Cuando creas un entorno, especificas una cuenta de servicio. Esta cuenta de servicio se denomina cuenta de servicio del entorno. Tu entorno usa esta cuenta de servicio para realizar la mayoría de las operaciones.

La cuenta de servicio de tu entorno no es una cuenta de usuario. Una cuenta de servicio es un tipo especial de cuenta que usa una aplicación o una instancia de máquina virtual (VM), no una persona.

Para crear una cuenta de servicio para tu entorno, haz lo siguiente:

  1. Crea una cuenta de servicio nueva, como se describe en la documentación de Identity and Access Management.

  2. Otorga un rol, como se describe en la documentación de Identity and Access Management. El rol requerido es Trabajador de Composer (composer.worker).

Crear un entorno

Si este es el primer entorno de tu proyecto, agrega la cuenta del agente de servicio de Managed Airflow como una nueva entidad en la cuenta de servicio de tu entorno y otórgale el rol de roles/composer.ServiceAgentV2Ext.

De forma predeterminada, tu entorno usa la cuenta de servicio predeterminada de Compute Engine, y el siguiente ejemplo muestra cómo agregarle este permiso.

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    ENVIRONMENT_SERVICE_ACCOUNT \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

Reemplaza ENVIRONMENT_SERVICE_ACCOUNT por la cuenta de servicio de tu entorno que creaste anteriormente.

Crea un entorno nuevo llamado example-environment en la región us-central1, con la versión más reciente de Airflow administrado (2ª gen.).

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-2.16.10-airflow-2.10.5 \
    --service-account ENVIRONMENT_SERVICE_ACCOUNT

Reemplaza ENVIRONMENT_SERVICE_ACCOUNT por la cuenta de servicio de tu entorno que creaste anteriormente.

Crea un archivo DAG

Un DAG de Airflow es una colección de tareas organizadas que deseas programar y ejecutar. Los DAG se definen en archivos estándares de Python.

En esta guía, se usa un DAG de Airflow de ejemplo definido en el archivo quickstart.py. El código de Python en este archivo hace lo siguiente:

  1. Crea un DAG composer_sample_dag. Este DAG se ejecuta todos los días.
  2. Ejecuta una tarea print_dag_run_conf. La tarea imprime la configuración de ejecución del DAG con el operador bash.

Guarda una copia del archivo quickstart.py en tu máquina local:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Sube el archivo de DAG al bucket de tu entorno

Cada entorno de Airflow administrado tiene un bucket de Cloud Storage asociado. Airflow en Managed Airflow programa solo los DAG que se encuentran en la carpeta /dags de este bucket.

Para programar tu DAG, sube quickstart.py desde tu máquina local a la carpeta /dags del entorno:

Para subir quickstart.py con Google Cloud CLI, ejecuta el siguiente comando en la carpeta en la que se encuentra el archivo quickstart.py:

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

Cómo ver el DAG

Después de subir el archivo DAG, Airflow hace lo siguiente:

  1. Analiza el archivo DAG que subiste. Es posible que el DAG tarde unos minutos en estar disponible para Airflow.
  2. Agrega el DAG a la lista de DAGs disponibles.
  3. Ejecuta el DAG según el programa que proporcionaste en el archivo del DAG.

Comprueba que tu DAG se procese sin errores y esté disponible en Airflow. Para ello, míralo en la IU del DAG. La IU de DAG es la interfaz de Airflow administrado para ver la información del DAG en la consola de Google Cloud . Airflow administrado también proporciona acceso a la IU de Airflow, que es una interfaz web nativa de Airflow.

  1. Espera unos cinco minutos para que Airflow tenga tiempo de procesar el archivo DAG que subiste anteriormente y completar la primera ejecución del DAG (que se explica más adelante).

  2. Ejecuta el siguiente comando en Google Cloud CLI. Este comando ejecuta el comando de la CLI de Airflow dags list que enumera los DAGs en tu entorno.

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. Verifica que el DAG composer_quickstart aparezca en el resultado del comando.

    Resultado de ejemplo:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

Cómo ver los detalles de la ejecución del DAG

Una sola ejecución de un DAG se denomina ejecución de DAG. Airflow ejecuta de inmediato una ejecución de DAG para el DAG de ejemplo porque la fecha de inicio en el archivo DAG está establecida en ayer. De esta manera, Airflow se pone al día con la programación del DAG especificado.

El DAG de ejemplo contiene una tarea, print_dag_run_conf, que ejecuta el comando echo en la consola. Este comando genera información de metadatos sobre el DAG (identificador numérico de la ejecución del DAG).

Ejecuta el siguiente comando en Google Cloud CLI. Este comando enumera las ejecuciones del DAG para el DAG composer_quickstart:

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

Resultado de ejemplo:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

La CLI de Airflow no proporciona un comando para ver los registros de tareas. Puedes usar otros métodos para ver los registros de tareas de Airflow: la IU de DAG de Airflow administrado, la IU de Airflow o Cloud Logging. En esta guía, se muestra una forma de consultar Cloud Logging para obtener registros de una ejecución de DAG específica.

Ejecuta el siguiente comando en Google Cloud CLI. Este comando lee los registros de Cloud Logging para una ejecución de DAG específica del DAG composer_quickstart. El argumento --format da formato al resultado para que solo se muestre el texto del mensaje de registro.

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

Reemplaza lo siguiente:

  • RUN_ID con el valor run_id del resultado del comando tasks states-for-dag-run que ejecutaste anteriormente Por ejemplo, 2024-02-17T15:38:38.969307+00:00.

Resultado de ejemplo:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

Realiza una limpieza

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que usaste en esta página, borra el proyecto de Google Cloud que tiene los recursos.

Borra los recursos que se usaron en este instructivo:

  1. Borra el entorno de Managed Airflow:

    1. En la consola de Google Cloud , ve a la página Entornos.

      Ir a Entornos

    2. Selecciona example-environment y haz clic en Borrar.

    3. Espera hasta que se borre el entorno.

  2. Borra el bucket de tu entorno. Si borras el entorno de Managed Airflow, no se borrará el bucket.

    1. En la consola de Google Cloud , ve a la página Almacenamiento > Navegador.

      Ir a Almacenamiento > Navegador

    2. Selecciona el bucket del entorno y haz clic en Borrar. Por ejemplo, este bucket puede llamarse us-central1-example-environ-c1616fe8-bucket.

¿Qué sigue?