Exécuter un DAG Apache Airflow dans Managed Airflow (2e génération) (Google Cloud CLI)

Managed Airflow (3e génération) | Managed Airflow (2e génération) | Managed Airflow (ancienne 1re génération)

Ce guide de démarrage rapide vous explique comment créer un environnement Managed Service pour Apache Airflow et exécuter un DAG Apache Airflow dans Managed Airflow (2e génération).

Avant de commencer

  1. Connectez-vous à votre compte Google Cloud . Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits sans frais pour exécuter, tester et déployer des charges de travail.
  2. Installez la Google Cloud CLI.

  3. Si vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à la gcloud CLI avec votre identité fédérée.

  4. Pour initialiser la gcloud CLI, exécutez la commande suivante :

    gcloud init
  5. Créez ou sélectionnez un projet Google Cloud .

    Rôles requis pour sélectionner ou créer un projet

    • Sélectionnez un projet : la sélection d'un projet ne nécessite pas de rôle IAM spécifique. Vous pouvez sélectionner n'importe quel projet pour lequel un rôle vous a été attribué.
    • Créer un projet : pour créer un projet, vous devez disposer du rôle Créateur de projet (roles/resourcemanager.projectCreator), qui contient l'autorisation resourcemanager.projects.create. Découvrez comment attribuer des rôles.
    • Créez un projet Google Cloud  :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par le nom du projet Google Cloud que vous créez.

    • Sélectionnez le projet Google Cloud que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre projet Google Cloud .

  6. Vérifiez que la facturation est activée pour votre projet Google Cloud .

  7. Installez la Google Cloud CLI.

  8. Si vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à la gcloud CLI avec votre identité fédérée.

  9. Pour initialiser la gcloud CLI, exécutez la commande suivante :

    gcloud init
  10. Créez ou sélectionnez un projet Google Cloud .

    Rôles requis pour sélectionner ou créer un projet

    • Sélectionnez un projet : la sélection d'un projet ne nécessite pas de rôle IAM spécifique. Vous pouvez sélectionner n'importe quel projet pour lequel un rôle vous a été attribué.
    • Créer un projet : pour créer un projet, vous devez disposer du rôle Créateur de projet (roles/resourcemanager.projectCreator), qui contient l'autorisation resourcemanager.projects.create. Découvrez comment attribuer des rôles.
    • Créez un projet Google Cloud  :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par le nom du projet Google Cloud que vous créez.

    • Sélectionnez le projet Google Cloud que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre projet Google Cloud .

  11. Vérifiez que la facturation est activée pour votre projet Google Cloud .

  12. Activez l'API Managed Airflow :

    Rôles requis pour activer les API

    Pour activer les API, vous avez besoin du rôle IAM Administrateur Service Usage (roles/serviceusage.serviceUsageAdmin), qui contient l'autorisation serviceusage.services.enable. Découvrez comment attribuer des rôles.

    gcloud services enable composer.googleapis.com
  13. Pour obtenir les autorisations nécessaires pour suivre ce guide de démarrage rapide, demandez à votre administrateur de vous accorder les rôles IAM suivants sur votre projet :

    Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

    Vous pouvez également obtenir les autorisations requises avec des rôles personnalisés ou d'autres rôles prédéfinis.

Créer le compte de service d'un environnement

Lorsque vous créez un environnement, vous spécifiez un compte de service. Ce compte de service est appelé compte de service de l'environnement. Votre environnement utilise ce compte de service pour effectuer la plupart des opérations.

Le compte de service de votre environnement n'est pas un compte utilisateur. Un compte de service est un type particulier de compte utilisé par une application ou une instance de machine virtuelle (VM), et non par une personne.

Pour créer un compte de service pour votre environnement :

  1. Créez un compte de service, comme décrit dans la documentation Identity and Access Management.

  2. Attribuez-lui un rôle, comme décrit dans la documentation Identity and Access Management. Le rôle requis est Nœud de calcul Composer (composer.worker).

Créez un environnement

S'il s'agit du premier environnement de votre projet, ajoutez le compte de l'agent de service Managed Airflow en tant que principal sur le compte de service de votre environnement et accordez-lui le rôle roles/composer.ServiceAgentV2Ext.

Par défaut, votre environnement utilise le compte de service Compute Engine par défaut. L'exemple suivant montre comment ajouter cette autorisation à ce compte.

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    ENVIRONMENT_SERVICE_ACCOUNT \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

Remplacez ENVIRONMENT_SERVICE_ACCOUNT par le compte de service de votre environnement que vous avez créé précédemment.

Créez un environnement nommé example-environment dans la région us-central1, avec la dernière version de Managed Airflow (2e génération).

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-2.17.0-airflow-2.11.1 \
    --service-account ENVIRONMENT_SERVICE_ACCOUNT

Remplacez ENVIRONMENT_SERVICE_ACCOUNT par le compte de service de votre environnement que vous avez créé précédemment.

Créer un fichier DAG

Un DAG Airflow est un ensemble de tâches organisées que vous souhaitez programmer et exécuter. Les DAG sont définis dans des fichiers Python standards.

Ce guide utilise un exemple de DAG Airflow défini dans le fichier quickstart.py. Le code Python de ce fichier effectue les opérations suivantes :

  1. Il crée un DAG, composer_sample_dag, Ce DAG s'exécute tous les jours.
  2. Il exécute une tâche, print_dag_run_conf, qui imprime la configuration de l'exécution du DAG à l'aide de l'opérateur bash.

Enregistrez une copie du fichier quickstart.py sur votre ordinateur local :

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Importer le fichier DAG dans le bucket de votre environnement

Chaque environnement Managed Airflow est associé à un bucket Cloud Storage. Airflow dans Managed Airflow ne planifie que les DAG qui se trouvent dans le dossier /dags de ce bucket.

Pour programmer votre DAG, importez le fichier quickstart.py à partir de votre ordinateur local dans le dossier /dags de votre environnement :

Pour importer quickstart.py avec Google Cloud CLI, exécutez la commande suivante dans le dossier où se trouve le fichier quickstart.py :

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

Afficher le DAG

Une fois le fichier DAG importé, Airflow effectue les opérations suivantes :

  1. Analyse le fichier DAG que vous avez importé. L'affichage du DAG dans Airflow peut prendre quelques minutes.
  2. Ajoute le DAG à la liste des DAG disponibles.
  3. Exécute le DAG selon le calendrier que vous avez fourni dans le fichier DAG.

Vérifiez que votre DAG est traité sans erreur et qu'il est disponible dans Airflow en l'affichant dans l'UI DAG. L'UI DAG est l'interface Managed Airflow permettant d'afficher les informations sur les DAG dans la console Google Cloud . Managed Airflow permet également d'accéder à l'interface utilisateur Airflow, qui est une interface Web Airflow native.

  1. Attendez environ cinq minutes pour laisser à Airflow le temps de traiter le fichier DAG que vous avez importé précédemment et d'effectuer la première exécution du DAG (expliquée plus loin).

  2. Exécutez la commande suivante dans Google Cloud CLI. Cette commande exécute la commande de CLI Airflow dags list qui liste les DAG dans votre environnement.

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. Vérifiez que le DAG composer_quickstart est listé dans le résultat de la commande.

    Exemple de résultat :

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

Afficher les détails d'une exécution de DAG

Une seule exécution d'un DAG est appelée exécution de DAG. Airflow exécute immédiatement une exécution DAG pour l'exemple de DAG, car la date de début dans le fichier DAG est définie sur "hier". Airflow rattrape ainsi la planification du DAG spécifié.

L'exemple de DAG contient une tâche, print_dag_run_conf, qui exécute la commande echo dans la console. Cette commande génère des métadonnées sur le DAG (identifiant numérique de l'exécution du DAG).

Exécutez la commande suivante dans Google Cloud CLI. Cette commande liste les exécutions de DAG pour le DAG composer_quickstart :

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

Exemple de résultat :

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

La CLI Airflow ne fournit pas de commande permettant d'afficher les journaux des tâches. Vous pouvez utiliser d'autres méthodes pour afficher les journaux des tâches Airflow : l'UI DAG Managed Airflow, l'UI Airflow ou Cloud Logging. Ce guide explique comment interroger Cloud Logging pour obtenir les journaux d'une exécution de DAG spécifique.

Exécutez la commande suivante dans Google Cloud CLI. Cette commande lit les journaux de Cloud Logging pour une exécution de DAG spécifique du DAG composer_quickstart. L'argument --format met en forme la sortie de sorte que seul le texte du message de journal soit affiché.

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

Remplacez :

  • RUN_ID par la valeur run_id issue du résultat de la commande tasks states-for-dag-run que vous avez exécutée précédemment. Par exemple, 2024-02-17T15:38:38.969307+00:00.

Exemple de résultat :

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

Effectuer un nettoyage

Pour éviter que les ressources utilisées dans cette démonstration soient facturées sur votre compte Google Cloud , supprimez le projet Google Cloud qui les contient.

Supprimez les ressources utilisées dans ce tutoriel :

  1. Supprimez l'environnement Managed Airflow :

    1. Dans la console Google Cloud , accédez à la page Environnements.

      Accéder à la page Environnements

    2. Sélectionnez example-environment, puis cliquez sur Supprimer.

    3. Attendez que l'environnement soit supprimé.

  2. Supprimez le bucket de votre environnement. La suppression de l'environnement Managed Airflow ne supprime pas son bucket.

    1. Dans la console Google Cloud , accédez à la page Stockage > Navigateur.

      Accéder à Cloud Storage > Navigateur

    2. Sélectionnez le bucket de l'environnement, puis cliquez sur Supprimer. Par exemple, ce bucket peut être nommé us-central1-example-environ-c1616fe8-bucket.

Étapes suivantes