使用 Workflows 运行批处理作业

Batch 是一项全代管式服务,可让您在 Compute Engine 虚拟机 (VM) 实例上安排、排队和执行批处理工作负载。Batch 会代表您预配资源并管理容量,使批处理工作负载能够大规模运行。

借助 Workflows,您可以按您定义的顺序执行所需的服务,该顺序使用 Workflows 语法进行描述。

在本教程中,您将使用 Batch 的工作流连接器来调度和运行一个 Batch 作业,该作业将在两个 Compute Engine 虚拟机上并行执行六项任务。同时使用 Batch 和 Workflows 可让您结合两者的优势,高效地预配和编排整个流程。

创建 Artifact Registry 仓库

创建一个代码库来存储您的 Docker 容器映像。

控制台

  1. 在 Google Cloud 控制台中,前往制品库页面。

    前往制品库

  2. 点击 创建代码库

  3. 输入 containers 作为代码库名称。

  4. 格式字段中,选择 Docker

  5. 位置类型字段中,选择区域

  6. 区域列表中,选择 us-central1

  7. 点击创建

gcloud

运行以下命令:

  gcloud artifacts repositories create containers \
    --repository-format=docker \
    --location=us-central1

您已在 us-central1 区域中创建了一个名为 containers 的 Artifact Registry 代码库。如需详细了解支持的区域,请参阅 Artifact Registry 位置

获取代码示例

Google Cloud 将本教程的应用源代码存储在 GitHub 中。您可以克隆该代码库或下载示例。

  1. 将示例应用代码库克隆到本地机器:

    git clone https://github.com/GoogleCloudPlatform/batch-samples.git
    

    或者,您也可以下载 main.zip 文件中的示例并将其解压缩。

  2. 转到包含示例代码的目录:

    cd batch-samples/primegen
    

现在,您已拥有开发环境中应用的源代码。

使用 Cloud Build 构建 Docker 映像

Dockerfile 包含使用 Cloud Build 构建 Docker 映像所需的信息。运行以下命令以构建该应用:

gcloud builds submit \
  -t us-central1-docker.pkg.dev/PROJECT_ID/containers/primegen-service:v1 PrimeGenService/

PROJECT_ID 替换为您的 Google Cloud项目 ID。

构建完成后,您应该会看到如下所示的输出:

DONE
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
ID: a54818cc-5d14-467b-bfda-5fc9590af68c
CREATE_TIME: 2022-07-29T01:48:50+00:00
DURATION: 48S
SOURCE: gs://project-name_cloudbuild/source/1659059329.705219-17aee3a424a94679937a7200fab15bcf.tgz
IMAGES: us-central1-docker.pkg.dev/project-name/containers/primegen-service:v1
STATUS: SUCCESS

您已使用 Dockerfile 构建了一个名为 primegen-service 的 Docker 映像,并将该映像推送到了名为 containers 的 Artifact Registry 代码库。

部署用于安排和运行 Batch 作业的工作流

以下工作流会调度并运行一个 Batch 作业,该作业会在两个 Compute Engine 虚拟机上并行运行一个 Docker 容器作为六项任务。结果是生成六批素数,并将其存储在 Cloud Storage 存储桶中。

控制台

  1. 在 Google Cloud 控制台中,前往 Workflows 页面。

    进入 Workflows

  2. 点击 创建

  3. 输入新工作流的名称,例如 batch-workflow

  4. 区域列表中,选择 us-central1

  5. 选择您之前创建的服务账号

  6. 点击下一步

  7. 在工作流编辑器中,输入工作流的定义:

    YAML

    main:
      params: [args]
      steps:
        - init:
            assign:
              - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              - region: "us-central1"
              - imageUri: ${region + "-docker.pkg.dev/" + projectId + "/containers/primegen-service:v1"}
              - jobId: ${"job-primegen-" + string(int(sys.now()))}
              - bucket: ${projectId + "-" + jobId}
        - createBucket:
            call: googleapis.storage.v1.buckets.insert
            args:
              query:
                project: ${projectId}
              body:
                name: ${bucket}
        - logCreateBucket:
            call: sys.log
            args:
              data: ${"Created bucket " + bucket}
        - logCreateBatchJob:
            call: sys.log
            args:
              data: ${"Creating and running the batch job " + jobId}
        - createAndRunBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.create
            args:
                parent: ${"projects/" + projectId + "/locations/" + region}
                jobId: ${jobId}
                body:
                  taskGroups:
                    taskSpec:
                      runnables:
                        - container:
                            imageUri: ${imageUri}
                          environment:
                            variables:
                              BUCKET: ${bucket}
                    # Run 6 tasks on 2 VMs
                    taskCount: 6
                    parallelism: 2
                  logsPolicy:
                    destination: CLOUD_LOGGING
            result: createAndRunBatchJobResponse
        # You can delete the batch job or keep it for debugging
        - logDeleteBatchJob:
            call: sys.log
            args:
              data: ${"Deleting the batch job " + jobId}
        - deleteBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.delete
            args:
                name: ${"projects/" + projectId + "/locations/" + region + "/jobs/" + jobId}
            result: deleteResult
        - returnResult:
            return:
              jobId: ${jobId}
              bucket: ${bucket}

    JSON

    {
      "main": {
        "params": [
          "args"
        ],
        "steps": [
          {
            "init": {
              "assign": [
                {
                  "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
                },
                {
                  "region": "us-central1"
                },
                {
                  "imageUri": "${region + \"-docker.pkg.dev/\" + projectId + \"/containers/primegen-service:v1\"}"
                },
                {
                  "jobId": "${\"job-primegen-\" + string(int(sys.now()))}"
                },
                {
                  "bucket": "${projectId + \"-\" + jobId}"
                }
              ]
            }
          },
          {
            "createBucket": {
              "call": "googleapis.storage.v1.buckets.insert",
              "args": {
                "query": {
                  "project": "${projectId}"
                },
                "body": {
                  "name": "${bucket}"
                }
              }
            }
          },
          {
            "logCreateBucket": {
              "call": "sys.log",
              "args": {
                "data": "${\"Created bucket \" + bucket}"
              }
            }
          },
          {
            "logCreateBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Creating and running the batch job \" + jobId}"
              }
            }
          },
          {
            "createAndRunBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.create",
              "args": {
                "parent": "${\"projects/\" + projectId + \"/locations/\" + region}",
                "jobId": "${jobId}",
                "body": {
                  "taskGroups": {
                    "taskSpec": {
                      "runnables": [
                        {
                          "container": {
                            "imageUri": "${imageUri}"
                          },
                          "environment": {
                            "variables": {
                              "BUCKET": "${bucket}"
                            }
                          }
                        }
                      ]
                    },
                    "taskCount": 6,
                    "parallelism": 2
                  },
                  "logsPolicy": {
                    "destination": "CLOUD_LOGGING"
                  }
                }
              },
              "result": "createAndRunBatchJobResponse"
            }
          },
          {
            "logDeleteBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Deleting the batch job \" + jobId}"
              }
            }
          },
          {
            "deleteBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.delete",
              "args": {
                "name": "${\"projects/\" + projectId + \"/locations/\" + region + \"/jobs/\" + jobId}"
              },
              "result": "deleteResult"
            }
          },
          {
            "returnResult": {
              "return": {
                "jobId": "${jobId}",
                "bucket": "${bucket}"
              }
            }
          }
        ]
      }
    }
    
  8. 点击部署

gcloud

  1. 为工作流创建源代码文件:

    touch batch-workflow.JSON_OR_YAML

    JSON_OR_YAML 替换为 yamljson,具体取决于工作流的格式。

  2. 在文本编辑器中,将以下工作流复制到您的源代码文件中:

    YAML

    main:
      params: [args]
      steps:
        - init:
            assign:
              - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              - region: "us-central1"
              - imageUri: ${region + "-docker.pkg.dev/" + projectId + "/containers/primegen-service:v1"}
              - jobId: ${"job-primegen-" + string(int(sys.now()))}
              - bucket: ${projectId + "-" + jobId}
        - createBucket:
            call: googleapis.storage.v1.buckets.insert
            args:
              query:
                project: ${projectId}
              body:
                name: ${bucket}
        - logCreateBucket:
            call: sys.log
            args:
              data: ${"Created bucket " + bucket}
        - logCreateBatchJob:
            call: sys.log
            args:
              data: ${"Creating and running the batch job " + jobId}
        - createAndRunBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.create
            args:
                parent: ${"projects/" + projectId + "/locations/" + region}
                jobId: ${jobId}
                body:
                  taskGroups:
                    taskSpec:
                      runnables:
                        - container:
                            imageUri: ${imageUri}
                          environment:
                            variables:
                              BUCKET: ${bucket}
                    # Run 6 tasks on 2 VMs
                    taskCount: 6
                    parallelism: 2
                  logsPolicy:
                    destination: CLOUD_LOGGING
            result: createAndRunBatchJobResponse
        # You can delete the batch job or keep it for debugging
        - logDeleteBatchJob:
            call: sys.log
            args:
              data: ${"Deleting the batch job " + jobId}
        - deleteBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.delete
            args:
                name: ${"projects/" + projectId + "/locations/" + region + "/jobs/" + jobId}
            result: deleteResult
        - returnResult:
            return:
              jobId: ${jobId}
              bucket: ${bucket}

    JSON

    {
      "main": {
        "params": [
          "args"
        ],
        "steps": [
          {
            "init": {
              "assign": [
                {
                  "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
                },
                {
                  "region": "us-central1"
                },
                {
                  "imageUri": "${region + \"-docker.pkg.dev/\" + projectId + \"/containers/primegen-service:v1\"}"
                },
                {
                  "jobId": "${\"job-primegen-\" + string(int(sys.now()))}"
                },
                {
                  "bucket": "${projectId + \"-\" + jobId}"
                }
              ]
            }
          },
          {
            "createBucket": {
              "call": "googleapis.storage.v1.buckets.insert",
              "args": {
                "query": {
                  "project": "${projectId}"
                },
                "body": {
                  "name": "${bucket}"
                }
              }
            }
          },
          {
            "logCreateBucket": {
              "call": "sys.log",
              "args": {
                "data": "${\"Created bucket \" + bucket}"
              }
            }
          },
          {
            "logCreateBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Creating and running the batch job \" + jobId}"
              }
            }
          },
          {
            "createAndRunBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.create",
              "args": {
                "parent": "${\"projects/\" + projectId + \"/locations/\" + region}",
                "jobId": "${jobId}",
                "body": {
                  "taskGroups": {
                    "taskSpec": {
                      "runnables": [
                        {
                          "container": {
                            "imageUri": "${imageUri}"
                          },
                          "environment": {
                            "variables": {
                              "BUCKET": "${bucket}"
                            }
                          }
                        }
                      ]
                    },
                    "taskCount": 6,
                    "parallelism": 2
                  },
                  "logsPolicy": {
                    "destination": "CLOUD_LOGGING"
                  }
                }
              },
              "result": "createAndRunBatchJobResponse"
            }
          },
          {
            "logDeleteBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Deleting the batch job \" + jobId}"
              }
            }
          },
          {
            "deleteBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.delete",
              "args": {
                "name": "${\"projects/\" + projectId + \"/locations/\" + region + \"/jobs/\" + jobId}"
              },
              "result": "deleteResult"
            }
          },
          {
            "returnResult": {
              "return": {
                "jobId": "${jobId}",
                "bucket": "${bucket}"
              }
            }
          }
        ]
      }
    }
    
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy batch-workflow \
      --source=batch-workflow.yaml \
      --location=us-central1 \
      --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    SERVICE_ACCOUNT_NAME 替换为您之前创建的服务账号的名称。

执行工作流

执行某个工作流会运行与该工作流关联的当前工作流定义。

控制台

  1. 在 Google Cloud 控制台中,前往 Workflows 页面。

    进入 Workflows

  2. 工作流页面上,点击批量工作流工作流以转到其详情页面。

  3. 工作流详情页面上,点击 执行

  4. 再次点击执行

    工作流执行应需要几分钟时间。

  5. 输出窗格中查看工作流的结果。

    结果应如下所示:

    {
      "bucket": "project-name-job-primegen-TIMESTAMP",
      "jobId": "job-primegen-TIMESTAMP"
    }
    

gcloud

  1. 执行工作流:

    gcloud workflows run batch-workflow \
      --location=us-central1

    工作流执行应需要几分钟时间。

  2. 您可以检查长时间运行的执行操作的状态

  3. 如需获取最后一次完成的执行的状态,请运行以下命令:

    gcloud workflows executions describe-last

    结果应类似如下所示:

    name: projects/PROJECT_NUMBER/locations/us-central1/workflows/batch-workflow/executions/EXECUTION_ID
    result: '{"bucket":"project-name-job-primegen-TIMESTAMP","jobId":"job-primegen-TIMESTAMP"}'
    startTime: '2022-07-29T16:08:39.725306421Z'
    state: SUCCEEDED
    status:
      currentSteps:
      - routine: main
        step: returnResult
    workflowRevisionId: 000001-9ba
    

列出输出存储桶中的对象

通过列出 Cloud Storage 输出存储桶中的对象,您可以确认结果是否符合预期。

控制台

  1. 在 Google Cloud 控制台中,转到 Cloud Storage 存储桶页面。

    进入“存储桶”

  2. 在存储分区列表中,点击您要查看其内容的存储分区的名称。

    结果应类似于以下内容,其中总共有 6 个文件,每个文件列出了一批 10,000 个素数:

    primes-1-10000.txt
    primes-10001-20000.txt
    primes-20001-30000.txt
    primes-30001-40000.txt
    primes-40001-50000.txt
    primes-50001-60000.txt
    

gcloud

  1. 检索输出存储桶名称:

    gcloud storage ls

    输出类似于以下内容:

    gs://PROJECT_ID-job-primegen-TIMESTAMP/

  2. 列出输出存储桶中的对象:

    gcloud storage ls gs://PROJECT_ID-job-primegen-TIMESTAMP/** --recursive

    TIMESTAMP 替换为上一条命令返回的时间戳。

    输出应类似如下所示,总共有 6 个文件,每个文件列出了一批 10,000 个素数:

    gs://project-name-job-primegen-TIMESTAMP/primes-1-10000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-10001-20000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-20001-30000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-30001-40000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-40001-50000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-50001-60000.txt