Questa pagina contiene il riferimento al DSL di Orchestration Pipelines.
Limitazione nell'anteprima
Durante l'anteprima, Orchestration Pipelines presenta le seguenti limitazioni:
Per le azioni
pysparkenotebook:- È supportato un solo file
requirements.txtper tutte le azionipysparkenotebook. - La piattaforma Windows non è supportata per la creazione di pacchetti tramite lo strumento
uv. - Sono supportati solo i pacchetti Python con binari precompilati.
- È supportato un solo file
Per le azioni
sql:- La definizione di
inlinenella chiavequerynon è supportata.
- La definizione di
Informazioni sul formato e sui valori
Le pipeline sono definite nel formato YAML e devono essere archiviate in file separati, uno per pipeline, nel repository.
Orchestration Pipelines offre diversi modi per utilizzare le variabili nelle definizioni delle pipeline e nella configurazione di deployment. Ad esempio, puoi definire variabili personalizzate, utilizzare i secret di GitHub e sostituire i valori delle variabili nella riga di comando. Per saperne di più, consulta Variabili, secret e sostituzione.
Per ulteriori informazioni sull'aggiunta di pipeline aggiuntive al bundle di pipeline, vedi Aggiungere un'altra pipeline.
Esempi di codice
Il repository orchestration-pipelines su GitHub contiene gli esempi di codice più recenti per molte combinazioni di azioni e motori delle pipeline. Ti consigliamo di utilizzare questi esempi come punto di partenza per esplorare le funzionalità di Orchestration Pipelines.
Definizione della pipeline
Una definizione di pipeline ha le seguenti chiavi di primo livello:
modelVersion: la versione del modello di definizione della pipeline. L'ultima versione del modello è1.0.pipelineId: un identificatore univoco per la pipeline. Questo ID rimane coerente in più deployment e versioni, consentendo il monitoraggio e la gestione dell'entità pipeline logica.description: descrizione della pipeline, mappata alla descrizione del DAG Airflow nell'ambiente runner.owner: proprietario della pipeline.tags: identificatori stringa applicati alla pipeline, utilizzati per filtrare le pipeline.notifications: notifiche sugli eventi della pipeline. Tipi di notifiche supportati:onPipelineFailure: email sugli errori della pipeline.
Le notifiche richiedono servizi email SendGrid configurati nell'ambiente runner. Per le istruzioni, vedi Configurare le notifiche email.
Esempio:
notifications: onPipelineFailure: email: ["user1@example.com", "user2@example.com"]runner: specifica il motore di orchestrazione di destinazione. Riservato per l'uso futuro. Impostaairflowper questo valore.defaults: imposta i valori predefiniti per proprietà comeproject_id,locationeexecutionConfigche si applicano a tutte le azioni, a meno che non vengano sostituite all'interno di un'azione specifica. Le proprietàproject_idelocationpossono essere ignorate dalle singole proprietà dell'azione. La proprietàexecutionConfignon può essere sostituita nelle singole azioni e specifica il numero di tentativi per tutte le azioni nella pipeline nel camporetries.triggers: definisce la modalità di avvio della pipeline:Nessun valore. La pipeline può comunque essere attivata manualmente.
schedule. Attiva la pipeline in base a una pianificazione, utilizzando le espressioni cron.Esempio di programma:
triggers: - schedule: interval: "0 5 * * *" startTime: "2025-10-01T00:00:00" endTime: "2026-10-01T00:00:00" catchup: false timezone: "UTC"
actionsUna mappatura delle attività da eseguire. Ogni voce di mappatura corrisponde a un'azione. Vedi Azioni.
Azioni
Le azioni pipeline definiscono i singoli passaggi dell'esecuzione della pipeline. Per ogni azione deve essere specificato un motore o un framework. Il motore o il framework determina quali risorse vengono utilizzate per eseguire l'azione.
Orchestration Pipelines supporta le seguenti azioni:
- Pyspark (
pyspark): esegui uno script PySpark. - Notebook (
notebook): esegui un file notebook. - Query SQL (
notebook): esegui una query SQL. - Python (
python): esegui uno script Python. - Pipeline (
pipeline): esegui una pipeline di elaborazione dati.
Orchestration Pipelines supporta i seguenti motori e framework:
dataprocOnGce>existingCluster: Cluster Managed Service for Apache Spark identificato da clusterName, project e location.dataprocOnGce>ephemeral: Cluster Managed Service for Apache Spark creato ed eliminato dopo l'esecuzione del job.dataprocServerless: invio batch di Managed Service for Apache Spark .bigquery: job BigQuery.python>local: script Python eseguito su un worker Airflow nell'ambiente runner.dbt>airflowWorker: modelli dbt eseguiti su un worker Airflow nell'ambiente runner utilizzandodbt-core.dataform>airflowWorker: flussi di lavoro Dataform eseguiti su un worker Airflow nell'ambiente runner utilizzando dataform core cli.dataform>dataformService: flussi di lavoro Dataform eseguiti sul servizio Dataform.
La tabella seguente elenca le possibili combinazioni di tipo di azione, motore e framework. Consulta le descrizioni dei motori e dei framework per esempi di codici di azione.
| Azione | Motore o framework | Output in |
|---|---|---|
pyspark |
dataprocOnGce > existingCluster |
Log dei job Managed Service for Apache Spark |
pyspark |
dataprocOnGce > ephemeralCluster |
Log dei job Managed Service for Apache Spark |
pyspark |
dataprocServerless |
Log batch di Managed Service for Apache Spark |
notebook |
dataprocOnGce > existingCluster |
Bucket runner, nella directory composer_declarative_dags_resources |
notebook |
dataprocOnGce > ephemeralCluster |
Log dei job Managed Service for Apache Spark |
notebook |
dataprocServerless |
Bucket runner, nella directory composer_declarative_dags_resources |
sql |
bigquery |
Tabella specificata nel parametro destinationTable |
sql |
dataprocServerless |
Log batch di Managed Service for Apache Spark. |
python |
local (esecuzione locale) |
Log |
pipeline |
dbt > airflowWorker |
Log e BigQuery |
pipeline |
dataform > airflowWorker |
Tabella specificata in BigQuery |
pipeline |
dataform > dataformService |
In Dataform |
Tutte le azioni hanno le seguenti chiavi comuni. Gli altri tasti dipendono dal tipo di azione.
name: Nome dell'azione. Questo nome viene mappato al nome dell'attività Airflow nell'ambiente runner. Se un'azione richiede più di un'attività Airflow, questo nome viene mappato al gruppo di attività.dependsOn: un elenco di nomi di azioni upstream da cui dipende questa azione, che definisce l'ordine di esecuzione. Se una delle azioni upstream non va a buon fine, le azioni downstream che dipendono da essa non vengono eseguite.executionTimeout: timeout per l'esecuzione dell'azione. Esempi:1h,30m,40s.
python
Azioni di tipo python. Esegui script Python.
Chiavi specifiche per il tipo di azione:
mainFilePath: percorso relativo del file script Python.pythonCallable: nome del richiamabile Python da eseguire nello script Python.opKwargs: una mappatura degli argomenti delle parole chiave per l'operatore.(Facoltativo)
environment: esegui lo script all'interno di un ambiente virtuale Python creato dinamicamente.requirements: requisiti per l'ambiente virtuale. I requisiti vengono risolti in fase di esecuzione.inline: i requisiti sono specificati inline.list: elenco dei requisiti. Elenca i singoli requisiti in base a PEP-508.Esempio:
environment: requirements: inline: list: ["pandas>=2.0.0"]
(Alternativa)
path: percorso del file con i requisiti. I requisiti in questo file devono essere elencati in base a PEP-508.Esempio:
environment: requirements: path: "scripts/requirements.txt"
systemSitePackages: setrue, l'ambiente virtuale eredita i pacchetti dalla directory site-packages del worker Airflow. Puoi installare pacchetti PyPI personalizzati nel tuo ambiente runner.
engine:local: esecuzione locale nell'ambiente runner
Esempio:
locale
modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 1
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- python:
name: "first_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_1.py"
pythonCallable: "main"
engine:
local: {}
environment:
systemSitePackages: true
requirements:
path: "scripts/requirements.txt"
- python:
name: "second_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_1.py"
pythonCallable: "main"
engine:
local: {}
environment:
requirements:
inline:
list: ["pandas>=2.0.0"]
systemSitePackages: true
dependsOn: ["first_script_run"]
- python:
name: "third_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_2.py"
pythonCallable: "main"
engine:
local: {}
opKwargs:
api_endpoint: "https://api.my-vendor.example.com/v1/status"
api_key_secret_name: "my-vendor-api-key"
dependsOn: ["first_script_run"]
pyspark
Azioni di tipo pyspark. Esegui script PySpark.
Chiavi specifiche per il tipo di azione:
mainFilePath: percorso relativo allo script PySpark.archiveUris: un elenco di URI archivio da utilizzare con questa azione.stagingBucket: il bucket Cloud Storage da utilizzare con questa azione.pyFiles: un elenco di file Python da utilizzare con questo job Spark.environment: configurazione dell'ambiente Python.requirements: il file dei requisiti Python da utilizzare.path: Percorso del file con i requisiti. I requisiti in questo file devono essere elencati in base a PEP-508.
engine:dataprocOnGce>existingClusterdataprocOnGce>ephemeralClusterdataprocServerless
Esempi:
existingCluster
modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 4 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pyspark:
name: "run-pyspark-with-pyfiles-on-existing-cluster"
engine:
dataprocOnGce:
existingCluster:
clusterName: "example-dataproc-cluster"
location: "us-central1"
projectId: "example-project"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
ephemeralCluster
pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pyspark:
name: "run_dataproc_ephemeral"
executionTimeout: "1h"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
engine:
dataprocOnGce:
ephemeralCluster:
projectId: "example-project"
location: "us-central1"
clusterName: "ephemeral-cluster-inline"
resourceProfile:
inline:
config:
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
properties:
spark.submit.deployMode: "cluster"
dataprocServerless
modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
actions:
- pyspark:
name: "run-pyspark-on-dataproc-serverless"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
inline:
environmentConfig:
executionConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
networkUri: "projects/example-project/global/networks/default"
runtimeConfig:
version: "2.3"
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
notebook
Azioni di tipo notebook. Esegui un notebook .ipynb tramite Papermill.
Chiavi specifiche per il tipo di azione:
mainFilePath: il percorso relativo del file del blocco note.archiveUris: un elenco di URI archivio da utilizzare con questa azione.stagingBucket: il bucket Cloud Storage da utilizzare con questa azione.environment: configurazione dell'ambiente Python.requirements: il file dei requisiti Python da utilizzare.path: Percorso del file con i requisiti. I requisiti in questo file devono essere elencati in base a PEP-508.
engine:dataprocOnGce>existingClusterdataprocOnGce>ephemeralClusterdataprocServerless
Esempio:
dataprocServerless
modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- notebook:
name: "run-notebook-on-dataproc-serverless"
mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
archiveUris:
- "gs://example-bucket-additional-data/custom_venv.tar.gz"
staging_bucket: "example-bucket-additional-data-additional-data"
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
overrides:
runtimeConfig:
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
sql
Azioni di tipo sql. Esegui query SQL.
Chiavi specifiche per il tipo di azione:
query: definisce una query.path: la query è definita in un file che si trova nel percorso relativo del file di configurazione del deployment.inline: la query è definita inline.
engine:bigQuerydataprocServerlessdataprocOnGce>existingClusterdataprocOnGce>ephemeralCluster
bigQuery
modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run_bigquery_insert_job_create"
query:
inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
engine:
bigquery:
location: "US"
- sql:
name: "run_bigquery_insert_job_select"
query:
path: "sql-scripts/count_rows.sql"
engine:
bigquery:
location: "US"
destinationTable: "example-project.example_dataset.example_table_query_results"
dependsOn:
- "run_bigquery_insert_job_create"
dataprocServerless
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-dataproc"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocServerless:
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
resourceProfile:
inline:
runtimeConfig:
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
dataproc.sparkBqConnector.version: "0.42.3"
existingCluster
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-existing-cluster"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocOnGce:
existingCluster:
clusterName: "cluster-sql"
projectId: "example-project"
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
# This field is needed and important for the Spark-BigQuery connector.
spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
ephemeralCluster
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-ephemeral-cluster"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocOnGce:
ephemeralCluster:
clusterName: "example-ephemeral-cluster"
projectId: "example-project"
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
# This field is needed and important for the Spark-BigQuery connector.
spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
resourceProfile:
inline:
clusterConfig:
gceClusterConfig:
zoneUri: "us-central1-a"
metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
pipeline
Azioni di tipo pipeline. Esegui una pipeline di elaborazione dati.
Chiavi specifiche per il tipo di azione:
framework:dbtdataform>airflowWorkerdataform>dataformService
Esempi:
Logo dbt
modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pipeline:
name: "dbt-action"
framework:
dbt:
airflowWorker:
projectDirectoryPath: "dbt_project"
selectModels: ["model_1", "model_2"]
dataform>airflowWorker
modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pipeline:
name: "run_dataform"
framework:
dataform:
airflowWorker:
projectDirectoryPath: "dataform_local"
dataform>dataformService
modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
actions:
- pipeline:
name: "run_dataform"
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"
- pipeline:
name: "run_dataform_compilation"
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
invocationConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
Motori
Motori utilizzati nelle azioni.
dataprocOnGce > existingCluster
Esegui in un cluster Managed Service for Apache Spark esistente identificato da clusterName, progetto e località.
Puoi gestire il cluster specificato nella configurazione di deployment o manualmente in Managed Service for Apache Spark. Ti consigliamo di eseguire regolarmente l'upgrade del cluster.
Chiavi:
clusterName: Nome del clusterlocation: la regione in cui si trova il clusterprojectId: l'ID progetto del progetto in cui si trova il clusterproperties: una mappa delle proprietà del job Spark.
Esempio:
engine:
dataprocOnGce:
existingCluster:
clusterName: "example-dataproc-cluster"
location: "us-central1"
projectId: "example-project"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
dataprocOnGce > ephemeralCluster
Esegui in un cluster Managed Service for Apache Spark effimero, che viene creato ed eliminato dopo l'esecuzione del job.
Chiavi:
clusterName: Nome del clusterlocation: la regione in cui si trova il clusterprojectId: l'ID progetto del progetto in cui si trova il clusterimpersonationChain: la catena di simulazione dell'identità del service account da utilizzare per eseguire l'azione.resourceProfile: profilo delle risorse del cluster Managed Service for Apache Spark.Per la descrizione dei campi disponibili, consulta ClusterConfig nella documentazione di Managed Service for Apache Spark.
Un profilo della risorsa può essere specificato nei seguenti modi:
inline: definito come parte della configurazione della pipeline.path: definito in un file che si trova nel percorso relativo.external_config_path: definito in un file che si trova in un bucket Cloud Storage. A differenza delle opzioniinlineepath, che richiedono l'esecuzione del commit e del deployment per aggiornare i valori del profilo della risorsa, un profilo della risorsa esterna viene risolto a ogni esecuzione della pipeline e puoi aggiornarlo senza eseguire nuovamente il deployment della pipeline.
Gli override possono essere applicati al profilo della risorsa specificato con la chiave
override. Gli override vengono applicati con l'unione profonda al profilo presource fornito.properties: una mappa delle proprietà del job Spark.
Esempio:
engine:
dataprocOnGce:
ephemeralCluster:
projectId: "example-project"
location: "us-central1"
clusterName: "example-ephemeral-cluster"
resourceProfile:
inline:
config:
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
properties:
spark.submit.deployMode: "cluster"
dataprocServerless
Esegui l'invio di batch in Managed Service for Apache Spark.
Chiavi:
location: la regione in cui deve essere eseguito il job Spark.impersonationChain: catena di simulazione dell'identità del service account da utilizzare per eseguire l'azione.resourceProfile: profilo delle risorse di Managed Service for Apache Spark.Un profilo della risorsa può essere specificato nei seguenti modi:
inline: definito come parte della configurazione della pipeline.path: definito in un file che si trova nel percorso relativo.external_config_path: definito in un file che si trova in un bucket Cloud Storage. A differenza delle opzioniinlineepath, che richiedono l'esecuzione del commit e del deployment per aggiornare i valori del profilo della risorsa, un profilo della risorsa esterna viene risolto a ogni esecuzione della pipeline e puoi aggiornarlo senza eseguire nuovamente il deployment della pipeline.
Le seguenti chiavi specificano la configurazione del profilo delle risorse:
environmentConfig: configurazione dell'ambienteruntimeConfig: configurazione di runtime
Per la descrizione dei campi disponibili, consulta RuntimeConfig e EnvironmentConfig nella documentazione di Managed Service for Apache Spark.
Gli override possono essere applicati al profilo della risorsa specificato con la chiave
override. Gli override vengono applicati con l'unione profonda al profilo della risorsa fornito.
Esempio (in linea):
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
inline:
environmentConfig:
executionConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
networkUri: "projects/example-project/global/networks/default"
runtimeConfig:
version: "2.3"
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
Esempio (percorso esterno e override):
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
overrides:
runtimeConfig:
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
bigQuery
Esegui come job BigQuery.
Chiavi:
location: la regione in cui si trova la tabella di destinazione.destinationTable: la tabella BigQuery in cui restituire i datiimpersonationChain: la catena di simulazione dell'identità del service account da utilizzare per eseguire l'azione.
Esempio:
engine:
bigquery:
location: "US"
destinationTable: "example-project.example_dataset.example_table_query_results"
locale
Esegui localmente nell'ambiente runner.
Consulta l'azione python per scoprire come configurare l'ambiente virtuale.
Esempio:
engine:
local: {}
Framework
Framework utilizzati nelle azioni.
dbt > airflowWorker
Esegui un modello dbt eseguito su un worker Airflow nell'ambiente runner
utilizzando dbt-core.
Chiavi:
projectDirectoryPath: percorso relativo a una cartella che contiene il progetto DBT.selectModels: Elenco dei modelli da includere nell'esecuzione per nome (equivalente adbt --select).tags: Elenco dei modelli da includere nell'esecuzione per tag (equivalente adbt --select).
Esempio:
framework:
dbt:
airflowWorker:
projectDirectoryPath: "dbt_project"
selectModels: ["model_1", "model_2"]
dataform > airflowWorker
Workflow Dataform eseguiti su un worker Airflow nell'ambiente runner utilizzando dataform core cli.
Chiavi:
projectDirectoryPath: percorso relativo a una cartella che contiene le definizioni del workflow Dataform.
Esempio:
framework:
dataform:
airflowWorker:
projectDirectoryPath: "dataform_local"
dataform > dataformService
Esegue i workflow Dataform eseguiti sul servizio Dataform.
Chiavi:
location: la posizione in cui si trova il repository Dataform.projectId: il progetto in cui si trova il repository Dataform.repositoryId: ID repository DataformworkflowInvocation: configurazione per la chiamata del workflow, che specifica le azioni da eseguire. Vedi WorkflowInvocation.
Esempio:
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
invocationConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"