构建一对多 Pub/Sub 系统

本教程将引导您设置一组应用,这些应用通过 Pub/Sub(而不是同步远程过程调用 (RPC))发送消息进行通信。通过解耦应用,消息传递:

  • 使应用更稳健
  • 可以简化开发

例如,调用方(发布方)不需要订阅方随时待命。发布方向 Pub/Sub 发送消息。发布方不需要知道需要接收消息的订阅方应用具体是哪个以及应用的数量是多少。因此,您可以借助该服务在消息可用时将消息传递给一个或多个订阅者应用。

系统概览

在本教程中,您将启动一个发布方应用,该应用使用一对多通信将“Hello, World!”消息发送给两个订阅方,如下图所示:

主题、其相关订阅以及向 Cloud Pub/Sub 发送消息的发布者和从 Cloud Pub/Sub 接收消息的订阅者应用的示意图

这两个订阅者应用使用相同的代码,但您可以在不同的时间启动它们。此过程演示了 Pub/Sub 启用异步通信的方式。如需构建此系统,请完成以下步骤:

  1. 创建应用用于进行身份验证的 IAM 服务账号。
  2. 设置 IAM 权限
  3. 创建 Pub/Sub 主题和订阅。
  4. 启动三个独立应用:一个发布者应用和两个订阅者应用

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud新手,请 创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 安装 Google Cloud CLI。

  3. 如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI

  4. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  5. 创建或选择 Google Cloud 项目

    选择或创建项目所需的角色

    • 选择项目:选择项目不需要特定的 IAM 角色,您可以选择已获授角色的任何项目。
    • 创建项目:如需创建项目,您需要拥有 Project Creator 角色 (roles/resourcemanager.projectCreator),该角色包含 resourcemanager.projects.create 权限。了解如何授予角色
    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目名称。

  6. 验证是否已为您的 Google Cloud 项目启用结算功能

  7. 启用 Pub/Sub API:

    启用 API 所需的角色

    如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (roles/serviceusage.serviceUsageAdmin),该角色包含 serviceusage.services.enable 权限。了解如何授予角色

    gcloud services enable pubsub.googleapis.com
  8. 为您的用户账号创建本地身份验证凭证:

    gcloud auth application-default login

    如果系统返回身份验证错误,并且您使用的是外部身份提供方 (IdP),请确认您已 使用联合身份登录 gcloud CLI

  9. 向您的用户账号授予角色。对以下每个 IAM 角色运行以下命令一次: roles/pubsub.publisher, roles/pubsub.subscriber

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    替换以下内容:

    • PROJECT_ID:您的项目 ID。
    • USER_IDENTIFIER:您的用户 账号的标识符。例如,myemail@example.com
    • ROLE:您授予用户账号的 IAM 角色。
  10. 安装 Google Cloud CLI。

  11. 如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI

  12. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  13. 创建或选择 Google Cloud 项目

    选择或创建项目所需的角色

    • 选择项目:选择项目不需要特定的 IAM 角色,您可以选择已获授角色的任何项目。
    • 创建项目:如需创建项目,您需要拥有 Project Creator 角色 (roles/resourcemanager.projectCreator),该角色包含 resourcemanager.projects.create 权限。了解如何授予角色
    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目名称。

  14. 验证是否已为您的 Google Cloud 项目启用结算功能

  15. 启用 Pub/Sub API:

    启用 API 所需的角色

    如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (roles/serviceusage.serviceUsageAdmin),该角色包含 serviceusage.services.enable 权限。了解如何授予角色

    gcloud services enable pubsub.googleapis.com
  16. 为您的用户账号创建本地身份验证凭证:

    gcloud auth application-default login

    如果系统返回身份验证错误,并且您使用的是外部身份提供方 (IdP),请确认您已 使用联合身份登录 gcloud CLI

  17. 向您的用户账号授予角色。对以下每个 IAM 角色运行以下命令一次: roles/pubsub.publisher, roles/pubsub.subscriber

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    替换以下内容:

    • PROJECT_ID:您的项目 ID。
    • USER_IDENTIFIER:您的用户 账号的标识符。例如,myemail@example.com
    • ROLE:您授予用户账号的 IAM 角色。

安装 Python

本教程使用 Pub/Sub 客户端库,该库需要 Python 3.7 或更高版本。按照安装 Python 的说明完成操作。

设置 Pub/Sub 项目

为了管理发布方应用和订阅方应用之间的消息流,您需要创建一个主题和两个不同的订阅。

创建 Pub/Sub 主题

创建 ID 为 hello_topic 的主题:

gcloud pubsub topics create hello_topic

创建 Pub/Sub 订阅

创建两个订阅,并将它们附加到您的主题。

这些订阅是 StreamingPull 订阅,这是一种拉取订阅。

订阅 1

创建 ID 为 sub_one 的订阅,并将其附加到 hello_topic

gcloud pubsub subscriptions create sub_one --topic=hello_topic

订阅 2

创建 ID 为 sub_two 的订阅,并将其附加到 hello_topic

gcloud pubsub subscriptions create sub_two --topic=hello_topic

构建一对多系统

下载发布方和订阅方代码

  1. 下载本教程所需的 Pub/Sub Python 文件。

     git clone https://github.com/googleapis/python-pubsub.git
  2. 关闭所有打开的终端,然后再继续操作。

设置三个终端

  1. 为每个教程应用(一个发布者应用和两个订阅者应用)启动一个终端。为方便起见,本教程将这些终端称为:

    • 发布者终端
    • sub_one 终端
    • sub_two 终端
  2. 发布方终端中,创建并激活名为 pyenv-qsPython 虚拟环境

    Bash

    python -m venv pyenv-qs
    source pyenv-qs/bin/activate

    PowerShell

    py -m venv pyenv-qs
    .\pyenv-qs\Scripts\activate

    sub_onesub_two 终端中,运行以下命令:

    Bash

    source pyenv-qs/bin/activate

    PowerShell

    .\pyenv-qs\Scripts\activate

    运行 activate 命令后,命令提示符会包含以下值 (pyenv-qs) $

  3. 发布方终端中,使用 pip 安装 Pub/Sub Python 客户端库:

    python -m pip install --upgrade google-cloud-pubsub
  4. 在所有三个终端中,使用当前项目 ID 设置环境变量。此 gcloud 命令确定您选择的项目 ID 并将其设置为变量:

    Bash

    export PROJECT=`gcloud config get-value project`

    PowerShell

    $env:PROJECT=$(gcloud config get-value project)
  5. 在所有三个终端中,切换到包含示例代码的项目路径。

    cd python-pubsub/samples/snippets/quickstart/

启动应用并观察消息流

启动订阅者 1 应用

在 sub_one 终端中,启动订阅者 1

Bash

python sub.py $PROJECT sub_one

PowerShell

py sub.py $env:PROJECT sub_one

启动后,此应用会打开与服务器之间的双向流式传输连接。Pub/Sub 通过数据流传送消息。

订阅者 1 应用开始侦听有关 sub_one 订阅的消息。

启动发布者应用

在发布者终端中,启动发布者应用:

Bash

python pub.py $PROJECT hello_topic

PowerShell

py pub.py $env:PROJECT hello_topic

发布者应用启动后,Pub/Sub 系统将执行以下操作:

  • 发布者应用将“Hello, World!”消息发送给 Pub/Sub,此时该应用还不知道现在是否有任何订阅。该服务器还分配一个消息 ID。

  • 订阅者 1 应用接收“Hello World”消息,输出该消息,并向 Pub/Sub 发送确认。

  • 发布者应用输出确认。确认告知 Pub/Sub 消息已成功处理,无需重新发送给此订阅者或任何其他 sub_one 订阅者。

Pub/Sub 移除 sub_one 中的消息。

发布者应用发布消息并分配消息 ID。订阅者 1 应用接收“Hello World”消息并发送确认。

启动订阅者 2 应用

在 sub_two 终端中,启动订阅者 2

Bash

python sub.py $PROJECT sub_two

PowerShell

py sub.py $env:PROJECT sub_two

该订阅者接收传递给 sub_two 订阅的消息。 订阅者 2 重复使用 sub.py 脚本。其区别在于,订阅方 2 在发布方将消息发送到主题和订阅之后才会启动。如果发布者直接调用订阅者 2,则发布应用要么等到订阅者 2 启动,要么就会超时。Pub/Sub 通过有效地为订阅者 2 保存该消息来管理此过程。

订阅者 2 开始侦听并接收在 sub_two 中等待它的消息

现在,您可以使用 Pub/Sub 进行开发了!

结果怎么样?

Pub/Sub 支持页面提供了其他资源和链接。

清理

  1. 停止所有正在运行的应用。
  2. 从本地环境中删除示例代码目录。
  3. 删除主题。

    gcloud pubsub topics delete hello_topic
  4. 删除订阅。

    gcloud pubsub subscriptions delete sub_one
    gcloud pubsub subscriptions delete sub_two
  5. 在 Google Cloud 控制台的 IAM 和管理部分关闭教程项目。

  6. 可选:撤消您创建的身份验证凭据,并删除本地凭据文件。

    gcloud auth application-default revoke
  7. 可选:从 gcloud CLI 撤消凭据。

    gcloud auth revoke

后续步骤

以下是您可以尝试的一些事项:

  • 检查教程中的 pub.pysub.py 代码,并浏览 GitHub 中的其他 Pub/Sub 示例。作为练习,请创建一个每秒发布一次本地时间的 pub.py 版本。

  • 了解如何批量处理消息

  • 使用推送订阅,接收触发 App Engine 端点Cloud Functions 函数的消息。

  • 使用重放检索以前确认的消息。 默认情况下,Pub/Sub 会从订阅中移除已确认的消息。例如,在本教程中,您将无法重新运行 sub.py 来再次接收“Hello, World!”消息。您可以使用重放功能设置订阅,以便在确认消息后接收这些消息。

  • 开始使用其他语言的客户端库。