Run Flex Templates in Dataflow

This page describes how to run a Dataflow job by using a Flex Template. Flex Templates let you package a Dataflow pipeline so that you can run the pipeline without having an Apache Beam development environment.

Required permissions

When you run a Flex Template, Dataflow creates a job for you. To create the job, the Dataflow service account needs the following role:

  • Dataflow Service Agent (roles/dataflow.serviceAgent)

When you first use Dataflow, the service assigns this role for you, so you don't need to grant it yourself.

By default, the Compute Engine default service account is used for the launcher VMs and worker VMs. The service account needs the following roles and abilities:

  • Storage Object Admin (roles/storage.objectAdmin)
  • Viewer (roles/viewer)
  • Dataflow Worker (roles/dataflow.worker)
  • Read and write access to the staging bucket
  • Read access to the Flex Template image

To grant read and write access to the staging bucket, you can use the role Storage Object Admin (roles/storage.objectAdmin). For more information, see IAM roles for Cloud Storage.

To grant read access to the Flex Template image, you can use the role Storage Object Viewer (roles/storage.objectViewer). For more information, see Configuring access control.
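
For example, you might grant these roles to the default Compute Engine service account with commands like the following. The project ID, project number, and bucket names are placeholders; adjust them for your environment and for wherever your template image is stored.

# Grant the Dataflow Worker role on the project (placeholder project ID and number).
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role="roles/dataflow.worker"

# Grant read and write access to the staging bucket.
gcloud storage buckets add-iam-policy-binding gs://STAGING_BUCKET \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role="roles/storage.objectAdmin"

# Grant read access to the bucket that contains the Flex Template image.
gcloud storage buckets add-iam-policy-binding gs://TEMPLATE_IMAGE_BUCKET \
  --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
  --role="roles/storage.objectViewer"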

Run a Flex Template

To run a Flex Template, use the gcloud dataflow flex-template run command:

gcloud dataflow flex-template run JOB_NAME \
  --template-file-gcs-location gs://TEMPLATE_FILE_LOCATION \
  --region REGION \
  --staging-location STAGING_LOCATION \
  --temp-location TEMP_LOCATION \
  --parameters PARAMETERS \
  --additional-user-labels LABELS

Replace the following:

  • JOB_NAME: a unique name of your choice for the job

  • TEMPLATE_FILE_LOCATION: the Cloud Storage location of the template file

  • REGION: the region in which to run the Dataflow job

  • STAGING_LOCATION: the Cloud Storage location to stage local files

  • TEMP_LOCATION: the Cloud Storage location to write temporary files. If not set, defaults to the staging location.

  • PARAMETERS: pipeline parameters for the job

  • LABELS: Optional. Labels attached to your job, using the format KEY_1=VALUE_1,KEY_2=VALUE_2,....

During the staging step of launching a template, Dataflow writes files to the staging location. Dataflow reads these staged files to create the job graph. During the execution step, Dataflow writes files to the temporary location.
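
For example, a filled-in command might look like the following. The job name, bucket paths, parameter names, and labels are hypothetical; the parameters must match the ones defined in your template's metadata.

gcloud dataflow flex-template run wordcount-job-1 \
  --template-file-gcs-location gs://my-bucket/templates/wordcount.json \
  --region us-central1 \
  --staging-location gs://my-bucket/staging \
  --temp-location gs://my-bucket/temp \
  --parameters input=gs://my-bucket/input/*.txt,output=gs://my-bucket/output/results \
  --additional-user-labels team=analytics,env=dev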

Set pipeline options

To set pipeline options when you run a Flex Template, use one of the following approaches:

gcloud

Pass pipeline options that your template supports with the --parameters flag of the gcloud dataflow flex-template run command. When passing parameters of List or Map type, you might need to define the parameters in a YAML file and reference it with the --flags-file flag.
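
For example, parameter values that contain commas, or List- and Map-typed parameters, are easier to express in a YAML file than on the command line. The file name and parameter names below are hypothetical; see gcloud topic flags-file for the exact syntax that applies to your flag types.

# flags.yaml: top-level keys are gcloud flag names.
--parameters:
  bigQuerySchema: "field1:STRING,field2:INTEGER"
  outputTable: my-project:my_dataset.my_table

gcloud dataflow flex-template run my-flex-job \
  --template-file-gcs-location gs://my-bucket/templates/template.json \
  --region us-central1 \
  --flags-file flags.yaml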

API

The following example shows how to include pipeline options, experiments, and additional options in a request body:

{
  "jobName": "my-flex-template-job",
  "parameters": {
    "option_defined_in_metadata": "value"
  },
  "environment": {
    "additionalExperiments": [
      "use_runner_v2"
    ],
    "additionalPipelineOptions": {
      "common_pipeline_option": "value"
    }
  }
}
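
To launch a job with these settings, nest the body under launchParameter together with the containerSpecGcsPath of your template spec file, and post it to the flexTemplates.launch method. The following is a sketch with placeholder values:

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "my-flex-template-job",
    "containerSpecGcsPath": "gs://BUCKET_NAME/templates/template.json",
    "parameters": {
      "option_defined_in_metadata": "value"
    },
    "environment": {
      "additionalExperiments": [
        "use_runner_v2"
      ],
      "additionalPipelineOptions": {
        "common_pipeline_option": "value"
      }
    }
  }
}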

When you use Flex Templates, you can configure some pipeline options during pipeline initialization, but other pipeline options can't be changed. If you overwrite the command-line arguments that the Flex Template requires, the job might ignore, override, or discard the pipeline options that the template launcher passes. The job might fail to launch, or a job that doesn't use the Flex Template might launch instead. For more information, see Failed to read the job file.

During pipeline initialization, don't change the following pipeline options:

Java

  • runner
  • project
  • jobName
  • templateLocation
  • region

Python

  • runner
  • project
  • job_name
  • template_location
  • region

Go

  • runner
  • project
  • job_name
  • template_location
  • region
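
Other pipeline options can be supplied at launch time, either as template parameters or through flags of the gcloud dataflow flex-template run command. The following sketch uses hypothetical values; which flags are available can depend on your gcloud CLI version.

gcloud dataflow flex-template run my-flex-job \
  --template-file-gcs-location gs://my-bucket/templates/template.json \
  --region us-central1 \
  --worker-machine-type n2-standard-4 \
  --max-workers 10 \
  --additional-experiments use_runner_v2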

Block project SSH keys from VMs that use metadata-based SSH keys

You can prevent VMs from accepting SSH keys that are stored in project metadata by blocking project SSH keys from VMs. Use the --additional-experiments flag with the block_project_ssh_keys service option:

--additional-experiments=block_project_ssh_keys

For more information, see Dataflow service options.
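
For example, in the context of a full run command (the job name and bucket are hypothetical):

gcloud dataflow flex-template run ssh-restricted-job \
  --template-file-gcs-location gs://my-bucket/templates/template.json \
  --region us-central1 \
  --temp-location gs://my-bucket/temp \
  --additional-experiments=block_project_ssh_keys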

Update a Flex Template job

The following example request shows you how to update a template streaming job by using the projects.locations.flexTemplates.launch method. If you want to use the gcloud CLI, see Update an existing pipeline.

If you want to update a classic template, use projects.locations.templates.launch instead.

  1. Follow the steps to create a streaming job from a Flex Template. Send the following HTTP POST request with the modified values:

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    {
      "launchParameter": {
        "update": true,
        "jobName": "JOB_NAME",
        "parameters": {
          "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
          "output_table": "PROJECT_ID:DATASET.TABLE_NAME"
        },
        "containerSpecGcsPath": "STORAGE_PATH"
      }
    }
    
    • Replace PROJECT_ID with your project ID.
    • Replace REGION with the Dataflow region of the job that you're updating.
    • Replace JOB_NAME with the exact name of the job that you want to update.
    • Set parameters to your list of key-value pairs. The parameters listed are specific to this template example. If you're using a custom template, modify the parameters as needed. If you're using the example template, replace the following variables.
      • Replace SUBSCRIPTION_NAME with your Pub/Sub subscription name.
      • Replace DATASET with your BigQuery dataset name.
      • Replace TABLE_NAME with your BigQuery table name.
    • Replace STORAGE_PATH with the Cloud Storage location of the template file. The location should start with gs://.
  2. Use the environment parameter to change environment settings, as shown in the sketch after these steps. For more information, see FlexTemplateRuntimeEnvironment.

  3. Optional: To send your request using curl (Linux, macOS, or Cloud Shell), save the request to a JSON file, and then run the following command:

    curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    

    Replace FILE_PATH with the path to the JSON file that contains the request body.

  4. Use the Dataflow monitoring interface to verify that a new job with the same name was created. This job has the status Updated.
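
As an example of the environment parameter from step 2, the launch request can carry runtime settings such as the worker machine type or the maximum number of workers. The field names below come from FlexTemplateRuntimeEnvironment; the values are placeholders, and the parameters block is elided.

{
  "launchParameter": {
    "update": true,
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "STORAGE_PATH",
    "parameters": { ... },
    "environment": {
      "machineType": "n2-standard-2",
      "maxWorkers": 10,
      "additionalExperiments": [
        "use_runner_v2"
      ]
    }
  }
}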

What's next