Esta página contém a referência da DSL de Orquestração de Pipelines.
Limitação no pré-lançamento
Durante o pré-lançamento, a Orquestração de Pipelines tem as seguintes limitações:
Para ações
pysparkenotebook:- Apenas um arquivo
requirements.txtpara todas as açõespysparkenotebooké aceito. - A plataforma Windows não é compatível com a criação de pacotes usando a ferramenta
uv. - Apenas pacotes Python com binários pré-criados são compatíveis.
- Apenas um arquivo
Para ações do
sql:- A definição
inlinena chavequerynão é compatível.
- A definição
Sobre o formato e os valores
Os pipelines são definidos no formato YAML e precisam ser armazenados em arquivos separados, um por pipeline, no seu repositório.
A Orquestração de Pipelines oferece várias maneiras de usar variáveis nas definições de pipeline e na configuração de implantação. Por exemplo, é possível definir variáveis personalizadas, usar secrets do GitHub e substituir valores de variáveis na linha de comando. Para mais informações, consulte Variáveis, secrets e substituição.
Para mais informações sobre como adicionar outros pipelines ao pacote, consulte Adicionar outro pipeline.
Exemplos de código
O repositório orchestration-pipelines no GitHub tem os exemplos de código mais recentes para muitas ações de pipeline e combinações de mecanismos. Recomendamos esses exemplos como ponto de partida para conhecer os recursos da Orquestração de Pipelines.
Definição de pipeline
Uma definição de pipeline tem as seguintes chaves de nível superior:
modelVersion: a versão do modelo de definição de pipeline. A versão mais recente do modelo é1.0.pipelineId: um identificador exclusivo do pipeline. Esse ID permanece consistente em várias implantações e versões, permitindo o rastreamento e o gerenciamento da entidade de pipeline lógico.description: descrição do pipeline, que é mapeada para a descrição do DAG do Airflow no ambiente do executor.owner: proprietário do pipeline.tags: identificadores de string aplicados ao pipeline, usados para filtrar os pipelines.notifications: notificações sobre eventos do pipeline. Tipos de notificação compatíveis:onPipelineFailure: e-mail sobre falhas no pipeline.
As notificações exigem que os serviços de e-mail do SendGrid estejam configurados no ambiente do executor. Para instruções, consulte Configurar notificações por e-mail.
Exemplo:
notifications: onPipelineFailure: email: ["user1@example.com", "user2@example.com"]runner: especifica o mecanismo de orquestração de destino. Reservado para uso futuro. Defina esse valor comoairflow.defaults: define valores padrão para propriedades comoproject_id,locationeexecutionConfigque se aplicam a todas as ações, a menos que sejam substituídas em uma ação específica. As propriedadesproject_idelocationpodem ser substituídas por propriedades de ação individuais. A propriedadeexecutionConfignão pode ser substituída em ações individuais e especifica o número de novas tentativas para todas as ações no pipeline no camporetries.triggers: define como o pipeline é iniciado:Nenhum valor. O pipeline ainda pode ser acionado manualmente.
schedule. Acione o pipeline em uma programação usando expressões cron.Exemplo de programação:
triggers: - schedule: interval: "0 5 * * *" startTime: "2025-10-01T00:00:00" endTime: "2026-10-01T00:00:00" catchup: false timezone: "UTC"
actionsUm mapeamento de tarefas a serem executadas. Cada entrada de mapeamento corresponde a uma ação. Consulte Ações.
Ações
As ações de pipeline definem etapas individuais na execução do pipeline. Cada ação precisa ter um mecanismo ou framework especificado. O mecanismo ou framework determina quais recursos são usados para executar a ação.
Orquestração de Pipelines é compatível com as seguintes ações:
- PySpark (
pyspark): executa um script PySpark. - Notebook (
notebook): execute um arquivo de notebook. - Consulta SQL (
notebook): execute uma consulta SQL. - Python (
python): execute um script Python. - Pipeline (
pipeline): execute um pipeline de tratamento de dados.
A Orquestração de Pipelines é compatível com os seguintes mecanismos e frameworks:
dataprocOnGce>existingCluster: Cluster do Serviço Gerenciado para Apache Spark identificado por clusterName, projeto e local.dataprocOnGce>ephemeral: Cluster do Serviço Gerenciado para Apache Spark criado e excluído após a execução do job.dataprocServerless: Serviço Gerenciado para Apache Spark envio em lote.bigquery: um job do BigQuery.python>local: script Python executado em um worker do Airflow no ambiente do executor.dbt>airflowWorker: modelos do dbt executados em um worker do Airflow no ambiente do executor usandodbt-core.dataform>airflowWorker: fluxos de trabalho do Dataform executados em um worker do Airflow no ambiente do runner usando dataform core cli.dataform>dataformService: fluxos de trabalho do Dataform executados no serviço do Dataform.
A tabela a seguir lista as possíveis combinações de tipo de ação, mecanismo e framework. Consulte as descrições de mecanismos e frameworks para exemplos de código de ação.
| Ação | Mecanismo ou framework | Saídas para |
|---|---|---|
pyspark |
dataprocOnGce > existingCluster |
Registros de jobs do Serviço Gerenciado para Apache Spark |
pyspark |
dataprocOnGce > ephemeralCluster |
Registros de jobs do Serviço Gerenciado para Apache Spark |
pyspark |
dataprocServerless |
Registros em lote do Serviço Gerenciado para Apache Spark |
notebook |
dataprocOnGce > existingCluster |
Bucket do executor, no diretório composer_declarative_dags_resources |
notebook |
dataprocOnGce > ephemeralCluster |
Registros de jobs do Serviço Gerenciado para Apache Spark |
notebook |
dataprocServerless |
Bucket do executor, no diretório composer_declarative_dags_resources |
sql |
bigquery |
Tabela especificada no parâmetro destinationTable |
sql |
dataprocServerless |
Registros em lote do Serviço Gerenciado para Apache Spark. |
python |
local (execução local) |
Registros |
pipeline |
dbt > airflowWorker |
Registros e BigQuery |
pipeline |
dataform > airflowWorker |
Tabela especificada no BigQuery |
pipeline |
dataform > dataformService |
No Dataform |
Todas as ações têm as seguintes chaves comuns. Outras chaves dependem do tipo de ação.
name: nome da ação. Esse nome é mapeado para o nome da tarefa do Airflow no ambiente do runner. Se uma ação exigir mais de uma tarefa do Airflow, esse nome será mapeado para o grupo de tarefas.dependsOn: uma lista de nomes de ações upstream de que esta ação depende, definindo a ordem de execução. Se alguma das ações upstream falhar, as ações downstream que dependem delas não serão executadas.executionTimeout: tempo limite para executar a ação. Exemplos:1h,30m,40s.
python
Ações do tipo python. Executar scripts Python.
Chaves específicas do tipo de ação:
mainFilePath: caminho relativo para o arquivo de script Python.pythonCallable: nome do Python chamável a ser executado no script Python.opKwargs: um mapeamento de argumentos de palavra-chave para o operador.(Opcional)
environment: execute o script em um ambiente virtual do Python criado dinamicamente.requirements: requisitos para o ambiente virtual. Os requisitos são resolvidos no tempo de execução.inline: os requisitos são especificados inline.list: lista de requisitos. Liste os requisitos individuais de acordo com a PEP-508.Exemplo:
environment: requirements: inline: list: ["pandas>=2.0.0"]
(Alternativa)
path: caminho para o arquivo com requisitos. Os requisitos neste arquivo precisam ser listados de acordo com PEP-508.Exemplo:
environment: requirements: path: "scripts/requirements.txt"
systemSitePackages: setrue, o ambiente virtual vai herdar pacotes do diretório site-packages do worker do Airflow. É possível instalar pacotes PyPI personalizados no ambiente do runner.
engine:local: execução local no ambiente do executor
Exemplo:
Local
modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 1
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- python:
name: "first_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_1.py"
pythonCallable: "main"
engine:
local: {}
environment:
systemSitePackages: true
requirements:
path: "scripts/requirements.txt"
- python:
name: "second_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_1.py"
pythonCallable: "main"
engine:
local: {}
environment:
requirements:
inline:
list: ["pandas>=2.0.0"]
systemSitePackages: true
dependsOn: ["first_script_run"]
- python:
name: "third_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_2.py"
pythonCallable: "main"
engine:
local: {}
opKwargs:
api_endpoint: "https://api.my-vendor.example.com/v1/status"
api_key_secret_name: "my-vendor-api-key"
dependsOn: ["first_script_run"]
pyspark
Ações do tipo pyspark. Executar scripts PySpark.
Chaves específicas do tipo de ação:
mainFilePath: caminho relativo para o script PySpark.archiveUris: uma lista de URIs de arquivo a serem usados com essa ação.stagingBucket: bucket do Cloud Storage a ser usado com essa ação.pyFiles: uma lista de arquivos Python a serem usados com este job do Spark.environment: configuração do ambiente Python.requirements: arquivo de requisitos do Python a ser usado.path: caminho para o arquivo com requisitos. Os requisitos neste arquivo precisam ser listados de acordo com PEP-508.
engine:dataprocOnGce>existingClusterdataprocOnGce>ephemeralClusterdataprocServerless
Exemplos:
existingCluster
modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 4 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pyspark:
name: "run-pyspark-with-pyfiles-on-existing-cluster"
engine:
dataprocOnGce:
existingCluster:
clusterName: "example-dataproc-cluster"
location: "us-central1"
projectId: "example-project"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
ephemeralCluster
pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pyspark:
name: "run_dataproc_ephemeral"
executionTimeout: "1h"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
engine:
dataprocOnGce:
ephemeralCluster:
projectId: "example-project"
location: "us-central1"
clusterName: "ephemeral-cluster-inline"
resourceProfile:
inline:
config:
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
properties:
spark.submit.deployMode: "cluster"
dataprocServerless
modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
actions:
- pyspark:
name: "run-pyspark-on-dataproc-serverless"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
inline:
environmentConfig:
executionConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
networkUri: "projects/example-project/global/networks/default"
runtimeConfig:
version: "2.3"
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
notebook
Ações do tipo notebook. Executar um notebook .ipynb usando o Papermill.
Chaves específicas do tipo de ação:
mainFilePath: caminho relativo para o arquivo de notebook.archiveUris: uma lista de URIs de arquivo a serem usados com essa ação.stagingBucket: bucket do Cloud Storage a ser usado com essa ação.environment: configuração do ambiente Python.requirements: arquivo de requisitos do Python a ser usado.path: caminho para o arquivo com requisitos. Os requisitos neste arquivo precisam ser listados de acordo com PEP-508.
engine:dataprocOnGce>existingClusterdataprocOnGce>ephemeralClusterdataprocServerless
Exemplo:
dataprocServerless
modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- notebook:
name: "run-notebook-on-dataproc-serverless"
mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
archiveUris:
- "gs://example-bucket-additional-data/custom_venv.tar.gz"
staging_bucket: "example-bucket-additional-data-additional-data"
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
overrides:
runtimeConfig:
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
sql
Ações do tipo sql. Executar consultas SQL.
Chaves específicas do tipo de ação:
query: define uma consulta.path: a consulta é definida em um arquivo localizado no caminho relativo ao arquivo de configuração da implantação.inline: a consulta é definida in-line.
engine:bigQuerydataprocServerlessdataprocOnGce>existingClusterdataprocOnGce>ephemeralCluster
bigQuery
modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run_bigquery_insert_job_create"
query:
inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
engine:
bigquery:
location: "US"
- sql:
name: "run_bigquery_insert_job_select"
query:
path: "sql-scripts/count_rows.sql"
engine:
bigquery:
location: "US"
destinationTable: "example-project.example_dataset.example_table_query_results"
dependsOn:
- "run_bigquery_insert_job_create"
dataprocServerless
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-dataproc"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocServerless:
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
resourceProfile:
inline:
runtimeConfig:
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
dataproc.sparkBqConnector.version: "0.42.3"
existingCluster
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-existing-cluster"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocOnGce:
existingCluster:
clusterName: "cluster-sql"
projectId: "example-project"
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
# This field is needed and important for the Spark-BigQuery connector.
spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
ephemeralCluster
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-ephemeral-cluster"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocOnGce:
ephemeralCluster:
clusterName: "example-ephemeral-cluster"
projectId: "example-project"
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
# This field is needed and important for the Spark-BigQuery connector.
spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
resourceProfile:
inline:
clusterConfig:
gceClusterConfig:
zoneUri: "us-central1-a"
metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
pipeline
Ações do tipo pipeline. Executar um pipeline de tratamento de dados.
Chaves específicas do tipo de ação:
framework:dbtdataform>airflowWorkerdataform>dataformService
Exemplos:
dbt
modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pipeline:
name: "dbt-action"
framework:
dbt:
airflowWorker:
projectDirectoryPath: "dbt_project"
selectModels: ["model_1", "model_2"]
dataform>airflowWorker
modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pipeline:
name: "run_dataform"
framework:
dataform:
airflowWorker:
projectDirectoryPath: "dataform_local"
dataform>dataformService
modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
actions:
- pipeline:
name: "run_dataform"
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"
- pipeline:
name: "run_dataform_compilation"
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
invocationConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
Mecanismos
Mecanismos usados em ações.
dataprocOnGce > existingCluster
Executar em um cluster do Serviço Gerenciado para Apache Spark identificado por clusterName, projeto e local.
Você pode gerenciar o cluster especificado na sua configuração de implantação ou manualmente no Serviço Gerenciado para Apache Spark. Recomendamos manter o cluster atualizado regularmente.
Chaves:
clusterName: nome do cluster.location: região em que o cluster está localizado.projectId: ID do projeto em que o cluster está localizadoproperties: um mapa de propriedades de job do Spark.
Exemplo:
engine:
dataprocOnGce:
existingCluster:
clusterName: "example-dataproc-cluster"
location: "us-central1"
projectId: "example-project"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
dataprocOnGce > ephemeralCluster
Executar em um cluster efêmero do Serviço Gerenciado para Apache Spark, que é criado e excluído após a execução do job.
Chaves:
clusterName: nome do cluster.location: região em que o cluster está localizado.projectId: ID do projeto em que o cluster está localizadoimpersonationChain: uma cadeia de identidade temporária de conta de serviço a ser usada para executar a ação.resourceProfile: perfil de recursos do cluster do Serviço Gerenciado para Apache Spark.Para a descrição dos campos disponíveis, consulte ClusterConfig na documentação do Serviço Gerenciado para Apache Spark.
Um perfil de recurso pode ser especificado das seguintes maneiras:
inline: definido como parte da configuração do pipeline.path: definido em um arquivo localizado no caminho relativo.external_config_path: definido em um arquivo localizado em um bucket do Cloud Storage. Ao contrário das opçõesinlineepath, que exigem fazer commit e implantar para atualizar os valores do perfil de recurso, um perfil de recurso externo é resolvido em cada execução do pipeline, e você pode atualizá-lo sem reimplantar o pipeline.
As substituições podem ser aplicadas ao perfil de recurso especificado com a chave
override. As substituições são aplicadas com mesclagem profunda ao perfil de pré-fonte fornecido.properties: um mapa de propriedades de job do Spark.
Exemplo:
engine:
dataprocOnGce:
ephemeralCluster:
projectId: "example-project"
location: "us-central1"
clusterName: "example-ephemeral-cluster"
resourceProfile:
inline:
config:
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
properties:
spark.submit.deployMode: "cluster"
dataprocServerless
Executar no Serviço Gerenciado para Apache Spark um envio em lote.
Chaves:
location: região em que o job do Spark precisa ser executado.impersonationChain: cadeia de identidade temporária de conta de serviço a ser usada para executar a ação.resourceProfile: perfil de recursos do Serviço Gerenciado para Apache Spark.Um perfil de recurso pode ser especificado das seguintes maneiras:
inline: definido como parte da configuração do pipeline.path: definido em um arquivo localizado no caminho relativo.external_config_path: definido em um arquivo localizado em um bucket do Cloud Storage. Ao contrário das opçõesinlineepath, que exigem fazer commit e implantar para atualizar os valores do perfil de recurso, um perfil de recurso externo é resolvido em cada execução do pipeline, e você pode atualizá-lo sem reimplantar o pipeline.
As chaves a seguir especificam a configuração do perfil de recurso:
environmentConfig: configuração do ambienteruntimeConfig: configuração do ambiente de execução
Para a descrição dos campos disponíveis, consulte RuntimeConfig e EnvironmentConfig na documentação do Serviço gerenciado para Apache Spark.
As substituições podem ser aplicadas ao perfil de recurso especificado com a chave
override. As substituições são aplicadas com mesclagem profunda ao perfil de recurso fornecido.
Exemplo (inline):
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
inline:
environmentConfig:
executionConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
networkUri: "projects/example-project/global/networks/default"
runtimeConfig:
version: "2.3"
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
Exemplo (caminho externo e substituições):
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
overrides:
runtimeConfig:
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
bigQuery
Executar como um job do BigQuery.
Chaves:
location: região em que a tabela de destino está localizada.destinationTable: tabela do BigQuery para gerar os dadosimpersonationChain: cadeia de identidade temporária de conta de serviço a ser usada para executar a ação.
Exemplo:
engine:
bigquery:
location: "US"
destinationTable: "example-project.example_dataset.example_table_query_results"
Local
Executar localmente no ambiente do executor.
Consulte a ação python para saber como configurar o ambiente virtual.
Exemplo:
engine:
local: {}
Frameworks
Estruturas usadas em ações.
dbt > airflowWorker
Execute um modelo do dbt em um worker do Airflow no ambiente do runner
usando dbt-core.
Chaves:
projectDirectoryPath: caminho relativo para uma pasta que contém o projeto do DBT.selectModels: lista de modelos a serem incluídos na execução por nome (equivalente adbt --select).tags: lista de modelos a serem incluídos na execução por tag (equivalente adbt --select).
Exemplo:
framework:
dbt:
airflowWorker:
projectDirectoryPath: "dbt_project"
selectModels: ["model_1", "model_2"]
dataform > airflowWorker
Fluxos de trabalho do Dataform executados em um worker do Airflow no ambiente de execução usando a CLI principal do Dataform.
Chaves:
projectDirectoryPath: caminho relativo para uma pasta que contém as definições de fluxo de trabalho do Dataform.
Exemplo:
framework:
dataform:
airflowWorker:
projectDirectoryPath: "dataform_local"
dataform > dataformService
Executa fluxos de trabalho do Dataform executados no serviço do Dataform.
Chaves:
location: local onde o repositório do Dataform está localizado.projectId: projeto em que o repositório do Dataform está localizado.repositoryId: ID do repositório do DataformworkflowInvocation: configuração para a invocação do fluxo de trabalho, que especifica quais ações executar. Consulte WorkflowInvocation.
Exemplo:
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
invocationConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"