在本文档中,您将了解如何通过 Dataflow 流水线代码创建自定义经典模板。 经典模板打包现有 Dataflow 流水线,以创建可重复使用的模板,您可以通过更改特定的流水线参数来为每个作业自定义模板。您可以使用命令从现有流水线生成模板,而不是编写模板。
以下是此过程的简要介绍。后续部分详细介绍了此过程。
- 在流水线代码中,为要在运行时设置或使用的所有流水线选项使用
ValueProvider接口。使用接受运行时参数的DoFn对象。 - 使用附加元数据扩展模板,以便在运行经典模板时验证自定义参数。此类元数据的示例包括自定义经典模板的名称和可选参数。
- 检查流水线 I/O 连接器是否支持
ValueProvider对象,并根据需要进行更改。 - 创建并暂存自定义经典模板。
- 运行自定义经典模板。
如需了解不同类型的 Dataflow 模板及其优势以及何时选择经典模板,请参阅 Dataflow 模板。
运行经典模板所需的权限
运行 Dataflow 经典模板所需的权限取决于您运行模板的位置,以及流水线的来源和接收器是否位于其他项目中。
如需详细了解如何在本地或使用 Google Cloud Platform 运行 Dataflow 流水线,请参阅 Dataflow 安全性和权限。
如需查看 Dataflow 角色和权限的列表,请参阅 Dataflow 访问权限控制。
限制
- 经典模板不支持以下流水线选项。如果您需要控制工作器自动化测试框架线程数,请使用 Flex 模板。
Java
numberOfWorkerHarnessThreads
Python
number_of_worker_harness_threads - Dataflow 运行程序不支持 Pub/Sub 主题和订阅参数的
ValueProvider选项。如果您需要在运行时参数中使用 Pub/Sub 选项,请使用 Flex 模板。
关于运行时参数和 ValueProvider 接口
ValueProvider 接口允许流水线接受运行时参数。Apache Beam 提供三种类型的 ValueProvider 对象。
| 名称 | 说明 |
|---|---|
RuntimeValueProvider |
您可以使用 如果您无法提前知道值,请使用 |
StaticValueProvider |
如果您提前知道值,请使用 |
NestedValueProvider |
如果要在运行时使用该值计算另一个值,请使用 |
在流水线代码中使用运行时参数
本部分介绍如何使用 ValueProvider、StaticValueProvider 和 NestedValueProvider。
在流水线选项中使用 ValueProvider
为要在运行时设置或使用的所有流水线选项使用 ValueProvider。
例如,以下 WordCount 代码段不支持运行时参数。该代码会添加一个输入文件选项、创建一个流水线并从输入文件中读取行。
Java
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
如需添加运行时参数支持,请修改输入文件选项以使用 ValueProvider。
Java
对于输入文件选项类型,请使用 ValueProvider<String>(而不是 String)。
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") ValueProvider<String> getInputFile(); void setInputFile(ValueProvider<String> value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
将 add_argument 替换为 add_value_provider_argument。
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): # Use add_value_provider_argument for arguments to be templatable # Use add_argument as usual for non-templatable arguments parser.add_value_provider_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
在函数中使用 ValueProvider
如需在您自己的函数中使用运行时参数值,请更新函数以使用 ValueProvider 参数。
以下示例包含一个整数 ValueProvider 选项和一个做整数加法的简单函数。该函数依赖于 ValueProvider 整数。在执行期间,流水线会将 MySumFn 应用于 PCollection(包含 [1, 2, 3])中的每个整数。如果运行时值为 10,则生成的 PCollection 将包含 [11, 12, 13]。
Java
public interface SumIntOptions extends PipelineOptions { // New runtime parameter, specified by the --int // option at runtime. ValueProvider<Integer> getInt(); void setInt(ValueProvider<Integer> value); } class MySumFn extends DoFn<Integer, Integer> { ValueProvider<Integer> mySumInteger; MySumFn(ValueProvider<Integer> sumInt) { // Store the value provider this.mySumInteger = sumInt; } @ProcessElement public void processElement(ProcessContext c) { // Get the value of the value provider and add it to // the element's value. c.output(c.element() + mySumInteger.get()); } } public static void main(String[] args) { SumIntOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(SumIntOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Get the value provider and pass it to MySumFn .apply(ParDo.of(new MySumFn(options.getInt()))) .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString())) .apply("OutputNums", TextIO.write().to("numvalues")); p.run(); }
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import StaticValueProvider from apache_beam.io import WriteToText class UserOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument('--templated_int', type=int) class MySumFn(beam.DoFn): def __init__(self, templated_int): self.templated_int = templated_int def process(self, an_int): yield self.templated_int.get() + an_int pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) user_options = pipeline_options.view_as(UserOptions) sum = (p | 'ReadCollection' >> beam.io.ReadFromText( 'gs://some/integer_collection') | 'StringToInt' >> beam.Map(lambda w: int(w)) | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int)) | 'WriteResultingCollection' >> WriteToText('some/output_path'))
使用 StaticValueProvider
如需为流水线提供静态值,请使用 StaticValueProvider。
此示例使用 MySumFn,它是一个采用 ValueProvider<Integer> 的 DoFn。如果您提前知道参数的值,则可以使用 StaticValueProvider 将静态值指定为 ValueProvider。
Java
此代码获取流水线运行时的值:
.apply(ParDo.of(new MySumFn(options.getInt())))
相反,您可以将 StaticValueProvider 用于静态值:
.apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))
Python
此代码获取流水线运行时的值:
beam.ParDo(MySumFn(user_options.templated_int))
相反,您可以将 StaticValueProvider 用于静态值:
beam.ParDo(MySumFn(StaticValueProvider(int,10)))
在实现支持常规参数和运行时参数的 I/O 模块时,也可以使用 StaticValueProvider。
StaticValueProvider 减少了实现两个类似方法的代码重复。
Java
此示例的源代码来自 Apache Beam 的 TextIO.java(位于 GitHub 上)。
// Create a StaticValueProvider<String> from a regular String parameter // value, and then call .from() with this new StaticValueProvider. public Read from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return from(StaticValueProvider.of(filepattern)); } // This method takes a ValueProvider parameter. public Read from(ValueProvider<String> filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return toBuilder().setFilepattern(filepattern).build(); }
Python
在此示例中,存在一个同时接受 string 或 ValueProvider 参数的构造参数。如果该参数是 string,它将被转换为 StaticValueProvider。
class Read(): def __init__(self, filepattern): if isinstance(filepattern, str): # Create a StaticValueProvider from a regular string parameter filepattern = StaticValueProvider(str, filepattern) self.filepattern = filepattern
使用 NestedStaticValueProvider
如需从另一个 ValueProvider 对象计算值,请使用 NestedValueProvider。
NestedValueProvider 将 ValueProvider 和 SerializableFunction 转换函数作为输入。在对 NestedValueProvider 调用 .get() 时,转换函数会根据 ValueProvider 值创建一个新值。此转换可让您使用 ValueProvider 值来创建所需的最终值。
在以下示例中,用户提供文件名 file.txt。转换将路径 gs://directory_name/ 添加到文件名之前。调用 .get() 将返回 gs://directory_name/file.txt。
Java
public interface WriteIntsOptions extends PipelineOptions { // New runtime parameter, specified by the --fileName // option at runtime. ValueProvider<String> getFileName(); void setFileName(ValueProvider<String> value); } public static void main(String[] args) { WriteIntsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WriteIntsOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Write to the computed complete file path. .apply("OutputNums", TextIO.write().to(NestedValueProvider.of( options.getFileName(), new SerializableFunction<String, String>() { @Override public String apply(String file) { return "gs://directoryname/" + file; } }))); p.run(); }
在流水线代码中使用元数据
您可以使用附加元数据扩展模板,以便在运行模板时验证自定义参数。如果要为模板创建元数据,请按照以下步骤操作:
- 使用元数据参数中的参数和示例元数据文件中的格式创建名为
TEMPLATE_NAME_metadata的 JSON 格式的文件。将TEMPLATE_NAME替换为模板的名称。确保元数据文件没有文件扩展名。例如,如果模板名称为
myTemplate,则其元数据文件必须为myTemplate_metadata。 - 将元数据文件存储在 Cloud Storage 中模板所在的文件夹内。
元数据参数
| 参数键 | 必需 | 值的说明 | |
|---|---|---|---|
name |
是 | 模板的名称。 | |
description |
否 | 对模板进行说明的一小段文本。 | |
streaming |
否 | 如果为 true,则此模板支持流处理。默认值为 false。 |
|
supportsAtLeastOnce |
否 | 如果为 true,则此模板支持“至少一次”处理。默认值为 false。如果模板设计为支持“至少一次”流处理模式,请将此参数设置为 true。
|
|
supportsExactlyOnce |
否 | 如果为 true,则此模板支持“正好一次”处理。默认值为 true。 |
|
defaultStreamingMode |
否 | 默认流处理模式,适用于同时支持“至少一次”模式和“正好一次”模式的模板。请使用以下某个值:"AT_LEAST_ONCE"、"EXACTLY_ONCE"。如果未指定,则默认流处理模式为“正好一次”。
|
|
parameters |
否 | 模板使用的一组附加参数。默认情况下,使用空数组。 | |
name |
是 | 模板中使用的参数的名称。 | |
label |
是 | 人类可读的字符串,用于在 Google Cloud 控制台中标记参数。 | |
helpText |
是 | 对参数进行说明的一小段文本。 | |
isOptional |
否 | 如果参数是必需的,则为 false;如果参数是可选的,则为 true。除非设置了值,否则 isOptional 默认为 false。
如果您没有为元数据添加此参数键,则元数据会成为必需参数。 |
|
regexes |
否 | 一组字符串形式的 POSIX-egrep 正则表达式(用于验证参数的值)。例如,["^[a-zA-Z][a-zA-Z0-9]+"] 是一个正则表达式,用于验证以字母开头,并且包含一个或多个字符的值。默认使用空数组。 |
示例元数据文件
Java
Dataflow 服务使用以下元数据验证 WordCount 模板的自定义参数:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "inputFile", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Python
Dataflow 服务使用以下元数据验证 WordCount 模板的自定义参数:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "input", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
您可以从 Dataflow 模板目录下载 Google 提供的模板的元数据文件。
支持的流水线 I/O 连接器和 ValueProvider
Java
某些 I/O 连接器包含接受 ValueProvider 对象的方法。如需确定是否支持特定连接器和方法,请参阅 I/O 连接器的 API 参考文档。受支持的方法具有一个带有 ValueProvider 的过载。如果方法没有过载,则此方法不支持运行时参数。以下 I/O 连接器至少具有部分 ValueProvider 支持:
- 基于文件的 IO:
TextIO、AvroIO、FileIO、TFRecordIO、XmlIO BigQueryIO*BigtableIO(需要 SDK 2.3.0 或更高版本)PubSubIOSpannerIO
Python
某些 I/O 连接器包含接受 ValueProvider 对象的方法。如需确定是否支持 I/O 连接器及其方法,请参阅连接器的 API 参考文档。以下 I/O 连接器接受运行时参数:
- 基于文件的 IO:
textio、avroio、tfrecordio
创建和暂存经典模板
写入流水线后,您必须创建并暂存模板文件。创建和暂存模板后,暂存位置包含运行模板所需的其他文件。如果删除暂存位置,模板将无法运行。Dataflow 作业不会在您暂存模板后立即运行。如需运行基于模板的自定义 Dataflow 作业,您可以使用 Google Cloud 控制台、Dataflow REST API 或 gcloud CLI。
以下示例展示了如何暂存模板文件:
Java
此 Maven 命令会在使用 --templateLocation 指定的 Cloud Storage 位置创建和暂存模板。
mvn compile exec:java \
-Dexec.mainClass=com.example.myclass \
-Dexec.args="--runner=DataflowRunner \
--project=PROJECT_ID \
--stagingLocation=gs://BUCKET_NAME/staging \
--templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
--region=REGION" \
-P dataflow-runner
验证 templateLocation 路径是否正确。请替换以下内容:
com.example.myclass:您的 Java 类PROJECT_ID:您的项目 IDBUCKET_NAME:Cloud Storage 存储桶的名称。TEMPLATE_NAME:您的模板的名称REGION:要在其中部署 Dataflow 作业的区域
Python
此 Python 命令会在使用 --template_location 指定的 Cloud Storage 位置创建和暂存模板。
python -m examples.mymodule \
--runner DataflowRunner \
--project PROJECT_ID \
--staging_location gs://BUCKET_NAME/staging \
--template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
--region REGION
验证 template_location 路径是否正确。请替换以下内容:
examples.mymodule:您的 Python 模块PROJECT_ID:您的项目 IDBUCKET_NAME:Cloud Storage 存储桶的名称。TEMPLATE_NAME:您的模板的名称REGION:要在其中部署 Dataflow 作业的区域
创建和暂存模板后,下一步是运行模板。