이 페이지에는 오케스트레이션 파이프라인 DSL 참조가 포함되어 있습니다.
미리보기의 제한사항
미리보기 버전에서는 오케스트레이션 파이프라인에 다음과 같은 제한사항이 적용됩니다.
pyspark및notebook작업의 경우:- 모든
pyspark및notebook작업에 하나의requirements.txt파일만 지원됩니다. - Windows 플랫폼은
uv도구를 통해 패키지를 빌드하는 데 지원되지 않습니다. - 사전 빌드된 바이너리가 있는 Python 패키지만 지원됩니다.
- 모든
sql작업의 경우:query키의inline정의는 지원되지 않습니다.
형식 및 값 정보
파이프라인은 YAML 형식으로 정의되며 저장소의 파이프라인별 별도 파일에 저장해야 합니다.
오케스트레이션 파이프라인은 파이프라인 정의 및 배포 구성에서 변수를 사용하는 여러 방법을 제공합니다. 예를 들어 맞춤 변수를 정의하고, GitHub 보안 비밀을 사용하고, 명령줄에서 변수 값을 대체할 수 있습니다. 자세한 내용은 변수, 보안 비밀, 대체를 참고하세요.
파이프라인 번들에 추가 파이프라인을 추가하는 방법에 관한 자세한 내용은 다른 파이프라인 추가를 참고하세요.
코드 예시
GitHub의 orchestration-pipelines 저장소에는 다양한 파이프라인 작업과 엔진 조합에 관한 최신 코드 예시가 있습니다. 오케스트레이션 파이프라인 기능을 살펴볼 때 이러한 예시를 시작점으로 사용하는 것이 좋습니다.
파이프라인 정의
파이프라인 정의에는 다음과 같은 최상위 키가 있습니다.
modelVersion: 파이프라인 정의 모델의 버전입니다. 최신 모델 버전은1.0입니다.pipelineId: 파이프라인의 고유 식별자입니다. 이 ID는 여러 배포와 버전에서 일관되게 유지되므로 논리 파이프라인 엔티티를 추적하고 관리할 수 있습니다.description: 파이프라인 설명으로, 러너 환경의 Airflow DAG 설명에 매핑됩니다.owner: 파이프라인의 소유자입니다.tags: 파이프라인에 적용된 문자열 식별자로, 파이프라인을 필터링하는 데 사용됩니다.notifications: 파이프라인 이벤트에 관한 알림입니다. 지원되는 알림 유형:onPipelineFailure: 파이프라인 실패에 관한 이메일입니다.
알림을 사용하려면 러너 환경에 SendGrid 이메일 서비스가 구성되어 있어야 합니다. 자세한 내용은 이메일 알림 구성을 참고하세요.
예:
notifications: onPipelineFailure: email: ["user1@example.com", "user2@example.com"]runner: 타겟 오케스트레이션 엔진을 지정합니다. 나중에 사용하기 위해 예약되어 있습니다. 이 값을airflow로 설정합니다.defaults: 특정 작업 내에서 재정의되지 않는 한 모든 작업에 적용되는project_id,location,executionConfig과 같은 속성의 기본값을 설정합니다.project_id및location속성은 개별 작업 속성으로 재정의할 수 있습니다.executionConfig속성은 개별 작업에서 재정의할 수 없으며retries필드에서 파이프라인의 모든 작업에 대한 재시도 횟수를 지정합니다.triggers: 파이프라인이 시작되는 방식을 정의합니다.값 없음. 파이프라인은 여전히 수동으로 트리거할 수 있습니다.
schedule. 크론 표현식을 사용하여 일정에 따라 파이프라인을 트리거합니다.예시 일정:
triggers: - schedule: interval: "0 5 * * *" startTime: "2025-10-01T00:00:00" endTime: "2026-10-01T00:00:00" catchup: false timezone: "UTC"
actions실행할 작업의 매핑입니다. 각 매핑 항목은 하나의 작업에 해당합니다. 작업을 참고하세요.
작업
파이프라인 작업은 파이프라인 실행의 개별 단계를 정의합니다. 각 작업에는 엔진이나 프레임워크가 지정되어 있어야 합니다. 엔진 또는 프레임워크는 작업을 실행하는 데 사용되는 리소스를 결정합니다.
오케스트레이션 파이프라인은 다음 작업을 지원합니다.
- PySpark (
pyspark): PySpark 스크립트를 실행합니다. - 노트북 (
notebook): 노트북 파일을 실행합니다. - SQL 쿼리 (
notebook): SQL 쿼리를 실행합니다. - Python (
python): Python 스크립트를 실행합니다. - 파이프라인 (
pipeline): 데이터 처리 파이프라인을 실행합니다.
오케스트레이션 파이프라인은 다음 엔진과 프레임워크를 지원합니다.
dataprocOnGce>existingCluster: clusterName, project, location으로 식별되는 Managed Service for Apache Spark 클러스터입니다.dataprocOnGce>ephemeral: 작업 실행 후 생성 및 삭제된 Managed Service for Apache Spark 클러스터dataprocServerless: Managed Service for Apache Spark 일괄 제출.bigquery: BigQuery 작업python>local: 러너 환경의 Airflow 작업자에서 실행되는 Python 스크립트입니다.dbt>airflowWorker:dbt-core를 사용하여 러너 환경의 Airflow 작업자에서 실행되는 dbt 모델입니다.dataform>airflowWorker: dataform core cli를 사용하여 러너 환경의 Airflow 작업자에서 실행되는 Dataform 워크플로dataform>dataformService: Dataform 서비스에서 실행되는 Dataform 워크플로
다음 표에는 가능한 작업 유형, 엔진, 프레임워크 조합이 나와 있습니다. 작업 코드 예시는 엔진 및 프레임워크 설명을 참고하세요.
| 작업 | 엔진 또는 프레임워크 | 다음으로 출력 |
|---|---|---|
pyspark |
dataprocOnGce > existingCluster |
Managed Service for Apache Spark 작업 로그 |
pyspark |
dataprocOnGce > ephemeralCluster |
Managed Service for Apache Spark 작업 로그 |
pyspark |
dataprocServerless |
Managed Service for Apache Spark 배치 로그 |
notebook |
dataprocOnGce > existingCluster |
composer_declarative_dags_resources 디렉터리 아래의 러너 버킷 |
notebook |
dataprocOnGce > ephemeralCluster |
Managed Service for Apache Spark 작업 로그 |
notebook |
dataprocServerless |
composer_declarative_dags_resources 디렉터리 아래의 러너 버킷 |
sql |
bigquery |
destinationTable 매개변수에 지정된 테이블 |
sql |
dataprocServerless |
Managed Service for Apache Spark 배치 로그입니다. |
python |
local (로컬 실행) |
로그 |
pipeline |
dbt > airflowWorker |
로그 및 BigQuery |
pipeline |
dataform > airflowWorker |
BigQuery의 지정된 테이블 |
pipeline |
dataform > dataformService |
Dataform |
모든 작업에는 다음과 같은 공통 키가 있습니다. 다른 키는 작업 유형에 따라 다릅니다.
name: 작업 이름입니다. 이 이름은 러너 환경의 Airflow 작업 이름에 매핑됩니다. 작업에 두 개 이상의 Airflow 작업이 필요한 경우 이 이름은 작업 그룹에 매핑됩니다.dependsOn: 이 작업이 종속된 업스트림 작업 이름의 목록으로, 실행 순서를 정의합니다. 업스트림 작업 중 하나라도 실패하면 이에 종속된 다운스트림 작업이 실행되지 않습니다.executionTimeout: 작업을 실행하는 제한 시간입니다. 예:1h,30m,40s
python
python 유형의 작업입니다. Python 스크립트를 실행합니다.
작업 유형별 키:
mainFilePath: Python 스크립트 파일의 상대 경로입니다.pythonCallable: Python 스크립트에서 실행할 Python 호출 가능 객체의 이름입니다.opKwargs: 연산자의 키워드 인수의 매핑입니다.(선택사항)
environment: 동적으로 생성된 Python 가상 환경 내에서 스크립트를 실행합니다.requirements: 가상 환경의 요구사항입니다. 요구사항은 런타임에 해결됩니다.systemSitePackages:true인 경우 가상 환경이 Airflow 작업자의 site-packages 디렉터리에서 패키지를 상속합니다. 러너 환경에 커스텀 PyPI 패키지를 설치할 수 있습니다.
engine:local: 러너 환경에서의 로컬 실행
예:
로컬
modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 1
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- python:
name: "first_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_1.py"
pythonCallable: "main"
engine:
local: {}
environment:
systemSitePackages: true
requirements:
path: "scripts/requirements.txt"
- python:
name: "second_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_1.py"
pythonCallable: "main"
engine:
local: {}
environment:
requirements:
inline:
list: ["pandas>=2.0.0"]
systemSitePackages: true
dependsOn: ["first_script_run"]
- python:
name: "third_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_2.py"
pythonCallable: "main"
engine:
local: {}
opKwargs:
api_endpoint: "https://api.my-vendor.example.com/v1/status"
api_key_secret_name: "my-vendor-api-key"
dependsOn: ["first_script_run"]
pyspark
pyspark 유형의 작업입니다. PySpark 스크립트를 실행합니다.
작업 유형별 키:
mainFilePath: PySpark 스크립트의 상대 경로입니다.archiveUris: 이 작업에 사용할 보관 파일 URI 목록입니다.stagingBucket: 이 작업에 사용할 Cloud Storage 버킷입니다.pyFiles: 이 Spark 작업에 사용할 Python 파일 목록입니다.environment: Python 환경 구성입니다.requirements: 사용할 Python 요구사항 파일입니다.path: 요구사항이 있는 파일의 경로입니다. 이 파일의 요구사항은 PEP-508에 따라 나열해야 합니다.
engine:dataprocOnGce>existingClusterdataprocOnGce>ephemeralClusterdataprocServerless
예:
existingCluster
modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 4 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pyspark:
name: "run-pyspark-with-pyfiles-on-existing-cluster"
engine:
dataprocOnGce:
existingCluster:
clusterName: "example-dataproc-cluster"
location: "us-central1"
projectId: "example-project"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
ephemeralCluster
pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pyspark:
name: "run_dataproc_ephemeral"
executionTimeout: "1h"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
engine:
dataprocOnGce:
ephemeralCluster:
projectId: "example-project"
location: "us-central1"
clusterName: "ephemeral-cluster-inline"
resourceProfile:
inline:
config:
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
properties:
spark.submit.deployMode: "cluster"
dataprocServerless
modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
actions:
- pyspark:
name: "run-pyspark-on-dataproc-serverless"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
inline:
environmentConfig:
executionConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
networkUri: "projects/example-project/global/networks/default"
runtimeConfig:
version: "2.3"
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
노트북
notebook 유형의 작업입니다. Papermill을 통해 .ipynb 노트북을 실행합니다.
작업 유형별 키:
mainFilePath: 노트북 파일의 상대 경로입니다.archiveUris: 이 작업에 사용할 보관 파일 URI 목록입니다.stagingBucket: 이 작업에 사용할 Cloud Storage 버킷입니다.environment: Python 환경 구성입니다.requirements: 사용할 Python 요구사항 파일입니다.path: 요구사항이 있는 파일의 경로입니다. 이 파일의 요구사항은 PEP-508에 따라 나열해야 합니다.
engine:dataprocOnGce>existingClusterdataprocOnGce>ephemeralClusterdataprocServerless
예:
dataprocServerless
modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- notebook:
name: "run-notebook-on-dataproc-serverless"
mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
archiveUris:
- "gs://example-bucket-additional-data/custom_venv.tar.gz"
staging_bucket: "example-bucket-additional-data-additional-data"
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
overrides:
runtimeConfig:
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
sql
sql 유형의 작업입니다. SQL 쿼리를 실행합니다.
작업 유형별 키:
query: 쿼리를 정의합니다.path: 쿼리가 배포 구성 파일의 상대 경로에 있는 파일에 정의됩니다.inline: 쿼리가 인라인으로 정의됩니다.
engine:bigQuerydataprocServerlessdataprocOnGce>existingClusterdataprocOnGce>ephemeralCluster
bigQuery
modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run_bigquery_insert_job_create"
query:
inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
engine:
bigquery:
location: "US"
- sql:
name: "run_bigquery_insert_job_select"
query:
path: "sql-scripts/count_rows.sql"
engine:
bigquery:
location: "US"
destinationTable: "example-project.example_dataset.example_table_query_results"
dependsOn:
- "run_bigquery_insert_job_create"
dataprocServerless
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-dataproc"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocServerless:
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
resourceProfile:
inline:
runtimeConfig:
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
dataproc.sparkBqConnector.version: "0.42.3"
existingCluster
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-existing-cluster"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocOnGce:
existingCluster:
clusterName: "cluster-sql"
projectId: "example-project"
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
# This field is needed and important for the Spark-BigQuery connector.
spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
ephemeralCluster
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-ephemeral-cluster"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocOnGce:
ephemeralCluster:
clusterName: "example-ephemeral-cluster"
projectId: "example-project"
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
# This field is needed and important for the Spark-BigQuery connector.
spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
resourceProfile:
inline:
clusterConfig:
gceClusterConfig:
zoneUri: "us-central1-a"
metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
파이프라인
pipeline 유형의 작업입니다. 데이터 처리 파이프라인을 실행합니다.
작업 유형별 키:
framework:dbtdataform>airflowWorkerdataform>dataformService
예:
dbt
modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pipeline:
name: "dbt-action"
framework:
dbt:
airflowWorker:
projectDirectoryPath: "dbt_project"
selectModels: ["model_1", "model_2"]
dataform>airflowWorker
modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pipeline:
name: "run_dataform"
framework:
dataform:
airflowWorker:
projectDirectoryPath: "dataform_local"
dataform>dataformService
modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
actions:
- pipeline:
name: "run_dataform"
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"
- pipeline:
name: "run_dataform_compilation"
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
invocationConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
엔진
작업에 사용된 엔진입니다.
dataprocOnGce > existingCluster
clusterName, project, location으로 식별되는 기존 Managed Service for Apache Spark 클러스터에서 실행합니다.
배포 구성에서 지정된 클러스터를 관리하거나 Managed Service for Apache Spark에서 수동으로 관리할 수 있습니다. 클러스터를 정기적으로 업그레이드하는 것이 좋습니다.
키:
clusterName: 클러스터 이름location: 클러스터가 있는 리전projectId: 클러스터가 있는 프로젝트의 프로젝트 IDproperties: Spark 작업 속성의 맵입니다.
예:
engine:
dataprocOnGce:
existingCluster:
clusterName: "example-dataproc-cluster"
location: "us-central1"
projectId: "example-project"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
dataprocOnGce > ephemeralCluster
작업 실행 후 생성되고 삭제되는 임시 Managed Service for Apache Spark 클러스터에서 실행합니다.
키:
clusterName: 클러스터 이름location: 클러스터가 있는 리전projectId: 클러스터가 있는 프로젝트의 프로젝트 IDimpersonationChain: 작업을 실행하는 데 사용할 서비스 계정 가장 체인입니다.resourceProfile: Managed Service for Apache Spark 클러스터 리소스 프로필입니다.사용 가능한 필드에 관한 설명은 Managed Service for Apache Spark 문서의 ClusterConfig를 참고하세요.
리소스 프로필은 다음 방법으로 지정할 수 있습니다.
inline: 파이프라인 구성의 일부로 정의됩니다.path: 상대 경로에 있는 파일에 정의됩니다.external_config_path: Cloud Storage 버킷에 있는 파일에 정의됩니다. 리소스 프로필 값을 업데이트하기 위해 커밋하고 배포해야 하는inline및path옵션과 달리 외부 리소스 프로필은 각 파이프라인 실행 시 확인되며 파이프라인을 다시 배포하지 않고도 업데이트할 수 있습니다.
override키를 사용하여 지정된 리소스 프로필에 재정의를 적용할 수 있습니다. 재정의는 제공된 사전 소스 프로필에 딥 병합으로 적용됩니다.properties: Spark 작업 속성의 맵입니다.
예:
engine:
dataprocOnGce:
ephemeralCluster:
projectId: "example-project"
location: "us-central1"
clusterName: "example-ephemeral-cluster"
resourceProfile:
inline:
config:
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
properties:
spark.submit.deployMode: "cluster"
dataprocServerless
Managed Service for Apache Spark 일괄 제출에서 실행합니다.
키:
location: Spark 작업을 실행해야 하는 리전입니다.impersonationChain: 작업을 실행하는 데 사용할 서비스 계정 가장 체인입니다.resourceProfile: Managed Service for Apache Spark 리소스 프로필입니다.리소스 프로필은 다음 방법으로 지정할 수 있습니다.
inline: 파이프라인 구성의 일부로 정의됩니다.path: 상대 경로에 있는 파일에 정의됩니다.external_config_path: Cloud Storage 버킷에 있는 파일에 정의됩니다. 리소스 프로필 값을 업데이트하기 위해 커밋하고 배포해야 하는inline및path옵션과 달리 외부 리소스 프로필은 각 파이프라인 실행 시 확인되며 파이프라인을 다시 배포하지 않고도 업데이트할 수 있습니다.
다음 키는 리소스 프로필 구성을 지정합니다.
environmentConfig: 환경 구성runtimeConfig: 런타임 구성
사용 가능한 필드에 대한 설명은 Managed Service for Apache Spark 문서의 RuntimeConfig 및 EnvironmentConfig를 참고하세요.
override키를 사용하여 지정된 리소스 프로필에 재정의를 적용할 수 있습니다. 재정의는 제공된 리소스 프로필에 딥 병합으로 적용됩니다.
예 (인라인):
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
inline:
environmentConfig:
executionConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
networkUri: "projects/example-project/global/networks/default"
runtimeConfig:
version: "2.3"
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
예 (외부 경로 및 재정의):
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
overrides:
runtimeConfig:
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
bigQuery
BigQuery 작업으로 실행합니다.
키:
location: 대상 테이블이 있는 리전입니다.destinationTable: 데이터를 출력할 BigQuery 테이블impersonationChain: 작업을 실행하는 데 사용할 서비스 계정 가장 체인입니다.
예:
engine:
bigquery:
location: "US"
destinationTable: "example-project.example_dataset.example_table_query_results"
로컬
러너 환경에서 로컬로 실행합니다.
가상 환경을 구성하는 방법은 python 작업을 참고하세요.
예:
engine:
local: {}
프레임워크
작업에 사용되는 프레임워크입니다.
dbt > airflowWorker
dbt-core를 사용하여 러너 환경에서 Airflow 작업자로 실행되는 dbt 모델을 실행합니다.
키:
projectDirectoryPath: DBT 프로젝트가 포함된 폴더의 상대 경로입니다.selectModels: 이름으로 실행에 포함할 모델 목록입니다 (dbt --select과 동일).tags: 태그별 실행에 포함할 모델 목록입니다 (dbt --select과 동일).
예:
framework:
dbt:
airflowWorker:
projectDirectoryPath: "dbt_project"
selectModels: ["model_1", "model_2"]
dataform > airflowWorker
dataform core cli를 사용하여 러너 환경의 Airflow 작업자에서 실행되는 Dataform 워크플로
키:
projectDirectoryPath: Dataform 워크플로 정의가 포함된 폴더의 상대 경로입니다.
예:
framework:
dataform:
airflowWorker:
projectDirectoryPath: "dataform_local"
dataform > dataformService
Dataform 서비스에서 실행된 Dataform 워크플로를 실행합니다.
키:
location: Dataform 저장소가 있는 위치입니다.projectId: Dataform 저장소가 있는 프로젝트입니다.repositoryId: Dataform 저장소 IDworkflowInvocation: 실행할 작업을 지정하는 워크플로 호출 구성입니다. WorkflowInvocation을 참고하세요.
예:
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
invocationConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"