Dataflow-Job in einem benutzerdefinierten Container ausführen

In diesem Dokument wird beschrieben, wie Sie eine Dataflow-Pipeline mit einem benutzerdefinierten Container ausführen.

Informationen zum Erstellen des Container-Images finden Sie unter Benutzerdefinierte Container-Images für Dataflow erstellen.

Starten Sie beim Ausführen Ihrer Pipeline diese mit dem Apache Beam SDK mit der gleichen Version und Sprachversion wie das SDK auf Ihrem benutzerdefinierten Container-Image. Dadurch werden unerwartete Fehler durch inkompatible Abhängigkeiten oder SDKs vermieden.

Lokal testen

Bevor Sie Ihre Pipeline in Dataflow ausführen, sollten Sie das Container-Image lokal testen. So können Sie Tests und Fehlerbehebungen schneller durchführen.

Weitere Informationen zur Apache Beam-spezifischen Nutzung finden Sie im Apache Beam-Leitfaden zum Ausführen von Pipelines mit benutzerdefinierten Container-Images.

Einfache Tests mit PortableRunner

Verwenden Sie PortableRunner von Apache Beam, um zu prüfen, ob Remote-Container-Images abgerufen werden können und eine einfache Pipeline ausgeführt werden kann. Wenn Sie PortableRunner verwenden, erfolgt die Jobübermittlung in der lokalen Umgebung und die DoFn-Ausführung in der Docker-Umgebung.

Wenn Sie GPUs verwenden, hat der Docker-Container möglicherweise keinen Zugriff auf die GPUs. Verwenden Sie zum Testen Ihres Containers mit GPUs den Direct Runner und befolgen Sie die Schritte zum Testen eines Container-Images auf einer eigenständigen VM mit GPUs im Abschnitt Fehlerbehebung mit einer eigenständigen VM auf der Seite „GPUs verwenden“.

Im Folgenden wird eine Beispielpipeline ausgeführt:

Java

mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
    -Dexec.args="--runner=PortableRunner \
    --jobEndpoint=REGION \
    --defaultEnvironmentType=DOCKER \
    --defaultEnvironmentConfig=IMAGE_URI \
    --inputFile=INPUT_FILE \
    --output=OUTPUT_FILE"

Python

python path/to/my/pipeline.py \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Go

go path/to/my/pipeline.go \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Ersetzen Sie dabei Folgendes:

  • REGION: die Region des Jobdienstes, die verwendet werden soll, in Form einer Adresse und eines Ports. Beispiel: localhost:3000 Führen Sie mit embed einen Prozessjobdienst aus.
  • IMAGE_URI: der URI des benutzerdefinierten Container-Images.
  • INPUT_FILE ist eine Eingabedatei, die als Textdatei gelesen werden kann. Auf diese Datei muss vom SDK aus zugegriffen werden können.
    Das Container-Image kann entweder als vorab geladenes Container-Image oder als Remote-Datei verwendet werden.
  • OUTPUT_FILE ist ein Pfad, in den die Ausgabe geschrieben wird. Dieser Pfad ist entweder ein Remote-Pfad oder ein lokaler Pfad im Container.

Wenn die Pipeline erfolgreich abgeschlossen wurde, prüfen Sie in den Konsolenlogs, ob die Pipeline erfolgreich abgeschlossen wurde und ob das Remote-Image, angegeben durch IMAGE_URI, verwendet wird.

Nachdem die Pipeline ausgeführt wurde, befinden sich die im Container gespeicherten Dateien nicht in Ihrem lokalen Dateisystem und der Container wird angehalten. Sie können Dateien aus dem angehaltenen Containerdateisystem mit docker cp kopieren.

Alternativ:

  • Ausgaben an ein Remote-Dateisystem wie Cloud Storage senden. Für Testzwecke ist es möglicherweise erforderlich, den Zugriff manuell zu konfigurieren, einschließlich Anmeldedatendateien oder Standardanmeldedaten für Anwendungen.
  • Fügen Sie für ein schnelles Debugging ein vorübergehendes Logging hinzu.

Direct Runner verwenden

Für detaillierte lokale Tests des Container-Images und Ihrer Pipeline verwenden Sie DirectRunner von Apache Beam.

Sie können Ihre Pipeline getrennt vom Container prüfen, indem Sie sie in einer lokalen Umgebung testen, die mit dem Container-Image übereinstimmt, oder indem Sie die Pipeline in einem laufenden Container starten.

Java

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...

Python

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  python path/to/my/pipeline.py ...

Go

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  go path/to/my/pipeline.go ...

Ersetzen Sie IMAGE_URI durch den URI des benutzerdefinierten Container-Image.

In den Beispielen wird davon ausgegangen, dass sich alle Pipeline-Dateien, einschließlich der Pipeline selbst, im benutzerdefinierten Container befinden, von einem lokalen Dateisystem bereitgestellt wurden oder von Apache Beam und dem Container remote zugänglich sind. Wenn Sie beispielsweise das vorherige Java-Beispiel mit Maven (mvn) ausführen möchten, müssen Maven und die zugehörigen Abhängigkeiten für den Container bereitgestellt werden. Weitere Informationen finden Sie in der Docker-Dokumentation unter Storage und docker run.

Das Ziel der Tests mit dem Direct Runner ist es, Ihre Pipeline in der benutzerdefinierten Container-Umgebung zu testen und nicht, die Ausführung Ihres Containers mit seinem Standard-ENTRYPOINT zu testen. Ändern Sie ENTRYPOINT (z. B. docker run --entrypoint ...), um Ihre Pipeline direkt auszuführen oder Befehle manuell im Container auszuführen.

Wenn Sie eine bestimmte Konfiguration benötigen, die auf der Ausführung des Containers in Compute Engine basiert, können Sie den Container direkt auf einer Compute Engine-VM ausführen. Weitere Informationen finden Sie unter Container in Compute Engine.

Dataflow-Job starten

Geben Sie beim Starten der Apache Beam-Pipeline in Dataflow den Pfad zum Container-Image an. Verwenden Sie nicht das Tag :latest mit Ihren benutzerdefinierten Images. Taggen Sie Ihre Builds mit einem Datum oder einer eindeutigen ID. Wenn etwas schiefgeht, können Sie mit dieser Art von Tag die Pipelineausführung möglicherweise auf eine zuvor bekannte funktionierende Konfiguration zurücksetzen und Änderungen prüfen.

Java

Verwenden Sie --sdkContainerImage, um ein SDK-Container-Image für Ihre Java-Laufzeit anzugeben.

Verwenden Sie --experiments=use_runner_v2, um Runner v2 zu aktivieren.

Python

Wenn Sie die SDK-Version 2.30.0 oder höher verwenden, geben Sie mit der Pipeline-Option --sdk_container_image ein SDK-Container-Image an.

Verwenden Sie bei älteren Versionen des SDK die Pipeline-Option --worker_harness_container_image, um den Speicherort des Container-Image anzugeben, das für die Worker-Harness verwendet werden soll.

Benutzerdefinierte Container werden nur für Dataflow Runner v2 unterstützt. Wenn Sie eine Batch-Python-Pipeline starten, geben Sie das Flag --experiments=use_runner_v2 an. Wenn Sie eine Streaming-Python-Pipeline starten, ist die Angabe des Tests nicht erforderlich, da für Streaming-Python-Pipelines standardmäßig Runner v2 verwendet wird.

Go

Wenn Sie die SDK-Version 2.40.0 oder höher verwenden, geben Sie mit der Pipeline-Option --sdk_container_image ein SDK-Container-Image an.

Verwenden Sie bei älteren Versionen des SDK die Pipeline-Option --worker_harness_container_image, um den Speicherort des Container-Image anzugeben, das für die Worker-Harness verwendet werden soll.

Benutzerdefinierte Container werden auf allen Versionen des Go SDK unterstützt, da sie standardmäßig Dataflow Runner v2 verwenden.

Das folgende Beispiel zeigt, wie das Beispiel WordCount mit einem benutzerdefinierten Container gestartet wird.

Java

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
   -Dexec.args="--runner=DataflowRunner \
                --inputFile=INPUT_FILE \
                --output=OUTPUT_FILE \
                --project=PROJECT_ID \
                --region=REGION \
                --gcpTempLocation=TEMP_LOCATION \
                --diskSizeGb=DISK_SIZE_GB \
                --experiments=use_runner_v2 \
                --sdkContainerImage=IMAGE_URI"

Python

Verwenden Sie Apache Beam SDK für Python Version 2.30.0 oder höher:

python -m apache_beam.examples.wordcount \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE \
  --project=PROJECT_ID \
  --region=REGION \
  --temp_location=TEMP_LOCATION \
  --runner=DataflowRunner \
  --disk_size_gb=DISK_SIZE_GB \
  --experiments=use_runner_v2 \
  --sdk_container_image=IMAGE_URI

Go

wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output gs://<your-gcs-bucket>/counts \
          --runner dataflow \
          --project your-gcp-project \
          --region your-gcp-region \
          --temp_location gs://<your-gcs-bucket>/tmp/ \
          --staging_location gs://<your-gcs-bucket>/binaries/ \
          --sdk_container_image=IMAGE_URI

Ersetzen Sie dabei Folgendes:

  • INPUT_FILE: der Cloud Storage-Eingabepfad, der von Dataflow beim Ausführen des Beispiels gelesen wird.
  • OUTPUT_FILE: der Cloud Storage-Ausgabepfad, in den die Beispielpipeline geschrieben wird. Diese Datei enthält die Anzahl der Wörter.
  • PROJECT_ID: die ID Ihres Google Cloud Projekts.
  • REGION: Die Region, in der Ihr Dataflow-Job bereitgestellt werden soll.
  • TEMP_LOCATION: ein Cloud Storage-Pfad, den Dataflow für das Staging temporärer Jobdateien verwendet, die während der Pipelineausführung erstellt werden.
  • DISK_SIZE_GB: Optional. : (Optional) Wenn Ihr Container groß ist, sollten Sie die standardmäßige Bootlaufwerkgröße erhöhen, um zu vermeiden, dass der Speicherplatz aufgebraucht wird.
  • IMAGE_URI: der URI des benutzerdefinierten SDK-Container-Image. Geben Sie immer ein versioniertes Container-SHA oder -Tag an. Verwenden Sie weder das Tag :latest noch ein änderbares Tag.

Container-Image-Streaming

Sie können die Start- und Autoscaling-Latenz Ihrer Dataflow-Pipeline verbessern, indem Sie das Image-Streaming aktivieren. Diese Funktion ist nützlich, wenn Ihr benutzerdefinierter Container überflüssige Inhalte enthält oder nicht alle Inhalte in jedem Schritt verwendet. Ihr Container kann beispielsweise zufällige Inhalte wie CPU-basierten Bibliothekscode für die GPU-basierte Inferenz enthalten. Ebenso kann es sein, dass Sie einen Container haben, der ML-Pipelines mit mehreren Modellen ausführt, aber in jedem Schritt nur ein Modell verwendet. Daher werden die Inhalte nicht alle gleichzeitig benötigt. Wenn Sie das Image-Streaming aktivieren, können Sie die Latenz in diesen Fällen verbessern.

Java

--dataflowServiceOptions=enable_image_streaming

Python

--dataflow_service_options=enable_image_streaming

Go

--dataflow_service_options=enable_image_streaming

Beim Image-Streaming werden Teile Ihres benutzerdefinierten Containers abgerufen, wenn Ihr Pipelinecode sie benötigt, anstatt den gesamten Container im Voraus herunterzuladen. Teile Ihres Containers, die nicht verwendet werden, müssen nie heruntergeladen werden.

Sie müssen die Container File System API aktiviert haben, um das Image-Streaming nutzen zu können.