Memublikasikan pesan dengan setelan kontrol alur

Membuat klien penayang dengan setelan kontrol alur kustom dan menggunakannya untuk memublikasikan beberapa pesan.

Mempelajari lebih lanjut

Untuk dokumentasi mendetail yang menyertakan contoh kode ini, lihat artikel berikut:

Contoh kode

C++

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C++ dalam Panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API C++ Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // Configure the publisher to block if either (1) 100 or more messages, or
  // (2) messages with 100MiB worth of data have not been acknowledged by the
  // service. By default the publisher never blocks, and its capacity is only
  // limited by the system's memory.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      Options{}
          .set<pubsub::MaxPendingMessagesOption>(100)
          .set<pubsub::MaxPendingBytesOption>(100 * 1024 * 1024L)
          .set<pubsub::FullPublisherActionOption>(
              pubsub::FullPublisherAction::kBlocks)));

  std::vector<future<void>> ids;
  for (char const* data : {"a", "b", "c"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

Go

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go dalam Panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Go Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/pubsub/v2"
)

func publishWithFlowControlSettings(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	// Reuse this publisher for all publish calls to send messages in batches.
	publisher := client.Publisher(topicID)
	publisher.PublishSettings.FlowControlSettings = pubsub.FlowControlSettings{
		MaxOutstandingMessages: 100,                     // default 1000
		MaxOutstandingBytes:    10 * 1024 * 1024,        // default 0 (unlimited)
		LimitExceededBehavior:  pubsub.FlowControlBlock, // default Ignore, other options: Block and SignalError
	}

	var wg sync.WaitGroup
	var totalErrors uint64

	numMsgs := 1000
	// Rapidly publishing 1000 messages in a loop may be constrained by flow control.
	for i := 0; i < numMsgs; i++ {
		wg.Add(1)
		result := publisher.Publish(ctx, &pubsub.Message{
			Data: []byte("message #" + strconv.Itoa(i)),
		})
		go func(i int, res *pubsub.PublishResult) {
			fmt.Fprintf(w, "Publishing message %d\n", i)
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			_, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				fmt.Fprintf(w, "Failed to publish: %v", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
		}(i, result)
	}

	wg.Wait()

	if totalErrors > 0 {
		return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, numMsgs)
	}
	return nil
}

Java

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java dalam Panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Java Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithFlowControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithFlowControlExample(projectId, topicId);
  }

  public static void publishWithFlowControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Configure how many messages the publisher client can hold in memory
      // and what to do when messages exceed the limit.
      FlowControlSettings flowControlSettings =
          FlowControlSettings.newBuilder()
              // Block more messages from being published when the limit is reached. The other
              // options are Ignore (or continue publishing) and ThrowException (or error out).
              .setLimitExceededBehavior(LimitExceededBehavior.Block)
              .setMaxOutstandingRequestBytes(10 * 1024 * 1024L) // 10 MiB
              .setMaxOutstandingElementCount(100L) // 100 messages
              .build();

      // By default, messages are not batched.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder().setFlowControlSettings(flowControlSettings).build();

      publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();

      // Publish 1000 messages in quick succession may be constrained by publisher flow control.
      for (int i = 0; i < 1000; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println(
          "Published " + messageIds.size() + " messages with flow control settings.");

      if (publisher != null) {
        // When finished with the publisher, shut down to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Ruby

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Ruby dalam Panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Ruby Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher topic_id, async: {
  # Configure how many messages the publisher client can hold in memory
  # and what to do when messages exceed the limit.
  flow_control: {
    message_limit: 100,
    byte_limit: 10 * 1024 * 1024, # 10 MiB
    # Block more messages from being published when the limit is reached. The
    # other options are :ignore and :error.
    limit_exceeded_behavior: :block
  }
}

# Rapidly publishing 1000 messages in a loop may be constrained by flow
# control.
1000.times do |i|
  publisher.publish_async "message #{i}" do |result|
    raise "Failed to publish the message." unless result.succeeded?
  end
end

# Stop the async_publisher to send all queued messages immediately.
publisher.async_publisher.stop.wait!
puts "Published messages with flow control settings to #{topic_id}."

Langkah berikutnya

Untuk menelusuri dan memfilter contoh kode untuk produk Google Cloud lainnya, lihat browser contohGoogle Cloud .