Referência da DSL de Orquestração de Pipelines

Esta página contém a referência da DSL de Orquestração de Pipelines.

Limitação no pré-lançamento

Durante o pré-lançamento, a Orquestração de Pipelines tem as seguintes limitações:

  • Para ações pyspark e notebook:

    • Apenas um arquivo requirements.txt para todas as ações pyspark e notebook é aceito.
    • A plataforma Windows não é compatível com a criação de pacotes usando a ferramenta uv.
    • Apenas pacotes Python com binários pré-criados são compatíveis.
  • Para ações do sql:

    • A definição inline na chave query não é compatível.

Sobre o formato e os valores

Os pipelines são definidos no formato YAML e precisam ser armazenados em arquivos separados, um por pipeline, no seu repositório.

A Orquestração de Pipelines oferece várias maneiras de usar variáveis nas definições de pipeline e na configuração de implantação. Por exemplo, é possível definir variáveis personalizadas, usar secrets do GitHub e substituir valores de variáveis na linha de comando. Para mais informações, consulte Variáveis, secrets e substituição.

Para mais informações sobre como adicionar outros pipelines ao pacote, consulte Adicionar outro pipeline.

Exemplos de código

O repositório orchestration-pipelines no GitHub tem os exemplos de código mais recentes para muitas ações de pipeline e combinações de mecanismos. Recomendamos esses exemplos como ponto de partida para conhecer os recursos da Orquestração de Pipelines.

Definição de pipeline

Uma definição de pipeline tem as seguintes chaves de nível superior:

  • modelVersion: a versão do modelo de definição de pipeline. A versão mais recente do modelo é 1.0.

  • pipelineId: um identificador exclusivo do pipeline. Esse ID permanece consistente em várias implantações e versões, permitindo o rastreamento e o gerenciamento da entidade de pipeline lógico.

  • description: descrição do pipeline, que é mapeada para a descrição do DAG do Airflow no ambiente do executor.

  • owner: proprietário do pipeline.

  • tags: identificadores de string aplicados ao pipeline, usados para filtrar os pipelines.

  • notifications: notificações sobre eventos do pipeline. Tipos de notificação compatíveis:

    • onPipelineFailure: e-mail sobre falhas no pipeline.

    As notificações exigem que os serviços de e-mail do SendGrid estejam configurados no ambiente do executor. Para instruções, consulte Configurar notificações por e-mail.

    Exemplo:

    notifications:
      onPipelineFailure:
        email: ["user1@example.com", "user2@example.com"]
    
  • runner: especifica o mecanismo de orquestração de destino. Reservado para uso futuro. Defina esse valor como airflow.

  • defaults: define valores padrão para propriedades como project_id, location e executionConfig que se aplicam a todas as ações, a menos que sejam substituídas em uma ação específica. As propriedades project_id e location podem ser substituídas por propriedades de ação individuais. A propriedade executionConfig não pode ser substituída em ações individuais e especifica o número de novas tentativas para todas as ações no pipeline no campo retries.

  • triggers: define como o pipeline é iniciado:

    • Nenhum valor. O pipeline ainda pode ser acionado manualmente.

    • schedule. Acione o pipeline em uma programação usando expressões cron.

      Exemplo de programação:

      triggers:
        - schedule:
            interval: "0 5 * * *"
            startTime: "2025-10-01T00:00:00"
            endTime: "2026-10-01T00:00:00"
            catchup: false
            timezone: "UTC"
      
  • actions

    Um mapeamento de tarefas a serem executadas. Cada entrada de mapeamento corresponde a uma ação. Consulte Ações.

Ações

As ações de pipeline definem etapas individuais na execução do pipeline. Cada ação precisa ter um mecanismo ou framework especificado. O mecanismo ou framework determina quais recursos são usados para executar a ação.

Orquestração de Pipelines é compatível com as seguintes ações:

  • PySpark (pyspark): executa um script PySpark.
  • Notebook (notebook): execute um arquivo de notebook.
  • Consulta SQL (notebook): execute uma consulta SQL.
  • Python (python): execute um script Python.
  • Pipeline (pipeline): execute um pipeline de tratamento de dados.

A Orquestração de Pipelines é compatível com os seguintes mecanismos e frameworks:

  • dataprocOnGce > existingCluster: Cluster do Serviço Gerenciado para Apache Spark identificado por clusterName, projeto e local.

  • dataprocOnGce > ephemeral: Cluster do Serviço Gerenciado para Apache Spark criado e excluído após a execução do job.

  • dataprocServerless: Serviço Gerenciado para Apache Spark envio em lote.

  • bigquery: um job do BigQuery.

  • python > local: script Python executado em um worker do Airflow no ambiente do executor.

  • dbt > airflowWorker: modelos do dbt executados em um worker do Airflow no ambiente do executor usando dbt-core.

  • dataform > airflowWorker: fluxos de trabalho do Dataform executados em um worker do Airflow no ambiente do runner usando dataform core cli.

  • dataform > dataformService: fluxos de trabalho do Dataform executados no serviço do Dataform.

A tabela a seguir lista as possíveis combinações de tipo de ação, mecanismo e framework. Consulte as descrições de mecanismos e frameworks para exemplos de código de ação.

Ação Mecanismo ou framework Saídas para
pyspark dataprocOnGce > existingCluster Registros de jobs do Serviço Gerenciado para Apache Spark
pyspark dataprocOnGce > ephemeralCluster Registros de jobs do Serviço Gerenciado para Apache Spark
pyspark dataprocServerless Registros em lote do Serviço Gerenciado para Apache Spark
notebook dataprocOnGce > existingCluster Bucket do executor, no diretório composer_declarative_dags_resources
notebook dataprocOnGce > ephemeralCluster Registros de jobs do Serviço Gerenciado para Apache Spark
notebook dataprocServerless Bucket do executor, no diretório composer_declarative_dags_resources
sql bigquery Tabela especificada no parâmetro destinationTable
sql dataprocServerless Registros em lote do Serviço Gerenciado para Apache Spark.
python local (execução local) Registros
pipeline dbt > airflowWorker Registros e BigQuery
pipeline dataform > airflowWorker Tabela especificada no BigQuery
pipeline dataform > dataformService No Dataform

Todas as ações têm as seguintes chaves comuns. Outras chaves dependem do tipo de ação.

  • name: nome da ação. Esse nome é mapeado para o nome da tarefa do Airflow no ambiente do runner. Se uma ação exigir mais de uma tarefa do Airflow, esse nome será mapeado para o grupo de tarefas.

  • dependsOn: uma lista de nomes de ações upstream de que esta ação depende, definindo a ordem de execução. Se alguma das ações upstream falhar, as ações downstream que dependem delas não serão executadas.

  • executionTimeout: tempo limite para executar a ação. Exemplos: 1h, 30m, 40s.

python

Ações do tipo python. Executar scripts Python.

Chaves específicas do tipo de ação:

  • mainFilePath: caminho relativo para o arquivo de script Python.
  • pythonCallable: nome do Python chamável a ser executado no script Python.
  • opKwargs: um mapeamento de argumentos de palavra-chave para o operador.
  • (Opcional) environment: execute o script em um ambiente virtual do Python criado dinamicamente.

    • requirements: requisitos para o ambiente virtual. Os requisitos são resolvidos no tempo de execução.

      • inline: os requisitos são especificados inline.

        • list: lista de requisitos. Liste os requisitos individuais de acordo com a PEP-508.

          Exemplo:

          environment:
            requirements:
              inline:
                list: ["pandas>=2.0.0"]
          
      • (Alternativa) path: caminho para o arquivo com requisitos. Os requisitos neste arquivo precisam ser listados de acordo com PEP-508.

        Exemplo:

        environment:
          requirements:
            path: "scripts/requirements.txt"
        
    • systemSitePackages: se true, o ambiente virtual vai herdar pacotes do diretório site-packages do worker do Airflow. É possível instalar pacotes PyPI personalizados no ambiente do runner.

  • engine:

    • local: execução local no ambiente do executor

Exemplo:

Local

modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 1

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - python:
      name: "first_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        systemSitePackages: true
        requirements:
          path: "scripts/requirements.txt"

  - python:
      name: "second_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        requirements:
          inline:
            list: ["pandas>=2.0.0"]
        systemSitePackages: true
      dependsOn: ["first_script_run"]

  - python:
      name: "third_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_2.py"
      pythonCallable: "main"
      engine:
        local: {}
      opKwargs:
        api_endpoint: "https://api.my-vendor.example.com/v1/status"
        api_key_secret_name: "my-vendor-api-key"
      dependsOn: ["first_script_run"]

pyspark

Ações do tipo pyspark. Executar scripts PySpark.

Chaves específicas do tipo de ação:

  • mainFilePath: caminho relativo para o script PySpark.
  • archiveUris: uma lista de URIs de arquivo a serem usados com essa ação.
  • stagingBucket: bucket do Cloud Storage a ser usado com essa ação.
  • pyFiles: uma lista de arquivos Python a serem usados com este job do Spark.
  • environment: configuração do ambiente Python.

    • requirements: arquivo de requisitos do Python a ser usado.

      • path: caminho para o arquivo com requisitos. Os requisitos neste arquivo precisam ser listados de acordo com PEP-508.
  • engine:

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

Exemplos:

existingCluster

modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 4 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pyspark:
      name: "run-pyspark-with-pyfiles-on-existing-cluster"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "example-dataproc-cluster"
            location: "us-central1"
            projectId: "example-project"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"

ephemeralCluster

pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - pyspark:
      name: "run_dataproc_ephemeral"
      executionTimeout: "1h"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            projectId: "example-project"
            location: "us-central1"
            clusterName: "ephemeral-cluster-inline"
            resourceProfile:
              inline:
                config:
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
            properties:
              spark.submit.deployMode: "cluster"

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pyspark:
      name: "run-pyspark-on-dataproc-serverless"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            inline:
              environmentConfig:
                executionConfig:
                  serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
                  networkUri: "projects/example-project/global/networks/default"
              runtimeConfig:
                version: "2.3"
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

notebook

Ações do tipo notebook. Executar um notebook .ipynb usando o Papermill.

Chaves específicas do tipo de ação:

  • mainFilePath: caminho relativo para o arquivo de notebook.
  • archiveUris: uma lista de URIs de arquivo a serem usados com essa ação.
  • stagingBucket: bucket do Cloud Storage a ser usado com essa ação.
  • environment: configuração do ambiente Python.

    • requirements: arquivo de requisitos do Python a ser usado.

      • path: caminho para o arquivo com requisitos. Os requisitos neste arquivo precisam ser listados de acordo com PEP-508.
  • engine:

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

Exemplo:

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - notebook:
      name: "run-notebook-on-dataproc-serverless"
      mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
      archiveUris:
        - "gs://example-bucket-additional-data/custom_venv.tar.gz"
      staging_bucket: "example-bucket-additional-data-additional-data"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

sql

Ações do tipo sql. Executar consultas SQL.

Chaves específicas do tipo de ação:

  • query: define uma consulta.

    • path: a consulta é definida em um arquivo localizado no caminho relativo ao arquivo de configuração da implantação.
    • inline: a consulta é definida in-line.

  • engine:

    • bigQuery
    • dataprocServerless
    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster

bigQuery

modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run_bigquery_insert_job_create"
      query:
        inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
      engine:
        bigquery:
          location: "US"
  - sql:
      name: "run_bigquery_insert_job_select"
      query:
        path: "sql-scripts/count_rows.sql"
      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"
      dependsOn:
        - "run_bigquery_insert_job_create"

dataprocServerless

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-dataproc"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocServerless:
          location: "us-central1"
          impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
          resourceProfile:
            inline:
              runtimeConfig:
                properties:
                  spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
                  spark.sql.catalog.bigquery.project: "example-project"
                  dataproc.sparkBqConnector.version: "0.42.3"

existingCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-existing-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "cluster-sql"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"

ephemeralCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-ephemeral-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            clusterName: "example-ephemeral-cluster"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
            resourceProfile:
              inline:
                clusterConfig:
                  gceClusterConfig:
                    zoneUri: "us-central1-a"
                    metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
                      SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"

pipeline

Ações do tipo pipeline. Executar um pipeline de tratamento de dados.

Chaves específicas do tipo de ação:

  • framework:

    • dbt
    • dataform > airflowWorker
    • dataform > dataformService

Exemplos:

dbt

modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "dbt-action"
      framework:
        dbt:
          airflowWorker:
            projectDirectoryPath: "dbt_project"
            selectModels: ["model_1", "model_2"]

dataform>airflowWorker

modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          airflowWorker:
            projectDirectoryPath: "dataform_local"

dataform>dataformService

modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"

  - pipeline:
      name: "run_dataform_compilation"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
              invocationConfig:
                serviceAccount: "example-account@example-project.iam.gserviceaccount.com"

Mecanismos

Mecanismos usados em ações.

dataprocOnGce > existingCluster

Executar em um cluster do Serviço Gerenciado para Apache Spark identificado por clusterName, projeto e local.

Você pode gerenciar o cluster especificado na sua configuração de implantação ou manualmente no Serviço Gerenciado para Apache Spark. Recomendamos manter o cluster atualizado regularmente.

Chaves:

  • clusterName: nome do cluster.
  • location: região em que o cluster está localizado.
  • projectId: ID do projeto em que o cluster está localizado
  • properties: um mapa de propriedades de job do Spark.

Exemplo:

engine:
  dataprocOnGce:
    existingCluster:
      clusterName: "example-dataproc-cluster"
      location: "us-central1"
      projectId: "example-project"
      impersonationChain: "example-account@example-project.iam.gserviceaccount.com"

dataprocOnGce > ephemeralCluster

Executar em um cluster efêmero do Serviço Gerenciado para Apache Spark, que é criado e excluído após a execução do job.

Chaves:

  • clusterName: nome do cluster.
  • location: região em que o cluster está localizado.
  • projectId: ID do projeto em que o cluster está localizado
  • impersonationChain: uma cadeia de identidade temporária de conta de serviço a ser usada para executar a ação.
  • resourceProfile: perfil de recursos do cluster do Serviço Gerenciado para Apache Spark.

    Para a descrição dos campos disponíveis, consulte ClusterConfig na documentação do Serviço Gerenciado para Apache Spark.

    Um perfil de recurso pode ser especificado das seguintes maneiras:

    • inline: definido como parte da configuração do pipeline.
    • path: definido em um arquivo localizado no caminho relativo.
    • external_config_path: definido em um arquivo localizado em um bucket do Cloud Storage. Ao contrário das opções inline e path, que exigem fazer commit e implantar para atualizar os valores do perfil de recurso, um perfil de recurso externo é resolvido em cada execução do pipeline, e você pode atualizá-lo sem reimplantar o pipeline.

    As substituições podem ser aplicadas ao perfil de recurso especificado com a chave override. As substituições são aplicadas com mesclagem profunda ao perfil de pré-fonte fornecido.

  • properties: um mapa de propriedades de job do Spark.

Exemplo:

engine:
  dataprocOnGce:
    ephemeralCluster:
      projectId: "example-project"
      location: "us-central1"
      clusterName: "example-ephemeral-cluster"
      resourceProfile:
        inline:
          config:
            masterConfig:
              numInstances: 1
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
            workerConfig:
              numInstances: 2
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
      properties:
        spark.submit.deployMode: "cluster"

dataprocServerless

Executar no Serviço Gerenciado para Apache Spark um envio em lote.

Chaves:

  • location: região em que o job do Spark precisa ser executado.
  • impersonationChain: cadeia de identidade temporária de conta de serviço a ser usada para executar a ação.
  • resourceProfile: perfil de recursos do Serviço Gerenciado para Apache Spark.

    Um perfil de recurso pode ser especificado das seguintes maneiras:

    • inline: definido como parte da configuração do pipeline.
    • path: definido em um arquivo localizado no caminho relativo.
    • external_config_path: definido em um arquivo localizado em um bucket do Cloud Storage. Ao contrário das opções inline e path, que exigem fazer commit e implantar para atualizar os valores do perfil de recurso, um perfil de recurso externo é resolvido em cada execução do pipeline, e você pode atualizá-lo sem reimplantar o pipeline.

    As chaves a seguir especificam a configuração do perfil de recurso:

    • environmentConfig: configuração do ambiente
    • runtimeConfig: configuração do ambiente de execução

    Para a descrição dos campos disponíveis, consulte RuntimeConfig e EnvironmentConfig na documentação do Serviço gerenciado para Apache Spark.

    As substituições podem ser aplicadas ao perfil de recurso especificado com a chave override. As substituições são aplicadas com mesclagem profunda ao perfil de recurso fornecido.

Exemplo (inline):

engine:
  dataprocServerless:
    location: "us-central1"
    resourceProfile:
      inline:
        environmentConfig:
          executionConfig:
            serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
            networkUri: "projects/example-project/global/networks/default"
        runtimeConfig:
          version: "2.3"
          properties:
            spark.app.name: "run-notebook-on-dataproc-serverless"
            spark.executor.instances: "2"
            spark.driver.cores: "4"

Exemplo (caminho externo e substituições):

      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                    spark.app.name: "run-notebook-on-dataproc-serverless"
                    spark.executor.instances: "2"
                    spark.driver.cores: "4"

bigQuery

Executar como um job do BigQuery.

Chaves:

Exemplo:

      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"

Local

Executar localmente no ambiente do executor.

Consulte a ação python para saber como configurar o ambiente virtual.

Exemplo:

    engine:
      local: {}

Frameworks

Estruturas usadas em ações.

dbt > airflowWorker

Execute um modelo do dbt em um worker do Airflow no ambiente do runner usando dbt-core.

Chaves:

  • projectDirectoryPath: caminho relativo para uma pasta que contém o projeto do DBT.
  • selectModels: lista de modelos a serem incluídos na execução por nome (equivalente a dbt --select).
  • tags: lista de modelos a serem incluídos na execução por tag (equivalente a dbt --select).

Exemplo:

framework:
  dbt:
    airflowWorker:
      projectDirectoryPath: "dbt_project"
      selectModels: ["model_1", "model_2"]

dataform > airflowWorker

Fluxos de trabalho do Dataform executados em um worker do Airflow no ambiente de execução usando a CLI principal do Dataform.

Chaves:

  • projectDirectoryPath: caminho relativo para uma pasta que contém as definições de fluxo de trabalho do Dataform.

Exemplo:

framework:
  dataform:
    airflowWorker:
      projectDirectoryPath: "dataform_local"

dataform > dataformService

Executa fluxos de trabalho do Dataform executados no serviço do Dataform.

Chaves:

  • location: local onde o repositório do Dataform está localizado.
  • projectId: projeto em que o repositório do Dataform está localizado.
  • repositoryId: ID do repositório do Dataform
  • workflowInvocation: configuração para a invocação do fluxo de trabalho, que especifica quais ações executar. Consulte WorkflowInvocation.

Exemplo:

framework:
  dataform:
    dataformService:
      location: "us-central1"
      projectId: "example-project"
      repositoryId: "example-repository"
      workflowInvocation:
        compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
        invocationConfig:
          serviceAccount: "example-account@example-project.iam.gserviceaccount.com"