Referencia del DSL de canalizaciones de organización

Esta página contiene la referencia del DSL de Canalizaciones de organización.

Limitación en la versión preliminar

Durante la versión preliminar, Canalizaciones de organización tiene las siguientes limitaciones:

  • Para las acciones de pyspark y notebook, haz lo siguiente:

    • Solo se admite un archivo requirements.txt para todas las acciones de pyspark y notebook.
    • La plataforma de Windows no es compatible con la compilación de paquetes a través de la herramienta uv.
    • Solo se admiten paquetes de Python con archivos binarios prediseñados.
  • Para las acciones de sql, haz lo siguiente:

    • No se admite la definición de inline en la clave query.

Acerca del formato y los valores

Las canalizaciones se definen en formato YAML y deben almacenarse en archivos separados, uno por canalización, en tu repositorio.

Canalizaciones de organización ofrece varias formas de usar variables en las definiciones de canalización y la configuración de implementación. Por ejemplo, puedes definir variables personalizadas, usar secretos de GitHub y sustituir valores de variables en la línea de comandos. Para obtener más información, consulta Variables, secretos y sustitución.

Para obtener más información sobre cómo agregar canalizaciones adicionales al paquete de canalizaciones, consulta Cómo agregar otra canalización.

Ejemplos de código

El repositorio orchestration-pipelines en GitHub contiene los ejemplos de código más recientes para muchas combinaciones de acciones y motores de canalizaciones. Te recomendamos que uses estos ejemplos como punto de partida para explorar las capacidades de las canalizaciones de organización.

Definición de canalización

Una definición de canalización tiene las siguientes claves de nivel superior:

  • modelVersion: Es la versión del modelo de definición de la canalización. La versión más reciente del modelo es 1.0.

  • pipelineId: Es un identificador único de la canalización. Este ID se mantiene coherente en múltiples implementaciones y versiones, lo que permite el seguimiento y la administración de la entidad de canalización lógica.

  • description: Es la descripción de la canalización, que se asigna a la descripción del DAG de Airflow en el entorno del ejecutor.

  • owner: Es el propietario de la canalización.

  • tags: Son identificadores de cadena aplicados en la canalización que se usan para filtrar las canalizaciones.

  • notifications: Notificaciones sobre eventos de canalización Tipos de notificaciones admitidos:

    • onPipelineFailure: Correo electrónico sobre fallas en la canalización.

    Las notificaciones requieren que se configuren los servicios de correo electrónico de SendGrid en tu entorno de ejecución. Para obtener instrucciones, consulta Configura las notificaciones por correo electrónico.

    Ejemplo:

    notifications:
      onPipelineFailure:
        email: ["user1@example.com", "user2@example.com"]
    
  • runner: Especifica el motor de orquestación de destino. Se reserva para usarlo más adelante. Establece este valor en airflow.

  • defaults: Establece valores predeterminados para propiedades como project_id, location y executionConfig que se aplican a todas las acciones, a menos que se anulen dentro de una acción específica. Las propiedades project_id y location pueden anularse con propiedades de acción individuales. La propiedad executionConfig no se puede anular en acciones individuales y especifica la cantidad de reintentos para todas las acciones de la canalización en el campo retries.

  • triggers: Define cómo se inicia la canalización:

    • Sin valor. La canalización aún se puede activar manualmente.

    • schedule: Activa la canalización según un programa, con expresiones cron.

      Ejemplo de programa:

      triggers:
        - schedule:
            interval: "0 5 * * *"
            startTime: "2025-10-01T00:00:00"
            endTime: "2026-10-01T00:00:00"
            catchup: false
            timezone: "UTC"
      
  • actions

    Es una asignación de las tareas que se ejecutarán. Cada entrada de asignación corresponde a una acción. Consulta Acciones.

Acciones

Las acciones de canalización definen pasos individuales en la ejecución de la canalización. Cada acción debe tener especificado un motor o un framework. El motor o el framework determinan qué recursos se usan para ejecutar la acción.

Las canalizaciones de organización admiten las siguientes acciones:

  • PySpark (pyspark): Ejecuta una secuencia de comandos de PySpark.
  • Notebook (notebook): Ejecuta un archivo de notebook.
  • Consulta en SQL (notebook): Ejecuta una consulta en SQL.
  • Python (python): Ejecuta una secuencia de comandos de Python.
  • Canalización (pipeline): Ejecuta canalizaciones de procesamiento de datos.

Canalizaciones de organización admite los siguientes motores y frameworks:

  • dataprocOnGce > existingCluster: Clúster de Managed Service para Apache Spark identificado por clusterName, project y location.

  • dataprocOnGce > ephemeral: Se crea y borra un clúster de Managed Service para Apache Spark después de ejecutar el trabajo.

  • dataprocServerless: Managed Service para Apache Spark envío por lotes.

  • bigquery: Es un trabajo de BigQuery.

  • python > local: Es una secuencia de comandos de Python que se ejecuta en un Airflow Worker en el entorno del ejecutor.

  • dbt > airflowWorker: Modelos de dbt ejecutados en un trabajador de Airflow en el entorno del ejecutor con dbt-core.

  • dataform > airflowWorker: Flujos de trabajo de Dataform ejecutados en un trabajador de Airflow en el entorno del ejecutor con dataform core cli.

  • dataform > dataformService: Flujos de trabajo de Dataform ejecutados en el servicio de Dataform.

En la siguiente tabla, se enumeran las posibles combinaciones de tipo de acción, motor y framework. Consulta las descripciones de los motores y los frameworks para ver ejemplos de código de acciones.

Acción Motor o framework Salidas a
pyspark dataprocOnGce > existingCluster Registros de trabajos de Managed Service para Apache Spark
pyspark dataprocOnGce > ephemeralCluster Registros de trabajos de Managed Service para Apache Spark
pyspark dataprocServerless Registros de Managed Service para Apache Spark Batch
notebook dataprocOnGce > existingCluster Bucket de ejecución, en el directorio composer_declarative_dags_resources
notebook dataprocOnGce > ephemeralCluster Registros de trabajos de Managed Service para Apache Spark
notebook dataprocServerless Bucket de ejecución, en el directorio composer_declarative_dags_resources
sql bigquery Tabla especificada en el parámetro destinationTable
sql dataprocServerless Registros de Managed Service para Apache Spark Batch
python local (ejecución local) Registros
pipeline dbt > airflowWorker Registros y BigQuery
pipeline dataform > airflowWorker Tabla especificada en BigQuery
pipeline dataform > dataformService En Dataform

Todas las acciones tienen las siguientes claves comunes. Otras claves dependen del tipo de acción.

  • name: Nombre de la acción. Este nombre se asigna al nombre de la tarea de Airflow en el entorno del ejecutor. Si una acción requiere más de una tarea de Airflow, este nombre se asigna al grupo de tareas.

  • dependsOn: Es una lista de nombres de acciones upstream de las que depende esta acción, que definen el orden de ejecución. Si alguna de las acciones upstream falla, no se ejecutan las acciones downstream que dependen de ella.

  • executionTimeout: Es el tiempo de espera para ejecutar la acción. Ejemplos: 1h, 30m, 40s.

python

Son acciones de tipo python. Ejecutar secuencias de comandos de Python

Teclas específicas para cada tipo de acción:

  • mainFilePath: Es la ruta de acceso relativa al archivo de secuencia de comandos de Python.
  • pythonCallable: Es el nombre de la función de Python que se ejecutará en la secuencia de comandos de Python.
  • opKwargs: Es una asignación de argumentos de palabras clave para el operador.
  • (Opcional) environment: Ejecuta el script dentro de un entorno virtual de Python creado de forma dinámica.

    • requirements: Requisitos del entorno virtual Los requisitos se resuelven en el tiempo de ejecución.

      • inline: Los requisitos se especifican de forma intercalada.

        • list: Lista de requisitos. Enumera los requisitos individuales según la PEP-508.

          Ejemplo:

          environment:
            requirements:
              inline:
                list: ["pandas>=2.0.0"]
          
      • (Alternativa) path: Es la ruta de acceso al archivo con los requisitos. Los requisitos de este archivo deben enumerarse según PEP-508.

        Ejemplo:

        environment:
          requirements:
            path: "scripts/requirements.txt"
        
    • systemSitePackages: Si es true, el entorno virtual hereda paquetes del directorio site-packages del trabajador de Airflow. Puedes instalar paquetes PyPI personalizados en tu entorno de ejecutor.

  • engine:

    • local: Ejecución local en el entorno del ejecutor

Ejemplo:

local

modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 1

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - python:
      name: "first_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        systemSitePackages: true
        requirements:
          path: "scripts/requirements.txt"

  - python:
      name: "second_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        requirements:
          inline:
            list: ["pandas>=2.0.0"]
        systemSitePackages: true
      dependsOn: ["first_script_run"]

  - python:
      name: "third_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_2.py"
      pythonCallable: "main"
      engine:
        local: {}
      opKwargs:
        api_endpoint: "https://api.my-vendor.example.com/v1/status"
        api_key_secret_name: "my-vendor-api-key"
      dependsOn: ["first_script_run"]

pyspark

Son acciones de tipo pyspark. Ejecutar secuencias de comandos de PySpark

Teclas específicas para cada tipo de acción:

  • mainFilePath: Es la ruta relativa a la secuencia de comandos de PySpark.
  • archiveUris: Es una lista de URIs de archivo para usar con esta acción.
  • stagingBucket: Bucket de Cloud Storage que se usará con esta acción.
  • pyFiles: Es una lista de archivos de Python que se usarán con este trabajo de Spark.
  • environment: Es la configuración del entorno de Python.

    • requirements: Archivo de requisitos de Python que se usará.

      • path: Es la ruta de acceso al archivo con los requisitos. Los requisitos de este archivo deben enumerarse según PEP-508.
  • engine:

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

Ejemplos:

existingCluster

modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 4 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pyspark:
      name: "run-pyspark-with-pyfiles-on-existing-cluster"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "example-dataproc-cluster"
            location: "us-central1"
            projectId: "example-project"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"

ephemeralCluster

pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - pyspark:
      name: "run_dataproc_ephemeral"
      executionTimeout: "1h"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            projectId: "example-project"
            location: "us-central1"
            clusterName: "ephemeral-cluster-inline"
            resourceProfile:
              inline:
                config:
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
            properties:
              spark.submit.deployMode: "cluster"

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pyspark:
      name: "run-pyspark-on-dataproc-serverless"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            inline:
              environmentConfig:
                executionConfig:
                  serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
                  networkUri: "projects/example-project/global/networks/default"
              runtimeConfig:
                version: "2.3"
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

notebook

Son acciones de tipo notebook. Ejecuta un notebook .ipynb a través de Papermill.

Teclas específicas para cada tipo de acción:

  • mainFilePath: Es la ruta de acceso relativa al archivo del notebook.
  • archiveUris: Es una lista de URIs de archivo para usar con esta acción.
  • stagingBucket: Bucket de Cloud Storage que se usará con esta acción.
  • environment: Es la configuración del entorno de Python.

    • requirements: Archivo de requisitos de Python que se usará.

      • path: Es la ruta de acceso al archivo con los requisitos. Los requisitos de este archivo deben enumerarse según PEP-508.
  • engine:

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

Ejemplo:

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - notebook:
      name: "run-notebook-on-dataproc-serverless"
      mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
      archiveUris:
        - "gs://example-bucket-additional-data/custom_venv.tar.gz"
      staging_bucket: "example-bucket-additional-data-additional-data"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

sql

Son acciones de tipo sql. Ejecutar consultas en SQL

Teclas específicas para cada tipo de acción:

  • query: Define una consulta.

    • path: La consulta se define en un archivo ubicado en la ruta de acceso relativa al archivo de configuración de la implementación.
    • inline: La consulta se define de forma intercalada.

  • engine:

    • bigQuery
    • dataprocServerless
    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster

bigQuery

modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run_bigquery_insert_job_create"
      query:
        inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
      engine:
        bigquery:
          location: "US"
  - sql:
      name: "run_bigquery_insert_job_select"
      query:
        path: "sql-scripts/count_rows.sql"
      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"
      dependsOn:
        - "run_bigquery_insert_job_create"

dataprocServerless

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-dataproc"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocServerless:
          location: "us-central1"
          impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
          resourceProfile:
            inline:
              runtimeConfig:
                properties:
                  spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
                  spark.sql.catalog.bigquery.project: "example-project"
                  dataproc.sparkBqConnector.version: "0.42.3"

existingCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-existing-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "cluster-sql"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"

ephemeralCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-ephemeral-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            clusterName: "example-ephemeral-cluster"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
            resourceProfile:
              inline:
                clusterConfig:
                  gceClusterConfig:
                    zoneUri: "us-central1-a"
                    metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
                      SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"

canalización

Son acciones de tipo pipeline. Ejecuta una canalización de procesamiento de datos.

Teclas específicas para cada tipo de acción:

  • framework:

    • dbt
    • dataform > airflowWorker
    • dataform > dataformService

Ejemplos:

dbt

modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "dbt-action"
      framework:
        dbt:
          airflowWorker:
            projectDirectoryPath: "dbt_project"
            selectModels: ["model_1", "model_2"]

dataform>airflowWorker

modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          airflowWorker:
            projectDirectoryPath: "dataform_local"

dataform>dataformService

modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"

  - pipeline:
      name: "run_dataform_compilation"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
              invocationConfig:
                serviceAccount: "example-account@example-project.iam.gserviceaccount.com"

Motores

Son los motores que se usan en las acciones.

dataprocOnGce > existingCluster

Ejecuta en un clúster de Managed Service para Apache Spark existente identificado por clusterName, proyecto y ubicación.

Puedes administrar el clúster especificado en tu configuración de implementación o de forma manual en Managed Service para Apache Spark. Te recomendamos que actualices el clúster con regularidad.

Claves:

  • clusterName: Nombre del clúster
  • location: Es la región en la que se encuentra el clúster.
  • projectId: ID del proyecto en el que se encuentra el clúster
  • properties: Es un mapa de propiedades del trabajo de Spark.

Ejemplo:

engine:
  dataprocOnGce:
    existingCluster:
      clusterName: "example-dataproc-cluster"
      location: "us-central1"
      projectId: "example-project"
      impersonationChain: "example-account@example-project.iam.gserviceaccount.com"

dataprocOnGce > ephemeralCluster

Se ejecuta en un clúster de Managed Service para Apache Spark efímero, que se crea y borra después de ejecutar el trabajo.

Claves:

  • clusterName: Nombre del clúster
  • location: Es la región en la que se encuentra el clúster.
  • projectId: ID del proyecto en el que se encuentra el clúster
  • impersonationChain: Cadena de identidad temporal como cuenta de servicio que se usará para ejecutar la acción.
  • resourceProfile: Es el perfil de recursos del clúster de Managed Service para Apache Spark.

    Para obtener una descripción de los campos disponibles, consulta ClusterConfig en la documentación de Managed Service para Apache Spark.

    Un perfil de recursos se puede especificar de las siguientes maneras:

    • inline: Se define como parte de la configuración de la canalización.
    • path: Se define en un archivo ubicado en la ruta de acceso relativa.
    • external_config_path: Se define en un archivo ubicado en un bucket de Cloud Storage. A diferencia de las opciones inline y path, que requieren la confirmación y la implementación para actualizar los valores del perfil de recursos, un perfil de recursos externo se resuelve en cada ejecución de la canalización y puedes actualizarlo sin volver a implementar la canalización.

    Se pueden aplicar anulaciones al perfil de recurso especificado con la clave override. Las anulaciones se aplican con una combinación profunda en el perfil de recurso previo proporcionado.

  • properties: Es un mapa de propiedades del trabajo de Spark.

Ejemplo:

engine:
  dataprocOnGce:
    ephemeralCluster:
      projectId: "example-project"
      location: "us-central1"
      clusterName: "example-ephemeral-cluster"
      resourceProfile:
        inline:
          config:
            masterConfig:
              numInstances: 1
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
            workerConfig:
              numInstances: 2
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
      properties:
        spark.submit.deployMode: "cluster"

dataprocServerless

Ejecuta en el envío por lotes de Managed Service para Apache Spark.

Claves:

  • location: Es la región en la que se debe ejecutar el trabajo de Spark.
  • impersonationChain: Cadena de identidad temporal como cuenta de servicio que se usará para ejecutar la acción.
  • resourceProfile: Es el perfil de recursos de Managed Service para Apache Spark.

    Un perfil de recursos se puede especificar de las siguientes maneras:

    • inline: Se define como parte de la configuración de la canalización.
    • path: Se define en un archivo ubicado en la ruta de acceso relativa.
    • external_config_path: Se define en un archivo ubicado en un bucket de Cloud Storage. A diferencia de las opciones inline y path, que requieren la confirmación y la implementación para actualizar los valores del perfil de recursos, un perfil de recursos externo se resuelve en cada ejecución de la canalización y puedes actualizarlo sin volver a implementar la canalización.

    Las siguientes claves especifican la configuración del perfil de recursos:

    • environmentConfig: Configuración del entorno
    • runtimeConfig: Configuración del tiempo de ejecución

    Para obtener la descripción de los campos disponibles, consulta RuntimeConfig y EnvironmentConfig en la documentación de Managed Service para Apache Spark.

    Se pueden aplicar anulaciones al perfil de recurso especificado con la clave override. Las anulaciones se aplican con una combinación profunda en el perfil de recursos proporcionado.

Ejemplo (en línea):

engine:
  dataprocServerless:
    location: "us-central1"
    resourceProfile:
      inline:
        environmentConfig:
          executionConfig:
            serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
            networkUri: "projects/example-project/global/networks/default"
        runtimeConfig:
          version: "2.3"
          properties:
            spark.app.name: "run-notebook-on-dataproc-serverless"
            spark.executor.instances: "2"
            spark.driver.cores: "4"

Ejemplo (ruta de acceso externa y anulaciones):

      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                    spark.app.name: "run-notebook-on-dataproc-serverless"
                    spark.executor.instances: "2"
                    spark.driver.cores: "4"

bigQuery

Se ejecuta como un trabajo de BigQuery.

Claves:

Ejemplo:

      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"

local

Se ejecuta de forma local en el entorno del ejecutor.

Consulta la acción python para conocer las formas de configurar el entorno virtual.

Ejemplo:

    engine:
      local: {}

Frameworks

Son los frameworks que se usan en las acciones.

dbt > airflowWorker

Ejecuta un modelo de dbt que se ejecutó en un trabajador de Airflow en el entorno del ejecutor con dbt-core.

Claves:

  • projectDirectoryPath: Es la ruta de acceso relativa a una carpeta que contiene el proyecto de DBT.
  • selectModels: Es la lista de modelos que se incluirán en la ejecución por nombre (equivalente a dbt --select).
  • tags: Es la lista de modelos que se incluirán en la ejecución por etiqueta (equivalente a dbt --select).

Ejemplo:

framework:
  dbt:
    airflowWorker:
      projectDirectoryPath: "dbt_project"
      selectModels: ["model_1", "model_2"]

dataform > airflowWorker

Flujos de trabajo de Dataform ejecutados en un trabajador de Airflow en el entorno del ejecutor con dataform core cli.

Claves:

  • projectDirectoryPath: Es la ruta de acceso relativa a una carpeta que contiene las definiciones del flujo de trabajo de Dataform.

Ejemplo:

framework:
  dataform:
    airflowWorker:
      projectDirectoryPath: "dataform_local"

dataform > dataformService

Ejecuta flujos de trabajo de Dataform que se ejecutan en el servicio de Dataform.

Claves:

  • location: Es la ubicación en la que se encuentra el repositorio de Dataform.
  • projectId: Es el proyecto en el que se encuentra el repositorio de Dataform.
  • repositoryId: ID del repositorio de Dataform
  • workflowInvocation: Es la configuración de la invocación del flujo de trabajo, que especifica qué acciones se deben ejecutar. Consulta WorkflowInvocation.

Ejemplo:

framework:
  dataform:
    dataformService:
      location: "us-central1"
      projectId: "example-project"
      repositoryId: "example-repository"
      workflowInvocation:
        compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
        invocationConfig:
          serviceAccount: "example-account@example-project.iam.gserviceaccount.com"