Questa guida descrive come creare ed eseguire il deployment di una pipeline di orchestrazione nell'estensione Google Cloud Data Agent Kit per Antigravity.
La pipeline di esempio esegue uno script PySpark in Managed Service for Apache Spark.
Puoi eseguire il deployment delle pipeline di orchestrazione da Antigravity come versioni locali o tramite un'azione GitHub, ad esempio quando unisci le modifiche al ramo main. Questo documento mostra come eseguire il deployment della versione locale di una pipeline di orchestrazione.
Prima di iniziare
Prima di iniziare, completa i seguenti passaggi:
- Installa l'estensione Data Agent Kit per Antigravity.
- Configura le impostazioni.
- Aggiungi un repository GitHub all'area di lavoro Antigravity per archiviare pipeline di orchestrazione e asset come gli script.
Esamina i ruoli IAM richiesti
Per ottenere le autorizzazioni per creare risorse nel progetto, eseguire il deployment ed eseguire pipeline di orchestrazione, chiedi all'amministratore di concederti i ruoli richiesti.
Per creare e gestire gli ambienti Managed Service for Apache Airflow e gestire gli oggetti nei bucket associati, devi disporre dei seguenti ruoli. Per ulteriori informazioni su questi ruoli utente, vedi Concedere ruoli agli utenti nella documentazione di Managed Service for Apache Airflow.
- Amministratore ambienti e oggetti Storage (composer.environmentAndStorageObjectAdmin)
- Utente Service Account (
iam.serviceAccountUser)
Per lavorare con le risorse BigQuery e Cloud Storage, devi disporre dei seguenti ruoli.
- Editor dati BigQuery (
roles/bigquery.dataEditor) - Storage Object Admin (
roles/storage.objectAdmin)
A seconda delle risorse a cui prevedi di accedere, potresti aver bisogno di ruoli aggiuntivi oltre a quelli che ti consentono di utilizzare l'estensione e lavorare con le pipeline di orchestrazione.
Crea un account di servizio e concedi i ruoli IAM
Utilizza un account di servizio univoco per l'ambiente Managed Airflow di terza generazione. Il account di servizio crea un ambiente Managed Airflow di terza generazione ed esegue tutte le pipeline di orchestrazione di cui esegui il deployment.
Chiedi all'amministratore di completare i seguenti passaggi:
- Crea un service account come descritto nella documentazione IAM.
- Concedi al service account il ruolo Worker Composer (
composer.worker). Questo ruolo fornisce le autorizzazioni richieste nella maggior parte dei casi.
Come best practice, se devi accedere ad altre risorse nel tuo Google Cloud progetto, concedi autorizzazioni aggiuntive a questo service account solo quando necessario per l'operazione della pipeline di orchestrazione.
Creare Google Cloud risorse per la pipeline di orchestrazione
In questo passaggio, crea Google Cloud risorse per la pipeline di orchestrazione.
Crea un ambiente Managed Airflow di terza generazione
Crea un ambiente Managed Airflow di terza generazione con la seguente configurazione:
- Nome ambiente: inserisci un nome che utilizzerai in un secondo momento per configurare
la pipeline di orchestrazione. Ad esempio,
example-pipeline-scheduler. - Località: seleziona una località. Ti consigliamo di creare tutte le risorse in questa guida nella stessa località. Ad esempio,
us-central1. - Service account: seleziona il account di servizio che hai creato per questo ambiente.
Il seguente esempio di comando Google Cloud CLI mostra la sintassi:
gcloud composer environments create example-pipeline-scheduler \
--location us-central1 \
--image-version composer-3-airflow-2 \
--service-account "example-account@example-project.iam.gserviceaccount.com"
Aggiungi parametri di ambiente alla configurazione dello scheduler
Fornisci i dettagli di connessione per l'ambiente Managed Airflow che eseguirà la pipeline di orchestrazione.
Aggiungi i parametri di configurazione dell'ambiente che hai creato utilizzando l'editor delle impostazioni di Google Cloud Data Agent Kit:
- Fai clic sull'icona Google Cloud Data Agent Kit nella barra delle attività.
- Espandi Impostazioni e poi fai clic su Impostazioni.
- Seleziona Scheduler.
- Inserisci i parametri per l'ambiente Managed Airflow di terza generazione che hai creato in precedenza:
- ID progetto: nome del progetto in cui si trova l'ambiente.
Esempio:
example-project. - Regione: regione in cui si trova l'ambiente. Esempio:
us-central1. - Ambiente: nome dell'ambiente. Esempio:
example-pipeline-scheduler.
- ID progetto: nome del progetto in cui si trova l'ambiente.
Esempio:
- Fai clic su Salva.
Crea un bucket per gli artefatti della pipeline
Crea un bucket Cloud Storage nello stesso progetto dell'
ambiente Managed Airflow e assegnagli un nome simile a
example-pipelines-bucket. Questo bucket è necessario per archiviare il job Managed Service for Apache Spark.
Alcune azioni della pipeline, ad esempio l'output dei risultati in un bucket Cloud Storage.
Crea un nuovo set di dati e una tabella in BigQuery
Questa guida illustra una pipeline che scrive i dati in una tabella BigQuery. Crea le seguenti risorse BigQuery nel tuo progetto:
- Crea un nuovo set di dati denominato
wordcount_dataset. - Crea una nuova tabella BigQuery denominata
wordcount_output.
Aggiungi asset della pipeline
Questa guida illustra un'attività di data engineering comune (ETL: estrazione, trasformazione, caricamento) utilizzando PySpark, leggendo da BigQuery, trasformando i dati (conteggio delle parole) e caricandoli di nuovo in BigQuery.
Non agentico
Aggiungi il seguente file alla cartella /scripts del repository. In un secondo momento aggiungerai un'azione della pipeline che esegue questo script in Managed Service for Apache Spark.
Esempio di file wordcount.py:
#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()
# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)
# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')
# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()
# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()
print(f"Successfully wrote word counts to BigQuery table: {destination_table}")
Sostituisci quanto segue:
- ARTIFACTS_BUCKET_NAME: il nome del bucket Cloud Storage
che hai creato in precedenza. Esempio:
example-pipelines-bucket. - PROJECT_ID: il nome del progetto in cui si trova l'ambiente
risiede. Esempio:
example-project.
Agentico
Chiedi all'agente di generare uno script PySpark di esempio nella cartella /scripts del repository. In un secondo momento aggiungerai un'azione della pipeline che esegue questo script in Managed Service for Apache Spark.
Inserisci un prompt simile al seguente:
I want to create a PySpark script that does the following:
1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.
My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.
Save the resulting script to /scripts as wordcount.py
Inizializza le pipeline di orchestrazione nel repository
Quando inizializzi le pipeline di orchestrazione, l'estensione Data Agent Kit per Antigravity crea uno scaffolding che include quanto segue:
- Un file YAML della pipeline di orchestrazione: una definizione di pipeline di esempio che contiene una pianificazione, ma nessuna azione definita.
deployment.yaml: un esempio di configurazione del deployment della pipeline che definisce come deve essere eseguito il deployment della pipeline. Questo file mostra la configurazione richiesta per l'ambiente Managed Airflow, il bucket degli artefatti e qualsiasi altra risorsa utilizzata dalle azioni della pipeline..github/workflows/deploy.yaml: configura un'azione GitHub che esegue il deployment della pipeline quando unisci le modifiche al ramomaindel repository GitHub..github/workflows/validate.yaml: configura un'azione GitHub che convalida la pipeline dopo il deployment.
Nei passaggi successivi di questo documento, espanderai queste definizioni utilizzando l'estensione Data Agent Kit per Antigravity per creare ed eseguire il deployment di una pipeline di orchestrazione in locale.
Non agentico
Per inizializzare le pipeline di orchestrazione:
- Fai clic sull'icona Google Cloud Data Agent Kit nella barra delle attività.
- Espandi Data Engineering e poi fai clic su Initialize orchestration pipeline.
- Inserisci i parametri per la nuova pipeline di orchestrazione:
- ID pipeline: inserisci l'ID della pipeline. Esempio:
example-pipeline. - ID progetto Google Cloud: il nome del progetto in cui risiede l'ambiente. Esempio:
example-project. - Regione: la regione in cui si trova l'ambiente. Esempio:
us-central1. - ID ambiente: il nome dell'ambiente con cui vuoi sviluppare.
Esempio:
dev/staging. Ambiente Managed Service for Apache Airflow dello scheduler: il nome dell' ambiente in cui vuoi orchestrare le pipeline. Per questo documento, specifica lo stesso ambiente in questo parametro.
Bucket degli artefatti: il nome del bucket utilizzato per gli artefatti della pipeline, senza il prefisso
gs://. Esempio:example-pipelines-bucket.Fai clic su Avanti.
Fai clic su Inizializza.
Specifica un'area di lavoro in cui vuoi inizializzare la pipeline.
Agentico
Chiedi all'agente di creare uno scaffolding per le pipeline di orchestrazione del repository.
Inserisci un prompt simile al seguente:
Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.
The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.
The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.
Store pipeline artifacts in example-pipelines-bucket.
Dopo aver inizializzato le pipeline nel repository, non puoi farlo di nuovo perché il nuovo scaffolding sovrascriverebbe le modifiche di configurazione apportate. Puoi aggiungere nuove pipeline creando nuovi file di definizione della pipeline nel progetto e aggiungendoli alla configurazione del deployment.
Aggiungi una nuova attività alla pipeline
Poiché la configurazione iniziale della pipeline non ha azioni, aggiungi un'azione che esegue lo script PySpark.
Non agentico
Per modificare una pipeline:
- Fai clic sull'icona Google Cloud Data Agent Kit nella barra delle attività.
- Espandi Data Engineering, quindi Orchestration Pipelines.
- Seleziona
example-pipeline.yaml. Si apre un editor di pipeline per la pipeline selezionata. - (Facoltativo) Seleziona il nodo Trigger di pianificazione. Puoi modificare la pianificazione della pipeline specificando un'espressione simile a cron e le ore di inizio e fine della pianificazione. La pianificazione predefinita per la pipeline appena inizializzata è
0 2 * * *, che viene eseguita alle 2:00 ogni giorno.
Aggiungi una nuova attività. In questa guida, aggiungi un'attività PySpark che esegue uno script PySpark che hai aggiunto in precedenza:
- Fai clic su Aggiungi prima attività per aggiungere un nuovo nodo attività.
- Seleziona Esegui script PySpark e il file
script/wordcount.py.
Si apre il riquadro Esegui script PySpark.
- In Modalità cluster Spark, seleziona Spark serverless.
- In Località, specifica la località in cui si trova l'ambiente.
Esempio:
us-central1. - Fai clic su Salva.
Agentico
Esegui il seguente prompt:
Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.
Esegui il deployment della versione locale della pipeline
Esegui il deployment della versione locale della pipeline per verificare che sia configurata correttamente.
Quando esegui il deployment di una versione locale della pipeline di orchestrazione, l'estensione Data Agent Kit per Antigravity carica una versione locale del pacchetto della pipeline nell'ambiente Managed Airflow e la esegue. Il deployment locale è destinato all'utilizzo in un ambiente di sviluppo.
Il comando di deployment esegue il deployment di una pianificazione non in pausa. Per evitare questo problema, puoi mettere in pausa la pianificazione manualmente nel riquadro Gestione pipeline. Puoi anche modificare il file YAML della pipeline per aggiungere un commento o rimuovere il blocco triggers: - schedule.
Non agentico
Per eseguire il deployment di una versione locale della pipeline di orchestrazione di esempio:
- Fai clic sull'icona Google Cloud Data Agent Kit nella barra delle attività.
- Espandi Data Engineering e poi Pipeline di orchestrazione.
- Seleziona
example-pipeline.yaml. Si apre un editor di pipeline per la pipeline selezionata. - Seleziona Esegui pipeline e poi seleziona l'ambiente di sviluppo o di gestione temporanea che hai creato in precedenza.
Agentico
Esegui il seguente prompt:
Deploy my pipeline
Monitora l'esecuzione della pipeline e controlla i log di esecuzione
Dopo aver eseguito il deployment della pipeline, puoi visualizzare le informazioni dettagliate, la cronologia delle esecuzioni della pipeline e i log di esecuzione della pipeline:
- Fai clic sull'icona Google Cloud Data Agent Kit nella barra delle attività.
- Espandi Data Engineering e poi seleziona Gestione pipeline.
- Fai clic sul nome della pipeline (
example-pipeline) per visualizzarne la cronologia di esecuzione. Nell'elenco delle esecuzioni per una data specifica, puoi visualizzare le singole esecuzioni della pipeline e la suddivisione delle singole azioni all'interno di ogni esecuzione della pipeline. - Fai clic su un ID attività per visualizzare i log di esecuzione dell'attività. Poiché lo script PySpark di esempio è stato eseguito in Managed Service for Apache Spark, i log delle attività avranno un link ai log batch.
Risolvi i problemi e correggi gli errori della pipeline
Quando la pipeline non riesce, viene visualizzato un pulsante Diagnostica nel riquadro Gestione pipeline.
Agentico
Quando fai clic sul pulsante Diagnostica, l'agente genera un prompt per risolvere il problema della pipeline. Il prompt viene copiato negli appunti o aperto in una nuova sessione di chat.
L'agente utilizza competenze specializzate per risolvere i problemi delle pipeline, concentrandosi sulla raccolta dei log, sul controllo incrociato del codice di cui è stato eseguito il deployment e dell'area di lavoro e sulla generazione di un'analisi delle cause principali (RCA).
I possibili passaggi successivi dopo aver ricevuto l'analisi delle cause principali sono i seguenti:
- Applica l'analisi delle cause principali nell'area di lavoro corrente.
- Chiedi all'agente di creare un nuovo ramo e di applicare le modifiche.
- Apri un ticket di assistenza clienti Google Cloud con i dettagli dell'analisi delle cause principali.
Per assistenza nella risoluzione dei problemi relativi all'estensione, vedi Risolvere i problemi dell'estensione Data Agent Kit per Antigravity.