Riferimento DSL di Orchestration Pipelines

Questa pagina contiene il riferimento al DSL di Orchestration Pipelines.

Limitazione nell'anteprima

Durante l'anteprima, Orchestration Pipelines presenta le seguenti limitazioni:

  • Per le azioni pyspark e notebook:

    • È supportato un solo file requirements.txt per tutte le azioni pyspark e notebook.
    • La piattaforma Windows non è supportata per la creazione di pacchetti tramite lo strumento uv.
    • Sono supportati solo i pacchetti Python con binari precompilati.
  • Per le azioni sql:

    • La definizione di inline nella chiave query non è supportata.

Informazioni sul formato e sui valori

Le pipeline sono definite nel formato YAML e devono essere archiviate in file separati, uno per pipeline, nel repository.

Orchestration Pipelines offre diversi modi per utilizzare le variabili nelle definizioni delle pipeline e nella configurazione di deployment. Ad esempio, puoi definire variabili personalizzate, utilizzare i secret di GitHub e sostituire i valori delle variabili nella riga di comando. Per saperne di più, consulta Variabili, secret e sostituzione.

Per ulteriori informazioni sull'aggiunta di pipeline aggiuntive al bundle di pipeline, vedi Aggiungere un'altra pipeline.

Esempi di codice

Il repository orchestration-pipelines su GitHub contiene gli esempi di codice più recenti per molte combinazioni di azioni e motori delle pipeline. Ti consigliamo di utilizzare questi esempi come punto di partenza per esplorare le funzionalità di Orchestration Pipelines.

Definizione della pipeline

Una definizione di pipeline ha le seguenti chiavi di primo livello:

  • modelVersion: la versione del modello di definizione della pipeline. L'ultima versione del modello è 1.0.

  • pipelineId: un identificatore univoco per la pipeline. Questo ID rimane coerente in più deployment e versioni, consentendo il monitoraggio e la gestione dell'entità pipeline logica.

  • description: descrizione della pipeline, mappata alla descrizione del DAG Airflow nell'ambiente runner.

  • owner: proprietario della pipeline.

  • tags: identificatori stringa applicati alla pipeline, utilizzati per filtrare le pipeline.

  • notifications: notifiche sugli eventi della pipeline. Tipi di notifiche supportati:

    • onPipelineFailure: email sugli errori della pipeline.

    Le notifiche richiedono servizi email SendGrid configurati nell'ambiente runner. Per le istruzioni, vedi Configurare le notifiche email.

    Esempio:

    notifications:
      onPipelineFailure:
        email: ["user1@example.com", "user2@example.com"]
    
  • runner: specifica il motore di orchestrazione di destinazione. Riservato per l'uso futuro. Imposta airflow per questo valore.

  • defaults: imposta i valori predefiniti per proprietà come project_id, location e executionConfig che si applicano a tutte le azioni, a meno che non vengano sostituite all'interno di un'azione specifica. Le proprietà project_id e location possono essere ignorate dalle singole proprietà dell'azione. La proprietà executionConfig non può essere sostituita nelle singole azioni e specifica il numero di tentativi per tutte le azioni nella pipeline nel campo retries.

  • triggers: definisce la modalità di avvio della pipeline:

    • Nessun valore. La pipeline può comunque essere attivata manualmente.

    • schedule. Attiva la pipeline in base a una pianificazione, utilizzando le espressioni cron.

      Esempio di programma:

      triggers:
        - schedule:
            interval: "0 5 * * *"
            startTime: "2025-10-01T00:00:00"
            endTime: "2026-10-01T00:00:00"
            catchup: false
            timezone: "UTC"
      
  • actions

    Una mappatura delle attività da eseguire. Ogni voce di mappatura corrisponde a un'azione. Vedi Azioni.

Azioni

Le azioni pipeline definiscono i singoli passaggi dell'esecuzione della pipeline. Per ogni azione deve essere specificato un motore o un framework. Il motore o il framework determina quali risorse vengono utilizzate per eseguire l'azione.

Orchestration Pipelines supporta le seguenti azioni:

  • Pyspark (pyspark): esegui uno script PySpark.
  • Notebook (notebook): esegui un file notebook.
  • Query SQL (notebook): esegui una query SQL.
  • Python (python): esegui uno script Python.
  • Pipeline (pipeline): esegui una pipeline di elaborazione dati.

Orchestration Pipelines supporta i seguenti motori e framework:

  • dataprocOnGce > existingCluster: Cluster Managed Service for Apache Spark identificato da clusterName, project e location.

  • dataprocOnGce > ephemeral: Cluster Managed Service for Apache Spark creato ed eliminato dopo l'esecuzione del job.

  • dataprocServerless: invio batch di Managed Service for Apache Spark .

  • bigquery: job BigQuery.

  • python > local: script Python eseguito su un worker Airflow nell'ambiente runner.

  • dbt > airflowWorker: modelli dbt eseguiti su un worker Airflow nell'ambiente runner utilizzando dbt-core.

  • dataform > airflowWorker: flussi di lavoro Dataform eseguiti su un worker Airflow nell'ambiente runner utilizzando dataform core cli.

  • dataform > dataformService: flussi di lavoro Dataform eseguiti sul servizio Dataform.

La tabella seguente elenca le possibili combinazioni di tipo di azione, motore e framework. Consulta le descrizioni dei motori e dei framework per esempi di codici di azione.

Azione Motore o framework Output in
pyspark dataprocOnGce > existingCluster Log dei job Managed Service for Apache Spark
pyspark dataprocOnGce > ephemeralCluster Log dei job Managed Service for Apache Spark
pyspark dataprocServerless Log batch di Managed Service for Apache Spark
notebook dataprocOnGce > existingCluster Bucket runner, nella directory composer_declarative_dags_resources
notebook dataprocOnGce > ephemeralCluster Log dei job Managed Service for Apache Spark
notebook dataprocServerless Bucket runner, nella directory composer_declarative_dags_resources
sql bigquery Tabella specificata nel parametro destinationTable
sql dataprocServerless Log batch di Managed Service for Apache Spark.
python local (esecuzione locale) Log
pipeline dbt > airflowWorker Log e BigQuery
pipeline dataform > airflowWorker Tabella specificata in BigQuery
pipeline dataform > dataformService In Dataform

Tutte le azioni hanno le seguenti chiavi comuni. Gli altri tasti dipendono dal tipo di azione.

  • name: Nome dell'azione. Questo nome viene mappato al nome dell'attività Airflow nell'ambiente runner. Se un'azione richiede più di un'attività Airflow, questo nome viene mappato al gruppo di attività.

  • dependsOn: un elenco di nomi di azioni upstream da cui dipende questa azione, che definisce l'ordine di esecuzione. Se una delle azioni upstream non va a buon fine, le azioni downstream che dipendono da essa non vengono eseguite.

  • executionTimeout: timeout per l'esecuzione dell'azione. Esempi: 1h, 30m, 40s.

python

Azioni di tipo python. Esegui script Python.

Chiavi specifiche per il tipo di azione:

  • mainFilePath: percorso relativo del file script Python.
  • pythonCallable: nome del richiamabile Python da eseguire nello script Python.
  • opKwargs: una mappatura degli argomenti delle parole chiave per l'operatore.
  • (Facoltativo) environment: esegui lo script all'interno di un ambiente virtuale Python creato dinamicamente.

    • requirements: requisiti per l'ambiente virtuale. I requisiti vengono risolti in fase di esecuzione.

      • inline: i requisiti sono specificati inline.

        • list: elenco dei requisiti. Elenca i singoli requisiti in base a PEP-508.

          Esempio:

          environment:
            requirements:
              inline:
                list: ["pandas>=2.0.0"]
          
      • (Alternativa) path: percorso del file con i requisiti. I requisiti in questo file devono essere elencati in base a PEP-508.

        Esempio:

        environment:
          requirements:
            path: "scripts/requirements.txt"
        
    • systemSitePackages: se true, l'ambiente virtuale eredita i pacchetti dalla directory site-packages del worker Airflow. Puoi installare pacchetti PyPI personalizzati nel tuo ambiente runner.

  • engine:

    • local: esecuzione locale nell'ambiente runner

Esempio:

locale

modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 1

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - python:
      name: "first_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        systemSitePackages: true
        requirements:
          path: "scripts/requirements.txt"

  - python:
      name: "second_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        requirements:
          inline:
            list: ["pandas>=2.0.0"]
        systemSitePackages: true
      dependsOn: ["first_script_run"]

  - python:
      name: "third_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_2.py"
      pythonCallable: "main"
      engine:
        local: {}
      opKwargs:
        api_endpoint: "https://api.my-vendor.example.com/v1/status"
        api_key_secret_name: "my-vendor-api-key"
      dependsOn: ["first_script_run"]

pyspark

Azioni di tipo pyspark. Esegui script PySpark.

Chiavi specifiche per il tipo di azione:

  • mainFilePath: percorso relativo allo script PySpark.
  • archiveUris: un elenco di URI archivio da utilizzare con questa azione.
  • stagingBucket: il bucket Cloud Storage da utilizzare con questa azione.
  • pyFiles: un elenco di file Python da utilizzare con questo job Spark.
  • environment: configurazione dell'ambiente Python.

    • requirements: il file dei requisiti Python da utilizzare.

      • path: Percorso del file con i requisiti. I requisiti in questo file devono essere elencati in base a PEP-508.
  • engine:

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

Esempi:

existingCluster

modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 4 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pyspark:
      name: "run-pyspark-with-pyfiles-on-existing-cluster"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "example-dataproc-cluster"
            location: "us-central1"
            projectId: "example-project"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"

ephemeralCluster

pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - pyspark:
      name: "run_dataproc_ephemeral"
      executionTimeout: "1h"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            projectId: "example-project"
            location: "us-central1"
            clusterName: "ephemeral-cluster-inline"
            resourceProfile:
              inline:
                config:
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
            properties:
              spark.submit.deployMode: "cluster"

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pyspark:
      name: "run-pyspark-on-dataproc-serverless"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            inline:
              environmentConfig:
                executionConfig:
                  serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
                  networkUri: "projects/example-project/global/networks/default"
              runtimeConfig:
                version: "2.3"
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

notebook

Azioni di tipo notebook. Esegui un notebook .ipynb tramite Papermill.

Chiavi specifiche per il tipo di azione:

  • mainFilePath: il percorso relativo del file del blocco note.
  • archiveUris: un elenco di URI archivio da utilizzare con questa azione.
  • stagingBucket: il bucket Cloud Storage da utilizzare con questa azione.
  • environment: configurazione dell'ambiente Python.

    • requirements: il file dei requisiti Python da utilizzare.

      • path: Percorso del file con i requisiti. I requisiti in questo file devono essere elencati in base a PEP-508.
  • engine:

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

Esempio:

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - notebook:
      name: "run-notebook-on-dataproc-serverless"
      mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
      archiveUris:
        - "gs://example-bucket-additional-data/custom_venv.tar.gz"
      staging_bucket: "example-bucket-additional-data-additional-data"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

sql

Azioni di tipo sql. Esegui query SQL.

Chiavi specifiche per il tipo di azione:

  • query: definisce una query.

    • path: la query è definita in un file che si trova nel percorso relativo del file di configurazione del deployment.
    • inline: la query è definita inline.

  • engine:

    • bigQuery
    • dataprocServerless
    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster

bigQuery

modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run_bigquery_insert_job_create"
      query:
        inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
      engine:
        bigquery:
          location: "US"
  - sql:
      name: "run_bigquery_insert_job_select"
      query:
        path: "sql-scripts/count_rows.sql"
      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"
      dependsOn:
        - "run_bigquery_insert_job_create"

dataprocServerless

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-dataproc"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocServerless:
          location: "us-central1"
          impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
          resourceProfile:
            inline:
              runtimeConfig:
                properties:
                  spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
                  spark.sql.catalog.bigquery.project: "example-project"
                  dataproc.sparkBqConnector.version: "0.42.3"

existingCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-existing-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "cluster-sql"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"

ephemeralCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-ephemeral-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            clusterName: "example-ephemeral-cluster"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
            resourceProfile:
              inline:
                clusterConfig:
                  gceClusterConfig:
                    zoneUri: "us-central1-a"
                    metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
                      SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"

pipeline

Azioni di tipo pipeline. Esegui una pipeline di elaborazione dati.

Chiavi specifiche per il tipo di azione:

  • framework:

    • dbt
    • dataform > airflowWorker
    • dataform > dataformService

Esempi:

Logo dbt

modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "dbt-action"
      framework:
        dbt:
          airflowWorker:
            projectDirectoryPath: "dbt_project"
            selectModels: ["model_1", "model_2"]

dataform>airflowWorker

modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          airflowWorker:
            projectDirectoryPath: "dataform_local"

dataform>dataformService

modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"

  - pipeline:
      name: "run_dataform_compilation"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
              invocationConfig:
                serviceAccount: "example-account@example-project.iam.gserviceaccount.com"

Motori

Motori utilizzati nelle azioni.

dataprocOnGce > existingCluster

Esegui in un cluster Managed Service for Apache Spark esistente identificato da clusterName, progetto e località.

Puoi gestire il cluster specificato nella configurazione di deployment o manualmente in Managed Service for Apache Spark. Ti consigliamo di eseguire regolarmente l'upgrade del cluster.

Chiavi:

  • clusterName: Nome del cluster
  • location: la regione in cui si trova il cluster
  • projectId: l'ID progetto del progetto in cui si trova il cluster
  • properties: una mappa delle proprietà del job Spark.

Esempio:

engine:
  dataprocOnGce:
    existingCluster:
      clusterName: "example-dataproc-cluster"
      location: "us-central1"
      projectId: "example-project"
      impersonationChain: "example-account@example-project.iam.gserviceaccount.com"

dataprocOnGce > ephemeralCluster

Esegui in un cluster Managed Service for Apache Spark effimero, che viene creato ed eliminato dopo l'esecuzione del job.

Chiavi:

  • clusterName: Nome del cluster
  • location: la regione in cui si trova il cluster
  • projectId: l'ID progetto del progetto in cui si trova il cluster
  • impersonationChain: la catena di simulazione dell'identità del service account da utilizzare per eseguire l'azione.
  • resourceProfile: profilo delle risorse del cluster Managed Service for Apache Spark.

    Per la descrizione dei campi disponibili, consulta ClusterConfig nella documentazione di Managed Service for Apache Spark.

    Un profilo della risorsa può essere specificato nei seguenti modi:

    • inline: definito come parte della configurazione della pipeline.
    • path: definito in un file che si trova nel percorso relativo.
    • external_config_path: definito in un file che si trova in un bucket Cloud Storage. A differenza delle opzioni inline e path, che richiedono l'esecuzione del commit e del deployment per aggiornare i valori del profilo della risorsa, un profilo della risorsa esterna viene risolto a ogni esecuzione della pipeline e puoi aggiornarlo senza eseguire nuovamente il deployment della pipeline.

    Gli override possono essere applicati al profilo della risorsa specificato con la chiave override. Gli override vengono applicati con l'unione profonda al profilo presource fornito.

  • properties: una mappa delle proprietà del job Spark.

Esempio:

engine:
  dataprocOnGce:
    ephemeralCluster:
      projectId: "example-project"
      location: "us-central1"
      clusterName: "example-ephemeral-cluster"
      resourceProfile:
        inline:
          config:
            masterConfig:
              numInstances: 1
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
            workerConfig:
              numInstances: 2
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
      properties:
        spark.submit.deployMode: "cluster"

dataprocServerless

Esegui l'invio di batch in Managed Service for Apache Spark.

Chiavi:

  • location: la regione in cui deve essere eseguito il job Spark.
  • impersonationChain: catena di simulazione dell'identità del service account da utilizzare per eseguire l'azione.
  • resourceProfile: profilo delle risorse di Managed Service for Apache Spark.

    Un profilo della risorsa può essere specificato nei seguenti modi:

    • inline: definito come parte della configurazione della pipeline.
    • path: definito in un file che si trova nel percorso relativo.
    • external_config_path: definito in un file che si trova in un bucket Cloud Storage. A differenza delle opzioni inline e path, che richiedono l'esecuzione del commit e del deployment per aggiornare i valori del profilo della risorsa, un profilo della risorsa esterna viene risolto a ogni esecuzione della pipeline e puoi aggiornarlo senza eseguire nuovamente il deployment della pipeline.

    Le seguenti chiavi specificano la configurazione del profilo delle risorse:

    • environmentConfig: configurazione dell'ambiente
    • runtimeConfig: configurazione di runtime

    Per la descrizione dei campi disponibili, consulta RuntimeConfig e EnvironmentConfig nella documentazione di Managed Service for Apache Spark.

    Gli override possono essere applicati al profilo della risorsa specificato con la chiave override. Gli override vengono applicati con l'unione profonda al profilo della risorsa fornito.

Esempio (in linea):

engine:
  dataprocServerless:
    location: "us-central1"
    resourceProfile:
      inline:
        environmentConfig:
          executionConfig:
            serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
            networkUri: "projects/example-project/global/networks/default"
        runtimeConfig:
          version: "2.3"
          properties:
            spark.app.name: "run-notebook-on-dataproc-serverless"
            spark.executor.instances: "2"
            spark.driver.cores: "4"

Esempio (percorso esterno e override):

      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                    spark.app.name: "run-notebook-on-dataproc-serverless"
                    spark.executor.instances: "2"
                    spark.driver.cores: "4"

bigQuery

Esegui come job BigQuery.

Chiavi:

Esempio:

      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"

locale

Esegui localmente nell'ambiente runner.

Consulta l'azione python per scoprire come configurare l'ambiente virtuale.

Esempio:

    engine:
      local: {}

Framework

Framework utilizzati nelle azioni.

dbt > airflowWorker

Esegui un modello dbt eseguito su un worker Airflow nell'ambiente runner utilizzando dbt-core.

Chiavi:

  • projectDirectoryPath: percorso relativo a una cartella che contiene il progetto DBT.
  • selectModels: Elenco dei modelli da includere nell'esecuzione per nome (equivalente a dbt --select).
  • tags: Elenco dei modelli da includere nell'esecuzione per tag (equivalente a dbt --select).

Esempio:

framework:
  dbt:
    airflowWorker:
      projectDirectoryPath: "dbt_project"
      selectModels: ["model_1", "model_2"]

dataform > airflowWorker

Workflow Dataform eseguiti su un worker Airflow nell'ambiente runner utilizzando dataform core cli.

Chiavi:

  • projectDirectoryPath: percorso relativo a una cartella che contiene le definizioni del workflow Dataform.

Esempio:

framework:
  dataform:
    airflowWorker:
      projectDirectoryPath: "dataform_local"

dataform > dataformService

Esegue i workflow Dataform eseguiti sul servizio Dataform.

Chiavi:

  • location: la posizione in cui si trova il repository Dataform.
  • projectId: il progetto in cui si trova il repository Dataform.
  • repositoryId: ID repository Dataform
  • workflowInvocation: configurazione per la chiamata del workflow, che specifica le azioni da eseguire. Vedi WorkflowInvocation.

Esempio:

framework:
  dataform:
    dataformService:
      location: "us-central1"
      projectId: "example-project"
      repositoryId: "example-repository"
      workflowInvocation:
        compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
        invocationConfig:
          serviceAccount: "example-account@example-project.iam.gserviceaccount.com"