Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Legacy Gen 1)
Diese Anleitung ist eine Änderung von Einen Datenanalyse-DAG ausführen Google Cloud und zeigt, wie Sie Ihre Managed Airflow-Umgebung mit Microsoft Azure verbinden, um dort gespeicherte Daten zu nutzen. Außerdem wird gezeigt, wie Sie mit Managed Airflow einen Apache Airflow-DAG erstellen. Der DAG verknüpft Daten aus einem öffentlichen BigQuery-Dataset und einer CSV-Datei, die in einem Azure Blob Storage gespeichert ist, und führt dann einen Managed Service for Apache Spark-Batchjob aus, um die verknüpften Daten zu verarbeiten.
Das öffentliche BigQuery-Dataset in dieser Anleitung ist ghcn_d, eine integrierte Datenbank mit Klimazusammenfassungen aus aller Welt. Die CSV-Datei enthält Informationen zu den Daten und Namen von Feiertagen in den USA von 1997 bis 2021.
Die Frage, die wir mit dem DAG beantworten möchten, lautet: „Wie warm war es in Chicago an Thanksgiving in den letzten 25 Jahren?“
Ziele
- Managed Airflow-Umgebung in der Standardkonfiguration erstellen
- Blob in Azure erstellen
- Leeres BigQuery-Dataset erstellen
- Neuen Cloud Storage-Bucket erstellen
- DAG mit den folgenden Aufgaben erstellen und ausführen:
- Externes Dataset aus Azure Blob Storage in Cloud Storage laden
- Externes Dataset aus Cloud Storage in BigQuery laden
- Zwei Datasets in BigQuery verknüpfen
- PySpark-Job für die Datenanalyse ausführen
Hinweis
APIs aktivieren
Aktivieren Sie folgende APIs:
Console
Aktivieren Sie die Managed Service for Apache Spark, Managed Airflow, BigQuery und Cloud Storage APIs.
Rollen, die zum Aktivieren von APIs erforderlich sind
Zum Aktivieren von APIs benötigen Sie die IAM-Rolle „Service Usage-Administrator“
(roles/serviceusage.serviceUsageAdmin), die
die Berechtigung serviceusage.services.enable enthält. Informationen zum Zuweisen von
Rollen.
gcloud
Aktivieren Sie die Managed Service for Apache Spark, Managed Airflow, BigQuery und Cloud Storage APIs:
Rollen, die zum Aktivieren von APIs erforderlich sind
Zum Aktivieren von APIs benötigen Sie die IAM-Rolle „Service Usage-Administrator“ (roles/serviceusage.serviceUsageAdmin), die die Berechtigung serviceusage.services.enable enthält. Informationen zum Zuweisen von
Rollen.
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Berechtigungen erteilen
Erteilen Sie Ihrem Nutzerkonto die folgenden Rollen und Berechtigungen:
Rollen für die Verwaltung von Managed Airflow-Umgebungen und Umgebungs-Buckets erteilen.
Erteilen Sie die Rolle BigQuery-Dateninhaber (
roles/bigquery.dataOwner), um ein BigQuery-Dataset zu erstellen.Erteilen Sie die Rolle Storage-Administrator (
roles/storage.admin), um einen Cloud Storage-Bucket zu erstellen.
Managed Airflow-Umgebung erstellen und vorbereiten
Erstellen Sie eine Managed Airflow-Umgebung mit den Standard parametern:
- Wählen Sie eine Region in den USA aus.
- Wählen Sie die neueste Managed Airflow-Version aus.
Erteilen Sie dem in Ihrer Managed Airflow-Umgebung verwendeten Dienstkonto die folgenden Rollen, damit die Airflow-Worker DAG-Aufgaben erfolgreich ausführen können:
- BigQuery-Nutzer (
roles/bigquery.user) - BigQuery-Dateninhaber (
roles/bigquery.dataOwner) - Dienstkontonutzer (
roles/iam.serviceAccountUser) - Dataproc-Bearbeiter (
roles/dataproc.editor) - Dataproc-Worker (
roles/dataproc.worker)
- BigQuery-Nutzer (
Zugehörige Ressourcen in erstellen und ändern Google Cloud
Installieren Sie das
apache-airflow-providers-microsoft-azurePyPI-Paket in Ihrer Managed Airflow-Umgebung.Erstellen Sie ein leeres BigQuery-Dataset mit den folgenden Parametern:
- Name:
holiday_weather - Region:
US
- Name:
Erstellen Sie einen neuen Cloud Storage-Bucket in der
USMultiregion.Führen Sie den folgenden Befehl aus, um den privaten Google-Zugriff im Standardsubnetz in der Region zu aktivieren, in der Sie Managed Service for Apache Spark ausführen möchten, um die Netzwerkanforderungenzu erfüllen. Wir empfehlen, dieselbe Region wie für Ihre Managed Airflow-Umgebung zu verwenden.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Zugehörige Ressourcen in Azure erstellen
Erstellen Sie ein Speicherkonto mit den Standardeinstellungen.
Rufen Sie den Zugriffsschlüssel und die Verbindungsstring ab für Ihr Speicherkonto.
Erstellen Sie in Ihrem neu erstellten Speicherkonto einen Container mit den Standardoptionen.
Erteilen Sie die Rolle „Storage Blob Delegator“ für den im vorherigen Schritt erstellten Container.
Laden Sie holidays.csv hoch, um im Azure-Portal ein Blockblob mit den Standardoptionen zu erstellen.
Erstellen Sie ein SAS-Token für das im vorherigen Schritt im Azure-Portal erstellte Blockblob.
- Signiermethode: Schlüssel für die Nutzerdelegierung
- Berechtigungen: Lesen
- Zulässige IP-Adresse: Keine
- Zulässige Protokolle: Nur HTTPS
Von Managed Airflow aus mit Azure verbinden
Fügen Sie Ihre Microsoft Azure Verbindung über die Airflow-Benutzeroberfläche hinzu:
Rufen Sie Admin > Verbindungen auf.
Erstellen Sie eine neue Verbindung mit der folgenden Konfiguration:
- Verbindungs-ID:
azure_blob_connection - Verbindungstyp:
Azure Blob Storage - Blob Storage-Anmeldung:Name Ihres Speicherkontos
- Blob Storage-Schlüssel:Zugriffsschlüssel für Ihr Speicherkonto
- Verbindungsstring für das Blob Storage-Konto:Verbindungsstring für Ihr Speicherkonto
- SAS-Token:SAS-Token, das aus Ihrem Blob generiert wurde
- Verbindungs-ID:
Datenverarbeitung mit Managed Service for Apache Spark
Beispiel für einen PySpark-Job
Der folgende Code ist ein Beispiel für einen PySpark-Job, der die Temperatur von Zehntelgrad Celsius in Grad Celsius umwandelt. Mit diesem Job werden Temperaturdaten aus dem Dataset in ein anderes Format konvertiert.
PySpark-Datei in Cloud Storage hochladen
So laden Sie die PySpark-Datei in Cloud Storage hoch:
Speichern Sie data_analytics_process.py auf Ihrem lokalen Computer.
Rufen Sie in der Google Cloud Console die Seite Cloud Storage-Browser auf:
Klicken Sie auf den Namen des zuvor erstellten Buckets.
Klicken Sie auf dem Tab Objekte für den Bucket auf die Schaltfläche Dateien hochladen , wählen Sie im angezeigten Dialogfeld
data_analytics_process.pyaus und klicken Sie auf Öffnen.
DAG für die Datenanalyse
Beispiel-DAG
Der DAG verwendet mehrere Operatoren, um die Daten zu transformieren und zu vereinheitlichen:
Der
AzureBlobStorageToGCSOperatorüberträgt die Datei „holidays.csv“ von Ihrem Azure-Blockblob in Ihren Cloud Storage-Bucket.Der
GCSToBigQueryOperatornimmt die Datei „holidays.csv“ aus Cloud Storage in eine neue Tabelle im BigQueryholidays_weatherDataset auf, das Sie zuvor erstellt haben.Der
DataprocCreateBatchOperatorerstellt und führt einen PySpark-Batchjob mit Managed Service for Apache Spark aus.Der
BigQueryInsertJobOperatorverknüpft die Daten aus holidays.csv in der Spalte „Date“ mit Wetterdaten aus dem öffentlichen BigQuery-Dataset ghcn_d. DieBigQueryInsertJobOperatorAufgaben werden dynamisch mit einer for-Schleife generiert und befinden sich in einerTaskGroupum die Lesbarkeit in der Diagrammansicht der Airflow-Benutzeroberfläche zu verbessern.
Variablen über die Airflow-Benutzeroberfläche hinzufügen
In Airflow, Variablen sind eine universelle Möglichkeit, beliebige Einstellungen oder Konfigurationen als einfachen Schlüssel-Wert-Speicher zu speichern und abzurufen. Dieser DAG verwendet Airflow-Variablen, um allgemeine Werte zu speichern. So fügen Sie sie Ihrer Umgebung hinzu:
Rufen Sie die Airflow-Benutzeroberfläche über die Managed Airflow-Konsole auf.
Rufen Sie Admin > Variablen auf.
Fügen Sie die folgenden Variablen hinzu:
gcp_project: Ihre Projekt-ID.gcs_bucket: Der Name des zuvor erstellten Buckets (ohne das Präfixgs://).gce_region: Die Region, in der Sie Ihren Managed Service for Apache Spark-Job ausführen möchten, der die Netzwerkanforderungen von Managed Service for Apache Spark erfüllt. Dies ist die Region, in der Sie zuvor den privaten Google-Zugriff aktiviert haben.dataproc_service_account: Das Dienstkonto für Ihre Managed Airflow-Umgebung. Sie finden dieses Dienstkonto auf dem Tab „Umgebungskonfiguration“ für Ihre Managed Airflow-Umgebung.azure_blob_name: Der Name des zuvor erstellten Blobs.azure_container_name: Der Name des zuvor erstellten Containers.
DAG in den Bucket Ihrer Umgebung hochladen
Managed Airflow plant DAGs, die sich im Ordner /dags im Bucket Ihrer Umgebung befinden. So laden Sie den DAG über die
Google Cloud Console hoch:
Speichern Sie „azureblobstoretogcsoperator_tutorial.py“ auf Ihrem lokalen Computer.
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen in der Spalte DAG-Ordner auf den Link DAGs. Der DAG-Ordner Ihrer Umgebung wird geöffnet.
Klicken Sie auf Dateien hochladen.
Wählen Sie auf Ihrem lokalen Computer
azureblobstoretogcsoperator_tutorial.pyaus und klicken Sie auf Öffnen.
DAG auslösen
Klicken Sie in Ihrer Managed Airflow-Umgebung auf den Tab DAGs.
Klicken Sie auf die DAG-ID
azure_blob_to_gcs_dag.Klicken Sie auf DAG auslösen.
Warten Sie etwa fünf bis zehn Minuten, bis ein grünes Häkchen angezeigt wird, das darauf hinweist, dass die Aufgaben erfolgreich abgeschlossen wurden.
Erfolg des DAG prüfen
Rufen Sie in der Google Cloud Console die Seite BigQuery auf.
Klicken Sie im Bereich Explorer auf den Namen Ihres Projekts.
Klicken Sie auf
holidays_weather_joined.Klicken Sie auf „Vorschau“, um die resultierende Tabelle anzusehen. Die Zahlen in der Spalte „Wert“ sind in Zehntelgrad Celsius angegeben.
Klicken Sie auf
holidays_weather_normalized.Klicken Sie auf „Vorschau“, um die resultierende Tabelle anzusehen. Die Zahlen in der Spalte „Wert“ sind in Grad Celsius angegeben.
Bereinigen
Löschen Sie die einzelnen Ressourcen, die Sie für diese Anleitung erstellt haben:
Löschen Sie den Cloud Storage-Bucket, den Sie für diese Anleitung erstellt haben.
Löschen Sie die Managed Airflow-Umgebung, einschließlich des manuellen Löschens des Buckets der Umgebung.