Créer des pipelines d'ingénierie des données

Ce guide explique comment créer et déployer un pipeline d'orchestration dans l'extension Google Cloud Data Agent Kit pour Antigravity.

L'exemple de pipeline exécute un script PySpark dans Managed Service pour Apache Spark.

Vous pouvez déployer des pipelines d'orchestration à partir d'Antigravity en tant que versions locales ou via une action GitHub, par exemple lorsque vous fusionnez des modifications dans la branche main. Ce document explique comment déployer la version locale d'un pipeline d'orchestration.

Avant de commencer

Avant de commencer, procédez comme suit :

  1. Installez l'extension Data Agent Kit pour Antigravity.
  2. Configurez vos paramètres.
  3. Ajoutez un dépôt GitHub à votre espace de travail Antigravity pour stocker les pipelines d'orchestration et les composants tels que les scripts.

Examiner les rôles IAM requis

Pour obtenir les autorisations nécessaires pour créer des ressources dans votre projet, déployer et exécuter des pipelines d'orchestration, demandez à votre administrateur de vous accorder les rôles requis.

Pour créer et gérer des environnements Managed Service pour Apache Airflow, et pour gérer les objets dans leurs buckets associés, vous devez disposer des rôles suivants. Pour en savoir plus sur ces rôles utilisateur, consultez Attribuer des rôles aux utilisateurs dans la documentation Managed Service for Apache Airflow.

  • Administrateur de l'environnement et des objets Storage (composer.environmentAndStorageObjectAdmin)
  • Utilisateur du compte de service (iam.serviceAccountUser)

Pour travailler avec les ressources BigQuery et Cloud Storage, vous avez besoin des rôles suivants.

  • Éditeur de données BigQuery (roles/bigquery.dataEditor)
  • Administrateur d'objets de l'espace de stockage (roles/storage.objectAdmin)

Selon les ressources auxquelles vous prévoyez d'accéder, vous aurez peut-être besoin de rôles supplémentaires en plus de ceux qui vous permettent d'utiliser l'extension et de travailler avec des pipelines d'orchestration.

Créer un compte de service et lui attribuer des rôles IAM

Utilisez un compte de service unique pour l'environnement Managed Airflow de 3e génération. Le compte de service crée un environnement Managed Airflow de 3e génération et exécute tous les pipelines d'orchestration que vous déployez.

Demandez à votre administrateur de procéder comme suit :

  1. Créez un compte de service comme décrit dans la documentation IAM.
  2. Attribuez le rôle Composer Worker (composer.worker) au compte de service. Ce rôle fournit les autorisations requises dans la plupart des cas.

En règle générale, si vous devez accéder à d'autres ressources dans votre projetGoogle Cloud , n'accordez des autorisations supplémentaires à ce compte de service que lorsque cela est nécessaire pour le fonctionnement du pipeline d'orchestration.

Créer des ressources Google Cloud pour votre pipeline d'orchestration

Au cours de cette étape, créez des ressources Google Cloud pour votre pipeline d'orchestration.

Créer un environnement Airflow de 3e génération géré

Créez un environnement Managed Airflow de 3e génération avec la configuration suivante :

  • Nom de l'environnement : saisissez un nom que vous utiliserez ultérieurement pour configurer le pipeline d'orchestration. Exemple :example-pipeline-scheduler
  • Emplacement : sélectionnez un emplacement. Nous vous recommandons de créer toutes les ressources de ce guide au même emplacement. Exemple :us-central1
  • Compte de service : sélectionnez le compte de service que vous avez créé pour cet environnement.

L'exemple de commande Google Cloud CLI suivant illustre la syntaxe :

gcloud composer environments create example-pipeline-scheduler \
  --location us-central1 \
  --image-version composer-3-airflow-2 \
  --service-account "example-account@example-project.iam.gserviceaccount.com"

Ajouter des paramètres d'environnement à la configuration du planificateur

Fournissez les informations de connexion pour l'environnement Managed Airflow qui exécutera votre pipeline d'orchestration.

Ajoutez les paramètres de configuration de l'environnement que vous avez créé à l'aide de l'éditeur de paramètres du kit Google Cloud Data Agent :

  1. Cliquez sur l'icône Google Cloud Data Agent Kit dans la barre d'activité.
  2. Développez Paramètres, puis cliquez sur Paramètres.
  3. Sélectionnez Programmation.
  4. Saisissez les paramètres de l'environnement Managed Airflow 3e génération que vous avez créé précédemment :
    • ID du projet : nom du projet dans lequel se trouve l'environnement. Exemple : example-project.
    • Région : région dans laquelle se trouve l'environnement. Exemple : us-central1
    • Environnement : nom de l'environnement. Exemple : example-pipeline-scheduler
  5. Cliquez sur Enregistrer.

Créer un bucket pour les artefacts de pipeline

Créez un bucket Cloud Storage dans le même projet que l'environnement Managed Airflow et donnez-lui un nom semblable à example-pipelines-bucket. Ce bucket est nécessaire pour stocker votre job Managed Service pour Apache Spark.

Certaines actions de pipeline, telles que la sortie des résultats dans un bucket Cloud Storage.

Créer un ensemble de données et une table dans BigQuery

Ce guide présente un pipeline qui écrit des données dans une table BigQuery. Créez les ressources BigQuery suivantes dans votre projet :

  1. Créez un ensemble de données nommé wordcount_dataset.
  2. Créez une table BigQuery nommée wordcount_output.

Ajouter des composants de pipeline

Ce guide présente une tâche courante d'ingénierie des données (ETL : Extract, Transform, Load) à l'aide de PySpark, en lisant les données de BigQuery, en les transformant (nombre de mots) et en les chargeant de nouveau dans BigQuery.

Non agentique

Ajoutez le fichier suivant au dossier /scripts de votre dépôt. Vous ajouterez ensuite une action de pipeline qui exécutera ce script dans Managed Service pour Apache Spark.

Exemple de fichier wordcount.py :

#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)

# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')

# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
    'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()

# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()

print(f"Successfully wrote word counts to BigQuery table: {destination_table}")

Remplacez les éléments suivants :

  • ARTIFACTS_BUCKET_NAME : nom du bucket Cloud Storage que vous avez créé précédemment. Exemple : example-pipelines-bucket.
  • PROJECT_ID : nom du projet dans lequel réside l'environnement. Exemple : example-project.

Agentique

Demandez à l'agent de générer un exemple de script PySpark dans le dossier /scripts de votre dépôt. Vous ajouterez ensuite une action de pipeline qui exécutera ce script dans Managed Service pour Apache Spark.

Saisissez un prompt semblable à celui-ci :

I want to create a PySpark script that does the following:

1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.

My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.

Save the resulting script to /scripts as wordcount.py

Initialiser les pipelines d'orchestration dans votre dépôt

Lorsque vous initialisez des pipelines d'orchestration, l'extension Data Agent Kit pour Antigravity crée un échafaudage qui inclut les éléments suivants :

  • Fichier YAML de pipeline d'orchestration : exemple de définition de pipeline contenant une planification, mais aucune action définie.
  • deployment.yaml : exemple de configuration de déploiement de pipeline qui définit la manière dont votre pipeline doit être déployé. Ce fichier présente la configuration requise pour l'environnement Managed Airflow, le bucket d'artefacts et toutes les autres ressources utilisées par les actions de votre pipeline.
  • .github/workflows/deploy.yaml : configure une action GitHub qui déploie votre pipeline lorsque vous fusionnez des modifications dans la branche main de votre dépôt GitHub.
  • .github/workflows/validate.yaml : configure une action GitHub qui valide votre pipeline après son déploiement.

Dans les étapes ultérieures de ce document, vous développerez ces définitions à l'aide de l'extension Data Agent Kit pour Antigravity afin de créer et de déployer un pipeline d'orchestration en local.

Non agentique

Pour initialiser les pipelines d'orchestration, procédez comme suit :

  1. Cliquez sur l'icône Google Cloud Data Agent Kit dans la barre d'activité.
  2. Développez Data Engineering, puis cliquez sur Initialize orchestration pipeline.
  3. Saisissez les paramètres du nouveau pipeline d'orchestration :
  4. ID du pipeline : saisissez l'ID de votre pipeline. Exemple : example-pipeline.
  5. ID du projet Google Cloud : nom du projet dans lequel réside l'environnement. Exemple : example-project.
  6. Région : région dans laquelle se trouve votre environnement. Exemple : us-central1
  7. ID d'environnement : nom de l'environnement avec lequel vous souhaitez développer. Exemple : dev/staging.
  8. Environnement Scheduler Managed Service pour Apache Airflow : nom de l'environnement dans lequel vous souhaitez orchestrer vos pipelines. Pour ce document, spécifiez le même environnement dans ce paramètre.

  9. Bucket d'artefacts : nom du bucket utilisé pour les artefacts de pipeline, sans le préfixe gs://. Exemple : example-pipelines-bucket.

  10. Cliquez sur Suivant.

  11. Cliquez sur Initialize (Initialiser).

  12. Spécifiez un espace de travail dans lequel vous souhaitez initialiser le pipeline.

Agentique

Demandez à l'agent de créer un échafaudage pour les pipelines d'orchestration de votre dépôt.

Saisissez un prompt semblable à celui-ci :

Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.

The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.

The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.

Store pipeline artifacts in example-pipelines-bucket.

Une fois que vous avez initialisé des pipelines dans votre dépôt, vous ne pouvez plus le faire, car la nouvelle structure écraserait les modifications de configuration que vous avez apportées. Vous pouvez ajouter des pipelines en créant des fichiers de définition de pipeline dans votre projet et en les ajoutant à la configuration de déploiement.

Ajouter une tâche au pipeline

Étant donné que la configuration initiale du pipeline ne comporte aucune action, vous devez ajouter une action qui exécute votre script PySpark.

Non agentique

Pour modifier un pipeline :

  1. Cliquez sur l'icône Google Cloud Data Agent Kit dans la barre d'activité.
  2. Développez Ingénierie des données, puis Pipelines d'orchestration.
  3. Sélectionnez example-pipeline.yaml. Un éditeur de pipeline s'ouvre pour le pipeline sélectionné.
  4. Facultatif : Sélectionnez le nœud Déclencheur de programmation. Vous pouvez ajuster la planification de votre pipeline en spécifiant une expression de type Cron, ainsi que les heures de début et de fin de la planification. La planification par défaut du pipeline nouvellement initialisé est 0 2 * * *, qui s'exécute tous les jours à 2h.
  1. Ajoutez une tâche. Dans ce guide, vous allez ajouter une tâche PySpark qui exécute un script PySpark que vous avez ajouté précédemment :

    1. Cliquez sur Ajouter une première tâche pour ajouter un nœud de tâche.
    2. Sélectionnez Exécuter le script PySpark et le fichier script/wordcount.py.

    Le panneau Exécuter le script PySpark s'ouvre.

    1. Dans le mode cluster Spark, sélectionnez Serverless Spark.
    2. Dans Emplacement, spécifiez l'emplacement de votre environnement. Exemple : us-central1.
    3. Cliquez sur Enregistrer.

Agentique

Exécutez la requête suivante :

Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.

Déployer la version locale du pipeline

Déployez la version locale du pipeline pour vérifier qu'il est correctement configuré.

Lorsque vous déployez une version locale du pipeline d'orchestration, l'extension Data Agent Kit pour Antigravity importe une version locale du bundle de pipeline dans l'environnement Managed Airflow et l'exécute. Le déploiement local est destiné à être utilisé lorsque vous travaillez dans un environnement de développement.

La commande "deploy" déploie une planification non suspendue. Pour éviter cela, vous pouvez suspendre manuellement la programmation dans le volet "Gestion des pipelines". Vous pouvez également modifier le fichier YAML de votre pipeline pour mettre en commentaire ou supprimer le bloc triggers: - schedule.

Non agentique

Pour déployer une version locale de l'exemple de pipeline d'orchestration, procédez comme suit :

  1. Cliquez sur l'icône Google Cloud Data Agent Kit dans la barre d'activité.
  2. Développez Ingénierie des données, puis Pipelines d'orchestration.
  3. Sélectionnez example-pipeline.yaml. Un éditeur de pipeline s'ouvre pour le pipeline sélectionné.
  4. Sélectionnez Exécuter le pipeline, puis sélectionnez l'environnement de développement ou de préproduction que vous avez créé précédemment.

Agentique

Exécutez la requête suivante :

Deploy my pipeline

Surveiller l'exécution du pipeline et vérifier les journaux d'exécution

Une fois votre pipeline déployé, vous pouvez afficher des informations détaillées, l'historique des exécutions du pipeline et les journaux d'exécution du pipeline :

  1. Cliquez sur l'icône Google Cloud Data Agent Kit dans la barre d'activité.
  2. Développez Ingénierie des données, puis sélectionnez Gestion des pipelines.
  3. Cliquez sur le nom de votre pipeline (example-pipeline) pour afficher son historique d'exécution. Dans la liste des exécutions pour une date spécifique, vous pouvez voir les exécutions de pipeline individuelles et la répartition des actions individuelles dans chaque exécution de pipeline.
  4. Cliquez sur un ID de tâche pour afficher les journaux d'exécution de la tâche. Étant donné que l'exemple de script PySpark a été exécuté dans Managed Service pour Apache Spark, les journaux des tâches comporteront un lien vers les journaux Batch.

Résoudre les problèmes d'échec des pipelines

Lorsque votre pipeline échoue, un bouton Diagnostiquer s'affiche dans le volet Gestion des pipelines.

Agentique

Lorsque vous cliquez sur le bouton Diagnostiquer, l'agent génère une requête pour résoudre l'échec du pipeline. La requête est copiée dans votre presse-papiers ou ouverte dans une nouvelle session de chat.

L'agent utilise des compétences spécialisées pour résoudre les problèmes liés aux pipelines. Il se concentre sur la collecte de journaux, la vérification croisée du code déployé et de l'espace de travail, et la génération d'une analyse des causes premières.

Voici les prochaines étapes possibles après avoir reçu l'ARC :

  • Appliquez l'analyse de la cause première dans l'espace de travail actuel.
  • Demandez à l'agent de créer une branche et d'y appliquer les modifications.
  • Créez une demande Cloud Customer Care avec les détails de l'analyse des causes premières.

Pour obtenir de l'aide sur la résolution des problèmes liés à l'extension, consultez Résoudre les problèmes liés à l'extension Data Agent Kit pour Antigravity.

Étapes suivantes