Menerima pesan jenis skema Proto

Menerima pesan jenis skema buffer protokol, mengonversi data pesan menjadi objek class Proto yang dihasilkan, dan mengonfirmasi pesan.

Mempelajari lebih lanjut

Untuk dokumentasi mendetail yang menyertakan contoh kode ini, lihat artikel berikut:

Contoh kode

C++

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C++ dalam Panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API C++ Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
return [](pubsub::Subscriber subscriber) {
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        google::cloud::pubsub::samples::State state;
        state.ParseFromString(std::string{m.data()});
        std::cout << "Message contents: " << state.DebugString() << "\n";
        std::move(h).ack();
      });
  return session;
}

C#

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C# dalam Panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API C# Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Threading;
using System.Threading.Tasks;

public class PullProtoMessagesAsyncSample
{
    public async Task<int> PullProtoMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        int messageCount = 0;
        SubscriberClient subscriber = await new SubscriberClientBuilder
        {
            SubscriptionName = subscriptionName,
            Settings = new SubscriberClient.Settings
            {
                AckExtensionWindow = TimeSpan.FromSeconds(4),
                AckDeadline = TimeSpan.FromSeconds(10),
                FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
            }
        }.BuildAsync();
        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            string encoding = message.Attributes["googclient_schemaencoding"];
            Utilities.State state = null;
            switch (encoding)
            {
                case "BINARY":
                    state = Utilities.State.Parser.ParseFrom(message.Data.ToByteArray());
                    break;
                case "JSON":
                    state = Utilities.State.Parser.ParseJson(message.Data.ToStringUtf8());
                    break;
                default:
                    Console.WriteLine($"Encoding not provided in message.");
                    break;
            }
            Console.WriteLine($"Message {message.MessageId}: {state}");
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}

Go

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go dalam Panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Go Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	"cloud.google.com/go/pubsub/v2"
	statepb "github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
)

func subscribeWithProtoSchema(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}

	// Create an instance of the message to be decoded (a single U.S. state).
	state := &statepb.State{}

	sub := client.Subscriber(subID)
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var mu sync.Mutex
	sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		defer mu.Unlock()
		encoding := msg.Attributes["googclient_schemaencoding"]

		if encoding == "BINARY" {
			if err := proto.Unmarshal(msg.Data, state); err != nil {
				fmt.Fprintf(w, "proto.Unmarshal err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Printf("Received a binary-encoded message:\n%#v\n", state)
		} else if encoding == "JSON" {
			if err := protojson.Unmarshal(msg.Data, state); err != nil {
				fmt.Fprintf(w, "proto.Unmarshal err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v\n", state)
		} else {
			fmt.Fprintf(w, "Unknown message type(%s), nacking\n", encoding)
			msg.Nack()
			return
		}
		fmt.Fprintf(w, "%s is abbreviated as %s\n", state.Name, state.PostAbbr)
		msg.Ack()
	})
	return nil
}

Java

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java dalam Panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Java Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import utilities.StateProto.State;

public class SubscribeWithProtoSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use an existing subscription.
    String subscriptionId = "your-subscription-id";

    subscribeWithProtoSchemaExample(projectId, subscriptionId);
  }

  public static void subscribeWithProtoSchemaExample(String projectId, String subscriptionId) {

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          ByteString data = message.getData();

          // Get the schema encoding type.
          String encoding = message.getAttributesMap().get("googclient_schemaencoding");

          block:
          try {
            switch (encoding) {
              case "BINARY":
                // Obtain an object of the generated proto class.
                State state = State.parseFrom(data);
                System.out.println("Received a BINARY-formatted message: " + state);
                break;

              case "JSON":
                State.Builder stateBuilder = State.newBuilder();
                JsonFormat.parser().merge(data.toStringUtf8(), stateBuilder);
                System.out.println("Received a JSON-formatted message:" + stateBuilder.build());
                break;

              default:
                break block;
            }
          } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
          }

          consumer.ack();
          System.out.println("Ack'ed the message");
        };

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

    try {
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName);
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      subscriber.stopAsync();
    }
  }
}

PHP

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan PHP dalam Panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API PHP Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Subscribe and pull messages using a protocol buffer schema.
 *
 * Relies on a proto message of the following form:
 * ```
 * syntax = "proto3";
 *
 * package utilities;
 *
 * message StateProto {
 *   string name = 1;
 *   string post_abbr = 2;
 * }
 * ```
 *
 * @param string $projectId
 * @param string $subscriptionId
 */
function subscribe_proto_messages($projectId, $subscriptionId)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $subscription = $pubsub->subscription($subscriptionId);
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        $decodedMessageData = '';
        $encoding = $message->attribute('googclient_schemaencoding');
        switch ($encoding) {
            case 'BINARY':
                $protobufMessage = new \Utilities\StateProto();
                $protobufMessage->mergeFromString($message->data());

                $decodedMessageData = $protobufMessage->serializeToJsonString();
                break;
            case 'JSON':
                $decodedMessageData = $message->data();
                break;
        }

        printf('Received a %d-encoded message %s', $encoding, $decodedMessageData);
    }
}

Ruby

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Ruby dalam Panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Ruby Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id

listener = subscriber.listen do |received_message|
  encoding = received_message.attributes["googclient_schemaencoding"]
  case encoding
  when "BINARY"
    state = Utilities::StateProto.decode received_message.data
    puts "Received a binary-encoded message:\n#{state}"
  when "JSON"
    require "json"
    state = Utilities::StateProto.decode_json received_message.data
    puts "Received a JSON-encoded message:\n#{state}"
  else
    "Received a message with no encoding:\n#{received_message.message_id}"
  end
  received_message.acknowledge!
end

listener.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
listener.stop.wait!

Langkah berikutnya

Untuk menelusuri dan memfilter contoh kode untuk produk Google Cloud lainnya, lihat browser contohGoogle Cloud .