Managed Airflow (Gen 3) | Managed Airflow (Gen 2) | Managed Airflow (Legacy Gen 1)
In dieser Anleitung wird gezeigt, wie Sie mit Managed Airflow einen Apache Airflow-DAG erstellen. Im DAG werden Daten aus einem öffentlichen BigQuery-Dataset und einer in einem Cloud Storage-Bucket gespeicherten CSV-Datei zusammengeführt. Anschließend wird ein Batchjob für Managed Service for Apache Spark ausgeführt, um die zusammengeführten Daten zu verarbeiten.
Das öffentliche BigQuery-Dataset in dieser Anleitung ist ghcn_d, eine integrierte Datenbank mit Klimazusammenfassungen weltweit. Die CSV-Datei enthält Informationen zu den Daten und Namen von Feiertagen in den USA von 1997 bis 2021.
Die Frage, die wir mit dem DAG beantworten möchten, lautet: „Wie warm war es in Chicago an Thanksgiving in den letzten 25 Jahren?“
Ziele
- Managed Airflow-Umgebung in der Standardkonfiguration erstellen
- Leeres BigQuery-Dataset erstellen
- Neuen Cloud Storage-Bucket erstellen
- Erstellen und führen Sie eine DAG mit den folgenden Aufgaben aus:
- Externes Dataset aus Cloud Storage in BigQuery laden
- Zwei Datasets in BigQuery zusammenführen
- PySpark-Job für die Datenanalyse ausführen
Hinweis
APIs aktivieren
Aktivieren Sie folgende APIs:
Console
Aktivieren Sie die APIs für Managed Service for Apache Spark, Managed Airflow, BigQuery und Cloud Storage.
Rollen, die zum Aktivieren von APIs erforderlich sind
Zum Aktivieren von APIs benötigen Sie die IAM-Rolle „Service Usage-Administrator“ (roles/serviceusage.serviceUsageAdmin), die die Berechtigung serviceusage.services.enable enthält. Weitere Informationen zum Zuweisen von Rollen
gcloud
Aktivieren Sie die APIs für Managed Service for Apache Spark, Managed Airflow, BigQuery und Cloud Storage:
Rollen, die zum Aktivieren von APIs erforderlich sind
Zum Aktivieren von APIs benötigen Sie die IAM-Rolle „Service Usage-Administrator“ (roles/serviceusage.serviceUsageAdmin), die die Berechtigung serviceusage.services.enable enthält. Weitere Informationen zum Zuweisen von Rollen
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Berechtigungen erteilen
Weisen Sie Ihrem Nutzerkonto die folgenden Rollen und Berechtigungen zu:
Rollen zum Verwalten von Managed Airflow-Umgebungen und Umgebungs-Buckets zuweisen
Weisen Sie die Rolle BigQuery-Dateninhaber (
roles/bigquery.dataOwner) zu, um ein BigQuery-Dataset zu erstellen.Weisen Sie die Rolle Storage-Administrator (
roles/storage.admin) zu, um einen Cloud Storage-Bucket zu erstellen.
Managed Airflow-Umgebung erstellen und vorbereiten
Managed Airflow-Umgebung mit Standardparametern erstellen:
- Wählen Sie eine Region in den USA aus.
- Wählen Sie die aktuelle Managed Airflow-Version aus.
Weisen Sie dem Dienstkonto, das in Ihrer verwalteten Airflow-Umgebung verwendet wird, die folgenden Rollen zu, damit die Airflow-Worker DAG-Aufgaben erfolgreich ausführen können:
- BigQuery-Nutzer (
roles/bigquery.user) - BigQuery-Dateninhaber (
roles/bigquery.dataOwner) - Dienstkontonutzer (
roles/iam.serviceAccountUser) - Dataproc-Bearbeiter (
roles/dataproc.editor) - Dataproc-Worker (
roles/dataproc.worker)
- BigQuery-Nutzer (
Zugehörige Ressourcen erstellen
Leeres BigQuery-Dataset erstellen mit den folgenden Parametern:
- Name:
holiday_weather - Region:
US
- Name:
Erstellen Sie einen neuen Cloud Storage-Bucket in der Multiregion
US.Führen Sie den folgenden Befehl aus, um privaten Google-Zugriff im Standardsubnetz in der Region zu aktivieren, in der Sie Managed Service for Apache Spark ausführen möchten, um die Netzwerkanforderungen zu erfüllen. Es wird empfohlen, dieselbe Region wie für Ihre verwaltete Airflow-Umgebung zu verwenden.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Datenverarbeitung mit Managed Service for Apache Spark
PySpark-Beispieljob ansehen
Der unten gezeigte Code ist ein Beispiel für einen PySpark-Job, der die Temperatur von Zehntelgrad Celsius in Grad Celsius umrechnet. Mit diesem Job werden Temperaturdaten aus dem Dataset in ein anderes Format konvertiert.
Unterstützende Dateien in Cloud Storage hochladen
So laden Sie die PySpark-Datei und das Dataset hoch, die in holidays.csv gespeichert sind:
Speichern Sie data_analytics_process.py auf Ihrem lokalen Computer.
Speichern Sie holidays.csv auf Ihrem lokalen Computer.
Wechseln Sie in der Google Cloud Console zur Seite Cloud Storage-Browser:
Klicken Sie auf den Namen des Buckets, den Sie zuvor erstellt haben.
Klicken Sie auf dem Tab Objekte für den Bucket auf die Schaltfläche Dateien hochladen, wählen Sie im angezeigten Dialogfeld
data_analytics_process.pyundholidays.csvaus und klicken Sie auf Öffnen.
DAG für die Datenanalyse
Beispiel-DAG ansehen
Im DAG werden mehrere Operatoren verwendet, um die Daten zu transformieren und zu vereinheitlichen:
Mit dem
GCSToBigQueryOperator-Task wird die Datei holidays.csv aus Cloud Storage in eine neue Tabelle im BigQuery-Datasetholidays_weatheraufgenommen, das Sie zuvor erstellt haben.Mit dem Befehl
DataprocCreateBatchOperatorwird ein PySpark-Batchjob mit Managed Service for Apache Spark erstellt und ausgeführt.Mit
BigQueryInsertJobOperatorwerden die Daten aus holidays.csv in der Spalte „Date“ mit Wetterdaten aus dem öffentlichen BigQuery-Dataset ghcn_d verknüpft. DieBigQueryInsertJobOperator-Aufgaben werden dynamisch mit einer for-Schleife generiert und befinden sich in einemTaskGroup, um die Lesbarkeit in der Diagrammansicht der Airflow-Benutzeroberfläche zu verbessern.
Variablen über die Airflow-Benutzeroberfläche hinzufügen
In Airflow sind Variablen eine universelle Möglichkeit, beliebige Einstellungen oder Konfigurationen als einfachen Schlüssel/Wert-Speicher zu speichern und abzurufen. In diesem DAG werden Airflow-Variablen zum Speichern gemeinsamer Werte verwendet. So fügen Sie sie Ihrer Umgebung hinzu:
Über die Managed Airflow-Konsole auf die Airflow-UI zugreifen
Klicken Sie auf Verwaltung > Variablen.
Fügen Sie die folgenden Variablen hinzu:
gcp_project: Ihre Projekt-ID.gcs_bucket: der Name des Buckets, den Sie zuvor erstellt haben (ohne das Präfixgs://).gce_region: Die Region, in der Sie Ihren Managed Service for Apache Spark-Job ausführen möchten, der den Netzwerkanforderungen für Managed Service for Apache Spark entspricht. Das ist die Region, in der Sie den privaten Google-Zugriff zuvor aktiviert haben.dataproc_service_account: Das Dienstkonto für Ihre verwaltete Airflow-Umgebung. Sie finden dieses Dienstkonto auf dem Tab „Umgebungskonfiguration“ für Ihre Managed Airflow-Umgebung.
DAG in den Bucket Ihrer Umgebung hochladen
Managed Airflow plant DAGs, die sich im /dags-Ordner des Buckets Ihrer Umgebung befinden. So laden Sie den DAG über dieGoogle Cloud Console hoch:
Speichern Sie data_analytics_dag.py auf Ihrem lokalen Computer.
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen in der Spalte DAG-Ordner auf den Link DAGs. Der DAGs-Ordner Ihrer Umgebung wird geöffnet.
Klicken Sie auf Dateien hochladen.
Wählen Sie
data_analytics_dag.pyauf Ihrem lokalen Computer aus und klicken Sie auf Öffnen.
DAG auslösen
Klicken Sie in Ihrer Managed Airflow-Umgebung auf den Tab DAGs.
Klicken Sie auf die DAG-ID
data_analytics_dag.Klicken Sie auf DAG auslösen.
Warten Sie etwa fünf bis zehn Minuten, bis ein grünes Häkchen angezeigt wird, das angibt, dass die Aufgaben erfolgreich abgeschlossen wurden.
Erfolg des DAG prüfen
Rufen Sie in der Google Cloud Console die Seite BigQuery auf.
Klicken Sie im Explorer-Bereich auf den Namen Ihres Projekts.
Klicken Sie auf
holidays_weather_joined.Klicken Sie auf „Vorschau“, um die resultierende Tabelle aufzurufen. Die Zahlen in der Spalte „Wert“ werden in Zehntelgrad Celsius angegeben.
Klicken Sie auf
holidays_weather_normalized.Klicken Sie auf „Vorschau“, um die resultierende Tabelle aufzurufen. Die Zahlen in der Spalte „Wert“ sind in Grad Celsius angegeben.
Managed Service for Apache Spark im Detail (optional)
Sie können eine erweiterte Version dieses DAG mit einem komplexeren PySpark-Datenverarbeitungsablauf ausprobieren. Weitere Informationen finden Sie unter Managed Service for Apache Spark extension for the Data Analytics Example auf GitHub.
Bereinigen
Löschen Sie die einzelnen Ressourcen, die Sie für diese Anleitung erstellt haben:
Löschen Sie den Cloud Storage-Bucket, den Sie für diese Anleitung erstellt haben.
Löschen Sie die verwaltete Airflow-Umgebung, einschließlich des manuellen Löschens des Buckets der Umgebung.
Nächste Schritte
- Datenanalyse-DAG in Google Cloud Mit Daten von AWS ausführen
- DAG für die Datenanalyse in Azure ausführen: