本頁包含 Orchestration Pipelines DSL 參考資料。
預先發布版限制
在預覽期間,Orchestration Pipelines 有下列限制:
pyspark和notebook動作:- 系統僅支援所有
pyspark和notebook動作的單一requirements.txt檔案。 - 透過
uv工具建構套件時,不支援 Windows 平台。 - 系統僅支援含有預先建構二進位檔的 Python 套件。
- 系統僅支援所有
如要執行
sql動作:- 不支援
query鍵中的inline定義。
- 不支援
關於格式和值
管道是以 YAML 格式定義,且必須儲存在存放區中,每個管道各別儲存在一個檔案。
Orchestration Pipelines 提供多種方式,可在管道定義和部署設定中使用變數。舉例來說,您可以定義自訂變數、使用 GitHub 祕密,以及在指令列中替換變數值。詳情請參閱「變數、密鑰和替代」。
如要進一步瞭解如何將其他管道新增至管道套件,請參閱「新增其他管道」。
程式碼範例
GitHub 上的 orchestration-pipelines 存放區提供許多管道動作和引擎組合的最新程式碼範例。建議您先從這些範例著手,瞭解 Orchestration Pipelines 的功能。
pipeline 定義
管道定義包含下列頂層鍵:
modelVersion:管道定義模型的版本。最新模型版本為1.0。pipelineId:管道的專屬 ID。這個 ID 在多個部署作業和版本中保持一致,方便追蹤及管理邏輯管道實體。description:管道說明,會對應至執行器環境中 Airflow DAG 的說明。owner:管道擁有者。tags:套用至管道的字串 ID,用於篩選管道。notifications:管道事件的通知。支援的通知類型:onPipelineFailure:管道失敗時傳送電子郵件。
如要接收通知,您必須在執行程式環境中設定 SendGrid 電子郵件服務。如需操作說明,請參閱「設定電子郵件通知」。
範例:
notifications: onPipelineFailure: email: ["user1@example.com", "user2@example.com"]runner:指定目標協調引擎。保留供日後使用。 請將這個值設為airflow。defaults:為project_id、location和executionConfig等屬性設定預設值,除非在特定動作中覆寫,否則這些值會套用至所有動作。個別動作屬性可以覆寫project_id和location屬性。executionConfig屬性無法在個別動作中覆寫,且會在retries欄位中指定管道中所有動作的重試次數。triggers:定義管道的啟動方式:不具現金價值。管道仍可手動觸發。
schedule使用 Cron 運算式,依排程觸發管道。範例時間表:
triggers: - schedule: interval: "0 5 * * *" startTime: "2025-10-01T00:00:00" endTime: "2026-10-01T00:00:00" catchup: false timezone: "UTC"
actions要執行的工作對應。每個對應項目都對應一個動作。請參閱「動作」。
動作
管道動作會定義管道執行中的個別步驟。每個動作都必須指定引擎或架構。引擎或框架會決定要使用哪些資源來執行動作。
Orchestration Pipelines 支援下列動作:
- Pyspark (
pyspark):執行 PySpark 指令碼。 - 筆記本 (
notebook):執行筆記本檔案。 - SQL 查詢 (
notebook):執行 SQL 查詢。 - Python (
python):執行 Python 指令碼。 - 管道 (
pipeline):執行資料處理管道。
Orchestration Pipelines 支援下列引擎和架構:
dataprocOnGce>existingCluster: 由 clusterName、專案和位置識別的 Managed Service for Apache Spark 叢集。dataprocOnGce>ephemeral: 執行工作後,系統會建立並刪除 Managed Service for Apache Spark 叢集。dataprocServerless:Managed Service for Apache Spark 批次提交。bigquery:BigQuery 工作。python>local:在執行器環境中,於 Airflow Worker 上執行的 Python 指令碼。dbt>airflowWorker:在執行器環境中,使用dbt-core在 Airflow 工作站執行的 dbt 模型。dataform>airflowWorker:在執行器環境中,使用 dataform core CLI,在 Airflow 工作站上執行的 Dataform 工作流程。dataform>dataformService:在 Dataform 服務上執行的 Dataform 工作流程。
下表列出可能採用的動作類型、引擎和架構組合。如需動作程式碼範例,請參閱引擎和架構說明。
| 動作 | 引擎或架構 | 輸出至 |
|---|---|---|
pyspark |
dataprocOnGce > existingCluster |
Managed Service for Apache Spark 工作記錄 |
pyspark |
dataprocOnGce > ephemeralCluster |
Managed Service for Apache Spark 工作記錄 |
pyspark |
dataprocServerless |
Managed Service for Apache Spark 批次記錄 |
notebook |
dataprocOnGce > existingCluster |
執行器 bucket,位於 composer_declarative_dags_resources 目錄下 |
notebook |
dataprocOnGce > ephemeralCluster |
Managed Service for Apache Spark 工作記錄 |
notebook |
dataprocServerless |
執行器 bucket,位於 composer_declarative_dags_resources 目錄下 |
sql |
bigquery |
destinationTable 參數中指定的資料表 |
sql |
dataprocServerless |
Managed Service for Apache Spark 批次記錄。 |
python |
local (本機執行) |
記錄 |
pipeline |
dbt > airflowWorker |
記錄和 BigQuery |
pipeline |
dataform > airflowWorker |
BigQuery 中指定的資料表 |
pipeline |
dataform > dataformService |
在 Dataform 中 |
所有動作都有下列通用鍵。其他鍵則取決於動作類型。
name:動作名稱。這個名稱會對應至執行器環境中的 Airflow 工作名稱。如果動作需要多個 Airflow 工作,這個名稱會對應至工作群組。dependsOn:這個動作所依附的上游動作名稱清單,用於定義執行順序。如果任何上游動作失敗,系統就不會執行依附於這些動作的下游動作。executionTimeout:執行動作的逾時時間。例如:1h、30m、40s。
python
python 類型的動作。執行 Python 指令碼。
動作類型專屬鍵:
mainFilePath:Python 指令碼檔案的相對路徑。pythonCallable:要在 Python 指令碼中執行的 Python 可呼叫函式名稱。opKwargs:運算子的關鍵字引數對應。(選用)
environment:在動態建立的 Python 虛擬環境中執行指令碼。requirements:虛擬環境的規定。系統會在執行階段解決這些需求。systemSitePackages:如果設為true,虛擬環境會從 Airflow 工作人員的 site-packages 目錄繼承套件。您可以在執行器環境中安裝自訂 PyPI 套件。
engine:local:在執行器環境中本機執行
範例:
當地
modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 1
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- python:
name: "first_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_1.py"
pythonCallable: "main"
engine:
local: {}
environment:
systemSitePackages: true
requirements:
path: "scripts/requirements.txt"
- python:
name: "second_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_1.py"
pythonCallable: "main"
engine:
local: {}
environment:
requirements:
inline:
list: ["pandas>=2.0.0"]
systemSitePackages: true
dependsOn: ["first_script_run"]
- python:
name: "third_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_2.py"
pythonCallable: "main"
engine:
local: {}
opKwargs:
api_endpoint: "https://api.my-vendor.example.com/v1/status"
api_key_secret_name: "my-vendor-api-key"
dependsOn: ["first_script_run"]
pyspark
pyspark 類型的動作。執行 PySpark 指令碼。
動作類型專屬鍵:
mainFilePath:PySpark 指令碼的相對路徑。archiveUris:要用於這項動作的封存 URI 清單。stagingBucket:要搭配這項動作使用的 Cloud Storage bucket。pyFiles:要用於這項 Spark 工作的 Python 檔案清單。environment:Python 環境設定。requirements:要使用的 Python 需求檔案。path:包含規定的檔案路徑。這個檔案中的需求必須按照 PEP-508 列出。
engine:dataprocOnGce>existingClusterdataprocOnGce>ephemeralClusterdataprocServerless
範例:
existingCluster
modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 4 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pyspark:
name: "run-pyspark-with-pyfiles-on-existing-cluster"
engine:
dataprocOnGce:
existingCluster:
clusterName: "example-dataproc-cluster"
location: "us-central1"
projectId: "example-project"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
ephemeralCluster
pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pyspark:
name: "run_dataproc_ephemeral"
executionTimeout: "1h"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
engine:
dataprocOnGce:
ephemeralCluster:
projectId: "example-project"
location: "us-central1"
clusterName: "ephemeral-cluster-inline"
resourceProfile:
inline:
config:
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
properties:
spark.submit.deployMode: "cluster"
dataprocServerless
modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
actions:
- pyspark:
name: "run-pyspark-on-dataproc-serverless"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
inline:
environmentConfig:
executionConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
networkUri: "projects/example-project/global/networks/default"
runtimeConfig:
version: "2.3"
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
筆記本
notebook 類型的動作。透過 Papermill 執行 .ipynb 筆記本。
動作類型專屬鍵:
mainFilePath:筆記本檔案的相對路徑。archiveUris:要用於這項動作的封存 URI 清單。stagingBucket:要搭配這項動作使用的 Cloud Storage bucket。environment:Python 環境設定。requirements:要使用的 Python 需求檔案。path:包含規定的檔案路徑。這個檔案中的需求必須按照 PEP-508 列出。
engine:dataprocOnGce>existingClusterdataprocOnGce>ephemeralClusterdataprocServerless
範例:
dataprocServerless
modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- notebook:
name: "run-notebook-on-dataproc-serverless"
mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
archiveUris:
- "gs://example-bucket-additional-data/custom_venv.tar.gz"
staging_bucket: "example-bucket-additional-data-additional-data"
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
overrides:
runtimeConfig:
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
sql
sql 類型的動作。執行 SQL 查詢。
動作類型專屬鍵:
query:定義查詢。path:查詢是在部署設定檔的相對路徑中定義。inline:查詢是內嵌定義。
engine:bigQuerydataprocServerlessdataprocOnGce>existingClusterdataprocOnGce>ephemeralCluster
bigQuery
modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run_bigquery_insert_job_create"
query:
inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
engine:
bigquery:
location: "US"
- sql:
name: "run_bigquery_insert_job_select"
query:
path: "sql-scripts/count_rows.sql"
engine:
bigquery:
location: "US"
destinationTable: "example-project.example_dataset.example_table_query_results"
dependsOn:
- "run_bigquery_insert_job_create"
dataprocServerless
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-dataproc"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocServerless:
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
resourceProfile:
inline:
runtimeConfig:
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
dataproc.sparkBqConnector.version: "0.42.3"
existingCluster
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-existing-cluster"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocOnGce:
existingCluster:
clusterName: "cluster-sql"
projectId: "example-project"
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
# This field is needed and important for the Spark-BigQuery connector.
spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
ephemeralCluster
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-ephemeral-cluster"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocOnGce:
ephemeralCluster:
clusterName: "example-ephemeral-cluster"
projectId: "example-project"
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
# This field is needed and important for the Spark-BigQuery connector.
spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
resourceProfile:
inline:
clusterConfig:
gceClusterConfig:
zoneUri: "us-central1-a"
metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
pipeline
pipeline 類型的動作。執行資料處理管道。
動作類型專屬鍵:
framework:dbtdataform>airflowWorkerdataform>dataformService
範例:
dbt
modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pipeline:
name: "dbt-action"
framework:
dbt:
airflowWorker:
projectDirectoryPath: "dbt_project"
selectModels: ["model_1", "model_2"]
dataform>airflowWorker
modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pipeline:
name: "run_dataform"
framework:
dataform:
airflowWorker:
projectDirectoryPath: "dataform_local"
dataform>dataformService
modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
actions:
- pipeline:
name: "run_dataform"
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"
- pipeline:
name: "run_dataform_compilation"
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
invocationConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
引擎
動作中使用的引擎。
dataprocOnGce > existingCluster
在現有的 Managed Service for Apache Spark 叢集中執行,該叢集由 clusterName、專案和位置識別。
您可以在部署設定中管理指定叢集,也可以在 Managed Service for Apache Spark 中手動管理。建議定期升級叢集。
金鑰:
clusterName:叢集名稱location:叢集所在的區域projectId:叢集所在專案的專案 IDproperties:Spark 工作屬性的地圖。
範例:
engine:
dataprocOnGce:
existingCluster:
clusterName: "example-dataproc-cluster"
location: "us-central1"
projectId: "example-project"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
dataprocOnGce > ephemeralCluster
在暫時性 Managed Service for Apache Spark 叢集中執行,該叢集會在工作執行完畢後建立及刪除。
金鑰:
clusterName:叢集名稱location:叢集所在的區域projectId:叢集所在專案的專案 IDimpersonationChain:用於執行動作的服務帳戶模擬鏈。resourceProfile:Managed Service for Apache Spark 叢集資源設定檔。如要查看可用欄位的說明,請參閱 Managed Service for Apache Spark 說明文件中的 ClusterConfig。
您可以透過下列方式指定資源設定檔:
inline:定義為管道設定的一部分。path:定義於相對路徑的檔案中。external_config_path:定義於 Cloud Storage bucket 中的檔案。與inline和path選項不同,這些選項需要修訂並部署才能更新資源設定檔值,而外部資源設定檔會在每次管道執行時解析,因此您不必重新部署管道即可更新。
您可以使用
override鍵,對指定的資源設定檔套用覆寫。系統會將覆寫值深層合併至提供的預先來源設定檔。properties:Spark 工作屬性的地圖。
範例:
engine:
dataprocOnGce:
ephemeralCluster:
projectId: "example-project"
location: "us-central1"
clusterName: "example-ephemeral-cluster"
resourceProfile:
inline:
config:
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
properties:
spark.submit.deployMode: "cluster"
dataprocServerless
在 Managed Service for Apache Spark 中執行批次提交。
金鑰:
location:必須執行 Spark 工作的區域。impersonationChain:用於執行動作的服務帳戶模擬鏈。resourceProfile:Managed Service for Apache Spark 資源設定檔。您可以透過下列方式指定資源設定檔:
inline:定義為管道設定的一部分。path:定義於相對路徑的檔案中。external_config_path:定義於 Cloud Storage bucket 中的檔案。與inline和path選項不同,這些選項需要修訂並部署才能更新資源設定檔值,而外部資源設定檔會在每次管道執行時解析,因此您不必重新部署管道即可更新。
下列鍵值會指定資源設定檔設定:
environmentConfig:環境設定runtimeConfig:執行階段設定
如需可用欄位的說明,請參閱 Managed Service for Apache Spark 說明文件中的 RuntimeConfig 和 EnvironmentConfig。
您可以使用
override鍵,對指定的資源設定檔套用覆寫。系統會將覆寫值與提供的資源設定檔進行深層合併。
範例 (內嵌):
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
inline:
environmentConfig:
executionConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
networkUri: "projects/example-project/global/networks/default"
runtimeConfig:
version: "2.3"
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
範例 (外部路徑和覆寫):
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
overrides:
runtimeConfig:
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
bigQuery
以 BigQuery 作業的形式執行。
金鑰:
location:目的地資料表所在的區域。destinationTable:用於輸出資料的 BigQuery 資料表impersonationChain:用於執行動作的服務帳戶模擬鏈。
範例:
engine:
bigquery:
location: "US"
destinationTable: "example-project.example_dataset.example_table_query_results"
局部
在執行器環境中在本機執行。
如要瞭解如何設定虛擬環境,請參閱 python 動作。
範例:
engine:
local: {}
架構
動作中使用的架構。
dbt > airflowWorker
在執行器環境中,使用 dbt-core 在 Airflow 工作站上執行 dbt 模型。
金鑰:
projectDirectoryPath:包含 DBT 專案的資料夾相對路徑。selectModels:要納入執行作業的模型名稱清單 (相當於dbt --select)。tags:要納入執行作業的模型清單 (相當於dbt --select)。
範例:
framework:
dbt:
airflowWorker:
projectDirectoryPath: "dbt_project"
selectModels: ["model_1", "model_2"]
dataform > airflowWorker
使用 dataform core cli,在執行器環境的 Airflow 工作站上執行的 Dataform 工作流程。
金鑰:
projectDirectoryPath:包含 Dataform 工作流程定義的資料夾相對路徑。
範例:
framework:
dataform:
airflowWorker:
projectDirectoryPath: "dataform_local"
dataform > dataformService
執行在 Dataform 服務上執行的 Dataform 工作流程。
金鑰:
location:Dataform 存放區所在位置。projectId:Dataform 存放區所在的專案。repositoryId:Dataform 存放區 IDworkflowInvocation:工作流程叫用設定,用於指定要執行的動作。請參閱 WorkflowInvocation。
範例:
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
invocationConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"