בקרת בו-זמניות

במאמר הזה מוסבר איך משתמשים בבקרת בו-זמניות עם הודעות שמתפרסמות בנושא.

בקרת בו-זמניות עוזרת לכם לשנות את מספר ברירת המחדל של השרשורים ברקע (קלט/פלט) שספריית הלקוח משתמשת בהם כדי לפרסם הודעות. כך לקוחות המוציאים לאור יכולים לשלוח הודעות במקביל.

בקרת בו-זמניות היא תכונה זמינה בספריית הלקוח ברמה גבוהה של Pub/Sub. אפשר גם להטמיע בקרת בו-זמניות משלכם כשמשתמשים בספרייה ברמה נמוכה.

התמיכה בבקרת בו-זמניות תלויה בשפת התכנות של ספריית הלקוח. בשפות תכנות שתומכות בשרשורים מקבילים, כמו C++,‏ Go ו-Java, ספריות הלקוח בוחרות כברירת מחדל את מספר השרשורים.

בדף הזה מוסבר על הרעיון של בקרת בו-זמניות ואיך מגדירים את התכונה הזו ללקוחות שלכם שהם בעלי אתרים. כדי להגדיר את לקוחות האפליקציות הרשומות שלכם לבקרת בו-זמניות, אפשר לעיין במאמר בנושא עיבוד של יותר הודעות באמצעות בקרת בו-זמניות.

לפני שמתחילים

לפני שמגדירים את תהליך העבודה של הפרסום, צריך לוודא שביצעתם את המשימות הבאות:

התפקידים הנדרשים

כדי לקבל את ההרשאות שנדרשות לפרסום הודעות בנושא, צריך לבקש מהאדמין להקצות לכם ב-IAM את התפקיד פרסום הודעות ב-Pub/Sub (roles/pubsub.publisher) בנושא. כדי לקרוא הסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

יכול להיות שאפשר לקבל את ההרשאות הנדרשות גם באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש.

כדי ליצור או לעדכן נושאים ומינויים, צריך הרשאות נוספות.

הגדרות של בקרת בו-זמניות

ערכי ברירת המחדל של משתני בקרת בו-זמניות והשמות של המשתנים עשויים להיות שונים בספריות לקוח שונות. לדוגמה, בספריית הלקוח של Java, השיטות להגדרת בקרת בו-זמניות הן setExecutorProvider() ו-setChannelProvider(). מידע נוסף מופיע במאמרי העזרה של ה-API.

  • setExecutorProvider() מאפשרת להתאים אישית את ספק הביצוע שמשמש לעיבוד תשובות לפרסום. לדוגמה, אפשר לשנות את ספק ה-executor לספק שמחזיר executor יחיד ומשותף עם מספר מוגבל של threads בכמה לקוחות של בעלי תוכן דיגיטלי. ההגדרה הזו עוזרת להגביל את מספר השרשורים שנוצרים.

  • setChannelProvider() מאפשרת להתאים אישית את ספק הערוץ שמשמש לפתיחת חיבורים ל-Pub/Sub. בדרך כלל לא מגדירים את הערך הזה, אלא אם רוצים להשתמש באותו ערוץ בכמה לקוחות של בעלי תוכן דיגיטלי. שימוש חוזר בערוץ ביותר מדי לקוחות עלול לגרום לשגיאות GOAWAY או ENHANCE_YOUR_CALM. אם השגיאות האלה מופיעות ביומנים של האפליקציה או ביומני Cloud, צריך ליצור עוד ערוצים.

דוגמאות קוד לבקרת בו-זמניות

C++‎

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של C++‎ במאמר תחילת העבודה המהירה: שימוש בספריות לקוח. מידע נוסף זמין במאמרי העזרה של Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // Override the default number of background (I/O) threads. By default the
  // library uses `std::thread::hardware_concurrency()` threads.
  auto options = Options{}.set<GrpcBackgroundThreadPoolSizeOption>(8);
  auto publisher = pubsub::Publisher(
      pubsub::MakePublisherConnection(std::move(topic), std::move(options)));

  std::vector<future<void>> ids;
  for (char const* data : {"1", "2", "3", "go!"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

המשך

בדוגמה הבאה נעשה שימוש בגרסה הראשית של ספריית הלקוח Go Pub/Sub ‏ (v2). אם אתם עדיין משתמשים בספרייה v1, כדאי לעיין במדריך להעברה לגרסה v2. כדי לראות רשימה של דוגמאות קוד מגרסה 1, אפשר לעיין ב דוגמאות הקוד שהוצאו משימוש.

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Go במאמר מדריך למתחילים: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
)

func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	publisher := client.Publisher(topicID)
	publisher.PublishSettings.NumGoroutines = 1

	result := publisher.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Java במאמר התחלה מהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Java API.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithConcurrencyControlExample(projectId, topicId);
  }

  public static void publishWithConcurrencyControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Provides an executor service for processing messages. The default
      // `executorProvider` used by the publisher has a default thread count of
      // 5 * the number of processors available to the Java virtual machine.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setExecutorProvider` configures an executor for the publisher.
      publisher = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

      // schedule publishing one message at a time : messages get automatically batched
      for (int i = 0; i < 100; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println("Published " + messageIds.size() + " messages with concurrency control.");

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Ruby

בדוגמה הבאה נעשה שימוש בספריית הלקוח של Ruby Pub/Sub בגרסה 3. אם אתם עדיין משתמשים בספרייה v2, כדאי לעיין במדריך להעברה לגרסה v3. כדי לראות רשימה של דוגמאות קוד של Ruby v2, אפשר לעיין ב דוגמאות הקוד שהוצאו משימוש.

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Ruby במאמר תחילת העבודה המהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Ruby API.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher topic_id, async: {
  threads: {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    publish:  1,
    callback: 1
  }
}

publisher.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
publisher.async_publisher.stop.wait!

המאמרים הבאים