Ajouter et mettre à jour des DAG

Airflow géré (3e génération) | Airflow géré (2e génération) | Airflow géré (1re génération héritée)

Cette page explique comment gérer les DAG dans votre environnement Airflow géré.

Managed Airflow utilise un bucket Cloud Storage pour stocker les DAGs de votre environnement Managed Airflow. Votre environnement synchronise les DAG de ce bucket avec les composants Airflow, tels que les nœuds de calcul Airflow et les programmeurs.

Avant de commencer

  • Étant donné qu'Apache Airflow n'offre pas une forte isolation DAG, nous vous recommandons de maintenir des environnements de production et de test distincts afin d'éviter les interférences au niveau des DAG. Pour en savoir plus, reportez-vous à la section Tester les DAG.
  • Assurez-vous que votre compte dispose des autorisations suffisantes pour gérer les DAG.
  • Les modifications apportées aux DAG sont propagées à Airflow dans un délai de 3 à 5 minutes. Vous pouvez consulter l'état des tâches dans l'interface utilisateur d'Airflow.

Accéder au bucket de votre environnement

Pour accéder au bucket associé à votre environnement :

Console

  1. Dans la Google Cloud console, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, recherchez une ligne portant le nom de votre environnement, puis cliquez sur le lien DAG dans la colonne Dossier des DAG. La page Détails du bucket s'ouvre. Elle affiche le contenu du dossier /dags dans le bucket de votre environnement.

gcloud

gcloud CLI dispose de commandes distinctes pour ajouter et supprimer des DAG dans le bucket de votre environnement.

Si vous souhaitez interagir avec le bucket de votre environnement, vous pouvez également utiliser Google Cloud CLI. Pour obtenir l'adresse du bucket de votre environnement, exécutez la commande suivante de gcloud CLI :

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.

Exemple :

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

Rédigez une requête API environments.get. Dans la ressource Environment, dans la EnvironmentConfig, dans la ressource dagGcsPrefix se trouve l'adresse du bucket de votre environnement.

Exemple :

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

Utilisez la bibliothèque google-auth pour obtenir des identifiants et la bibliothèque requests pour appeler l'API REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

Ajouter ou mettre à jour un DAG

Pour ajouter ou mettre à jour un DAG, déplacez le fichier Python .py du DAG vers le dossier /dags dans le bucket de l'environnement.

Console

  1. Dans la Google Cloud console, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, recherchez une ligne portant le nom de votre environnement, puis cliquez sur le lien DAG dans la colonne Dossier des DAG. La page Détails du bucket s'ouvre. Elle affiche le contenu du dossier /dags dans le bucket de votre environnement.

  3. Cliquez sur Importer des fichiers. Sélectionnez ensuite le fichier Python .py du DAG à l'aide de la boîte de dialogue du navigateur, puis confirmez.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.
  • LOCAL_FILE_TO_UPLOAD par le fichier Python .py du DAG.

Exemple :

gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

Mettre à jour un DAG contenant des exécutions de DAG actives

Si vous mettez à jour un DAG contenant des exécutions DAG actives :

  • Toutes les tâches en cours d'exécution finissent par utiliser le fichier DAG d'origine.
  • Toutes les tâches planifiées, mais qui ne sont pas en cours d'exécution, utiliseront le fichier DAG mis à jour.
  • Toutes les tâches qui ne sont plus présentes dans le fichier DAG mis à jour sont marquées comme supprimées.

Mettre à jour des DAG qui s'exécutent régulièrement

Une fois que vous avez importé un fichier DAG, Airflow met un certain temps à le charger et à mettre à jour le DAG. Si votre DAG s'exécute régulièrement, vous pouvez vous assurer que le DAG utilise la version mise à jour du fichier DAG. Pour ce faire :

  1. Suspendez le DAG dans l'interface utilisateur d'Airflow.

  2. Importez un fichier DAG mis à jour.

  3. Attendez que les mises à jour s'affichent dans l'interface utilisateur d'Airflow. Cela signifie que le DAG a été correctement analysé par le programmeur et mis à jour dans la base de données Airflow.

    Si l'interface utilisateur d'Airflow affiche les DAG mis à jour, cela ne garantit pas qu'ils utilisent bien la version mise à jour du fichier DAG. Ce comportement est dû au fait que les fichiers DAG sont synchronisés indépendamment pour les programmeurs et les nœuds de calcul.

  4. Vous pouvez prolonger le temps d'attente pour vous assurer que le fichier DAG est synchronisé avec tous les nœuds de calcul de votre environnement. La synchronisation a lieu plusieurs fois par minute. Dans un environnement qui fonctionne normalement, il suffit d'attendre 20 à 30 secondes pour que tous les nœuds de calcul soient synchronisés.

  5. (Facultatif) Pour être certain que tous les nœuds de calcul disposent de la nouvelle version du fichier DAG, consultez les journaux de chaque nœud de calcul. Pour ce faire :

    1. Ouvrez l'onglet Journaux de votre environnement dans Google Cloud la console.

    2. Accédez à l'élément Journaux Composer > Infrastructure > Synchronisation Cloud Storage et inspectez les journaux de chaque nœud de calcul de votre environnement. Recherchez l'élément de journal Syncing dags directory le plus récent dont l'horodatage est postérieur à l'importation du nouveau fichier DAG. Si Finished syncing s'affiche après l'élément de journal, les DAG sont correctement synchronisés sur ce nœud de calcul.

  6. Relancez le DAG.

Analyser à nouveau un DAG

Étant donné que les DAG sont stockés dans le bucket de l'environnement, chaque DAG est d'abord synchronisé avec le processeur DAG, puis le processeur DAG l'analyse avec un court délai. Si vous analysez à nouveau un DAG manuellement, par exemple, via l'interface utilisateur d'Airflow, le processeur DAG analyse à nouveau la version actuelle du DAG qui lui est disponible, qui n'est peut-être pas la dernière version du DAG que vous avez importée dans le bucket de l'environnement.

Nous vous recommandons d'utiliser l'analyse à la demande uniquement si les temps d'analyse sont longs. Par exemple, cela peut se produire si votre environnement comporte un grand nombre de fichiers ou si un long intervalle d'analyse de DAG est configuré dans les options de configuration d'Airflow.

Supprimer un DAG dans votre environnement

Pour supprimer un DAG, supprimez le fichier Python .py du DAG du dossier /dags de l'environnement dans le bucket de votre environnement.

Console

  1. Dans la Google Cloud console, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, recherchez une ligne portant le nom de votre environnement, puis cliquez sur le lien DAG dans la colonne Dossier des DAG. La page Détails du bucket s'ouvre. Elle affiche le contenu du dossier /dags dans le bucket de votre environnement.

  3. Sélectionnez le fichier DAG, cliquez sur Supprimer, puis confirmez l'opération.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.
  • DAG_FILE par le fichier Python .py du DAG.

Exemple :

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Supprimer un DAG de l'interface utilisateur d'Airflow

Pour supprimer les métadonnées d'un DAG à partir de l'interface utilisateur d'Airflow :

Interface utilisateur d'Airflow

  1. Accédez à l'interface utilisateur d'Airflow pour votre environnement.
  2. Pour le DAG, cliquez sur Supprimer le DAG.

gcloud

Exécutez la commande suivante dans gcloud CLI :

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région dans laquelle se trouve l'environnement.
  • DAG_NAME est le nom du DAG à supprimer.

Étape suivante