流水线模板是一种资源,可用于发布工作流定义,以供单个用户或多个用户多次重复使用。
Kubeflow Pipelines SDK 注册表客户端是一个新的客户端界面,您可将其与兼容的注册表服务器(如 Artifact Registry)搭配使用,以便对 Kubeflow Pipelines (KFP) 模板进行版本控制。如需了解详情,请参阅在 Kubeflow Pipelines SDK 注册表客户端中使用模板。
本页介绍如何执行以下操作:
- 创建 KFP 流水线模板
- 使用 Kubeflow Pipelines SDK 注册表客户端将模板上传到流水线模板代码库
- 在 Kubeflow Pipelines 客户端中使用模板
准备工作
在构建和运行流水线之前,请按照以下说明在 Google Cloud 控制台中设置 Google Cloud 项目和开发环境。
安装 Kubeflow Pipelines SDK v2 或更高版本。
pip install --upgrade "kfp>=2,<3"
安装 Vertex AI SDK for Python v1.15.0 或更高版本。
(可选)在安装之前,请运行以下命令以查看安装的 Vertex AI SDK for Python 版本:pip freeze | grep google-cloud-aiplatform(可选)安装 390.0.0 或更高版本的 Google Cloud CLI。
配置权限
如果您还没有为 Vertex AI Pipelines 设置 gcloud CLI 项目,请按照为 Vertex AI Pipelines 配置 Google Cloud 项目中的说明进行操作。
此外,请分配以下预定义的 Identity and Access Management 权限,以将 Artifact Registry 用作模板注册表:
roles/artifactregistry.admin:分配此角色可创建和管理制品库。roles/artifactregistry.repoAdmin或roles/artifactregistry.writer:分配其中任一角色可管理制品库内的模板。roles/artifactregistry.reader:分配此角色可从制品库下载模板。roles/artifactregistry.reader:将此角色分配给与 Vertex AI Pipelines 关联的服务账号可基于模板创建流水线运行作业。
如需详细了解 Artifact Registry 的预定义 Identity and Access Management 角色,请参阅预定义的 Artifact Registry 角色。
使用以下示例分配角色:
gcloud projects add-iam-policy-binding PROJECT_ID \
--member=PRINCIPAL \
--role=ROLE
请替换以下内容:
- PROJECT_ID:您要在其中创建流水线的项目。
- PRINCIPAL:您要向其添加权限的主账号。
- ROLE:您要向主账号授予的 Identity and Access Management 角色。
如需详细了解以下内容,请参阅 Artifact Registry 文档中的角色和权限:
在 Artifact Registry 中创建代码库
接下来,您需要在 Artifact Registry 中为流水线模板创建一个代码库。
控制台
在 Google Cloud 控制台中打开 Vertex AI Pipelines。
点击您的模板标签页。
如需打开选择制品库窗格,请点击选择制品库。
点击创建代码库。
指定
quickstart-kfp-repo作为该代码库的名称。在格式下,选择
Kubeflow Pipelines。在位置类型下,选择区域。
在区域下拉列表中,选择
us-central1。点击创建。
Google Cloud CLI
运行以下命令以创建代码库。
在使用下面的命令数据之前,请先进行以下替换:
- LOCATION:创建代码库的位置或区域,例如
us-central1
执行 gcloud artifacts repositories create 命令:
Linux、macOS 或 Cloud Shell
gcloud artifacts repositories create quickstart-kfp-repo --location=LOCATION_ID --repository-format=KFP
Windows (PowerShell)
gcloud artifacts repositories create quickstart-kfp-repo --location=LOCATION_ID --repository-format=KFP
Windows (cmd.exe)
gcloud artifacts repositories create quickstart-kfp-repo --location=LOCATION_ID --repository-format=KFP
创建模板
使用以下代码示例定义具有单个组件的流水线。如需了解如何使用 KFP 定义流水线,请参阅构建流水线。
from kfp import dsl
from kfp import compiler
@dsl.component()
def hello_world(text: str) -> str:
print(text)
return text
@dsl.pipeline(name='hello-world', description='A simple intro pipeline')
def pipeline_hello_world(text: str = 'hi there'):
"""Pipeline that passes small pipeline parameter string to consumer op."""
consume_task = hello_world(
text=text) # Passing pipeline parameter as argument to consumer op
compiler.Compiler().compile(
pipeline_func=pipeline_hello_world,
package_path='hello_world_pipeline.yaml')
运行该示例时,compiler.Compiler().compile(...) 语句会将“hello-world”流水线编译为名为 hello_world_pipeline.yaml 的本地 YAML 文件。
上传模板
控制台
在 Google Cloud 控制台中打开 Vertex AI Pipelines。
点击上传以打开上传流水线或组件窗格。
在代码库下拉列表中,选择
quickstart-kfp-repo代码库。为流水线模板指定一个名称。
在文件字段中,点击选择,以从本地文件系统中选择并上传已编译的流水线模板 YAML。
上传流水线模板后,它会列在您的模板页面上。
Kubeflow Pipelines SDK 客户端
如需配置 Kubeflow Pipelines SDK 注册表客户端,请运行以下命令:
from kfp.registry import RegistryClient client = RegistryClient(host=f"https://us-central1-kfp.pkg.dev/PROJECT_ID/quickstart-kfp-repo")将已编译的 YAML 文件上传到 Artifact Registry 中的代码库。
templateName, versionName = client.upload_pipeline( file_name="hello_world_pipeline.yaml", tags=["v1", "latest"], extra_headers={"description":"This is an example pipeline template."})如需验证模板是否已上传,请执行以下操作:
在 Google Cloud 控制台中打开 Vertex AI Pipelines。
点击您的模板标签页。
点击选择制品库。
从列表中选择
quickstart-kfp-repo代码库,然后点击选择。您应该会在列表中找到上传的模板软件包
hello-world。如需查看流水线模板的版本列表,请点击
hello-world模板。如需查看流水线拓扑,请点击版本。
在 Vertex AI 中使用模板
将流水线模板上传到 Artifact Registry 中的代码库后,您就可以在 Vertex AI Pipelines 中使用它了。
为模板创建暂存存储桶
在使用流水线模板之前,您需要为暂存流水线运行创建一个 Cloud Storage 存储桶。
如需创建存储桶,请按照为流水线工件配置 Cloud Storage 存储桶中的说明操作,然后运行以下命令:
STAGING_BUCKET="gs://BUCKET_NAME"
将 BUCKET_NAME 替换为您刚刚创建的存储桶的名称。
基于模板创建流水线运行
您可以使用 Vertex AI SDK for Python 或 Google Cloud 控制台通过 Artifact Registry 中的模板创建流水线运行。
控制台
在 Google Cloud 控制台中打开 Vertex AI Pipelines。
点击您的模板标签页。
如需打开选择制品库窗格,请点击选择制品库。
选择
quickstart-kfp-repo制品库,然后点击选择。点击
hello-world软件包。在
4f245e8f9605版本旁边,点击创建运行作业。点击运行时配置。
在 Cloud Storage 位置下,输入以下内容:
gs://BUCKET_NAME将 BUCKET_NAME 替换为您为暂存流水线运行创建的存储桶的名称。
点击提交。
Python 版 Vertex AI SDK
使用以下示例基于模板创建流水线运行:
from google.cloud import aiplatform
# Initialize the aiplatform package
aiplatform.init(
project="PROJECT_ID",
location='us-central1',
staging_bucket=STAGING_BUCKET)
# Create a pipeline job using a version ID.
job = aiplatform.PipelineJob(
display_name="hello-world-latest",
template_path="https://us-central1-kfp.pkg.dev/PROJECT_ID/quickstart-kfp-repo/hello-world@SHA256_TAG" + \
versionName)
# Alternatively, create a pipeline job using a tag.
job = aiplatform.PipelineJob(
display_name="hello-world-latest",
template_path="https://us-central1-kfp.pkg.dev/PROJECT_ID/quickstart-kfp-repo/hello-world/TAG")
job.submit()
替换以下内容:
PROJECT_ID:此流水线在其中运行的 Google Cloud 项目。
SHA256_TAG:模板版本的 sha256 哈希值。
TAG:模板的版本标记。
查看已创建的流水线运行
您可以在 Vertex AI SDK for Python 中查看由特定流水线版本创建的运行作业。
控制台
在 Google Cloud 控制台中打开 Vertex AI Pipelines。
点击您的模板标签页。
点击选择制品库。
从列表中选择
quickstart-kfp-repo代码库,然后点击选择。如需查看
hello-world流水线模板的版本列表,请点击hello world模板。点击要查看其流水线运行的所需版本。
如需查看所选版本的流水线运行,请点击查看运行,然后点击运行标签页。
Python 版 Vertex AI SDK
如需列出流水线运行,请运行 pipelineJobs.list 命令,如以下一个或多个示例所示:
from google.cloud import aiplatform
# To filter all runs created from a specific version
filter = 'template_uri:"https://us-central1-kfp.pkg.dev/PROJECT_ID/quickstart-kfp-repo/hello-world/*" AND ' + \
'template_metadata.version="%s"' % versionName
aiplatform.PipelineJob.list(filter=filter)
# To filter all runs created from a specific version tag
filter = 'template_uri="https://us-central1-kfp.pkg.dev/PROJECT_ID/quickstart-kfp-repo/hello-world/latest"'
aiplatform.PipelineJob.list(filter=filter)
# To filter all runs created from a package
filter = 'template_uri:"https://us-central1-kfp.pkg.dev/PROJECT_ID/quickstart-kfp-repo/hello-world/*"'
aiplatform.PipelineJob.list(filter=filter)
# To filter all runs created from a repo
filter = 'template_uri:"https://us-central1-kfp.pkg.dev/PROJECT_ID/quickstart-kfp-repo/*"'
aiplatform.PipelineJob.list(filter=filter)
在 Kubeflow Pipelines SDK 注册表客户端中使用模板
您可以将 Kubeflow Pipelines SDK 注册表客户端与 Artifact Registry 结合使用来下载和使用流水线模板。
如需列出代码库中的资源,请运行以下命令:
templatePackages = client.list_packages() templatePackage = client.get_package(package_name = "hello-world") versions = client.list_versions(package_name="hello-world") version = client.get_version(package_name="hello-world", version=versionName) tags = client.list_tags(package_name = "hello-world") tag = client.get_tag(package_name = "hello-world", tag="latest")如需查看可用方法和文档的完整列表,请参阅 Artifact Registry GitHub 代码库中的
proto文件。如需将模板下载到本地文件系统,请运行以下命令:
# Sample 1 filename = client.download_pipeline( package_name = "hello-world", version = versionName) # Sample 2 filename = client.download_pipeline( package_name = "hello-world", tag = "v1") # Sample 3 filename = client.download_pipeline( package_name = "hello-world", tag = "v1", file_name = "hello-world-template.yaml")
使用 Artifact Registry REST API
以下部分总结了如何使用 Artifact Registry REST API 来管理 Artifact Registry 代码库中的流水线模板。
使用 Artifact Registry REST API 上传流水线模板
您可以使用本部分所述的参数值创建 HTTP 请求来上传流水线模板,其中:
- PROJECT_ID是此流水线在其中运行的 Google Cloud 项目。
- REPO_ID 是您的 Artifact Registry 代码库的 ID。
curl 请求示例
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-F tags=v1,latest \
-F content=@pipeline_spec.yaml \
https://us-central1-kfp.pkg.dev/PROJECT_ID/REPO_ID
构建上传请求
该请求可以是 HTTP 或 HTTPS 分段请求;必须在请求标头中包含身份验证令牌。如需了解详情,请参阅 gcloud auth print-access-token。
请求的载荷是 pipeline_spec.yaml 文件(或 .zip 软件包)的内容。建议的大小上限为 10 MiB。
软件包名称取自 pipeline_spec.yaml 文件中的 pipeline_spec.pipeline_info.name 条目。软件包名称唯一标识软件包,并且在各个版本中不可变。长度在 4 到 128 个字符之间,并且必须与以下正则表达式匹配:^[a-z0-9][a-z0-9-]{3,127}$。
软件包 tags 是一个最多包含 8 个以英文逗号分隔的标记的列表。每个标记必须与以下正则表达式匹配:^[a-zA-Z0-9\-._~:@+]{1,128}$。
如果标记存在并指向已上传的流水线,则该标记将更新为指向您正在上传的流水线。例如,如果 latest 标记指向您已上传的流水线,并且您使用 --tag=latest 上传新版本,则系统会从之前上传的流水线移除 latest 标记并将其分配给您正在上传的新流水线。
如果您上传的流水线与之前上传的流水线相同,则上传成功。系统会更新所上传流水线的元数据(包括其版本标记),以与上传请求的参数值匹配。
上传响应
如果上传请求成功,则返回 HTTP OK 状态。响应的正文如下所示:
{packageName}/{versionName=sha256:abcdef123456...}
其中 versionName 是以十六进制字符串表示的 pipeline_spec.yaml 的 sha256 摘要。
使用 Artifact Registry REST API 下载流水线模板
您可以使用本部分所述的参数值创建 HTTP 请求来下载流水线模板,其中:
- PROJECT_ID是此流水线在其中运行的 Google Cloud 项目。
- REPO_ID 是您的 Artifact Registry 代码库的 ID。
- PACKAGE_ID 是您上传的模板的软件包 ID。
- TAG 是版本标记。
- VERSION 是模板版本,格式为
sha256:abcdef123456...。
对于标准 Artifact Registry 下载,您应按如下方式创建下载链接:
url = https://us-central1-kfp.pkg.dev/PROJECT_ID/REPO_ID/PACKAGE_ID/VERSION
url = https://us-central1-kfp.pkg.dev/PROJECT_ID/REPO_ID/PACKAGE_ID/TAG
curl 请求示例
curl -X GET \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
https://us-central1-kfp.pkg.dev/PROJECT_ID/REPO_ID/PACKAGE_ID/VERSION
您可以将 VERSION 替换为 TAG 并下载同一模板,如以下示例所示:
curl -X GET \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
https://us-central1-kfp.pkg.dev/PROJECT_ID/REPO_ID/PACKAGE_ID/TAG
下载响应
如果下载请求成功,则返回 HTTP OK 状态。响应的正文是 pipeline_spec.yaml 文件的内容。
参考链接
- 请参阅 Artifact Registry - 代码库概览,详细了解如何管理您的代码库。
- 代码库 API
- 格式关键字为“KFP”
- 软件包 API
- 版本 API
- 标记 API
- GitHub 上的 Proto 定义