このページでは、Orchestration Pipelines DSL のリファレンスについて説明します。
プレビュー版の制限事項
プレビュー版では、Orchestration Pipelines に次の制限があります。
pysparkアクションとnotebookアクションの場合:- すべての
pysparkアクションとnotebookアクションで 1 つのrequirements.txtファイルのみがサポートされます。 uvツールを使用したパッケージのビルドは、Windows プラットフォームではサポートされていません。- 事前構築済みのバイナリを含む Python パッケージのみがサポートされます。
- すべての
sqlアクションの場合:queryキーのinline定義はサポートされていません。
形式と値について
パイプラインは YAML 形式で定義され、リポジトリ内のパイプラインごとに別々のファイルに保存する必要があります。
Orchestration Pipelines では、パイプライン定義とデプロイ構成で変数を使用する方法がいくつか用意されています。たとえば、カスタム変数を定義したり、GitHub シークレットを使用したり、コマンドラインで変数値を置き換えたりできます。詳細については、変数、シークレット、置換をご覧ください。
パイプライン バンドルにパイプラインを追加する方法については、別のパイプラインを追加するをご覧ください。
コードの例
GitHub の orchestration-pipelines リポジトリには、多くのパイプライン アクションとエンジンを組み合わせた最新のコード例が用意されています。これらの例は、Orchestration Pipelines の機能を調べるための出発点として使用することをおすすめします。
パイプラインの定義
パイプライン定義には、次の最上位キーがあります。
modelVersion: パイプライン定義モデルのバージョン。最新のモデル バージョンは1.0です。pipelineId: パイプラインの固有識別子。この ID は複数のデプロイとバージョンで一貫性が保たれるため、論理パイプライン エンティティの追跡と管理が可能になります。description: パイプラインの説明。ランナー環境の Airflow DAG の説明にマッピングされます。owner: パイプラインのオーナー。tags: パイプラインに適用される文字列識別子。パイプラインのフィルタリングに使用されます。notifications: パイプライン イベントに関する通知。サポートされている通知タイプ:onPipelineFailure: パイプラインの失敗に関するメール。
通知には、ランナー環境で構成された SendGrid メールサービスが必要です。手順については、メール通知を構成するをご覧ください。
例:
notifications: onPipelineFailure: email: ["user1@example.com", "user2@example.com"]runner: ターゲット オーケストレーション エンジンを指定します。将来の使用のために予約されています。airflowに設定します。defaults: 特定のアクション内でオーバーライドされない限り、すべてのアクションに適用されるproject_id、location、executionConfigなどのプロパティのデフォルト値を設定します。project_idプロパティとlocationプロパティは、個々のアクション プロパティでオーバーライドできます。executionConfigプロパティは個々のアクションでオーバーライドできません。retriesフィールドで、パイプライン内のすべてのアクションの再試行回数を指定します。triggers: パイプラインの開始方法を定義します。値なし。パイプラインは手動でトリガーできます。
schedule。cron 式を使用して、スケジュールに基づいてパイプラインをトリガーします。スケジュール例:
triggers: - schedule: interval: "0 5 * * *" startTime: "2025-10-01T00:00:00" endTime: "2026-10-01T00:00:00" catchup: false timezone: "UTC"
actions実行されるタスクのマッピング。各マッピング エントリは 1 つのアクションに対応します。アクションをご覧ください。
操作
パイプライン アクションは、パイプライン実行の個々のステップを定義します。各アクションには、エンジンまたはフレームワークを指定する必要があります。エンジンまたはフレームワークによって、アクションの実行に使用されるリソースが決まります。
Orchestration Pipelines は、次のアクションをサポートしています。
- Pyspark(
pyspark): PySpark スクリプトを実行します。 - ノートブック(
notebook): ノートブック ファイルを実行します。 - SQL クエリ(
notebook): SQL クエリを実行します。 - Python(
python): Python スクリプトを実行します。 - パイプライン(
pipeline): データ処理パイプラインを実行します。
Orchestration Pipelines は、次のエンジンとフレームワークをサポートしています。
dataprocOnGce>existingCluster: clusterName、プロジェクト、ロケーションで識別される Managed Service for Apache Spark クラスタ。dataprocOnGce>ephemeral: ジョブの実行後に Managed Service for Apache Spark クラスタが作成され、削除されます。dataprocServerless: Managed Service for Apache Spark のバッチ送信。bigquery: BigQuery ジョブ。python>local: ランナー環境の Airflow ワーカーで実行される Python スクリプト。dbt>airflowWorker:dbt-coreを使用してランナー環境の Airflow ワーカーで実行される dbt モデル。dataform>airflowWorker: dataform core cli を使用して、ランナー環境の Airflow ワーカーで実行される Dataform ワークフロー。dataform>dataformService: Dataform サービスで実行された Dataform ワークフロー。
次の表に、考えられるアクション タイプ、エンジン、フレームワークの組み合わせを示します。アクション コードの例については、エンジンとフレームワークの説明をご覧ください。
| アクション | エンジンまたはフレームワーク | 出力先 |
|---|---|---|
pyspark |
dataprocOnGce > existingCluster |
Managed Service for Apache Spark ジョブログ |
pyspark |
dataprocOnGce > ephemeralCluster |
Managed Service for Apache Spark ジョブログ |
pyspark |
dataprocServerless |
Managed Service for Apache Spark バッチログ |
notebook |
dataprocOnGce > existingCluster |
composer_declarative_dags_resources ディレクトリの Runner バケット |
notebook |
dataprocOnGce > ephemeralCluster |
Managed Service for Apache Spark ジョブログ |
notebook |
dataprocServerless |
composer_declarative_dags_resources ディレクトリの Runner バケット |
sql |
bigquery |
destinationTable パラメータで指定されたテーブル |
sql |
dataprocServerless |
Managed Service for Apache Spark バッチログ。 |
python |
local(ローカル実行) |
ログ |
pipeline |
dbt > airflowWorker |
ログと BigQuery |
pipeline |
dataform > airflowWorker |
BigQuery で指定されたテーブル |
pipeline |
dataform > dataformService |
Dataform で |
すべてのアクションには、次の共通鍵があります。その他のキーはアクション タイプによって異なります。
name: アクション名。この名前は、ランナー環境の Airflow タスク名にマッピングされます。アクションに複数の Airflow タスクが必要な場合、この名前はタスクグループにマッピングされます。dependsOn: このアクションが依存するアップストリーム アクション名のリスト。実行順序を定義します。上流アクションのいずれかが失敗すると、それに依存する下流アクションは実行されません。executionTimeout: アクションを実行するタイムアウト。例:1h、30m、40s。
python
python タイプの操作。Python スクリプトを実行します。
アクション タイプ固有のキー:
mainFilePath: Python スクリプト ファイルの相対パス。pythonCallable: Python スクリプトで実行する Python 呼び出し可能関数の名前。opKwargs: オペレータのキーワード引数のマッピング。(省略可)
environment: 動的に作成された Python 仮想環境内でスクリプトを実行します。requirements: 仮想環境の要件。要件は実行時に解決されます。systemSitePackages:trueの場合、仮想環境は Airflow ワーカーの site-packages ディレクトリからパッケージを継承します。ランナー環境にカスタム PyPI パッケージをインストールできます。
engine:local: ランナー環境でのローカル実行
例:
ローカル
modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 1
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- python:
name: "first_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_1.py"
pythonCallable: "main"
engine:
local: {}
environment:
systemSitePackages: true
requirements:
path: "scripts/requirements.txt"
- python:
name: "second_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_1.py"
pythonCallable: "main"
engine:
local: {}
environment:
requirements:
inline:
list: ["pandas>=2.0.0"]
systemSitePackages: true
dependsOn: ["first_script_run"]
- python:
name: "third_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_2.py"
pythonCallable: "main"
engine:
local: {}
opKwargs:
api_endpoint: "https://api.my-vendor.example.com/v1/status"
api_key_secret_name: "my-vendor-api-key"
dependsOn: ["first_script_run"]
pyspark
pyspark タイプの操作。PySpark スクリプトを実行します。
アクション タイプ固有のキー:
mainFilePath: PySpark スクリプトへの相対パス。archiveUris: このアクションで使用するアーカイブ URI のリスト。stagingBucket: このアクションで使用する Cloud Storage バケット。pyFiles: この Spark ジョブで使用する Python ファイルのリスト。environment: Python 環境の構成。requirements: 使用する Python 要件ファイル。path: 要件を含むファイルへのパス。このファイルの要件は、PEP-508 に従ってリストする必要があります。
engine:dataprocOnGce>existingClusterdataprocOnGce>ephemeralClusterdataprocServerless
例:
existingCluster
modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 4 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pyspark:
name: "run-pyspark-with-pyfiles-on-existing-cluster"
engine:
dataprocOnGce:
existingCluster:
clusterName: "example-dataproc-cluster"
location: "us-central1"
projectId: "example-project"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
ephemeralCluster
pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pyspark:
name: "run_dataproc_ephemeral"
executionTimeout: "1h"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
engine:
dataprocOnGce:
ephemeralCluster:
projectId: "example-project"
location: "us-central1"
clusterName: "ephemeral-cluster-inline"
resourceProfile:
inline:
config:
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
properties:
spark.submit.deployMode: "cluster"
dataprocServerless
modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
actions:
- pyspark:
name: "run-pyspark-on-dataproc-serverless"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
inline:
environmentConfig:
executionConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
networkUri: "projects/example-project/global/networks/default"
runtimeConfig:
version: "2.3"
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
ノートブック
notebook タイプの操作。Papermill を使用して .ipynb ノートブックを実行します。
アクション タイプ固有のキー:
mainFilePath: ノートブック ファイルの相対パス。archiveUris: このアクションで使用するアーカイブ URI のリスト。stagingBucket: このアクションで使用する Cloud Storage バケット。environment: Python 環境の構成。requirements: 使用する Python 要件ファイル。path: 要件を含むファイルへのパス。このファイルの要件は、PEP-508 に従ってリストする必要があります。
engine:dataprocOnGce>existingClusterdataprocOnGce>ephemeralClusterdataprocServerless
例:
dataprocServerless
modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- notebook:
name: "run-notebook-on-dataproc-serverless"
mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
archiveUris:
- "gs://example-bucket-additional-data/custom_venv.tar.gz"
staging_bucket: "example-bucket-additional-data-additional-data"
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
overrides:
runtimeConfig:
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
sql
sql タイプの操作。SQL クエリを実行します。
アクション タイプ固有のキー:
query: クエリを定義します。path: クエリは、デプロイ構成ファイルへの相対パスにあるファイルで定義されます。inline: クエリがインラインで定義されています。
engine:bigQuerydataprocServerlessdataprocOnGce>existingClusterdataprocOnGce>ephemeralCluster
bigQuery
modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run_bigquery_insert_job_create"
query:
inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
engine:
bigquery:
location: "US"
- sql:
name: "run_bigquery_insert_job_select"
query:
path: "sql-scripts/count_rows.sql"
engine:
bigquery:
location: "US"
destinationTable: "example-project.example_dataset.example_table_query_results"
dependsOn:
- "run_bigquery_insert_job_create"
dataprocServerless
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-dataproc"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocServerless:
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
resourceProfile:
inline:
runtimeConfig:
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
dataproc.sparkBqConnector.version: "0.42.3"
existingCluster
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-existing-cluster"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocOnGce:
existingCluster:
clusterName: "cluster-sql"
projectId: "example-project"
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
# This field is needed and important for the Spark-BigQuery connector.
spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
ephemeralCluster
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-ephemeral-cluster"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocOnGce:
ephemeralCluster:
clusterName: "example-ephemeral-cluster"
projectId: "example-project"
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
# This field is needed and important for the Spark-BigQuery connector.
spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
resourceProfile:
inline:
clusterConfig:
gceClusterConfig:
zoneUri: "us-central1-a"
metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
パイプライン
pipeline タイプの操作。データ処理パイプラインを実行します。
アクション タイプ固有のキー:
framework:dbtdataform>airflowWorkerdataform>dataformService
例:
dbt
modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pipeline:
name: "dbt-action"
framework:
dbt:
airflowWorker:
projectDirectoryPath: "dbt_project"
selectModels: ["model_1", "model_2"]
dataform>airflowWorker
modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pipeline:
name: "run_dataform"
framework:
dataform:
airflowWorker:
projectDirectoryPath: "dataform_local"
dataform>dataformService
modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
actions:
- pipeline:
name: "run_dataform"
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"
- pipeline:
name: "run_dataform_compilation"
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
invocationConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
エンジン
アクションで使用されるエンジン。
dataprocOnGce > existingCluster
clusterName、プロジェクト、ロケーションで識別される既存の Managed Service for Apache Spark クラスタで実行します。
指定したクラスタは、デプロイ構成で管理するか、Managed Service for Apache Spark で手動で管理できます。クラスタは定期的にアップグレードすることをおすすめします。
キー:
clusterName: クラスタの名前location: クラスタが配置されているリージョンprojectId: クラスタが配置されているプロジェクトのプロジェクト IDproperties: Spark ジョブ プロパティのマップ。
例:
engine:
dataprocOnGce:
existingCluster:
clusterName: "example-dataproc-cluster"
location: "us-central1"
projectId: "example-project"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
dataprocOnGce > ephemeralCluster
ジョブの実行後に作成および削除されるエフェメラル Managed Service for Apache Spark クラスタで実行します。
キー:
clusterName: クラスタの名前location: クラスタが配置されているリージョンprojectId: クラスタが配置されているプロジェクトのプロジェクト IDimpersonationChain: アクションの実行に使用するサービス アカウントの権限借用チェーン。resourceProfile: Managed Service for Apache Spark クラスタのリソース プロファイル。使用可能なフィールドの説明については、Managed Service for Apache Spark ドキュメントの ClusterConfig をご覧ください。
リソース プロファイルは次の方法で指定できます。
inline: パイプライン構成の一部として定義されます。path: 相対パスにあるファイルで定義されます。external_config_path: Cloud Storage バケットにあるファイルで定義されます。inlineオプションとpathオプションでは、リソース プロファイル値を更新するために commit とデプロイが必要ですが、外部リソース プロファイルはパイプラインの実行ごとに解決されるため、パイプラインを再デプロイせずに更新できます。
オーバーライドは、
overrideキーを使用して、指定されたリソース プロファイルに適用できます。オーバーライドは、指定された事前ソース プロファイルにディープ マージで適用されます。properties: Spark ジョブ プロパティのマップ。
例:
engine:
dataprocOnGce:
ephemeralCluster:
projectId: "example-project"
location: "us-central1"
clusterName: "example-ephemeral-cluster"
resourceProfile:
inline:
config:
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
properties:
spark.submit.deployMode: "cluster"
dataprocServerless
Managed Service for Apache Spark のバッチ送信で実行します。
キー:
location: Spark ジョブを実行する必要があるリージョン。impersonationChain: アクションの実行に使用するサービス アカウントの権限借用チェーン。resourceProfile: Managed Service for Apache Spark リソース プロファイル。リソース プロファイルは次の方法で指定できます。
inline: パイプライン構成の一部として定義されます。path: 相対パスにあるファイルで定義されます。external_config_path: Cloud Storage バケットにあるファイルで定義されます。inlineオプションとpathオプションでは、リソース プロファイル値を更新するために commit とデプロイが必要ですが、外部リソース プロファイルはパイプラインの実行ごとに解決されるため、パイプラインを再デプロイせずに更新できます。
次のキーは、リソース プロファイル構成を指定します。
environmentConfig: 環境構成runtimeConfig: ランタイム構成
使用可能なフィールドの説明については、Managed Service for Apache Spark のドキュメントの RuntimeConfig と EnvironmentConfig をご覧ください。
オーバーライドは、
overrideキーを使用して、指定されたリソース プロファイルに適用できます。オーバーライドは、指定されたリソース プロファイルにディープ マージで適用されます。
例(インライン):
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
inline:
environmentConfig:
executionConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
networkUri: "projects/example-project/global/networks/default"
runtimeConfig:
version: "2.3"
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
例(外部パスとオーバーライド):
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
overrides:
runtimeConfig:
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
bigQuery
BigQuery ジョブとして実行します。
キー:
location: 宛先テーブルが配置されているリージョン。destinationTable: データを出力する BigQuery テーブルimpersonationChain: アクションの実行に使用するサービス アカウントの権限借用チェーン。
例:
engine:
bigquery:
location: "US"
destinationTable: "example-project.example_dataset.example_table_query_results"
ローカル
ランナー環境でローカルに実行します。
VirtualEnvironment の構成方法については、python アクションをご覧ください。
例:
engine:
local: {}
フレームワーク
アクションで使用されるフレームワーク。
dbt > airflowWorker
dbt-core を使用して、ランナー環境で Airflow ワーカーで実行される dbt モデルを実行します。
キー:
projectDirectoryPath: DBT プロジェクトを含むフォルダの相対パス。selectModels: 実行に含めるモデルのリスト(名前別)(dbt --selectと同等)。tags: タグで実行に含めるモデルのリスト(dbt --selectと同等)。
例:
framework:
dbt:
airflowWorker:
projectDirectoryPath: "dbt_project"
selectModels: ["model_1", "model_2"]
dataform > airflowWorker
dataform core cli を使用して、ランナー環境の Airflow ワーカーで実行された Dataform ワークフロー。
キー:
projectDirectoryPath: Dataform ワークフロー定義を含むフォルダの相対パス。
例:
framework:
dataform:
airflowWorker:
projectDirectoryPath: "dataform_local"
dataform > dataformService
Dataform サービスで実行された Dataform ワークフローを実行します。
キー:
location: Dataform リポジトリが配置されているロケーション。projectId: Dataform リポジトリが配置されているプロジェクト。repositoryId: Dataform リポジトリ IDworkflowInvocation: ワークフロー呼び出しの構成。実行するアクションを指定します。WorkflowInvocation をご覧ください。
例:
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
invocationConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"