Halaman ini berisi referensi DSL Orchestration Pipelines.
Batasan dalam Pratinjau
Saat dalam Pratinjau, Orchestration Pipelines memiliki batasan berikut:
Untuk tindakan
pysparkdannotebook:- Hanya satu file
requirements.txtuntuk semua tindakanpysparkdannotebookyang didukung. - Platform Windows tidak didukung untuk membuat paket melalui alat
uv. - Hanya paket Python dengan biner yang telah dibuat sebelumnya yang didukung.
- Hanya satu file
Untuk tindakan
sql:- Definisi
inlinedi kunciquerytidak didukung.
- Definisi
Tentang format dan nilai
Pipeline ditentukan dalam format YAML, dan harus disimpan dalam file terpisah, satu per pipeline, di repositori Anda.
Pipeline Orkestrasi menyediakan beberapa cara untuk menggunakan variabel dalam definisi pipeline dan konfigurasi deployment. Misalnya, Anda dapat menentukan variabel kustom, menggunakan secret GitHub, dan mengganti nilai variabel di command line. Untuk mengetahui informasi selengkapnya, lihat Variabel, secret, dan penggantian.
Untuk mengetahui informasi selengkapnya tentang cara menambahkan pipeline tambahan ke paket pipeline, lihat Menambahkan pipeline lain.
Contoh kode
Repositori orchestration-pipelines di GitHub memiliki contoh kode terbaru untuk berbagai kombinasi tindakan dan mesin pipeline. Sebaiknya gunakan contoh ini sebagai titik awal dalam mempelajari kemampuan Pipeline Orkestrasi.
Definisi pipeline
Definisi pipeline memiliki kunci tingkat teratas berikut:
modelVersion: Versi model definisi pipeline. Versi model terbaru adalah1.0.pipelineId: ID unik untuk pipeline. ID ini tetap konsisten di beberapa deployment dan versi, sehingga memungkinkan pelacakan dan pengelolaan entitas pipeline logis.description: Deskripsi pipeline, yang dipetakan ke deskripsi DAG Airflow di lingkungan runner.owner: Pemilik pipeline.tags: ID string yang diterapkan pada pipeline, digunakan untuk memfilter pipeline.notifications: notifikasi tentang peristiwa pipeline. Jenis notifikasi yang didukung:onPipelineFailure: email saat pipeline gagal.
Notifikasi memerlukan layanan email SendGrid yang dikonfigurasi di lingkungan runner Anda. Untuk mengetahui petunjuknya, lihat Mengonfigurasi notifikasi email.
Contoh:
notifications: onPipelineFailure: email: ["user1@example.com", "user2@example.com"]runner: Menentukan mesin orkestrasi target. Dipesan untuk penggunaan di masa mendatang. Tetapkan nilai ini keairflow.defaults: Menetapkan nilai default untuk properti sepertiproject_id,location, danexecutionConfigyang berlaku untuk semua tindakan kecuali jika diganti dalam tindakan tertentu. Propertiproject_iddanlocationdapat diganti oleh properti tindakan individual. PropertiexecutionConfigtidak dapat diganti dalam setiap tindakan dan menentukan jumlah percobaan ulang untuk semua tindakan dalam pipeline di kolomretries.triggers: Menentukan cara pipeline dimulai:Tidak ada nilai. Pipeline masih dapat dipicu secara manual.
schedule. Memicu pipeline sesuai jadwal, menggunakan ekspresi cron.Contoh jadwal:
triggers: - schedule: interval: "0 5 * * *" startTime: "2025-10-01T00:00:00" endTime: "2026-10-01T00:00:00" catchup: false timezone: "UTC"
actionsPemetaan tugas yang akan dieksekusi. Setiap entri pemetaan sesuai dengan satu tindakan. Lihat Tindakan.
Tindakan
Tindakan pipeline menentukan setiap langkah dalam eksekusi pipeline. Setiap tindakan harus memiliki mesin atau framework yang ditentukan untuknya. Mesin atau framework menentukan resource mana yang digunakan untuk menjalankan tindakan.
Pipeline Orkestrasi mendukung tindakan berikut:
- Pyspark (
pyspark): Menjalankan skrip PySpark. - Notebook (
notebook): Menjalankan file notebook. - Kueri SQL (
notebook): Jalankan kueri SQL. - Python (
python): Jalankan skrip Python. - Pipeline (
pipeline): Menjalankan pipeline pemrosesan data.
Pipeline Orkestrasi mendukung mesin dan framework berikut:
dataprocOnGce>existingCluster: Cluster Managed Service untuk Apache Spark yang diidentifikasi oleh clusterName, project, dan lokasi.dataprocOnGce>ephemeral: Cluster Managed Service untuk Apache Spark dibuat dan dihapus setelah menjalankan tugas.dataprocServerless: Managed Service untuk Apache Spark pengiriman batch.bigquery: Tugas BigQuery.python>local: Skrip Python yang dieksekusi di Airflow Worker dalam lingkungan pelaksana.dbt>airflowWorker: Model dbt yang dieksekusi di pekerja Airflow dalam lingkungan runner menggunakandbt-core.dataform>airflowWorker: Alur kerja Dataform yang dijalankan di worker Airflow dalam lingkungan runner menggunakan dataform core cli.dataform>dataformService: Alur kerja Dataform yang dijalankan di layanan Dataform.
Tabel berikut mencantumkan kemungkinan kombinasi jenis tindakan, mesin, dan framework. Lihat deskripsi mesin dan framework untuk contoh kode tindakan.
| Tindakan | Mesin atau Framework | Output ke |
|---|---|---|
pyspark |
dataprocOnGce > existingCluster |
Log tugas Managed Service untuk Apache Spark |
pyspark |
dataprocOnGce > ephemeralCluster |
Log tugas Managed Service untuk Apache Spark |
pyspark |
dataprocServerless |
Log Batch Managed Service untuk Apache Spark |
notebook |
dataprocOnGce > existingCluster |
Bucket runner, di direktori composer_declarative_dags_resources |
notebook |
dataprocOnGce > ephemeralCluster |
Log tugas Managed Service untuk Apache Spark |
notebook |
dataprocServerless |
Bucket runner, di direktori composer_declarative_dags_resources |
sql |
bigquery |
Tabel yang ditentukan dalam parameter destinationTable |
sql |
dataprocServerless |
Log Batch Managed Service untuk Apache Spark. |
python |
local (eksekusi lokal) |
Log |
pipeline |
dbt > airflowWorker |
Log dan BigQuery |
pipeline |
dataform > airflowWorker |
Tabel yang ditentukan di BigQuery |
pipeline |
dataform > dataformService |
Di Dataform |
Semua tindakan memiliki kunci umum berikut. Kunci lainnya bergantung pada jenis tindakan.
name: Nama tindakan. Nama ini dipetakan ke nama tugas Airflow di lingkungan pelaksana. Jika tindakan memerlukan lebih dari satu tugas Airflow, nama ini dipetakan ke grup tugas.dependsOn: Daftar nama tindakan upstream yang menjadi dependensi tindakan ini, yang menentukan urutan eksekusi. Jika salah satu tindakan upstream gagal, tindakan downstream yang bergantung padanya tidak akan dijalankan.executionTimeout: Waktu tunggu untuk menjalankan tindakan. Contoh:1h,30m,40s.
python
Tindakan jenis python. Jalankan skrip Python.
Tombol khusus jenis tindakan:
mainFilePath: jalur relatif ke file skrip Python.pythonCallable: nama yang dapat dipanggil Python untuk dieksekusi dalam skrip Python.opKwargs: pemetaan argumen kata kunci untuk operator.(Opsional)
environment: jalankan skrip dalam Lingkungan Virtual Python yang dibuat secara dinamis.requirements: persyaratan untuk Lingkungan Virtual. Persyaratan diselesaikan saat runtime.inline: persyaratan ditentukan secara inline.list: daftar persyaratan. Cantumkan setiap persyaratan sesuai dengan PEP-508.Contoh:
environment: requirements: inline: list: ["pandas>=2.0.0"]
(Alternatif)
path: Jalur ke file dengan persyaratan. Persyaratan dalam file ini harus dicantumkan sesuai dengan PEP-508.Contoh:
environment: requirements: path: "scripts/requirements.txt"
systemSitePackages: Jikatrue, Lingkungan Virtual mewarisi paket dari direktori site-packages pekerja Airflow. Anda dapat menginstal paket PyPI kustom di lingkungan runner.
engine:local: eksekusi lokal di lingkungan runner
Contoh:
lokal
modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 1
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- python:
name: "first_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_1.py"
pythonCallable: "main"
engine:
local: {}
environment:
systemSitePackages: true
requirements:
path: "scripts/requirements.txt"
- python:
name: "second_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_1.py"
pythonCallable: "main"
engine:
local: {}
environment:
requirements:
inline:
list: ["pandas>=2.0.0"]
systemSitePackages: true
dependsOn: ["first_script_run"]
- python:
name: "third_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_2.py"
pythonCallable: "main"
engine:
local: {}
opKwargs:
api_endpoint: "https://api.my-vendor.example.com/v1/status"
api_key_secret_name: "my-vendor-api-key"
dependsOn: ["first_script_run"]
pyspark
Tindakan jenis pyspark. Jalankan skrip PySpark.
Tombol khusus jenis tindakan:
mainFilePath: Jalur relatif ke skrip PySpark.archiveUris: Daftar URI arsip yang akan digunakan dengan tindakan ini.stagingBucket: Bucket Cloud Storage yang akan digunakan dengan tindakan ini.pyFiles: Daftar file Python yang akan digunakan dengan tugas Spark ini.environment: Konfigurasi lingkungan Python.requirements: File persyaratan Python yang akan digunakan.path: Jalur ke file dengan persyaratan. Persyaratan dalam file ini harus dicantumkan sesuai dengan PEP-508.
engine:dataprocOnGce>existingClusterdataprocOnGce>ephemeralClusterdataprocServerless
Contoh:
existingCluster
modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 4 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pyspark:
name: "run-pyspark-with-pyfiles-on-existing-cluster"
engine:
dataprocOnGce:
existingCluster:
clusterName: "example-dataproc-cluster"
location: "us-central1"
projectId: "example-project"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
ephemeralCluster
pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pyspark:
name: "run_dataproc_ephemeral"
executionTimeout: "1h"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
engine:
dataprocOnGce:
ephemeralCluster:
projectId: "example-project"
location: "us-central1"
clusterName: "ephemeral-cluster-inline"
resourceProfile:
inline:
config:
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
properties:
spark.submit.deployMode: "cluster"
dataprocServerless
modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
actions:
- pyspark:
name: "run-pyspark-on-dataproc-serverless"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
inline:
environmentConfig:
executionConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
networkUri: "projects/example-project/global/networks/default"
runtimeConfig:
version: "2.3"
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
notebook
Tindakan jenis notebook. Jalankan notebook .ipynb melalui Papermill.
Tombol khusus jenis tindakan:
mainFilePath: jalur relatif ke file notebook.archiveUris: Daftar URI arsip yang akan digunakan dengan tindakan ini.stagingBucket: Bucket Cloud Storage yang akan digunakan dengan tindakan ini.environment: Konfigurasi lingkungan Python.requirements: File persyaratan Python yang akan digunakan.path: Jalur ke file dengan persyaratan. Persyaratan dalam file ini harus dicantumkan sesuai dengan PEP-508.
engine:dataprocOnGce>existingClusterdataprocOnGce>ephemeralClusterdataprocServerless
Contoh:
dataprocServerless
modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- notebook:
name: "run-notebook-on-dataproc-serverless"
mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
archiveUris:
- "gs://example-bucket-additional-data/custom_venv.tar.gz"
staging_bucket: "example-bucket-additional-data-additional-data"
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
overrides:
runtimeConfig:
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
sql
Tindakan jenis sql. Menjalankan kueri SQL.
Tombol khusus jenis tindakan:
query: menentukan kueri.path: kueri ditentukan dalam file yang berada di jalur relatif ke file konfigurasi deployment.inline: kueri ditentukan secara inline.
engine:bigQuerydataprocServerlessdataprocOnGce>existingClusterdataprocOnGce>ephemeralCluster
bigQuery
modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run_bigquery_insert_job_create"
query:
inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
engine:
bigquery:
location: "US"
- sql:
name: "run_bigquery_insert_job_select"
query:
path: "sql-scripts/count_rows.sql"
engine:
bigquery:
location: "US"
destinationTable: "example-project.example_dataset.example_table_query_results"
dependsOn:
- "run_bigquery_insert_job_create"
dataprocServerless
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-dataproc"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocServerless:
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
resourceProfile:
inline:
runtimeConfig:
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
dataproc.sparkBqConnector.version: "0.42.3"
existingCluster
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-existing-cluster"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocOnGce:
existingCluster:
clusterName: "cluster-sql"
projectId: "example-project"
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
# This field is needed and important for the Spark-BigQuery connector.
spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
ephemeralCluster
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-ephemeral-cluster"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocOnGce:
ephemeralCluster:
clusterName: "example-ephemeral-cluster"
projectId: "example-project"
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
# This field is needed and important for the Spark-BigQuery connector.
spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
resourceProfile:
inline:
clusterConfig:
gceClusterConfig:
zoneUri: "us-central1-a"
metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
pipeline
Tindakan jenis pipeline. Jalankan pipeline pemrosesan data.
Tombol khusus jenis tindakan:
framework:dbtdataform>airflowWorkerdataform>dataformService
Contoh:
dbt
modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pipeline:
name: "dbt-action"
framework:
dbt:
airflowWorker:
projectDirectoryPath: "dbt_project"
selectModels: ["model_1", "model_2"]
dataform>airflowWorker
modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pipeline:
name: "run_dataform"
framework:
dataform:
airflowWorker:
projectDirectoryPath: "dataform_local"
dataform>dataformService
modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
actions:
- pipeline:
name: "run_dataform"
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"
- pipeline:
name: "run_dataform_compilation"
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
invocationConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
Mesin
Mesin yang digunakan dalam tindakan.
dataprocOnGce > existingCluster
Jalankan di cluster Managed Service untuk Apache Spark yang ada, yang diidentifikasi oleh clusterName, project, dan lokasi.
Anda dapat mengelola cluster yang ditentukan dalam konfigurasi deployment, atau secara manual di Managed Service untuk Apache Spark. Sebaiknya upgrade cluster secara rutin.
Kunci:
clusterName: Nama clusterlocation: Region tempat cluster beradaprojectId: Project ID project tempat cluster beradaproperties: Peta properti tugas Spark.
Contoh:
engine:
dataprocOnGce:
existingCluster:
clusterName: "example-dataproc-cluster"
location: "us-central1"
projectId: "example-project"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
dataprocOnGce > ephemeralCluster
Dieksekusi di cluster Managed Service untuk Apache Spark sementara, yang dibuat dan dihapus setelah menjalankan tugas.
Kunci:
clusterName: Nama clusterlocation: Region tempat cluster beradaprojectId: Project ID project tempat cluster beradaimpersonationChain: Rantai peniruan akun layanan yang akan digunakan untuk menjalankan tindakan.resourceProfile: Profil resource cluster Managed Service for Apache Spark.Untuk mengetahui deskripsi kolom yang tersedia, lihat ClusterConfig dalam dokumentasi Managed Service untuk Apache Spark.
Profil resource dapat ditentukan dengan cara berikut:
inline: ditentukan sebagai bagian dari konfigurasi pipeline.path: ditentukan dalam file yang berada di jalur relatif.external_config_path: ditentukan dalam file yang berada di bucket Cloud Storage. Tidak seperti opsiinlinedanpath, yang mengharuskan penerapan dan men-deploy untuk memperbarui nilai profil resource, profil resource eksternal diselesaikan di setiap eksekusi pipeline dan Anda dapat memperbaruinya tanpa men-deploy ulang pipeline.
Penggantian dapat diterapkan ke profil resource yang ditentukan dengan kunci
override. Penggantian diterapkan dengan penggabungan mendalam ke profil presource yang disediakan.properties: Peta properti tugas Spark.
Contoh:
engine:
dataprocOnGce:
ephemeralCluster:
projectId: "example-project"
location: "us-central1"
clusterName: "example-ephemeral-cluster"
resourceProfile:
inline:
config:
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
properties:
spark.submit.deployMode: "cluster"
dataprocServerless
Eksekusi di Managed Service untuk Apache Spark pengiriman batch.
Kunci:
location: Region tempat tugas Spark harus dijalankan.impersonationChain: rantai peniruan akun layanan yang akan digunakan untuk menjalankan tindakan.resourceProfile: Profil resource Managed Service for Apache Spark.Profil resource dapat ditentukan dengan cara berikut:
inline: ditentukan sebagai bagian dari konfigurasi pipeline.path: ditentukan dalam file yang berada di jalur relatif.external_config_path: ditentukan dalam file yang berada di bucket Cloud Storage. Tidak seperti opsiinlinedanpath, yang mengharuskan penerapan dan men-deploy untuk memperbarui nilai profil resource, profil resource eksternal diselesaikan di setiap eksekusi pipeline dan Anda dapat memperbaruinya tanpa men-deploy ulang pipeline.
Kunci berikut menentukan konfigurasi profil resource:
environmentConfig: konfigurasi lingkunganruntimeConfig: konfigurasi runtime
Untuk deskripsi kolom yang tersedia, lihat RuntimeConfig dan EnvironmentConfig dalam dokumentasi Managed Service untuk Apache Spark.
Penggantian dapat diterapkan ke profil resource yang ditentukan dengan kunci
override. Penggantian diterapkan dengan penggabungan mendalam ke profil resource yang diberikan.
Contoh (inline):
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
inline:
environmentConfig:
executionConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
networkUri: "projects/example-project/global/networks/default"
runtimeConfig:
version: "2.3"
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
Contoh (jalur eksternal dan penggantian):
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
overrides:
runtimeConfig:
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
bigQuery
Dieksekusi sebagai tugas BigQuery.
Kunci:
location: Region tempat tabel tujuan berada.destinationTable: Tabel BigQuery untuk menampilkan dataimpersonationChain: Rantai peniruan akun layanan yang akan digunakan untuk menjalankan tindakan.
Contoh:
engine:
bigquery:
location: "US"
destinationTable: "example-project.example_dataset.example_table_query_results"
lokal
Dieksekusi secara lokal di lingkungan runner.
Lihat tindakan python untuk mengetahui cara mengonfigurasi Lingkungan Virtual.
Contoh:
engine:
local: {}
Framework
Framework yang digunakan dalam tindakan.
dbt > airflowWorker
Jalankan model dbt yang dijalankan di pekerja Airflow di lingkungan runner
menggunakan dbt-core.
Kunci:
projectDirectoryPath: Jalur relatif ke folder yang berisi project DBT.selectModels: Daftar model yang akan disertakan dalam operasi menurut nama (setara dengandbt --select).tags: Daftar model yang akan disertakan dalam proses berdasarkan tag (setara dengandbt --select).
Contoh:
framework:
dbt:
airflowWorker:
projectDirectoryPath: "dbt_project"
selectModels: ["model_1", "model_2"]
dataform > airflowWorker
Alur kerja Dataform yang dijalankan di pekerja Airflow di lingkungan runner menggunakan dataform core cli.
Kunci:
projectDirectoryPath: Jalur relatif ke folder yang berisi definisi alur kerja Dataform.
Contoh:
framework:
dataform:
airflowWorker:
projectDirectoryPath: "dataform_local"
dataform > dataformService
Menjalankan alur kerja Dataform yang dieksekusi di layanan Dataform.
Kunci:
location: Lokasi tempat repositori Dataform berada.projectId: Project tempat repositori Dataform berada.repositoryId: ID repositori DataformworkflowInvocation: Konfigurasi untuk pemanggilan alur kerja, yang menentukan tindakan mana yang akan dijalankan. Lihat WorkflowInvocation.
Contoh:
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
invocationConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"