In dieser Anleitung wird beschrieben, wie Sie eine Orchestrierungspipeline in der Erweiterung „Google Cloud Data Agent Kit“ für Visual Studio Code erstellen und bereitstellen.
Die Beispielpipeline führt ein PySpark-Skript in Managed Service for Apache Spark aus.
Sie können Orchestrierungspipelines in VS Code als lokale Versionen oder über eine GitHub-Aktion bereitstellen, z. B. wenn Sie Änderungen in den main-Branch zusammenführen. In diesem Dokument wird gezeigt, wie Sie die lokale Version einer Orchestrierungspipeline bereitstellen.
Hinweis
Führen Sie die folgenden Schritte aus, bevor Sie beginnen:
- Installieren Sie die Erweiterung „Data Agent Kit“ für VS Code.
- Konfigurieren Sie Ihre Einstellungen.
- Fügen Sie Ihrem VS Code-Arbeitsbereich ein GitHub-Repository hinzu, um Orchestrierungspipelines und Assets wie Skripts zu speichern.
Erforderliche IAM-Rollen prüfen
Bitten Sie Ihren Administrator, Ihnen die erforderlichen Rollen zuzuweisen, um die Berechtigungen zum Erstellen von Ressourcen in Ihrem Projekt, zum Bereitstellen und Ausführen von Orchestrierungspipelines zu erhalten.
Zum Erstellen und Verwalten von Managed Service for Apache Airflow-Umgebungen und zum Verwalten von Objekten in den zugehörigen Buckets benötigen Sie die folgenden Rollen. Weitere Informationen zu diesen Nutzerrollen finden Sie unter Rollen für Nutzer zuweisen in der Dokumentation zu Managed Service for Apache Airflow.
- Administrator für Umgebung und Storage-Objekte (composer.environmentAndStorageObjectAdmin)
- Dienstkontonutzer (
iam.serviceAccountUser)
Für die Arbeit mit BigQuery- und Cloud Storage-Ressourcen benötigen Sie die folgenden Rollen.
- BigQuery-Datenbearbeiter (
roles/bigquery.dataEditor) - Storage-Objekt-Administrator (
roles/storage.objectAdmin)
Je nach den Ressourcen, auf die Sie zugreifen möchten, benötigen Sie möglicherweise zusätzliche Rollen, die über die Rollen hinausgehen, mit denen Sie die Erweiterung verwenden und mit Orchestrierungspipelines arbeiten können.
Dienstkonto erstellen und IAM-Rollen zuweisen
Verwenden Sie ein eindeutiges Dienstkonto für die Managed Airflow Gen 3-Umgebung. Mit dem Dienstkonto wird eine Managed Airflow Gen 3-Umgebung erstellt und alle von Ihnen bereitgestellten Orchestrierungspipelines werden ausgeführt.
Bitten Sie Ihren Administrator, die folgenden Schritte auszuführen:
- Erstellen Sie ein Dienstkonto, wie in der IAM-Dokumentation beschrieben.
- Weisen Sie dem Dienstkonto die Rolle Composer-Worker (
composer.worker) zu. Diese Rolle bietet in den meisten Fällen die erforderlichen Berechtigungen.
Als Best Practice sollten Sie diesem Dienstkonto nur dann zusätzliche Berechtigungen gewähren, wenn dies für den Betrieb der Orchestrierungspipeline erforderlich ist, wenn Sie auf andere Ressourcen in Ihrem Google Cloud Projekt zugreifen müssen.
Ressourcen für Ihre Orchestrierungspipeline erstellen Google Cloud
In diesem Schritt erstellen Sie Google Cloud Ressourcen für Ihre Orchestrierungs pipeline.
Managed Airflow Gen 3-Umgebung erstellen
Erstellen Sie eine Managed Airflow Gen 3-Umgebung mit der folgenden Konfiguration:
- Umgebungsname: Geben Sie einen Namen ein, den Sie später zum Konfigurieren
der Orchestrierungspipeline verwenden. Beispiel:
example-pipeline-scheduler. - Standort: Wählen Sie einen Standort aus. Wir empfehlen, alle Ressourcen in dieser Anleitung am selben Standort zu erstellen. Beispiel:
us-central1. - Dienstkonto: Wählen Sie das Dienstkonto aus, das Sie für diese Umgebung erstellt haben.
Das folgende Beispiel für einen Google Cloud CLI-Befehl zeigt die Syntax:
gcloud composer environments create example-pipeline-scheduler \
--location us-central1 \
--image-version composer-3-airflow-2 \
--service-account "example-account@example-project.iam.gserviceaccount.com"
Umgebungsparameter zur Planerkonfiguration hinzufügen
Geben Sie die Verbindungsdetails für die Managed Airflow-Umgebung an, in der Ihre Orchestrierungspipeline ausgeführt wird.
Fügen Sie die Konfigurationsparameter der Umgebung hinzu, die Sie mit dem Einstellungseditor für das Google Cloud Data Agent Kit erstellt haben:
- Klicken Sie in der Aktivitätsleiste auf das Symbol Google Cloud Data Agent Kit.
- Maximieren Sie Einstellungen und klicken Sie dann auf Einstellungen.
- Wählen Sie Planer aus.
- Geben Sie die Parameter für die zuvor erstellte Managed Airflow Gen 3-Umgebung ein:
- Projekt-ID: Name des Projekts, in dem sich die Umgebung liegt.
Beispiel:
example-project. - Region: Region, in der sich die Umgebung befindet. Beispiel:
us-central1. - Umgebung: Name der Umgebung. Beispiel:
example-pipeline-scheduler.
- Projekt-ID: Name des Projekts, in dem sich die Umgebung liegt.
Beispiel:
- Klicken Sie auf Speichern.
Bucket für Pipeline-Artefakte erstellen
Erstellen Sie einen Cloud Storage-Bucket im selben Projekt wie die
Managed Airflow-Umgebung und geben Sie ihm einen Namen wie
example-pipelines-bucket. Dieser Bucket ist erforderlich, um Ihren Managed Service for Apache Spark-Job zu speichern.
Einige Pipelineaktionen, z. B. die Ausgabe der Ergebnisse in einen Cloud Storage-Bucket.
Neues Dataset und neue Tabelle in BigQuery erstellen
In dieser Anleitung wird eine Pipeline gezeigt, die Daten in eine BigQuery-Tabelle schreibt. Erstellen Sie die folgenden BigQuery-Ressourcen in Ihrem Projekt:
- Erstellen Sie ein neues Dataset mit dem Namen
wordcount_dataset. - Erstellen Sie eine neue BigQuery-Tabelle mit dem Namen
wordcount_output.
Pipeline-Assets hinzufügen
In dieser Anleitung wird eine häufige Aufgabe im Bereich Data Engineering (ETL: Extrahieren, Transformieren, Laden) mit PySpark gezeigt. Dabei werden Daten aus BigQuery gelesen, transformiert (Wortzählung) und wieder in BigQuery geladen.
Nicht agentisch
Fügen Sie die folgende Datei dem Ordner /scripts Ihres Repositorys hinzu. Später fügen Sie eine Pipelineaktion hinzu, mit der dieses Skript in Managed Service for Apache Spark ausgeführt wird.
Beispieldatei wordcount.py:
#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()
# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)
# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')
# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()
# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()
print(f"Successfully wrote word counts to BigQuery table: {destination_table}")
Ersetzen Sie Folgendes:
- ARTIFACTS_BUCKET_NAME: der Name des Cloud Storage-Bucket
, den Sie zuvor erstellt haben. Beispiel:
example-pipelines-bucket. - PROJECT_ID: der Name des Projekts, in dem sich die Umgebung
befindet. Beispiel:
example-project.
Agentisch
Fordern Sie den Agent auf, ein PySpark-Beispielskript im Ordner /scripts Ihres Repositorys zu generieren. Später fügen Sie eine Pipelineaktion hinzu, mit der dieses Skript in Managed Service for Apache Spark ausgeführt wird.
Geben Sie einen Prompt ähnlich dem folgenden ein:
I want to create a PySpark script that does the following:
1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.
My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.
Save the resulting script to /scripts as wordcount.py
Orchestrierungspipelines in Ihrem Repository initialisieren
Wenn Sie Orchestrierungspipelines initialisieren, erstellt die Erweiterung „Data Agent Kit“ für VS Code ein Gerüst, das Folgendes enthält:
- Eine YAML-Datei für die Orchestrierungspipeline: Eine Beispiel-Pipelinedefinition, die einen Zeitplan, aber keine definierten Aktionen enthält.
deployment.yaml: Eine Beispielkonfiguration für die Pipelinebereitstellung, die definiert, wie Ihre Pipeline bereitgestellt werden muss. Diese Datei zeigt die erforderliche Konfiguration für die Managed Airflow-Umgebung, den Artefakt-Bucket und alle anderen Ressourcen, die von Ihren Pipelineaktionen verwendet werden..github/workflows/deploy.yaml: Richtet eine GitHub-Aktion ein, mit der Ihre Pipeline bereitgestellt wird, wenn Sie Änderungen in denmain-Branch Ihres GitHub-Repositorys zusammenführen..github/workflows/validate.yaml: Richtet eine GitHub-Aktion ein, mit der Ihre Pipeline nach der Bereitstellung validiert wird.
In späteren Schritten dieses Dokuments erweitern Sie diese Definitionen mit der Erweiterung „Data Agent Kit“ für VS Code, um eine Orchestrierungspipeline lokal zu erstellen und bereitzustellen.
Nicht agentisch
So initialisieren Sie Orchestrierungspipelines:
- Klicken Sie in der Aktivitätsleiste auf das Symbol Google Cloud Data Agent Kit.
- Maximieren Sie Data Engineering und klicken Sie dann auf Orchestrierung pipeline initialisieren.
- Geben Sie die Parameter für die neue Orchestrierungspipeline ein:
- Pipeline-ID: Geben Sie die ID Ihrer Pipeline ein. Beispiel:
example-pipeline. - Google Cloud-Projekt-ID: der Name des Projekts, in dem sich die Umgebung
befindet. Beispiel:
example-project. - Region: die Region, in der sich Ihre Umgebung befindet. Beispiel:
us-central1. - Umgebungs-ID: der Name der Umgebung, mit der Sie entwickeln möchten.
Beispiel:
dev/staging. Managed Service for Apache Airflow-Umgebung für den Planer: der Name von der Umgebung, in der Sie Ihre Pipelines orchestrieren möchten. Geben Sie für dieses Dokument dieselbe Umgebung in diesem Parameter an.
Artefakt-Bucket: der Name des Buckets, der für Pipeline-Artefakte verwendet wird, ohne das
gs://Präfix. Beispiel:example-pipelines-bucket.Klicken Sie auf Weiter.
Klicken Sie auf Initialisieren.
Geben Sie einen Arbeitsbereich an, in dem die Pipeline initialisiert werden soll.
Agentisch
Fordern Sie den Agenten auf, ein Gerüst für Orchestrierungspipelines Ihres Repositorys zu erstellen.
Geben Sie einen Prompt ähnlich dem folgenden ein:
Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.
The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.
The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.
Store pipeline artifacts in example-pipelines-bucket.
Nachdem Sie Pipelines in Ihrem Repository initialisiert haben, können Sie dies nicht noch einmal tun, da das neue Gerüst alle von Ihnen vorgenommenen Konfigurationsänderungen überschreiben würde. Sie können neue Pipelines hinzufügen, indem Sie neue Pipelinedefinitionsdateien in Ihrem Projekt erstellen und sie der Bereitstellungskonfiguration hinzufügen.
Neue Aufgabe zur Pipeline hinzufügen
Da die anfängliche Pipelinekonfiguration keine Aktionen enthält, fügen Sie eine Aktion hinzu, mit der Ihr PySpark-Skript ausgeführt wird.
Nicht agentisch
So bearbeiten Sie eine Pipeline:
- Klicken Sie in der Aktivitätsleiste auf das Symbol Google Cloud Data Agent Kit.
- Erweitern Sie Data Engineering und dann Orchestration Pipelines.
- Wählen Sie
example-pipeline.yamlaus. Ein Pipeline-Editor wird für die ausgewählte Pipeline geöffnet. - Optional: Wählen Sie den Knoten Zeitplantrigger aus. Sie können den Zeitplan für Ihre Pipeline anpassen, indem Sie einen cron-ähnlichen Ausdruck und Start- und Endzeiten für den Zeitplan angeben. Der Standardzeitplan für die neu initialisierte Pipeline ist
0 2 * * *, was bedeutet, dass sie täglich um 2:00 Uhr ausgeführt wird.
Fügen Sie eine neue Aufgabe hinzu. In dieser Anleitung fügen Sie eine PySpark-Aufgabe hinzu, mit der ein PySpark-Skript ausgeführt wird, das Sie zuvor hinzugefügt haben:
- Klicken Sie auf Erste Aufgabe hinzufügen , um einen neuen Aufgabenknoten hinzuzufügen.
- Wählen Sie PySpark-Skript ausführen und die Datei
script/wordcount.pyaus.
Das Fenster PySpark-Skript ausführen wird geöffnet.
- Wählen Sie unter „Spark-Clustermodus“ die Option Serverless Spark aus.
- Geben Sie unter Standort den Standort an, an dem sich Ihre Umgebung befindet.
Beispiel:
us-central1. - Klicken Sie auf Speichern.
Agentisch
Führen Sie den folgenden Prompt aus:
Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.
Lokale Version der Pipeline bereitstellen
Stellen Sie die lokale Version der Pipeline bereit, um zu bestätigen, dass sie richtig konfiguriert ist.
Wenn Sie eine lokale Version der Orchestrierungspipeline bereitstellen, lädt die Erweiterung „Data Agent Kit“ für VS Code eine lokale Version des Pipelinepakets in die Managed Airflow-Umgebung hoch und führt sie aus. Die lokale Bereitstellung ist für die Arbeit in einer Entwicklungsumgebung vorgesehen.
Mit dem Befehl „Bereitstellen“ wird ein nicht pausierter Zeitplan bereitgestellt. Um dies zu verhindern, können Sie den Zeitplan manuell im Bereich „Pipelines verwalten“ pausieren. Sie können auch Ihre YAML-Datei für die Pipeline bearbeiten, um den Block triggers: - schedule auszukommentieren oder zu entfernen.
Nicht agentisch
So stellen Sie eine lokale Version der Beispiel-Orchestrierungspipeline bereit:
- Klicken Sie in der Aktivitätsleiste auf das Symbol Google Cloud Data Agent Kit.
- Erweitern Sie Data Engineering und dann Orchestration Pipelines.
- Wählen Sie
example-pipeline.yamlaus. Ein Pipeline-Editor wird für die ausgewählte Pipeline geöffnet. - Wählen Sie Pipeline ausführen und dann die zuvor erstellte Entwicklungs- oder Stagingumgebung aus.
Agentisch
Führen Sie den folgenden Prompt aus:
Deploy my pipeline
Pipelineausführung überwachen und Ausführungsprotokolle prüfen
Nachdem Ihre Pipeline bereitgestellt wurde, können Sie die detaillierten Informationen, den Verlauf der Pipelineausführungen und die Ausführungsprotokolle der Pipeline aufrufen:
- Klicken Sie in der Aktivitätsleiste auf das Symbol Google Cloud Data Agent Kit.
- Maximieren Sie Data Engineering und wählen Sie dann Pipelines verwalten aus.
- Klicken Sie auf den Namen Ihrer Pipeline (
example-pipeline), um den Ausführungsverlauf aufzurufen. In der Liste der Ausführungen für ein bestimmtes Datum sehen Sie einzelne Pipelineausführungen und die Aufschlüsselung der einzelnen Aktionen in jeder Pipelineausführung. - Klicken Sie auf eine Aufgaben-ID, um die Protokolle der Aufgabenbearbeitung aufzurufen. Da das PySpark-Beispielskript in Managed Service for Apache Spark ausgeführt wurde, enthalten die Aufgabenprotokolle einen Link zu den Batchprotokollen.
Pipelinefehler beheben
Wenn Ihre Pipeline fehlschlägt, wird im Bereich Pipelines verwalten die Schaltfläche Diagnose angezeigt.
Agentisch
Wenn Sie auf die Schaltfläche Diagnose klicken, generiert der Agent einen Prompt zur Fehlerbehebung bei der Pipeline. Der Prompt wird entweder in die Zwischenablage kopiert oder in einer neuen Chatsitzung geöffnet.
Der Agent verwendet spezielle Fähigkeiten zur Fehlerbehebung bei Pipelines. Dabei werden Protokolle erfasst, bereitgestellter Code und der Arbeitsbereich abgeglichen und eine Ursachenanalyse erstellt.
Mögliche nächste Schritte nach Erhalt der Ursachenanalyse:
- Ursachenanalyse im aktuellen Arbeitsbereich anwenden.
- Den Agent bitten, einen neuen Branch zu erstellen und die Änderungen dort anzuwenden.
- Ein Cloud Customer Care-Ticket mit den Details der Ursachenanalyse öffnen.
Hilfe zur Fehlerbehebung bei Problemen mit der Erweiterung finden Sie unter Fehlerbehebung.