您可以使用 Workflows 执行 Cloud Run 作业,作为工作流的一部分,以执行更复杂的数据处理或编排现有作业的系统。
本教程演示了如何使用 Workflows 执行 Cloud Run 作业,该作业用于处理存储在 Cloud Storage 存储桶中的事件数据。将事件载荷存储在 Cloud Storage 存储桶中,可让您使用客户管理的加密密钥来加密数据,而如果您将事件数据作为环境变量传递给 Cloud Run 作业,则无法实现这一点。
下图提供了简要概览:
创建 Cloud Run 作业
本教程使用可在 GitHub 上找到的示例代码。部署脚本会构建容器映像以创建 Cloud Run 作业。该脚本还会创建一个 Cloud Storage 存储桶。Cloud Run 作业会读取 Cloud Storage 存储桶中存储的任何事件数据,然后输出事件数据。
如果您在 Cloud Shell 中运行部署脚本,并且 Compute Engine 默认服务账号没有 Editor 角色,请向 Compute Engine 默认服务账号授予项目中的以下角色。(否则,您可以跳过此步骤,并在下一步中继续克隆示例应用代码库。)
授予 Artifact Registry Writer 角色 (
roles/artifactregistry.writer
):gcloud projects add-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=roles/artifactregistry.writer
将
PROJECT_NUMBER
替换为您的 Google Cloud项目编号。您可以在 Google Cloud 控制台的欢迎页面上,或通过运行以下命令来查找项目编号:gcloud projects describe PROJECT_ID --format='value(projectNumber)'
授予 Storage Object User 角色 (
roles/storage.objectUser
):gcloud projects add-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=roles/storage.objectUser
授予 Logging Logs Writer 角色 (
roles/logging.logWriter
):gcloud projects add-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=roles/logging.logWriter
通过将示例应用代码库克隆到本地机器来获取示例代码:
git clone https://github.com/GoogleCloudPlatform/workflows-demos.git
或者,您也可以下载该示例的 ZIP 文件
转到包含示例代码的目录:
cd workflows-demos/cloud-run-jobs-payload-gcs/message-payload-job
运行部署脚本,创建 Cloud Run 作业:
./deploy-job.sh
该脚本会创建一个名为 message-payload-PROJECT_ID
的 Cloud Storage 存储桶,其中 PROJECT_ID
是您的 Google Cloud 项目的 ID。系统还会创建一个名为 message-payload-job
的 Cloud Run 作业。
部署用于执行 Cloud Run 作业的工作流
定义并部署一个工作流,用于执行您刚刚创建的 Cloud Run 作业。工作流定义由一系列使用 Workflows 语法描述的步骤组成。
工作流接收事件,将事件数据保存到 Cloud Storage 存储桶,然后执行 Cloud Run 作业来处理事件数据。
控制台
在 Google Cloud 控制台中,前往 Workflows 页面:
点击
创建。输入新工作流的名称,例如
message-payload-workflow
。选择适当的区域,例如
us-central1
。在服务账号字段中,选择您之前创建的服务账号。
该服务账号充当工作流的身份。您应该已向服务账号授予以下角色:
- Cloud Run Admin:用于执行 Cloud Run 作业
- Logs Writer:用于写入日志条目
- Storage Object Creator:用于在 Cloud Storage 中创建对象
点击下一步。
在工作流编辑器中,输入工作流的以下定义:
点击部署。
gcloud
为工作流创建源代码文件:
touch message-payload-workflow.yaml
将以下工作流定义复制到
message-payload-workflow.yaml
:输入以下命令以部署工作流:
gcloud workflows deploy message-payload-workflow \ --location=us-central1 \ --source=message-payload-workflow.yaml \ --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
替换以下内容:
SERVICE_ACCOUNT_NAME
:您之前创建的服务账号的名称PROJECT_ID
:您的Google Cloud 项目的 ID
该服务账号充当工作流的身份。您应该已向服务账号授予以下角色:
roles/logging.logWriter
:用于写入日志条目roles/run.admin
:用于执行 Cloud Run 作业roles/storage.objectCreator
:在 Cloud Storage 中创建对象
该工作流会执行以下操作:
init
步 - 接受一个事件作为实参,并设置必要的变量。log_event
步 - 使用函数 sys.log 在 Cloud Logging 中创建日志条目。write_payload_to_gcs
步 - 发出 HTTPPOST
请求,并将事件载荷数据写入 Cloud Storage 存储桶文件。run_job_to_process_payload
步 - 使用 Cloud Run Admin API 连接器方法googleapis.run.v1.namespaces.jobs.run
执行作业。Cloud Storage 存储桶和数据文件名会作为替换变量从工作流传递到作业。finish
步骤 - 返回有关作业执行的信息,作为工作流的结果。
创建 Pub/Sub 主题
创建一个 Pub/Sub 主题,以便您可以向其发布消息。 Pub/Sub 事件用于演示如何使用 Workflows 路由事件,以及如何将事件保存到 Cloud Storage,以便 Cloud Run 作业可以处理事件数据。
控制台
在 Google Cloud 控制台中,前往主题页面。
点击
创建主题。在主题 ID 字段中,输入主题的 ID,例如
message-payload-topic
。保留添加默认订阅选项。
请勿选择其他选项。
点击创建。
gcloud
如需创建 ID 为 message-payload-topic
的主题,请运行 gcloud pubsub topics create
命令:
gcloud pubsub topics create message-payload-topic
创建 Eventarc 触发器以将事件路由到工作流
如需自动执行工作流,进而执行 Cloud Run 作业,请创建一个响应 Pub/Sub 事件并将事件路由到工作流的 Eventarc 触发器。每当有消息写入 Pub/Sub 主题时,该事件都会触发工作流的执行。
控制台
在 Google Cloud 控制台中,前往 Workflows 页面:
点击工作流的名称,例如
message-payload-workflow
。在工作流详细信息页面上,点击
修改。在修改工作流页面上的触发器部分中,点击添加新触发器 > Eventarc。
Eventarc 触发器窗格随即会打开。
在触发器名称字段中,输入触发器的名称,例如
message-payload-trigger
。从事件提供方列表中,选择 Cloud Pub/Sub。
从事件列表中,选择 google.cloud.pubsub.topic.v1.messagePublished。
从选择 Cloud Pub/Sub 主题列表中,选择您之前创建的 Pub/Sub 主题。
在服务账号字段中,选择您之前创建的服务账号。
该服务账号充当触发器的身份。您应该已向服务账号授予以下角色:
- Eventarc Event Receiver:用于接收事件
- Workflows Invoker:用于执行工作流
点击保存触发器。
Eventarc 触发器现在显示在修改工作流页面上的触发器部分中。
点击下一步。
点击部署。
gcloud
运行以下命令,创建 Eventarc 触发器:
gcloud eventarc triggers create message-payload-trigger \ --location=us-central1 \ --destination-workflow=message-payload-workflow \ --destination-workflow-location=us-central1 \ --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \ --transport-topic=projects/PROJECT_ID/topics/message-payload-topic \ --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
替换以下内容:
PROJECT_ID
:您的 Google Cloud 项目的 IDSERVICE_ACCOUNT_NAME
:您之前创建的服务账号的名称。
该服务账号充当触发器的身份。您应该已向服务账号授予以下角色:
roles/eventarc.eventReceiver
:接收事件roles/workflows.invoker
:执行工作流
触发工作流
通过向 Pub/Sub 主题发布消息并生成事件来测试端到端系统。如需了解详情,请参阅使用事件或 Pub/Sub 消息触发工作流。
向 Pub/Sub 主题发布消息以生成事件:
gcloud pubsub topics publish message-payload-topic --message="Hello World"
事件会被路由到工作流,该工作流会记录事件消息、将事件数据保存到 Cloud Storage 存储桶,并执行 Cloud Run 作业来处理保存在 Cloud Storage 中的数据。此过程可能需要 1 分钟左右。
查看作业执行情况,确认 Cloud Run 作业是否按预期运行:
gcloud run jobs executions list --job=message-payload-job
您应该会在输出中看到新的作业执行。
如需查看通过触发工作流创建的与事件相关的日志条目,请运行以下命令:
gcloud logging read "resource.type=cloud_run_job AND textPayload:Payload"
查找类似如下的日志条目:
textPayload: "Payload: {'message': {'data': 'SGVsbG8gV29ybGQ=', 'messageId': '8834108235224238',\ \ 'publishTime': '2023-09-20T17:07:52.921Z'}, 'subscription': 'projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741'}" ... resource: labels: job_name: message-payload-job location: us-central1 project_id: MY_PROJECT type: cloud_run_job textPayload: Processing message payload gs://message-payload-MY_PROJECT/8254002311197919.data.json
您可以通过查看 Cloud Storage 存储桶对象中的事件数据来确认结果是否符合预期。
检索您的存储桶名称:
gcloud storage ls
输出类似于以下内容:
gs://message-payload-PROJECT_ID/
列出存储桶中的对象:
gcloud storage ls gs://message-payload-PROJECT_ID/** --recursive
输出应类似如下所示:
gs://message-payload-PROJECT_ID/OBJECT_ID.data.json
请记下要在下一步中使用的
OBJECT_ID
。将存储桶中的对象下载为文件:
gcloud storage cp gs://message-payload-PROJECT_ID/OBJECT_ID.data.json message-event.txt
将
OBJECT_ID
替换为上一步中返回的 ID。在文本编辑器中,打开
message-event.txt
文件。写入到文件中的事件正文应类似于以下内容:{ "message": { "data": "SGVsbG8gV29ybGQ=", "messageId": "8254002311197919", "publishTime": "2023-09-20T16:54:29.312Z" }, "subscription": "projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741" }
请注意,如果您从 Base64 格式对
SGVsbG8gV29ybGQ=
的数据值进行解码,则返回“Hello World”。