Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Legacy Gen 1)
Auf dieser Seite wird gezeigt, wie Sie mit Google Transfer Operators in Ihren DAGs Daten aus anderen Diensten übertragen.
Google Transfer Operators
Google Transfer Operators sind eine Reihe von Airflow-Operatoren, mit denen Sie Daten aus anderen Diensten in abrufen können Google Cloud.
In dieser Anleitung werden Operatoren für Azure FileShare Storage und Amazon S3 gezeigt, die mit Cloud Storage funktionieren. Es gibt viele weitere Übertragungsoperatoren, die mit Diensten in und mit anderen Diensten als funktionieren. Google Cloud Google Cloud
Amazon S3 für Cloud Storage
In diesem Abschnitt wird gezeigt, wie Sie Daten von Amazon S3 mit einem Cloud Storage-Bucket synchronisieren.
Amazon-Anbieterpaket installieren
Das Paket apache-airflow-providers-amazon enthält die Verbindungstypen und Funktionen, die mit Amazon S3 interagieren.
Installieren Sie dieses PyPI-Paket in Ihrer
Umgebung.
Verbindung zu Amazon S3 konfigurieren
Das Amazon-Anbieterpaket bietet einen Verbindungstyp für Amazon S3. Erstellen Sie eine Verbindung dieses Typs. Die Verbindung für Cloud Storage mit dem Namen google_cloud_default ist bereits in Ihrer Umgebung eingerichtet.
So richten Sie eine Verbindung zu Amazon S3 ein:
- Wechseln Sie in der Airflow-UI zu Admin > Verbindungen.
- Erstellen Sie eine neue Verbindung.
- Wählen Sie
Amazon S3als Verbindungstyp aus. - Im folgenden Beispiel wird eine Verbindung mit dem Namen
aws_s3verwendet. Sie können diesen oder einen anderen Namen für die Verbindung verwenden. - Geben Sie die Verbindungsparameter wie in der Airflow-Dokumentation für die Amazon Web Services-Verbindung beschrieben an. Wenn Sie beispielsweise eine Verbindung mit AWS-Zugriffsschlüsseln einrichten möchten, generieren Sie einen Zugriffsschlüssel für Ihr Konto in AWS und geben dann die AWS-Zugriffsschlüssel-ID als Anmeldenamen und den geheimen AWS-Zugriffsschlüssel als Passwort für die Verbindung an.
Daten von Amazon S3 übertragen
Wenn Sie die synchronisierten Daten später in einem anderen DAG oder einer anderen Aufgabe verwenden möchten, verschieben Sie sie in den Ordner /data des Buckets Ihrer Umgebung. Dieser Ordner wird mit anderen Airflow-Workern synchronisiert, sodass Aufgaben in Ihrem DAG darauf zugreifen können.
Der folgende Beispiel-DAG führt Folgendes aus:
- Synchronisiert die Inhalte des Verzeichnisses
/data-for-gcsaus einem S3-Bucket mit dem Ordner/data/from-s3/data-for-gcs/im Bucket Ihrer Umgebung. - Wartet zwei Minuten, bis die Daten mit allen Airflow-Workern in Ihrer Umgebung synchronisiert wurden.
- Gibt die Liste der Dateien in diesem Verzeichnis mit dem Befehl
lsaus. Ersetzen Sie diese Aufgabe durch andere Airflow-Operatoren, die mit Ihren Daten funktionieren.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_aws_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule=None,
) as dag:
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')
transfer_dir_from_s3 >> sleep_2min >> print_dir_files
Azure FileShare für Cloud Storage
In diesem Abschnitt wird gezeigt, wie Sie Daten von Azure FileShare mit einem Cloud Storage-Bucket synchronisieren.
Microsoft Azure-Anbieterpaket installieren
Das Paket apache-airflow-providers-microsoft-azure enthält die Verbindungstypen und Funktionen, die mit Microsoft Azure interagieren.
Installieren Sie dieses PyPI-Paket in Ihrer
Umgebung.
Verbindung zu Azure FileShare konfigurieren
Das Microsoft Azure-Anbieterpaket bietet einen Verbindungstyp für Azure File Share. Erstellen Sie eine Verbindung dieses Typs. Die Verbindung für Cloud Storage mit dem Namen google_cloud_default ist bereits in Ihrer Umgebung eingerichtet.
So richten Sie eine Verbindung zu Azure FileShare ein:
- Wechseln Sie in der Airflow-UI zu Admin > Verbindungen.
- Erstellen Sie eine neue Verbindung.
- Wählen Sie
Azure FileShareals Verbindungstyp aus. - Im folgenden Beispiel wird eine Verbindung mit dem Namen
azure_fileshareverwendet. Sie können diesen oder einen anderen Namen für die Verbindung verwenden. - Geben Sie die Verbindungsparameter wie in der Airflow-Dokumentation für die Microsoft Azure File Share-Verbindung beschrieben an. Sie können beispielsweise eine Verbindungs-String für Ihren Zugriffsschlüssel für das Speicherkonto angeben.
Daten von Azure FileShare übertragen
Wenn Sie die synchronisierten Daten später in einem anderen DAG oder einer anderen Aufgabe verwenden möchten, verschieben Sie sie in den Ordner /data des Buckets Ihrer Umgebung. Dieser Ordner wird mit anderen Airflow-Workern synchronisiert, sodass Aufgaben in Ihrem DAG darauf zugreifen können.
Der folgende DAG führt Folgendes aus:
Der folgende Beispiel-DAG führt Folgendes aus:
- Synchronisiert die Inhalte des Verzeichnisses
/data-for-gcsaus Azure File Share mit dem Ordner/data/from-azureim Bucket Ihrer Umgebung. - Wartet zwei Minuten, bis die Daten mit allen Airflow-Workern in Ihrer Umgebung synchronisiert wurden.
- Gibt die Liste der Dateien in diesem Verzeichnis mit dem Befehl
lsaus. Ersetzen Sie diese Aufgabe durch andere Airflow-Operatoren, die mit Ihren Daten funktionieren.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_azure_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule=None,
) as dag:
transfer_dir_from_azure = AzureFileShareToGCSOperator(
task_id='transfer_dir_from_azure',
azure_fileshare_conn_id='azure_fileshare',
share_name='example-file-share',
directory_name='data-for-gcs',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-azure/')
transfer_dir_from_azure >> sleep_2min >> print_dir_files