このページでは、Flex テンプレートを使用して Dataflow ジョブを実行する方法について説明します。Flex テンプレートを使用すると、Apache Beam 開発環境がなくてもパイプラインを実行できるように、Dataflow パイプラインをパッケージ化できます。
必要な権限
Flex テンプレートを実行すると、Dataflow によってジョブが作成されます。ジョブを作成するには、Dataflow サービス アカウントに次の権限が必要です。
dataflow.serviceAgent
このロールは、Dataflow を初めて使用するとき自動的に割り当てられます。そのため、この権限を付与する必要はありません。
デフォルトでは、Compute Engine サービス アカウントはランチャー VM とワーカー VM に使用されます。サービス アカウントには、次のロールと機能が必要です。
- ストレージ オブジェクト管理者(
roles/storage.objectAdmin) - 閲覧者(
roles/viewer) - Dataflow ワーカー(
roles/dataflow.worker) - ステージング バケットへの読み取り / 書き込みアクセス権
- Flex テンプレート イメージに対する読み取りアクセス権
ステージング バケットに対する読み取り / 書き込みアクセス権を付与するには、ストレージ オブジェクト管理者(roles/storage.objectAdmin)のロールを使用します。詳しくは、Cloud Storage の IAM ロールをご覧ください。
Flex テンプレート イメージに対する読み取りアクセス権を付与するには、ストレージ オブジェクト閲覧者(roles/storage.objectViewer)のロールを使用します。詳細については、アクセス制御の構成をご覧ください。
Flex テンプレートを実行する
Flex テンプレートを実行するには、gcloud dataflow flex-template run コマンドを使用します。
gcloud dataflow flex-template run JOB_ID \ --template-file-gcs-location gs://TEMPLATE_FILE_LOCATION \ --region REGION \ --staging-location STAGING_LOCATION \ --temp-location TEMP_LOCATION \ --parameters PARAMETERS \ --additional-user-labels LABELS \
次のように置き換えます。
JOB_ID: ジョブの IDTEMPLATE_FILE_LOCATION: テンプレート ファイルの Cloud Storage のロケーションREGION: Dataflow ジョブを実行するリージョンSTAGING_LOCATION: ローカル ファイルをステージングする Cloud Storage のロケーションTEMP_LOCATION: 一時ファイルの書き込み先の Cloud Storage のロケーション。設定しない場合、デフォルトはステージングの場所になります。PARAMETERS: ジョブのパイプライン パラメータLABELS: 省略可。ジョブに適用されたラベル(KEY_1=VALUE_1,KEY_2=VALUE_2,...形式)。
テンプレートを起動するステージング ステップで、Dataflow はステージング ロケーションにファイルを書き込みます。Dataflow は、ステージングされたファイルを読み取り、ジョブグラフを作成します。実行ステップでは、Dataflow は一時的なロケーションにファイルを書き込みます。
パイプライン オプションを設定する
Flex テンプレートの実行時にパイプライン オプションを設定するには、gcloud dataflow flex-template run コマンドで次のフラグを使用します。
parameters: このフラグを使用して、次のタイプのパイプライン オプションを設定します。Flex テンプレートでサポートされているパイプライン オプション。Flex テンプレートでサポートされているオプションの一覧については、パイプライン オプションをご覧ください。
テンプレート メタデータで宣言されたパイプライン オプション。
additional-pipeline-options: このフラグを使用して、フレックス テンプレートで直接サポートされていない他の Apache Beam パイプライン オプションを設定します。additional-experiments: このフラグを使用して、試験運用版のパイプライン オプションを設定します(experimentsオプションと同等)。
gcloud
パイプライン オプションを指定するには、
parametersフラグを使用します。ランタイム試験運用版オプションとランタイム パイプライン オプションを指定するには、
additional-experimentsフラグとadditional-pipeline-optionsフラグを使用します。
List 型または Map 型のパラメータを渡す場合は、YAML ファイルでパラメータを定義して、flags-file フラグを使用する必要がある場合があります。
API
パイプライン オプションを指定するには、
parametersフィールドを使用します。ランタイム試験運用版オプションとランタイム パイプライン オプションを指定するには、
additionalExperimentsおよびadditionalPipelineOptionsフィールドを使用します。
次の例は、リクエスト本文でパイプライン オプション、試験運用版、追加オプションを指定する方法を示しています。
{
"jobName": "my-flex-template-job",
"parameters": {
"option_defined_in_metadata": "value"
},
"environment": {
"additionalExperiments": [
"use_runner_v2"
],
"additionalPipelineOptions": {
"common_pipeline_option": "value"
}
}
}
Flex テンプレートを使用する場合、パイプラインの初期化時にいくつかのパイプライン オプションを構成できますが、他のパイプライン オプションは変更できません。Flex テンプレートに必要なコマンドライン引数が上書きされると、テンプレート ランチャーから渡されたパイプライン オプションがジョブによって無視、オーバーライド、または破棄される可能性があります。ジョブの起動に失敗する、または Flex テンプレートを使用していないジョブが起動する可能性があります。詳細については、ジョブファイルの読み取りに失敗したをご覧ください。
パイプラインの初期化中は、次のパイプライン オプションを変更しないでください。
Java
runnerprojectjobNametemplateLocationregion
Python
runnerprojectjob_nametemplate_locationregion
Go
runnerprojectjob_nametemplate_locationregion
メタデータ ベースの SSH 認証鍵を使用する VM からのプロジェクトの SSH 認証鍵をブロックする
プロジェクト メタデータに保存された SSH 認証鍵を VM が受け入れないようにするには、VM からのプロジェクトの SSH 認証鍵をブロックします。block_project_ssh_keys サービス オプションを指定して additional-experiments フラグを使用します。
--additional-experiments=block_project_ssh_keys
詳細については、Dataflow サービス オプションをご覧ください。
Flex テンプレート ジョブを更新する
次のリクエストの例は、projects.locations.flexTemplates.launch メソッドを使用してテンプレート ストリーミング ジョブを更新する方法を示しています。gcloud CLI を使用する場合は、既存のパイプラインを更新するをご覧ください。
クラシック テンプレートを更新する場合は、projects.locations.templates.launch を使用します。
手順に沿って、Flex テンプレートからストリーミング ジョブを作成します。次のように値を変更して、下記の HTTP POST リクエストを送信します。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch { "launchParameter": { "update": true "jobName": "JOB_NAME", "parameters": { "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "output_table": "PROJECT_ID:DATASET.TABLE_NAME" }, "containerSpecGcsPath": "STORAGE_PATH" }, }PROJECT_IDは、実際のプロジェクト ID に置き換えます。REGIONは、更新するジョブの Dataflow リージョンに置き換えます。JOB_NAMEは、更新するジョブの正確な名前に置き換えます。parametersを Key-Value ペアのリストに設定します。このリスト化されたパラメータは、このテンプレートの例に固有のものです。カスタム テンプレートを使用している場合は、必要に応じてパラメータを変更します。サンプル テンプレートを使用している場合は、次の変数を置き換えます。SUBSCRIPTION_NAMEは、Pub/Sub サブスクリプション名に置き換えます。DATASETは、BigQuery データセット名に置き換えます。TABLE_NAMEは、BigQuery テーブル名に置き換えます。
STORAGE_PATHは、テンプレート ファイルの Cloud Storage のロケーションに置き換えます。ロケーションはgs://で始まる必要があります。
environmentパラメータを使用して環境設定を変更します。詳細については、FlexTemplateRuntimeEnvironmentをご覧ください。省略可: curl(Linux、macOS、Cloud Shell)を使用してリクエストを送信するには、リクエストを JSON ファイルに保存し、次のコマンドを実行します。
curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launchFILE_PATH は、リクエスト本文を含む JSON ファイルのパスに置き換えます。
Dataflow モニタリング インターフェースを使用して、同じ名前の新しいジョブが作成されたことを確認します。このジョブのステータスは「更新済み」です。
次のステップ
- Apache Beam パイプライン用の Flex テンプレートを構築する方法を学習する。
- Dataflow テンプレートで、クラシック テンプレート、Flex テンプレート、そのユースケース シナリオの詳細を確認する。
- Flex テンプレートのトラブルシューティングで、Flex テンプレートのトラブルシューティング情報を確認する。
- Cloud アーキテクチャ センターで、リファレンス アーキテクチャ、図、ベスト プラクティスを確認する。