本指南介绍了 C++ 客户端库使用的线程模型,并展示了如何在应用中替换默认线程池。
目标
- 说明 C++ 客户端库的默认线程处理模型。
- 说明如何针对需要覆盖这些默认设置的应用覆盖这些设置。
为什么客户端库使用后台线程?
客户端库中的大多数函数都使用调用该函数的线程来完成所有工作,包括对服务的任何 RPC 和/或刷新用于身份验证的访问令牌。
异步函数本质上无法使用当前线程来完成其工作。某个单独的线程必须等待工作完成并处理响应。
在长时间运行的操作中阻塞调用线程也是一种浪费,因为服务可能需要几分钟或更长时间才能完成工作。对于此类操作,客户端库会使用后台线程定期轮询长时间运行的操作的状态。
哪些函数和库需要后台线程?
对于某种类型 T,返回 future<T> 的函数使用后台线程等待工作完成。
并非所有客户端库都具有异步函数或长时间运行的操作。 不需要它们的库不会创建任何后台线程。
您可能会注意到应用中存在其他线程,但这些线程可能是由 C++ 客户端库(例如 gRPC)的依赖项创建的。这些线程通常不太有趣,因为没有任何应用代码会在这些线程中运行,它们仅用于辅助功能。
这些后台线程对我的应用有何影响?
与应用的其余部分一样,这些线程会争用 CPU 和内存资源。如果需要,您可以创建自己的线程池,以便精细控制这些线程使用的任何资源。有关详细信息,请参见下文。
我的任何代码是否在这些线程中运行?
可以。将回调附加到 future<T> 时,回调几乎总是由某个后台线程执行。只有在您附加回调时 future<T> 已满足的情况下,才不会发生这种情况。在这种情况下,回调会在附加回调的线程的上下文中立即运行。
例如,假设某个应用使用 Pub/Sub 客户端库。Publish() 调用会返回一个 future,应用可以在执行一些工作后附加回调:
namespace pubsub = ::google::cloud::pubsub;
namespace g = google::cloud;
void Callback(g::future<g::StatusOr<std::string>>);
void F(pubsub::Publisher publisher) {
auto my_future = publisher.Publish(
pubsub::MessageBuilder("Hello World!").Build());
// do some work.
my_future.then(Callback);
}
如果在调用 .then() 函数之前满足 my_future,则立即调用回调。如果您想保证代码在单独的线程中运行,则需要使用自己的线程池,并在 .then() 中提供一个可调用对象,该对象将执行转发到您的线程池。
默认线程池
对于需要后台线程的库,Make*Connection() 会创建一个默认线程池。除非您替换线程池,否则每个 *Connection 对象都有一个单独的线程池。
大多数库中的默认线程池都包含单个线程。很少需要更多线程,因为后台线程用于轮询长时间运行的操作的状态。这些调用的生命周期相对较短,并且消耗的 CPU 资源非常少,因此单个后台线程可以处理数百个待处理的长时间运行的操作,而很少有应用的待处理操作数量达到如此之多。
其他异步操作可能需要更多资源。
如果需要,可以使用 GrpcBackgroundThreadPoolSizeOption 更改默认后台线程池大小。
Pub/Sub 库预计会有更多工作,因为 Pub/Sub 应用通常每秒接收或发送数千条消息。因此,在 64 位架构上,此库默认使用每个核心一个线程。在 32 位架构上(或在 32 位模式下编译时,即使在 64 位架构上运行)此默认值会更改为仅 4 个线程。
提供您自己的线程池
您可以为后台线程提供自己的线程池。创建 CompletionQueue 对象,将线程附加到该对象,并在初始化客户端时配置 GrpcCompletionQueueOption。例如:
namespace admin = ::google::cloud::spanner_admin;
namespace g = ::google::cloud;
void F() {
// You will need to create threads
auto cq = g::CompletionQueue();
std::vector<std::jthread> threads;
for (int i = 0; i != 10; ++i) {
threads.emplace_back([](auto cq) { cq.Run(); }, cq);
}
auto client = admin::InstanceAdminClient(admin::MakeInstanceAdminConnection(
g::Options{}.set<g::GrpcCompletionQueueOption>(cq)));
// Use `client` as usual
}
您可以在多个客户端之间共享同一 CompletionQueue 对象,即使是针对不同的服务也是如此:
namespace admin = ::google::cloud::spanner_admin;
namespace pubsub = ::google::cloud::pubsub;
namespace g = ::google::cloud;
void F(pubsub::Topic const& topic1, pubsub::Topic const& topic2) {
// You will need to create threads
auto cq = g::CompletionQueue();
std::vector<std::jthread> threads;
for (int i = 0; i != 10; ++i) {
threads.emplace_back([](auto cq) { cq.Run(); }, cq);
}
auto client = admin::InstanceAdminClient(admin::MakeInstanceAdminConnection(
g::Options{}.set<g::GrpcCompletionQueue>(cq)));
auto p1 = pubsub::Publisher(pubsub::MakePublisherConnection(
topic1, g::Options{}.set<g::GrpcCompletionQueueOption>(cq)));
auto p2 = pubsub::Publisher(pubsub::MakePublisherConnection(
topic2, g::Options{}.set<g::GrpcCompletionQueueOption>(cq)));
// Use `client`, `p1`, and `p2` as usual
}
后续步骤
- 如需详细了解常见的库配置选项,请参阅客户端库配置。
- 如需查看完整的 Pub/Sub 客户端库参考文档,请参阅 Cloud Pub/Sub C++ 客户端库。