Créer des pipelines d'ingénierie des données

Ce guide explique comment créer et déployer un pipeline d'orchestration dans l'extension Google Cloud Data Agent Kit pour Visual Studio Code.

L'exemple de pipeline exécute un script PySpark dans Managed Service pour Apache Spark.

Vous pouvez déployer des pipelines d'orchestration à partir de VS Code en tant que versions locales ou via une action GitHub, par exemple lorsque vous fusionnez des modifications dans la branche main. Ce document explique comment déployer la version locale d'un pipeline d'orchestration.

Avant de commencer

Avant de commencer, procédez comme suit :

  1. Installez l'extension Data Agent Kit pour VS Code.
  2. Configurez vos paramètres.
  3. Ajoutez un dépôt GitHub à votre espace de travail VS Code pour stocker les pipelines d'orchestration et les éléments tels que les scripts.

Vérifier les rôles IAM requis

Pour obtenir les autorisations nécessaires pour créer des ressources dans votre projet, déployer et exécuter des pipelines d'orchestration, demandez à votre administrateur de vous accorder les rôles requis.

Pour créer et gérer des environnements Managed Service pour Apache Airflow et gérer des objets dans les buckets associés, vous avez besoin des rôles suivants. Pour en savoir plus sur ces rôles utilisateur, consultez la section Attribuer des rôles aux utilisateurs dans la documentation Managed Service pour Apache Airflow.

  • Administrateur de l'environnement et des objets Storage (composer.environmentAndStorageObjectAdmin)
  • Utilisateur du compte de service (iam.serviceAccountUser)

Pour utiliser les ressources BigQuery et Cloud Storage, vous avez besoin des rôles suivants.

  • Éditeur de données BigQuery (roles/bigquery.dataEditor)
  • Administrateur d'objets de l'espace de stockage (roles/storage.objectAdmin)

Selon les ressources auxquelles vous prévoyez d'accéder, vous aurez peut-être besoin de rôles supplémentaires en plus de ceux qui vous permettent d'utiliser l'extension et de travailler avec des pipelines d'orchestration.

Créer un compte de service et lui attribuer des rôles IAM

Utilisez un compte de service unique pour l'environnement Managed Airflow de 3e génération. Le compte de service crée un environnement Managed Airflow de 3e génération et exécute tous les pipelines d'orchestration que vous déployez.

Demandez à votre administrateur de procéder comme suit :

  1. Créez un compte de service comme décrit dans la documentation IAM.
  2. Attribuez le rôle Nœud de calcul Composer (composer.worker) au compte de service. Dans la plupart des cas, ce rôle fournit les autorisations requises.

Nous vous recommandons, si vous devez accéder à d'autres ressources de votre Google Cloud projet, d'accorder des autorisations supplémentaires à ce compte de service uniquement lorsque cela est nécessaire au fonctionnement du pipeline d'orchestration.

Créer Google Cloud des ressources pour votre pipeline d'orchestration

Au cours de cette étape, créez des ressources pour votre pipeline d'orchestration. Google Cloud

Créer un environnement Managed Airflow de 3e génération

Créez un environnement Managed Airflow de 3e génération avec la configuration suivante :

  • Nom de l'environnement : saisissez un nom que vous utiliserez ultérieurement pour configurer le pipeline d'orchestration. Par exemple, example-pipeline-scheduler.
  • Emplacement : sélectionnez un emplacement. Nous vous recommandons de créer toutes les ressources de ce guide au même emplacement. Par exemple, us-central1.
  • Compte de service : sélectionnez le compte de service que vous avez créé pour cet environnement.

L'exemple de commande Google Cloud CLI suivant illustre la syntaxe :

gcloud composer environments create example-pipeline-scheduler \
  --location us-central1 \
  --image-version composer-3-airflow-2 \
  --service-account "example-account@example-project.iam.gserviceaccount.com"

Ajouter des paramètres d'environnement à la configuration du planificateur

Fournissez les informations de connexion pour l'environnement Managed Airflow qui exécutera votre pipeline d'orchestration.

Ajoutez les paramètres de configuration de l'environnement que vous avez créé à l'aide de l'éditeur de paramètres Google Cloud Data Agent Kit :

  1. Cliquez sur l'icône Google Cloud Data Agent Kit dans la barre des tâches.
  2. Développez Settings (Paramètres), puis cliquez sur Settings (Paramètres).
  3. Sélectionnez Scheduler (Planificateur).
  4. Saisissez les paramètres de l'environnement Managed Airflow de 3e génération que vous avez créé précédemment :
    • ID du projet : nom du projet dans lequel se trouve l'environnement. Exemple : example-project.
    • Région : région dans laquelle se trouve l'environnement. Exemple : us-central1.
    • Environnement : nom de l'environnement. Exemple : example-pipeline-scheduler.
  5. Cliquez sur Enregistrer.

Créer un bucket pour les artefacts de pipeline

Créez un bucket Cloud Storage dans le même projet que l' environnement Managed Airflow et attribuez-lui un nom semblable à example-pipelines-bucket. Ce bucket est nécessaire pour stocker votre job Managed Service pour Apache Spark.

Certaines actions de pipeline, telles que la sortie des résultats vers un bucket Cloud Storage.

Créer un ensemble de données et une table dans BigQuery

Ce guide présente un pipeline qui écrit des données dans une table BigQuery. Créez les ressources BigQuery suivantes dans votre projet :

  1. Créez un nouvel ensemble de données nommé wordcount_dataset.
  2. Créez une table BigQuery nommée wordcount_output.

Ajouter des éléments de pipeline

Ce guide présente une tâche courante d'ingénierie des données (ETL : Extract, Transform, Load) à l'aide de PySpark, en lisant des données à partir de BigQuery, en les transformant (comptage de mots) et en les chargeant dans BigQuery.

Non agentique

Ajoutez le fichier suivant au dossier /scripts de votre dépôt. Vous ajouterez ensuite une action de pipeline qui exécutera ce script dans Managed Service pour Apache Spark.

Exemple de fichier wordcount.py :

#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)

# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')

# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
    'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()

# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()

print(f"Successfully wrote word counts to BigQuery table: {destination_table}")

Remplacez les éléments suivants :

  • ARTIFACTS_BUCKET_NAME : nom du bucket Cloud Storage que vous avez créé précédemment. Exemple : example-pipelines-bucket.
  • PROJECT_ID : nom du projet dans lequel réside l'environnement. Exemple : example-project.

Agentique

Demandez à l'agent de générer un exemple de script PySpark dans le dossier /scripts de votre dépôt. Vous ajouterez ensuite une action de pipeline qui exécutera ce script dans Managed Service pour Apache Spark.

Saisissez un prompt semblable à celui-ci :

I want to create a PySpark script that does the following:

1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.

My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.

Save the resulting script to /scripts as wordcount.py

Initialiser les pipelines d'orchestration dans votre dépôt

Lorsque vous initialisez des pipelines d'orchestration, l'extension Data Agent Kit pour VS Code crée une structure qui inclut les éléments suivants :

  • Fichier YAML de pipeline d'orchestration : exemple de définition de pipeline contenant une planification, mais aucune action définie.
  • deployment.yaml: exemple de configuration de déploiement de pipeline qui définit comment votre pipeline doit être déployé. Ce fichier illustre la configuration requise pour l'environnement Managed Airflow, le bucket d'artefacts et toutes les autres ressources utilisées par les actions de votre pipeline.
  • .github/workflows/deploy.yaml: configure une action GitHub qui déploie votre pipeline lorsque vous fusionnez des modifications dans la branche main de votre dépôt GitHub.
  • .github/workflows/validate.yaml: configure une action GitHub qui valide votre pipeline une fois qu'il est déployé.

Dans les étapes ultérieures de ce document, vous développerez ces définitions à l'aide de l'extension Data Agent Kit pour VS Code afin de créer et de déployer un pipeline d'orchestration en local.

Non agentique

Pour initialiser des pipelines d'orchestration, procédez comme suit :

  1. Cliquez sur l'icône Google Cloud Data Agent Kit dans la barre des tâches.
  2. Développez Data Engineering (Ingénierie des données), puis cliquez sur Initialize orchestration pipeline (Initialiser le pipeline d'orchestration).
  3. Saisissez les paramètres du nouveau pipeline d'orchestration :
  4. ID du pipeline : saisissez l'ID de votre pipeline. Exemple : example-pipeline.
  5. ID du projet Google Cloud : nom du projet dans lequel réside l'environnement réside. Exemple : example-project.
  6. Région : région dans laquelle réside votre environnement. Exemple : us-central1.
  7. ID de l'environnement : nom de l'environnement avec lequel vous souhaitez développer. Exemple : dev/staging.
  8. Environnement Managed Service pour Apache Airflow du planificateur : nom de l'environnement dans lequel vous souhaitez orchestrer vos pipelines. Pour ce document, spécifiez le même environnement dans ce paramètre.

  9. Bucket d'artefacts : nom du bucket utilisé pour les artefacts de pipeline, sans le préfixe gs://. Exemple : example-pipelines-bucket.

  10. Cliquez sur Suivant.

  11. Cliquez sur Initialize (Initialiser).

  12. Spécifiez un espace de travail dans lequel vous souhaitez initialiser le pipeline.

Agentique

Demandez à l'agent de créer une structure pour les pipelines d'orchestration de votre dépôt.

Saisissez un prompt semblable à celui-ci :

Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.

The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.

The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.

Store pipeline artifacts in example-pipelines-bucket.

Une fois que vous avez initialisé des pipelines dans votre dépôt, vous ne pouvez plus le faire, car la nouvelle structure écraserait toutes les modifications de configuration que vous avez apportées. Vous pouvez ajouter de nouveaux pipelines en créant des fichiers de définition de pipeline dans votre projet et en les ajoutant à la configuration de déploiement.

Ajouter une tâche au pipeline

Étant donné que la configuration initiale du pipeline ne comporte aucune action, vous ajoutez une action qui exécute votre script PySpark.

Non agentique

Pour modifier un pipeline, procédez comme suit :

  1. Cliquez sur l'icône Google Cloud Data Agent Kit dans la barre des tâches.
  2. Développez Data Engineering (Ingénierie des données), puis Orchestration Pipelines (Pipelines d'orchestration).
  3. Sélectionnez example-pipeline.yaml. Un éditeur de pipeline s'ouvre pour le pipeline sélectionné.
  4. Facultatif : Sélectionnez le nœud Schedule trigger (Déclencheur de planification). Vous pouvez ajuster la planification de votre pipeline en spécifiant une expression de type Cron, ainsi que des heures de début et de fin de la planification. La planification par défaut du pipeline nouvellement initialisé est 0 2 * * *, qui s'exécute tous les jours à 2h.
  1. Ajoutez une tâche. Dans ce guide, vous ajoutez une tâche PySpark qui exécute un script PySpark que vous avez ajouté précédemment :

    1. Cliquez sur Add first task (Ajouter la première tâche) pour ajouter un nœud de tâche.
    2. Sélectionnez Execute PySpark script (Exécuter le script PySpark) et le fichier script/wordcount.py.

    Le panneau Execute PySpark script (Exécuter le script PySpark) s'ouvre.

    1. Dans "Spark Cluster Mode" (Mode de cluster Spark), sélectionnez Serverless Spark (Spark sans serveur).
    2. Dans Location (Emplacement), spécifiez l'emplacement où réside votre environnement. Exemple : us-central1.
    3. Cliquez sur Enregistrer.

Agentique

Exécutez le prompt suivant :

Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.

Déployer la version locale du pipeline

Déployez la version locale du pipeline pour vérifier qu'elle est correctement configurée.

Lorsque vous déployez une version locale du pipeline d'orchestration, l'extension Data Agent Kit pour VS Code importe une version locale du bundle de pipeline dans l'environnement Managed Airflow et l'exécute. Le déploiement local est destiné à être utilisé lorsque vous travaillez dans un environnement de développement.

La commande de déploiement déploie une planification non suspendue. Pour éviter cela, vous pouvez suspendre la planification manuellement dans le panneau "Pipelines Management" (Gestion des pipelines). Vous pouvez également modifier votre fichier YAML de pipeline pour commenter ou supprimer le bloc triggers: - schedule.

Non agentique

Pour déployer une version locale de l'exemple de pipeline d'orchestration, procédez comme suit :

  1. Cliquez sur l'icône Google Cloud Data Agent Kit dans la barre des tâches.
  2. Développez Data Engineering (Ingénierie des données), puis Orchestration Pipelines (Pipelines d'orchestration).
  3. Sélectionnez example-pipeline.yaml. Un éditeur de pipeline s'ouvre pour le pipeline sélectionné.
  4. Sélectionnez Run pipeline (Exécuter le pipeline), puis sélectionnez l'environnement de développement ou de préproduction que vous avez créé précédemment.

Agentique

Exécutez le prompt suivant :

Deploy my pipeline

Surveiller l'exécution du pipeline et vérifier les journaux d'exécution

Une fois votre pipeline déployé, vous pouvez afficher les informations détaillées, l'historique des exécutions de pipeline et les journaux d'exécution du pipeline :

  1. Cliquez sur l'icône Google Cloud Data Agent Kit dans la barre des tâches.
  2. Développez Data Engineering (Ingénierie des données), puis sélectionnez Pipelines management (Gestion des pipelines).
  3. Cliquez sur le nom de votre pipeline (example-pipeline) pour afficher son historique d'exécution. Dans la liste des exécutions pour une date spécifique, vous pouvez voir les exécutions de pipeline individuelles et la répartition des actions individuelles dans chaque exécution de pipeline.
  4. Cliquez sur un ID de tâche pour afficher les journaux d'exécution de la tâche. Étant donné que l'exemple de script PySpark a été exécuté dans Managed Service pour Apache Spark, les journaux de tâches contiendront un lien vers les journaux de traitement par lot.

Dépanner et corriger les échecs de pipeline

Lorsque votre pipeline échoue, un bouton Diagnose (Diagnostiquer) s'affiche dans le panneau Pipelines management (Gestion des pipelines).

Agentique

Lorsque vous cliquez sur le bouton Diagnose (Diagnostiquer), l'agent génère un prompt pour résoudre l'échec du pipeline. Le prompt est copié dans votre presse-papiers ou ouvert dans une nouvelle session de chat.

L'agent utilise des compétences spécialisées pour résoudre les problèmes liés aux pipelines, en se concentrant sur la collecte de journaux, la vérification croisée du code déployé et de l'espace de travail, ainsi que la génération d'une analyse des causes fondamentales.

Voici les étapes possibles après avoir reçu l'analyse des causes fondamentales :

  • Appliquez l'analyse des causes fondamentales dans l'espace de travail actuel.
  • Demandez à l'agent de créer une branche et d'y appliquer les modifications.
  • Ouvrez une demande Cloud Customer Care avec les détails de l'analyse des causes fondamentales.

Pour obtenir de l'aide sur la résolution des problèmes liés à l'extension, consultez la section Dépannage.

Étape suivante