このガイドでは、C++ クライアント ライブラリで使用されるスレッド モデルについて説明し、アプリケーションでデフォルトのスレッド プールをオーバーライドする方法を示します。
目標
- C++ クライアント ライブラリのデフォルトのスレッド モデルについて説明します。
- 必要に応じて、これらのデフォルトをオーバーライドする方法について説明します。
クライアント ライブラリがバックグラウンド スレッドを使用するのはなぜですか?
クライアント ライブラリのほとんどの関数は、関数を呼び出すスレッドを使用して、サービスへの RPC や認証用のアクセス トークンの更新など、すべての処理を完了します。
非同期関数は、その性質上、現在のスレッドを使用して処理を完了することはできません。別のスレッドが作業の完了を待って、レスポンスを処理する必要があります。
また、長時間実行されるオペレーションでは呼び出し元のスレッドをブロックするのも無駄です。この場合、サービスがオペレーションを完了するまでに数分かかる場合があります。このようなオペレーションの場合、クライアント ライブラリはバックグラウンド スレッドを使用して、長時間実行オペレーションの状態を定期的にポーリングします。
バックグラウンド スレッドを必要とする関数とライブラリ
型 T の future<T> を返す関数は、バックグラウンド スレッドを使用して、作業が完了するまで待機します。
すべてのクライアント ライブラリに非同期関数や長時間実行オペレーションがあるわけではありません。これらを必要としないライブラリは、バックグラウンド スレッドを作成しません。
アプリケーション内で追加のスレッドが見つかることもありますが、これらは、gRPC など C++ クライアント ライブラリの依存関係によって作成されることがあります。これらのスレッドは、アプリケーション コードが実行されず、補助関数のみを処理するため、通常はあまり重要ではありません。
これらのバックグラウンド スレッドはアプリにどのような影響を与えますか?
通常、これらのスレッドはアプリの残りの部分と CPU リソースとメモリリソースを競合します。必要に応じて、独自のスレッドプールを作成して、これらのスレッドが使用するリソースをきめ細かく制御できます。詳しくは、以下をご覧ください。
これらのいずれかのスレッドでコードが実行されることはありますか?
はい。コールバックを future<T> にアタッチすると、コールバックはほとんどの場合、バックグラウンド スレッドのいずれかによって実行されます。コールバックをアタッチした時点で future<T> がすでに満たされている場合を除き、この処理は必ず行われます。その場合、コールバックはコールバックをアタッチするスレッドのコンテキストで直ちに実行されます。
たとえば、Pub/Sub クライアント ライブラリを使用するアプリケーションを考えてみましょう。
Publish() 呼び出しは Future を返し、アプリケーションはいくつかの処理を行った後にコールバックをアタッチできます。
namespace pubsub = ::google::cloud::pubsub;
namespace g = google::cloud;
void Callback(g::future<g::StatusOr<std::string>>);
void F(pubsub::Publisher publisher) {
auto my_future = publisher.Publish(
pubsub::MessageBuilder("Hello World!").Build());
// do some work.
my_future.then(Callback);
}
.then() 関数が呼び出される前に my_future が満たされると、コールバックが直ちに呼び出されます。コードを別のスレッドで確実に実行するようにするには、独自のスレッドプールを使用し、.then() に呼び出し可能を指定して、実行をスレッドプールに転送します。
デフォルトのスレッドプール
バックグラウンド スレッドを必要とするライブラリの場合、Make*Connection() はデフォルトのスレッド プールを作成します。スレッド プールをオーバーライドしない限り、各 *Connection オブジェクトには個別のスレッド プールがあります。
ほとんどのライブラリのデフォルトのスレッドプールには、単一のスレッドが含まれています。バックグラウンド スレッドは長時間実行オペレーションの状態をポーリングするために使用されるため、スレッドを追加する必要はほとんどありません。これらの呼び出しは適度に存続時間が短く、CPU をほとんど消費しません。そのため、1 つのバックグラウンド スレッドで保留の数百の長時間実行オペレーションを処理できます。それほど多くのオペレーションを持つアプリケーションはあまりありません。
他の非同期オペレーションでは、より多くのリソースが必要になる場合があります。
必要に応じて、GrpcBackgroundThreadPoolSizeOption を使用してデフォルトのバックグラウンド スレッド プールのサイズを変更します。
Pub/Sub アプリケーションは 1 秒あたり数千件のメッセージを受信または送信するのが一般的であるため、Pub/Sub ライブラリはより多くの作業を想定しています。そのため、このライブラリは 64 ビット アーキテクチャでコアごとに 1 つのスレッドをデフォルトで使用します。32 ビット アーキテクチャ(または 64 ビット アーキテクチャで実行されている場合でも 32 ビットモードでコンパイルされている場合)では、このデフォルトは 4 スレッドのみに変更されます。
独自のスレッドプールを指定する
バックグラウンド スレッド用に独自のスレッドプールを指定できます。CompletionQueue オブジェクトを作成し、スレッドをアタッチして、クライアントの初期化時に GrpcCompletionQueueOption を構成します。次に例を示します。
namespace admin = ::google::cloud::spanner_admin;
namespace g = ::google::cloud;
void F() {
// You will need to create threads
auto cq = g::CompletionQueue();
std::vector<std::jthread> threads;
for (int i = 0; i != 10; ++i) {
threads.emplace_back([](auto cq) { cq.Run(); }, cq);
}
auto client = admin::InstanceAdminClient(admin::MakeInstanceAdminConnection(
g::Options{}.set<g::GrpcCompletionQueueOption>(cq)));
// Use `client` as usual
}
異なるサービス間でも、同じ CompletionQueue オブジェクトを複数のクライアントで共有できます。
namespace admin = ::google::cloud::spanner_admin;
namespace pubsub = ::google::cloud::pubsub;
namespace g = ::google::cloud;
void F(pubsub::Topic const& topic1, pubsub::Topic const& topic2) {
// You will need to create threads
auto cq = g::CompletionQueue();
std::vector<std::jthread> threads;
for (int i = 0; i != 10; ++i) {
threads.emplace_back([](auto cq) { cq.Run(); }, cq);
}
auto client = admin::InstanceAdminClient(admin::MakeInstanceAdminConnection(
g::Options{}.set<g::GrpcCompletionQueue>(cq)));
auto p1 = pubsub::Publisher(pubsub::MakePublisherConnection(
topic1, g::Options{}.set<g::GrpcCompletionQueueOption>(cq)));
auto p2 = pubsub::Publisher(pubsub::MakePublisherConnection(
topic2, g::Options{}.set<g::GrpcCompletionQueueOption>(cq)));
// Use `client`, `p1`, and `p2` as usual
}
次のステップ
- 一般的なライブラリ構成オプションの詳細については、クライアント ライブラリの構成をご覧ください。
- Pub/Sub クライアント ライブラリのリファレンス ドキュメントの完全版については、Cloud Pub/Sub C++ クライアント ライブラリをご覧ください。