오케스트레이션 파이프라인 DSL 참조

이 페이지에는 오케스트레이션 파이프라인 DSL 참조가 포함되어 있습니다.

미리보기의 제한사항

미리보기 버전에서는 오케스트레이션 파이프라인에 다음과 같은 제한사항이 적용됩니다.

  • pysparknotebook 작업의 경우:

    • 모든 pysparknotebook 작업에 하나의 requirements.txt 파일만 지원됩니다.
    • Windows 플랫폼은 uv 도구를 통해 패키지를 빌드하는 데 지원되지 않습니다.
    • 사전 빌드된 바이너리가 있는 Python 패키지만 지원됩니다.
  • sql 작업의 경우:

    • query 키의 inline 정의는 지원되지 않습니다.

형식 및 값 정보

파이프라인은 YAML 형식으로 정의되며 저장소의 파이프라인별 별도 파일에 저장해야 합니다.

오케스트레이션 파이프라인은 파이프라인 정의 및 배포 구성에서 변수를 사용하는 여러 방법을 제공합니다. 예를 들어 맞춤 변수를 정의하고, GitHub 보안 비밀을 사용하고, 명령줄에서 변수 값을 대체할 수 있습니다. 자세한 내용은 변수, 보안 비밀, 대체를 참고하세요.

파이프라인 번들에 추가 파이프라인을 추가하는 방법에 관한 자세한 내용은 다른 파이프라인 추가를 참고하세요.

코드 예시

GitHub의 orchestration-pipelines 저장소에는 다양한 파이프라인 작업과 엔진 조합에 관한 최신 코드 예시가 있습니다. 오케스트레이션 파이프라인 기능을 살펴볼 때 이러한 예시를 시작점으로 사용하는 것이 좋습니다.

파이프라인 정의

파이프라인 정의에는 다음과 같은 최상위 키가 있습니다.

  • modelVersion: 파이프라인 정의 모델의 버전입니다. 최신 모델 버전은 1.0입니다.

  • pipelineId: 파이프라인의 고유 식별자입니다. 이 ID는 여러 배포와 버전에서 일관되게 유지되므로 논리 파이프라인 엔티티를 추적하고 관리할 수 있습니다.

  • description: 파이프라인 설명으로, 러너 환경의 Airflow DAG 설명에 매핑됩니다.

  • owner: 파이프라인의 소유자입니다.

  • tags: 파이프라인에 적용된 문자열 식별자로, 파이프라인을 필터링하는 데 사용됩니다.

  • notifications: 파이프라인 이벤트에 관한 알림입니다. 지원되는 알림 유형:

    • onPipelineFailure: 파이프라인 실패에 관한 이메일입니다.

    알림을 사용하려면 러너 환경에 SendGrid 이메일 서비스가 구성되어 있어야 합니다. 자세한 내용은 이메일 알림 구성을 참고하세요.

    예:

    notifications:
      onPipelineFailure:
        email: ["user1@example.com", "user2@example.com"]
    
  • runner: 타겟 오케스트레이션 엔진을 지정합니다. 나중에 사용하기 위해 예약되어 있습니다. 이 값을 airflow로 설정합니다.

  • defaults: 특정 작업 내에서 재정의되지 않는 한 모든 작업에 적용되는 project_id, location, executionConfig과 같은 속성의 기본값을 설정합니다. project_idlocation 속성은 개별 작업 속성으로 재정의할 수 있습니다. executionConfig 속성은 개별 작업에서 재정의할 수 없으며 retries 필드에서 파이프라인의 모든 작업에 대한 재시도 횟수를 지정합니다.

  • triggers: 파이프라인이 시작되는 방식을 정의합니다.

    • 값 없음. 파이프라인은 여전히 수동으로 트리거할 수 있습니다.

    • schedule. 크론 표현식을 사용하여 일정에 따라 파이프라인을 트리거합니다.

      예시 일정:

      triggers:
        - schedule:
            interval: "0 5 * * *"
            startTime: "2025-10-01T00:00:00"
            endTime: "2026-10-01T00:00:00"
            catchup: false
            timezone: "UTC"
      
  • actions

    실행할 작업의 매핑입니다. 각 매핑 항목은 하나의 작업에 해당합니다. 작업을 참고하세요.

작업

파이프라인 작업은 파이프라인 실행의 개별 단계를 정의합니다. 각 작업에는 엔진이나 프레임워크가 지정되어 있어야 합니다. 엔진 또는 프레임워크는 작업을 실행하는 데 사용되는 리소스를 결정합니다.

오케스트레이션 파이프라인은 다음 작업을 지원합니다.

  • PySpark (pyspark): PySpark 스크립트를 실행합니다.
  • 노트북 (notebook): 노트북 파일을 실행합니다.
  • SQL 쿼리 (notebook): SQL 쿼리를 실행합니다.
  • Python (python): Python 스크립트를 실행합니다.
  • 파이프라인 (pipeline): 데이터 처리 파이프라인을 실행합니다.

오케스트레이션 파이프라인은 다음 엔진과 프레임워크를 지원합니다.

  • dataprocOnGce > existingCluster: clusterName, project, location으로 식별되는 Managed Service for Apache Spark 클러스터입니다.

  • dataprocOnGce > ephemeral: 작업 실행 후 생성 및 삭제된 Managed Service for Apache Spark 클러스터

  • dataprocServerless: Managed Service for Apache Spark 일괄 제출.

  • bigquery: BigQuery 작업

  • python > local: 러너 환경의 Airflow 작업자에서 실행되는 Python 스크립트입니다.

  • dbt > airflowWorker: dbt-core를 사용하여 러너 환경의 Airflow 작업자에서 실행되는 dbt 모델입니다.

  • dataform > airflowWorker: dataform core cli를 사용하여 러너 환경의 Airflow 작업자에서 실행되는 Dataform 워크플로

  • dataform > dataformService: Dataform 서비스에서 실행되는 Dataform 워크플로

다음 표에는 가능한 작업 유형, 엔진, 프레임워크 조합이 나와 있습니다. 작업 코드 예시는 엔진 및 프레임워크 설명을 참고하세요.

작업 엔진 또는 프레임워크 다음으로 출력
pyspark dataprocOnGce > existingCluster Managed Service for Apache Spark 작업 로그
pyspark dataprocOnGce > ephemeralCluster Managed Service for Apache Spark 작업 로그
pyspark dataprocServerless Managed Service for Apache Spark 배치 로그
notebook dataprocOnGce > existingCluster composer_declarative_dags_resources 디렉터리 아래의 러너 버킷
notebook dataprocOnGce > ephemeralCluster Managed Service for Apache Spark 작업 로그
notebook dataprocServerless composer_declarative_dags_resources 디렉터리 아래의 러너 버킷
sql bigquery destinationTable 매개변수에 지정된 테이블
sql dataprocServerless Managed Service for Apache Spark 배치 로그입니다.
python local (로컬 실행) 로그
pipeline dbt > airflowWorker 로그 및 BigQuery
pipeline dataform > airflowWorker BigQuery의 지정된 테이블
pipeline dataform > dataformService Dataform

모든 작업에는 다음과 같은 공통 키가 있습니다. 다른 키는 작업 유형에 따라 다릅니다.

  • name: 작업 이름입니다. 이 이름은 러너 환경의 Airflow 작업 이름에 매핑됩니다. 작업에 두 개 이상의 Airflow 작업이 필요한 경우 이 이름은 작업 그룹에 매핑됩니다.

  • dependsOn: 이 작업이 종속된 업스트림 작업 이름의 목록으로, 실행 순서를 정의합니다. 업스트림 작업 중 하나라도 실패하면 이에 종속된 다운스트림 작업이 실행되지 않습니다.

  • executionTimeout: 작업을 실행하는 제한 시간입니다. 예: 1h, 30m, 40s

python

python 유형의 작업입니다. Python 스크립트를 실행합니다.

작업 유형별 키:

  • mainFilePath: Python 스크립트 파일의 상대 경로입니다.
  • pythonCallable: Python 스크립트에서 실행할 Python 호출 가능 객체의 이름입니다.
  • opKwargs: 연산자의 키워드 인수의 매핑입니다.
  • (선택사항) environment: 동적으로 생성된 Python 가상 환경 내에서 스크립트를 실행합니다.

    • requirements: 가상 환경의 요구사항입니다. 요구사항은 런타임에 해결됩니다.

      • inline: 요구사항이 인라인으로 지정됩니다.

        • list: 요구사항 목록입니다. PEP-508에 따라 개별 요구사항을 나열합니다.

          예:

          environment:
            requirements:
              inline:
                list: ["pandas>=2.0.0"]
          
      • (대체) path: 요구사항이 있는 파일의 경로입니다. 이 파일의 요구사항은 PEP-508에 따라 나열해야 합니다.

        예:

        environment:
          requirements:
            path: "scripts/requirements.txt"
        
    • systemSitePackages: true인 경우 가상 환경이 Airflow 작업자의 site-packages 디렉터리에서 패키지를 상속합니다. 러너 환경에 커스텀 PyPI 패키지를 설치할 수 있습니다.

  • engine:

    • local: 러너 환경에서의 로컬 실행

예:

로컬

modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 1

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - python:
      name: "first_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        systemSitePackages: true
        requirements:
          path: "scripts/requirements.txt"

  - python:
      name: "second_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        requirements:
          inline:
            list: ["pandas>=2.0.0"]
        systemSitePackages: true
      dependsOn: ["first_script_run"]

  - python:
      name: "third_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_2.py"
      pythonCallable: "main"
      engine:
        local: {}
      opKwargs:
        api_endpoint: "https://api.my-vendor.example.com/v1/status"
        api_key_secret_name: "my-vendor-api-key"
      dependsOn: ["first_script_run"]

pyspark

pyspark 유형의 작업입니다. PySpark 스크립트를 실행합니다.

작업 유형별 키:

  • mainFilePath: PySpark 스크립트의 상대 경로입니다.
  • archiveUris: 이 작업에 사용할 보관 파일 URI 목록입니다.
  • stagingBucket: 이 작업에 사용할 Cloud Storage 버킷입니다.
  • pyFiles: 이 Spark 작업에 사용할 Python 파일 목록입니다.
  • environment: Python 환경 구성입니다.

    • requirements: 사용할 Python 요구사항 파일입니다.

      • path: 요구사항이 있는 파일의 경로입니다. 이 파일의 요구사항은 PEP-508에 따라 나열해야 합니다.
  • engine:

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

예:

existingCluster

modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 4 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pyspark:
      name: "run-pyspark-with-pyfiles-on-existing-cluster"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "example-dataproc-cluster"
            location: "us-central1"
            projectId: "example-project"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"

ephemeralCluster

pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - pyspark:
      name: "run_dataproc_ephemeral"
      executionTimeout: "1h"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            projectId: "example-project"
            location: "us-central1"
            clusterName: "ephemeral-cluster-inline"
            resourceProfile:
              inline:
                config:
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
            properties:
              spark.submit.deployMode: "cluster"

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pyspark:
      name: "run-pyspark-on-dataproc-serverless"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            inline:
              environmentConfig:
                executionConfig:
                  serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
                  networkUri: "projects/example-project/global/networks/default"
              runtimeConfig:
                version: "2.3"
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

노트북

notebook 유형의 작업입니다. Papermill을 통해 .ipynb 노트북을 실행합니다.

작업 유형별 키:

  • mainFilePath: 노트북 파일의 상대 경로입니다.
  • archiveUris: 이 작업에 사용할 보관 파일 URI 목록입니다.
  • stagingBucket: 이 작업에 사용할 Cloud Storage 버킷입니다.
  • environment: Python 환경 구성입니다.

    • requirements: 사용할 Python 요구사항 파일입니다.

      • path: 요구사항이 있는 파일의 경로입니다. 이 파일의 요구사항은 PEP-508에 따라 나열해야 합니다.
  • engine:

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

예:

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - notebook:
      name: "run-notebook-on-dataproc-serverless"
      mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
      archiveUris:
        - "gs://example-bucket-additional-data/custom_venv.tar.gz"
      staging_bucket: "example-bucket-additional-data-additional-data"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

sql

sql 유형의 작업입니다. SQL 쿼리를 실행합니다.

작업 유형별 키:

  • query: 쿼리를 정의합니다.

    • path: 쿼리가 배포 구성 파일의 상대 경로에 있는 파일에 정의됩니다.
    • inline: 쿼리가 인라인으로 정의됩니다.

  • engine:

    • bigQuery
    • dataprocServerless
    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster

bigQuery

modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run_bigquery_insert_job_create"
      query:
        inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
      engine:
        bigquery:
          location: "US"
  - sql:
      name: "run_bigquery_insert_job_select"
      query:
        path: "sql-scripts/count_rows.sql"
      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"
      dependsOn:
        - "run_bigquery_insert_job_create"

dataprocServerless

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-dataproc"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocServerless:
          location: "us-central1"
          impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
          resourceProfile:
            inline:
              runtimeConfig:
                properties:
                  spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
                  spark.sql.catalog.bigquery.project: "example-project"
                  dataproc.sparkBqConnector.version: "0.42.3"

existingCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-existing-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "cluster-sql"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"

ephemeralCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-ephemeral-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            clusterName: "example-ephemeral-cluster"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
            resourceProfile:
              inline:
                clusterConfig:
                  gceClusterConfig:
                    zoneUri: "us-central1-a"
                    metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
                      SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"

파이프라인

pipeline 유형의 작업입니다. 데이터 처리 파이프라인을 실행합니다.

작업 유형별 키:

  • framework:

    • dbt
    • dataform > airflowWorker
    • dataform > dataformService

예:

dbt

modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "dbt-action"
      framework:
        dbt:
          airflowWorker:
            projectDirectoryPath: "dbt_project"
            selectModels: ["model_1", "model_2"]

dataform>airflowWorker

modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          airflowWorker:
            projectDirectoryPath: "dataform_local"

dataform>dataformService

modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"

  - pipeline:
      name: "run_dataform_compilation"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
              invocationConfig:
                serviceAccount: "example-account@example-project.iam.gserviceaccount.com"

엔진

작업에 사용된 엔진입니다.

dataprocOnGce > existingCluster

clusterName, project, location으로 식별되는 기존 Managed Service for Apache Spark 클러스터에서 실행합니다.

배포 구성에서 지정된 클러스터를 관리하거나 Managed Service for Apache Spark에서 수동으로 관리할 수 있습니다. 클러스터를 정기적으로 업그레이드하는 것이 좋습니다.

키:

  • clusterName: 클러스터 이름
  • location: 클러스터가 있는 리전
  • projectId: 클러스터가 있는 프로젝트의 프로젝트 ID
  • properties: Spark 작업 속성의 맵입니다.

예:

engine:
  dataprocOnGce:
    existingCluster:
      clusterName: "example-dataproc-cluster"
      location: "us-central1"
      projectId: "example-project"
      impersonationChain: "example-account@example-project.iam.gserviceaccount.com"

dataprocOnGce > ephemeralCluster

작업 실행 후 생성되고 삭제되는 임시 Managed Service for Apache Spark 클러스터에서 실행합니다.

키:

  • clusterName: 클러스터 이름
  • location: 클러스터가 있는 리전
  • projectId: 클러스터가 있는 프로젝트의 프로젝트 ID
  • impersonationChain: 작업을 실행하는 데 사용할 서비스 계정 가장 체인입니다.
  • resourceProfile: Managed Service for Apache Spark 클러스터 리소스 프로필입니다.

    사용 가능한 필드에 관한 설명은 Managed Service for Apache Spark 문서의 ClusterConfig를 참고하세요.

    리소스 프로필은 다음 방법으로 지정할 수 있습니다.

    • inline: 파이프라인 구성의 일부로 정의됩니다.
    • path: 상대 경로에 있는 파일에 정의됩니다.
    • external_config_path: Cloud Storage 버킷에 있는 파일에 정의됩니다. 리소스 프로필 값을 업데이트하기 위해 커밋하고 배포해야 하는 inlinepath 옵션과 달리 외부 리소스 프로필은 각 파이프라인 실행 시 확인되며 파이프라인을 다시 배포하지 않고도 업데이트할 수 있습니다.

    override 키를 사용하여 지정된 리소스 프로필에 재정의를 적용할 수 있습니다. 재정의는 제공된 사전 소스 프로필에 딥 병합으로 적용됩니다.

  • properties: Spark 작업 속성의 맵입니다.

예:

engine:
  dataprocOnGce:
    ephemeralCluster:
      projectId: "example-project"
      location: "us-central1"
      clusterName: "example-ephemeral-cluster"
      resourceProfile:
        inline:
          config:
            masterConfig:
              numInstances: 1
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
            workerConfig:
              numInstances: 2
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
      properties:
        spark.submit.deployMode: "cluster"

dataprocServerless

Managed Service for Apache Spark 일괄 제출에서 실행합니다.

키:

  • location: Spark 작업을 실행해야 하는 리전입니다.
  • impersonationChain: 작업을 실행하는 데 사용할 서비스 계정 가장 체인입니다.
  • resourceProfile: Managed Service for Apache Spark 리소스 프로필입니다.

    리소스 프로필은 다음 방법으로 지정할 수 있습니다.

    • inline: 파이프라인 구성의 일부로 정의됩니다.
    • path: 상대 경로에 있는 파일에 정의됩니다.
    • external_config_path: Cloud Storage 버킷에 있는 파일에 정의됩니다. 리소스 프로필 값을 업데이트하기 위해 커밋하고 배포해야 하는 inlinepath 옵션과 달리 외부 리소스 프로필은 각 파이프라인 실행 시 확인되며 파이프라인을 다시 배포하지 않고도 업데이트할 수 있습니다.

    다음 키는 리소스 프로필 구성을 지정합니다.

    • environmentConfig: 환경 구성
    • runtimeConfig: 런타임 구성

    사용 가능한 필드에 대한 설명은 Managed Service for Apache Spark 문서의 RuntimeConfigEnvironmentConfig를 참고하세요.

    override 키를 사용하여 지정된 리소스 프로필에 재정의를 적용할 수 있습니다. 재정의는 제공된 리소스 프로필에 딥 병합으로 적용됩니다.

예 (인라인):

engine:
  dataprocServerless:
    location: "us-central1"
    resourceProfile:
      inline:
        environmentConfig:
          executionConfig:
            serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
            networkUri: "projects/example-project/global/networks/default"
        runtimeConfig:
          version: "2.3"
          properties:
            spark.app.name: "run-notebook-on-dataproc-serverless"
            spark.executor.instances: "2"
            spark.driver.cores: "4"

예 (외부 경로 및 재정의):

      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                    spark.app.name: "run-notebook-on-dataproc-serverless"
                    spark.executor.instances: "2"
                    spark.driver.cores: "4"

bigQuery

BigQuery 작업으로 실행합니다.

키:

  • location: 대상 테이블이 있는 리전입니다.
  • destinationTable: 데이터를 출력할 BigQuery 테이블
  • impersonationChain: 작업을 실행하는 데 사용할 서비스 계정 가장 체인입니다.

예:

      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"

로컬

러너 환경에서 로컬로 실행합니다.

가상 환경을 구성하는 방법은 python 작업을 참고하세요.

예:

    engine:
      local: {}

프레임워크

작업에 사용되는 프레임워크입니다.

dbt > airflowWorker

dbt-core를 사용하여 러너 환경에서 Airflow 작업자로 실행되는 dbt 모델을 실행합니다.

키:

  • projectDirectoryPath: DBT 프로젝트가 포함된 폴더의 상대 경로입니다.
  • selectModels: 이름으로 실행에 포함할 모델 목록입니다 (dbt --select과 동일).
  • tags: 태그별 실행에 포함할 모델 목록입니다 (dbt --select과 동일).

예:

framework:
  dbt:
    airflowWorker:
      projectDirectoryPath: "dbt_project"
      selectModels: ["model_1", "model_2"]

dataform > airflowWorker

dataform core cli를 사용하여 러너 환경의 Airflow 작업자에서 실행되는 Dataform 워크플로

키:

  • projectDirectoryPath: Dataform 워크플로 정의가 포함된 폴더의 상대 경로입니다.

예:

framework:
  dataform:
    airflowWorker:
      projectDirectoryPath: "dataform_local"

dataform > dataformService

Dataform 서비스에서 실행된 Dataform 워크플로를 실행합니다.

키:

  • location: Dataform 저장소가 있는 위치입니다.
  • projectId: Dataform 저장소가 있는 프로젝트입니다.
  • repositoryId: Dataform 저장소 ID
  • workflowInvocation: 실행할 작업을 지정하는 워크플로 호출 구성입니다. WorkflowInvocation을 참고하세요.

예:

framework:
  dataform:
    dataformService:
      location: "us-central1"
      projectId: "example-project"
      repositoryId: "example-repository"
      workflowInvocation:
        compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
        invocationConfig:
          serviceAccount: "example-account@example-project.iam.gserviceaccount.com"