根据 Pub/Sub 队列量自动扩缩工作器池

本教程介绍如何部署 Cloud Run 工作器池来 处理 Pub/Sub 消息,以及如何使用 Cloud Run 外部指标 自动扩缩 (CREMA) 根据队列深度自动扩缩使用方 实例。

目标

在此教程中,您将学习以下操作:

费用

在本文档中,您将使用的以下收费组件: Google Cloud

您可使用 价格计算器 根据您的预计使用情况来估算费用。

新 Google Cloud 用户可能有资格申请免费试用

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud的新用户, 请创建账号,以评估我们的产品在 实际场景中的表现。新客户还可获享 $300 赠金,用于 运行、测试和部署工作负载。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  5. Verify that billing is enabled for your Google Cloud project.

  6. 启用 Cloud Run API、Parameter Manager API、Artifact Registry API、Pub/Sub API 和 Cloud Build API。

    启用 API 所需的角色

    如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (roles/serviceusage.serviceUsageAdmin),该角色包含 serviceusage.services.enable 权限。了解如何授予 角色

    启用 API

  7. 安装并初始化 gcloud CLI
  8. 更新组件:
    gcloud components update
  9. 本教程使用多个环境变量。为了改进调试,请运行以下命令,以便在引用未设置的本地环境变量时生成错误:
    set -u
  10. 设置本教程中使用的 CREMA 的以下配置变量:
    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    export TOPIC_ID=crema-pubsub-topic
    export SUBSCRIPTION_ID=crema-subscription
    export CREMA_SA_NAME=crema-service-account
    export CONSUMER_SA_NAME=consumer-service-account
    export CONSUMER_WORKER_POOL_NAME=worker-pool-consumer
    export CREMA_SERVICE_NAME=my-crema-service
    PROJECT_ID 替换为您的 Google Cloud 项目 ID。
  11. 运行以下命令以设置项目 ID:
    gcloud config set project $PROJECT_ID
  12. 您需要为 Cloud Run 伸缩服务付费,费用取决于您触发伸缩的频率。如需了解详情,请使用价格计算器估算费用。

所需的角色

如需获得完成本教程所需的权限,请让您的管理员为您授予项目的以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

创建 Pub/Sub 主题和订阅

如需自动扩缩工作器,请按照以下步骤为使用方应用创建拉取订阅:

  1. 创建 Pub/Sub 主题,以表示消息 Feed:

    gcloud pubsub topics create $TOPIC_ID
    
  2. 创建拉取订阅,以使用 Pub/Sub 主题中的消息:

    gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC_ID
    

创建自定义服务账号

本教程需要以下两个服务账号,它们具有使用预配资源所需的最低 权限才能使用预配的 资源:

  • 使用方服务帐号:处理消息的使用方工作器池的身份。运行以下命令以创建使用方服务账号:

    gcloud iam service-accounts create $CONSUMER_SA_NAME \
      --display-name="Pub/Sub consumer service account"
    
  • CREMA 服务帐号:自动扩缩器的身份。运行以下命令以创建 CREMA 服务帐号:

    gcloud iam service-accounts create $CREMA_SA_NAME \
      --display-name="CREMA service account"
    

向自定义服务账号授予其他权限

如需扩缩工作器池,请向自定义服务账号授予以下权限:

  1. 向 CREMA 服务帐号授予从 Parameter Manager 读取的权限:

    gcloud projects add-iam-policy-binding $PROJECT_ID \
      --member="serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
      --role="roles/parametermanager.parameterViewer"
    
  2. 向 CREMA 服务帐号授予扩缩工作器池的权限:

    gcloud projects add-iam-policy-binding $PROJECT_ID \
      --member="serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
      --role="roles/run.developer"
    
  3. 向 CREMA 服务帐号授予服务帐号 User 角色:

    gcloud projects add-iam-policy-binding $PROJECT_ID \
      --member="serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
      --role="roles/iam.serviceAccountUser"
    
  4. 向 CREMA 服务帐号授予查看指标的权限:

     gcloud projects add-iam-policy-binding $PROJECT_ID \
       --member="serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
       --role="roles/monitoring.viewer"
    
  5. 向 CREMA 服务帐号授予写入指标的权限:

     gcloud projects add-iam-policy-binding $PROJECT_ID \
       --member="serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
       --role="roles/monitoring.metricWriter"
    
  6. 向 CREMA 服务帐号授予查看 Pub/Sub 消息的权限:

    gcloud pubsub subscriptions add-iam-policy-binding $SUBSCRIPTION_ID \
      --member="serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
      --role="roles/pubsub.viewer"
    
  7. 授予您的使用方服务帐号从订阅中拉取消息的权限:

    gcloud pubsub subscriptions add-iam-policy-binding $SUBSCRIPTION_ID \
      --member="serviceAccount:$CONSUMER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
      --role="roles/pubsub.subscriber"
    

部署 Cloud Run 工作器池

如需部署使用 Pub/Sub 订阅中的消息的工作器池,请按以下步骤操作:

  1. 创建一个名为 consumer 的文件夹,然后将目录更改为该文件夹:

    mkdir consumer
    cd consumer
    
  2. 创建一个名为 worker.py 的文件,并添加以下代码:

    import os
    import time
    from google.cloud import pubsub_v1
    from concurrent.futures import TimeoutError
    
    # Configuration
    PROJECT_ID = os.environ.get('PROJECT_ID')
    SUBSCRIPTION_ID = os.environ.get('SUBSCRIPTION_ID')
    
    subscription_path = f"projects/{PROJECT_ID}/subscriptions/{SUBSCRIPTION_ID}"
    
    print(f"Worker Pool instance starting. Watching {subscription_path}...")
    
    subscriber = pubsub_v1.SubscriberClient()
    
    def callback(message):
        try:
            data = message.data.decode("utf-8")
            print(f"Processing job: {data}")
            time.sleep(5)  # Simulate work
            print(f"Done {data}")
            message.ack()
        except Exception as e:
            print(f"Error processing message: {e}")
            message.nack()
    
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    print(f"Listening for messages on {subscription_path}...")
    
    # Wrap subscriber in a 'with' block to automatically call close() when done.
    with subscriber:
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            streaming_pull_future.result()
        except TimeoutError:
            streaming_pull_future.cancel()  # Trigger the shutdown.
            streaming_pull_future.result()  # Block until the shutdown is complete.
        except Exception as e:
            print(f"Streaming pull failed: {e}")
    
  3. 创建一个 Dockerfile 并添加以下代码:

    FROM python:3.12-slim
    RUN pip install google-cloud-pubsub
    COPY worker.py .
    CMD ["python", "-u", "worker.py"]
    
  4. 部署使用方工作器池,其中包含 0 个实例,以便 CREMA 进行扩容:

    gcloud beta run worker-pools deploy $CONSUMER_WORKER_POOL_NAME \
      --source . \
      --region $REGION \
      --service-account="$CONSUMER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
      --instances=0 \
      --set-env-vars PROJECT_ID=$PROJECT_ID,SUBSCRIPTION_ID=$SUBSCRIPTION_ID
    

部署自动扩缩器 CREMA 服务

将工作器池部署为使用 Pub/Sub 中的消息后,请配置 CREMA 自动扩缩器以根据消息量预配工作器实例。

配置自动扩缩器

本教程使用 Parameter Manager 来存储 CREMA 的 YAML 配置文件。

  1. 在 Parameter Manager 中创建一个参数,以存储 CREMA 的参数版本:

    PARAMETER_ID=crema-config
    PARAMETER_REGION=global
    gcloud parametermanager parameters create $PARAMETER_ID --location=$PARAMETER_REGION --parameter-format=YAML
    
  2. 运行以下命令以导航到项目的根目录:

    cd
    
  3. 在根目录中,创建一个 YAML 文件 my-crema-config.yaml 以定义自动扩缩器配置:

    apiVersion: crema/v1
    kind: CremaConfig
    spec:
      pollingInterval: 30
      triggerAuthentications:
        - metadata:
            name: adc-trigger-auth
          spec:
            podIdentity:
              provider: gcp
      scaledObjects:
        - spec:
            scaleTargetRef:
              name: projects/PROJECT_ID/locations/us-central1/workerpools/worker-pool-consumer
            triggers:
              - type: gcp-pubsub
                metadata:
                  subscriptionName: "crema-subscription"
                  # Target number of undelivered messages per worker instance
                  value: "10"
                  mode: "SubscriptionSize"
                authenticationRef:
                  name: adc-trigger-auth
    

    PROJECT_ID 替换为 Google Cloud 项目 ID。

  4. 将本地 YAML 文件作为新的参数版本上传:

    LOCAL_YAML_CONFIG_FILE=my-crema-config.yaml
    PARAMETER_VERSION=1
    
    gcloud parametermanager parameters versions create $PARAMETER_VERSION \
      --location=$PARAMETER_REGION \
      --parameter=$PARAMETER_ID \
      --payload-data-from-file=$LOCAL_YAML_CONFIG_FILE
    
  5. 运行以下命令以验证参数添加是否成功:

    gcloud parametermanager parameters versions list \
    --parameter=$PARAMETER_ID \
    --location=$PARAMETER_REGION
    

    您应该会看到参数路径,例如 projects/PROJECT_ID/locations/global/parameters/crema-config/versions/1

部署服务以扩缩工作负载

如需部署服务以扩缩工作器池,请使用预构建的容器映像运行以下命令:

CREMA_CONFIG_PARAM_VERSION=projects/$PROJECT_ID/locations/$PARAMETER_REGION/parameters/$PARAMETER_ID/versions/$PARAMETER_VERSION
IMAGE=us-central1-docker.pkg.dev/cloud-run-oss-images/crema-v1/autoscaler:1.0

gcloud beta run deploy $CREMA_SERVICE_NAME \
  --image=${IMAGE} \
  --region=${REGION} \
  --service-account="${CREMA_SA_NAME}" \
  --no-allow-unauthenticated \
  --no-cpu-throttling \
  --base-image=us-central1-docker.pkg.dev/serverless-runtimes/google-24/runtimes/java25 \
  --labels=created-by=crema \
  --set-env-vars="CREMA_CONFIG=${CREMA_CONFIG_PARAM_VERSION},OUTPUT_SCALER_METRICS=True"

测试自动扩缩服务

通过创建生成 100 条消息并将其推送到 Pub/Sub 队列的脚本来测试 CREMA 服务:

  1. 在根目录中,创建一个名为 load-pubsub.sh 的文件,并添加以下代码:

    #!/bin/bash
    
    TOPIC_ID=${TOPIC_ID}
    PROJECT_ID=${PROJECT_ID}
    NUM_MESSAGES=100
    
    echo "Publishing $NUM_MESSAGES messages to topic $TOPIC_ID..."
    
    for i in $(seq 1 $NUM_MESSAGES); do
      gcloud pubsub topics publish $TOPIC_ID --message="job-$i" --project=$PROJECT_ID &
      if (( $i % 10 == 0 )); then
        wait
        echo "Published $i messages..."
      fi
    done
    wait
    echo "Done. All messages published."
    
  2. 运行负载测试:

    chmod +x load-pubsub.sh
    ./load-pubsub.sh
    

此命令会生成 100 条消息并将其推送到 Pub/Sub 订阅。

监控扩缩

load-pubsub.sh 脚本完成后,请等待 3 到 4 分钟 ,然后再 检查服务 my-crema-service日志。CREMA 自动扩缩器服务会将使用方工作器实例从 0 扩容。

您应该会看到以下日志:

每条日志消息都标有发出该消息的组件。

[INFO] [METRIC-PROVIDER] Starting metric collection cycle
[INFO] [METRIC-PROVIDER] Successfully fetched scaled object metrics ...
[INFO] [METRIC-PROVIDER] Sending scale request ...
[INFO] [SCALER] Received ScaleRequest ...
[INFO] [SCALER] Current instances ...
[INFO] [SCALER] Recommended instances ...

或者,运行以下命令以验证 CREMA 服务是否根据队列深度推荐实例:

gcloud logging read "resource.type=cloud_run_revision AND resource.labels.service_name=$CREMA_SERVICE_NAME AND textPayload:SCALER" \
  --limit=20 \
  --format="value(textPayload)" \
  --freshness=5m

如需查看使用消息的使用方日志,请运行以下命令:

gcloud beta run worker-pools logs tail $CONSUMER_WORKER_POOL_NAME --region=$REGION

您应该会看到遵循 Done job-100 格式的日志。

清理

为避免您的 Google Cloud 账号产生额外费用,请删除您在本教程中部署的所有资源。

删除项目

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是某个现有项目,并且需要保留此项目但不保留在本教程中所做的更改,请删除为本教程创建的资源

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往 管理资源 页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击 关闭以删除项目。

删除教程资源

  1. 删除您在本教程中部署的 Cloud Run 服务。Cloud Run 服务在收到请求之前不会产生费用。

    如需删除 Cloud Run 服务,请运行以下命令:

    gcloud run services delete SERVICE-NAME

    SERVICE-NAME 替换为服务的名称。

    您还可以通过Google Cloud 控制台删除 Cloud Run 服务。

  2. 移除您在教程设置过程中添加的 gcloud 默认区域配置:

     gcloud config unset run/region
    
  3. 移除项目配置:

     gcloud config unset project
    
  4. 删除 Pub/Sub 资源:

    gcloud pubsub subscriptions delete $SUBSCRIPTION_ID
    gcloud pubsub topics delete $TOPIC_ID
    
  5. 删除在本教程中创建的其他 Google Cloud 资源:

后续步骤