Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Diese Anleitung ist eine Änderung von DAG für Datenanalyse ausführen Google Cloud und zeigt, wie Sie Ihre Cloud Composer-Umgebung mit Amazon Web Services verbinden, um dort gespeicherte Daten zu nutzen. Außerdem wird gezeigt, wie Sie mit Cloud Composer einen Apache Airflow DAG erstellen. Der DAG verknüpft Daten aus einem öffentlichen BigQuery-Dataset und einer CSV-Datei, die in einem Amazon Web Services (AWS) S3-Bucket gespeichert ist, und führt dann einen Google Cloud Serverless for Apache Spark-Batchjob aus, um die verknüpften Daten zu verarbeiten.
Das öffentliche BigQuery-Dataset in dieser Anleitung ist ghcn_d, eine integrierte Datenbank mit Klimazusammenfassungen aus aller Welt. Die CSV-Datei enthält Informationen zu den Daten und Namen von Feiertagen in den USA von 1997 bis 2021.
Die Frage, die wir mit dem DAG beantworten möchten, lautet: „Wie warm war es in Chicago an Thanksgiving in den letzten 25 Jahren?“
Ziele
- Cloud Composer-Umgebung in der Standardkonfiguration erstellen
- Bucket in AWS S3 erstellen
- Leeres BigQuery-Dataset erstellen
- Neuen Cloud Storage-Bucket erstellen
- DAG mit nachfolgenden Aufgaben erstellen und ausführen:
- Externes Dataset aus S3 in Cloud Storage laden
- Externes Dataset aus Cloud Storage in BigQuery laden
- Zwei Datasets in BigQuery verknüpfen
- PySpark-Job für Datenanalyse ausführen
Hinweis
Berechtigungen in AWS verwalten
Folgen Sie der Anleitung im Abschnitt Richtlinien mit dem visuellen Editor erstellen der AWS-Anleitung IAM-Richtlinien erstellen, um eine benutzerdefinierte IAM-Richtlinie für AWS S3 mit der folgenden Konfiguration zu erstellen:
- Dienst:S3
- ListAllMyBuckets (
s3:ListAllMyBuckets) zum Aufrufen Ihres S3-Buckets - CreateBucket (
s3:CreateBucket) zum Erstellen eines Buckets - PutBucketOwnershipControls (
s3:PutBucketOwnershipControls) zum Erstellen eines Buckets - ListBucket (
s3:ListBucket) zum Erteilen der Berechtigung zum Auflisten von Objekten in einem S3-Bucket - PutObject (
s3:PutObject) zum Hochladen von Dateien in einen Bucket - GetBucketVersioning (
s3:GetBucketVersioning) zum Löschen eines Objekts in einem Bucket - DeleteObject (
s3:DeleteObject) zum Löschen eines Objekts in einem Bucket - ListBucketVersions (
s3:ListBucketVersions) zum Löschen eines Buckets - DeleteBucket (
s3:DeleteBucket) zum Löschen eines Buckets - Ressourcen:Wählen Sie neben „Bucket“ und „Objekt“ die Option „Beliebig“ aus, um Berechtigungen für alle Ressourcen dieses Typs zu erteilen.
- Tag:Keine
- Name:TutorialPolicy
Weitere Informationen zu den einzelnen Konfigurationen finden Sie in der Liste der in Amazon S3 unterstützten Aktionen.
Fügen Sie Ihrer Identität die IAM-Richtlinie TutorialPolicy hinzu.
APIs aktivieren
Aktivieren Sie folgende APIs:
Console
Aktivieren Sie die Dataproc, Cloud Composer, BigQuery und Cloud Storage APIs.
Rollen, die zum Aktivieren von APIs erforderlich sind
Zum Aktivieren von APIs benötigen Sie die IAM-Rolle „Service Usage-Administrator“
(roles/serviceusage.serviceUsageAdmin), die
die Berechtigung serviceusage.services.enable enthält. Informationen zum Zuweisen von
Rollen.
gcloud
Aktivieren Sie die Dataproc, Cloud Composer, BigQuery und Cloud Storage APIs:
Rollen, die zum Aktivieren von APIs erforderlich sind
Zum Aktivieren von APIs benötigen Sie die IAM-Rolle „Service Usage-Administrator“ (roles/serviceusage.serviceUsageAdmin), die die Berechtigung serviceusage.services.enable enthält. Informationen zum Zuweisen von
Rollen.
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Berechtigungen erteilen
Erteilen Sie Ihrem Nutzerkonto die folgenden Rollen und Berechtigungen:
Erteilen Sie Rollen zum Verwalten von Cloud Composer-Umgebungen und Umgebungs-Buckets.
Erteilen Sie die Rolle BigQuery-Dateninhaber (
roles/bigquery.dataOwner), um ein BigQuery-Dataset zu erstellen.Erteilen Sie die Rolle Storage-Administrator (
roles/storage.admin), um einen Cloud Storage-Bucket zu erstellen.
Cloud Composer-Umgebung erstellen und vorbereiten
Erstellen Sie eine Cloud Composer-Umgebung mit Standard parametern:
- Wählen Sie eine Region in den USA aus.
- Wählen Sie die neueste Cloud Composer-Version aus.
Erteilen Sie dem in Ihrer Cloud Composer-Umgebung verwendeten Dienstkonto die folgenden Rollen, damit die Airflow-Worker DAG-Aufgaben erfolgreich ausführen können:
- BigQuery User (
roles/bigquery.user) - BigQuery-Dateninhaber (
roles/bigquery.dataOwner) - Dienstkontonutzer (
roles/iam.serviceAccountUser) - Dataproc-Bearbeiter (
roles/dataproc.editor) - Dataproc-Worker (
roles/dataproc.worker)
- BigQuery User (
Verwandte Ressourcen in erstellen und ändern Google Cloud
Installieren Sie das
apache-airflow-providers-amazonPyPI-Paket in Ihrer Cloud Composer-Umgebung.Erstellen Sie ein leeres BigQuery-Dataset mit den folgenden Parametern:
- Name:
holiday_weather - Region:
US
- Name:
Erstellen Sie einen neuen Cloud Storage-Bucket in der
USMultiregion.Führen Sie den folgenden Befehl aus, um den privaten Google-Zugriff im Standardsubnetz in der Region zu aktivieren, in der Sie Google Cloud Serverless for Apache Spark ausführen möchten, um die Netzwerkanforderungenzu erfüllen. Wir empfehlen, dieselbe Region wie für Ihre Cloud Composer-Umgebung zu verwenden.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Verwandte Ressourcen in AWS erstellen
Erstellen Sie einen S3-Bucket mit den Standardeinstellungen in Ihrer bevorzugten Region.
Verbindung von Cloud Composer zu AWS herstellen
- AWS-Zugriffsschlüssel-ID und geheimen Zugriffsschlüssel abrufen
Fügen Sie Ihre AWS S3 Verbindung über die Airflow-Benutzeroberfläche hinzu:
- Klicken Sie auf Admin > Verbindungen.
Erstellen Sie eine neue Verbindung mit der folgenden Konfiguration:
- Verbindungs-ID:
aws_s3_connection - Verbindungstyp:
Amazon S3 - Extras (oder JSON für zusätzliche Felder):
{"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}
- Verbindungs-ID:
Datenverarbeitung mit Google Cloud Serverless for Apache Spark
In diesem Abschnitt wird die Verarbeitung von Daten mit Google Cloud Serverless for Apache Spark beschrieben.
Beispiel für einen PySpark-Job ansehen
Der unten gezeigte Code ist ein Beispiel für einen PySpark-Job, der die Temperatur von Zehntelgrad Celsius in Grad Celsius umwandelt. Mit diesem Job werden Temperaturdaten aus dem Dataset in ein anderes Format konvertiert.
PySpark-Datei in Cloud Storage hochladen
So laden Sie die PySpark-Datei in Cloud Storage hoch:
Speichern Sie data_analytics_process.py auf Ihrem lokalen Computer.
Rufen Sie in der Google Cloud Console die Seite Cloud Storage-Browser auf:
Klicken Sie auf den Namen des Buckets, den Sie zuvor erstellt haben.
Klicken Sie auf dem Tab Objekte für den Bucket auf die Schaltfläche Dateien hochladen , wählen Sie im angezeigten Dialogfeld
data_analytics_process.pyaus und klicken Sie auf Öffnen.
CSV-Datei in AWS S3 hochladen
So laden Sie die Datei holidays.csv hoch:
- Speichern Sie
holidays.csvauf Ihrem lokalen Computer. - Folgen Sie der AWS-Anleitung, um die Datei in Ihren Bucket hochzuladen.
DAG für Datenanalyse
In diesem Abschnitt wird beschrieben, wie Sie den DAG für die Datenanalyse konfigurieren und verwenden.
Beispiel-DAG ansehen
Der DAG verwendet mehrere Operatoren, um die Daten zu transformieren und zu vereinheitlichen:
Der
S3ToGCSOperatorüberträgt die Datei holidays.csv aus Ihrem AWS S3-Bucket in Ihren Cloud Storage-Bucket.Der
GCSToBigQueryOperatornimmt die Datei holidays.csv aus Cloud Storage in eine neue Tabelle im BigQuery-Datasetholidays_weatherauf, das Sie zuvor erstellt haben.Der
DataprocCreateBatchOperatorerstellt und führt einen PySpark-Batchjob mit Serverless for Apache Spark aus.Der
BigQueryInsertJobOperatorverknüpft die Daten aus holidays.csv in der Spalte „Date“ mit Wetterdaten aus dem öffentlichen BigQuery-Dataset ghcn_d. DieBigQueryInsertJobOperatorAufgaben werden dynamisch mit einer for-Schleife generiert und befinden sich in einerTaskGroup, um die Lesbarkeit in der Diagramm Ansicht der Airflow-Benutzeroberfläche zu verbessern.
Variablen über die Airflow-Benutzeroberfläche hinzufügen
In Airflow sind Variablen eine universelle Möglichkeit, beliebige Einstellungen oder Konfigurationen als einfachen Schlüssel-Wert-Speicher zu speichern und abzurufen. Dieser DAG verwendet Airflow-Variablen, um allgemeine Werte zu speichern. So fügen Sie sie Ihrer Umgebung hinzu:
Rufen Sie die Airflow-Benutzeroberfläche über Google Cloud die Console auf.
Klicken Sie auf Admin > Variablen.
Fügen Sie die folgenden Variablen hinzu:
s3_bucket: der Name des zuvor erstellten S3-Buckets.gcp_project: Ihre Projekt-ID.gcs_bucket: der Name des zuvor erstellten Buckets (ohne das Präfixgs://).gce_region: die Region, in der Sie Ihren Dataproc-Job ausführen möchten, der die Google Cloud Netzwerkanforderungen von Serverless for Apache Spark erfüllt. Dies ist die Region, in der Sie zuvor den privater Google-Zugriff aktiviert haben.dataproc_service_account: das Dienstkonto für Ihre Cloud Composer-Umgebung. Sie finden dieses Dienstkonto auf dem Tab „Umgebungskonfiguration“ für Ihre Cloud Composer-Umgebung.
DAG in den Bucket Ihrer Umgebung hochladen
Cloud Composer plant DAGs, die sich im Ordner /dags im Bucket Ihrer Umgebung befinden. So laden Sie den DAG über die
Google Cloud Console hoch:
Speichern Sie s3togcsoperator_tutorial.py auf Ihrem lokalen Computer.
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen in der Spalte DAG-Ordner auf den Link DAGs. Der DAGs-Ordner Ihrer Umgebung wird geöffnet.
Klicken Sie auf Dateien hochladen.
Wählen Sie
s3togcsoperator_tutorial.pyauf Ihrem lokalen Computer aus und klicken Sie auf Öffnen.
DAG auslösen
Klicken Sie in Ihrer Cloud Composer-Umgebung auf den Tab DAGs.
Klicken Sie auf die DAG-ID
s3_to_gcs_dag.Klicken Sie auf DAG auslösen.
Warten Sie etwa fünf bis zehn Minuten, bis ein grünes Häkchen angezeigt wird, das darauf hinweist, dass die Aufgaben erfolgreich abgeschlossen wurden.
Erfolg des DAG prüfen
Rufen Sie in der Google Cloud Console die Seite BigQuery auf.
Klicken Sie im Bereich Explorer auf den Namen Ihres Projekts.
Klicken Sie auf
holidays_weather_joined.Klicken Sie auf „Vorschau“, um die resultierende Tabelle anzusehen. Beachten Sie, dass die Zahlen in der Spalte „Wert“ in Zehntelgrad Celsius angegeben sind.
Klicken Sie auf
holidays_weather_normalized.Klicken Sie auf „Vorschau“, um die resultierende Tabelle anzusehen. Beachten Sie, dass die Zahlen in der Spalte „Wert“ in Grad Celsius angegeben sind.
Bereinigen
Löschen Sie die einzelnen Ressourcen, die Sie für diese Anleitung erstellt haben:
Löschen Sie die Datei
holidays.csvin Ihrem AWS S3 Bucket.Löschen Sie den Cloud Storage-Bucket, den Sie für diese Anleitung erstellt haben.
Löschen Sie die Cloud Composer-Umgebung, einschließlich des Buckets der Umgebung, den Sie manuell löschen müssen.