Diese Seite enthält eine Referenz zur DSL für Orchestration Pipelines.
Einschränkung in der Vorschau
Während der Vorschau gelten für Orchestration Pipelines die folgenden Einschränkungen:
Für
pyspark- undnotebook-Aktionen:- Es wird nur eine
requirements.txt-Datei für allepyspark- undnotebook-Aktionen unterstützt. - Die Windows-Plattform wird für das Erstellen von Paketen über das Tool
uvnicht unterstützt. - Es werden nur Python-Pakete mit vorkompilierten Binärdateien unterstützt.
- Es wird nur eine
Für
sql-Aktionen:- Die
inline-Definition imquery-Schlüssel wird nicht unterstützt.
- Die
Format und Werte
Pipelines werden im YAML-Format definiert und müssen in separaten Dateien in Ihrem Repository gespeichert werden, eine Datei pro Pipeline.
Orchestration Pipelines bietet mehrere Möglichkeiten, Variablen in Ihren Pipelinedefinitionen und der Bereitstellungskonfiguration zu verwenden. Sie können beispielsweise benutzerdefinierte Variablen definieren, GitHub-Secrets verwenden und Variablenwerte in der Befehlszeile ersetzen. Weitere Informationen finden Sie unter Variablen, Secrets und Substitution.
Weitere Informationen zum Hinzufügen zusätzlicher Pipelines zum Pipeline-Bundle finden Sie unter Weitere Pipeline hinzufügen.
Codebeispiele
Das Repository „orchestration-pipelines“ auf GitHub enthält die neuesten Codebeispiele für viele Kombinationen aus Pipelineaktionen und Engines. Wir empfehlen, diese Beispiele als Startpunkt für die Orchestration Pipelines-Funktionen zu verwenden.
Pipelinedefinition
Eine Pipeline-Definition hat die folgenden Schlüssel auf oberster Ebene:
modelVersion: Die Version des Pipeline-Definitionsmodells. Die aktuelle Modellversion ist1.0.pipelineId: Eine eindeutige Kennung für die Pipeline. Diese ID bleibt über mehrere Bereitstellungen und Versionen hinweg konsistent und ermöglicht so das Tracking und die Verwaltung der logischen Pipeline-Entität.description: Beschreibung der Pipeline, die der Beschreibung des Airflow-DAG in der Runner-Umgebung zugeordnet wird.owner: Inhaber der Pipeline.tags: String-Kennzeichnungen, die auf die Pipeline angewendet werden und zum Filtern der Pipelines verwendet werden.notifications: Benachrichtigungen zu Pipelineereignissen. Unterstützte Benachrichtigungstypen:onPipelineFailure: E-Mail bei Pipelinefehlern.
Für Benachrichtigungen müssen SendGrid-E-Mail-Dienste in Ihrer Runner-Umgebung konfiguriert sein. Eine Anleitung finden Sie unter E-Mail-Benachrichtigungen konfigurieren.
Beispiel:
notifications: onPipelineFailure: email: ["user1@example.com", "user2@example.com"]runner: Gibt die Ziel-Orchestrierungs-Engine an. Reserviert für zukünftige Verwendungen. Setzen Sie diesen Wert aufairflow.defaults: Legt Standardwerte für Attribute wieproject_id,locationundexecutionConfigfest, die für alle Aktionen gelten, sofern sie nicht in einer bestimmten Aktion überschrieben werden. Die Attributeproject_idundlocationkönnen durch einzelne Aktionsattribute überschrieben werden. DieexecutionConfig-Eigenschaft kann in einzelnen Aktionen nicht überschrieben werden. Sie gibt die Anzahl der Wiederholungsversuche für alle Aktionen in der Pipeline im Feldretriesan.triggers: Definiert, wie die Pipeline initiiert wird:Kein Wert. Die Pipeline kann weiterhin manuell ausgelöst werden.
schedule. Die Pipeline wird nach einem Zeitplan mit Cron-Ausdrücken ausgelöst.Beispiel für einen Zeitplan:
triggers: - schedule: interval: "0 5 * * *" startTime: "2025-10-01T00:00:00" endTime: "2026-10-01T00:00:00" catchup: false timezone: "UTC"
actionsEine Zuordnung der auszuführenden Aufgaben. Jeder Zuordnungseintrag entspricht einer Aktion. Weitere Informationen finden Sie unter Aktionen.
Aktionen
Pipelineaktionen definieren einzelne Schritte bei der Pipelineausführung. Für jede Aktion muss eine Engine oder ein Framework angegeben werden. Die Engine oder das Framework bestimmt, welche Ressourcen zum Ausführen der Aktion verwendet werden.
Orchestration Pipelines unterstützt die folgenden Aktionen:
- Pyspark (
pyspark): Führt ein PySpark-Script aus. - Notebook (
notebook): Führt eine Notebook-Datei aus. - SQL-Abfrage (
notebook): Führen Sie eine SQL-Abfrage aus. - Python (
python): Führt ein Python-Skript aus. - Pipeline (
pipeline): Führt eine Datenverarbeitungspipeline aus.
Orchestration Pipelines unterstützt die folgenden Engines und Frameworks:
dataprocOnGce>existingCluster: Managed Service for Apache Spark-Cluster, der durch clusterName, project und location identifiziert wird.dataprocOnGce>ephemeral: Managed Service for Apache Spark-Cluster, der nach Ausführung des Jobs erstellt und gelöscht wird.dataprocServerless: Managed Service for Apache Spark Batch-Übermittlung.bigquery: BigQuery-Job.python>local: Python-Skript, das auf einem Airflow-Worker in der Runner-Umgebung ausgeführt wird.dbt>airflowWorker: dbt-Modelle, die auf einem Airflow-Worker in der Runner-Umgebung mitdbt-coreausgeführt werden.dataform>airflowWorker: Dataform-Workflows, die auf einem Airflow-Worker in der Runner-Umgebung mit der Dataform Core-Befehlszeile ausgeführt werden.dataform>dataformService: Dataform-Workflows, die im Dataform-Dienst ausgeführt werden.
In der folgenden Tabelle sind mögliche Kombinationen aus Aktionstyp, Engine und Framework aufgeführt. Beispiele für Aktionscodes finden Sie in den Beschreibungen der Engines und Frameworks.
| Aktion | Modul oder Framework | Ausgaben für |
|---|---|---|
pyspark |
dataprocOnGce > existingCluster |
Joblogs für Managed Service for Apache Spark |
pyspark |
dataprocOnGce > ephemeralCluster |
Joblogs für Managed Service for Apache Spark |
pyspark |
dataprocServerless |
Batchlogs für Managed Service for Apache Spark |
notebook |
dataprocOnGce > existingCluster |
Runner-Bucket im Verzeichnis composer_declarative_dags_resources |
notebook |
dataprocOnGce > ephemeralCluster |
Joblogs für Managed Service for Apache Spark |
notebook |
dataprocServerless |
Runner-Bucket im Verzeichnis composer_declarative_dags_resources |
sql |
bigquery |
Tabelle, die im Parameter destinationTable angegeben ist |
sql |
dataprocServerless |
Batchlogs für Managed Service for Apache Spark. |
python |
local (lokale Ausführung) |
Logs |
pipeline |
dbt > airflowWorker |
Logs und BigQuery |
pipeline |
dataform > airflowWorker |
Angegebene Tabelle in BigQuery |
pipeline |
dataform > dataformService |
In Dataform |
Alle Aktionen haben die folgenden gemeinsamen Schlüssel. Andere Schlüssel hängen vom Aktionstyp ab.
name: Name der Aktion. Dieser Name wird in der Runner-Umgebung dem Airflow-Aufgabennamen zugeordnet. Wenn für eine Aktion mehr als eine Airflow-Aufgabe erforderlich ist, wird dieser Name der Aufgabengruppe zugeordnet.dependsOn: Eine Liste mit Namen von Upstream-Aktionen, von denen diese Aktion abhängt. Dadurch wird die Ausführungsreihenfolge festgelegt. Wenn eine der vorgelagerten Aktionen fehlschlägt, werden die davon abhängigen nachgelagerten Aktionen nicht ausgeführt.executionTimeout: Zeitlimit für die Ausführung der Aktion. Beispiele:1h,30m,40s.
Python
Aktionen vom Typ python. Python-Scripts ausführen
Aktionstypspezifische Schlüssel:
mainFilePath: Relativer Pfad zur Python-Skriptdatei.pythonCallable: Name der aufrufbaren Python-Funktion, die im Python-Skript ausgeführt werden soll.opKwargs: Eine Zuordnung von Schlüsselwortargumenten für den Operator.(Optional)
environment: Führen Sie das Skript in einer dynamisch erstellten virtuellen Python-Umgebung aus.requirements: Anforderungen an die virtuelle Umgebung. Die Anforderungen werden zur Laufzeit erfüllt.inline: Anforderungen werden inline angegeben.list: Liste der Anforderungen. Einzelne Anforderungen gemäß PEP-508 auflisten.Beispiel:
environment: requirements: inline: list: ["pandas>=2.0.0"]
(Alternativ)
path: Pfad zur Datei mit den Anforderungen. Die Anforderungen in dieser Datei müssen gemäß PEP-508 aufgeführt werden.Beispiel:
environment: requirements: path: "scripts/requirements.txt"
systemSitePackages: Wenntrue, erbt die virtuelle Umgebung Pakete aus dem Verzeichnis „site-packages“ des Airflow-Workers. Sie können benutzerdefinierte PyPI-Pakete in Ihrer Runner-Umgebung installieren.
engine:local: Lokale Ausführung in der Runner-Umgebung
Beispiel:
lokal
modelVersion: "1.0"
pipelineId: "python-virtual-env"
description: "A pipeline that runs Python script on isolated virtual environment with specified dependencies."
runner: "airflow"
owner: "data-eng-team"
tags: ["python_virtual_environment", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 1
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- python:
name: "first_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_1.py"
pythonCallable: "main"
engine:
local: {}
environment:
systemSitePackages: true
requirements:
path: "scripts/requirements.txt"
- python:
name: "second_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_1.py"
pythonCallable: "main"
engine:
local: {}
environment:
requirements:
inline:
list: ["pandas>=2.0.0"]
systemSitePackages: true
dependsOn: ["first_script_run"]
- python:
name: "third_script_run"
executionTimeout: "30m"
mainFilePath: "scripts/venv_test_script_2.py"
pythonCallable: "main"
engine:
local: {}
opKwargs:
api_endpoint: "https://api.my-vendor.example.com/v1/status"
api_key_secret_name: "my-vendor-api-key"
dependsOn: ["first_script_run"]
pyspark
Aktionen vom Typ pyspark. PySpark-Scripts ausführen
Aktionstypspezifische Schlüssel:
mainFilePath: Relativer Pfad zum PySpark-Script.archiveUris: Eine Liste von Archiv-URIs, die mit dieser Aktion verwendet werden sollen.stagingBucket: Cloud Storage-Bucket, der für diese Aktion verwendet werden soll.pyFiles: Eine Liste der Python-Dateien, die für diesen Spark-Job verwendet werden sollen.environment: Konfiguration der Python-Umgebung.requirements: Zu verwendende Python-Anforderungsdatei.path: Pfad zur Datei mit den Anforderungen. Die Anforderungen in dieser Datei müssen gemäß PEP-508 aufgeführt werden.
engine:dataprocOnGce>existingClusterdataprocOnGce>ephemeralClusterdataprocServerless
Beispiele:
existingCluster
modelVersion: "1.0"
pipelineId: "dataproc-existing-cluster-script-pipeline-pyfiles"
description: "A pipeline with a Dataproc Existing cluster running a PySpark task with additional python files."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example", "pyfiles"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 4 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pyspark:
name: "run-pyspark-with-pyfiles-on-existing-cluster"
engine:
dataprocOnGce:
existingCluster:
clusterName: "example-dataproc-cluster"
location: "us-central1"
projectId: "example-project"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
ephemeralCluster
pipelineId: "dataproc-ephemeral-inline-pyspark"
description: "A pipeline with a Dataproc Ephemeral cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
modelVersion: "1.0"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pyspark:
name: "run_dataproc_ephemeral"
executionTimeout: "1h"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
engine:
dataprocOnGce:
ephemeralCluster:
projectId: "example-project"
location: "us-central1"
clusterName: "ephemeral-cluster-inline"
resourceProfile:
inline:
config:
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
properties:
spark.submit.deployMode: "cluster"
dataprocServerless
modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-pyspark"
description: "A pipeline with a Batch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
actions:
- pyspark:
name: "run-pyspark-on-dataproc-serverless"
mainFilePath: "scripts/my_spark_job_with_pyfiles.py"
pyFiles:
- "data/lib1.py"
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
inline:
environmentConfig:
executionConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
networkUri: "projects/example-project/global/networks/default"
runtimeConfig:
version: "2.3"
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
Notebook
Aktionen vom Typ notebook. Ein .ipynb-Notebook mit Papermill ausführen.
Aktionstypspezifische Schlüssel:
mainFilePath: relativer Pfad zur Notebook-Datei.archiveUris: Eine Liste von Archiv-URIs, die mit dieser Aktion verwendet werden sollen.stagingBucket: Cloud Storage-Bucket, der für diese Aktion verwendet werden soll.environment: Konfiguration der Python-Umgebung.requirements: Zu verwendende Python-Anforderungsdatei.path: Pfad zur Datei mit den Anforderungen. Die Anforderungen in dieser Datei müssen gemäß PEP-508 aufgeführt werden.
engine:dataprocOnGce>existingClusterdataprocOnGce>ephemeralClusterdataprocServerless
Beispiel:
dataprocServerless
modelVersion: "1.0"
pipelineId: "dataproc-create-batch-pipeline-resource-profile-gcs-overrides"
description: "A pipeline with a DataprocCreateBatch task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc_create_batch", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- notebook:
name: "run-notebook-on-dataproc-serverless"
mainFilePath: "scripts/notebookWithArchivesCheck.ipynb"
archiveUris:
- "gs://example-bucket-additional-data/custom_venv.tar.gz"
staging_bucket: "example-bucket-additional-data-additional-data"
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
overrides:
runtimeConfig:
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
sql
Aktionen vom Typ sql. SQL-Abfragen ausführen.
Aktionstypspezifische Schlüssel:
query: Definiert eine Abfrage.path: Die Abfrage wird in einer Datei definiert, die sich im relativen Pfad zur Bereitstellungskonfigurationsdatei befindet.inline: Die Abfrage wird inline definiert.
engine:bigQuerydataprocServerlessdataprocOnGce>existingClusterdataprocOnGce>ephemeralCluster
bigQuery
modelVersion: "1.0"
pipelineId: "sql-on-bigquery"
description: "A pipeline with a BigQueryInsertJob task."
runner: "airflow"
tags: ["bigquery", "example"]
owner: "data-eng-team"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run_bigquery_insert_job_create"
query:
inline: "CREATE TABLE IF NOT EXISTS `example-project.example_dataset.example_table` (id INT64, name STRING, timestamp TIMESTAMP );"
engine:
bigquery:
location: "US"
- sql:
name: "run_bigquery_insert_job_select"
query:
path: "sql-scripts/count_rows.sql"
engine:
bigquery:
location: "US"
destinationTable: "example-project.example_dataset.example_table_query_results"
dependsOn:
- "run_bigquery_insert_job_create"
dataprocServerless
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-serverless"
description: "A pipeline with a cluster running the PySpark task."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-serverless", "example"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-dataproc"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocServerless:
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
resourceProfile:
inline:
runtimeConfig:
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
dataproc.sparkBqConnector.version: "0.42.3"
existingCluster
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-existing"
description: "A pipeline running a SQL query on an existing Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "existing-cluster"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-existing-cluster"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocOnGce:
existingCluster:
clusterName: "cluster-sql"
projectId: "example-project"
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
# This field is needed and important for the Spark-BigQuery connector.
spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
ephemeralCluster
modelVersion: "1.0"
pipelineId: "sql-on-dataproc-gce-ephemeral"
description: "A pipeline running a SQL query on an ephemeral Dataproc cluster."
runner: "airflow"
owner: "data-eng-team"
tags: ["dataproc-gce", "example", "ephemeral-cluster"]
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- sql:
name: "run-sql-on-ephemeral-cluster"
query:
path: "sql-scripts/test_query.sql"
engine:
dataprocOnGce:
ephemeralCluster:
clusterName: "example-ephemeral-cluster"
projectId: "example-project"
location: "us-central1"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
properties:
spark.sql.catalog.bigquery: "com.google.cloud.spark.bigquery.BigQueryCatalog"
spark.sql.catalog.bigquery.project: "example-project"
# This field is needed and important for the Spark-BigQuery connector.
spark.jars: "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.42.3.jar"
resourceProfile:
inline:
clusterConfig:
gceClusterConfig:
zoneUri: "us-central1-a"
metadata: # This metadata field is needed and important for the Spark-BigQuery connector.
SPARK_BQ_CONNECTOR_URL: "gs://spark-lib/bigquery/spark-3.5-bigquery-0.43.1.jar"
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
Pipeline
Aktionen vom Typ pipeline. Datenverarbeitungs-Pipeline ausführen
Aktionstypspezifische Schlüssel:
framework:dbtdataform>airflowWorkerdataform>dataformService
Beispiele:
Logo: dbt
modelVersion: "1.0"
pipelineId: "dbt-pipeline"
description: "A pipeline that runs a dbt workflow."
runner: "airflow"
owner: "example-owner"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pipeline:
name: "dbt-action"
framework:
dbt:
airflowWorker:
projectDirectoryPath: "dbt_project"
selectModels: ["model_1", "model_2"]
dataform>airflowWorker
modelVersion: "1.0"
pipelineId: "dataform-pipeline-local"
description: "A pipeline that runs a Dataform workflow."
runner: "airflow"
owner: "example-owner"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
triggers:
- schedule:
interval: "0 5 * * *"
startTime: "2025-10-01T00:00:00"
endTime: "2026-10-01T00:00:00"
catchup: false
timezone: "UTC"
actions:
- pipeline:
name: "run_dataform"
framework:
dataform:
airflowWorker:
projectDirectoryPath: "dataform_local"
dataform>dataformService
modelVersion: "1.0"
pipelineId: dataform-service
description: "A pipeline that runs a Dataform workflow."
owner: "example-owner"
tags: ["dataform", "example"]
runner: "airflow"
defaults:
projectId: "example-project"
location: "us-central1"
executionConfig:
retries: 0
actions:
- pipeline:
name: "run_dataform"
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
workflowConfig: "projects/example-project/locations/us-central1/repositories/example-repository/workflowConfigs/example-test"
- pipeline:
name: "run_dataform_compilation"
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
compilationResult: "projects/example-project/locations/us-central1/repositories/tets/compilationResults/8027975f-9692-42da-a13b-2267a4909f76"
invocationConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
Suchmaschinen
In Aktionen verwendete Engines.
dataprocOnGce > existingCluster
In einem vorhandenen Managed Service for Apache Spark-Cluster ausführen, der durch clusterName, project und location identifiziert wird.
Sie können den angegebenen Cluster in Ihrer Bereitstellungskonfiguration oder manuell in Managed Service for Apache Spark verwalten. Wir empfehlen, den Cluster regelmäßig zu aktualisieren.
Schlüssel:
clusterName: Name des Clusterslocation: Region, in der sich der Cluster befindetprojectId: Projekt-ID des Projekts, in dem sich der Cluster befindetproperties: Eine Karte der Spark-Jobattribute.
Beispiel:
engine:
dataprocOnGce:
existingCluster:
clusterName: "example-dataproc-cluster"
location: "us-central1"
projectId: "example-project"
impersonationChain: "example-account@example-project.iam.gserviceaccount.com"
dataprocOnGce > ephemeralCluster
Ausführung in einem kurzlebigen Managed Service for Apache Spark-Cluster, der nach der Ausführung des Jobs erstellt und gelöscht wird.
Schlüssel:
clusterName: Name des Clusterslocation: Region, in der sich der Cluster befindetprojectId: Projekt-ID des Projekts, in dem sich der Cluster befindetimpersonationChain: Die Kette zur Identitätsübernahme des Dienstkontos, die zum Ausführen der Aktion verwendet werden soll.resourceProfile: Ressourcenprofil für Managed Service for Apache Spark-Cluster.Eine Beschreibung der verfügbaren Felder finden Sie in der Managed Service for Apache Spark-Dokumentation unter ClusterConfig.
Ein Ressourcenprofil kann auf folgende Weise angegeben werden:
inline: als Teil der Pipelinekonfiguration definiert.path: in einer Datei definiert, die sich im relativen Pfad befindet.external_config_path: in einer Datei in einem Cloud Storage-Bucket definiert. Im Gegensatz zu den Optioneninlineundpath, für die ein Commit und eine Bereitstellung erforderlich sind, um die Werte des Ressourcenprofils zu aktualisieren, wird ein externes Ressourcenprofil bei jedem Pipeline-Lauf aufgelöst. Sie können es also aktualisieren, ohne die Pipeline neu bereitzustellen.
Mit dem Schlüssel
overridekönnen Überschreibungen auf das angegebene Ressourcenprofil angewendet werden. Überschreibungen werden mit Deep Merge auf das bereitgestellte Quellprofil angewendet.properties: Eine Karte der Spark-Jobattribute.
Beispiel:
engine:
dataprocOnGce:
ephemeralCluster:
projectId: "example-project"
location: "us-central1"
clusterName: "example-ephemeral-cluster"
resourceProfile:
inline:
config:
masterConfig:
numInstances: 1
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
workerConfig:
numInstances: 2
machineTypeUri: "n1-standard-4"
diskConfig:
bootDiskType: "pd-standard"
bootDiskSizeGb: 1024
properties:
spark.submit.deployMode: "cluster"
dataprocServerless
Führen Sie die Batchübermittlung in Managed Service for Apache Spark aus.
Schlüssel:
location: Region, in der der Spark-Job ausgeführt werden muss.impersonationChain: Identitätsübernahmekette des Dienstkontos, die zum Ausführen der Aktion verwendet werden soll.resourceProfile: Ressourcenprofil für Managed Service for Apache Spark.Ein Ressourcenprofil kann auf folgende Weise angegeben werden:
inline: als Teil der Pipelinekonfiguration definiert.path: in einer Datei definiert, die sich im relativen Pfad befindet.external_config_path: in einer Datei in einem Cloud Storage-Bucket definiert. Im Gegensatz zu den Optioneninlineundpath, für die ein Commit und eine Bereitstellung erforderlich sind, um die Werte des Ressourcenprofils zu aktualisieren, wird ein externes Ressourcenprofil bei jedem Pipeline-Lauf aufgelöst. Sie können es also aktualisieren, ohne die Pipeline neu bereitzustellen.
Die folgenden Schlüssel geben die Konfiguration des Ressourcenprofils an:
environmentConfig: UmgebungskonfigurationruntimeConfig: Laufzeitkonfiguration
Eine Beschreibung der verfügbaren Felder finden Sie in der Dokumentation zu Managed Service for Apache Spark unter RuntimeConfig und EnvironmentConfig.
Mit dem Schlüssel
overridekönnen Überschreibungen auf das angegebene Ressourcenprofil angewendet werden. Überschreibungen werden mit Deep Merge auf das bereitgestellte Ressourcenprofil angewendet.
Beispiel (inline):
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
inline:
environmentConfig:
executionConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"
networkUri: "projects/example-project/global/networks/default"
runtimeConfig:
version: "2.3"
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
Beispiel (externer Pfad und Überschreibungen):
engine:
dataprocServerless:
location: "us-central1"
resourceProfile:
externalConfigPath: "gs://example-runner-bucket/data/batch-resource-profile.yml"
overrides:
runtimeConfig:
properties:
spark.app.name: "run-notebook-on-dataproc-serverless"
spark.executor.instances: "2"
spark.driver.cores: "4"
bigQuery
Als BigQuery-Job ausführen
Schlüssel:
location: Region, in der sich die Zieltabelle befindet.destinationTable: BigQuery-Tabelle für die Ausgabe der DatenimpersonationChain: Die Identitätsübernahmekette des Dienstkontos, die zum Ausführen der Aktion verwendet werden soll.
Beispiel:
engine:
bigquery:
location: "US"
destinationTable: "example-project.example_dataset.example_table_query_results"
lokal
Lokal in der Runner-Umgebung ausführen.
Informationen zum Konfigurieren der virtuellen Umgebung finden Sie unter der Aktion python.
Beispiel:
engine:
local: {}
Frameworks
In Aktionen verwendete Frameworks.
dbt > airflowWorker
Führen Sie ein dbt-Modell aus, das auf einem Airflow-Worker in der Runner-Umgebung ausgeführt wird, indem Sie dbt-core verwenden.
Schlüssel:
projectDirectoryPath: Relativer Pfad zu einem Ordner, der das DBT-Projekt enthält.selectModels: Liste der Modelle, die nach Namen in die Ausführung aufgenommen werden sollen (entsprichtdbt --select).tags: Liste der Modelle, die nach Tag in den Lauf aufgenommen werden sollen (entsprichtdbt --select).
Beispiel:
framework:
dbt:
airflowWorker:
projectDirectoryPath: "dbt_project"
selectModels: ["model_1", "model_2"]
dataform > airflowWorker
Dataform-Workflows, die auf einem Airflow-Worker in der Runner-Umgebung mit der Dataform Core-Befehlszeile ausgeführt werden.
Schlüssel:
projectDirectoryPath: Relativer Pfad zu einem Ordner, der die Dataform-Workflowdefinitionen enthält.
Beispiel:
framework:
dataform:
airflowWorker:
projectDirectoryPath: "dataform_local"
dataform > dataformService
Führt Dataform-Workflows aus, die im Dataform-Dienst ausgeführt werden.
Schlüssel:
location: Speicherort des Dataform-Repositorys.projectId: Das Projekt, in dem sich das Dataform-Repository befindet.repositoryId: Dataform-Repository-IDworkflowInvocation: Konfiguration für den Workflow-Aufruf, in der angegeben wird, welche Aktionen ausgeführt werden sollen. Weitere Informationen finden Sie unter WorkflowInvocation.
Beispiel:
framework:
dataform:
dataformService:
location: "us-central1"
projectId: "example-project"
repositoryId: "example-repository"
workflowInvocation:
compilationResult: "projects/example-project/locations/us-central1/repositories/example-repository/compilationResults/example-compilation-id"
invocationConfig:
serviceAccount: "example-account@example-project.iam.gserviceaccount.com"