在 Dataflow 中运行 Flex 模板

本页介绍了如何使用 Flex 模板运行 Dataflow 作业。借助 Flex 模板,您可以打包 Dataflow 流水线,以便在没有 Apache Beam 开发环境的情况下运行该流水线。

所需权限

当您运行 Flex 模板时,Dataflow 会为您创建一个作业。如需创建作业,Dataflow 服务账号需要以下权限:

  • dataflow.serviceAgent

首次使用 Dataflow 时,服务会为您分配此角色,因此您无需授予此权限。

默认情况下,Compute Engine 服务账号用于启动器虚拟机和工作器虚拟机。该服务账号需要以下角色和权限:

  • Storage Object Admin (roles/storage.objectAdmin)
  • Viewer (roles/viewer)
  • Dataflow Worker (roles/dataflow.worker)
  • 暂存存储桶的读写权限
  • Flex 模板映像的读取权限

如需授予暂存存储桶的读写权限,您可以使用 Storage Object Admin (roles/storage.objectAdmin) 角色。如需了解详情,请参阅 Cloud Storage 的 IAM 角色

如需授予 Flex 模板映像的读取权限,您可以使用 Storage Object Viewer (roles/storage.objectViewer) 角色。如需了解详情,请参阅配置访问权限控制

运行 Flex 模板

如需运行 Flex 模板,请使用 gcloud dataflow flex-template run 命令:

gcloud dataflow flex-template run JOB_ID \
  --template-file-gcs-location gs://TEMPLATE_FILE_LOCATION \
  --region REGION \
  --staging-location STAGING_LOCATION \
  --temp-location TEMP_LOCATION \
  --parameters  PARAMETERS \
  --additional-user-labels LABELS \

替换以下内容:

  • JOB_ID:作业的 ID

  • TEMPLATE_FILE_LOCATION:模板文件的 Cloud Storage 位置

  • REGION:运行 Dataflow 作业的区域

  • STAGING_LOCATION:用于暂存本地文件的 Cloud Storage 位置

  • TEMP_LOCATION:用于写入临时文件的 Cloud Storage 位置。如果未设置,则默认为过渡位置。

  • PARAMETERS:作业的流水线参数

  • LABELS:可选。附加到作业的标签,使用 KEY_1=VALUE_1,KEY_2=VALUE_2,... 格式。

在启动模板的暂存步骤中,Dataflow 会将文件写入暂存位置。Dataflow 会读取这些暂存文件以创建作业图。在执行步骤中,Dataflow 会将文件写入临时位置。

设置流水线选项

如需在运行 Flex 模板时设置流水线选项,请在 gcloud dataflow flex-template run 命令中使用以下标志:

gcloud

传递 ListMap 类型的参数时,您可能需要在 YAML 文件中定义参数并使用 flags-file标志。

API

以下示例展示了如何在请求正文中包含流水线选项、实验和其他选项:

{
  "jobName": "my-flex-template-job",
  "parameters": {
    "option_defined_in_metadata": "value"
  },
  "environment": {
    "additionalExperiments": [
      "use_runner_v2"
    ],
    "additionalPipelineOptions": {
      "common_pipeline_option": "value"
    }
  }
}

使用 Flex 模板时,您可以在流水线初始化期间配置部分流水线选项,但其他流水线选项无法更改。 如果 Flex 模板所需的命令行参数被覆盖,作业可能会忽略、替换或舍弃模板启动器传递的流水线选项。此外,作业本身可能会无法启动,或者系统可能会启动未使用 Flex 模板的作业。如需了解详情,请参阅读取作业文件失败

在流水线初始化期间,请勿更改以下流水线选项

Java

  • runner
  • project
  • jobName
  • templateLocation
  • region

Python

  • runner
  • project
  • job_name
  • template_location
  • region

Go

  • runner
  • project
  • job_name
  • template_location
  • region

屏蔽使用基于元数据的 SSH 密钥的虚拟机的项目 SSH 密钥

您可以通过阻止来自虚拟机的项目 SSH 密钥,阻止虚拟机接受存储在项目元数据中的 SSH 密钥。 将 additional-experiments 标志与 block_project_ssh_keys 服务选项搭配使用:

--additional-experiments=block_project_ssh_keys

如需了解详情,请参阅 Dataflow 服务选项

更新 Flex 模板作业

以下示例请求展示了如何使用 projects.locations.flexTemplates.launch 方法更新模板流处理作业。如果您想使用 gcloud CLI,请参阅更新现有流水线

如果要更新经典模板,请改用 projects.locations.templates.launch

  1. 按照步骤通过 Flex 模板创建流处理作业。发送以下 HTTP POST 请求,其中包含修改后的值:

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    {
        "launchParameter": {
          "update": true
          "jobName": "JOB_NAME",
          "parameters": {
            "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
            "output_table": "PROJECT_ID:DATASET.TABLE_NAME"
          },
        "containerSpecGcsPath": "STORAGE_PATH"
        },
    }
    
    • PROJECT_ID 替换为您的项目 ID。
    • REGION 替换为您要更新的作业的 Dataflow 区域
    • JOB_NAME 替换为您要更新的作业的确切名称。
    • parameters 设置为您的键值对列表。列出的参数特定于此模板示例。如果您使用的是自定义模板,请根据需要修改参数。如果您使用的是示例模板,请替换以下变量。
      • SUBSCRIPTION_NAME 替换为您的 Pub/Sub 订阅名称。
      • DATASET 替换为您的 BigQuery 数据集名称。
      • TABLE_NAME 替换为 BigQuery 表名称。
    • STORAGE_PATH 替换为模板文件的 Cloud Storage 位置。该位置应以 gs:// 开头。
  2. 使用 environment 参数更改环境设置。如需了解详情,请参阅 FlexTemplateRuntimeEnvironment

  3. 可选:如需使用 curl(Linux、macOS 或 Cloud Shell)发送请求,将请求保存到 JSON 文件,然后运行以下命令:

    curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch
    

    FILE_PATH 替换为包含请求正文的 JSON 文件的路径。

  4. 使用 Dataflow 监控界面验证已创建了具有相同名称的新作业。此作业的状态为已更新

后续步骤