Referensi DSL Orchestration Pipelines

Halaman ini berisi referensi DSL Orchestration Pipelines.

Batasan dalam Pratinjau

Saat dalam Pratinjau, Orchestration Pipelines memiliki batasan berikut:

  • Untuk tindakan pyspark dan notebook:

    • Hanya satu file requirements.txt untuk semua tindakan pyspark dan notebook yang didukung.
    • Platform Windows tidak didukung untuk membuat paket melalui alat uv.
    • Hanya paket Python dengan biner yang telah dibuat sebelumnya yang didukung.
  • Untuk tindakan sql:

    • Definisi inline di kunci query tidak didukung.

Tentang format dan nilai

Pipeline ditentukan dalam format YAML, dan harus disimpan dalam file terpisah, satu per pipeline, di repositori Anda.

Pipeline Orkestrasi menyediakan beberapa cara untuk menggunakan variabel dalam definisi pipeline dan konfigurasi deployment. Misalnya, Anda dapat menentukan variabel kustom, menggunakan secret GitHub, dan mengganti nilai variabel di command line. Untuk mengetahui informasi selengkapnya, lihat Variabel, secret, dan penggantian.

Untuk mengetahui informasi selengkapnya tentang cara menambahkan pipeline tambahan ke paket pipeline, lihat Menambahkan pipeline lain.

Contoh kode

Repositori orchestration-pipelines di GitHub memiliki contoh kode terbaru untuk berbagai kombinasi tindakan dan mesin pipeline. Sebaiknya gunakan contoh ini sebagai titik awal dalam mempelajari kemampuan Pipeline Orkestrasi.

Definisi pipeline

Definisi pipeline memiliki kunci tingkat teratas berikut:

  • modelVersion: Versi model definisi pipeline. Versi model terbaru adalah 1.0.

  • pipelineId: ID unik untuk pipeline. ID ini tetap konsisten di beberapa deployment dan versi, sehingga memungkinkan pelacakan dan pengelolaan entitas pipeline logis.

  • description: Deskripsi pipeline, yang dipetakan ke deskripsi DAG Airflow di lingkungan runner.

  • owner: Pemilik pipeline.

  • tags: ID string yang diterapkan pada pipeline, digunakan untuk memfilter pipeline.

  • notifications: notifikasi tentang peristiwa pipeline. Jenis notifikasi yang didukung:

    • onPipelineFailure: email saat pipeline gagal.

    Notifikasi memerlukan layanan email SendGrid yang dikonfigurasi di lingkungan runner Anda. Untuk mengetahui petunjuknya, lihat Mengonfigurasi notifikasi email.

    Contoh:

    notifications:
      onPipelineFailure:
        email: ["user1@example.com", "user2@example.com"]
    
  • runner: Menentukan mesin orkestrasi target. Dipesan untuk penggunaan di masa mendatang. Tetapkan nilai ini ke airflow.

  • defaults: Menetapkan nilai default untuk properti seperti project_id, location, dan executionConfig yang berlaku untuk semua tindakan kecuali jika diganti dalam tindakan tertentu. Properti project_id dan location dapat diganti oleh properti tindakan individual. Properti executionConfig tidak dapat diganti dalam setiap tindakan dan menentukan jumlah percobaan ulang untuk semua tindakan dalam pipeline di kolom retries.

  • triggers: Menentukan cara pipeline dimulai:

    • Tidak ada nilai. Pipeline masih dapat dipicu secara manual.

    • schedule. Memicu pipeline sesuai jadwal, menggunakan ekspresi cron.

      Contoh jadwal:

      triggers:
        - schedule:
            interval: "0 5 * * *"
            startTime: "2025-10-01T00:00:00"
            endTime: "2026-10-01T00:00:00"
            catchup: false
            timezone: "UTC"
      
  • actions

    Pemetaan tugas yang akan dieksekusi. Setiap entri pemetaan sesuai dengan satu tindakan. Lihat Tindakan.

Tindakan

Tindakan pipeline menentukan setiap langkah dalam eksekusi pipeline. Setiap tindakan harus memiliki mesin atau framework yang ditentukan untuknya. Mesin atau framework menentukan resource mana yang digunakan untuk menjalankan tindakan.

Pipeline Orkestrasi mendukung tindakan berikut:

  • Pyspark (pyspark): Menjalankan skrip PySpark.
  • Notebook (notebook): Menjalankan file notebook.
  • Kueri SQL (notebook): Jalankan kueri SQL.
  • Python (python): Jalankan skrip Python.
  • Pipeline (pipeline): Menjalankan pipeline pemrosesan data.

Pipeline Orkestrasi mendukung mesin dan framework berikut:

  • dataprocOnGce > existingCluster: Cluster Managed Service untuk Apache Spark yang diidentifikasi oleh clusterName, project, dan lokasi.

  • dataprocOnGce > ephemeral: Cluster Managed Service untuk Apache Spark dibuat dan dihapus setelah menjalankan tugas.

  • dataprocServerless: Managed Service untuk Apache Spark pengiriman batch.

  • bigquery: Tugas BigQuery.

  • python > local: Skrip Python yang dieksekusi di Airflow Worker dalam lingkungan pelaksana.

  • dbt > airflowWorker: Model dbt yang dieksekusi di pekerja Airflow dalam lingkungan runner menggunakan dbt-core.

  • dataform > airflowWorker: Alur kerja Dataform yang dijalankan di worker Airflow dalam lingkungan runner menggunakan dataform core cli.

  • dataform > dataformService: Alur kerja Dataform yang dijalankan di layanan Dataform.

Tabel berikut mencantumkan kemungkinan kombinasi jenis tindakan, mesin, dan framework. Lihat deskripsi mesin dan framework untuk contoh kode tindakan.

Tindakan Mesin atau Framework Output ke
pyspark dataprocOnGce > existingCluster Log tugas Managed Service untuk Apache Spark
pyspark dataprocOnGce > ephemeralCluster Log tugas Managed Service untuk Apache Spark
pyspark dataprocServerless Log Batch Managed Service untuk Apache Spark
notebook dataprocOnGce > existingCluster Bucket runner, di direktori composer_declarative_dags_resources
notebook dataprocOnGce > ephemeralCluster Log tugas Managed Service untuk Apache Spark
notebook dataprocServerless Bucket runner, di direktori composer_declarative_dags_resources
sql bigquery Tabel yang ditentukan dalam parameter destinationTable
sql dataprocServerless Log Batch Managed Service untuk Apache Spark.
python local (eksekusi lokal) Log
pipeline dbt > airflowWorker Log dan BigQuery
pipeline dataform > airflowWorker Tabel yang ditentukan di BigQuery
pipeline dataform > dataformService Di Dataform

Semua tindakan memiliki kunci umum berikut. Kunci lainnya bergantung pada jenis tindakan.

  • name: Nama tindakan. Nama ini dipetakan ke nama tugas Airflow di lingkungan pelaksana. Jika tindakan memerlukan lebih dari satu tugas Airflow, nama ini dipetakan ke grup tugas.

  • dependsOn: Daftar nama tindakan upstream yang menjadi dependensi tindakan ini, yang menentukan urutan eksekusi. Jika salah satu tindakan upstream gagal, tindakan downstream yang bergantung padanya tidak akan dijalankan.

  • executionTimeout: Waktu tunggu untuk menjalankan tindakan. Contoh: 1h, 30m, 40s.

python

Tindakan jenis python. Jalankan skrip Python.

Tombol khusus jenis tindakan:

  • mainFilePath: jalur relatif ke file skrip Python.
  • pythonCallable: nama yang dapat dipanggil Python untuk dieksekusi dalam skrip Python.
  • opKwargs: pemetaan argumen kata kunci untuk operator.
  • (Opsional) environment: jalankan skrip dalam Lingkungan Virtual Python yang dibuat secara dinamis.

    • requirements: persyaratan untuk Lingkungan Virtual. Persyaratan diselesaikan saat runtime.

      • inline: persyaratan ditentukan secara inline.

        • list: daftar persyaratan. Cantumkan setiap persyaratan sesuai dengan PEP-508.

          Contoh:

          environment:
            requirements:
              inline:
                list: ["pandas>=2.0.0"]
          
      • (Alternatif) path: Jalur ke file dengan persyaratan. Persyaratan dalam file ini harus dicantumkan sesuai dengan PEP-508.

        Contoh:

        environment:
          requirements:
            path: "scripts/requirements.txt"
        
    • systemSitePackages: Jika true, Lingkungan Virtual mewarisi paket dari direktori site-packages pekerja Airflow. Anda dapat menginstal paket PyPI kustom di lingkungan runner.

  • engine:

    • local: eksekusi lokal di lingkungan runner

Contoh:

lokal

modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 1

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - python:
      name: "first_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        systemSitePackages: true
        requirements:
          path: "scripts/requirements.txt"

  - python:
      name: "second_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        requirements:
          inline:
            list: ["pandas>=2.0.0"]
        systemSitePackages: true
      dependsOn: ["first_script_run"]

  - python:
      name: "third_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_2.py"
      pythonCallable: "main"
      engine:
        local: {}
      opKwargs:
        api_endpoint: "https://api.my-vendor.example.com/v1/status"
        api_key_secret_name: "my-vendor-api-key"
      dependsOn: ["first_script_run"]

pyspark

Tindakan jenis pyspark. Jalankan skrip PySpark.

Tombol khusus jenis tindakan:

  • mainFilePath: Jalur relatif ke skrip PySpark.
  • archiveUris: Daftar URI arsip yang akan digunakan dengan tindakan ini.
  • stagingBucket: Bucket Cloud Storage yang akan digunakan dengan tindakan ini.
  • pyFiles: Daftar file Python yang akan digunakan dengan tugas Spark ini.
  • environment: Konfigurasi lingkungan Python.

    • requirements: File persyaratan Python yang akan digunakan.

      • path: Jalur ke file dengan persyaratan. Persyaratan dalam file ini harus dicantumkan sesuai dengan PEP-508.
  • engine:

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

Contoh:

existingCluster

modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 4 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pyspark:
      name: "run-pyspark-with-pyfiles-on-existing-cluster"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "example-dataproc-cluster"
            location: "us-central1"
            projectId: "example-project"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"

ephemeralCluster

pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - pyspark:
      name: "run_dataproc_ephemeral"
      executionTimeout: "1h"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            projectId: "example-project"
            location: "us-central1"
            clusterName: "ephemeral-cluster-inline"
            resourceProfile:
              inline:
                config:
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
            properties:
              spark.submit.deployMode: "cluster"

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pyspark:
      name: "run-pyspark-on-dataproc-serverless"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            inline:
              environmentConfig:
                executionConfig:
                  serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
                  networkUri: "projects/example-project/global/networks/default"
              runtimeConfig:
                version: "2.3"
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

notebook

Tindakan jenis notebook. Jalankan notebook .ipynb melalui Papermill.

Tombol khusus jenis tindakan:

  • mainFilePath: jalur relatif ke file notebook.
  • archiveUris: Daftar URI arsip yang akan digunakan dengan tindakan ini.
  • stagingBucket: Bucket Cloud Storage yang akan digunakan dengan tindakan ini.
  • environment: Konfigurasi lingkungan Python.

    • requirements: File persyaratan Python yang akan digunakan.

      • path: Jalur ke file dengan persyaratan. Persyaratan dalam file ini harus dicantumkan sesuai dengan PEP-508.
  • engine:

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

Contoh:

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - notebook:
      name: "run-notebook-on-dataproc-serverless"
      mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
      archiveUris:
        - "gs://example-bucket-additional-data/custom_venv.tar.gz"
      staging_bucket: "example-bucket-additional-data-additional-data"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

sql

Tindakan jenis sql. Menjalankan kueri SQL.

Tombol khusus jenis tindakan:

  • query: menentukan kueri.

    • path: kueri ditentukan dalam file yang berada di jalur relatif ke file konfigurasi deployment.
    • inline: kueri ditentukan secara inline.

  • engine:

    • bigQuery
    • dataprocServerless
    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster

bigQuery

modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run_bigquery_insert_job_create"
      query:
        inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
      engine:
        bigquery:
          location: "US"
  - sql:
      name: "run_bigquery_insert_job_select"
      query:
        path: "sql-scripts/count_rows.sql"
      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"
      dependsOn:
        - "run_bigquery_insert_job_create"

dataprocServerless

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-dataproc"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocServerless:
          location: "us-central1"
          impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
          resourceProfile:
            inline:
              runtimeConfig:
                properties:
                  spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
                  spark.sql.catalog.bigquery.project: "example-project"
                  dataproc.sparkBqConnector.version: "0.42.3"

existingCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-existing-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "cluster-sql"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"

ephemeralCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-ephemeral-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            clusterName: "example-ephemeral-cluster"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
            resourceProfile:
              inline:
                clusterConfig:
                  gceClusterConfig:
                    zoneUri: "us-central1-a"
                    metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
                      SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"

pipeline

Tindakan jenis pipeline. Jalankan pipeline pemrosesan data.

Tombol khusus jenis tindakan:

  • framework:

    • dbt
    • dataform > airflowWorker
    • dataform > dataformService

Contoh:

dbt

modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "dbt-action"
      framework:
        dbt:
          airflowWorker:
            projectDirectoryPath: "dbt_project"
            selectModels: ["model_1", "model_2"]

dataform>airflowWorker

modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          airflowWorker:
            projectDirectoryPath: "dataform_local"

dataform>dataformService

modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"

  - pipeline:
      name: "run_dataform_compilation"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
              invocationConfig:
                serviceAccount: "example-account@example-project.iam.gserviceaccount.com"

Mesin

Mesin yang digunakan dalam tindakan.

dataprocOnGce > existingCluster

Jalankan di cluster Managed Service untuk Apache Spark yang ada, yang diidentifikasi oleh clusterName, project, dan lokasi.

Anda dapat mengelola cluster yang ditentukan dalam konfigurasi deployment, atau secara manual di Managed Service untuk Apache Spark. Sebaiknya upgrade cluster secara rutin.

Kunci:

  • clusterName: Nama cluster
  • location: Region tempat cluster berada
  • projectId: Project ID project tempat cluster berada
  • properties: Peta properti tugas Spark.

Contoh:

engine:
  dataprocOnGce:
    existingCluster:
      clusterName: "example-dataproc-cluster"
      location: "us-central1"
      projectId: "example-project"
      impersonationChain: "example-account@example-project.iam.gserviceaccount.com"

dataprocOnGce > ephemeralCluster

Dieksekusi di cluster Managed Service untuk Apache Spark sementara, yang dibuat dan dihapus setelah menjalankan tugas.

Kunci:

  • clusterName: Nama cluster
  • location: Region tempat cluster berada
  • projectId: Project ID project tempat cluster berada
  • impersonationChain: Rantai peniruan akun layanan yang akan digunakan untuk menjalankan tindakan.
  • resourceProfile: Profil resource cluster Managed Service for Apache Spark.

    Untuk mengetahui deskripsi kolom yang tersedia, lihat ClusterConfig dalam dokumentasi Managed Service untuk Apache Spark.

    Profil resource dapat ditentukan dengan cara berikut:

    • inline: ditentukan sebagai bagian dari konfigurasi pipeline.
    • path: ditentukan dalam file yang berada di jalur relatif.
    • external_config_path: ditentukan dalam file yang berada di bucket Cloud Storage. Tidak seperti opsi inline dan path, yang mengharuskan penerapan dan men-deploy untuk memperbarui nilai profil resource, profil resource eksternal diselesaikan di setiap eksekusi pipeline dan Anda dapat memperbaruinya tanpa men-deploy ulang pipeline.

    Penggantian dapat diterapkan ke profil resource yang ditentukan dengan kunci override. Penggantian diterapkan dengan penggabungan mendalam ke profil presource yang disediakan.

  • properties: Peta properti tugas Spark.

Contoh:

engine:
  dataprocOnGce:
    ephemeralCluster:
      projectId: "example-project"
      location: "us-central1"
      clusterName: "example-ephemeral-cluster"
      resourceProfile:
        inline:
          config:
            masterConfig:
              numInstances: 1
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
            workerConfig:
              numInstances: 2
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
      properties:
        spark.submit.deployMode: "cluster"

dataprocServerless

Eksekusi di Managed Service untuk Apache Spark pengiriman batch.

Kunci:

  • location: Region tempat tugas Spark harus dijalankan.
  • impersonationChain: rantai peniruan akun layanan yang akan digunakan untuk menjalankan tindakan.
  • resourceProfile: Profil resource Managed Service for Apache Spark.

    Profil resource dapat ditentukan dengan cara berikut:

    • inline: ditentukan sebagai bagian dari konfigurasi pipeline.
    • path: ditentukan dalam file yang berada di jalur relatif.
    • external_config_path: ditentukan dalam file yang berada di bucket Cloud Storage. Tidak seperti opsi inline dan path, yang mengharuskan penerapan dan men-deploy untuk memperbarui nilai profil resource, profil resource eksternal diselesaikan di setiap eksekusi pipeline dan Anda dapat memperbaruinya tanpa men-deploy ulang pipeline.

    Kunci berikut menentukan konfigurasi profil resource:

    • environmentConfig: konfigurasi lingkungan
    • runtimeConfig: konfigurasi runtime

    Untuk deskripsi kolom yang tersedia, lihat RuntimeConfig dan EnvironmentConfig dalam dokumentasi Managed Service untuk Apache Spark.

    Penggantian dapat diterapkan ke profil resource yang ditentukan dengan kunci override. Penggantian diterapkan dengan penggabungan mendalam ke profil resource yang diberikan.

Contoh (inline):

engine:
  dataprocServerless:
    location: "us-central1"
    resourceProfile:
      inline:
        environmentConfig:
          executionConfig:
            serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
            networkUri: "projects/example-project/global/networks/default"
        runtimeConfig:
          version: "2.3"
          properties:
            spark.app.name: "run-notebook-on-dataproc-serverless"
            spark.executor.instances: "2"
            spark.driver.cores: "4"

Contoh (jalur eksternal dan penggantian):

      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                    spark.app.name: "run-notebook-on-dataproc-serverless"
                    spark.executor.instances: "2"
                    spark.driver.cores: "4"

bigQuery

Dieksekusi sebagai tugas BigQuery.

Kunci:

  • location: Region tempat tabel tujuan berada.
  • destinationTable: Tabel BigQuery untuk menampilkan data
  • impersonationChain: Rantai peniruan akun layanan yang akan digunakan untuk menjalankan tindakan.

Contoh:

      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"

lokal

Dieksekusi secara lokal di lingkungan runner.

Lihat tindakan python untuk mengetahui cara mengonfigurasi Lingkungan Virtual.

Contoh:

    engine:
      local: {}

Framework

Framework yang digunakan dalam tindakan.

dbt > airflowWorker

Jalankan model dbt yang dijalankan di pekerja Airflow di lingkungan runner menggunakan dbt-core.

Kunci:

  • projectDirectoryPath: Jalur relatif ke folder yang berisi project DBT.
  • selectModels: Daftar model yang akan disertakan dalam operasi menurut nama (setara dengan dbt --select).
  • tags: Daftar model yang akan disertakan dalam proses berdasarkan tag (setara dengan dbt --select).

Contoh:

framework:
  dbt:
    airflowWorker:
      projectDirectoryPath: "dbt_project"
      selectModels: ["model_1", "model_2"]

dataform > airflowWorker

Alur kerja Dataform yang dijalankan di pekerja Airflow di lingkungan runner menggunakan dataform core cli.

Kunci:

  • projectDirectoryPath: Jalur relatif ke folder yang berisi definisi alur kerja Dataform.

Contoh:

framework:
  dataform:
    airflowWorker:
      projectDirectoryPath: "dataform_local"

dataform > dataformService

Menjalankan alur kerja Dataform yang dieksekusi di layanan Dataform.

Kunci:

  • location: Lokasi tempat repositori Dataform berada.
  • projectId: Project tempat repositori Dataform berada.
  • repositoryId: ID repositori Dataform
  • workflowInvocation: Konfigurasi untuk pemanggilan alur kerja, yang menentukan tindakan mana yang akan dijalankan. Lihat WorkflowInvocation.

Contoh:

framework:
  dataform:
    dataformService:
      location: "us-central1"
      projectId: "example-project"
      repositoryId: "example-repository"
      workflowInvocation:
        compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
        invocationConfig:
          serviceAccount: "example-account@example-project.iam.gserviceaccount.com"