このドキュメントでは、カスタム コンテナを使用して Dataflow パイプラインを実行する方法について説明します。
コンテナ イメージの作成方法については、Dataflow 用のカスタム コンテナ イメージをビルドするをご覧ください。
パイプラインを実行するときに、カスタム コンテナ イメージ上の SDK と同じバージョンと言語バージョンの Apache Beam SDK を使用してパイプラインを起動します。このステップにより、互換性のない依存関係や SDK からの予期しないエラーを回避できます。
ローカルでテストする
Dataflow でパイプラインを実行する前に、コンテナ イメージをローカルでテストすることをおすすめします。これにより、テストとデバッグをより迅速に行うことができます。
Apache Beam 固有の使用方法については、Apache Beam のガイドでカスタム コンテナ イメージを使用したパイプラインの実行に関する説明をご覧ください。
PortableRunner を使用した基本的なテスト
リモート コンテナ イメージを pull でき、簡単なパイプラインを実行できることを確認するには、Apache Beam PortableRunner を使用します。PortableRunner を使用すると、ジョブ送信がローカル環境で行われ、DoFn の実行が Docker 環境で行われます。
GPU を使用する場合、Docker コンテナが GPU にアクセスできない場合があります。GPU を使用してコンテナをテストするには、Direct Runner を使用し、「GPU を使用する」ページのスタンドアロン VM でデバッグするの手順に沿って、スタンドアロン VM でコンテナ イメージをテストします。
次のサンプル パイプラインを実行します。
Java
mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
-Dexec.args="--runner=PortableRunner \
--jobEndpoint=REGION \
--defaultEnvironmentType=DOCKER \
--defaultEnvironmentConfig=IMAGE_URI \
--inputFile=INPUT_FILE \
--output=OUTPUT_FILE"Python
python path/to/my/pipeline.py \
--runner=PortableRunner \
--job_endpoint=REGION \
--environment_type=DOCKER \
--environment_config=IMAGE_URI \
--input=INPUT_FILE \
--output=OUTPUT_FILEGo
go path/to/my/pipeline.go \
--runner=PortableRunner \
--job_endpoint=REGION \
--environment_type=DOCKER \
--environment_config=IMAGE_URI \
--input=INPUT_FILE \
--output=OUTPUT_FILE次のように置き換えます。
REGION: 使用するジョブサービス リージョン(アドレスとポートの形式)。例:localhost:3000。プロセス内ジョブサービスを実行するにはembedを使用します。IMAGE_URI: カスタム コンテナ イメージの URI。INPUT_FILE: テキスト ファイルとして読み取り可能な入力ファイル。このファイルは、コンテナ イメージまたはリモート ファイルにプリロードされた SDK ハーネス コンテナ イメージからアクセスできる必要があります。OUTPUT_FILE: 出力を書き込むパス。このパスは、リモートパスまたはコンテナのローカルパスになります。
パイプラインが正常に完了したらコンソールのログを調べて、パイプラインが正常に完了し、IMAGE_URI で指定されたリモート イメージが使用されていることを確認します。
パイプラインの実行後、コンテナに保存されたファイルはローカル ファイル システムに存在せず、コンテナは停止します。停止したコンテナ ファイル システムからファイルをコピーするには、docker cp を使用します。
または
- Cloud Storage などのリモート ファイル システムに出力を提供します。認証情報ファイルまたはアプリケーションのデフォルト認証情報など、テスト目的でアクセスを手動で構成しなければならない場合があります。
- すばやくデバッグできるように、一時的なロギングを追加します。
Direct Runner を使用する
コンテナ イメージとパイプラインのより詳細なローカルテストには、Apache Beam Direct Runner を使用します。
パイプラインをコンテナとは別に検証するには、コンテナ イメージと一致するローカル環境でテストするか、実行中のコンテナでパイプラインを起動します。
Java
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...
Python
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# python path/to/my/pipeline.py ...
Go
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# go path/to/my/pipeline.go ...
IMAGE_URI は、カスタム コンテナ イメージの URI に置き換えます。
この例では、パイプライン ファイル(パイプライン自体を含む)がカスタム コンテナに配置されていることを前提としています。また、ローカル ファイル システムからマウントされているか、Apache Beam とコンテナからリモートでアクセスできることを前提としています。たとえば、Maven(mvn)を使用して前の Java の例を実行するには、Maven とその依存関係をコンテナでステージングする必要があります。詳細については、Docker ドキュメントのストレージと docker run をご覧ください。
Direct Runner でテストする目的は、デフォルトの ENTRYPOINT でコンテナの実行をテストすることではなく、カスタム コンテナ環境でパイプラインをテストすることです。ENTRYPOINT(docker run --entrypoint ... など)を変更してパイプラインを直接実行するか、コンテナでコマンドを手動で実行します。
Compute Engine でのコンテナの実行に基づく特定の構成に依存している場合は、Compute Engine VM でコンテナを直接実行できます。詳細については、Compute Engine のコンテナをご覧ください。
Dataflow ジョブを起動する
Dataflow で Apache Beam パイプラインを起動するときは、コンテナ イメージへのパスを指定します。カスタム イメージでは :latest タグを使用しないでください。ビルドには、日付または一意の識別子をタグとして設定します。このタイプのタグを使用すると、問題が発生したときにパイプラインの実行を既知の動作構成に戻し、変更の検査が可能になります。
Java
--sdkContainerImage を使用して、Java ランタイム用の SDK コンテナ イメージを指定します。
Runner v2 を有効にするには、--experiments=use_runner_v2 を使用します。
Python
SDK バージョン 2.30.0 以降を使用している場合は、パイプライン オプション --sdk_container_image を使用して SDK コンテナ イメージを指定します。
古いバージョンの SDK の場合は、パイプライン オプション --worker_harness_container_image を使用して、ワーカー ハーネスに使用するコンテナ イメージの場所を指定します。
カスタム コンテナは Dataflow Runner v2 でのみサポートされます。バッチ Python パイプラインを起動する場合は、--experiments=use_runner_v2 フラグを設定します。ストリーミング Python パイプラインを起動する場合、ストリーミング Python パイプラインはデフォルトで Runner v2 を使用するため、テストを指定する必要はありません。
Go
SDK バージョン 2.40.0 以降を使用している場合は、パイプライン オプション --sdk_container_image を使用して SDK コンテナ イメージを指定します。
古いバージョンの SDK の場合は、パイプライン オプション --worker_harness_container_image を使用して、ワーカー ハーネスに使用するコンテナ イメージの場所を指定します。
カスタム コンテナは、デフォルトで Dataflow Runner v2 を使用するため、Go SDK のすべてのバージョンでサポートされています。
次の例では、カスタム コンテナを使用してバッチ WordCount サンプルを起動する方法を示しています。
Java
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=DataflowRunner \
--inputFile=INPUT_FILE \
--output=OUTPUT_FILE \
--project=PROJECT_ID \
--region=REGION \
--gcpTempLocation=TEMP_LOCATION \
--diskSizeGb=DISK_SIZE_GB \
--experiments=use_runner_v2 \
--sdkContainerImage=IMAGE_URI"Python
Apache Beam SDK for Python バージョン 2.30.0 以降の使用:
python -m apache_beam.examples.wordcount \
--input=INPUT_FILE \
--output=OUTPUT_FILE \
--project=PROJECT_ID \
--region=REGION \
--temp_location=TEMP_LOCATION \
--runner=DataflowRunner \
--disk_size_gb=DISK_SIZE_GB \
--experiments=use_runner_v2 \
--sdk_container_image=IMAGE_URIGo
wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://<your-gcs-bucket>/counts \
--runner dataflow \
--project your-gcp-project \
--region your-gcp-region \
--temp_location gs://<your-gcs-bucket>/tmp/ \
--staging_location gs://<your-gcs-bucket>/binaries/ \
--sdk_container_image=IMAGE_URI次のように置き換えます。
INPUT_FILE: サンプルの実行時に Dataflow によって読み取られる Cloud Storage 入力パス。OUTPUT_FILE: サンプル パイプラインによって書き込まれる Cloud Storage 出力パス。このファイルに文字数が書き込まれます。PROJECT_ID: Google Cloudオブジェクトの IDREGION: Dataflow ジョブをデプロイするリージョン。TEMP_LOCATION: パイプラインの実行中に作成される一時ジョブファイルをステージングするための Dataflow 用の Cloud Storage パス。DISK_SIZE_GB: 省略可。コンテナのサイズが大きい場合は、ディスク容量が不足しないように、デフォルトのブートディスク サイズを増やすことを検討してください。IMAGE_URI: SDK カスタム コンテナ イメージの URI。バージョニングされたコンテナの SHA またはタグを常に使用します。:latestタグや可変タグは使用しないでください。
コンテナ イメージのストリーミング
イメージ ストリーミングを有効にすると、Dataflow パイプラインの起動と自動スケーリングのレイテンシを改善できます。この機能は、カスタム コンテナに不要なコンテンツが含まれている場合や、各ステップで一部のコンテンツを使用しない場合に便利です。たとえば、GPU ベースの推論用のコンテナに CPU ベースのライブラリ コードなどのコンテンツが誤って含まれている場合があります。同様に、コンテナで実行する ML パイプラインに複数のモデルが含まれており、各ステップで 1 つのモデルのみを使用する場合、そのコンテンツは一度にすべて必要になるわけではありません。このような場合は、イメージ ストリーミングを有効にするとレイテンシを改善できます。
Java
--dataflowServiceOptions=enable_image_streaming
Python
--dataflow_service_options=enable_image_streaming
Go
--dataflow_service_options=enable_image_streaming
イメージ ストリーミングでは、コンテナ全体を事前にダウンロードするのではなく、パイプライン コードで必要になった時点でカスタム コンテナの一部を取得します。コンテナの使用されていない部分はダウンロードする必要がありません。
イメージ ストリーミングを利用するには、Container File System API を有効にする必要があります。