Esta página contiene la referencia del DSL de Canalizaciones de organización.
Limitación en la versión preliminar
Durante la versión preliminar, Canalizaciones de organización tiene las siguientes limitaciones:
Para las acciones de
pysparkynotebook, haz lo siguiente:- Solo se admite un archivo
requirements.txtpara todas las acciones depysparkynotebook. - La plataforma de Windows no es compatible con la compilación de paquetes a través de la herramienta
uv. - Solo se admiten paquetes de Python con archivos binarios prediseñados.
- Solo se admite un archivo
Para las acciones de
sql, haz lo siguiente:- No se admite la definición de
inlineen la clavequery.
- No se admite la definición de
Acerca del formato y los valores
Las canalizaciones se definen en formato YAML y deben almacenarse en archivos separados, uno por canalización, en tu repositorio.
Canalizaciones de organización ofrece varias formas de usar variables en las definiciones de canalización y la configuración de implementación. Por ejemplo, puedes definir variables personalizadas, usar secretos de GitHub y sustituir valores de variables en la línea de comandos. Para obtener más información, consulta Variables, secretos y sustitución.
Para obtener más información sobre cómo agregar canalizaciones adicionales al paquete de canalizaciones, consulta Cómo agregar otra canalización.
Ejemplos de código
El repositorio orchestration-pipelines en GitHub contiene los ejemplos de código más recientes para muchas combinaciones de acciones y motores de canalizaciones. Te recomendamos que uses estos ejemplos como punto de partida para explorar las capacidades de las canalizaciones de organización.
Definición de canalización
Una definición de canalización tiene las siguientes claves de nivel superior:
modelVersion: Es la versión del modelo de definición de la canalización. La versión más reciente del modelo es1.0.pipelineId: Es un identificador único de la canalización. Este ID se mantiene coherente en múltiples implementaciones y versiones, lo que permite el seguimiento y la administración de la entidad de canalización lógica.description: Es la descripción de la canalización, que se asigna a la descripción del DAG de Airflow en el entorno del ejecutor.owner: Es el propietario de la canalización.tags: Son identificadores de cadena aplicados en la canalización que se usan para filtrar las canalizaciones.notifications: Notificaciones sobre eventos de canalización Tipos de notificaciones admitidos:onPipelineFailure: Correo electrónico sobre fallas en la canalización.
Las notificaciones requieren que se configuren los servicios de correo electrónico de SendGrid en tu entorno de ejecución. Para obtener instrucciones, consulta Configura las notificaciones por correo electrónico.
Ejemplo:
notifications: onPipelineFailure: email: ["user1@example.com", "user2@example.com"]runner: Especifica el motor de orquestación de destino. Se reserva para usarlo más adelante. Establece este valor enairflow.defaults: Establece valores predeterminados para propiedades comoproject_id,locationyexecutionConfigque se aplican a todas las acciones, a menos que se anulen dentro de una acción específica. Las propiedadesproject_idylocationpueden anularse con propiedades de acción individuales. La propiedadexecutionConfigno se puede anular en acciones individuales y especifica la cantidad de reintentos para todas las acciones de la canalización en el camporetries.triggers: Define cómo se inicia la canalización:Sin valor. La canalización aún se puede activar manualmente.
schedule: Activa la canalización según un programa, con expresiones cron.Ejemplo de programa:
triggers: - schedule: interval: "0 5 * * *" startTime: "2025-10-01T00:00:00" endTime: "2026-10-01T00:00:00" catchup: false timezone: "UTC"
actionsEs una asignación de las tareas que se ejecutarán. Cada entrada de asignación corresponde a una acción. Consulta Acciones.
Acciones
Las acciones de canalización definen pasos individuales en la ejecución de la canalización. Cada acción debe tener especificado un motor o un framework. El motor o el framework determinan qué recursos se usan para ejecutar la acción.
Las canalizaciones de organización admiten las siguientes acciones:
- PySpark (
pyspark): Ejecuta una secuencia de comandos de PySpark. - Notebook (
notebook): Ejecuta un archivo de notebook. - Consulta en SQL (
notebook): Ejecuta una consulta en SQL. - Python (
python): Ejecuta una secuencia de comandos de Python. - Canalización (
pipeline): Ejecuta canalizaciones de procesamiento de datos.
Canalizaciones de organización admite los siguientes motores y frameworks:
dataprocOnGce>existingCluster: Clúster de Managed Service para Apache Spark identificado por clusterName, project y location.dataprocOnGce>ephemeral: Se crea y borra un clúster de Managed Service para Apache Spark después de ejecutar el trabajo.dataprocServerless: Managed Service para Apache Spark envío por lotes.bigquery: Es un trabajo de BigQuery.python>local: Es una secuencia de comandos de Python que se ejecuta en un Airflow Worker en el entorno del ejecutor.dbt>airflowWorker: Modelos de dbt ejecutados en un trabajador de Airflow en el entorno del ejecutor condbt-core.dataform>airflowWorker: Flujos de trabajo de Dataform ejecutados en un trabajador de Airflow en el entorno del ejecutor con dataform core cli.dataform>dataformService: Flujos de trabajo de Dataform ejecutados en el servicio de Dataform.
En la siguiente tabla, se enumeran las posibles combinaciones de tipo de acción, motor y framework. Consulta las descripciones de los motores y los frameworks para ver ejemplos de código de acciones.
| Acción | Motor o framework | Salidas a |
|---|---|---|
pyspark |
dataprocOnGce > existingCluster |
Registros de trabajos de Managed Service para Apache Spark |
pyspark |
dataprocOnGce > ephemeralCluster |
Registros de trabajos de Managed Service para Apache Spark |
pyspark |
dataprocServerless |
Registros de Managed Service para Apache Spark Batch |
notebook |
dataprocOnGce > existingCluster |
Bucket de ejecución, en el directorio composer_declarative_dags_resources |
notebook |
dataprocOnGce > ephemeralCluster |
Registros de trabajos de Managed Service para Apache Spark |
notebook |
dataprocServerless |
Bucket de ejecución, en el directorio composer_declarative_dags_resources |
sql |
bigquery |
Tabla especificada en el parámetro destinationTable |
sql |
dataprocServerless |
Registros de Managed Service para Apache Spark Batch |
python |
local (ejecución local) |
Registros |
pipeline |
dbt > airflowWorker |
Registros y BigQuery |
pipeline |
dataform > airflowWorker |
Tabla especificada en BigQuery |
pipeline |
dataform > dataformService |
En Dataform |
Todas las acciones tienen las siguientes claves comunes. Otras claves dependen del tipo de acción.
name: Nombre de la acción. Este nombre se asigna al nombre de la tarea de Airflow en el entorno del ejecutor. Si una acción requiere más de una tarea de Airflow, este nombre se asigna al grupo de tareas.dependsOn: Es una lista de nombres de acciones upstream de las que depende esta acción, que definen el orden de ejecución. Si alguna de las acciones upstream falla, no se ejecutan las acciones downstream que dependen de ella.executionTimeout: Es el tiempo de espera para ejecutar la acción. Ejemplos:1h,30m,40s.
python
Son acciones de tipo python. Ejecutar secuencias de comandos de Python
Teclas específicas para cada tipo de acción:
mainFilePath: Es la ruta de acceso relativa al archivo de secuencia de comandos de Python.pythonCallable: Es el nombre de la función de Python que se ejecutará en la secuencia de comandos de Python.opKwargs: Es una asignación de argumentos de palabras clave para el operador.(Opcional)
environment: Ejecuta el script dentro de un entorno virtual de Python creado de forma dinámica.requirements: Requisitos del entorno virtual Los requisitos se resuelven en el tiempo de ejecución.inline: Los requisitos se especifican de forma intercalada.list: Lista de requisitos. Enumera los requisitos individuales según la PEP-508.Ejemplo:
environment: requirements: inline: list: ["pandas>=2.0.0"]
(Alternativa)
path: Es la ruta de acceso al archivo con los requisitos. Los requisitos de este archivo deben enumerarse según PEP-508.Ejemplo:
environment: requirements: path: "scripts/requirements.txt"
systemSitePackages: Si estrue, el entorno virtual hereda paquetes del directorio site-packages del trabajador de Airflow. Puedes instalar paquetes PyPI personalizados en tu entorno de ejecutor.
engine:local: Ejecución local en el entorno del ejecutor
Ejemplo:
local
modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 1
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- python:
name: "first_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_1.py"
pythonCallable: "main"
engine:
local: {}
environment:
systemSitePackages: true
requirements:
path: "scripts/requirements.txt"
- python:
name: "second_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_1.py"
pythonCallable: "main"
engine:
local: {}
environment:
requirements:
inline:
list: ["pandas>=2.0.0"]
systemSitePackages: true
dependsOn: ["first_script_run"]
- python:
name: "third_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_2.py"
pythonCallable: "main"
engine:
local: {}
opKwargs:
api_endpoint: "https://api.my-vendor.example.com/v1/status"
api_key_secret_name: "my-vendor-api-key"
dependsOn: ["first_script_run"]
pyspark
Son acciones de tipo pyspark. Ejecutar secuencias de comandos de PySpark
Teclas específicas para cada tipo de acción:
mainFilePath: Es la ruta relativa a la secuencia de comandos de PySpark.archiveUris: Es una lista de URIs de archivo para usar con esta acción.stagingBucket: Bucket de Cloud Storage que se usará con esta acción.pyFiles: Es una lista de archivos de Python que se usarán con este trabajo de Spark.environment: Es la configuración del entorno de Python.requirements: Archivo de requisitos de Python que se usará.path: Es la ruta de acceso al archivo con los requisitos. Los requisitos de este archivo deben enumerarse según PEP-508.
engine:dataprocOnGce>existingClusterdataprocOnGce>ephemeralClusterdataprocServerless
Ejemplos:
existingCluster
modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 4 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pyspark:
name: "run-pyspark-with-pyfiles-on-existing-cluster"
engine:
dataprocOnGce:
existingCluster:
clusterName: "example-dataproc-cluster"
location: "us-central1"
projectId: "example-project"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
ephemeralCluster
pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pyspark:
name: "run_dataproc_ephemeral"
executionTimeout: "1h"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
engine:
dataprocOnGce:
ephemeralCluster:
projectId: "example-project"
location: "us-central1"
clusterName: "ephemeral-cluster-inline"
resourceProfile:
inline:
config:
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
properties:
spark.submit.deployMode: "cluster"
dataprocServerless
modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
actions:
- pyspark:
name: "run-pyspark-on-dataproc-serverless"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
inline:
environmentConfig:
executionConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
networkUri: "projects/example-project/global/networks/default"
runtimeConfig:
version: "2.3"
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
notebook
Son acciones de tipo notebook. Ejecuta un notebook .ipynb a través de Papermill.
Teclas específicas para cada tipo de acción:
mainFilePath: Es la ruta de acceso relativa al archivo del notebook.archiveUris: Es una lista de URIs de archivo para usar con esta acción.stagingBucket: Bucket de Cloud Storage que se usará con esta acción.environment: Es la configuración del entorno de Python.requirements: Archivo de requisitos de Python que se usará.path: Es la ruta de acceso al archivo con los requisitos. Los requisitos de este archivo deben enumerarse según PEP-508.
engine:dataprocOnGce>existingClusterdataprocOnGce>ephemeralClusterdataprocServerless
Ejemplo:
dataprocServerless
modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- notebook:
name: "run-notebook-on-dataproc-serverless"
mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
archiveUris:
- "gs://example-bucket-additional-data/custom_venv.tar.gz"
staging_bucket: "example-bucket-additional-data-additional-data"
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
overrides:
runtimeConfig:
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
sql
Son acciones de tipo sql. Ejecutar consultas en SQL
Teclas específicas para cada tipo de acción:
query: Define una consulta.path: La consulta se define en un archivo ubicado en la ruta de acceso relativa al archivo de configuración de la implementación.inline: La consulta se define de forma intercalada.
engine:bigQuerydataprocServerlessdataprocOnGce>existingClusterdataprocOnGce>ephemeralCluster
bigQuery
modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run_bigquery_insert_job_create"
query:
inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
engine:
bigquery:
location: "US"
- sql:
name: "run_bigquery_insert_job_select"
query:
path: "sql-scripts/count_rows.sql"
engine:
bigquery:
location: "US"
destinationTable: "example-project.example_dataset.example_table_query_results"
dependsOn:
- "run_bigquery_insert_job_create"
dataprocServerless
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-dataproc"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocServerless:
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
resourceProfile:
inline:
runtimeConfig:
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
dataproc.sparkBqConnector.version: "0.42.3"
existingCluster
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-existing-cluster"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocOnGce:
existingCluster:
clusterName: "cluster-sql"
projectId: "example-project"
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
# This field is needed and important for the Spark-BigQuery connector.
spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
ephemeralCluster
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-ephemeral-cluster"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocOnGce:
ephemeralCluster:
clusterName: "example-ephemeral-cluster"
projectId: "example-project"
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
# This field is needed and important for the Spark-BigQuery connector.
spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
resourceProfile:
inline:
clusterConfig:
gceClusterConfig:
zoneUri: "us-central1-a"
metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
canalización
Son acciones de tipo pipeline. Ejecuta una canalización de procesamiento de datos.
Teclas específicas para cada tipo de acción:
framework:dbtdataform>airflowWorkerdataform>dataformService
Ejemplos:
dbt
modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pipeline:
name: "dbt-action"
framework:
dbt:
airflowWorker:
projectDirectoryPath: "dbt_project"
selectModels: ["model_1", "model_2"]
dataform>airflowWorker
modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pipeline:
name: "run_dataform"
framework:
dataform:
airflowWorker:
projectDirectoryPath: "dataform_local"
dataform>dataformService
modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
actions:
- pipeline:
name: "run_dataform"
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"
- pipeline:
name: "run_dataform_compilation"
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
invocationConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
Motores
Son los motores que se usan en las acciones.
dataprocOnGce > existingCluster
Ejecuta en un clúster de Managed Service para Apache Spark existente identificado por clusterName, proyecto y ubicación.
Puedes administrar el clúster especificado en tu configuración de implementación o de forma manual en Managed Service para Apache Spark. Te recomendamos que actualices el clúster con regularidad.
Claves:
clusterName: Nombre del clústerlocation: Es la región en la que se encuentra el clúster.projectId: ID del proyecto en el que se encuentra el clústerproperties: Es un mapa de propiedades del trabajo de Spark.
Ejemplo:
engine:
dataprocOnGce:
existingCluster:
clusterName: "example-dataproc-cluster"
location: "us-central1"
projectId: "example-project"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
dataprocOnGce > ephemeralCluster
Se ejecuta en un clúster de Managed Service para Apache Spark efímero, que se crea y borra después de ejecutar el trabajo.
Claves:
clusterName: Nombre del clústerlocation: Es la región en la que se encuentra el clúster.projectId: ID del proyecto en el que se encuentra el clústerimpersonationChain: Cadena de identidad temporal como cuenta de servicio que se usará para ejecutar la acción.resourceProfile: Es el perfil de recursos del clúster de Managed Service para Apache Spark.Para obtener una descripción de los campos disponibles, consulta ClusterConfig en la documentación de Managed Service para Apache Spark.
Un perfil de recursos se puede especificar de las siguientes maneras:
inline: Se define como parte de la configuración de la canalización.path: Se define en un archivo ubicado en la ruta de acceso relativa.external_config_path: Se define en un archivo ubicado en un bucket de Cloud Storage. A diferencia de las opcionesinlineypath, que requieren la confirmación y la implementación para actualizar los valores del perfil de recursos, un perfil de recursos externo se resuelve en cada ejecución de la canalización y puedes actualizarlo sin volver a implementar la canalización.
Se pueden aplicar anulaciones al perfil de recurso especificado con la clave
override. Las anulaciones se aplican con una combinación profunda en el perfil de recurso previo proporcionado.properties: Es un mapa de propiedades del trabajo de Spark.
Ejemplo:
engine:
dataprocOnGce:
ephemeralCluster:
projectId: "example-project"
location: "us-central1"
clusterName: "example-ephemeral-cluster"
resourceProfile:
inline:
config:
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
properties:
spark.submit.deployMode: "cluster"
dataprocServerless
Ejecuta en el envío por lotes de Managed Service para Apache Spark.
Claves:
location: Es la región en la que se debe ejecutar el trabajo de Spark.impersonationChain: Cadena de identidad temporal como cuenta de servicio que se usará para ejecutar la acción.resourceProfile: Es el perfil de recursos de Managed Service para Apache Spark.Un perfil de recursos se puede especificar de las siguientes maneras:
inline: Se define como parte de la configuración de la canalización.path: Se define en un archivo ubicado en la ruta de acceso relativa.external_config_path: Se define en un archivo ubicado en un bucket de Cloud Storage. A diferencia de las opcionesinlineypath, que requieren la confirmación y la implementación para actualizar los valores del perfil de recursos, un perfil de recursos externo se resuelve en cada ejecución de la canalización y puedes actualizarlo sin volver a implementar la canalización.
Las siguientes claves especifican la configuración del perfil de recursos:
environmentConfig: Configuración del entornoruntimeConfig: Configuración del tiempo de ejecución
Para obtener la descripción de los campos disponibles, consulta RuntimeConfig y EnvironmentConfig en la documentación de Managed Service para Apache Spark.
Se pueden aplicar anulaciones al perfil de recurso especificado con la clave
override. Las anulaciones se aplican con una combinación profunda en el perfil de recursos proporcionado.
Ejemplo (en línea):
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
inline:
environmentConfig:
executionConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
networkUri: "projects/example-project/global/networks/default"
runtimeConfig:
version: "2.3"
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
Ejemplo (ruta de acceso externa y anulaciones):
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
overrides:
runtimeConfig:
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
bigQuery
Se ejecuta como un trabajo de BigQuery.
Claves:
location: Es la región en la que se encuentra la tabla de destino.destinationTable: Tabla de BigQuery en la que se generarán los datosimpersonationChain: Cadena de identidad temporal como cuenta de servicio que se usará para ejecutar la acción.
Ejemplo:
engine:
bigquery:
location: "US"
destinationTable: "example-project.example_dataset.example_table_query_results"
local
Se ejecuta de forma local en el entorno del ejecutor.
Consulta la acción python para conocer las formas de configurar el entorno virtual.
Ejemplo:
engine:
local: {}
Frameworks
Son los frameworks que se usan en las acciones.
dbt > airflowWorker
Ejecuta un modelo de dbt que se ejecutó en un trabajador de Airflow en el entorno del ejecutor con dbt-core.
Claves:
projectDirectoryPath: Es la ruta de acceso relativa a una carpeta que contiene el proyecto de DBT.selectModels: Es la lista de modelos que se incluirán en la ejecución por nombre (equivalente adbt --select).tags: Es la lista de modelos que se incluirán en la ejecución por etiqueta (equivalente adbt --select).
Ejemplo:
framework:
dbt:
airflowWorker:
projectDirectoryPath: "dbt_project"
selectModels: ["model_1", "model_2"]
dataform > airflowWorker
Flujos de trabajo de Dataform ejecutados en un trabajador de Airflow en el entorno del ejecutor con dataform core cli.
Claves:
projectDirectoryPath: Es la ruta de acceso relativa a una carpeta que contiene las definiciones del flujo de trabajo de Dataform.
Ejemplo:
framework:
dataform:
airflowWorker:
projectDirectoryPath: "dataform_local"
dataform > dataformService
Ejecuta flujos de trabajo de Dataform que se ejecutan en el servicio de Dataform.
Claves:
location: Es la ubicación en la que se encuentra el repositorio de Dataform.projectId: Es el proyecto en el que se encuentra el repositorio de Dataform.repositoryId: ID del repositorio de DataformworkflowInvocation: Es la configuración de la invocación del flujo de trabajo, que especifica qué acciones se deben ejecutar. Consulta WorkflowInvocation.
Ejemplo:
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
invocationConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"