Dataflow で Flex テンプレートを実行する

このページでは、Flex テンプレートを使用して Dataflow ジョブを実行する方法について説明します。Flex テンプレートを使用すると、Apache Beam 開発環境がなくてもパイプラインを実行できるように、Dataflow パイプラインをパッケージ化できます。

必要な権限

Flex テンプレートを実行すると、Dataflow によってジョブが作成されます。ジョブを作成するには、Dataflow サービス アカウントに次の権限が必要です。

  • dataflow.serviceAgent

このロールは、Dataflow を初めて使用するとき自動的に割り当てられます。そのため、この権限を付与する必要はありません。

デフォルトでは、Compute Engine サービス アカウントはランチャー VM とワーカー VM に使用されます。サービス アカウントには、次のロールと機能が必要です。

  • ストレージ オブジェクト管理者(roles/storage.objectAdmin
  • 閲覧者(roles/viewer
  • Dataflow ワーカー(roles/dataflow.worker
  • ステージング バケットへの読み取り / 書き込みアクセス権
  • Flex テンプレート イメージに対する読み取りアクセス権

ステージング バケットに対する読み取り / 書き込みアクセス権を付与するには、ストレージ オブジェクト管理者(roles/storage.objectAdmin)のロールを使用します。詳しくは、Cloud Storage の IAM ロールをご覧ください。

Flex テンプレート イメージに対する読み取りアクセス権を付与するには、ストレージ オブジェクト閲覧者(roles/storage.objectViewer)のロールを使用します。詳細については、アクセス制御の構成をご覧ください。

Flex テンプレートを実行する

Flex テンプレートを実行するには、gcloud dataflow flex-template run コマンドを使用します。

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://TEMPLATE_FILE_LOCATION \
  --region REGION \
  --staging-location STAGING_LOCATION \
  --temp-location TEMP_LOCATION \
  --parameters  PARAMETERS \
  --additional-user-labels LABELS \

次のように置き換えます。

  • JOB_ID: ジョブの ID

  • TEMPLATE_FILE_LOCATION: テンプレート ファイルの Cloud Storage のロケーション

  • REGION: Dataflow ジョブを実行するリージョン

  • STAGING_LOCATION: ローカル ファイルをステージングする Cloud Storage のロケーション

  • TEMP_LOCATION: 一時ファイルの書き込み先の Cloud Storage のロケーション。設定しない場合、デフォルトはステージングの場所になります。

  • PARAMETERS: ジョブのパイプライン パラメータ

  • LABELS: 省略可。ジョブに適用されたラベル(KEY_1=VALUE_1,KEY_2=VALUE_2,... 形式)。

テンプレートを起動するステージング ステップで、Dataflow はステージング ロケーションにファイルを書き込みます。Dataflow は、ステージングされたファイルを読み取り、ジョブグラフを作成します。実行ステップでは、Dataflow は一時的なロケーションにファイルを書き込みます。

パイプライン オプションを設定する

Flex テンプレートの実行時にパイプライン オプションを設定するには、gcloud dataflow flex-template run コマンドで次のフラグを使用します。

gcloud

  • パイプライン オプションを指定するには、parameters フラグを使用します。

  • ランタイム試験運用版オプションとランタイム パイプライン オプションを指定するには、additional-experiments フラグと additional-pipeline-options フラグを使用します。

List 型または Map 型のパラメータを渡す場合は、YAML ファイルでパラメータを定義して、flags-file フラグを使用する必要がある場合があります。

API

次の例は、リクエスト本文でパイプライン オプション、試験運用版、追加オプションを指定する方法を示しています。

{
  "jobName": "my-flex-template-job",
  "parameters": {
    "option_defined_in_metadata": "value"
  },
  "environment": {
    "additionalExperiments": [
      "use_runner_v2"
    ],
    "additionalPipelineOptions": {
      "common_pipeline_option": "value"
    }
  }
}

Flex テンプレートを使用する場合、パイプラインの初期化時にいくつかのパイプライン オプションを構成できますが、他のパイプライン オプションは変更できません。Flex テンプレートに必要なコマンドライン引数が上書きされると、テンプレート ランチャーから渡されたパイプライン オプションがジョブによって無視、オーバーライド、または破棄される可能性があります。ジョブの起動に失敗する、または Flex テンプレートを使用していないジョブが起動する可能性があります。詳細については、ジョブファイルの読み取りに失敗したをご覧ください。

パイプラインの初期化中は、次のパイプライン オプションを変更しないでください。

Java

  • runner
  • project
  • jobName
  • templateLocation
  • region

Python

  • runner
  • project
  • job_name
  • template_location
  • region

Go

  • runner
  • project
  • job_name
  • template_location
  • region

メタデータ ベースの SSH 認証鍵を使用する VM からのプロジェクトの SSH 認証鍵をブロックする

プロジェクト メタデータに保存された SSH 認証鍵を VM が受け入れないようにするには、VM からのプロジェクトの SSH 認証鍵をブロックします。block_project_ssh_keys サービス オプションを指定して additional-experiments フラグを使用します。

--additional-experiments=block_project_ssh_keys

詳細については、Dataflow サービス オプションをご覧ください。

Flex テンプレート ジョブを更新する

次のリクエストの例は、projects.locations.flexTemplates.launch メソッドを使用してテンプレート ストリーミング ジョブを更新する方法を示しています。gcloud CLI を使用する場合は、既存のパイプラインを更新するをご覧ください。

クラシック テンプレートを更新する場合は、projects.locations.templates.launch を使用します。

  1. 手順に沿って、Flex テンプレートからストリーミング ジョブを作成します。次のように値を変更して、下記の HTTP POST リクエストを送信します。

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    {
        "launchParameter": {
          "update": true
          "jobName": "JOB_NAME",
          "parameters": {
            "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
            "output_table": "PROJECT_ID:DATASET.TABLE_NAME"
          },
        "containerSpecGcsPath": "STORAGE_PATH"
        },
    }
    
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • REGION は、更新するジョブの Dataflow リージョンに置き換えます。
    • JOB_NAME は、更新するジョブの正確な名前に置き換えます。
    • parameters を Key-Value ペアのリストに設定します。このリスト化されたパラメータは、このテンプレートの例に固有のものです。カスタム テンプレートを使用している場合は、必要に応じてパラメータを変更します。サンプル テンプレートを使用している場合は、次の変数を置き換えます。
      • SUBSCRIPTION_NAME は、Pub/Sub サブスクリプション名に置き換えます。
      • DATASET は、BigQuery データセット名に置き換えます。
      • TABLE_NAME は、BigQuery テーブル名に置き換えます。
    • STORAGE_PATH は、テンプレート ファイルの Cloud Storage のロケーションに置き換えます。ロケーションは gs:// で始まる必要があります。
  2. environment パラメータを使用して環境設定を変更します。詳細については、FlexTemplateRuntimeEnvironment をご覧ください。

  3. 省略可: curl(Linux、macOS、Cloud Shell)を使用してリクエストを送信するには、リクエストを JSON ファイルに保存し、次のコマンドを実行します。

    curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    

    FILE_PATH は、リクエスト本文を含む JSON ファイルのパスに置き換えます。

  4. Dataflow モニタリング インターフェースを使用して、同じ名前の新しいジョブが作成されたことを確認します。このジョブのステータスは「更新済み」です。

次のステップ