עיבוד של יותר הודעות באמצעות בקרת בו-זמניות

בקרת בו-זמניות היא תכונה זמינה בספריית הלקוח ברמה גבוהה של Pub/Sub. אפשר גם להטמיע בקרת בו-זמניות משלכם כשמשתמשים בספרייה ברמה נמוכה.

התמיכה בבקרת בו-זמניות תלויה בשפת התכנות של ספריית הלקוח. בשפות תכנות שתומכות בשרשורים מקבילים, כמו C++‎,‏ Go ו-Java, ספריות הלקוח בוחרות כברירת מחדל את מספר השרשורים.

יכול להיות שהאפשרות הזו לא תהיה אופטימלית לאפליקציה שלכם. לדוגמה, אם אפליקציית המנוי לא עומדת בקצב של נפח ההודעות הנכנסות והיא לא מוגבלת על ידי CPU, צריך להגדיל את מספר השרשורים. במקרים של פעולות עיבוד הודעות שדורשות הרבה משאבי CPU, יכול להיות שכדאי להקטין את מספר השרשורים.

בדף הזה מוסבר על הרעיון של בקרת בו-זמניות ואיך מגדירים את התכונה הזו ללקוחות המנויים. כדי להגדיר את לקוחות המוציא לאור שלכם לבקרת בו-זמניות, אפשר לעיין במאמר בנושא בקרת בו-זמניות.

הגדרות של בקרת בו-זמניות

ערכי ברירת המחדל של משתני בקרת בו-זמניות והשמות של המשתנים עשויים להיות שונים בספריות לקוח שונות. מידע נוסף זמין במאמרי העזרה של ה-API. לדוגמה, בספריית הלקוח של Java, השיטות להגדרת בקרת בו-זמניות הן setParallelPullCount(),‏ setExecutorProvider(),‏ setSystemExecutorProvider() ו-setChannelProvider().

  • הפונקציה setParallelPullCount() מאפשרת לכם להחליט כמה שידורים לפתוח. אפשר לפתוח עוד מקורות נתונים אם לקוח המנוי יכול לעבד יותר נתונים מאלה שנשלחים במקור נתונים יחיד, שהוא 10MBps.

  • השיטה setExecutorProvider() מאפשרת להתאים אישית את ספק הביצוע שמשמש לעיבוד הודעות. לדוגמה, אפשר לשנות את ספק ה-executor לספק שמחזיר executor יחיד ומשותף עם מספר מוגבל של threads בכמה לקוחות של מנויים. ההגדרה הזו עוזרת להגביל את מספר השרשורים שנוצרים. המספר הכולל של השרשורים שמשמשים לבקרה על פעולות מקבילות תלוי בספק של מנהל השרשורים שמועבר בספריית הלקוח ובמספר המשיכות המקבילות.

  • setSystemExecutorProvider() מאפשרת להתאים אישית את ספק שירותי הביצוע שמשמש לניהול חוזי ליסינג. בדרך כלל לא מגדירים את הערך הזה, אלא אם רוצים להשתמש באותו ספק של מנהל ביצוע ב-setExecutorProvider וב-setSystemExecutorProvider. לדוגמה, אפשר להשתמש באותו ספק של מפעיל אם יש לכם מספר מינויים עם תפוקה נמוכה. שימוש באותו ערך מגביל את מספר השרשורים בלקוח.

  • setChannelProvider() מאפשרת להתאים אישית את ספק הערוץ שמשמש לפתיחת חיבורים ל-Pub/Sub. בדרך כלל לא מגדירים את הערך הזה, אלא אם רוצים להשתמש באותו ערוץ בכמה לקוחות של מינויים. שימוש חוזר בערוץ ביותר מדי לקוחות עלול לגרום לשגיאות GOAWAY או ENHANCE_YOUR_CALM. אם השגיאות האלה מופיעות ביומנים של האפליקציה או ביומני Cloud, צריך ליצור עוד ערוצים.

דוגמאות קוד לבקרת בו-זמניות

C++‎

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של C++‎ במאמר תחילת העבודה המהירה: שימוש בספריות לקוח. מידע נוסף זמין במאמרי העזרה של Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
  // Create a subscriber with 16 threads handling I/O work, by default the
  // library creates `std::thread::hardware_concurrency()` threads.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
      Options{}
          .set<pubsub::MaxConcurrencyOption>(8)
          .set<GrpcBackgroundThreadPoolSizeOption>(16)));

  // Create a subscription where up to 8 messages are handled concurrently. By
  // default the library uses `std::thread::hardware_concurrency()` as the
  // maximum number of concurrent callbacks.
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        // This handler executes in the I/O threads, applications could use,
        // std::async(), a thread-pool, or any other mechanism to transfer the
        // execution to other threads.
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
  return std::make_pair(subscriber, std::move(session));
};

המשך

בדוגמה הבאה נעשה שימוש בגרסה הראשית של ספריית הלקוח Go Pub/Sub ‏ (v2). אם אתם עדיין משתמשים בספרייה v1, כדאי לעיין במדריך להעברה לגרסה v2. כדי לראות רשימה של דוגמאות קוד מגרסה 1, אפשר לעיין ב דוגמאות הקוד שהוצאו משימוש.

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Go במאמר מדריך למתחילים: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub/v2"
)

func pullMsgsConcurrencyControl(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subID)
	// NumGoroutines determines the number of streams sub.Receive will spawn to pull
	// messages. It is recommended to set this to 1, unless your throughput
	// is greater than 10 MB/s, as even having 1 stream can still result in
	// messages being handled asynchronously.
	sub.ReceiveSettings.NumGoroutines = 1
	// MaxOutstandingMessages limits the number of concurrent handlers of messages.
	// In this case, up to 8 unacked messages can be handled concurrently.
	sub.ReceiveSettings.MaxOutstandingMessages = 8

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive returned error: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

Java

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Java במאמר התחלה מהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Java API.


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithConcurrencyControlExample(projectId, subscriptionId);
  }

  public static void subscribeWithConcurrencyControlExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages. The default `executorProvider` used
      // by the subscriber has a default thread count of 5.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setParallelPullCount` determines how many StreamingPull streams the subscriber will open
      // to receive message. It defaults to 1. `setExecutorProvider` configures an executor for the
      // subscriber to process messages. Here, the subscriber is configured to open 2 streams for
      // receiving messages, each stream creates a new executor with 4 threads to help process the
      // message callbacks. In total 2x4=8 threads are used for message processing.
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setParallelPullCount(2)
              .setExecutorProvider(executorProvider)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Ruby

בדוגמה הבאה נעשה שימוש בספריית הלקוח של Ruby Pub/Sub בגרסה 3. אם אתם עדיין משתמשים בספרייה v2, כדאי לעיין במדריך להעברה לגרסה v3. כדי לראות רשימה של דוגמאות קוד של Ruby v2, אפשר לעיין ב דוגמאות הקוד שהוצאו משימוש.

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Ruby במאמר תחילת העבודה המהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Ruby API.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id

# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
listener = subscriber.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

listener.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
listener.stop.wait!

המאמרים הבאים

מידע נוסף על אפשרויות משלוח אחרות שאפשר להגדיר למינוי: