Documentation de référence sur le DSL Orchestration Pipelines

Cette page contient la documentation de référence sur le DSL des pipelines d'orchestration.

Limites de l'aperçu

Pendant la version préliminaire, les pipelines d'orchestration présentent les limites suivantes :

  • Pour les actions pyspark et notebook :

    • Un seul fichier requirements.txt est accepté pour toutes les actions pyspark et notebook.
    • La plate-forme Windows n'est pas compatible avec la création de packages via l'outil uv.
    • Seuls les packages Python avec des binaires précompilés sont acceptés.
  • Pour les actions sql :

    • La définition inline dans la clé query n'est pas acceptée.

À propos du format et des valeurs

Les pipelines sont définis au format YAML et doivent être stockés dans des fichiers distincts (un par pipeline) dans votre dépôt.

Orchestration Pipelines offre plusieurs façons d'utiliser des variables dans vos définitions de pipeline et votre configuration de déploiement. Par exemple, vous pouvez définir des variables personnalisées, utiliser des secrets GitHub et remplacer des valeurs de variables sur la ligne de commande. Pour en savoir plus, consultez Variables, secrets et substitution.

Pour savoir comment ajouter des pipelines supplémentaires au bundle de pipelines, consultez Ajouter un autre pipeline.

Exemples de code

Le dépôt orchestration-pipelines sur GitHub contient les derniers exemples de code pour de nombreuses combinaisons d'actions et de moteurs de pipeline. Nous vous recommandons d'utiliser ces exemples comme point de départ pour explorer les fonctionnalités des Orchestration Pipelines.

Définition d'un pipeline

Une définition de pipeline comporte les clés de niveau supérieur suivantes :

  • modelVersion : version du modèle de définition du pipeline. La dernière version du modèle est 1.0.

  • pipelineId : identifiant unique du pipeline. Cet ID reste cohérent sur plusieurs déploiements et versions, ce qui permet de suivre et de gérer l'entité de pipeline logique.

  • description : description du pipeline, qui est mappée à la description du DAG Airflow dans l'environnement de l'exécuteur.

  • owner : propriétaire du pipeline.

  • tags : identifiants de chaîne appliqués au pipeline, utilisés pour filtrer les pipelines.

  • notifications : notifications sur les événements de pipeline. Types de notifications acceptés :

    • onPipelineFailure : e-mail en cas d'échec du pipeline.

    Les notifications nécessitent que les services de messagerie SendGrid soient configurés dans votre environnement d'exécution. Pour savoir comment procéder, consultez Configurer les notifications par e-mail.

    Exemple :

    notifications:
      onPipelineFailure:
        email: ["user1@example.com", "user2@example.com"]
    
  • runner : spécifie le moteur d'orchestration cible. Réservé pour une utilisation ultérieure. Définissez cette valeur sur airflow.

  • defaults : définit les valeurs par défaut des propriétés telles que project_id, location et executionConfig qui s'appliquent à toutes les actions, sauf si elles sont remplacées dans une action spécifique. Les propriétés project_id et location peuvent être remplacées par des propriétés d'action individuelles. La propriété executionConfig ne peut pas être remplacée dans les actions individuelles. Elle spécifie le nombre de nouvelles tentatives pour toutes les actions du pipeline dans le champ retries.

  • triggers : définit la façon dont le pipeline est lancé :

    • Aucune valeur. Le pipeline peut toujours être déclenché manuellement.

    • schedule : déclenchez le pipeline selon une programmation à l'aide d'expressions Cron.

      Exemple de programmation :

      triggers:
        - schedule:
            interval: "0 5 * * *"
            startTime: "2025-10-01T00:00:00"
            endTime: "2026-10-01T00:00:00"
            catchup: false
            timezone: "UTC"
      
  • actions

    Mise en correspondance des tâches à exécuter. Chaque entrée de mappage correspond à une action. Consultez Actions.

Actions

Les actions de pipeline définissent les étapes individuelles de l'exécution du pipeline. Chaque action doit être associée à un moteur ou un framework. Le moteur ou le framework détermine les ressources utilisées pour exécuter l'action.

Orchestration Pipelines prend en charge les actions suivantes :

  • Pyspark (pyspark) : exécutez un script PySpark.
  • Notebook (notebook) : exécutez un fichier notebook.
  • Requête SQL (notebook) : exécutez une requête SQL.
  • Python (python) : exécutez un script Python.
  • Pipeline (pipeline) : exécutez un pipeline de traitement des données.

Orchestration Pipelines prend en charge les moteurs et frameworks suivants :

  • dataprocOnGce > existingCluster : Cluster Managed Service pour Apache Spark identifié par clusterName, project et location.

  • dataprocOnGce > ephemeral : Cluster Managed Service pour Apache Spark créé et supprimé après l'exécution du job.

  • dataprocServerless : Managed Service pour Apache Spark envoi par lot.

  • bigquery : job BigQuery.

  • python > local : script Python exécuté sur un Worker Airflow dans l'environnement du runner.

  • dbt > airflowWorker : modèles dbt exécutés sur un nœud de calcul Airflow dans l'environnement runner à l'aide de dbt-core.

  • dataform > airflowWorker : workflows Dataform exécutés sur un nœud de calcul Airflow dans l'environnement Runner à l'aide de dataform core cli.

  • dataform > dataformService : workflows Dataform exécutés sur le service Dataform.

Le tableau suivant répertorie les combinaisons possibles de types d'actions, de moteurs et de frameworks. Consultez les descriptions des moteurs et des frameworks pour obtenir des exemples de code d'action.

Action Moteur ou framework Sorties vers
pyspark dataprocOnGce > existingCluster Journaux des jobs Managed Service pour Apache Spark
pyspark dataprocOnGce > ephemeralCluster Journaux des jobs Managed Service pour Apache Spark
pyspark dataprocServerless Journaux des traitements par lot Managed Service pour Apache Spark
notebook dataprocOnGce > existingCluster Bucket du programme d'exécution, sous le répertoire composer_declarative_dags_resources
notebook dataprocOnGce > ephemeralCluster Journaux des jobs Managed Service pour Apache Spark
notebook dataprocServerless Bucket du programme d'exécution, sous le répertoire composer_declarative_dags_resources
sql bigquery Table spécifiée dans le paramètre destinationTable
sql dataprocServerless Journaux des traitements par lot Managed Service pour Apache Spark.
python local (exécution locale) Journaux
pipeline dbt > airflowWorker Journaux et BigQuery
pipeline dataform > airflowWorker Table spécifiée dans BigQuery
pipeline dataform > dataformService Dans Dataform

Toutes les actions ont les clés communes suivantes. D'autres clés dépendent du type d'action.

  • name : nom de l'action. Ce nom est mappé au nom de la tâche Airflow dans l'environnement du runner. Si une action nécessite plusieurs tâches Airflow, ce nom est mappé au groupe de tâches.

  • dependsOn : liste des noms d'actions en amont dont dépend cette action, définissant l'ordre d'exécution. Si l'une des actions en amont échoue, les actions en aval qui en dépendent ne sont pas exécutées.

  • executionTimeout : délai avant expiration pour exécuter l'action. Exemples : 1h, 30m, 40s.

python

Actions de type python. Exécutez des scripts Python.

Clés spécifiques au type d'action :

  • mainFilePath : chemin relatif vers le fichier de script Python.
  • pythonCallable : nom de l'appelable Python à exécuter dans le script Python.
  • opKwargs : mappage des arguments de mot clé pour l'opérateur.
  • (Facultatif) environment : exécutez le script dans un environnement virtuel Python créé de manière dynamique.

    • requirements : exigences concernant l'environnement virtuel. Les exigences sont résolues au moment de l'exécution.

      • inline : les exigences sont spécifiées sur la même ligne.

        • list : liste des exigences. Listez les exigences individuelles conformément à la PEP-508.

          Exemple :

          environment:
            requirements:
              inline:
                list: ["pandas>=2.0.0"]
          
      • (Autre option) path : chemin d'accès au fichier contenant les exigences. Les exigences de ce fichier doivent être listées conformément à la spécification PEP-508.

        Exemple :

        environment:
          requirements:
            path: "scripts/requirements.txt"
        
    • systemSitePackages : si la valeur est true, l'environnement virtuel hérite des packages du répertoire site-packages du nœud de calcul Airflow. Vous pouvez installer des packages PyPI personnalisés dans votre environnement d'exécution.

  • engine :

    • local : exécution locale dans l'environnement de l'exécuteur

Exemple :

interprétabilité locale

modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 1

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - python:
      name: "first_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        systemSitePackages: true
        requirements:
          path: "scripts/requirements.txt"

  - python:
      name: "second_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        requirements:
          inline:
            list: ["pandas>=2.0.0"]
        systemSitePackages: true
      dependsOn: ["first_script_run"]

  - python:
      name: "third_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_2.py"
      pythonCallable: "main"
      engine:
        local: {}
      opKwargs:
        api_endpoint: "https://api.my-vendor.example.com/v1/status"
        api_key_secret_name: "my-vendor-api-key"
      dependsOn: ["first_script_run"]

pyspark

Actions de type pyspark. Exécutez des scripts PySpark.

Clés spécifiques au type d'action :

  • mainFilePath : chemin d'accès relatif au script PySpark.
  • archiveUris : liste des URI d'archive à utiliser avec cette action.
  • stagingBucket : bucket Cloud Storage à utiliser avec cette action.
  • pyFiles : liste des fichiers Python à utiliser avec cette tâche Spark.
  • environment : configuration de l'environnement Python.

    • requirements : fichier des exigences Python à utiliser.

      • path : chemin d'accès au fichier contenant les exigences. Les exigences de ce fichier doivent être listées conformément à la spécification PEP-508.
  • engine :

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

Exemples :

existingCluster

modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 4 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pyspark:
      name: "run-pyspark-with-pyfiles-on-existing-cluster"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "example-dataproc-cluster"
            location: "us-central1"
            projectId: "example-project"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"

ephemeralCluster

pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - pyspark:
      name: "run_dataproc_ephemeral"
      executionTimeout: "1h"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            projectId: "example-project"
            location: "us-central1"
            clusterName: "ephemeral-cluster-inline"
            resourceProfile:
              inline:
                config:
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
            properties:
              spark.submit.deployMode: "cluster"

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pyspark:
      name: "run-pyspark-on-dataproc-serverless"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            inline:
              environmentConfig:
                executionConfig:
                  serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
                  networkUri: "projects/example-project/global/networks/default"
              runtimeConfig:
                version: "2.3"
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

notebook

Actions de type notebook. Exécutez un notebook .ipynb via Papermill.

Clés spécifiques au type d'action :

  • mainFilePath : chemin relatif vers le fichier notebook.
  • archiveUris : liste des URI d'archive à utiliser avec cette action.
  • stagingBucket : bucket Cloud Storage à utiliser avec cette action.
  • environment : configuration de l'environnement Python.

    • requirements : fichier des exigences Python à utiliser.

      • path : chemin d'accès au fichier contenant les exigences. Les exigences de ce fichier doivent être listées conformément à la spécification PEP-508.
  • engine :

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

Exemple :

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - notebook:
      name: "run-notebook-on-dataproc-serverless"
      mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
      archiveUris:
        - "gs://example-bucket-additional-data/custom_venv.tar.gz"
      staging_bucket: "example-bucket-additional-data-additional-data"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

sql

Actions de type sql. Exécuter des requêtes SQL

Clés spécifiques au type d'action :

  • query : définit une requête.

    • path : la requête est définie dans un fichier situé au chemin d'accès relatif au fichier de configuration du déploiement.
    • inline : la requête est définie de manière intégrée.

  • engine :

    • bigQuery
    • dataprocServerless
    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster

bigQuery

modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run_bigquery_insert_job_create"
      query:
        inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
      engine:
        bigquery:
          location: "US"
  - sql:
      name: "run_bigquery_insert_job_select"
      query:
        path: "sql-scripts/count_rows.sql"
      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"
      dependsOn:
        - "run_bigquery_insert_job_create"

dataprocServerless

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-dataproc"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocServerless:
          location: "us-central1"
          impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
          resourceProfile:
            inline:
              runtimeConfig:
                properties:
                  spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
                  spark.sql.catalog.bigquery.project: "example-project"
                  dataproc.sparkBqConnector.version: "0.42.3"

existingCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-existing-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "cluster-sql"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"

ephemeralCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-ephemeral-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            clusterName: "example-ephemeral-cluster"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
            resourceProfile:
              inline:
                clusterConfig:
                  gceClusterConfig:
                    zoneUri: "us-central1-a"
                    metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
                      SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"

pipeline

Actions de type pipeline. Exécutez un pipeline de traitement de données.

Clés spécifiques au type d'action :

  • framework :

    • dbt
    • dataform > airflowWorker
    • dataform > dataformService

Exemples :

dbt

modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "dbt-action"
      framework:
        dbt:
          airflowWorker:
            projectDirectoryPath: "dbt_project"
            selectModels: ["model_1", "model_2"]

dataform>airflowWorker

modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          airflowWorker:
            projectDirectoryPath: "dataform_local"

dataform>dataformService

modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"

  - pipeline:
      name: "run_dataform_compilation"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
              invocationConfig:
                serviceAccount: "example-account@example-project.iam.gserviceaccount.com"

Moteurs

Moteurs utilisés dans les actions.

dataprocOnGce > existingCluster

Exécutez dans un cluster Managed Service pour Apache Spark existant identifié par clusterName, project et location.

Vous pouvez gérer le cluster spécifié dans votre configuration de déploiement ou manuellement dans Managed Service pour Apache Spark. Nous vous recommandons de mettre à niveau régulièrement le cluster.

Clés :

  • clusterName : nom du cluster.
  • location : région où se trouve le cluster.
  • projectId : ID du projet dans lequel se trouve le cluster
  • properties : carte des propriétés des tâches Spark.

Exemple :

engine:
  dataprocOnGce:
    existingCluster:
      clusterName: "example-dataproc-cluster"
      location: "us-central1"
      projectId: "example-project"
      impersonationChain: "example-account@example-project.iam.gserviceaccount.com"

dataprocOnGce > ephemeralCluster

Exécutez-le dans un cluster Managed Service pour Apache Spark éphémère, qui est créé et supprimé après l'exécution du job.

Clés :

  • clusterName : nom du cluster.
  • location : région où se trouve le cluster.
  • projectId : ID du projet dans lequel se trouve le cluster
  • impersonationChain : chaîne d'emprunt d'identité d'un compte de service à utiliser pour exécuter l'action.
  • resourceProfile : profil de ressources du cluster Managed Service pour Apache Spark.

    Pour obtenir la description des champs disponibles, consultez ClusterConfig dans la documentation Managed Service pour Apache Spark.

    Vous pouvez spécifier un profil de ressource de l'une des manières suivantes :

    • inline : défini dans la configuration du pipeline.
    • path : défini dans un fichier situé au chemin d'accès relatif.
    • external_config_path : défini dans un fichier situé dans un bucket Cloud Storage. Contrairement aux options inline et path, qui nécessitent un commit et un déploiement pour mettre à jour les valeurs du profil de ressource, un profil de ressource externe est résolu à chaque exécution du pipeline. Vous pouvez le mettre à jour sans redéployer le pipeline.

    Les remplacements peuvent être appliqués au profil de ressource spécifié avec la clé override. Les remplacements sont appliqués avec une fusion profonde sur le profil de pré-source fourni.

  • properties : carte des propriétés des tâches Spark.

Exemple :

engine:
  dataprocOnGce:
    ephemeralCluster:
      projectId: "example-project"
      location: "us-central1"
      clusterName: "example-ephemeral-cluster"
      resourceProfile:
        inline:
          config:
            masterConfig:
              numInstances: 1
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
            workerConfig:
              numInstances: 2
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
      properties:
        spark.submit.deployMode: "cluster"

dataprocServerless

Exécutez l'envoi par lot dans Managed Service pour Apache Spark.

Clés :

  • location : région dans laquelle le job Spark doit être exécuté.
  • impersonationChain : chaîne d'emprunt d'identité d'un compte de service à utiliser pour exécuter l'action.
  • resourceProfile : profil de ressources Managed Service pour Apache Spark.

    Vous pouvez spécifier un profil de ressource de l'une des manières suivantes :

    • inline : défini dans la configuration du pipeline.
    • path : défini dans un fichier situé au chemin d'accès relatif.
    • external_config_path : défini dans un fichier situé dans un bucket Cloud Storage. Contrairement aux options inline et path, qui nécessitent un commit et un déploiement pour mettre à jour les valeurs du profil de ressource, un profil de ressource externe est résolu à chaque exécution du pipeline. Vous pouvez le mettre à jour sans redéployer le pipeline.

    Les clés suivantes spécifient la configuration du profil de ressource :

    • environmentConfig : configuration de l'environnement
    • runtimeConfig : configuration de l'environnement d'exécution

    Pour obtenir la description des champs disponibles, consultez RuntimeConfig et EnvironmentConfig dans la documentation Managed Service pour Apache Spark.

    Les remplacements peuvent être appliqués au profil de ressource spécifié avec la clé override. Les remplacements sont appliqués avec une fusion profonde au profil de ressource fourni.

Exemple (intégré) :

engine:
  dataprocServerless:
    location: "us-central1"
    resourceProfile:
      inline:
        environmentConfig:
          executionConfig:
            serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
            networkUri: "projects/example-project/global/networks/default"
        runtimeConfig:
          version: "2.3"
          properties:
            spark.app.name: "run-notebook-on-dataproc-serverless"
            spark.executor.instances: "2"
            spark.driver.cores: "4"

Exemple (chemin externe et remplacements) :

      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                    spark.app.name: "run-notebook-on-dataproc-serverless"
                    spark.executor.instances: "2"
                    spark.driver.cores: "4"

bigQuery

Exécuter en tant que job BigQuery.

Clés :

Exemple :

      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"

interprétabilité locale

Exécuter localement dans l'environnement du runner.

Consultez l'action python pour savoir comment configurer l'environnement virtuel.

Exemple :

    engine:
      local: {}

Frameworks

Frameworks utilisés dans les actions.

dbt > airflowWorker

Exécutez un modèle dbt exécuté sur un nœud de calcul Airflow dans l'environnement du runner à l'aide de dbt-core.

Clés :

  • projectDirectoryPath : chemin relatif vers un dossier contenant le projet DBT.
  • selectModels : liste des modèles à inclure dans l'exécution par nom (équivalent à dbt --select).
  • tags : liste des modèles à inclure dans l'exécution par tag (équivalent à dbt --select).

Exemple :

framework:
  dbt:
    airflowWorker:
      projectDirectoryPath: "dbt_project"
      selectModels: ["model_1", "model_2"]

dataform > airflowWorker

Workflows Dataform exécutés sur un nœud de calcul Airflow dans l'environnement de l'exécuteur à l'aide de dataform core cli.

Clés :

  • projectDirectoryPath : chemin d'accès relatif à un dossier contenant les définitions du workflow Dataform.

Exemple :

framework:
  dataform:
    airflowWorker:
      projectDirectoryPath: "dataform_local"

dataform > dataformService

Exécute les workflows Dataform exécutés sur le service Dataform.

Clés :

  • location : emplacement du dépôt Dataform.
  • projectId : projet dans lequel se trouve le dépôt Dataform.
  • repositoryId : ID du dépôt Dataform
  • workflowInvocation : configuration de l'appel de workflow, qui spécifie les actions à exécuter. Consultez WorkflowInvocation.

Exemple :

framework:
  dataform:
    dataformService:
      location: "us-central1"
      projectId: "example-project"
      repositoryId: "example-repository"
      workflowInvocation:
        compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
        invocationConfig:
          serviceAccount: "example-account@example-project.iam.gserviceaccount.com"