本页包含 Orchestration Pipelines DSL 参考文档。
预览版中的限制
在预览版期间,Orchestration Pipelines 具有以下限制:
对于
pyspark和notebook操作:- 仅支持一个
requirements.txt文件,用于所有pyspark和notebook操作。 - Windows 平台不支持通过
uv工具构建软件包。 - 仅支持具有预构建二进制文件的 Python 软件包。
- 仅支持一个
对于
sql操作:- 不支持
query键中的inline定义。
- 不支持
格式和值简介
流水线以 YAML 格式定义,并且必须存储在代码库中单独的文件中(每个流水线对应一个文件)。
Orchestration Pipelines 提供了多种在流水线定义和部署配置中使用变量的方式。例如,您可以定义自定义变量、使用 GitHub Secret,以及在命令行中替换变量值。如需了解详情,请参阅变量、Secret 和替换。
如需详细了解如何向流水线软件包添加其他流水线,请参阅添加其他流水线。
代码示例
GitHub 上的 orchestration-pipelines 代码库包含许多流水线操作和引擎组合的最新代码示例。我们建议您先从这些示例入手,了解 Orchestration Pipelines 的功能。
流水线定义
流水线定义具有以下顶级键:
modelVersion:流水线定义模型的版本。最新模型版本为1.0。pipelineId:流水线的唯一标识符。此 ID 在多个部署和版本中保持一致,从而可以跟踪和管理逻辑流水线实体。description:流水线说明,映射到运行程序环境中的 Airflow DAG 说明。owner:流水线的所有者。tags:应用于流水线的字符串标识符,用于过滤流水线。notifications:流水线事件通知。支持的通知类型:onPipelineFailure:流水线失败时发送电子邮件。
通知功能需要在 runner 环境中配置 SendGrid 电子邮件服务。如需查看相关说明,请参阅配置电子邮件通知。
示例:
notifications: onPipelineFailure: email: ["user1@example.com", "user2@example.com"]runner:指定目标编排引擎。预留以供日后使用。 并将此值设为airflow。defaults:为project_id、location和executionConfig等属性设置默认值,这些属性适用于所有操作,除非在特定操作中被替换。project_id和location属性可被各个操作属性替换。executionConfig属性无法在各个操作中被替换,并且会在retries字段中指定流水线中所有操作的重试次数。triggers:定义流水线的启动方式:没有值。流水线仍可手动触发。
schedule. 使用 cron 表达式按时间表触发流水线。示例时间表:
triggers: - schedule: interval: "0 5 * * *" startTime: "2025-10-01T00:00:00" endTime: "2026-10-01T00:00:00" catchup: false timezone: "UTC"
actions要执行的任务的映射。每个映射条目都对应一个操作。请参阅操作。
操作
流水线操作定义了流水线执行中的各个步骤。每个操作都必须指定引擎或框架。引擎或框架决定使用哪些资源来执行操作。
Orchestration Pipelines 支持以下操作:
- Pyspark (
pyspark):运行 PySpark 脚本。 - 笔记本 (
notebook):运行笔记本文件。 - SQL 查询 (
notebook):运行 SQL 查询。 - Python (
python):运行 Python 脚本。 - 流水线 (
pipeline):执行数据处理流水线。
Orchestration Pipelines 支持以下引擎和框架:
dataprocOnGce>existingCluster:由 clusterName、项目和位置标识的 Managed Service for Apache Spark 集群。dataprocOnGce>ephemeral:执行作业后创建并删除的 Managed Service for Apache Spark 集群。dataprocServerless:Managed Service for Apache Spark 批量提交。bigquery:BigQuery 作业。python>local:在 runner 环境中的 Airflow Worker 上执行的 Python 脚本。dbt>airflowWorker:在运行器环境中使用dbt-core在 Airflow worker 上执行的 dbt 模型。dataform>airflowWorker:在 runner 环境中使用 Dataform Core CLI 在 Airflow worker 上执行的 Dataform 工作流。dataform>dataformService:在 Dataform 服务上执行的 Dataform 工作流。
下表列出了可能的操作类型、引擎和框架组合。如需查看操作代码示例,请参阅引擎和框架说明。
| 操作 | 引擎或框架 | 输出到 |
|---|---|---|
pyspark |
dataprocOnGce > existingCluster |
Managed Service for Apache Spark 作业日志 |
pyspark |
dataprocOnGce > ephemeralCluster |
Managed Service for Apache Spark 作业日志 |
pyspark |
dataprocServerless |
Managed Service for Apache Spark 批处理日志 |
notebook |
dataprocOnGce > existingCluster |
运行程序存储桶,位于 composer_declarative_dags_resources 目录下 |
notebook |
dataprocOnGce > ephemeralCluster |
Managed Service for Apache Spark 作业日志 |
notebook |
dataprocServerless |
运行程序存储桶,位于 composer_declarative_dags_resources 目录下 |
sql |
bigquery |
destinationTable 参数中指定的表 |
sql |
dataprocServerless |
Managed Service for Apache Spark 批处理日志。 |
python |
local(本地执行) |
日志 |
pipeline |
dbt > airflowWorker |
日志和 BigQuery |
pipeline |
dataform > airflowWorker |
BigQuery 中指定的表 |
pipeline |
dataform > dataformService |
在 Dataform 中 |
所有操作都具有以下通用键。其他键取决于操作类型。
name:操作名称。此名称会映射到运行器环境中的 Airflow 任务名称。如果某项操作需要多个 Airflow 任务,则此名称会映射到任务组。dependsOn:此操作所依赖的上游操作名称列表,用于定义执行顺序。如果任何上游操作失败,则不会执行依赖于这些操作的下游操作。executionTimeout:执行操作的超时时间。示例:1h、30m、40s。
Python
python 类型的操作。执行 Python 脚本。
特定于操作类型的键:
mainFilePath:Python 脚本文件的相对路径。pythonCallable:要在 Python 脚本中执行的 Python 可调用对象的名称。opKwargs:运算符的关键字实参的映射。(可选)
environment:在动态创建的 Python 虚拟环境中执行脚本。requirements:虚拟环境的要求。需求在运行时得到解决。systemSitePackages:如果为true,则虚拟环境会从 Airflow 工作程序的 site-packages 目录继承软件包。您可以在 runner 环境中安装自定义 PyPI 软件包。
engine:local:在 Runner 环境中本地执行
示例:
本地
modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 1
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- python:
name: "first_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_1.py"
pythonCallable: "main"
engine:
local: {}
environment:
systemSitePackages: true
requirements:
path: "scripts/requirements.txt"
- python:
name: "second_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_1.py"
pythonCallable: "main"
engine:
local: {}
environment:
requirements:
inline:
list: ["pandas>=2.0.0"]
systemSitePackages: true
dependsOn: ["first_script_run"]
- python:
name: "third_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_2.py"
pythonCallable: "main"
engine:
local: {}
opKwargs:
api_endpoint: "https://api.my-vendor.example.com/v1/status"
api_key_secret_name: "my-vendor-api-key"
dependsOn: ["first_script_run"]
pyspark
pyspark 类型的操作。执行 PySpark 脚本。
特定于操作类型的键:
mainFilePath:PySpark 脚本的相对路径。archiveUris:要在此操作中使用的归档 URI 的列表。stagingBucket:要在此操作中使用的 Cloud Storage 存储桶。pyFiles:要在此 Spark 作业中使用的 Python 文件列表。environment:Python 环境配置。requirements:要使用的 Python 要求文件。path:包含要求的文件的路径。此文件中的要求必须按照 PEP-508 列出。
engine:dataprocOnGce>existingClusterdataprocOnGce>ephemeralClusterdataprocServerless
示例:
existingCluster
modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 4 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pyspark:
name: "run-pyspark-with-pyfiles-on-existing-cluster"
engine:
dataprocOnGce:
existingCluster:
clusterName: "example-dataproc-cluster"
location: "us-central1"
projectId: "example-project"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
ephemeralCluster
pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pyspark:
name: "run_dataproc_ephemeral"
executionTimeout: "1h"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
engine:
dataprocOnGce:
ephemeralCluster:
projectId: "example-project"
location: "us-central1"
clusterName: "ephemeral-cluster-inline"
resourceProfile:
inline:
config:
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
properties:
spark.submit.deployMode: "cluster"
dataprocServerless
modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
actions:
- pyspark:
name: "run-pyspark-on-dataproc-serverless"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
inline:
environmentConfig:
executionConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
networkUri: "projects/example-project/global/networks/default"
runtimeConfig:
version: "2.3"
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
笔记本
notebook 类型的操作。通过 Papermill 执行 .ipynb 笔记本。
特定于操作类型的键:
mainFilePath:笔记本文件的相对路径。archiveUris:要在此操作中使用的归档 URI 的列表。stagingBucket:要在此操作中使用的 Cloud Storage 存储桶。environment:Python 环境配置。requirements:要使用的 Python 要求文件。path:包含要求的文件的路径。此文件中的要求必须按照 PEP-508 列出。
engine:dataprocOnGce>existingClusterdataprocOnGce>ephemeralClusterdataprocServerless
示例:
dataprocServerless
modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- notebook:
name: "run-notebook-on-dataproc-serverless"
mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
archiveUris:
- "gs://example-bucket-additional-data/custom_venv.tar.gz"
staging_bucket: "example-bucket-additional-data-additional-data"
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
overrides:
runtimeConfig:
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
sql
sql 类型的操作。执行 SQL 查询。
特定于操作类型的键:
query:定义查询。path:查询是在位于部署配置文件相对路径的文件中定义的。inline:查询是内嵌定义的。
engine:bigQuerydataprocServerlessdataprocOnGce>existingClusterdataprocOnGce>ephemeralCluster
bigQuery
modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run_bigquery_insert_job_create"
query:
inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
engine:
bigquery:
location: "US"
- sql:
name: "run_bigquery_insert_job_select"
query:
path: "sql-scripts/count_rows.sql"
engine:
bigquery:
location: "US"
destinationTable: "example-project.example_dataset.example_table_query_results"
dependsOn:
- "run_bigquery_insert_job_create"
dataprocServerless
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-dataproc"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocServerless:
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
resourceProfile:
inline:
runtimeConfig:
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
dataproc.sparkBqConnector.version: "0.42.3"
existingCluster
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-existing-cluster"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocOnGce:
existingCluster:
clusterName: "cluster-sql"
projectId: "example-project"
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
# This field is needed and important for the Spark-BigQuery connector.
spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
ephemeralCluster
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-ephemeral-cluster"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocOnGce:
ephemeralCluster:
clusterName: "example-ephemeral-cluster"
projectId: "example-project"
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
# This field is needed and important for the Spark-BigQuery connector.
spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
resourceProfile:
inline:
clusterConfig:
gceClusterConfig:
zoneUri: "us-central1-a"
metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
流水线
pipeline 类型的操作。执行数据处理流水线。
特定于操作类型的键:
framework:dbtdataform>airflowWorkerdataform>dataformService
示例:
dbt
modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pipeline:
name: "dbt-action"
framework:
dbt:
airflowWorker:
projectDirectoryPath: "dbt_project"
selectModels: ["model_1", "model_2"]
dataform>airflowWorker
modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pipeline:
name: "run_dataform"
framework:
dataform:
airflowWorker:
projectDirectoryPath: "dataform_local"
dataform>dataformService
modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
actions:
- pipeline:
name: "run_dataform"
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"
- pipeline:
name: "run_dataform_compilation"
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
invocationConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
引擎
操作中使用的引擎。
dataprocOnGce > existingCluster
在由 clusterName、项目和位置标识的现有 Managed Service for Apache Spark 集群中执行。
您可以在部署配置中管理指定集群,也可以在 Managed Service for Apache Spark 中手动管理。 建议定期升级集群。
键:
clusterName:集群的名称location:集群所在的区域projectId:集群所在项目的 IDproperties:Spark 作业属性的映射。
示例:
engine:
dataprocOnGce:
existingCluster:
clusterName: "example-dataproc-cluster"
location: "us-central1"
projectId: "example-project"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
dataprocOnGce > ephemeralCluster
在临时 Managed Service for Apache Spark 集群中执行,该集群在执行作业后创建并删除。
键:
clusterName:集群的名称location:集群所在的区域projectId:集群所在项目的 IDimpersonationChain:用于运行操作的服务账号模拟链。resourceProfile:Managed Service for Apache Spark 集群资源配置。如需了解可用字段的说明,请参阅 Managed Service for Apache Spark 文档中的 ClusterConfig。
您可以通过以下方式指定资源配置文件:
inline:定义为流水线配置的一部分。path:在位于相对路径的文件中定义。external_config_path:在 Cloud Storage 存储桶中的文件中定义。与需要提交和部署才能更新资源配置文件值的inline和path选项不同,外部资源配置文件会在每次流水线运行时解析,您可以在不重新部署流水线的情况下更新它。
可以使用
override键将替换项应用于指定的资源配置文件。通过深度合并将替换项应用于提供的预来源配置文件。properties:Spark 作业属性的映射。
示例:
engine:
dataprocOnGce:
ephemeralCluster:
projectId: "example-project"
location: "us-central1"
clusterName: "example-ephemeral-cluster"
resourceProfile:
inline:
config:
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
properties:
spark.submit.deployMode: "cluster"
dataprocServerless
在 Managed Service for Apache Spark 批量提交中执行。
键:
location:必须执行 Spark 作业的区域。impersonationChain:用于运行操作的服务账号模拟链。resourceProfile:Managed Service for Apache Spark 资源配置文件。您可以通过以下方式指定资源配置文件:
inline:定义为流水线配置的一部分。path:在位于相对路径的文件中定义。external_config_path:在 Cloud Storage 存储桶中的文件中定义。与需要提交和部署才能更新资源配置文件值的inline和path选项不同,外部资源配置文件会在每次流水线运行时解析,您可以在不重新部署流水线的情况下更新它。
以下键用于指定资源配置文件配置:
environmentConfig:环境配置runtimeConfig:运行时配置
如需了解可用字段的说明,请参阅 Managed Service for Apache Spark 文档中的 RuntimeConfig 和 EnvironmentConfig。
可以使用
override键将替换项应用于指定的资源配置文件。通过深度合并将替换项应用于所提供的资源配置文件。
示例(内嵌):
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
inline:
environmentConfig:
executionConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
networkUri: "projects/example-project/global/networks/default"
runtimeConfig:
version: "2.3"
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
示例(外部路径和替换项):
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
overrides:
runtimeConfig:
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
bigQuery
以 BigQuery 作业的形式执行。
键:
location:目标表所在的区域。destinationTable:用于输出数据的 BigQuery 表impersonationChain:用于运行操作的服务账号模拟链。
示例:
engine:
bigquery:
location: "US"
destinationTable: "example-project.example_dataset.example_table_query_results"
局部
在 runner 环境中本地执行。
如需了解配置虚拟环境的方法,请参阅 python 操作。
示例:
engine:
local: {}
框架
操作中使用的框架。
dbt > airflowWorker
使用 dbt-core 在 runner 环境中执行在 Airflow 工作器上执行的 dbt 模型。
键:
projectDirectoryPath:包含 DBT 项目的文件夹的相对路径。selectModels:要按名称纳入运行中的模型列表(相当于dbt --select)。tags:要按标记纳入运行的模型列表(相当于dbt --select)。
示例:
framework:
dbt:
airflowWorker:
projectDirectoryPath: "dbt_project"
selectModels: ["model_1", "model_2"]
dataform > airflowWorker
在运行程序环境中,使用 Dataform Core CLI 在 Airflow 工作人员上执行的 Dataform 工作流。
键:
projectDirectoryPath:包含 Dataform 工作流定义的文件夹的相对路径。
示例:
framework:
dataform:
airflowWorker:
projectDirectoryPath: "dataform_local"
dataform > dataformService
运行在 Dataform 服务上执行的 Dataform 工作流。
键:
location:Dataform 代码库所在的位置。projectId:Dataform 代码库所在的项目。repositoryId:Dataform 代码库 IDworkflowInvocation:工作流调用的配置,用于指定要运行的操作。请参阅 WorkflowInvocation。
示例:
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
invocationConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"