Data-Engineering-Pipelines erstellen

In diesem Leitfaden wird beschrieben, wie Sie eine Orchestrierungspipeline in der Google Cloud Data Agent Kit-Erweiterung für Antigravity erstellen und bereitstellen.

In der Beispielpipeline wird ein PySpark-Script in Managed Service for Apache Spark ausgeführt.

Sie können Orchestrierungspipelines aus Antigravity als lokale Versionen oder über eine GitHub-Aktion bereitstellen, z. B. beim Zusammenführen von Änderungen in den main-Branch. In diesem Dokument wird gezeigt, wie Sie die lokale Version einer Orchestrierungspipeline bereitstellen.

Hinweis

Führen Sie zuerst die folgenden Schritte aus:

  1. Installieren Sie die Data Agent Kit-Erweiterung für Antigravity.
  2. Einstellungen konfigurieren
  3. Fügen Sie Ihrem Antigravity-Arbeitsbereich ein GitHub-Repository hinzu, um Orchestrierungspipelines und Assets wie Skripts zu speichern.

Erforderliche IAM-Rollen ansehen

Bitten Sie Ihren Administrator, Ihnen die erforderlichen Rollen zuzuweisen, damit Sie die Berechtigungen zum Erstellen von Ressourcen in Ihrem Projekt, zum Bereitstellen und Ausführen von Orchestrierungspipelines erhalten.

Zum Erstellen und Verwalten von Managed Service for Apache Airflow-Umgebungen und zum Verwalten von Objekten in den zugehörigen Buckets benötigen Sie die folgenden Rollen. Weitere Informationen zu diesen Nutzerrollen finden Sie in der Dokumentation zu Managed Service for Apache Airflow unter Rollen zuweisen.

  • Administrator für Umgebung und Storage-Objekte (composer.environmentAndStorageObjectAdmin)
  • Dienstkontonutzer (iam.serviceAccountUser)

Für die Arbeit mit BigQuery- und Cloud Storage-Ressourcen benötigen Sie die folgenden Rollen.

  • BigQuery Data Editor (roles/bigquery.dataEditor)
  • Storage-Objekt-Administrator (roles/storage.objectAdmin)

Je nach den Ressourcen, auf die Sie zugreifen möchten, benötigen Sie möglicherweise zusätzliche Rollen, die über die Rollen hinausgehen, mit denen Sie die Erweiterung verwenden und mit Orchestrierungspipelines arbeiten können.

Dienstkonto erstellen und ihm IAM-Rollen zuweisen

Verwenden Sie ein eindeutiges Dienstkonto für die Managed Airflow Gen 3-Umgebung. Mit dem Dienstkonto wird eine Managed Airflow Gen 3-Umgebung erstellt und alle Orchestrierungspipelines ausgeführt, die Sie bereitstellen.

Bitten Sie Ihren Administrator, die folgenden Schritte auszuführen:

  1. Erstellen Sie ein Dienstkonto, wie in der IAM-Dokumentation beschrieben.
  2. Weisen Sie dem Dienstkonto die Rolle Composer-Worker (composer.worker) zu. Diese Rolle bietet in den meisten Fällen die erforderlichen Berechtigungen.

Wenn Sie auf andere Ressourcen in IhremGoogle Cloud -Projekt zugreifen müssen, sollten Sie diesem Dienstkonto als Best Practice nur dann zusätzliche Berechtigungen erteilen, wenn dies für den Orchestrierungspipeline-Vorgang erforderlich ist.

Google Cloud -Ressourcen für Ihre Orchestrierungspipeline erstellen

In diesem Schritt erstellen Sie Google Cloud Ressourcen für Ihre Orchestrierungspipeline.

Managed Airflow Gen 3-Umgebung erstellen

Erstellen Sie eine Managed Airflow Gen 3-Umgebung mit der folgenden Konfiguration:

  • Umgebungsname: Geben Sie einen Namen ein, den Sie später zum Konfigurieren der Orchestrierungspipeline verwenden. Beispiel: example-pipeline-scheduler.
  • Standort: Wählen Sie einen Standort aus. Wir empfehlen, alle Ressourcen in diesem Leitfaden am selben Standort zu erstellen. Beispiel: us-central1.
  • Dienstkonto: Wählen Sie das Dienstkonto aus, das Sie für diese Umgebung erstellt haben.

Das folgende Beispiel für einen Google Cloud CLI-Befehl zeigt die Syntax:

gcloud composer environments create example-pipeline-scheduler \
  --location us-central1 \
  --image-version composer-3-airflow-2 \
  --service-account "example-account@example-project.iam.gserviceaccount.com"

Umgebungsparameter zur Planerkonfiguration hinzufügen

Geben Sie Verbindungsdetails für die verwaltete Airflow-Umgebung an, in der Ihre Orchestrierungspipeline ausgeführt wird.

Fügen Sie die Konfigurationsparameter der Umgebung hinzu, die Sie mit dem Editor für Google Cloud Data Agent Kit-Einstellungen erstellt haben:

  1. Klicken Sie in der Aktivitätsleiste auf das Symbol Google Cloud Data Agent Kit.
  2. Maximieren Sie die Einstellungen und klicken Sie dann auf Einstellungen.
  3. Wähle Scheduler aus.
  4. Geben Sie die Parameter für die zuvor erstellte Managed Airflow Gen 3-Umgebung ein:
    • Projekt-ID: Name des Projekts, in dem sich die Umgebung befindet. Beispiel: example-project.
    • Region: Die Region, in der sich die Umgebung befindet. Beispiel: us-central1.
    • Umgebung: Name der Umgebung. Beispiel: example-pipeline-scheduler.
  5. Klicken Sie auf Speichern.

Bucket für Pipelineartefakte erstellen

Erstellen Sie einen Cloud Storage-Bucket im selben Projekt wie die verwaltete Airflow-Umgebung und geben Sie ihm einen Namen wie example-pipelines-bucket. Dieser Bucket ist erforderlich, um Ihren Managed Service for Apache Spark-Job zu speichern.

Einige Pipeline-Aktionen, z. B. die Ausgabe der Ergebnisse in einen Cloud Storage-Bucket.

Neues Dataset und neue Tabelle in BigQuery erstellen

In diesem Leitfaden wird eine Pipeline gezeigt, mit der Daten in eine BigQuery-Tabelle geschrieben werden. Erstellen Sie die folgenden BigQuery-Ressourcen in Ihrem Projekt:

  1. Erstellen Sie ein neues Dataset mit dem Namen wordcount_dataset.
  2. Erstellen Sie eine neue BigQuery-Tabelle mit dem Namen wordcount_output.

Pipeline-Assets hinzufügen

In dieser Anleitung wird eine häufige Data-Engineering-Aufgabe (ETL: Extrahieren, Transformieren, Laden) mit PySpark demonstriert. Dabei werden Daten aus BigQuery gelesen, transformiert (Wortanzahl) und wieder in BigQuery geladen.

Nicht agentisch

Fügen Sie die folgende Datei dem Ordner /scripts Ihres Repositorys hinzu. Später fügen Sie eine Pipelineaktion hinzu, mit der dieses Script in Managed Service for Apache Spark ausgeführt wird.

wordcount.py-Beispieldatei:

#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)

# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')

# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
    'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()

# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()

print(f"Successfully wrote word counts to BigQuery table: {destination_table}")

Ersetzen Sie Folgendes:

  • ARTIFACTS_BUCKET_NAME: der Name des Cloud Storage-Bucket, den Sie zuvor erstellt haben. Beispiel: example-pipelines-bucket.
  • PROJECT_ID: der Name des Projekts, in dem sich die Umgebung befindet. Beispiel: example-project.

Agentisch

Fordern Sie den Agent auf, ein Beispiel-PySpark-Script im Ordner /scripts Ihres Repositorys zu generieren. Später fügen Sie eine Pipelineaktion hinzu, mit der dieses Skript in Managed Service for Apache Spark ausgeführt wird.

Geben Sie einen Prompt ähnlich dem folgenden ein:

I want to create a PySpark script that does the following:

1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.

My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.

Save the resulting script to /scripts as wordcount.py

Orchestration-Pipelines in Ihrem Repository initialisieren

Wenn Sie Orchestrierungspipelines initialisieren, erstellt die Data Agent Kit-Erweiterung für Antigravity ein Gerüst, das Folgendes umfasst:

  • Eine YAML-Datei für die Orchestrierungspipeline: Eine Beispiel-Pipelinedefinition, die einen Zeitplan, aber keine definierten Aktionen enthält.
  • deployment.yaml: Eine Beispielkonfiguration für die Bereitstellung von Pipelines, in der definiert wird, wie Ihre Pipeline bereitgestellt werden muss. In dieser Datei wird die erforderliche Konfiguration für die Managed Airflow-Umgebung, den Artefakt-Bucket und alle anderen Ressourcen veranschaulicht, die von Ihren Pipelineaktionen verwendet werden.
  • .github/workflows/deploy.yaml: Richtet eine GitHub-Aktion ein, mit der Ihre Pipeline bereitgestellt wird, wenn Sie Änderungen in den main-Branch Ihres GitHub-Repositorys zusammenführen.
  • .github/workflows/validate.yaml: Richtet eine GitHub-Aktion ein, die Ihre Pipeline nach der Bereitstellung validiert.

In späteren Schritten dieses Dokuments erweitern Sie diese Definitionen mit der Data Agent Kit-Erweiterung für Antigravity, um eine Orchestrierungspipeline lokal zu erstellen und bereitzustellen.

Nicht agentisch

So initialisieren Sie Orchestrierungspipelines:

  1. Klicken Sie in der Aktivitätsleiste auf das Symbol Google Cloud Data Agent Kit.
  2. Maximieren Sie Data Engineering und klicken Sie dann auf Orchestration-Pipeline initialisieren.
  3. Geben Sie Parameter für die neue Orchestrierungspipeline ein:
  4. Pipeline-ID: Geben Sie die ID Ihrer Pipeline ein. Beispiel: example-pipeline.
  5. Google Cloud-Projekt-ID: Der Name des Projekts, in dem sich die Umgebung befindet. Beispiel: example-project.
  6. Region: Die Region, in der sich Ihre Umgebung befindet. Beispiel: us-central1.
  7. Umgebungs-ID: Der Name der Umgebung, in der Sie entwickeln möchten. Beispiel: dev/staging.
  8. Scheduler Managed Service for Apache Airflow Environment (Scheduler Managed Service for Apache Airflow-Umgebung): Der Name der Umgebung, in der Sie Ihre Pipelines orchestrieren möchten. Geben Sie für dieses Dokument dieselbe Umgebung in diesem Parameter an.

  9. Artifacts Bucket (Artefakt-Bucket): Der Name des Buckets, der für Pipeline-Artefakte verwendet wird, ohne das Präfix gs://. Beispiel: example-pipelines-bucket.

  10. Klicken Sie auf Weiter.

  11. Klicken Sie auf Initialisieren.

  12. Geben Sie einen Arbeitsbereich an, in dem die Pipeline initialisiert werden soll.

Agentisch

Bitten Sie den Agenten, ein Gerüst für Orchestrierungspipelines Ihres Repositorys zu erstellen.

Geben Sie einen Prompt ähnlich dem folgenden ein:

Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.

The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.

The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.

Store pipeline artifacts in example-pipelines-bucket.

Nachdem Sie Pipelines in Ihrem Repository initialisiert haben, können Sie dies nicht noch einmal tun, da durch das neue Gerüst alle von Ihnen vorgenommenen Konfigurationsänderungen überschrieben würden. Sie können neue Pipelines hinzufügen, indem Sie neue Pipelinedefinitionsdateien in Ihrem Projekt erstellen und sie der Bereitstellungskonfiguration hinzufügen.

Der Pipeline eine neue Aufgabe hinzufügen

Da die ursprüngliche Pipelinekonfiguration keine Aktionen enthält, fügen Sie eine Aktion hinzu, mit der Ihr PySpark-Skript ausgeführt wird.

Nicht agentisch

So bearbeiten Sie eine Pipeline:

  1. Klicken Sie in der Aktivitätsleiste auf das Symbol Google Cloud Data Agent Kit.
  2. Maximieren Sie Data Engineering und dann Orchestration Pipelines.
  3. Wählen Sie example-pipeline.yaml aus. Ein Pipeline-Editor wird für die ausgewählte Pipeline geöffnet.
  4. Optional: Wählen Sie den Knoten Trigger für Zeitplan aus. Sie können den Zeitplan für Ihre Pipeline anpassen, indem Sie einen cron-ähnlichen Ausdruck sowie Start- und Endzeiten für den Zeitplan angeben. Der Standardzeitplan für die neu initialisierte Pipeline ist 0 2 * * *, die täglich um 2:00 Uhr ausgeführt wird.
  1. Fügen Sie eine neue Aufgabe hinzu. In dieser Anleitung fügen Sie einen PySpark-Task hinzu, der ein PySpark-Script ausführt, das Sie zuvor hinzugefügt haben:

    1. Klicken Sie auf Erste Aufgabe hinzufügen, um einen neuen Aufgabenknoten hinzuzufügen.
    2. Wählen Sie PySpark-Skript ausführen und die Datei script/wordcount.py aus.

    Der Bereich PySpark-Script ausführen wird geöffnet.

    1. Wählen Sie unter „Spark Cluster Mode“ (Spark-Clustermodus) die Option Serverless Spark (Serverless Spark) aus.
    2. Geben Sie unter Speicherort den Speicherort Ihrer Umgebung an. Beispiel: us-central1.
    3. Klicken Sie auf Speichern.

Agentisch

Führen Sie den folgenden Prompt aus:

Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.

Lokale Version der Pipeline bereitstellen

Stellen Sie die lokale Version der Pipeline bereit, um zu prüfen, ob sie richtig konfiguriert ist.

Wenn Sie eine lokale Version der Orchestrierungspipeline bereitstellen, lädt die Data Agent Kit-Erweiterung für Antigravity eine lokale Version des Pipeline-Bundles in die Managed Airflow-Umgebung hoch und führt sie aus. Die lokale Bereitstellung ist für die Verwendung in einer Entwicklungsumgebung vorgesehen.

Mit dem Bereitstellungsbefehl wird ein nicht pausierter Zeitplan bereitgestellt. Um dies zu verhindern, können Sie den Zeitplan manuell im Bereich „Pipelines Management“ pausieren. Sie können auch die YAML-Datei Ihrer Pipeline bearbeiten, um den triggers: - schedule-Block auszukommentieren oder zu entfernen.

Nicht agentisch

So stellen Sie eine lokale Version der Beispiel-Orchestration-Pipeline bereit:

  1. Klicken Sie in der Aktivitätsleiste auf das Symbol Google Cloud Data Agent Kit.
  2. Maximieren Sie Data Engineering und dann Orchestration Pipelines.
  3. Wählen Sie example-pipeline.yaml aus. Ein Pipeline-Editor wird für die ausgewählte Pipeline geöffnet.
  4. Wählen Sie Pipeline ausführen und dann die zuvor erstellte Entwicklungs- oder Stagingumgebung aus.

Agentisch

Führen Sie den folgenden Prompt aus:

Deploy my pipeline

Pipeline-Ausführung überwachen und Ausführungsprotokolle prüfen

Nachdem Ihre Pipeline bereitgestellt wurde, können Sie die detaillierten Informationen, den Verlauf der Pipelineausführungen und die Pipelineausführungsprotokolle dafür ansehen:

  1. Klicken Sie in der Aktivitätsleiste auf das Symbol Google Cloud Data Agent Kit.
  2. Maximieren Sie Data Engineering und wählen Sie dann Pipelines verwalten aus.
  3. Klicken Sie auf den Namen Ihrer Pipeline (example-pipeline), um den Ausführungsverlauf aufzurufen. In der Liste der Ausführungen für ein bestimmtes Datum sehen Sie die einzelnen Pipeline-Ausführungen und die Aufschlüsselung der einzelnen Aktionen in jeder Pipeline-Ausführung.
  4. Klicken Sie auf eine Aufgaben-ID, um die Logs der Aufgabenausführung aufzurufen. Da das Beispiel-PySpark-Script in Managed Service for Apache Spark ausgeführt wurde, enthalten die Aufgabenlogs einen Link zu den Batchlogs.

Pipelinefehler beheben

Wenn Ihre Pipeline fehlschlägt, wird im Bereich Pipelines verwalten die Schaltfläche Diagnose angezeigt.

Agentisch

Wenn Sie auf die Schaltfläche Diagnose klicken, generiert der Agent einen Prompt zur Fehlerbehebung bei Pipelinefehlern. Der Prompt wird entweder in die Zwischenablage kopiert oder in einer neuen Chatsitzung geöffnet.

Der Agent verwendet spezielle Skills, um Fehler in Pipelines zu beheben. Dabei werden Logs erfasst, bereitgestellter Code und der Arbeitsbereich abgeglichen und eine Ursachenanalyse erstellt.

Nach Erhalt der RCA sind folgende Schritte möglich:

  • Wenden Sie die Ursachenanalyse im aktuellen Arbeitsbereich an.
  • Bitten Sie den Agent, einen neuen Branch zu erstellen und die Änderungen dort anzuwenden.
  • Erstellen Sie ein Cloud Customer Care-Ticket mit den RCA-Details.

Hilfe bei der Fehlerbehebung bei Problemen mit der Erweiterung finden Sie unter Fehlerbehebung bei der Data Agent Kit-Erweiterung für Antigravity.

Nächste Schritte