Orchestration Pipelines DSL リファレンス

このページでは、Orchestration Pipelines DSL のリファレンスについて説明します。

プレビュー版の制限事項

プレビュー版では、Orchestration Pipelines に次の制限があります。

  • pyspark アクションと notebook アクションの場合:

    • すべての pyspark アクションと notebook アクションで 1 つの requirements.txt ファイルのみがサポートされます。
    • uv ツールを使用したパッケージのビルドは、Windows プラットフォームではサポートされていません。
    • 事前構築済みのバイナリを含む Python パッケージのみがサポートされます。
  • sql アクションの場合:

    • query キーの inline 定義はサポートされていません。

形式と値について

パイプラインは YAML 形式で定義され、リポジトリ内のパイプラインごとに別々のファイルに保存する必要があります。

Orchestration Pipelines では、パイプライン定義とデプロイ構成で変数を使用する方法がいくつか用意されています。たとえば、カスタム変数を定義したり、GitHub シークレットを使用したり、コマンドラインで変数値を置き換えたりできます。詳細については、変数、シークレット、置換をご覧ください。

パイプライン バンドルにパイプラインを追加する方法については、別のパイプラインを追加するをご覧ください。

コードの例

GitHub の orchestration-pipelines リポジトリには、多くのパイプライン アクションとエンジンを組み合わせた最新のコード例が用意されています。これらの例は、Orchestration Pipelines の機能を調べるための出発点として使用することをおすすめします。

パイプラインの定義

パイプライン定義には、次の最上位キーがあります。

  • modelVersion: パイプライン定義モデルのバージョン。最新のモデル バージョンは 1.0 です。

  • pipelineId: パイプラインの固有識別子。この ID は複数のデプロイとバージョンで一貫性が保たれるため、論理パイプライン エンティティの追跡と管理が可能になります。

  • description: パイプラインの説明。ランナー環境の Airflow DAG の説明にマッピングされます。

  • owner: パイプラインのオーナー。

  • tags: パイプラインに適用される文字列識別子。パイプラインのフィルタリングに使用されます。

  • notifications: パイプライン イベントに関する通知。サポートされている通知タイプ:

    • onPipelineFailure: パイプラインの失敗に関するメール。

    通知には、ランナー環境で構成された SendGrid メールサービスが必要です。手順については、メール通知を構成するをご覧ください。

    例:

    notifications:
      onPipelineFailure:
        email: ["user1@example.com", "user2@example.com"]
    
  • runner: ターゲット オーケストレーション エンジンを指定します。将来の使用のために予約されています。airflow に設定します。

  • defaults: 特定のアクション内でオーバーライドされない限り、すべてのアクションに適用される project_idlocationexecutionConfig などのプロパティのデフォルト値を設定します。project_id プロパティと location プロパティは、個々のアクション プロパティでオーバーライドできます。executionConfig プロパティは個々のアクションでオーバーライドできません。retries フィールドで、パイプライン内のすべてのアクションの再試行回数を指定します。

  • triggers: パイプラインの開始方法を定義します。

    • 値なし。パイプラインは手動でトリガーできます。

    • schedule。cron 式を使用して、スケジュールに基づいてパイプラインをトリガーします。

      スケジュール例:

      triggers:
        - schedule:
            interval: "0 5 * * *"
            startTime: "2025-10-01T00:00:00"
            endTime: "2026-10-01T00:00:00"
            catchup: false
            timezone: "UTC"
      
  • actions

    実行されるタスクのマッピング。各マッピング エントリは 1 つのアクションに対応します。アクションをご覧ください。

操作

パイプライン アクションは、パイプライン実行の個々のステップを定義します。各アクションには、エンジンまたはフレームワークを指定する必要があります。エンジンまたはフレームワークによって、アクションの実行に使用されるリソースが決まります。

Orchestration Pipelines は、次のアクションをサポートしています。

  • Pyspark(pyspark): PySpark スクリプトを実行します。
  • ノートブック(notebook): ノートブック ファイルを実行します。
  • SQL クエリ(notebook): SQL クエリを実行します。
  • Python(python): Python スクリプトを実行します。
  • パイプライン(pipeline): データ処理パイプラインを実行します。

Orchestration Pipelines は、次のエンジンとフレームワークをサポートしています。

  • dataprocOnGce > existingCluster: clusterName、プロジェクト、ロケーションで識別される Managed Service for Apache Spark クラスタ

  • dataprocOnGce > ephemeral: ジョブの実行後に Managed Service for Apache Spark クラスタが作成され、削除されます。

  • dataprocServerless: Managed Service for Apache Spark のバッチ送信

  • bigquery: BigQuery ジョブ

  • python > local: ランナー環境の Airflow ワーカーで実行される Python スクリプト。

  • dbt > airflowWorker: dbt-core を使用してランナー環境の Airflow ワーカーで実行される dbt モデル。

  • dataform > airflowWorker: dataform core cli を使用して、ランナー環境の Airflow ワーカーで実行される Dataform ワークフロー。

  • dataform > dataformService: Dataform サービスで実行された Dataform ワークフロー。

次の表に、考えられるアクション タイプ、エンジン、フレームワークの組み合わせを示します。アクション コードの例については、エンジンとフレームワークの説明をご覧ください。

アクション エンジンまたはフレームワーク 出力先
pyspark dataprocOnGce > existingCluster Managed Service for Apache Spark ジョブログ
pyspark dataprocOnGce > ephemeralCluster Managed Service for Apache Spark ジョブログ
pyspark dataprocServerless Managed Service for Apache Spark バッチログ
notebook dataprocOnGce > existingCluster composer_declarative_dags_resources ディレクトリの Runner バケット
notebook dataprocOnGce > ephemeralCluster Managed Service for Apache Spark ジョブログ
notebook dataprocServerless composer_declarative_dags_resources ディレクトリの Runner バケット
sql bigquery destinationTable パラメータで指定されたテーブル
sql dataprocServerless Managed Service for Apache Spark バッチログ。
python local(ローカル実行) ログ
pipeline dbt > airflowWorker ログと BigQuery
pipeline dataform > airflowWorker BigQuery で指定されたテーブル
pipeline dataform > dataformService Dataform で

すべてのアクションには、次の共通鍵があります。その他のキーはアクション タイプによって異なります。

  • name: アクション名。この名前は、ランナー環境の Airflow タスク名にマッピングされます。アクションに複数の Airflow タスクが必要な場合、この名前はタスクグループにマッピングされます。

  • dependsOn: このアクションが依存するアップストリーム アクション名のリスト。実行順序を定義します。上流アクションのいずれかが失敗すると、それに依存する下流アクションは実行されません。

  • executionTimeout: アクションを実行するタイムアウト。例: 1h30m40s

python

python タイプの操作。Python スクリプトを実行します。

アクション タイプ固有のキー:

  • mainFilePath: Python スクリプト ファイルの相対パス。
  • pythonCallable: Python スクリプトで実行する Python 呼び出し可能関数の名前。
  • opKwargs: オペレータのキーワード引数のマッピング。
  • (省略可)environment: 動的に作成された Python 仮想環境内でスクリプトを実行します。

    • requirements: 仮想環境の要件。要件は実行時に解決されます。

      • inline: 要件がインラインで指定されています。

        • list: 要件のリスト。PEP-508 に従って個々の要件をリストします。

          例:

          environment:
            requirements:
              inline:
                list: ["pandas>=2.0.0"]
          
      • (代替)path: 要件を含むファイルのパス。このファイルの要件は、PEP-508 に従ってリストする必要があります。

        例:

        environment:
          requirements:
            path: "scripts/requirements.txt"
        
    • systemSitePackages: true の場合、仮想環境は Airflow ワーカーの site-packages ディレクトリからパッケージを継承します。ランナー環境にカスタム PyPI パッケージをインストールできます。

  • engine:

    • local: ランナー環境でのローカル実行

例:

ローカル

modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 1

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - python:
      name: "first_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        systemSitePackages: true
        requirements:
          path: "scripts/requirements.txt"

  - python:
      name: "second_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        requirements:
          inline:
            list: ["pandas>=2.0.0"]
        systemSitePackages: true
      dependsOn: ["first_script_run"]

  - python:
      name: "third_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_2.py"
      pythonCallable: "main"
      engine:
        local: {}
      opKwargs:
        api_endpoint: "https://api.my-vendor.example.com/v1/status"
        api_key_secret_name: "my-vendor-api-key"
      dependsOn: ["first_script_run"]

pyspark

pyspark タイプの操作。PySpark スクリプトを実行します。

アクション タイプ固有のキー:

  • mainFilePath: PySpark スクリプトへの相対パス。
  • archiveUris: このアクションで使用するアーカイブ URI のリスト。
  • stagingBucket: このアクションで使用する Cloud Storage バケット。
  • pyFiles: この Spark ジョブで使用する Python ファイルのリスト。
  • environment: Python 環境の構成。

    • requirements: 使用する Python 要件ファイル。

      • path: 要件を含むファイルへのパス。このファイルの要件は、PEP-508 に従ってリストする必要があります。
  • engine:

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

例:

existingCluster

modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 4 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pyspark:
      name: "run-pyspark-with-pyfiles-on-existing-cluster"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "example-dataproc-cluster"
            location: "us-central1"
            projectId: "example-project"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"

ephemeralCluster

pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - pyspark:
      name: "run_dataproc_ephemeral"
      executionTimeout: "1h"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            projectId: "example-project"
            location: "us-central1"
            clusterName: "ephemeral-cluster-inline"
            resourceProfile:
              inline:
                config:
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
            properties:
              spark.submit.deployMode: "cluster"

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pyspark:
      name: "run-pyspark-on-dataproc-serverless"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            inline:
              environmentConfig:
                executionConfig:
                  serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
                  networkUri: "projects/example-project/global/networks/default"
              runtimeConfig:
                version: "2.3"
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

ノートブック

notebook タイプの操作。Papermill を使用して .ipynb ノートブックを実行します。

アクション タイプ固有のキー:

  • mainFilePath: ノートブック ファイルの相対パス。
  • archiveUris: このアクションで使用するアーカイブ URI のリスト。
  • stagingBucket: このアクションで使用する Cloud Storage バケット。
  • environment: Python 環境の構成。

    • requirements: 使用する Python 要件ファイル。

      • path: 要件を含むファイルへのパス。このファイルの要件は、PEP-508 に従ってリストする必要があります。
  • engine:

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

例:

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - notebook:
      name: "run-notebook-on-dataproc-serverless"
      mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
      archiveUris:
        - "gs://example-bucket-additional-data/custom_venv.tar.gz"
      staging_bucket: "example-bucket-additional-data-additional-data"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

sql

sql タイプの操作。SQL クエリを実行します。

アクション タイプ固有のキー:

  • query: クエリを定義します。

    • path: クエリは、デプロイ構成ファイルへの相対パスにあるファイルで定義されます。
    • inline: クエリがインラインで定義されています。

  • engine:

    • bigQuery
    • dataprocServerless
    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster

bigQuery

modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run_bigquery_insert_job_create"
      query:
        inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
      engine:
        bigquery:
          location: "US"
  - sql:
      name: "run_bigquery_insert_job_select"
      query:
        path: "sql-scripts/count_rows.sql"
      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"
      dependsOn:
        - "run_bigquery_insert_job_create"

dataprocServerless

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-dataproc"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocServerless:
          location: "us-central1"
          impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
          resourceProfile:
            inline:
              runtimeConfig:
                properties:
                  spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
                  spark.sql.catalog.bigquery.project: "example-project"
                  dataproc.sparkBqConnector.version: "0.42.3"

existingCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-existing-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "cluster-sql"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"

ephemeralCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-ephemeral-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            clusterName: "example-ephemeral-cluster"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
            resourceProfile:
              inline:
                clusterConfig:
                  gceClusterConfig:
                    zoneUri: "us-central1-a"
                    metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
                      SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"

パイプライン

pipeline タイプの操作。データ処理パイプラインを実行します。

アクション タイプ固有のキー:

  • framework:

    • dbt
    • dataform > airflowWorker
    • dataform > dataformService

例:

dbt

modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "dbt-action"
      framework:
        dbt:
          airflowWorker:
            projectDirectoryPath: "dbt_project"
            selectModels: ["model_1", "model_2"]

dataform>airflowWorker

modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          airflowWorker:
            projectDirectoryPath: "dataform_local"

dataform>dataformService

modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"

  - pipeline:
      name: "run_dataform_compilation"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
              invocationConfig:
                serviceAccount: "example-account@example-project.iam.gserviceaccount.com"

エンジン

アクションで使用されるエンジン。

dataprocOnGce > existingCluster

clusterName、プロジェクト、ロケーションで識別される既存の Managed Service for Apache Spark クラスタで実行します。

指定したクラスタは、デプロイ構成で管理するか、Managed Service for Apache Spark で手動で管理できます。クラスタは定期的にアップグレードすることをおすすめします。

キー:

  • clusterName: クラスタの名前
  • location: クラスタが配置されているリージョン
  • projectId: クラスタが配置されているプロジェクトのプロジェクト ID
  • properties: Spark ジョブ プロパティのマップ。

例:

engine:
  dataprocOnGce:
    existingCluster:
      clusterName: "example-dataproc-cluster"
      location: "us-central1"
      projectId: "example-project"
      impersonationChain: "example-account@example-project.iam.gserviceaccount.com"

dataprocOnGce > ephemeralCluster

ジョブの実行後に作成および削除されるエフェメラル Managed Service for Apache Spark クラスタで実行します。

キー:

  • clusterName: クラスタの名前
  • location: クラスタが配置されているリージョン
  • projectId: クラスタが配置されているプロジェクトのプロジェクト ID
  • impersonationChain: アクションの実行に使用するサービス アカウントの権限借用チェーン
  • resourceProfile: Managed Service for Apache Spark クラスタのリソース プロファイル。

    使用可能なフィールドの説明については、Managed Service for Apache Spark ドキュメントの ClusterConfig をご覧ください。

    リソース プロファイルは次の方法で指定できます。

    • inline: パイプライン構成の一部として定義されます。
    • path: 相対パスにあるファイルで定義されます。
    • external_config_path: Cloud Storage バケットにあるファイルで定義されます。inline オプションと path オプションでは、リソース プロファイル値を更新するために commit とデプロイが必要ですが、外部リソース プロファイルはパイプラインの実行ごとに解決されるため、パイプラインを再デプロイせずに更新できます。

    オーバーライドは、override キーを使用して、指定されたリソース プロファイルに適用できます。オーバーライドは、指定された事前ソース プロファイルにディープ マージで適用されます。

  • properties: Spark ジョブ プロパティのマップ。

例:

engine:
  dataprocOnGce:
    ephemeralCluster:
      projectId: "example-project"
      location: "us-central1"
      clusterName: "example-ephemeral-cluster"
      resourceProfile:
        inline:
          config:
            masterConfig:
              numInstances: 1
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
            workerConfig:
              numInstances: 2
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
      properties:
        spark.submit.deployMode: "cluster"

dataprocServerless

Managed Service for Apache Spark のバッチ送信で実行します。

キー:

  • location: Spark ジョブを実行する必要があるリージョン。
  • impersonationChain: アクションの実行に使用するサービス アカウントの権限借用チェーン
  • resourceProfile: Managed Service for Apache Spark リソース プロファイル。

    リソース プロファイルは次の方法で指定できます。

    • inline: パイプライン構成の一部として定義されます。
    • path: 相対パスにあるファイルで定義されます。
    • external_config_path: Cloud Storage バケットにあるファイルで定義されます。inline オプションと path オプションでは、リソース プロファイル値を更新するために commit とデプロイが必要ですが、外部リソース プロファイルはパイプラインの実行ごとに解決されるため、パイプラインを再デプロイせずに更新できます。

    次のキーは、リソース プロファイル構成を指定します。

    • environmentConfig: 環境構成
    • runtimeConfig: ランタイム構成

    使用可能なフィールドの説明については、Managed Service for Apache Spark のドキュメントの RuntimeConfigEnvironmentConfig をご覧ください。

    オーバーライドは、override キーを使用して、指定されたリソース プロファイルに適用できます。オーバーライドは、指定されたリソース プロファイルにディープ マージで適用されます。

例(インライン):

engine:
  dataprocServerless:
    location: "us-central1"
    resourceProfile:
      inline:
        environmentConfig:
          executionConfig:
            serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
            networkUri: "projects/example-project/global/networks/default"
        runtimeConfig:
          version: "2.3"
          properties:
            spark.app.name: "run-notebook-on-dataproc-serverless"
            spark.executor.instances: "2"
            spark.driver.cores: "4"

例(外部パスとオーバーライド):

      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                    spark.app.name: "run-notebook-on-dataproc-serverless"
                    spark.executor.instances: "2"
                    spark.driver.cores: "4"

bigQuery

BigQuery ジョブとして実行します。

キー:

例:

      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"

ローカル

ランナー環境でローカルに実行します。

VirtualEnvironment の構成方法については、python アクションをご覧ください。

例:

    engine:
      local: {}

フレームワーク

アクションで使用されるフレームワーク。

dbt > airflowWorker

dbt-core を使用して、ランナー環境で Airflow ワーカーで実行される dbt モデルを実行します。

キー:

  • projectDirectoryPath: DBT プロジェクトを含むフォルダの相対パス。
  • selectModels: 実行に含めるモデルのリスト(名前別)(dbt --select と同等)。
  • tags: タグで実行に含めるモデルのリスト(dbt --select と同等)。

例:

framework:
  dbt:
    airflowWorker:
      projectDirectoryPath: "dbt_project"
      selectModels: ["model_1", "model_2"]

dataform > airflowWorker

dataform core cli を使用して、ランナー環境の Airflow ワーカーで実行された Dataform ワークフロー。

キー:

  • projectDirectoryPath: Dataform ワークフロー定義を含むフォルダの相対パス。

例:

framework:
  dataform:
    airflowWorker:
      projectDirectoryPath: "dataform_local"

dataform > dataformService

Dataform サービスで実行された Dataform ワークフローを実行します。

キー:

  • location: Dataform リポジトリが配置されているロケーション。
  • projectId: Dataform リポジトリが配置されているプロジェクト。
  • repositoryId: Dataform リポジトリ ID
  • workflowInvocation: ワークフロー呼び出しの構成。実行するアクションを指定します。WorkflowInvocation をご覧ください。

例:

framework:
  dataform:
    dataformService:
      location: "us-central1"
      projectId: "example-project"
      repositoryId: "example-repository"
      workflowInvocation:
        compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
        invocationConfig:
          serviceAccount: "example-account@example-project.iam.gserviceaccount.com"