Panduan ini menjelaskan cara membuat dan men-deploy pipeline orkestrasi di ekstensi Google Cloud Data Agent Kit untuk Visual Studio Code.
Pipeline contoh menjalankan skrip PySpark di Managed Service untuk Apache Spark.
Anda dapat men-deploy pipeline orkestrasi dari VS Code sebagai versi lokal atau melalui tindakan GitHub seperti saat menggabungkan perubahan ke cabang main. Dokumen ini menunjukkan cara men-deploy versi lokal pipeline orkestrasi.
Sebelum memulai
Sebelum memulai, selesaikan hal-hal berikut:
- Instal ekstensi Data Agent Kit untuk VS Code.
- Konfigurasi setelan Anda.
- Tambahkan repositori GitHub ke ruang kerja VS Code Anda untuk menyimpan pipeline orkestrasi dan aset seperti skrip.
Meninjau peran IAM yang diperlukan
Untuk mendapatkan izin membuat resource di project Anda, men-deploy, dan menjalankan pipeline orkestrasi, minta administrator untuk memberi Anda peran yang diperlukan.
Untuk membuat dan mengelola lingkungan Managed Service untuk Apache Airflow dan mengelola objek di bucket terkait, Anda memerlukan peran berikut. Untuk mengetahui informasi selengkapnya tentang peran pengguna ini, lihat Memberikan peran kepada pengguna di dokumentasi Managed Service untuk Apache Airflow.
- Environment and Storage Object Administrator (composer.environmentAndStorageObjectAdmin)
- Service Account User
(
iam.serviceAccountUser)
Untuk menggunakan resource BigQuery dan Cloud Storage, Anda memerlukan peran berikut.
- BigQuery Data Editor (
roles/bigquery.dataEditor) - Storage Object Admin (
roles/storage.objectAdmin)
Bergantung pada resource yang ingin Anda akses, Anda mungkin memerlukan peran tambahan selain peran yang memungkinkan Anda menggunakan ekstensi dan menggunakan pipeline orkestrasi.
Membuat akun layanan dan memberikan peran IAM
Gunakan akun layanan unik untuk lingkungan Managed Airflow Gen 3. Akun layanan membuat lingkungan Managed Airflow Gen 3 dan menjalankan semua pipeline orkestrasi yang Anda deploy.
Minta administrator Anda untuk menyelesaikan langkah-langkah berikut:
- Buat akun layanan seperti yang dijelaskan dalam dokumentasi IAM.
- Berikan peran Composer Worker (
composer.worker) ke akun layanan. Peran ini memberikan izin yang diperlukan dalam sebagian besar kasus.
Sebagai praktik terbaik, jika Anda perlu mengakses resource lain di Google Cloud project Anda, berikan izin tambahan ke akun layanan ini hanya jika diperlukan untuk operasi pipeline orkestrasi.
Membuat Google Cloud resource untuk pipeline orkestrasi Anda
Pada langkah ini, buat Google Cloud resource untuk pipeline orkestrasi Anda.
Membuat lingkungan Managed Airflow Gen 3
Buat lingkungan Managed Airflow Gen 3 dengan konfigurasi berikut:
- Nama lingkungan: Masukkan nama yang akan Anda gunakan nanti untuk mengonfigurasi
pipeline orkestrasi. Misalnya,
example-pipeline-scheduler. - Lokasi: Pilih lokasi. Sebaiknya buat semua resource dalam panduan ini di lokasi yang sama. Misalnya,
us-central1. - Akun layanan: Pilih akun layanan yang Anda buat untuk lingkungan ini.
Contoh perintah Google Cloud CLI berikut menunjukkan sintaksisnya:
gcloud composer environments create example-pipeline-scheduler \
--location us-central1 \
--image-version composer-3-airflow-2 \
--service-account "example-account@example-project.iam.gserviceaccount.com"
Menambahkan parameter lingkungan ke konfigurasi penjadwal
Berikan detail koneksi untuk lingkungan Managed Airflow yang akan menjalankan pipeline orkestrasi Anda.
Tambahkan parameter konfigurasi lingkungan yang telah Anda buat menggunakan editor Setelan Google Cloud Data Agent Kit:
- Klik ikon Google Cloud Data Agent Kit di panel aktivitas.
- Luaskan Settings, lalu klik Settings.
- Pilih Scheduler.
- Masukkan parameter untuk lingkungan Managed Airflow Gen 3 yang Anda buat sebelumnya:
- Project ID: nama project tempat lingkungan berada.
Contoh:
example-project. - Region: region tempat lingkungan berada. Contoh:
us-central1. - Environment: nama lingkungan. Contoh:
example-pipeline-scheduler.
- Project ID: nama project tempat lingkungan berada.
Contoh:
- Klik Save.
Membuat bucket untuk artefak pipeline
Buat bucket Cloud Storage di project yang sama dengan
lingkungan Managed Airflow dan beri nama yang mirip dengan
example-pipelines-bucket. Bucket ini diperlukan untuk menyimpan tugas Managed Service untuk Apache Spark Anda.
Beberapa tindakan pipeline, seperti output hasil ke bucket Cloud Storage.
Membuat set data dan tabel baru di BigQuery
Panduan ini menunjukkan pipeline yang menulis data ke tabel BigQuery. Buat resource BigQuery berikut di project Anda:
- Buat set data baru bernama
wordcount_dataset. - Buat tabel BigQuery baru bernama
wordcount_output.
Menambahkan aset pipeline
Panduan ini menunjukkan tugas rekayasa data umum (ETL: Extract, Transform, Load) menggunakan PySpark, membaca dari BigQuery, mengubah data (jumlah kata), dan memuatnya kembali ke BigQuery.
Non-agentic
Tambahkan file berikut ke folder /scripts repositori Anda. Anda akan menambahkan tindakan pipeline yang menjalankan skrip ini di Managed Service untuk Apache Spark.
Contoh file wordcount.py:
#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()
# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)
# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')
# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()
# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()
print(f"Successfully wrote word counts to BigQuery table: {destination_table}")
Ganti kode berikut:
- ARTIFACTS_BUCKET_NAME: nama bucket Cloud Storage
yang Anda buat sebelumnya. Contoh:
example-pipelines-bucket. - PROJECT_ID: nama project tempat lingkungan
berada. Contoh:
example-project.
Agentic
Minta Agen untuk membuat contoh skrip PySpark di folder /scripts repositori Anda. Anda akan menambahkan tindakan pipeline yang menjalankan skrip ini di Managed Service untuk Apache Spark.
Masukkan perintah yang mirip dengan berikut:
I want to create a PySpark script that does the following:
1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.
My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.
Save the resulting script to /scripts as wordcount.py
Menginisialisasi pipeline orkestrasi di repositori Anda
Saat Anda menginisialisasi pipeline orkestrasi, ekstensi Data Agent Kit untuk VS Code akan membuat scaffolding yang mencakup hal berikut:
- File YAML pipeline orkestrasi: Contoh definisi pipeline yang berisi jadwal, tetapi tidak ada tindakan yang ditentukan.
deployment.yaml: Contoh konfigurasi deployment pipeline yang menentukan cara pipeline Anda harus di-deploy. File ini menunjukkan konfigurasi yang diperlukan untuk lingkungan Managed Airflow, bucket artefak, dan resource lain yang digunakan oleh tindakan pipeline Anda..github/workflows/deploy.yaml: Menyiapkan tindakan GitHub yang men-deploy pipeline Anda saat Anda menggabungkan perubahan ke cabangmainrepositori GitHub Anda..github/workflows/validate.yaml: Menyiapkan tindakan GitHub yang memvalidasi pipeline Anda setelah di-deploy.
Pada langkah-langkah selanjutnya dalam dokumen ini, Anda akan memperluas definisi ini menggunakan ekstensi Data Agent Kit untuk VS Code guna membuat dan men-deploy pipeline orkestrasi secara lokal.
Non-agentic
Untuk menginisialisasi pipeline orkestrasi, lakukan hal berikut:
- Klik ikon Google Cloud Data Agent Kit di panel aktivitas.
- Luaskan Data Engineering, lalu klik Initialize orchestration pipeline.
- Masukkan parameter untuk pipeline orkestrasi baru:
- ID Pipeline: Masukkan ID pipeline Anda. Contoh:
example-pipeline. - Google Cloud project ID: nama project tempat lingkungan
berada. Contoh:
example-project. - Wilayah: wilayah tempat lingkungan Anda berada. Contoh:
us-central1. - Environment ID: nama lingkungan yang ingin Anda gunakan untuk pengembangan.
Contoh:
dev/staging. Scheduler Managed Service for Apache Airflow Environment: nama lingkungan tempat Anda ingin mengatur pipeline. Untuk dokumen ini, tentukan lingkungan yang sama dalam parameter ini.
Artifacts Bucket: nama bucket yang digunakan untuk artefak pipeline, tanpa awalan
gs://. Contoh:example-pipelines-bucket.Klik Next.
Klik Initialize.
Tentukan ruang kerja tempat Anda ingin menginisialisasi pipeline.
Agentic
Minta Agen untuk membuat scaffolding untuk pipeline orkestrasi repositori Anda.
Masukkan perintah yang mirip dengan berikut:
Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.
The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.
The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.
Store pipeline artifacts in example-pipelines-bucket.
Setelah Anda menginisialisasi pipeline di repositori, Anda tidak dapat melakukannya lagi karena scaffolding baru akan menimpa perubahan konfigurasi yang Anda buat. Anda dapat menambahkan pipeline baru dengan membuat file definisi pipeline baru di project Anda dan menambahkannya ke konfigurasi deployment.
Menambahkan tugas baru ke pipeline
Karena konfigurasi pipeline awal tidak memiliki tindakan apa pun, Anda menambahkan tindakan yang menjalankan skrip PySpark Anda ke konfigurasi tersebut.
Non-agentic
Untuk mengedit pipeline, lakukan hal berikut:
- Klik ikon Google Cloud Data Agent Kit di panel aktivitas.
- Luaskan Data Engineering, lalu Orchestration Pipelines.
- Pilih
example-pipeline.yaml. Editor pipeline akan terbuka untuk pipeline yang dipilih. - Opsional: Pilih node Schedule trigger. Anda dapat menyesuaikan jadwal untuk pipeline Anda dengan menentukan ekspresi seperti cron dan waktu mulai serta akhir jadwal. Jadwal default untuk pipeline yang baru diinisialisasi adalah
0 2 * * *, yang berjalan setiap hari pukul 02.00.
Tambahkan tugas baru. Dalam panduan ini, Anda menambahkan tugas PySpark yang menjalankan skrip PySpark yang Anda tambahkan sebelumnya:
- Klik Add first task untuk menambahkan node tugas baru.
- Pilih Execute PySpark script dan file
script/wordcount.py.
Panel Execute PySpark script akan terbuka.
- Di Spark Cluster Mode, pilih Serverless Spark.
- Di Location, tentukan lokasi tempat lingkungan Anda berada.
Contoh:
us-central1. - Klik Save.
Agentic
Jalankan perintah berikut:
Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.
Men-deploy versi lokal pipeline
Deploy versi lokal pipeline untuk mengonfirmasi bahwa pipeline tersebut dikonfigurasi dengan benar.
Saat Anda men-deploy versi lokal pipeline orkestrasi, ekstensi Data Agent Kit untuk VS Code akan mengupload versi lokal paket pipeline ke lingkungan Managed Airflow dan menjalankannya. Deployment lokal dimaksudkan untuk digunakan saat bekerja di lingkungan pengembangan.
Perintah deploy men-deploy jadwal yang tidak dijeda. Untuk mencegah hal ini, Anda dapat menjeda jadwal secara manual di panel Pengelolaan Pipeline. Anda juga dapat mengedit file YAML pipeline untuk mengomentari atau menghapus blok triggers: - schedule.
Non-agentic
Untuk men-deploy versi lokal pipeline orkestrasi contoh, lakukan hal berikut:
- Klik ikon Google Cloud Data Agent Kit di panel aktivitas.
- Luaskan Data Engineering , lalu Orchestration Pipelines.
- Pilih
example-pipeline.yaml. Editor pipeline akan terbuka untuk pipeline yang dipilih. - Pilih Run pipeline , lalu pilih lingkungan pengembangan atau staging yang Anda buat sebelumnya.
Agentic
Jalankan perintah berikut:
Deploy my pipeline
Memantau eksekusi pipeline dan memeriksa log eksekusi
Setelah pipeline di-deploy, Anda dapat melihat informasi mendetail, histori eksekusi pipeline, dan log eksekusi pipeline untuk pipeline tersebut:
- Klik ikon Google Cloud Data Agent Kit di panel aktivitas.
- Luaskan Data Engineering, lalu pilih Pipelines management.
- Klik nama pipeline Anda (
example-pipeline) untuk melihat histori eksekusinya. Dalam daftar eksekusi untuk tanggal tertentu, Anda dapat melihat eksekusi pipeline individual dan perincian tindakan individual dalam setiap eksekusi pipeline. - Klik ID tugas untuk melihat log eksekusi tugas. Karena contoh skrip PySpark dieksekusi di Managed Service untuk Apache Spark, log tugas akan memiliki link ke log Batch.
Memecahkan masalah dan memperbaiki kegagalan pipeline
Jika pipeline Anda gagal, Anda akan melihat tombol Diagnose di panel Pipelines management.
Agentic
Saat Anda mengklik tombol Diagnose, Agen akan membuat perintah untuk memecahkan masalah kegagalan pipeline. Perintah tersebut akan disalin ke papan klip Anda atau dibuka di sesi chat baru.
Agen menggunakan keterampilan khusus untuk memecahkan masalah pipeline, dengan berfokus pada pengumpulan log, pemeriksaan silang kode yang di-deploy dan ruang kerja, serta pembuatan analisis akar masalah (RCA).
Kemungkinan langkah berikutnya setelah menerima RCA adalah sebagai berikut:
- Terapkan analisis akar masalah di ruang kerja saat ini.
- Minta agen untuk membuat cabang baru dan menerapkan perubahan di sana.
- Buka tiket Cloud Customer Care dengan detail RCA.
Untuk mendapatkan bantuan dalam memecahkan masalah terkait ekstensi, lihat Memecahkan masalah ekstensi Data Agent Kit untuk VS Code.