C++ 用戶端程式庫中的背景執行緒

本指南說明 C++ 用戶端程式庫使用的執行緒模型,並示範如何在應用程式中覆寫預設執行緒集區。

目標

  • 說明 C++ 用戶端程式庫的預設執行緒模型。
  • 說明如何為需要覆寫這些預設值的應用程式進行覆寫。

為什麼用戶端程式庫會使用背景執行緒?

用戶端程式庫中的大多數函式都會使用呼叫函式的執行緒,完成所有工作,包括對服務執行任何 RPC,以及/或重新整理存取權杖以進行驗證。

非同步函式本質上無法使用目前的執行緒完成工作。某些獨立執行緒必須等待工作完成並處理回應。

如果長時間執行的作業需要數分鐘以上才能完成,封鎖呼叫執行緒也會造成浪費。對於這類作業,用戶端程式庫會使用背景執行緒,定期輪詢長時間執行的作業狀態。

哪些函式和程式庫需要背景執行緒?

針對某些型別 T 傳回 future<T> 的函式會使用背景執行緒,等待工作完成。

並非所有用戶端程式庫都有非同步函式或長時間執行的作業。 不需要的程式庫不會建立任何背景執行緒。

您可能會在應用程式中發現其他執行緒,但這些執行緒可能是由 C++ 用戶端程式庫的依附元件 (例如 gRPC) 所建立。這些執行緒通常較不有趣,因為應用程式程式碼不會在這些執行緒中執行,且這些執行緒只提供輔助功能。

這些背景執行緒會對我的應用程式造成什麼影響?

與平常一樣,這些執行緒會與應用程式的其餘部分競爭 CPU 和記憶體資源。如有需要,您可以建立自己的執行緒集區,精細控管這些執行緒使用的任何資源。詳情請見以下說明。

我的程式碼是否會在任何這些執行緒中執行?

可以。將回呼附加至 future<T> 時,回呼幾乎一律會由其中一個背景執行緒執行。只有在您附加回呼時,future<T> 已滿足條件的情況下,才不會發生這種情形。在這種情況下,回呼會立即執行,並附加至回呼的執行緒。

舉例來說,假設應用程式使用 Pub/Sub 用戶端程式庫。 Publish() 呼叫會傳回 Future,應用程式可以在執行某些工作後附加回呼:

namespace pubsub = ::google::cloud::pubsub;
namespace g = google::cloud;

void Callback(g::future<g::StatusOr<std::string>>);

void F(pubsub::Publisher publisher) {
  auto my_future = publisher.Publish(
      pubsub::MessageBuilder("Hello World!").Build());
  // do some work.
  my_future.then(Callback);
}

如果在呼叫 .then() 函式前滿足 my_future,系統會立即叫用回呼。如要確保程式碼在個別執行緒中執行,您必須使用自己的執行緒集區,並在 .then() 中提供可呼叫的項目,將執行作業轉送至執行緒集區。

預設執行緒集區

對於需要背景執行緒的程式庫,Make*Connection() 會建立預設執行緒集區。除非您覆寫執行緒集區,否則每個 *Connection 物件都有專屬的執行緒集區。

大多數程式庫中的預設執行緒集區都只包含單一執行緒。很少需要更多執行緒,因為背景執行緒用於輪詢長時間執行的作業狀態。這些呼叫的生命週期相當短,且 CPU 用量極低,因此單一背景執行緒可以處理數百個待處理的長時間執行作業,而很少有應用程式會執行這麼多作業。

其他非同步作業可能需要更多資源。 如有需要,請使用 GrpcBackgroundThreadPoolSizeOption 變更預設背景執行緒集區大小。

Pub/Sub 程式庫預期會有更多工作,因為 Pub/Sub 應用程式通常每秒會接收或傳送數千則訊息。因此,這個程式庫在 64 位元架構上預設為每個核心一個執行緒。在 32 位元架構上 (或以 32 位元模式編譯時,即使是在 64 位元架構上執行),這項預設值只會變更為 4 個執行緒。

提供自己的執行緒集區

您可以為背景執行緒提供自己的執行緒集區。建立 CompletionQueue 物件、將執行緒附加至該物件,並在初始化用戶端時設定 GrpcCompletionQueueOption。例如:

namespace admin = ::google::cloud::spanner_admin;
namespace g = ::google::cloud;

void F() {
  // You will need to create threads
  auto cq = g::CompletionQueue();
  std::vector<std::jthread> threads;
  for (int i = 0; i != 10; ++i) {
    threads.emplace_back([](auto cq) { cq.Run(); }, cq);
  }
  auto client = admin::InstanceAdminClient(admin::MakeInstanceAdminConnection(
      g::Options{}.set<g::GrpcCompletionQueueOption>(cq)));
  // Use `client` as usual
}

您可以在多個用戶端之間共用同一個 CompletionQueue 物件,即使是不同服務也適用:

namespace admin = ::google::cloud::spanner_admin;
namespace pubsub = ::google::cloud::pubsub;
namespace g = ::google::cloud;

void F(pubsub::Topic const& topic1, pubsub::Topic const& topic2) {
  // You will need to create threads
  auto cq = g::CompletionQueue();
  std::vector<std::jthread> threads;
  for (int i = 0; i != 10; ++i) {
    threads.emplace_back([](auto cq) { cq.Run(); }, cq);
  }
  auto client = admin::InstanceAdminClient(admin::MakeInstanceAdminConnection(
      g::Options{}.set<g::GrpcCompletionQueue>(cq)));
  auto p1 = pubsub::Publisher(pubsub::MakePublisherConnection(
      topic1, g::Options{}.set<g::GrpcCompletionQueueOption>(cq)));
  auto p2 = pubsub::Publisher(pubsub::MakePublisherConnection(
      topic2, g::Options{}.set<g::GrpcCompletionQueueOption>(cq)));
  // Use `client`, `p1`, and `p2` as usual
}

後續步驟