接收可能適用於不同結構定義修訂版本的訊息

接收可能適用於不同結構定義修訂版本的訊息

深入探索

如需包含這個程式碼範例的詳細說明文件,請參閱下列文章:

程式碼範例

C++

在試用這個範例之前,請先按照「使用用戶端程式庫的 Pub/Sub 快速入門導覽課程」中的 C++ 設定說明操作。詳情請參閱 Pub/Sub C++ API 參考文件。

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
    pubsub::Subscription(project_id, subscription_id)));

// Create a schema client.
auto schema_client =
    pubsub::SchemaServiceClient(pubsub::MakeSchemaServiceConnection());

// Read the reader schema. This is the schema you want the messages to be
// evaluated using.
std::ifstream ifs(avro_file);
avro::ValidSchema reader_schema;
avro::compileJsonSchema(ifs, reader_schema);

std::unordered_map<std::string, avro::ValidSchema> revisions_to_schemas;
auto session = subscriber.Subscribe(
    [&](pubsub::Message const& message, pubsub::AckHandler h) {
      // Get the reader schema revision for the message.
      auto schema_name = message.attributes()["googclient_schemaname"];
      auto schema_revision_id =
          message.attributes()["googclient_schemarevisionid"];
      // If we haven't received a message with this schema, look it up.
      if (revisions_to_schemas.find(schema_revision_id) ==
          revisions_to_schemas.end()) {
        auto schema_path = schema_name + "@" + schema_revision_id;
        // Use the schema client to get the path.
        auto schema = schema_client.GetSchema(schema_path);
        if (!schema) {
          std::cout << "Schema not found:" << schema_path << "\n";
          return;
        }
        avro::ValidSchema writer_schema;
        std::stringstream in;
        in << schema.value().definition();
        avro::compileJsonSchema(in, writer_schema);
        revisions_to_schemas[schema_revision_id] = writer_schema;
      }
      auto writer_schema = revisions_to_schemas[schema_revision_id];

      auto encoding = message.attributes()["googclient_schemaencoding"];
      if (encoding == "JSON") {
        std::stringstream in;
        in << message.data();
        auto avro_in = avro::istreamInputStream(in);
        avro::DecoderPtr decoder = avro::resolvingDecoder(
            writer_schema, reader_schema, avro::jsonDecoder(writer_schema));
        decoder->init(*avro_in);

        v2::State state;
        avro::decode(*decoder, state);
        std::cout << "Name: " << state.name << "\n";
        std::cout << "Postal Abbreviation: " << state.post_abbr << "\n";
        std::cout << "Population: " << state.population << "\n";
      } else {
        std::cout << "Unable to decode. Received message using encoding"
                  << encoding << "\n";
      }
      std::move(h).ack();
    });

C#

在試用這個範例之前,請先按照「使用用戶端程式庫的 Pub/Sub 快速入門導覽課程」中的 C# 設定說明操作。詳情請參閱 Pub/Sub C# API 參考文件。

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

using Avro.Generic;
using Avro.IO;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Sample that receives Avro messages which may have been published with
/// different revisions of a schema, decoding each one with the schema
/// revision named in the message attributes.
/// </summary>
public class SubscribeAvroRecordsWithRevisionsSample
{
    /// <summary>
    /// Listens on the subscription for roughly ten seconds and prints the first
    /// field of every binary-encoded Avro record received.
    /// </summary>
    /// <param name="projectId">Project that owns the subscription.</param>
    /// <param name="subscriptionId">ID of the subscription to pull from.</param>
    /// <returns>
    /// The number of binary messages decoded, and the number of distinct
    /// (schema name, revision) pairs that were cached.
    /// </returns>
    public async Task<(int, int)> SubscribeAvroRecordsWithRevisions(string projectId, string subscriptionId)
    {
        SchemaServiceClient schemaClient = SchemaServiceClient.Create();

        // Parsed schemas keyed by (schema name, revision ID).
        var parsedSchemas = new ConcurrentDictionary<(string, string), Avro.Schema>();

        SubscriberClient subscriber = await SubscriberClient.CreateAsync(
            SubscriptionName.FromProjectSubscription(projectId, subscriptionId));
        int decodedCount = 0;

        Task running = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            // The publisher records the encoding, schema name and revision ID
            // as message attributes.
            string encoding = message.Attributes["googclient_schemaencoding"];
            string schemaName = message.Attributes["googclient_schemaname"];
            string revision = message.Attributes["googclient_schemarevisionid"];

            // Look the schema up in the cache, fetching and parsing it on a miss.
            Avro.Schema avroSchema = parsedSchemas.GetOrAdd(
                (schemaName, revision),
                key => Avro.Schema.Parse(schemaClient.GetSchema($"{key.Item1}@{key.Item2}").Definition));

            // Anything that is not binary-encoded is reported and acked as-is.
            if (encoding != "BINARY")
            {
                Console.WriteLine("Expected only binary messages in this sample");
                return Task.FromResult(SubscriberClient.Reply.Ack);
            }

            // Decode the payload, using the same schema as writer and reader.
            using var payload = new MemoryStream(message.Data.ToByteArray());
            var datumReader = new DefaultReader(avroSchema, avroSchema);
            GenericRecord record = datumReader.Read<GenericRecord>(null, new BinaryDecoder(payload));
            Console.WriteLine($"Message {message.MessageId}: {record.GetValue(0)}");
            Interlocked.Increment(ref decodedCount);
            return Task.FromResult(SubscriberClient.Reply.Ack);
        });

        // Let the subscriber run for 10 seconds, then shut it down.
        await Task.Delay(TimeSpan.FromSeconds(10));
        await subscriber.StopAsync(CancellationToken.None);
        // Awaiting the start task surfaces any error raised while receiving.
        await running;
        return (decodedCount, parsedSchemas.Count);
    }
}

Go

在試用這個範例之前,請先按照「使用用戶端程式庫的 Pub/Sub 快速入門導覽課程」中的 Go 設定說明操作。詳情請參閱 Pub/Sub Go API 參考文件。

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	"cloud.google.com/go/pubsub/v2"
	schema "cloud.google.com/go/pubsub/v2/apiv1"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"github.com/linkedin/goavro/v2"
)

// subscribeWithAvroSchemaRevisions receives messages from subID for up to ten
// seconds and decodes each one with the Avro schema revision named in its
// attributes, writing progress and errors to w.
//
// Codecs are cached per revision ID so each revision is fetched from the
// schema service only once. Messages that cannot be decoded are nacked.
// It returns an error if a client cannot be created or if Receive fails.
func subscribeWithAvroSchemaRevisions(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	schemaClient, err := schema.NewSchemaClient(ctx)
	if err != nil {
		return fmt.Errorf("schema.NewSchemaClient: %w", err)
	}
	defer schemaClient.Close()

	// Create the cache for the codecs for different revision IDs.
	revisionCodecs := make(map[string]*goavro.Codec)

	sub := client.Subscriber(subID)
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// mu serializes the callback so the codec cache is never accessed
	// concurrently.
	var mu sync.Mutex
	err = sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		defer mu.Unlock()
		name := msg.Attributes["googclient_schemaname"]
		revision := msg.Attributes["googclient_schemarevisionid"]

		codec, ok := revisionCodecs[revision]
		// If the codec doesn't exist in the map, this is the first time we
		// are seeing this revision. We need to fetch the schema and cache the
		// codec. It would be more typical to do this asynchronously, but is
		// shown here in a synchronous way to ease readability.
		if !ok {
			req := &pubsubpb.GetSchemaRequest{
				Name: fmt.Sprintf("%s@%s", name, revision),
				View: pubsubpb.SchemaView_FULL,
			}
			fetched, err := schemaClient.GetSchema(ctx, req)
			if err != nil {
				fmt.Fprintf(w, "Nacking, cannot read message without schema: %v\n", err)
				msg.Nack()
				return
			}
			codec, err = goavro.NewCodec(fetched.Definition)
			if err != nil {
				fmt.Fprintf(w, "goavro.NewCodec err: %v\n", err)
				msg.Nack()
				// Stop here: never cache or use a nil codec.
				return
			}
			revisionCodecs[revision] = codec
		}

		encoding := msg.Attributes["googclient_schemaencoding"]

		var data interface{}
		switch encoding {
		case "BINARY":
			data, _, err = codec.NativeFromBinary(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromBinary err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a binary-encoded message:\n%#v\n", data)
		case "JSON":
			data, _, err = codec.NativeFromTextual(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromTextual err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v\n", data)
		default:
			fmt.Fprintf(w, "Unknown message type(%s), nacking\n", encoding)
			msg.Nack()
			return
		}

		// Use a checked assertion so a schema that does not decode to a map
		// nacks the message instead of panicking inside the callback.
		state, ok := data.(map[string]interface{})
		if !ok {
			fmt.Fprintf(w, "Unexpected decoded type %T, nacking\n", data)
			msg.Nack()
			return
		}
		fmt.Fprintf(w, "%s is abbreviated as %s\n", state["name"], state["post_abbr"])
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}
	return nil
}

Java

在試用這個範例之前,請先按照「使用用戶端程式庫的 Pub/Sub 快速入門導覽課程」中的 Java 設定說明操作。詳情請參閱 Pub/Sub Java API 參考文件。

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Schema;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import utilities.State;

/**
 * Sample that subscribes to messages which may have been published with different revisions of an
 * Avro schema, decoding each message with the schema revision named in its attributes.
 */
public class SubscribeWithAvroSchemaRevisionsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use an existing subscription.
    String subscriptionId = "your-subscription-id";

    subscribeWithAvroSchemaRevisionsExample(projectId, subscriptionId);
  }

  /**
   * Creates a schema service client.
   *
   * @return the client, or {@code null} if it could not be created (the error is printed).
   */
  static SchemaServiceClient getSchemaServiceClient() {
    try {
      return SchemaServiceClient.create();
    } catch (IOException e) {
      System.out.println("Could not get schema client: " + e);
      return null;
    }
  }

  /**
   * Listens on the subscription for up to 30 seconds, printing the decoded {@code State} of each
   * message. Messages whose schema cannot be fetched, whose encoding is missing or unknown, or
   * whose payload cannot be decoded are nacked.
   *
   * @param projectId project that owns the subscription.
   * @param subscriptionId ID of an existing subscription.
   */
  public static void subscribeWithAvroSchemaRevisionsExample(
      String projectId, String subscriptionId) {
    // Used to get the schemas for revisions.
    final SchemaServiceClient schemaServiceClient = getSchemaServiceClient();
    if (schemaServiceClient == null) {
      return;
    }

    // Cache for the readers for different revision IDs. All access is guarded by
    // synchronizing on the map itself.
    Map<String, SpecificDatumReader<State>> revisionReaders =
        new HashMap<String, SpecificDatumReader<State>>();

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Get the schema name and revision ID the message was published with.
          String name = message.getAttributesMap().get("googclient_schemaname");
          String revision = message.getAttributesMap().get("googclient_schemarevisionid");

          SpecificDatumReader<State> reader = null;
          synchronized (revisionReaders) {
            reader = revisionReaders.get(revision);
          }
          if (reader == null) {
            // This is the first time we are seeing this revision. We need to
            // fetch the schema and cache its decoder. It would be more typical
            // to do this asynchronously, but is shown here in a synchronous
            // way to ease readability.
            try {
              Schema schema = schemaServiceClient.getSchema(name + "@" + revision);
              org.apache.avro.Schema avroSchema =
                  new org.apache.avro.Schema.Parser().parse(schema.getDefinition());
              // Writer schema is the fetched revision; reader schema is the generated class.
              reader = new SpecificDatumReader<State>(avroSchema, State.getClassSchema());
              synchronized (revisionReaders) {
                revisionReaders.put(revision, reader);
              }
            } catch (Exception e) {
              System.out.println("Could not get schema: " + e);
              // Without the schema, we cannot read the message, so nack it.
              consumer.nack();
              return;
            }
          }

          ByteString data = message.getData();
          // Send the message data to a byte[] input stream.
          InputStream inputStream = new ByteArrayInputStream(data.toByteArray());

          String encoding = message.getAttributesMap().get("googclient_schemaencoding");
          if (encoding == null) {
            // Switching on a null string would throw; treat a missing encoding as unknown.
            System.out.println("Unknown message type; nacking.");
            consumer.nack();
            return;
          }

          Decoder decoder = null;

          // Prepare an appropriate decoder for the message data in the input stream
          // based on the schema encoding type.
          try {
            switch (encoding) {
              case "BINARY":
                decoder = DecoderFactory.get().directBinaryDecoder(inputStream, /* reuse= */ null);
                System.out.println("Receiving a binary-encoded message:");
                break;
              case "JSON":
                // NOTE(review): the JSON decoder is built from the reader schema
                // (State.getClassSchema()) rather than the writer schema of this
                // revision -- confirm this is intended when the revisions differ.
                decoder = DecoderFactory.get().jsonDecoder(State.getClassSchema(), inputStream);
                System.out.println("Receiving a JSON-encoded message:");
                break;
              default:
                System.out.println("Unknown message type; nacking.");
                consumer.nack();
                // Stop here: there is no decoder, and the message must not also be acked.
                return;
            }

            // Obtain an object of the generated Avro class using the decoder.
            State state = reader.read(null, decoder);
            System.out.println(state.getName() + " is abbreviated as " + state.getPostAbbr());

            // Ack the message.
            consumer.ack();
          } catch (IOException | RuntimeException e) {
            System.err.println(e);
            // If we failed to process the message, nack it. Unchecked exceptions are
            // included so a malformed payload is nacked here rather than escaping.
            consumer.nack();
          }
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      subscriber.stopAsync();
    } finally {
      // Release the schema client's resources once the subscriber is done or stopping.
      schemaServiceClient.close();
    }
  }
}

後續步驟

如要搜尋及篩選其他 Google Cloud 產品的程式碼範例,請參閱 Google Cloud 範例瀏覽工具。