Crea pipeline di data engineering

Questa guida descrive come creare ed eseguire il deployment di una pipeline di orchestrazione nell'estensione Google Cloud Data Agent Kit per Visual Studio Code.

La pipeline di esempio esegue uno script PySpark in Managed Service for Apache Spark.

Puoi eseguire il deployment delle pipeline di orchestrazione da VS Code come versioni locali o tramite un'azione GitHub, ad esempio quando unisci le modifiche al ramo main. Questo documento mostra come eseguire il deployment della versione locale di una pipeline di orchestrazione.

Prima di iniziare

Prima di iniziare, completa le seguenti operazioni:

  1. Installa l'estensione Data Agent Kit per VS Code.
  2. Configura le impostazioni.
  3. Aggiungi un repository GitHub al tuo workspace VS Code per archiviare pipeline di orchestrazione e asset come gli script.

Esamina i ruoli IAM richiesti

Per ottenere le autorizzazioni per creare risorse nel tuo progetto, eseguire il deployment ed eseguire pipeline di orchestrazione, chiedi all'amministratore di concederti i ruoli richiesti.

Per creare e gestire gli ambienti Managed Service for Apache Airflow e gestire gli oggetti nei bucket associati, devi disporre dei seguenti ruoli. Per ulteriori informazioni su questi ruoli utente, vedi Concedere ruoli agli utenti nella documentazione di Managed Service for Apache Airflow.

  • Amministratore ambienti e oggetti Storage (composer.environmentAndStorageObjectAdmin)
  • Service Account User (iam.serviceAccountUser)

Per lavorare con le risorse BigQuery e Cloud Storage, devi disporre dei seguenti ruoli.

  • Editor dati BigQuery (roles/bigquery.dataEditor)
  • Storage Object Admin (roles/storage.objectAdmin)

A seconda delle risorse a cui prevedi di accedere, potresti aver bisogno di ruoli aggiuntivi oltre a quelli che ti consentono di utilizzare l'estensione e lavorare con le pipeline di orchestration.

Crea un account di servizio e concedigli ruoli IAM

Utilizza un account di servizio univoco per l'ambiente Airflow gestito di terza generazione. Il account di servizio crea un ambiente Airflow gestito di terza generazione ed esegue tutte le pipeline di orchestrazione che implementi.

Chiedi all'amministratore di completare i seguenti passaggi:

  1. Crea un service account come descritto nella documentazione IAM.
  2. Concedi al service account il ruolo Worker Composer (composer.worker). Questo ruolo fornisce le autorizzazioni richieste nella maggior parte dei casi.

Come best practice, se devi accedere ad altre risorse nel tuo progettoGoogle Cloud , concedi autorizzazioni aggiuntive a questo service account solo quando necessario per l'operazione della pipeline di orchestrazione.

Crea Google Cloud risorse per la pipeline di orchestrazione

In questo passaggio, crea risorse Google Cloud per la pipeline di orchestrazione.

Crea un ambiente Airflow gestito di terza generazione

Crea un ambiente Airflow gestito di terza generazione con la seguente configurazione:

  • Nome ambiente: inserisci un nome che utilizzerai in un secondo momento per configurare la pipeline di orchestrazione. Ad esempio, example-pipeline-scheduler.
  • Località: seleziona una località. Ti consigliamo di creare tutte le risorse di questa guida nella stessa località. Ad esempio, us-central1.
  • Service account: seleziona il account di servizio che hai creato per questo ambiente.

Il seguente esempio di comando Google Cloud CLI mostra la sintassi:

gcloud composer environments create example-pipeline-scheduler \
  --location us-central1 \
  --image-version composer-3-airflow-2 \
  --service-account "example-account@example-project.iam.gserviceaccount.com"

Aggiungi parametri dell'ambiente alla configurazione dello scheduler

Fornisci i dettagli di connessione per l'ambiente Managed Airflow che eseguirà la pipeline di orchestrazione.

Aggiungi i parametri di configurazione dell'ambiente che hai creato utilizzando l'editor delle impostazioni di Google Cloud Data Agent Kit:

  1. Fai clic sull'icona Google Cloud Data Agent Kit nella barra delle attività.
  2. Espandi Impostazioni e poi fai clic su Impostazioni.
  3. Seleziona Pianificatore.
  4. Inserisci i parametri per l'ambiente Managed Airflow di terza generazione che hai creato in precedenza:
    • ID progetto: il nome del progetto in cui si trova l'ambiente. Esempio: example-project.
    • Regione: la regione in cui si trova l'ambiente. Esempio: us-central1.
    • Environment: il nome dell'ambiente. Esempio: example-pipeline-scheduler.
  5. Fai clic su Salva.

Crea un bucket per gli artefatti della pipeline

Crea un bucket Cloud Storage nello stesso progetto dell'ambiente Managed Airflow e assegnagli un nome simile a example-pipelines-bucket. Questo bucket è necessario per archiviare il job Managed Service for Apache Spark.

Alcune azioni della pipeline, ad esempio l'output dei risultati in un bucket Cloud Storage.

Crea un nuovo set di dati e una nuova tabella in BigQuery

Questa guida mostra una pipeline che scrive i dati in una tabella BigQuery. Crea le seguenti risorse BigQuery nel tuo progetto:

  1. Crea un nuovo set di dati denominato wordcount_dataset.
  2. Crea una nuova tabella BigQuery denominata wordcount_output.

Aggiungere asset della pipeline

Questa guida illustra un'attività comune di data engineering (ETL: estrazione, trasformazione, caricamento) utilizzando PySpark, la lettura da BigQuery, la trasformazione dei dati (conteggio delle parole) e il loro ricaricamento in BigQuery.

Non agentico

Aggiungi il seguente file alla cartella /scripts del repository. In un secondo momento aggiungi un'azione della pipeline che esegue questo script in Managed Service for Apache Spark.

File wordcount.py di esempio:

#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)

# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')

# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
    'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()

# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()

print(f"Successfully wrote word counts to BigQuery table: {destination_table}")

Sostituisci quanto segue:

  • ARTIFACTS_BUCKET_NAME: il nome del bucket Cloud Storage che hai creato in precedenza. Esempio: example-pipelines-bucket.
  • PROJECT_ID: il nome del progetto in cui si trova l'ambiente. Esempio: example-project.

Agentici

Chiedi all'agente di generare uno script PySpark di esempio nella cartella /scripts del tuo repository. In un secondo momento, aggiungi un'azione della pipeline che esegue questo script in Managed Service for Apache Spark.

Inserisci un prompt simile al seguente:

I want to create a PySpark script that does the following:

1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.

My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.

Save the resulting script to /scripts as wordcount.py

Inizializzare le pipeline di orchestrazione nel repository

Quando inizializzi le pipeline di orchestrazione, l'estensione Data Agent Kit per VS Code crea una struttura che include quanto segue:

  • Un file YAML della pipeline di orchestrazione: una definizione di pipeline di esempio che contiene una pianificazione, ma nessuna azione definita.
  • deployment.yaml: una configurazione di deployment della pipeline di esempio che definisce come deve essere eseguito il deployment della pipeline. Questo file mostra la configurazione richiesta per l'ambiente Managed Airflow, il bucket degli artefatti e qualsiasi altra risorsa utilizzata dalle azioni della pipeline.
  • .github/workflows/deploy.yaml: configura un'azione GitHub che esegue il deployment della pipeline quando unisci le modifiche al branch main del repository GitHub.
  • .github/workflows/validate.yaml: configura un'azione GitHub che convalida la pipeline dopo il deployment.

Nei passaggi successivi di questo documento, espandi queste definizioni utilizzando l'estensione Data Agent Kit per VS Code per creare e implementare una pipeline di orchestrazione in locale.

Non agentico

Per inizializzare le pipeline di orchestrazione:

  1. Fai clic sull'icona Google Cloud Data Agent Kit nella barra delle attività.
  2. Espandi Data Engineering e poi fai clic su Inizializza pipeline di orchestrazione.
  3. Inserisci i parametri per la nuova pipeline di orchestrazione:
  4. ID pipeline: inserisci l'ID della pipeline. Esempio: example-pipeline.
  5. ID progetto Google Cloud: il nome del progetto in cui si trova l'ambiente. Esempio: example-project.
  6. Regione: la regione in cui si trova il tuo ambiente. Esempio: us-central1.
  7. ID ambiente: il nome dell'ambiente con cui vuoi sviluppare. Esempio: dev/staging.
  8. Scheduler Managed Service for Apache Airflow Environment: il nome dell'ambiente in cui vuoi orchestrare le pipeline. Per questo documento, specifica lo stesso ambiente in questo parametro.

  9. Bucket artefatti: il nome del bucket utilizzato per gli artefatti della pipeline, senza il prefisso gs://. Esempio: example-pipelines-bucket.

  10. Fai clic su Avanti.

  11. Fai clic su Inizializza.

  12. Specifica uno spazio di lavoro in cui vuoi inizializzare la pipeline.

Agentici

Chiedi all'agente di creare una struttura per le pipeline di orchestrazione del tuo repository.

Inserisci un prompt simile al seguente:

Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.

The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.

The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.

Store pipeline artifacts in example-pipelines-bucket.

Dopo aver inizializzato le pipeline nel repository, non puoi farlo di nuovo perché la nuova impalcatura sovrascriverebbe le modifiche alla configurazione apportate. Puoi aggiungere nuove pipeline creando nuovi file di definizione della pipeline nel tuo progetto e aggiungendoli alla configurazione del deployment.

Aggiungi una nuova attività alla pipeline

Poiché la configurazione iniziale della pipeline non prevede azioni, aggiungi un'azione che esegue lo script PySpark.

Non agentico

Per modificare una pipeline:

  1. Fai clic sull'icona Google Cloud Data Agent Kit nella barra delle attività.
  2. Espandi Data Engineering, quindi Pipeline di orchestrazione.
  3. Seleziona example-pipeline.yaml. Si apre un editor della pipeline per la pipeline selezionata.
  4. (Facoltativo) Seleziona il nodo Attivatore di pianificazione. Puoi modificare la pianificazione della pipeline specificando un'espressione simile a cron e gli orari di inizio e fine della pianificazione. La pianificazione predefinita per la pipeline appena inizializzata è 0 2 * * *, che viene eseguita alle 2:00 ogni giorno.
  1. Aggiungi una nuova attività. In questa guida, aggiungi un'attività PySpark che esegue uno script PySpark che hai aggiunto in precedenza:

    1. Fai clic su Aggiungi prima attività per aggiungere un nuovo nodo di attività.
    2. Seleziona Esegui script PySpark e il file script/wordcount.py.

    Viene visualizzato il riquadro Esegui script PySpark.

    1. In Modalità cluster Spark, seleziona Spark serverless.
    2. In Posizione, specifica la posizione in cui si trova il tuo ambiente. Esempio: us-central1.
    3. Fai clic su Salva.

Agentici

Esegui il seguente prompt:

Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.

Esegui il deployment della versione locale della pipeline

Esegui il deployment della versione locale della pipeline per verificare che sia configurata correttamente.

Quando esegui il deployment di una versione locale della pipeline di orchestrazione, l'estensione Data Agent Kit per VS Code carica una versione locale del bundle della pipeline nell'ambiente Managed Airflow e la esegue. Il deployment locale è destinato a essere utilizzato quando si lavora in un ambiente di sviluppo.

Il comando di deployment esegue il deployment di una pianificazione non sospesa. Per evitare questo problema, puoi mettere in pausa la pianificazione manualmente nel riquadro Gestione pipeline. Puoi anche modificare il file YAML della pipeline per commentare o rimuovere il blocco triggers: - schedule.

Non agentico

Per eseguire il deployment di una versione locale della pipeline di orchestrazione di esempio, segui questi passaggi:

  1. Fai clic sull'icona Google Cloud Data Agent Kit nella barra delle attività.
  2. Espandi Data Engineering e poi Pipeline di orchestrazione.
  3. Seleziona example-pipeline.yaml. Si apre un editor della pipeline per la pipeline selezionata.
  4. Seleziona Esegui pipeline, quindi seleziona l'ambiente di sviluppo o di staging che hai creato in precedenza.

Agentici

Esegui il seguente prompt:

Deploy my pipeline

Monitora l'esecuzione della pipeline e controlla i log di esecuzione

Dopo il deployment della pipeline, puoi visualizzarne le informazioni dettagliate, la cronologia delle esecuzioni e i log di esecuzione:

  1. Fai clic sull'icona Google Cloud Data Agent Kit nella barra delle attività.
  2. Espandi Data Engineering, quindi seleziona Gestione delle pipeline.
  3. Fai clic sul nome della pipeline (example-pipeline) per visualizzarne la cronologia di esecuzione. Nell'elenco delle esecuzioni per una data specifica, puoi visualizzare le singole esecuzioni della pipeline e la suddivisione delle singole azioni all'interno di ogni esecuzione della pipeline.
  4. Fai clic su un ID attività per visualizzare i log di esecuzione dell'attività. Poiché lo script PySpark di esempio è stato eseguito in Managed Service for Apache Spark, i log delle attività avranno un link ai log batch.

Risolvere i problemi e correggere gli errori della pipeline

Quando la pipeline non va a buon fine, nel riquadro Gestione pipeline viene visualizzato un pulsante Diagnostica.

Agentici

Quando fai clic sul pulsante Diagnostica, l'agente genera un prompt per risolvere il problema della pipeline. Il prompt viene copiato negli appunti o aperto in una nuova sessione di chat.

L'agente utilizza competenze specializzate per risolvere i problemi relativi alle pipeline, concentrandosi su raccolta dei log, controllo incrociato del codice e dello spazio di lavoro di cui è stato eseguito il deployment e generazione di un'analisi delle cause principali.

I possibili passaggi successivi dopo aver ricevuto l'analisi delle cause principali sono i seguenti:

  • Applica l'analisi delle cause principali nell'area di lavoro corrente.
  • Chiedi all'agente di creare un nuovo ramo e di applicare le modifiche lì.
  • Apri un ticket di assistenza clienti Google Cloud con i dettagli dell'analisi della causa principale.

Per assistenza nella risoluzione dei problemi relativi all'estensione, consulta Risolvere i problemi dell'estensione Data Agent Kit per VS Code.

Passaggi successivi