使用 Dataflow 进行机器学习推理时,我们建议您使用 RunInference 转换。使用此转换可带来多项好处,包括:
- 执行本地推理时,针对 Dataflow 工作器优化的智能模型内存管理。
- 动态批处理,可使用流水线特性和用户定义的限制条件来优化性能。
- 可提供更高吞吐量和更低延迟时间的机器学习感知型 Dataflow 后端功能。
- 当达到远程推理配额限制时,采用智能退避和自动扩缩机制。
- 可用于生产环境的指标和运维功能。
使用 RunInference 时,请注意以下几点:
内存管理
加载中型或大型机器学习模型时,您的机器可能会内存不足。 Dataflow 提供了一些工具,可帮助您在加载机器学习模型时避免内存不足 (OOM) 错误。请使用下表确定适合您情况的方法。
| 场景 | 解决方案 |
|---|---|
| 模型足够小,可放入内存中。 |
使用 RunInference 转换,而无需进行任何其他配置。RunInference 转换会在多个线程中共享模型。如果您可以在机器上为每个 CPU 核心安装一个模型,您的流水线可以使用默认配置。
|
| 多个经过不同训练的模型执行相同的任务。 | 使用独立模型键。如需了解详情,请参阅使用多个不同训练的模型运行机器学习推理。 |
| 一个模型会加载到内存中,所有进程共享此模型。 |
使用 如果您要构建自定义模型处理程序,请替换 |
| 您需要配置加载到机器上的模型的确切数量。 |
如需精确控制加载的模型数量,请使用 如果您要构建自定义模型处理程序,请替换 |
如需详细了解如何使用 Dataflow 进行内存管理,请参阅排查 Dataflow 内存不足错误。
批处理
在 Beam 中有许多批处理方法,但在执行推理时,我们建议您让 RunInference 转换来处理批处理。如果您的模型在特定批次大小下表现最佳,请考虑限制 RunInference 的目标批次大小参数。大多数模型处理程序都会将最大和最小批次大小作为参数公开。例如,如需控制馈送到 HuggingFace 流水线的批次大小,您可以定义以下模型处理程序:
mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16)
RunInference 转换始终遵循最大批次大小。最小批次大小是一个目标值,但无法保证在所有情况下都能实现。例如,请参阅下一部分中的基于包的批处理。
基于包的批处理
Dataflow 会以包的形式将数据传递给转换。这些包的大小可能会因 Dataflow 定义的启发法而异。通常,批处理流水线中的包相当大(O(100s) 个元素),而流处理流水线中的包可能很小(包括大小为 1 的包)。
默认情况下,RunInference 会根据每个包生成批次,而不会跨包进行批处理。这意味着,如果您设置的最小批次大小为 8,但您的包中只剩下 3 个元素,RunInference 将使用批次大小 3。大多数模型处理程序都公开了一个 max_batch_duration_secs 参数,可用于替换此行为。如果设置了 max_batch_duration_secs,则 RunInference 会跨包进行批处理。如果转换无法通过单个包实现其目标批次大小,则最多等待 max_batch_duration_secs,然后生成一个批次。例如,如要在使用 HuggingFace 流水线时启用跨包批处理,您可以定义以下模型处理程序:
mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16, max_batch_duration_secs=3)
如果您流水线的批次大小非常小,此功能会有所帮助。在其他情况下,跨包进行批处理的同步费用通常是不划算的,因为它可能会导致代价高昂的 shuffle。
处理故障
错误处理是所有生产流水线的重要组成部分。 Dataflow 会处理任意包中的元素,并会在该包中有任何元素抛出错误时重试整个包。如果您不应用其他错误处理机制,Dataflow 在以批处理模式运行时,会重试包含失败项的包四次。单个包失败四次后,流水线会完全失败。以流处理模式运行时,Dataflow 会无限期地重试包含失败项的包,而这可能会导致您的流水线永久性停滞。
RunInference 通过其 with_exception_handling 函数提供内置的错误处理机制。应用此函数后,它会将所有失败及其错误消息路由到单独的失败 PCollection。这样您就可以重新处理它们。如果您将预处理或后处理操作与模型处理程序相关联,RunInference 也会将这些操作路由到失败收集器。例如,如需收集具有预处理和后处理操作的模型处理程序中的所有失败,请使用以下逻辑:
main, other = pcoll | RunInference(model_handler.with_preprocess_fn(f1).with_postprocess_fn(f2)).with_exception_handling()
# handles failed preprocess operations, indexed in the order in which they were applied
other.failed_preprocessing[0] | beam.Map(logging.info)
# handles failed inferences
other.failed_inferences | beam.Map(logging.info)
# handles failed postprocess operations, indexed in the order in which they were applied
other.failed_postprocessing[0] | beam.Map(logging.info)
超时
使用 RunInference 的 with_exception_handling 功能时,您还可以为每个操作设置超时,超时按批次计算。这样,您就可以避免单个卡住的推理导致整个流水线无响应。如果发生超时,超时记录会路由到失败 PCollection,所有模型状态都会清理并重新创建,然后继续正常执行。
# Timeout execution after 60 seconds
main, other = pcoll | RunInference(model_handler).with_exception_handling(timeout=60)
从 Beam 2.68.0 开始,您还可以使用 --element_processing_timeout_minutes 流水线选项指定超时。在这种情况下,超时会导致失败的工作项重试,直到成功为止,而不是将失败的推理结果路由到死信队列。
使用加速器
使用加速器时,许多模型处理程序都有您可以启用的加速器特定配置。例如,在使用 GPU 和 Hugging Face 流水线时,建议您将 device 参数设置为 GPU:
mh = HuggingFacePipelineModelHandler('text-classification', device='GPU')
我们还建议您从单个虚拟机实例开始,并在该实例上本地运行流水线。为此,请按照 GPU 问题排查指南中所述的步骤操作。这样可以显著缩短流水线运行所需的时间。这种方法还有助于您更好地了解作业的执行情况。
如需详细了解如何在 Dataflow 中使用加速器,请参阅 Dataflow 关于 GPU 和 TPU 的文档。
依赖项管理
机器学习流水线通常包含大型重要依赖项,例如 PyTorch 或 TensorFlow。为了管理这些依赖项,我们建议您在将作业部署到生产环境时使用自定义容器。这可确保您的作业在多次运行作业中都在稳定环境中执行并简化调试。
如需详细了解依赖项管理,请参阅 Beam 的 Python 依赖项管理页面。
后续步骤
- 探索 Dataflow 机器学习笔记本,了解实际示例。
- 如需深入了解如何将机器学习与 Apache Beam 搭配使用,请参阅 AI/机器学习流水线文档。
- 详细了解
RunInferenceAPI。 - 了解可用于监控
RunInference转换的指标。 - 返回“Dataflow 机器学习简介”页面,查看 Dataflow 的机器学习功能概览。