本页介绍了如何使用 Flex 模板运行 Dataflow 作业。借助 Flex 模板,您可以打包 Dataflow 流水线,以便在没有 Apache Beam 开发环境的情况下运行该流水线。
所需权限
当您运行 Flex 模板时,Dataflow 会为您创建一个作业。如需创建作业,Dataflow 服务账号需要以下权限:
dataflow.serviceAgent
首次使用 Dataflow 时,服务会为您分配此角色,因此您无需授予此权限。
默认情况下,Compute Engine 服务账号用于启动器虚拟机和工作器虚拟机。该服务账号需要以下角色和权限:
- Storage Object Admin (
roles/storage.objectAdmin) - Viewer (
roles/viewer) - Dataflow Worker (
roles/dataflow.worker) - 暂存存储桶的读写权限
- Flex 模板映像的读取权限
如需授予暂存存储桶的读写权限,您可以使用 Storage Object Admin (roles/storage.objectAdmin) 角色。如需了解详情,请参阅 Cloud Storage 的 IAM 角色。
如需授予 Flex 模板映像的读取权限,您可以使用 Storage Object Viewer (roles/storage.objectViewer) 角色。如需了解详情,请参阅配置访问权限控制。
运行 Flex 模板
如需运行 Flex 模板,请使用 gcloud dataflow flex-template run 命令:
gcloud dataflow flex-template run JOB_ID \ --template-file-gcs-location gs://TEMPLATE_FILE_LOCATION \ --region REGION \ --staging-location STAGING_LOCATION \ --temp-location TEMP_LOCATION \ --parameters PARAMETERS \ --additional-user-labels LABELS \
替换以下内容:
JOB_ID:作业的 IDTEMPLATE_FILE_LOCATION:模板文件的 Cloud Storage 位置REGION:运行 Dataflow 作业的区域STAGING_LOCATION:用于暂存本地文件的 Cloud Storage 位置TEMP_LOCATION:用于写入临时文件的 Cloud Storage 位置。如果未设置,则默认为过渡位置。PARAMETERS:作业的流水线参数LABELS:可选。附加到作业的标签,使用KEY_1=VALUE_1,KEY_2=VALUE_2,...格式。
在启动模板的暂存步骤中,Dataflow 会将文件写入暂存位置。Dataflow 会读取这些暂存文件以创建作业图。在执行步骤中,Dataflow 会将文件写入临时位置。
设置流水线选项
如需在运行 Flex 模板时设置流水线选项,请在 gcloud dataflow flex-template run 命令中使用以下标志:
parameters:使用此标志可设置以下类型的流水线选项:additional-pipeline-options:使用此标志可设置 Flex 模板未直接支持的其他 Apache Beam 流水线选项。additional-experiments:使用此标志可设置实验性流水线选项(相当于experiments选项)。
gcloud
使用
parameters标志添加流水线选项。使用
additional-experiments和additional-pipeline-options标志添加运行时实验和流水线选项。
传递 List 或 Map 类型的参数时,您可能需要在 YAML 文件中定义参数并使用 flags-file标志。
API
使用
parameters字段添加流水线选项。使用
additionalExperiments和additionalPipelineOptions字段添加运行时实验和流水线选项。
以下示例展示了如何在请求正文中包含流水线选项、实验和其他选项:
{
"jobName": "my-flex-template-job",
"parameters": {
"option_defined_in_metadata": "value"
},
"environment": {
"additionalExperiments": [
"use_runner_v2"
],
"additionalPipelineOptions": {
"common_pipeline_option": "value"
}
}
}
使用 Flex 模板时,您可以在流水线初始化期间配置部分流水线选项,但其他流水线选项无法更改。 如果 Flex 模板所需的命令行参数被覆盖,作业可能会忽略、替换或舍弃模板启动器传递的流水线选项。此外,作业本身可能会无法启动,或者系统可能会启动未使用 Flex 模板的作业。如需了解详情,请参阅读取作业文件失败。
在流水线初始化期间,请勿更改以下流水线选项:
Java
runnerprojectjobNametemplateLocationregion
Python
runnerprojectjob_nametemplate_locationregion
Go
runnerprojectjob_nametemplate_locationregion
屏蔽使用基于元数据的 SSH 密钥的虚拟机的项目 SSH 密钥
您可以通过阻止来自虚拟机的项目 SSH 密钥,阻止虚拟机接受存储在项目元数据中的 SSH 密钥。 将 additional-experiments 标志与 block_project_ssh_keys 服务选项搭配使用:
--additional-experiments=block_project_ssh_keys
如需了解详情,请参阅 Dataflow 服务选项。
更新 Flex 模板作业
以下示例请求展示了如何使用 projects.locations.flexTemplates.launch 方法更新模板流处理作业。如果您想使用 gcloud CLI,请参阅更新现有流水线。
如果要更新经典模板,请改用 projects.locations.templates.launch。
按照步骤通过 Flex 模板创建流处理作业。发送以下 HTTP POST 请求,其中包含修改后的值:
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch { "launchParameter": { "update": true "jobName": "JOB_NAME", "parameters": { "input_subscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "output_table": "PROJECT_ID:DATASET.TABLE_NAME" }, "containerSpecGcsPath": "STORAGE_PATH" }, }- 将
PROJECT_ID替换为您的项目 ID。 - 将
REGION替换为您要更新的作业的 Dataflow 区域。 - 将
JOB_NAME替换为您要更新的作业的确切名称。 - 将
parameters设置为您的键值对列表。列出的参数特定于此模板示例。如果您使用的是自定义模板,请根据需要修改参数。如果您使用的是示例模板,请替换以下变量。- 将
SUBSCRIPTION_NAME替换为您的 Pub/Sub 订阅名称。 - 将
DATASET替换为您的 BigQuery 数据集名称。 - 将
TABLE_NAME替换为 BigQuery 表名称。
- 将
- 将
STORAGE_PATH替换为模板文件的 Cloud Storage 位置。该位置应以gs://开头。
- 将
使用
environment参数更改环境设置。如需了解详情,请参阅FlexTemplateRuntimeEnvironment。可选:如需使用 curl(Linux、macOS 或 Cloud Shell)发送请求,将请求保存到 JSON 文件,然后运行以下命令:
curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch将 FILE_PATH 替换为包含请求正文的 JSON 文件的路径。
使用 Dataflow 监控界面验证已创建了具有相同名称的新作业。此作业的状态为已更新。
后续步骤
- 了解如何为 Apache Beam 流水线构建 Flex 模板。
- 如需详细了解经典模板和 Flex 模板及其用例场景,请参阅 Dataflow 模板。
- 如需了解 Flex 模板问题排查相关的信息,请参阅排查 Flex 模板超时问题。
- 如需查看更多参考架构、图表和最佳实践,请浏览云架构中心。