Orchestration Pipelines DSL 參考資料

本頁包含 Orchestration Pipelines DSL 參考資料。

預先發布版限制

在預覽期間,Orchestration Pipelines 有下列限制:

  • pysparknotebook 動作:

    • 系統僅支援所有 pysparknotebook 動作的單一 requirements.txt 檔案。
    • 透過 uv 工具建構套件時,不支援 Windows 平台。
    • 系統僅支援含有預先建構二進位檔的 Python 套件。
  • 如要執行 sql 動作:

    • 不支援 query 鍵中的 inline 定義。

關於格式和值

管道是以 YAML 格式定義,且必須儲存在存放區中,每個管道各別儲存在一個檔案。

Orchestration Pipelines 提供多種方式,可在管道定義和部署設定中使用變數。舉例來說,您可以定義自訂變數、使用 GitHub 祕密,以及在指令列中替換變數值。詳情請參閱「變數、密鑰和替代」。

如要進一步瞭解如何將其他管道新增至管道套件,請參閱「新增其他管道」。

程式碼範例

GitHub 上的 orchestration-pipelines 存放區提供許多管道動作和引擎組合的最新程式碼範例。建議您先從這些範例著手,瞭解 Orchestration Pipelines 的功能。

pipeline 定義

管道定義包含下列頂層鍵:

  • modelVersion:管道定義模型的版本。最新模型版本為 1.0

  • pipelineId:管道的專屬 ID。這個 ID 在多個部署作業和版本中保持一致,方便追蹤及管理邏輯管道實體。

  • description:管道說明,會對應至執行器環境中 Airflow DAG 的說明。

  • owner:管道擁有者。

  • tags:套用至管道的字串 ID,用於篩選管道。

  • notifications:管道事件的通知。支援的通知類型:

    • onPipelineFailure:管道失敗時傳送電子郵件。

    如要接收通知,您必須在執行程式環境中設定 SendGrid 電子郵件服務。如需操作說明,請參閱「設定電子郵件通知」。

    範例:

    notifications:
      onPipelineFailure:
        email: ["user1@example.com", "user2@example.com"]
    
  • runner:指定目標協調引擎。保留供日後使用。 請將這個值設為 airflow

  • defaults:為 project_idlocationexecutionConfig 等屬性設定預設值,除非在特定動作中覆寫,否則這些值會套用至所有動作。個別動作屬性可以覆寫 project_idlocation 屬性。executionConfig 屬性無法在個別動作中覆寫,且會在 retries 欄位中指定管道中所有動作的重試次數。

  • triggers:定義管道的啟動方式:

    • 不具現金價值。管道仍可手動觸發

    • schedule使用 Cron 運算式,依排程觸發管道。

      範例時間表:

      triggers:
        - schedule:
            interval: "0 5 * * *"
            startTime: "2025-10-01T00:00:00"
            endTime: "2026-10-01T00:00:00"
            catchup: false
            timezone: "UTC"
      
  • actions

    要執行的工作對應。每個對應項目都對應一個動作。請參閱「動作」。

動作

管道動作會定義管道執行中的個別步驟。每個動作都必須指定引擎或架構。引擎或框架會決定要使用哪些資源來執行動作。

Orchestration Pipelines 支援下列動作:

  • Pyspark (pyspark):執行 PySpark 指令碼。
  • 筆記本 (notebook):執行筆記本檔案。
  • SQL 查詢 (notebook):執行 SQL 查詢。
  • Python (python):執行 Python 指令碼。
  • 管道 (pipeline):執行資料處理管道。

Orchestration Pipelines 支援下列引擎和架構:

  • dataprocOnGce > existingCluster: 由 clusterName、專案和位置識別的 Managed Service for Apache Spark 叢集

  • dataprocOnGce > ephemeral: 執行工作後,系統會建立並刪除 Managed Service for Apache Spark 叢集

  • dataprocServerless:Managed Service for Apache Spark 批次提交

  • bigquery:BigQuery 工作

  • python > local:在執行器環境中,於 Airflow Worker 上執行的 Python 指令碼。

  • dbt > airflowWorker:在執行器環境中,使用 dbt-core 在 Airflow 工作站執行的 dbt 模型。

  • dataform > airflowWorker:在執行器環境中,使用 dataform core CLI,在 Airflow 工作站上執行的 Dataform 工作流程。

  • dataform > dataformService:在 Dataform 服務上執行的 Dataform 工作流程。

下表列出可能採用的動作類型、引擎和架構組合。如需動作程式碼範例,請參閱引擎和架構說明。

動作 引擎或架構 輸出至
pyspark dataprocOnGce > existingCluster Managed Service for Apache Spark 工作記錄
pyspark dataprocOnGce > ephemeralCluster Managed Service for Apache Spark 工作記錄
pyspark dataprocServerless Managed Service for Apache Spark 批次記錄
notebook dataprocOnGce > existingCluster 執行器 bucket,位於 composer_declarative_dags_resources 目錄下
notebook dataprocOnGce > ephemeralCluster Managed Service for Apache Spark 工作記錄
notebook dataprocServerless 執行器 bucket,位於 composer_declarative_dags_resources 目錄下
sql bigquery destinationTable 參數中指定的資料表
sql dataprocServerless Managed Service for Apache Spark 批次記錄。
python local (本機執行) 記錄
pipeline dbt > airflowWorker 記錄和 BigQuery
pipeline dataform > airflowWorker BigQuery 中指定的資料表
pipeline dataform > dataformService 在 Dataform 中

所有動作都有下列通用鍵。其他鍵則取決於動作類型。

  • name:動作名稱。這個名稱會對應至執行器環境中的 Airflow 工作名稱。如果動作需要多個 Airflow 工作,這個名稱會對應至工作群組。

  • dependsOn:這個動作所依附的上游動作名稱清單,用於定義執行順序。如果任何上游動作失敗,系統就不會執行依附於這些動作的下游動作。

  • executionTimeout:執行動作的逾時時間。例如:1h30m40s

python

python 類型的動作。執行 Python 指令碼。

動作類型專屬鍵:

  • mainFilePath:Python 指令碼檔案的相對路徑。
  • pythonCallable:要在 Python 指令碼中執行的 Python 可呼叫函式名稱。
  • opKwargs:運算子的關鍵字引數對應。
  • (選用) environment:在動態建立的 Python 虛擬環境中執行指令碼。

    • requirements:虛擬環境的規定。系統會在執行階段解決這些需求。

      • inline:規定會內嵌指定。

        • list:需求清單。根據 PEP-508 列出個別需求。

          範例:

          environment:
            requirements:
              inline:
                list: ["pandas>=2.0.0"]
          
      • (替代做法) path:含有需求的檔案路徑。這個檔案中的需求必須按照 PEP-508 列出。

        範例:

        environment:
          requirements:
            path: "scripts/requirements.txt"
        
    • systemSitePackages:如果設為 true,虛擬環境會從 Airflow 工作人員的 site-packages 目錄繼承套件。您可以在執行器環境中安裝自訂 PyPI 套件

  • engine

    • local:在執行器環境中本機執行

範例:

當地

modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 1

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - python:
      name: "first_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        systemSitePackages: true
        requirements:
          path: "scripts/requirements.txt"

  - python:
      name: "second_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        requirements:
          inline:
            list: ["pandas>=2.0.0"]
        systemSitePackages: true
      dependsOn: ["first_script_run"]

  - python:
      name: "third_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_2.py"
      pythonCallable: "main"
      engine:
        local: {}
      opKwargs:
        api_endpoint: "https://api.my-vendor.example.com/v1/status"
        api_key_secret_name: "my-vendor-api-key"
      dependsOn: ["first_script_run"]

pyspark

pyspark 類型的動作。執行 PySpark 指令碼。

動作類型專屬鍵:

  • mainFilePath:PySpark 指令碼的相對路徑。
  • archiveUris:要用於這項動作的封存 URI 清單。
  • stagingBucket:要搭配這項動作使用的 Cloud Storage bucket。
  • pyFiles:要用於這項 Spark 工作的 Python 檔案清單。
  • environment:Python 環境設定。

    • requirements:要使用的 Python 需求檔案。

      • path:包含規定的檔案路徑。這個檔案中的需求必須按照 PEP-508 列出。
  • engine

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

範例:

existingCluster

modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 4 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pyspark:
      name: "run-pyspark-with-pyfiles-on-existing-cluster"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "example-dataproc-cluster"
            location: "us-central1"
            projectId: "example-project"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"

ephemeralCluster

pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - pyspark:
      name: "run_dataproc_ephemeral"
      executionTimeout: "1h"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            projectId: "example-project"
            location: "us-central1"
            clusterName: "ephemeral-cluster-inline"
            resourceProfile:
              inline:
                config:
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
            properties:
              spark.submit.deployMode: "cluster"

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pyspark:
      name: "run-pyspark-on-dataproc-serverless"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            inline:
              environmentConfig:
                executionConfig:
                  serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
                  networkUri: "projects/example-project/global/networks/default"
              runtimeConfig:
                version: "2.3"
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

筆記本

notebook 類型的動作。透過 Papermill 執行 .ipynb 筆記本。

動作類型專屬鍵:

  • mainFilePath:筆記本檔案的相對路徑。
  • archiveUris:要用於這項動作的封存 URI 清單。
  • stagingBucket:要搭配這項動作使用的 Cloud Storage bucket。
  • environment:Python 環境設定。

    • requirements:要使用的 Python 需求檔案。

      • path:包含規定的檔案路徑。這個檔案中的需求必須按照 PEP-508 列出。
  • engine

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

範例:

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - notebook:
      name: "run-notebook-on-dataproc-serverless"
      mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
      archiveUris:
        - "gs://example-bucket-additional-data/custom_venv.tar.gz"
      staging_bucket: "example-bucket-additional-data-additional-data"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

sql

sql 類型的動作。執行 SQL 查詢。

動作類型專屬鍵:

  • query:定義查詢。

    • path:查詢是在部署設定檔的相對路徑中定義。
    • inline:查詢是內嵌定義。

  • engine

    • bigQuery
    • dataprocServerless
    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster

bigQuery

modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run_bigquery_insert_job_create"
      query:
        inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
      engine:
        bigquery:
          location: "US"
  - sql:
      name: "run_bigquery_insert_job_select"
      query:
        path: "sql-scripts/count_rows.sql"
      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"
      dependsOn:
        - "run_bigquery_insert_job_create"

dataprocServerless

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-dataproc"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocServerless:
          location: "us-central1"
          impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
          resourceProfile:
            inline:
              runtimeConfig:
                properties:
                  spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
                  spark.sql.catalog.bigquery.project: "example-project"
                  dataproc.sparkBqConnector.version: "0.42.3"

existingCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-existing-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "cluster-sql"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"

ephemeralCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-ephemeral-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            clusterName: "example-ephemeral-cluster"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
            resourceProfile:
              inline:
                clusterConfig:
                  gceClusterConfig:
                    zoneUri: "us-central1-a"
                    metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
                      SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"

pipeline

pipeline 類型的動作。執行資料處理管道。

動作類型專屬鍵:

  • framework

    • dbt
    • dataform > airflowWorker
    • dataform > dataformService

範例:

dbt

modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "dbt-action"
      framework:
        dbt:
          airflowWorker:
            projectDirectoryPath: "dbt_project"
            selectModels: ["model_1", "model_2"]

dataform>airflowWorker

modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          airflowWorker:
            projectDirectoryPath: "dataform_local"

dataform>dataformService

modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"

  - pipeline:
      name: "run_dataform_compilation"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
              invocationConfig:
                serviceAccount: "example-account@example-project.iam.gserviceaccount.com"

引擎

動作中使用的引擎。

dataprocOnGce > existingCluster

在現有的 Managed Service for Apache Spark 叢集中執行,該叢集由 clusterName、專案和位置識別。

您可以在部署設定中管理指定叢集,也可以在 Managed Service for Apache Spark 中手動管理。建議定期升級叢集。

金鑰:

  • clusterName:叢集名稱
  • location:叢集所在的區域
  • projectId:叢集所在專案的專案 ID
  • propertiesSpark 工作屬性的地圖。

範例:

engine:
  dataprocOnGce:
    existingCluster:
      clusterName: "example-dataproc-cluster"
      location: "us-central1"
      projectId: "example-project"
      impersonationChain: "example-account@example-project.iam.gserviceaccount.com"

dataprocOnGce > ephemeralCluster

在暫時性 Managed Service for Apache Spark 叢集中執行,該叢集會在工作執行完畢後建立及刪除。

金鑰:

  • clusterName:叢集名稱
  • location:叢集所在的區域
  • projectId:叢集所在專案的專案 ID
  • impersonationChain:用於執行動作的服務帳戶模擬鏈
  • resourceProfile:Managed Service for Apache Spark 叢集資源設定檔。

    如要查看可用欄位的說明,請參閱 Managed Service for Apache Spark 說明文件中的 ClusterConfig

    您可以透過下列方式指定資源設定檔:

    • inline:定義為管道設定的一部分。
    • path:定義於相對路徑的檔案中。
    • external_config_path:定義於 Cloud Storage bucket 中的檔案。與 inlinepath 選項不同,這些選項需要修訂並部署才能更新資源設定檔值,而外部資源設定檔會在每次管道執行時解析,因此您不必重新部署管道即可更新。

    您可以使用 override 鍵,對指定的資源設定檔套用覆寫。系統會將覆寫值深層合併至提供的預先來源設定檔。

  • propertiesSpark 工作屬性的地圖。

範例:

engine:
  dataprocOnGce:
    ephemeralCluster:
      projectId: "example-project"
      location: "us-central1"
      clusterName: "example-ephemeral-cluster"
      resourceProfile:
        inline:
          config:
            masterConfig:
              numInstances: 1
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
            workerConfig:
              numInstances: 2
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
      properties:
        spark.submit.deployMode: "cluster"

dataprocServerless

在 Managed Service for Apache Spark 中執行批次提交

金鑰:

  • location:必須執行 Spark 工作的區域。
  • impersonationChain:用於執行動作的服務帳戶模擬鏈
  • resourceProfile:Managed Service for Apache Spark 資源設定檔。

    您可以透過下列方式指定資源設定檔:

    • inline:定義為管道設定的一部分。
    • path:定義於相對路徑的檔案中。
    • external_config_path:定義於 Cloud Storage bucket 中的檔案。與 inlinepath 選項不同,這些選項需要修訂並部署才能更新資源設定檔值,而外部資源設定檔會在每次管道執行時解析,因此您不必重新部署管道即可更新。

    下列鍵值會指定資源設定檔設定:

    • environmentConfig:環境設定
    • runtimeConfig:執行階段設定

    如需可用欄位的說明,請參閱 Managed Service for Apache Spark 說明文件中的 RuntimeConfigEnvironmentConfig

    您可以使用 override 鍵,對指定的資源設定檔套用覆寫。系統會將覆寫值與提供的資源設定檔進行深層合併。

範例 (內嵌):

engine:
  dataprocServerless:
    location: "us-central1"
    resourceProfile:
      inline:
        environmentConfig:
          executionConfig:
            serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
            networkUri: "projects/example-project/global/networks/default"
        runtimeConfig:
          version: "2.3"
          properties:
            spark.app.name: "run-notebook-on-dataproc-serverless"
            spark.executor.instances: "2"
            spark.driver.cores: "4"

範例 (外部路徑和覆寫):

      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                    spark.app.name: "run-notebook-on-dataproc-serverless"
                    spark.executor.instances: "2"
                    spark.driver.cores: "4"

bigQuery

以 BigQuery 作業的形式執行。

金鑰:

  • location:目的地資料表所在的區域。
  • destinationTable:用於輸出資料的 BigQuery 資料表
  • impersonationChain:用於執行動作的服務帳戶模擬鏈

範例:

      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"

局部

在執行器環境中在本機執行。

如要瞭解如何設定虛擬環境,請參閱 python 動作。

範例:

    engine:
      local: {}

架構

動作中使用的架構。

dbt > airflowWorker

在執行器環境中,使用 dbt-core 在 Airflow 工作站上執行 dbt 模型。

金鑰:

  • projectDirectoryPath:包含 DBT 專案的資料夾相對路徑。
  • selectModels:要納入執行作業的模型名稱清單 (相當於 dbt --select)。
  • tags:要納入執行作業的模型清單 (相當於 dbt --select)。

範例:

framework:
  dbt:
    airflowWorker:
      projectDirectoryPath: "dbt_project"
      selectModels: ["model_1", "model_2"]

dataform > airflowWorker

使用 dataform core cli,在執行器環境的 Airflow 工作站上執行的 Dataform 工作流程。

金鑰:

  • projectDirectoryPath:包含 Dataform 工作流程定義的資料夾相對路徑。

範例:

framework:
  dataform:
    airflowWorker:
      projectDirectoryPath: "dataform_local"

dataform > dataformService

執行在 Dataform 服務上執行的 Dataform 工作流程。

金鑰:

  • location:Dataform 存放區所在位置。
  • projectId:Dataform 存放區所在的專案。
  • repositoryId:Dataform 存放區 ID
  • workflowInvocation:工作流程叫用設定,用於指定要執行的動作。請參閱 WorkflowInvocation

範例:

framework:
  dataform:
    dataformService:
      location: "us-central1"
      projectId: "example-project"
      repositoryId: "example-repository"
      workflowInvocation:
        compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
        invocationConfig:
          serviceAccount: "example-account@example-project.iam.gserviceaccount.com"