C++ 客户端库中的后台线程

本指南介绍了 C++ 客户端库使用的线程模型,并展示了如何在应用中替换默认线程池。

目标

  • 说明 C++ 客户端库的默认线程处理模型。
  • 说明如何针对需要覆盖这些默认设置的应用覆盖这些设置。

为什么客户端库使用后台线程?

客户端库中的大多数函数都使用调用该函数的线程来完成所有工作,包括对服务的任何 RPC 和/或刷新用于身份验证的访问令牌。

异步函数本质上无法使用当前线程来完成其工作。某个单独的线程必须等待工作完成并处理响应。

在长时间运行的操作中阻塞调用线程也是一种浪费,因为服务可能需要几分钟或更长时间才能完成工作。对于此类操作,客户端库会使用后台线程定期轮询长时间运行的操作的状态。

哪些函数和库需要后台线程?

对于某种类型 T,返回 future<T> 的函数使用后台线程等待工作完成。

并非所有客户端库都具有异步函数或长时间运行的操作。 不需要它们的库不会创建任何后台线程。

您可能会注意到应用中存在其他线程,但这些线程可能是由 C++ 客户端库(例如 gRPC)的依赖项创建的。这些线程通常不太有趣,因为没有任何应用代码会在这些线程中运行,它们仅用于辅助功能。

这些后台线程对我的应用有何影响?

与应用的其余部分一样,这些线程会争用 CPU 和内存资源。如果需要,您可以创建自己的线程池,以便精细控制这些线程使用的任何资源。有关详细信息,请参见下文。

我的任何代码是否在这些线程中运行?

可以。将回调附加到 future<T> 时,回调几乎总是由某个后台线程执行。只有在您附加回调时 future<T> 已满足的情况下,才不会发生这种情况。在这种情况下,回调会在附加回调的线程的上下文中立即运行。

例如,假设某个应用使用 Pub/Sub 客户端库。Publish() 调用会返回一个 future,应用可以在执行一些工作后附加回调:

namespace pubsub = ::google::cloud::pubsub;
namespace g = google::cloud;

void Callback(g::future<g::StatusOr<std::string>>);

void F(pubsub::Publisher publisher) {
  auto my_future = publisher.Publish(
      pubsub::MessageBuilder("Hello World!").Build());
  // do some work.
  my_future.then(Callback);
}

如果在调用 .then() 函数之前满足 my_future,则立即调用回调。如果您想保证代码在单独的线程中运行,则需要使用自己的线程池,并在 .then() 中提供一个可调用对象,该对象将执行转发到您的线程池。

默认线程池

对于需要后台线程的库,Make*Connection() 会创建一个默认线程池。除非您替换线程池,否则每个 *Connection 对象都有一个单独的线程池。

大多数库中的默认线程池都包含单个线程。很少需要更多线程,因为后台线程用于轮询长时间运行的操作的状态。这些调用的生命周期相对较短,并且消耗的 CPU 资源非常少,因此单个后台线程可以处理数百个待处理的长时间运行的操作,而很少有应用的待处理操作数量达到如此之多。

其他异步操作可能需要更多资源。 如果需要,可以使用 GrpcBackgroundThreadPoolSizeOption 更改默认后台线程池大小。

Pub/Sub 库预计会有更多工作,因为 Pub/Sub 应用通常每秒接收或发送数千条消息。因此,在 64 位架构上,此库默认使用每个核心一个线程。在 32 位架构上(或在 32 位模式下编译时,即使在 64 位架构上运行)此默认值会更改为仅 4 个线程。

提供您自己的线程池

您可以为后台线程提供自己的线程池。创建 CompletionQueue 对象,将线程附加到该对象,并在初始化客户端时配置 GrpcCompletionQueueOption。例如:

namespace admin = ::google::cloud::spanner_admin;
namespace g = ::google::cloud;

void F() {
  // You will need to create threads
  auto cq = g::CompletionQueue();
  std::vector<std::jthread> threads;
  for (int i = 0; i != 10; ++i) {
    threads.emplace_back([](auto cq) { cq.Run(); }, cq);
  }
  auto client = admin::InstanceAdminClient(admin::MakeInstanceAdminConnection(
      g::Options{}.set<g::GrpcCompletionQueueOption>(cq)));
  // Use `client` as usual
}

您可以在多个客户端之间共享同一 CompletionQueue 对象,即使是针对不同的服务也是如此:

namespace admin = ::google::cloud::spanner_admin;
namespace pubsub = ::google::cloud::pubsub;
namespace g = ::google::cloud;

void F(pubsub::Topic const& topic1, pubsub::Topic const& topic2) {
  // You will need to create threads
  auto cq = g::CompletionQueue();
  std::vector<std::jthread> threads;
  for (int i = 0; i != 10; ++i) {
    threads.emplace_back([](auto cq) { cq.Run(); }, cq);
  }
  auto client = admin::InstanceAdminClient(admin::MakeInstanceAdminConnection(
      g::Options{}.set<g::GrpcCompletionQueue>(cq)));
  auto p1 = pubsub::Publisher(pubsub::MakePublisherConnection(
      topic1, g::Options{}.set<g::GrpcCompletionQueueOption>(cq)));
  auto p2 = pubsub::Publisher(pubsub::MakePublisherConnection(
      topic2, g::Options{}.set<g::GrpcCompletionQueueOption>(cq)));
  // Use `client`, `p1`, and `p2` as usual
}

后续步骤