Flexible Vorlagen in Dataflow ausführen

Auf dieser Seite wird beschrieben, wie Sie einen Dataflow-Job mit einer flexiblen Vorlage ausführen. Mit flexiblen Vorlagen können Sie eine Dataflow-Pipeline so verpacken, dass Sie die Pipeline ohne Apache Beam-Entwicklungsumgebung ausführen können.

Erforderliche Berechtigungen

Wenn Sie eine Flex-Vorlage ausführen, erstellt Dataflow einen Job für Sie. Zum Erstellen des Jobs benötigt das Dataflow-Dienstkonto die folgende Berechtigung:

  • dataflow.serviceAgent

Wenn Sie Dataflow zum ersten Mal verwenden, weist der Dienst Ihnen diese Rolle zu. Sie müssen diese Berechtigung dann also nicht erteilen.

Standardmäßig wird das Compute Engine-Dienstkonto für Launcher-VMs und Worker-VMs verwendet. Das Dienstkonto benötigt die folgenden Rollen und Funktionen:

  • Storage-Objekt-Administrator (roles/storage.objectAdmin)
  • Betrachter (roles/viewer)
  • Dataflow-Worker (roles/dataflow.worker)
  • Lese- und Schreibzugriff auf den Staging-Bucket
  • Lesezugriff auf das Flex-Vorlagenbild

Wenn Sie Lese- und Schreibzugriff auf den Staging-Bucket gewähren möchten, verwenden Sie die Rolle "Storage-Objekt-Administrator" (roles/storage.objectAdmin). Weitere Informationen finden Sie unter IAM-Rollen für Cloud Storage.

Um Lesezugriff auf das Flex-Vorlagen-Image zu erteilen, können Sie die Rolle "Storage-Objekt-Betrachter" (roles/storage.objectViewer) verwenden. Weitere Informationen erhalten Sie unter Zugriffssteuerung konfigurieren.

Flexible Vorlage ausführen

Verwenden Sie den Befehl gcloud dataflow flex-template run, um eine flexible Vorlage auszuführen:

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://TEMPLATE_FILE_LOCATION \
  --region REGION \
  --staging-location STAGING_LOCATION \
  --temp-location TEMP_LOCATION \
  --parameters  PARAMETERS \
  --additional-user-labels LABELS \

Ersetzen Sie Folgendes:

  • JOB_ID: die ID Ihres Jobs

  • TEMPLATE_FILE_LOCATION: der Cloud Storage-Speicherort der Vorlagendatei

  • REGION: die Region, in der der Dataflow-Job ausgeführt werden soll

  • STAGING_LOCATION: Der Cloud Storage-Speicherort für das Staging lokaler Dateien

  • TEMP_LOCATION: Der Cloud Storage-Speicherort, an dem temporäre Dateien geschrieben werden sollen. Wenn nichts anderes festgelegt ist, wird standardmäßig der Staging-Speicherort verwendet.

  • PARAMETERS: Pipelineparameter für den Job

  • LABELS: Optional. Labels, die Ihrem Job zugewiesen sind, im Format KEY_1=VALUE_1,KEY_2=VALUE_2,....

Während des Staging-Schritts beim Starten einer Vorlage schreibt Dataflow Dateien an den Staging-Speicherort. Dataflow liest diese bereitgestellten Dateien, um die Jobgrafik zu erstellen. Während des Ausführungsschritts schreibt Dataflow Dateien an den temporären Speicherort.

Pipelineoptionen festlegen

Wenn Sie Pipelineoptionen festlegen möchten, wenn Sie eine flexible Vorlage ausführen, verwenden Sie die folgenden Flags im gcloud dataflow flex-template run-Befehl:

gcloud

Wenn Sie Parameter vom Typ List oder Map übergeben, müssen Sie möglicherweise Parameter in einer YAML-Datei definieren und das Flag flags-file verwenden.

API

Im folgenden Beispiel sehen Sie, wie Sie Pipelineoptionen, Tests und zusätzliche Optionen in einen Anfragetext einfügen:

{
  "jobName": "my-flex-template-job",
  "parameters": {
    "option_defined_in_metadata": "value"
  },
  "environment": {
    "additionalExperiments": [
      "use_runner_v2"
    ],
    "additionalPipelineOptions": {
      "common_pipeline_option": "value"
    }
  }
}

Bei Verwendung von Flex-Vorlagen haben Sie die Möglichkeit, einige Pipelineoptionen bei der Pipelineinitialisierung zu konfigurieren. Es können aber nicht alle Pipelineoptionen geändert werden. Wenn die von der Flex-Vorlage erforderlichen Befehlszeilenargumente überschrieben werden, kann es sein, dass der Job die vom Vorlagen-Launcher übergebenen Pipelineoptionen ignoriert, überschreibt oder verwirft. Der Job wird möglicherweise nicht gestartet, oder es wird ein Job gestartet, der die Flex-Vorlage nicht verwendet. Weitere Informationen finden Sie unter Jobdatei konnte nicht gelesen werden.

Ändern Sie bei der Pipelineinitialisierung die folgenden Pipelineoptionen nicht:

Java

  • runner
  • project
  • jobName
  • templateLocation
  • region

Python

  • runner
  • project
  • job_name
  • template_location
  • region

Go

  • runner
  • project
  • job_name
  • template_location
  • region

Projekt-SSH-Schlüssel für VMs mit metadatenbasierten SSH-Schlüsseln blockieren

Wenn Sie verhindern möchten, dass VMs SSH-Schlüssel akzeptieren, die in Projektmetadaten gespeichert sind, können Sie die Projekt-SSH-Schlüssel für VMs blockieren. Verwenden Sie das Flag additional-experiments mit der Dienstoption block_project_ssh_keys:

--additional-experiments=block_project_ssh_keys

Weitere Informationen finden Sie unter Dataflow-Dienstoptionen.

Job mit flexibler Vorlage aktualisieren

Die folgende Beispielanfrage zeigt, wie Sie eine Vorlage für einen Streamingjob mit der Methode projects.locations.flexTemplates.launch aktualisieren. Wenn Sie die gcloud CLI verwenden möchten, finden Sie weitere Informationen unter Vorhandene Pipeline aktualisieren.

Wenn Sie eine klassische Vorlage aktualisieren möchten, verwenden Sie stattdessen projects.locations.templates.launch.

  1. Führen Sie die Schritte zum Erstellen eines Streamingjobs aus einer flexiblen Vorlage aus. Senden Sie die folgende HTTP-POST-Anfrage mit den geänderten Werten:

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    {
        "launchParameter": {
          "update": true
          "jobName": "JOB_NAME",
          "parameters": {
            "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
            "output_table": "PROJECT_ID:DATASET.TABLE_NAME"
          },
        "containerSpecGcsPath": "STORAGE_PATH"
        },
    }
    
    • Ersetzen Sie PROJECT_ID durch Ihre Projekt-ID.
    • Ersetzen Sie REGION durch die Dataflow-Region des Jobs, den Sie aktualisieren.
    • Ersetzen Sie JOB_NAME durch den genauen Namen des Jobs, den Sie aktualisieren möchten.
    • Stellen Sie das Flag parameters auf Ihre Liste der Schlüssel/Wert-Paare ein. Die aufgeführten Parameter gelten speziell für dieses Vorlagenbeispiel. Wenn Sie eine benutzerdefinierte Vorlage verwenden, ändern Sie die Parameter nach Bedarf. Wenn Sie die Beispielvorlage verwenden, ersetzen Sie die folgenden Variablen.
      • Ersetzen Sie SUBSCRIPTION_NAME durch den Namen des Pub/Sub-Abos.
      • Ersetzen Sie DATASET durch den Namen Ihres BigQuery-Datasets.
      • Ersetzen Sie TABLE_NAME durch Ihren BigQuery-Tabellennamen.
    • Ersetzen Sie STORAGE_PATH durch den Cloud Storage-Speicherort der Vorlagendatei. Der Speicherort sollte mit gs:// beginnen.
  2. Mit dem Parameter environment können Sie die Umgebungseinstellungen ändern. Weitere Informationen finden Sie unter: FlexTemplateRuntimeEnvironment

  3. Optional: Zum Senden der Anfrage mit curl (Linux, macOS oder Cloud Shell) speichern Sie die Anfrage in einer JSON-Datei und führen Sie dann den folgenden Befehl aus:

    curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    

    Ersetzen Sie FILE_PATH durch den Pfad zur JSON-Datei, die den Anfragetext enthält.

  4. Überprüfen Sie über die Dataflow-Monitoring-Oberfläche, ob ein neuer Job mit demselben Namen erstellt wurde. Dieser Job hat den Status Aktualisiert.

Nächste Schritte