Airflow géré (3e génération) | Airflow géré (2e génération) | Airflow géré (1re génération, version héritée)
Cette page explique comment transférer des données depuis d'autres services avec les opérateurs de transfert Google dans vos DAG.
À propos des opérateurs de transfert Google
Les opérateurs de transfert Google sont un ensemble d'opérateurs Airflow que vous pouvez utiliser pour extraire des données d'autres services dans Google Cloud.
Ce guide présente les opérateurs pour Azure FileShare Storage et Amazon S3 qui fonctionnent avec Cloud Storage. Il existe de nombreux autres opérateurs de transfert qui fonctionnent avec des services dans Google Cloud et avec des services autres que Google Cloud.
Avant de commencer
- Ce guide est destiné à Airflow 2. Si votre environnement utilise Airflow 1, usez des packages de fournisseur de rétroportage pour importer des opérateurs et rendre les types de connexion requis disponibles dans votre environnement.
Amazon S3 vers Cloud Storage
Cette section explique comment synchroniser des données d'Amazon S3 vers un bucket Cloud Storage.
Installer le package de fournisseur Amazon
Le package apache-airflow-providers-amazon contient les types de connexion et les fonctionnalités qui interagissent avec Amazon S3.
Installez ce package PyPI dans votre
environnement.
Configurer une connexion à Amazon S3
Le package de fournisseur Amazon fournit un type de connexion pour Amazon S3. Vous créez une connexion de ce type. La connexion pour Cloud Storage, nommée google_cloud_default, est déjà configurée dans votre environnement.
Configurez une connexion à Amazon S3 de la manière suivante :
- Dans l'interface utilisateur Airflow, accédez à Admin > Connections.
- Créez une connexion.
- Sélectionnez
Amazon S3comme type de connexion. - L'exemple suivant utilise une connexion nommée
aws_s3. Vous pouvez utiliser ce nom ou tout autre nom pour la connexion. - Spécifiez les paramètres de connexion comme décrit dans la documentation Airflow pour la connexion Amazon Web Services. Par exemple, pour configurer une connexion avec des clés d'accès AWS, générez une clé d'accès pour votre compte sur AWS, puis fournissez l'ID de clé d'accès AWS comme identifiant et la clé d'accès secrète AWS comme mot de passe pour la connexion.
Transférer des données depuis Amazon S3
Si vous souhaitez utiliser les données synchronisées ultérieurement dans un autre DAG ou une autre tâche, extrayez-les dans le dossier /data du bucket de votre environnement. Ce dossier est synchronisé avec d'autres nœuds de calcul Airflow, de sorte que les tâches de votre DAG peuvent l'utiliser.
L'exemple de DAG suivant effectue les opérations suivantes :
- Synchronise le contenu du répertoire
/data-for-gcsd'un bucket S3 avec le dossier/data/from-s3/data-for-gcs/dans le bucket de votre environnement. - Attend deux minutes que les données soient synchronisées avec tous les nœuds de calcul Airflow de votre environnement.
- Affiche la liste des fichiers de ce répertoire à l'aide de la commande
ls. Remplacez cette tâche par d'autres opérateurs Airflow qui fonctionnent avec vos données.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_aws_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule=None,
) as dag:
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')
transfer_dir_from_s3 >> sleep_2min >> print_dir_files
Azure FileShare vers Cloud Storage
Cette section explique comment synchroniser des données d'Azure FileShare vers un bucket Cloud Storage.
Installer le package de fournisseur Microsoft Azure
Le package apache-airflow-providers-microsoft-azure contient les types de connexion et les fonctionnalités qui interagissent avec Microsoft Azure.
Installez ce package PyPI dans votre
environnement.
Configurer une connexion à Azure FileShare
Le package de fournisseur Microsoft Azure fournit un type de connexion pour Azure File Share. Vous créez une connexion de ce type. La connexion pour Cloud Storage, nommée google_cloud_default, est déjà configurée dans votre environnement.
Configurez une connexion à Azure FileShare de la manière suivante :
- Dans l'interface utilisateur Airflow, accédez à Admin > Connections.
- Créez une connexion.
- Sélectionnez
Azure FileSharecomme type de connexion. - L'exemple suivant utilise une connexion nommée
azure_fileshare. Vous pouvez utiliser ce nom ou tout autre nom pour la connexion. - Spécifiez les paramètres de connexion comme décrit dans la documentation Airflow pour la connexion Microsoft Azure File Share. Par exemple, vous pouvez spécifier une chaîne de connexion pour votre clé d'accès au compte de stockage.
Transférer des données depuis Azure FileShare
Si vous souhaitez utiliser les données synchronisées ultérieurement dans un autre DAG ou une autre tâche, extrayez-les dans le dossier /data du bucket de votre environnement. Ce dossier est synchronisé avec d'autres nœuds de calcul Airflow, de sorte que les tâches de votre DAG peuvent l'utiliser.
Le DAG suivant effectue les opérations suivantes :
L'exemple de DAG suivant effectue les opérations suivantes :
- Synchronise le contenu du répertoire
/data-for-gcsd'Azure File Share avec le dossier/data/from-azuredans le bucket de votre environnement. - Attend deux minutes que les données soient synchronisées avec tous les nœuds de calcul Airflow de votre environnement.
- Affiche la liste des fichiers de ce répertoire à l'aide de la commande
ls. Remplacez cette tâche par d'autres opérateurs Airflow qui fonctionnent avec vos données.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_azure_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule=None,
) as dag:
transfer_dir_from_azure = AzureFileShareToGCSOperator(
task_id='transfer_dir_from_azure',
azure_fileshare_conn_id='azure_fileshare',
share_name='example-file-share',
directory_name='data-for-gcs',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-azure/')
transfer_dir_from_azure >> sleep_2min >> print_dir_files