DSL-Referenz für Orchestration Pipelines

Diese Seite enthält eine Referenz zur DSL für Orchestration Pipelines.

Einschränkung in der Vorschau

Während der Vorschau gelten für Orchestration Pipelines die folgenden Einschränkungen:

  • Für pyspark- und notebook-Aktionen:

    • Es wird nur eine requirements.txt-Datei für alle pyspark- und notebook-Aktionen unterstützt.
    • Die Windows-Plattform wird für das Erstellen von Paketen über das Tool uv nicht unterstützt.
    • Es werden nur Python-Pakete mit vorkompilierten Binärdateien unterstützt.
  • Für sql-Aktionen:

    • Die inline-Definition im query-Schlüssel wird nicht unterstützt.

Format und Werte

Pipelines werden im YAML-Format definiert und müssen in separaten Dateien in Ihrem Repository gespeichert werden, eine Datei pro Pipeline.

Orchestration Pipelines bietet mehrere Möglichkeiten, Variablen in Ihren Pipelinedefinitionen und der Bereitstellungskonfiguration zu verwenden. Sie können beispielsweise benutzerdefinierte Variablen definieren, GitHub-Secrets verwenden und Variablenwerte in der Befehlszeile ersetzen. Weitere Informationen finden Sie unter Variablen, Secrets und Substitution.

Weitere Informationen zum Hinzufügen zusätzlicher Pipelines zum Pipeline-Bundle finden Sie unter Weitere Pipeline hinzufügen.

Codebeispiele

Das Repository „orchestration-pipelines“ auf GitHub enthält die neuesten Codebeispiele für viele Kombinationen aus Pipelineaktionen und Engines. Wir empfehlen, diese Beispiele als Startpunkt für die Orchestration Pipelines-Funktionen zu verwenden.

Pipelinedefinition

Eine Pipeline-Definition hat die folgenden Schlüssel auf oberster Ebene:

  • modelVersion: Die Version des Pipeline-Definitionsmodells. Die aktuelle Modellversion ist 1.0.

  • pipelineId: Eine eindeutige Kennung für die Pipeline. Diese ID bleibt über mehrere Bereitstellungen und Versionen hinweg konsistent und ermöglicht so das Tracking und die Verwaltung der logischen Pipeline-Entität.

  • description: Beschreibung der Pipeline, die der Beschreibung des Airflow-DAG in der Runner-Umgebung zugeordnet wird.

  • owner: Inhaber der Pipeline.

  • tags: String-Kennzeichnungen, die auf die Pipeline angewendet werden und zum Filtern der Pipelines verwendet werden.

  • notifications: Benachrichtigungen zu Pipelineereignissen. Unterstützte Benachrichtigungstypen:

    • onPipelineFailure: E-Mail bei Pipelinefehlern.

    Für Benachrichtigungen müssen SendGrid-E-Mail-Dienste in Ihrer Runner-Umgebung konfiguriert sein. Eine Anleitung finden Sie unter E-Mail-Benachrichtigungen konfigurieren.

    Beispiel:

    notifications:
      onPipelineFailure:
        email: ["user1@example.com", "user2@example.com"]
    
  • runner: Gibt die Ziel-Orchestrierungs-Engine an. Reserviert für zukünftige Verwendungen. Setzen Sie diesen Wert auf airflow.

  • defaults: Legt Standardwerte für Attribute wie project_id, location und executionConfig fest, die für alle Aktionen gelten, sofern sie nicht in einer bestimmten Aktion überschrieben werden. Die Attribute project_id und location können durch einzelne Aktionsattribute überschrieben werden. Die executionConfig-Eigenschaft kann in einzelnen Aktionen nicht überschrieben werden. Sie gibt die Anzahl der Wiederholungsversuche für alle Aktionen in der Pipeline im Feld retries an.

  • triggers: Definiert, wie die Pipeline initiiert wird:

    • Kein Wert. Die Pipeline kann weiterhin manuell ausgelöst werden.

    • schedule. Die Pipeline wird nach einem Zeitplan mit Cron-Ausdrücken ausgelöst.

      Beispiel für einen Zeitplan:

      triggers:
        - schedule:
            interval: "0 5 * * *"
            startTime: "2025-10-01T00:00:00"
            endTime: "2026-10-01T00:00:00"
            catchup: false
            timezone: "UTC"
      
  • actions

    Eine Zuordnung der auszuführenden Aufgaben. Jeder Zuordnungseintrag entspricht einer Aktion. Weitere Informationen finden Sie unter Aktionen.

Aktionen

Pipelineaktionen definieren einzelne Schritte bei der Pipelineausführung. Für jede Aktion muss eine Engine oder ein Framework angegeben werden. Die Engine oder das Framework bestimmt, welche Ressourcen zum Ausführen der Aktion verwendet werden.

Orchestration Pipelines unterstützt die folgenden Aktionen:

  • Pyspark (pyspark): Führt ein PySpark-Script aus.
  • Notebook (notebook): Führt eine Notebook-Datei aus.
  • SQL-Abfrage (notebook): Führen Sie eine SQL-Abfrage aus.
  • Python (python): Führt ein Python-Skript aus.
  • Pipeline (pipeline): Führt eine Datenverarbeitungspipeline aus.

Orchestration Pipelines unterstützt die folgenden Engines und Frameworks:

  • dataprocOnGce > existingCluster: Managed Service for Apache Spark-Cluster, der durch clusterName, project und location identifiziert wird.

  • dataprocOnGce > ephemeral: Managed Service for Apache Spark-Cluster, der nach Ausführung des Jobs erstellt und gelöscht wird.

  • dataprocServerless: Managed Service for Apache Spark Batch-Übermittlung.

  • bigquery: BigQuery-Job.

  • python > local: Python-Skript, das auf einem Airflow-Worker in der Runner-Umgebung ausgeführt wird.

  • dbt > airflowWorker: dbt-Modelle, die auf einem Airflow-Worker in der Runner-Umgebung mit dbt-core ausgeführt werden.

  • dataform > airflowWorker: Dataform-Workflows, die auf einem Airflow-Worker in der Runner-Umgebung mit der Dataform Core-Befehlszeile ausgeführt werden.

  • dataform > dataformService: Dataform-Workflows, die im Dataform-Dienst ausgeführt werden.

In der folgenden Tabelle sind mögliche Kombinationen aus Aktionstyp, Engine und Framework aufgeführt. Beispiele für Aktionscodes finden Sie in den Beschreibungen der Engines und Frameworks.

Aktion Modul oder Framework Ausgaben für
pyspark dataprocOnGce > existingCluster Joblogs für Managed Service for Apache Spark
pyspark dataprocOnGce > ephemeralCluster Joblogs für Managed Service for Apache Spark
pyspark dataprocServerless Batchlogs für Managed Service for Apache Spark
notebook dataprocOnGce > existingCluster Runner-Bucket im Verzeichnis composer_declarative_dags_resources
notebook dataprocOnGce > ephemeralCluster Joblogs für Managed Service for Apache Spark
notebook dataprocServerless Runner-Bucket im Verzeichnis composer_declarative_dags_resources
sql bigquery Tabelle, die im Parameter destinationTable angegeben ist
sql dataprocServerless Batchlogs für Managed Service for Apache Spark.
python local (lokale Ausführung) Logs
pipeline dbt > airflowWorker Logs und BigQuery
pipeline dataform > airflowWorker Angegebene Tabelle in BigQuery
pipeline dataform > dataformService In Dataform

Alle Aktionen haben die folgenden gemeinsamen Schlüssel. Andere Schlüssel hängen vom Aktionstyp ab.

  • name: Name der Aktion. Dieser Name wird in der Runner-Umgebung dem Airflow-Aufgabennamen zugeordnet. Wenn für eine Aktion mehr als eine Airflow-Aufgabe erforderlich ist, wird dieser Name der Aufgabengruppe zugeordnet.

  • dependsOn: Eine Liste mit Namen von Upstream-Aktionen, von denen diese Aktion abhängt. Dadurch wird die Ausführungsreihenfolge festgelegt. Wenn eine der vorgelagerten Aktionen fehlschlägt, werden die davon abhängigen nachgelagerten Aktionen nicht ausgeführt.

  • executionTimeout: Zeitlimit für die Ausführung der Aktion. Beispiele: 1h, 30m, 40s.

Python

Aktionen vom Typ python. Python-Scripts ausführen

Aktionstypspezifische Schlüssel:

  • mainFilePath: Relativer Pfad zur Python-Skriptdatei.
  • pythonCallable: Name der aufrufbaren Python-Funktion, die im Python-Skript ausgeführt werden soll.
  • opKwargs: Eine Zuordnung von Schlüsselwortargumenten für den Operator.
  • (Optional) environment: Führen Sie das Skript in einer dynamisch erstellten virtuellen Python-Umgebung aus.

    • requirements: Anforderungen an die virtuelle Umgebung. Die Anforderungen werden zur Laufzeit erfüllt.

      • inline: Anforderungen werden inline angegeben.

        • list: Liste der Anforderungen. Einzelne Anforderungen gemäß PEP-508 auflisten.

          Beispiel:

          environment:
            requirements:
              inline:
                list: ["pandas>=2.0.0"]
          
      • (Alternativ) path: Pfad zur Datei mit den Anforderungen. Die Anforderungen in dieser Datei müssen gemäß PEP-508 aufgeführt werden.

        Beispiel:

        environment:
          requirements:
            path: "scripts/requirements.txt"
        
    • systemSitePackages: Wenn true, erbt die virtuelle Umgebung Pakete aus dem Verzeichnis „site-packages“ des Airflow-Workers. Sie können benutzerdefinierte PyPI-Pakete in Ihrer Runner-Umgebung installieren.

  • engine:

    • local: Lokale Ausführung in der Runner-Umgebung

Beispiel:

lokal

modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 1

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - python:
      name: "first_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        systemSitePackages: true
        requirements:
          path: "scripts/requirements.txt"

  - python:
      name: "second_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_1.py"
      pythonCallable: "main"
      engine:
        local: {}
      environment:
        requirements:
          inline:
            list: ["pandas>=2.0.0"]
        systemSitePackages: true
      dependsOn: ["first_script_run"]

  - python:
      name: "third_script_run"
      executionTimeout: "30m"
      mainFilePath: "scripts/venv_test_script_2.py"
      pythonCallable: "main"
      engine:
        local: {}
      opKwargs:
        api_endpoint: "https://api.my-vendor.example.com/v1/status"
        api_key_secret_name: "my-vendor-api-key"
      dependsOn: ["first_script_run"]

pyspark

Aktionen vom Typ pyspark. PySpark-Scripts ausführen

Aktionstypspezifische Schlüssel:

  • mainFilePath: Relativer Pfad zum PySpark-Script.
  • archiveUris: Eine Liste von Archiv-URIs, die mit dieser Aktion verwendet werden sollen.
  • stagingBucket: Cloud Storage-Bucket, der für diese Aktion verwendet werden soll.
  • pyFiles: Eine Liste der Python-Dateien, die für diesen Spark-Job verwendet werden sollen.
  • environment: Konfiguration der Python-Umgebung.

    • requirements: Zu verwendende Python-Anforderungsdatei.

      • path: Pfad zur Datei mit den Anforderungen. Die Anforderungen in dieser Datei müssen gemäß PEP-508 aufgeführt werden.
  • engine:

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

Beispiele:

existingCluster

modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 4 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pyspark:
      name: "run-pyspark-with-pyfiles-on-existing-cluster"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "example-dataproc-cluster"
            location: "us-central1"
            projectId: "example-project"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"

ephemeralCluster

pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - pyspark:
      name: "run_dataproc_ephemeral"
      executionTimeout: "1h"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            projectId: "example-project"
            location: "us-central1"
            clusterName: "ephemeral-cluster-inline"
            resourceProfile:
              inline:
                config:
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"
                    diskConfig:
                      bootDiskType: "pd-standard"
                      bootDiskSizeGb: 1024
            properties:
              spark.submit.deployMode: "cluster"

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pyspark:
      name: "run-pyspark-on-dataproc-serverless"
      mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
      pyFiles:
        - "data/lib1.py"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            inline:
              environmentConfig:
                executionConfig:
                  serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
                  networkUri: "projects/example-project/global/networks/default"
              runtimeConfig:
                version: "2.3"
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

Notebook

Aktionen vom Typ notebook. Ein .ipynb-Notebook mit Papermill ausführen.

Aktionstypspezifische Schlüssel:

  • mainFilePath: relativer Pfad zur Notebook-Datei.
  • archiveUris: Eine Liste von Archiv-URIs, die mit dieser Aktion verwendet werden sollen.
  • stagingBucket: Cloud Storage-Bucket, der für diese Aktion verwendet werden soll.
  • environment: Konfiguration der Python-Umgebung.

    • requirements: Zu verwendende Python-Anforderungsdatei.

      • path: Pfad zur Datei mit den Anforderungen. Die Anforderungen in dieser Datei müssen gemäß PEP-508 aufgeführt werden.
  • engine:

    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster
    • dataprocServerless

Beispiel:

dataprocServerless

modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - notebook:
      name: "run-notebook-on-dataproc-serverless"
      mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
      archiveUris:
        - "gs://example-bucket-additional-data/custom_venv.tar.gz"
      staging_bucket: "example-bucket-additional-data-additional-data"
      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                  spark.app.name: "run-notebook-on-dataproc-serverless"
                  spark.executor.instances: "2"
                  spark.driver.cores: "4"

sql

Aktionen vom Typ sql. SQL-Abfragen ausführen.

Aktionstypspezifische Schlüssel:

  • query: Definiert eine Abfrage.

    • path: Die Abfrage wird in einer Datei definiert, die sich im relativen Pfad zur Bereitstellungskonfigurationsdatei befindet.
    • inline: Die Abfrage wird inline definiert.

  • engine:

    • bigQuery
    • dataprocServerless
    • dataprocOnGce > existingCluster
    • dataprocOnGce > ephemeralCluster

bigQuery

modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run_bigquery_insert_job_create"
      query:
        inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
      engine:
        bigquery:
          location: "US"
  - sql:
      name: "run_bigquery_insert_job_select"
      query:
        path: "sql-scripts/count_rows.sql"
      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"
      dependsOn:
        - "run_bigquery_insert_job_create"

dataprocServerless

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-dataproc"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocServerless:
          location: "us-central1"
          impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
          resourceProfile:
            inline:
              runtimeConfig:
                properties:
                  spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
                  spark.sql.catalog.bigquery.project: "example-project"
                  dataproc.sparkBqConnector.version: "0.42.3"

existingCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-existing-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          existingCluster:
            clusterName: "cluster-sql"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"

ephemeralCluster

modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"

actions:
  - sql:
      name: "run-sql-on-ephemeral-cluster"
      query:
        path: "sql-scripts/test_query.sql"
      engine:
        dataprocOnGce:
          ephemeralCluster:
            clusterName: "example-ephemeral-cluster"
            projectId: "example-project"
            location: "us-central1"
            impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
            properties:
              spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
              spark.sql.catalog.bigquery.project: "example-project"
              # This field is needed and important for the Spark-BigQuery connector.
              spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
            resourceProfile:
              inline:
                clusterConfig:
                  gceClusterConfig:
                    zoneUri: "us-central1-a"
                    metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
                      SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
                  masterConfig:
                    numInstances: 1
                    machineTypeUri: "n1-standard-4"
                  workerConfig:
                    numInstances: 2
                    machineTypeUri: "n1-standard-4"

Pipeline

Aktionen vom Typ pipeline. Datenverarbeitungs-Pipeline ausführen

Aktionstypspezifische Schlüssel:

  • framework:

    • dbt
    • dataform > airflowWorker
    • dataform > dataformService

Beispiele:

Logo: dbt

modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "dbt-action"
      framework:
        dbt:
          airflowWorker:
            projectDirectoryPath: "dbt_project"
            selectModels: ["model_1", "model_2"]

dataform>airflowWorker

modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0
triggers:
  - schedule:
      interval: "0 5 * * *"
      startTime: "2025-10-01T00:00:00"
      endTime: "2026-10-01T00:00:00"
      catchup: false
      timezone: "UTC"
actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          airflowWorker:
            projectDirectoryPath: "dataform_local"

dataform>dataformService

modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"

defaults:
  projectId: "example-project"
  location: "us-central1"
  executionConfig:
    retries: 0

actions:
  - pipeline:
      name: "run_dataform"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"

  - pipeline:
      name: "run_dataform_compilation"
      framework:
        dataform:
          dataformService:
            location: "us-central1"
            projectId: "example-project"
            repositoryId: "example-repository"
            workflowInvocation:
              compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
              invocationConfig:
                serviceAccount: "example-account@example-project.iam.gserviceaccount.com"

Suchmaschinen

In Aktionen verwendete Engines.

dataprocOnGce > existingCluster

In einem vorhandenen Managed Service for Apache Spark-Cluster ausführen, der durch clusterName, project und location identifiziert wird.

Sie können den angegebenen Cluster in Ihrer Bereitstellungskonfiguration oder manuell in Managed Service for Apache Spark verwalten. Wir empfehlen, den Cluster regelmäßig zu aktualisieren.

Schlüssel:

  • clusterName: Name des Clusters
  • location: Region, in der sich der Cluster befindet
  • projectId: Projekt-ID des Projekts, in dem sich der Cluster befindet
  • properties: Eine Karte der Spark-Jobattribute.

Beispiel:

engine:
  dataprocOnGce:
    existingCluster:
      clusterName: "example-dataproc-cluster"
      location: "us-central1"
      projectId: "example-project"
      impersonationChain: "example-account@example-project.iam.gserviceaccount.com"

dataprocOnGce > ephemeralCluster

Ausführung in einem kurzlebigen Managed Service for Apache Spark-Cluster, der nach der Ausführung des Jobs erstellt und gelöscht wird.

Schlüssel:

  • clusterName: Name des Clusters
  • location: Region, in der sich der Cluster befindet
  • projectId: Projekt-ID des Projekts, in dem sich der Cluster befindet
  • impersonationChain: Die Kette zur Identitätsübernahme des Dienstkontos, die zum Ausführen der Aktion verwendet werden soll.
  • resourceProfile: Ressourcenprofil für Managed Service for Apache Spark-Cluster.

    Eine Beschreibung der verfügbaren Felder finden Sie in der Managed Service for Apache Spark-Dokumentation unter ClusterConfig.

    Ein Ressourcenprofil kann auf folgende Weise angegeben werden:

    • inline: als Teil der Pipelinekonfiguration definiert.
    • path: in einer Datei definiert, die sich im relativen Pfad befindet.
    • external_config_path: in einer Datei in einem Cloud Storage-Bucket definiert. Im Gegensatz zu den Optionen inline und path, für die ein Commit und eine Bereitstellung erforderlich sind, um die Werte des Ressourcenprofils zu aktualisieren, wird ein externes Ressourcenprofil bei jedem Pipeline-Lauf aufgelöst. Sie können es also aktualisieren, ohne die Pipeline neu bereitzustellen.

    Mit dem Schlüssel override können Überschreibungen auf das angegebene Ressourcenprofil angewendet werden. Überschreibungen werden mit Deep Merge auf das bereitgestellte Quellprofil angewendet.

  • properties: Eine Karte der Spark-Jobattribute.

Beispiel:

engine:
  dataprocOnGce:
    ephemeralCluster:
      projectId: "example-project"
      location: "us-central1"
      clusterName: "example-ephemeral-cluster"
      resourceProfile:
        inline:
          config:
            masterConfig:
              numInstances: 1
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
            workerConfig:
              numInstances: 2
              machineTypeUri: "n1-standard-4"
              diskConfig:
                bootDiskType: "pd-standard"
                bootDiskSizeGb: 1024
      properties:
        spark.submit.deployMode: "cluster"

dataprocServerless

Führen Sie die Batchübermittlung in Managed Service for Apache Spark aus.

Schlüssel:

  • location: Region, in der der Spark-Job ausgeführt werden muss.
  • impersonationChain: Identitätsübernahmekette des Dienstkontos, die zum Ausführen der Aktion verwendet werden soll.
  • resourceProfile: Ressourcenprofil für Managed Service for Apache Spark.

    Ein Ressourcenprofil kann auf folgende Weise angegeben werden:

    • inline: als Teil der Pipelinekonfiguration definiert.
    • path: in einer Datei definiert, die sich im relativen Pfad befindet.
    • external_config_path: in einer Datei in einem Cloud Storage-Bucket definiert. Im Gegensatz zu den Optionen inline und path, für die ein Commit und eine Bereitstellung erforderlich sind, um die Werte des Ressourcenprofils zu aktualisieren, wird ein externes Ressourcenprofil bei jedem Pipeline-Lauf aufgelöst. Sie können es also aktualisieren, ohne die Pipeline neu bereitzustellen.

    Die folgenden Schlüssel geben die Konfiguration des Ressourcenprofils an:

    • environmentConfig: Umgebungskonfiguration
    • runtimeConfig: Laufzeitkonfiguration

    Eine Beschreibung der verfügbaren Felder finden Sie in der Dokumentation zu Managed Service for Apache Spark unter RuntimeConfig und EnvironmentConfig.

    Mit dem Schlüssel override können Überschreibungen auf das angegebene Ressourcenprofil angewendet werden. Überschreibungen werden mit Deep Merge auf das bereitgestellte Ressourcenprofil angewendet.

Beispiel (inline):

engine:
  dataprocServerless:
    location: "us-central1"
    resourceProfile:
      inline:
        environmentConfig:
          executionConfig:
            serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
            networkUri: "projects/example-project/global/networks/default"
        runtimeConfig:
          version: "2.3"
          properties:
            spark.app.name: "run-notebook-on-dataproc-serverless"
            spark.executor.instances: "2"
            spark.driver.cores: "4"

Beispiel (externer Pfad und Überschreibungen):

      engine:
        dataprocServerless:
          location: "us-central1"
          resourceProfile:
            externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
            overrides:
              runtimeConfig:
                properties:
                    spark.app.name: "run-notebook-on-dataproc-serverless"
                    spark.executor.instances: "2"
                    spark.driver.cores: "4"

bigQuery

Als BigQuery-Job ausführen

Schlüssel:

  • location: Region, in der sich die Zieltabelle befindet.
  • destinationTable: BigQuery-Tabelle für die Ausgabe der Daten
  • impersonationChain: Die Identitätsübernahmekette des Dienstkontos, die zum Ausführen der Aktion verwendet werden soll.

Beispiel:

      engine:
        bigquery:
          location: "US"
          destinationTable: "example-project.example_dataset.example_table_query_results"

lokal

Lokal in der Runner-Umgebung ausführen.

Informationen zum Konfigurieren der virtuellen Umgebung finden Sie unter der Aktion python.

Beispiel:

    engine:
      local: {}

Frameworks

In Aktionen verwendete Frameworks.

dbt > airflowWorker

Führen Sie ein dbt-Modell aus, das auf einem Airflow-Worker in der Runner-Umgebung ausgeführt wird, indem Sie dbt-core verwenden.

Schlüssel:

  • projectDirectoryPath: Relativer Pfad zu einem Ordner, der das DBT-Projekt enthält.
  • selectModels: Liste der Modelle, die nach Namen in die Ausführung aufgenommen werden sollen (entspricht dbt --select).
  • tags: Liste der Modelle, die nach Tag in den Lauf aufgenommen werden sollen (entspricht dbt --select).

Beispiel:

framework:
  dbt:
    airflowWorker:
      projectDirectoryPath: "dbt_project"
      selectModels: ["model_1", "model_2"]

dataform > airflowWorker

Dataform-Workflows, die auf einem Airflow-Worker in der Runner-Umgebung mit der Dataform Core-Befehlszeile ausgeführt werden.

Schlüssel:

  • projectDirectoryPath: Relativer Pfad zu einem Ordner, der die Dataform-Workflowdefinitionen enthält.

Beispiel:

framework:
  dataform:
    airflowWorker:
      projectDirectoryPath: "dataform_local"

dataform > dataformService

Führt Dataform-Workflows aus, die im Dataform-Dienst ausgeführt werden.

Schlüssel:

  • location: Speicherort des Dataform-Repositorys.
  • projectId: Das Projekt, in dem sich das Dataform-Repository befindet.
  • repositoryId: Dataform-Repository-ID
  • workflowInvocation: Konfiguration für den Workflow-Aufruf, in der angegeben wird, welche Aktionen ausgeführt werden sollen. Weitere Informationen finden Sie unter WorkflowInvocation.

Beispiel:

framework:
  dataform:
    dataformService:
      location: "us-central1"
      projectId: "example-project"
      repositoryId: "example-repository"
      workflowInvocation:
        compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
        invocationConfig:
          serviceAccount: "example-account@example-project.iam.gserviceaccount.com"