Mit Gleichzeitigkeitssteuerung mehr Nachrichten verarbeiten

Die Nebenläufigkeitserkennung ist eine verfügbare Funktion in der Pub/Sub Clientbibliothek auf hoher Ebene. Sie können auch Ihre eigene Nebenläufigkeitserkennung implementieren, wenn Sie eine Bibliothek auf niedriger Ebene verwenden.

Die Unterstützung für die Nebenläufigkeitserkennung hängt von der Programmiersprache der Clientbibliothek ab. Bei Sprachimplementierungen, die parallele Threads wie C++, Go und Java unterstützen, legen die Clientbibliotheken die Anzahl der Threads jeweils anhand ihrer Standardeinstellung fest.

Diese Wahl ist möglicherweise nicht optimal für Ihre Anwendung. Wenn Ihre Abonnentenanwendung beispielsweise nicht mit dem eingehenden Nachrichtenvolumen mithält und nicht CPU-gebunden ist, müssen Sie die Anzahl der Threads erhöhen. Bei CPU-intensiver Nachrichtenverarbeitung kann es sinnvoll sein, die Anzahl der Threads zu verringern.

Auf dieser Seite wird das Konzept der Nebenläufigkeitserkennung erläutert und beschrieben, wie Sie die Funktion für Ihre Abonnentenclients einrichten. Informationen zum Konfigurieren Ihrer Publisher-Clients für die Nebenläufigkeitserkennung finden Sie unter Nebenläufigkeitserkennung.

Konfigurationen für die Nebenläufigkeitserkennung

Die Standardwerte für die Variablen der Nebenläufigkeitserkennung und die Namen der Variablen können sich zwischen den Clientbibliotheken unterscheiden. Weitere Informationen finden Sie in der API-Referenzdokumentation. In der Java-Clientbibliothek sind beispielsweise die Methoden zum Konfigurieren der Nebenläufigkeitserkennung setParallelPullCount(), setExecutorProvider(), setSystemExecutorProvider() und setChannelProvider().

  • Mit setParallelPullCount() können Sie festlegen, wie viele Streams geöffnet werden sollen. Sie können mehr Streams öffnen, wenn Ihr Abonnentenclient mehr Daten verarbeiten kann als die 10 MB/s, die über einen einzelnen Stream gesendet werden.

  • Mit setExecutorProvider() können Sie den Executor-Anbieter anpassen, der für die Verarbeitung von Nachrichten verwendet wird. Sie können den Executor-Anbieter beispielsweise in einen ändern, der einen einzelnen, gemeinsam genutzten Executor mit einer begrenzten Anzahl von Threads für mehrere Abonnentenclients zurückgibt. Diese Konfiguration trägt dazu bei, die Anzahl der erstellten Threads zu begrenzen. Die Gesamtzahl der Threads, die für die Nebenläufigkeitserkennung verwendet werden, hängt vom Executor-Bereitsteller ab, der in der Clientbibliothek übergeben wird, und von der Anzahl der parallelen Pull-Vorgänge.

  • setSystemExecutorProvider() ermöglicht Ihnen die Anpassung des Executor-Anbieters, der für die Lease-Verwaltung verwendet wird. Normalerweise konfigurieren Sie diesen Wert nur, wenn Sie denselben Executor-Anbieter in setExecutorProvider und setSystemExecutorProvider verwenden möchten. Sie können beispielsweise denselben Executor-Anbieter verwenden, wenn Sie eine Reihe von Abos mit geringem Durchsatz haben. Wenn Sie denselben Wert verwenden, wird die Anzahl der Threads im Client begrenzt.

  • Mit setChannelProvider() können Sie den Channel-Anbieter anpassen, der zum Öffnen von Verbindungen zu Pub/Sub verwendet wird. Normalerweise konfigurieren Sie diesen Wert nur, wenn Sie denselben Channel für mehrere Abonnentenclients verwenden möchten. Wenn Sie einen Channel für zu viele Clients wiederverwenden, kann dies zu GOAWAY- oder ENHANCE_YOUR_CALM-Fehlern führen. Wenn diese Fehler in den Logs Ihrer Anwendung oder in Cloud Loggingangezeigt werden, erstellen Sie weitere Channels.

Codebeispiele für die Nebenläufigkeitserkennung

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Pub/Sub C++ API-Referenzdokumentation.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
  // Create a subscriber with 16 threads handling I/O work, by default the
  // library creates `std::thread::hardware_concurrency()` threads.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
      Options{}
          .set<pubsub::MaxConcurrencyOption>(8)
          .set<GrpcBackgroundThreadPoolSizeOption>(16)));

  // Create a subscription where up to 8 messages are handled concurrently. By
  // default the library uses `std::thread::hardware_concurrency()` as the
  // maximum number of concurrent callbacks.
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        // This handler executes in the I/O threads, applications could use,
        // std::async(), a thread-pool, or any other mechanism to transfer the
        // execution to other threads.
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
  return std::make_pair(subscriber, std::move(session));
};

Go

Im folgenden Beispiel wird die Hauptversion der Go Pub/Sub-Clientbibliothek (Version 2) verwendet. Wenn Sie noch die Version 1-Bibliothek verwenden, siehe den Migrationsleitfaden zu Version 2. Eine Liste der Codebeispiele für Version 1 finden Sie unter Veraltete Codebeispiele.

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub/v2"
)

func pullMsgsConcurrencyControl(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subID)
	// NumGoroutines determines the number of streams sub.Receive will spawn to pull
	// messages. It is recommended to set this to 1, unless your throughput
	// is greater than 10 MB/s, as even having 1 stream can still result in
	// messages being handled asynchronously.
	sub.ReceiveSettings.NumGoroutines = 1
	// MaxOutstandingMessages limits the number of concurrent handlers of messages.
	// In this case, up to 8 unacked messages can be handled concurrently.
	sub.ReceiveSettings.MaxOutstandingMessages = 8

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive returned error: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithConcurrencyControlExample(projectId, subscriptionId);
  }

  public static void subscribeWithConcurrencyControlExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages. The default `executorProvider` used
      // by the subscriber has a default thread count of 5.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setParallelPullCount` determines how many StreamingPull streams the subscriber will open
      // to receive message. It defaults to 1. `setExecutorProvider` configures an executor for the
      // subscriber to process messages. Here, the subscriber is configured to open 2 streams for
      // receiving messages, each stream creates a new executor with 4 threads to help process the
      // message callbacks. In total 2x4=8 threads are used for message processing.
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setParallelPullCount(2)
              .setExecutorProvider(executorProvider)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Ruby

Im folgenden Beispiel wird die Ruby Pub/Sub-Clientbibliothek Version 3 verwendet. Wenn Sie noch die Version 2-Bibliothek verwenden, lesen Sie den Migrationsleitfaden zu Version 3. Eine Liste der Codebeispiele für Ruby Version 2 finden Sie unter Veraltete Codebeispiele.

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Ruby in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Ruby API.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id

# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
listener = subscriber.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

listener.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
listener.stop.wait!

Nächste Schritte

Weitere Informationen zu den anderen Übermittlungsoptionen, die Sie für ein Abo konfigurieren können: