Orchestration Pipelines DSL 参考

本页包含 Orchestration Pipelines DSL 参考文档。

预览版中的限制

在预览版期间,Orchestration Pipelines 具有以下限制:

  • 对于 pysparknotebook 操作:

    • 仅支持一个 requirements.txt 文件,用于所有 pysparknotebook 操作。
    • Windows 平台不支持通过 uv 工具构建软件包。
    • 仅支持具有预构建二进制文件的 Python 软件包。
  • 对于 sql 操作:

    • 不支持 query 键中的 inline 定义。

格式和值简介

流水线以 YAML 格式定义,并且必须存储在代码库中单独的文件中(每个流水线对应一个文件)。

Orchestration Pipelines 提供了多种在流水线定义和部署配置中使用变量的方式。例如,您可以定义自定义变量、使用 GitHub Secret,以及在命令行中替换变量值。如需了解详情,请参阅变量、Secret 和替换

如需详细了解如何向流水线软件包添加其他流水线,请参阅添加其他流水线

代码示例

GitHub 上的 orchestration-pipelines 代码库包含许多流水线操作和引擎组合的最新代码示例。我们建议您先从这些示例入手,了解 Orchestration Pipelines 的功能。

流水线定义

流水线定义具有以下顶级键:

  • modelVersion:流水线定义模型的版本。最新模型版本为 1.0

  • pipelineId:流水线的唯一标识符。此 ID 在多个部署和版本中保持一致,从而可以跟踪和管理逻辑流水线实体。

  • description:流水线说明,映射到运行程序环境中的 Airflow DAG 说明。

  • owner:流水线的所有者。

  • tags:应用于流水线的字符串标识符,用于过滤流水线。

  • notifications:流水线事件通知。支持的通知类型:

    • onPipelineFailure:流水线失败时发送电子邮件。

    通知功能需要在 runner 环境中配置 SendGrid 电子邮件服务。如需查看相关说明,请参阅配置电子邮件通知

    示例:

    notifications:
      onPipelineFailure:
        email: ["user1@example.com", "user2@example.com"]
    
  • runner:指定目标编排引擎。预留以供日后使用。 并将此值设为 airflow

  • defaults:为 project_idlocationexecutionConfig 等属性设置默认值,这些属性适用于所有操作,除非在特定操作中被替换。project_idlocation 属性可被各个操作属性替换。executionConfig 属性无法在各个操作中被替换,并且会在 retries 字段中指定流水线中所有操作的重试次数。

  • triggers:定义流水线的启动方式:

    • 没有值。流水线仍可手动触发

    • schedule. 使用 cron 表达式按时间表触发流水线。

      示例时间表:

      triggers:
        - schedule:
            interval: "0 5 * * *"
            startTime: "2025-10-01T00:00:00"
            endTime: "2026-10-01T00:00:00"
            catchup: false
            timezone: "UTC"
      
  • actions

    要执行的任务的映射。每个映射条目都对应一个操作。请参阅操作

操作

流水线操作定义了流水线执行中的各个步骤。每个操作都必须指定引擎或框架。引擎或框架决定使用哪些资源来执行操作。

Orchestration Pipelines 支持以下操作:

  • Pyspark (pyspark):运行 PySpark 脚本。
  • 笔记本 (notebook):运行笔记本文件。
  • SQL 查询 (notebook):运行 SQL 查询。
  • Python (python):运行 Python 脚本。
  • 流水线 (pipeline):执行数据处理流水线。

Orchestration Pipelines 支持以下引擎和框架:

  • dataprocOnGce > existingCluster:由 clusterName、项目和位置标识的 Managed Service for Apache Spark 集群

  • dataprocOnGce > ephemeral:执行作业后创建并删除的 Managed Service for Apache Spark 集群

  • dataprocServerless:Managed Service for Apache Spark 批量提交

  • bigquery:BigQuery 作业

  • python > local:在 runner 环境中的 Airflow Worker 上执行的 Python 脚本。

  • dbt > airflowWorker:在运行器环境中使用 dbt-core 在 Airflow worker 上执行的 dbt 模型。

  • dataform > airflowWorker:在 runner 环境中使用 Dataform Core CLI 在 Airflow worker 上执行的 Dataform 工作流。

  • dataform > dataformService:在 Dataform 服务上执行的 Dataform 工作流。

下表列出了可能的操作类型、引擎和框架组合。如需查看操作代码示例,请参阅引擎和框架说明。

操作 引擎或框架 输出到
pyspark dataprocOnGce > existingCluster Managed Service for Apache Spark 作业日志
pyspark dataprocOnGce > ephemeralCluster Managed Service for Apache Spark 作业日志
pyspark dataprocServerless Managed Service for Apache Spark 批处理日志
notebook dataprocOnGce > existingCluster 运行程序存储桶,位于 composer_declarative_dags_resources 目录下
notebook dataprocOnGce > ephemeralCluster Managed Service for Apache Spark 作业日志
notebook dataprocServerless 运行程序存储桶,位于 composer_declarative_dags_resources 目录下
sql bigquery destinationTable 参数中指定的表
sql dataprocServerless Managed Service for Apache Spark 批处理日志。
python local(本地执行) 日志
pipeline dbt > airflowWorker 日志和 BigQuery
pipeline dataform > airflowWorker BigQuery 中指定的表
pipeline dataform > dataformService 在 Dataform 中

所有操作都具有以下通用键。其他键取决于操作类型。

  • name:操作名称。此名称会映射到运行器环境中的 Airflow 任务名称。如果某项操作需要多个 Airflow 任务,则此名称会映射到任务组。

  • dependsOn:此操作所依赖的上游操作名称列表,用于定义执行顺序。如果任何上游操作失败,则不会执行依赖于这些操作的下游操作。

  • executionTimeout:执行操作的超时时间。示例:1h30m40s

Python

python 类型的操作。执行 Python 脚本。

特定于操作类型的键:

  • mainFilePath:Python 脚本文件的相对路径。
  • pythonCallable:要在 Python 脚本中执行的 Python 可调用对象的名称。
  • opKwargs:运算符的关键字实参的映射。
  • (可选)environment:在动态创建的 Python 虚拟环境中执行脚本。

    • requirements:虚拟环境的要求。需求在运行时得到解决。

      • inline:要求以内嵌方式指定。

        • list:要求列表。根据 PEP-508 列出各个要求。

          示例:

          environment:
            requirements:
              inline:
                list: ["pandas>=2.0.0"]
          
      • (替代方案)path:包含要求的文件的路径。此文件中的要求必须按照 PEP-508 列出。

        示例:

        environment:
          requirements:
            path: "scripts/requirements.txt"
        
    • systemSitePackages:如果为 true,则虚拟环境会从 Airflow 工作程序的 site-packages 目录继承软件包。您可以在 runner 环境中安装自定义 PyPI 软件包

  • engine

    • local:在 Runner 环境中本地执行

示例:

本地

modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 1

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - python:
      name: "first_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        systemSitePackages: true
        requirements:
          path: "scripts/requirements.txt"

  - python:
      name: "second_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        requirements:
          inline:
            list: ["pandas>=2.0.0"]
        systemSitePackages: true
      dependsOn: ["first_script_run"]

  - python:
      name: "third_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_2.py"
      pythonCallable: "main"
      engine:
        local: {}
      opKwargs:
        api_endpoint: "https://api.my-vendor.example.com/v1/status"
        api_key_secret_name: "my-vendor-api-key"
      dependsOn: ["first_script_run"]

pyspark

pyspark 类型的操作。执行 PySpark 脚本。

特定于操作类型的键:

  • mainFilePath:PySpark 脚本的相对路径。
  • archiveUris:要在此操作中使用的归档 URI 的列表。
  • stagingBucket:要在此操作中使用的 Cloud Storage 存储桶。
  • pyFiles:要在此 Spark 作业中使用的 Python 文件列表。
  • environment:Python 环境配置。

    • requirements:要使用的 Python 要求文件。

      • path:包含要求的文件的路径。此文件中的要求必须按照 PEP-508 列出。
  • engine

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

示例:

existingCluster

modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 4 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pyspark:
      name: "run-pyspark-with-pyfiles-on-existing-cluster"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "example-dataproc-cluster"
            location: "us-central1"
            projectId: "example-project"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"

ephemeralCluster

pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - pyspark:
      name: "run_dataproc_ephemeral"
      executionTimeout: "1h"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            projectId: "example-project"
            location: "us-central1"
            clusterName: "ephemeral-cluster-inline"
            resourceProfile:
              inline:
                config:
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
            properties:
              spark.submit.deployMode: "cluster"

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pyspark:
      name: "run-pyspark-on-dataproc-serverless"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            inline:
              environmentConfig:
                executionConfig:
                  serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
                  networkUri: "projects/example-project/global/networks/default"
              runtimeConfig:
                version: "2.3"
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

笔记本

notebook 类型的操作。通过 Papermill 执行 .ipynb 笔记本。

特定于操作类型的键:

  • mainFilePath:笔记本文件的相对路径。
  • archiveUris:要在此操作中使用的归档 URI 的列表。
  • stagingBucket:要在此操作中使用的 Cloud Storage 存储桶。
  • environment:Python 环境配置。

    • requirements:要使用的 Python 要求文件。

      • path:包含要求的文件的路径。此文件中的要求必须按照 PEP-508 列出。
  • engine

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

示例:

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - notebook:
      name: "run-notebook-on-dataproc-serverless"
      mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
      archiveUris:
        - "gs://example-bucket-additional-data/custom_venv.tar.gz"
      staging_bucket: "example-bucket-additional-data-additional-data"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

sql

sql 类型的操作。执行 SQL 查询。

特定于操作类型的键:

  • query:定义查询。

    • path:查询是在位于部署配置文件相对路径的文件中定义的。
    • inline:查询是内嵌定义的。

  • engine

    • bigQuery
    • dataprocServerless
    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster

bigQuery

modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run_bigquery_insert_job_create"
      query:
        inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
      engine:
        bigquery:
          location: "US"
  - sql:
      name: "run_bigquery_insert_job_select"
      query:
        path: "sql-scripts/count_rows.sql"
      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"
      dependsOn:
        - "run_bigquery_insert_job_create"

dataprocServerless

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-dataproc"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocServerless:
          location: "us-central1"
          impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
          resourceProfile:
            inline:
              runtimeConfig:
                properties:
                  spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
                  spark.sql.catalog.bigquery.project: "example-project"
                  dataproc.sparkBqConnector.version: "0.42.3"

existingCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-existing-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "cluster-sql"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"

ephemeralCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-ephemeral-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            clusterName: "example-ephemeral-cluster"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
            resourceProfile:
              inline:
                clusterConfig:
                  gceClusterConfig:
                    zoneUri: "us-central1-a"
                    metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
                      SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"

流水线

pipeline 类型的操作。执行数据处理流水线。

特定于操作类型的键:

  • framework

    • dbt
    • dataform > airflowWorker
    • dataform > dataformService

示例:

dbt

modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "dbt-action"
      framework:
        dbt:
          airflowWorker:
            projectDirectoryPath: "dbt_project"
            selectModels: ["model_1", "model_2"]

dataform>airflowWorker

modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          airflowWorker:
            projectDirectoryPath: "dataform_local"

dataform>dataformService

modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"

  - pipeline:
      name: "run_dataform_compilation"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
              invocationConfig:
                serviceAccount: "example-account@example-project.iam.gserviceaccount.com"

引擎

操作中使用的引擎。

dataprocOnGce > existingCluster

在由 clusterName、项目和位置标识的现有 Managed Service for Apache Spark 集群中执行。

您可以在部署配置中管理指定集群,也可以在 Managed Service for Apache Spark 中手动管理。 建议定期升级集群。

键:

  • clusterName:集群的名称
  • location:集群所在的区域
  • projectId:集群所在项目的 ID
  • propertiesSpark 作业属性的映射。

示例:

engine:
  dataprocOnGce:
    existingCluster:
      clusterName: "example-dataproc-cluster"
      location: "us-central1"
      projectId: "example-project"
      impersonationChain: "example-account@example-project.iam.gserviceaccount.com"

dataprocOnGce > ephemeralCluster

在临时 Managed Service for Apache Spark 集群中执行,该集群在执行作业后创建并删除。

键:

  • clusterName:集群的名称
  • location:集群所在的区域
  • projectId:集群所在项目的 ID
  • impersonationChain:用于运行操作的服务账号模拟链
  • resourceProfile:Managed Service for Apache Spark 集群资源配置。

    如需了解可用字段的说明,请参阅 Managed Service for Apache Spark 文档中的 ClusterConfig

    您可以通过以下方式指定资源配置文件:

    • inline:定义为流水线配置的一部分。
    • path:在位于相对路径的文件中定义。
    • external_config_path:在 Cloud Storage 存储桶中的文件中定义。与需要提交和部署才能更新资源配置文件值的 inlinepath 选项不同,外部资源配置文件会在每次流水线运行时解析,您可以在不重新部署流水线的情况下更新它。

    可以使用 override 键将替换项应用于指定的资源配置文件。通过深度合并将替换项应用于提供的预来源配置文件。

  • propertiesSpark 作业属性的映射。

示例:

engine:
  dataprocOnGce:
    ephemeralCluster:
      projectId: "example-project"
      location: "us-central1"
      clusterName: "example-ephemeral-cluster"
      resourceProfile:
        inline:
          config:
            masterConfig:
              numInstances: 1
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
            workerConfig:
              numInstances: 2
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
      properties:
        spark.submit.deployMode: "cluster"

dataprocServerless

在 Managed Service for Apache Spark 批量提交中执行。

键:

  • location:必须执行 Spark 作业的区域。
  • impersonationChain:用于运行操作的服务账号模拟链
  • resourceProfile:Managed Service for Apache Spark 资源配置文件。

    您可以通过以下方式指定资源配置文件:

    • inline:定义为流水线配置的一部分。
    • path:在位于相对路径的文件中定义。
    • external_config_path:在 Cloud Storage 存储桶中的文件中定义。与需要提交和部署才能更新资源配置文件值的 inlinepath 选项不同,外部资源配置文件会在每次流水线运行时解析,您可以在不重新部署流水线的情况下更新它。

    以下键用于指定资源配置文件配置:

    • environmentConfig:环境配置
    • runtimeConfig:运行时配置

    如需了解可用字段的说明,请参阅 Managed Service for Apache Spark 文档中的 RuntimeConfigEnvironmentConfig

    可以使用 override 键将替换项应用于指定的资源配置文件。通过深度合并将替换项应用于所提供的资源配置文件。

示例(内嵌):

engine:
  dataprocServerless:
    location: "us-central1"
    resourceProfile:
      inline:
        environmentConfig:
          executionConfig:
            serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
            networkUri: "projects/example-project/global/networks/default"
        runtimeConfig:
          version: "2.3"
          properties:
            spark.app.name: "run-notebook-on-dataproc-serverless"
            spark.executor.instances: "2"
            spark.driver.cores: "4"

示例(外部路径和替换项):

      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                    spark.app.name: "run-notebook-on-dataproc-serverless"
                    spark.executor.instances: "2"
                    spark.driver.cores: "4"

bigQuery

以 BigQuery 作业的形式执行。

键:

  • location:目标表所在的区域。
  • destinationTable:用于输出数据的 BigQuery 表
  • impersonationChain:用于运行操作的服务账号模拟链

示例:

      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"

局部

在 runner 环境中本地执行。

如需了解配置虚拟环境的方法,请参阅 python 操作。

示例:

    engine:
      local: {}

框架

操作中使用的框架。

dbt > airflowWorker

使用 dbt-core 在 runner 环境中执行在 Airflow 工作器上执行的 dbt 模型。

键:

  • projectDirectoryPath:包含 DBT 项目的文件夹的相对路径。
  • selectModels:要按名称纳入运行中的模型列表(相当于 dbt --select)。
  • tags:要按标记纳入运行的模型列表(相当于 dbt --select)。

示例:

framework:
  dbt:
    airflowWorker:
      projectDirectoryPath: "dbt_project"
      selectModels: ["model_1", "model_2"]

dataform > airflowWorker

在运行程序环境中,使用 Dataform Core CLI 在 Airflow 工作人员上执行的 Dataform 工作流。

键:

  • projectDirectoryPath:包含 Dataform 工作流定义的文件夹的相对路径。

示例:

framework:
  dataform:
    airflowWorker:
      projectDirectoryPath: "dataform_local"

dataform > dataformService

运行在 Dataform 服务上执行的 Dataform 工作流。

键:

  • location:Dataform 代码库所在的位置。
  • projectId:Dataform 代码库所在的项目。
  • repositoryId:Dataform 代码库 ID
  • workflowInvocation:工作流调用的配置,用于指定要运行的操作。请参阅 WorkflowInvocation

示例:

framework:
  dataform:
    dataformService:
      location: "us-central1"
      projectId: "example-project"
      repositoryId: "example-repository"
      workflowInvocation:
        compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
        invocationConfig:
          serviceAccount: "example-account@example-project.iam.gserviceaccount.com"