您可以重新运行已完成、已取消或失败的机器学习 (ML) 流水线运行中的特定任务。发起重新运行后,您可以修改任务级配置,也可以选择跳过任务,然后根据更新的配置创建运行。新的流水线运行会保留对原始流水线运行的引用,以便进行跟踪。如果某项任务在之前的运行中成功完成,Vertex AI Pipelines 会重复使用该任务的缓存结果。否则,如果相应步骤之前失败,Vertex AI Pipelines 会在流水线重新运行时运行该步骤。
这样,您无需重启整个机器学习流水线,只需调整机器学习流水线即可高效解决机器学习流水线故障。您可以调整失败的任务、比较不同参数集带来的结果,或跳过失败的非必要任务。
对于在生产环境中管理复杂机器学习流水线的 MLOps 从业人员来说,重新运行流水线非常有用。以下是一些适用场景示例:
处理并行进程中的部分故障:当大型并行进程的一部分故障时,您可以跳过失败的任务,让流水线的其余部分继续运行。例如,如果 100 个任务中有一个任务的数据流水线失败,您可以跳过该任务。
使用更新的输入数据重新运行任务:如果需要使用更新的数据重新运行单个任务,您可以重新运行该特定任务。
无需更改代码即可调试生产环境问题:重新运行特定任务及其所有依赖任务,而无需涉及流水线代码的作者。
重新运行流水线
如需重新运行流水线,请使用 Vertex AI SDK for Python。
Python
使用以下示例,通过 PipelineJob.rerun()
方法跳过失败的任务并重新运行另一个具有更新的参数的任务,从而重新运行流水线:
from google.cloud import aiplatform from google.cloud.aiplatform.preview.pipelinejob.pipeline_jobs import ( _PipelineJob as PipelineJob ) from google.cloud.aiplatform_v1beta1.types.ui_pipeline_spec import RuntimeArtifact from google.protobuf.struct_pb2 import Value from google.cloud.aiplatform_v1beta1.types import PipelineTaskRerunConfig aiplatform.init(project="PROJECT_ID", location="LOCATION") job = aiplatform.PipelineJob.get(resource_name="PIPELINE_RUN_RESOURCE_NAME") original_job_name = job.resource_name rerun_task_id = None skip_failed_task_id = None task_inputs_override = PipelineTaskRerunConfig.Inputs( parameter_values={ "TASK_PARAMETER_1": Value(TASK_PARAMETER_1_VALUE), "TASK_PARAMETER_2": Value(TASK_PARAMETER_2_VALUE) } ) for task in job.task_details: if task.task_name == RERUN_TASK_NAME: rerun_task_id = task.task_id if task.task_name == SKIP_FAILED_TASK_NAME: skip_failed_task_id = task.task_id pipeline_job.rerun(original_pipelinejob_name=original_job_name, pipeline_task_rerun_configs=[ PipelineTaskRerunConfig(task_id = rerun_task_id, skip_task = False, inputs = PipelineTaskRerunConfig.Inputs(task_inputs_override) ), PipelineTaskRerunConfig(task_id = skip_failed_task_id, skip_task = True ) ], parameter_values=PIPELINE_PARAMETER_VALUES, job_id=RERUN_PIPELINE_JOB_ID)
替换以下内容:
- PROJECT_ID:包含流水线运行的 Google Cloud 项目。
- LOCATION:流水线运行所在的区域。如需详细了解支持 Vertex AI Pipelines 的区域,请参阅 Vertex AI 位置指南。
PIPELINE_RUN_RESOURCE_NAME:您要重新运行的已完成、失败或已取消的流水线运行的完全限定资源名称。 以
projects/PROJECT_NUMBER/locations/LOCATION/pipelineJobs/PIPELINE_RUN_ID
格式输入资源名称,其中:- PROJECT_NUMBER:您的项目的项目编号。您可以在 Google Cloud 控制台中找到此项目编号。如需了解详情,请参阅查找项目名称、编号和 ID。
- 将 PIPELINE_RUN_ID 替换为您要重新运行的流水线运行的唯一 ID。该 ID 显示在 Google Cloud 控制台流水线页面上的运行标签页中。
- RERUN_TASK_NAME:要使用更新的参数重新运行的任务的名称。
- SKIP_FAILED_TASK_NAME:在重新运行期间要跳过的失败任务的名称。
- TASK_PARAMETER_1 和 TASK_PARAMETER_2:您要在流水线重新运行中替换的任务的参数名称。
- TASK_PARAMETER_1_VALUE 和 TASK_PARAMETER_2_VALUE:流水线重新运行时 TASK_PARAMETER_1 和 TASK_PARAMETER_2 的新值。
- PIPELINE_PARAMETER_VALUES:可选。用于流水线重新运行的更新的流水线运行级参数值。
- RERUN_PIPELINE_JOB_ID:可选。要分配给新的流水线重新运行作业的唯一 ID。