Nachrichten empfangen, die für verschiedene Schemaversionen bestimmt sein könnten

Nachrichten empfangen, die für verschiedene Schemaversionen bestimmt sein könnten

Weitere Informationen

Eine ausführliche Dokumentation, die dieses Codebeispiel enthält, finden Sie hier:

Codebeispiel

C++

Lesen Sie unter „Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden“ die Anleitung für die Einrichtung von C++, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

Richten Sie zur Authentifizierung bei Pub/Sub die Standardanmeldedaten für Anwendungen (ADC) ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
    pubsub::Subscription(project_id, subscription_id)));

// Create a schema client.
auto schema_client =
    pubsub::SchemaServiceClient(pubsub::MakeSchemaServiceConnection());

// Read the reader schema. This is the schema you want the messages to be
// evaluated using.
std::ifstream ifs(avro_file);
avro::ValidSchema reader_schema;
avro::compileJsonSchema(ifs, reader_schema);

std::unordered_map<std::string, avro::ValidSchema> revisions_to_schemas;
auto session = subscriber.Subscribe(
    [&](pubsub::Message const& message, pubsub::AckHandler h) {
      // Get the reader schema revision for the message.
      auto schema_name = message.attributes()["googclient_schemaname"];
      auto schema_revision_id =
          message.attributes()["googclient_schemarevisionid"];
      // If we haven't received a message with this schema, look it up.
      if (revisions_to_schemas.find(schema_revision_id) ==
          revisions_to_schemas.end()) {
        auto schema_path = schema_name + "@" + schema_revision_id;
        // Use the schema client to get the path.
        auto schema = schema_client.GetSchema(schema_path);
        if (!schema) {
          std::cout << "Schema not found:" << schema_path << "\n";
          return;
        }
        avro::ValidSchema writer_schema;
        std::stringstream in;
        in << schema.value().definition();
        avro::compileJsonSchema(in, writer_schema);
        revisions_to_schemas[schema_revision_id] = writer_schema;
      }
      auto writer_schema = revisions_to_schemas[schema_revision_id];

      auto encoding = message.attributes()["googclient_schemaencoding"];
      if (encoding == "JSON") {
        std::stringstream in;
        in << message.data();
        auto avro_in = avro::istreamInputStream(in);
        avro::DecoderPtr decoder = avro::resolvingDecoder(
            writer_schema, reader_schema, avro::jsonDecoder(writer_schema));
        decoder->init(*avro_in);

        v2::State state;
        avro::decode(*decoder, state);
        std::cout << "Name: " << state.name << "\n";
        std::cout << "Postal Abbreviation: " << state.post_abbr << "\n";
        std::cout << "Population: " << state.population << "\n";
      } else {
        std::cout << "Unable to decode. Received message using encoding"
                  << encoding << "\n";
      }
      std::move(h).ack();
    });

C#

Lesen Sie unter „Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden“ die Anleitung für die Einrichtung von C#, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C# API.

Richten Sie zur Authentifizierung bei Pub/Sub die Standardanmeldedaten für Anwendungen (ADC) ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

using Avro.Generic;
using Avro.IO;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public class SubscribeAvroRecordsWithRevisionsSample
{
    /// <summary>
    /// Listens on the given subscription for ten seconds and decodes every
    /// binary-encoded Avro message with the schema revision named in its attributes.
    /// </summary>
    /// <param name="projectId">ID of the project that owns the subscription.</param>
    /// <param name="subscriptionId">ID of an existing subscription.</param>
    /// <returns>
    /// The number of binary messages decoded and the number of distinct
    /// (schema name, revision) pairs that were cached.
    /// </returns>
    public async Task<(int, int)> SubscribeAvroRecordsWithRevisions(string projectId, string subscriptionId)
    {
        // Parsed schemas keyed by (schema name, revision ID).
        var schemaCache = new ConcurrentDictionary<(string, string), Avro.Schema>();
        SchemaServiceClient schemaService = SchemaServiceClient.Create();

        SubscriberClient subscriber = await SubscriberClient.CreateAsync(
            SubscriptionName.FromProjectSubscription(projectId, subscriptionId));

        int messageCount = 0;
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            // The publisher's encoding, schema name and revision travel as message attributes.
            string encoding = message.Attributes["googclient_schemaencoding"];
            string schemaName = message.Attributes["googclient_schemaname"];
            string revision = message.Attributes["googclient_schemarevisionid"];

            // Look the schema up in the cache, fetching and parsing it on a miss.
            Avro.Schema writerSchema = schemaCache.GetOrAdd(
                (schemaName, revision),
                key => Avro.Schema.Parse(schemaService.GetSchema($"{key.Item1}@{key.Item2}").Definition));

            // Only binary payloads are decoded; anything else is reported and acknowledged.
            if (encoding != "BINARY")
            {
                Console.WriteLine("Expected only binary messages in this sample");
                return Task.FromResult(SubscriberClient.Reply.Ack);
            }

            using (var payload = new MemoryStream(message.Data.ToByteArray()))
            {
                var binaryDecoder = new BinaryDecoder(payload);
                // The same schema is passed as both writer and reader schema.
                var datumReader = new DefaultReader(writerSchema, writerSchema);
                GenericRecord decoded = datumReader.Read<GenericRecord>(null, binaryDecoder);
                Console.WriteLine($"Message {message.MessageId}: {decoded.GetValue(0)}");
                Interlocked.Increment(ref messageCount);
            }
            return Task.FromResult(SubscriberClient.Reply.Ack);
        });

        // Run for 10 seconds.
        await Task.Delay(10_000);
        await subscriber.StopAsync(CancellationToken.None);
        // Awaiting the start task surfaces any failure that happened while receiving.
        await startTask;
        return (messageCount, schemaCache.Count);
    }
}

Go

Lesen Sie unter „Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden“ die Anleitung für die Einrichtung von Go, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

Richten Sie zur Authentifizierung bei Pub/Sub die Standardanmeldedaten für Anwendungen (ADC) ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	"cloud.google.com/go/pubsub/v2"
	schema "cloud.google.com/go/pubsub/v2/apiv1"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"github.com/linkedin/goavro/v2"
)

// subscribeWithAvroSchemaRevisions receives messages from the subscription
// subID for up to 10 seconds. Each message is decoded with the Avro schema
// revision named in its attributes; codecs are cached per revision ID. Progress
// and per-message failures are written to w. Messages that cannot be decoded
// are nacked. It returns an error if a client cannot be created or if Receive
// fails.
func subscribeWithAvroSchemaRevisions(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	schemaClient, err := schema.NewSchemaClient(ctx)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer schemaClient.Close()

	// Create the cache for the codecs for different revision IDs.
	revisionCodecs := make(map[string]*goavro.Codec)

	sub := client.Subscriber(subID)
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// mu serializes the callback so the codec cache is never accessed
	// concurrently.
	var mu sync.Mutex
	err = sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		defer mu.Unlock()
		name := msg.Attributes["googclient_schemaname"]
		revision := msg.Attributes["googclient_schemarevisionid"]

		codec, ok := revisionCodecs[revision]
		// If the codec doesn't exist in the map, this is the first time we
		// are seeing this revision. We need to fetch the schema and cache the
		// codec. It would be more typical to do this asynchronously, but is
		// shown here in a synchronous way to ease readability.
		if !ok {
			req := &pubsubpb.GetSchemaRequest{
				Name: fmt.Sprintf("%s@%s", name, revision),
				View: pubsubpb.SchemaView_FULL,
			}
			sch, err := schemaClient.GetSchema(ctx, req)
			if err != nil {
				fmt.Fprintf(w, "Nacking, cannot read message without schema: %v\n", err)
				msg.Nack()
				return
			}
			codec, err = goavro.NewCodec(sch.Definition)
			if err != nil {
				fmt.Fprintf(w, "goavro.NewCodec err: %v\n", err)
				msg.Nack()
				// Stop here: caching or using a nil codec would panic on this
				// and every later message with the same revision.
				return
			}
			revisionCodecs[revision] = codec
		}

		encoding := msg.Attributes["googclient_schemaencoding"]

		var data interface{}
		switch encoding {
		case "BINARY":
			native, _, err := codec.NativeFromBinary(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromBinary err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a binary-encoded message:\n%#v\n", native)
			data = native
		case "JSON":
			native, _, err := codec.NativeFromTextual(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromTextual err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v\n", native)
			data = native
		default:
			fmt.Fprintf(w, "Unknown message type(%s), nacking\n", encoding)
			msg.Nack()
			return
		}

		// The sample expects an Avro record, which decodes to a map. Check the
		// type instead of panicking on any other top-level schema type.
		state, ok := data.(map[string]interface{})
		if !ok {
			fmt.Fprintf(w, "Unexpected decoded type %T, nacking\n", data)
			msg.Nack()
			return
		}
		fmt.Fprintf(w, "%s is abbreviated as %s\n", state["name"], state["post_abbr"])
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}
	return nil
}

Java

Lesen Sie unter „Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden“ die Anleitung für die Einrichtung von Java, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.

Richten Sie zur Authentifizierung bei Pub/Sub die Standardanmeldedaten für Anwendungen (ADC) ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Schema;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import utilities.State;

/**
 * Sample that receives messages which may have been published with different revisions of an
 * Avro schema. Each message is decoded with the revision named in its attributes (the writer
 * schema) and resolved into the generated {@code State} class (the reader schema).
 */
public class SubscribeWithAvroSchemaRevisionsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use an existing subscription.
    String subscriptionId = "your-subscription-id";

    subscribeWithAvroSchemaRevisionsExample(projectId, subscriptionId);
  }

  /**
   * Creates a schema service client.
   *
   * @return the client, or {@code null} if it could not be created (the error is printed)
   */
  static SchemaServiceClient getSchemaServiceClient() {
    try {
      return SchemaServiceClient.create();
    } catch (IOException e) {
      System.out.println("Could not get schema client: " + e);
      return null;
    }
  }

  /**
   * Listens on the subscription for up to 30 seconds and prints each decoded message. Messages
   * whose schema cannot be fetched, whose encoding is unknown, or that fail to decode are nacked.
   *
   * @param projectId ID of the project that owns the subscription
   * @param subscriptionId ID of an existing subscription
   */
  public static void subscribeWithAvroSchemaRevisionsExample(
      String projectId, String subscriptionId) {
    // Used to get the schemas for revisions.
    final SchemaServiceClient schemaServiceClient = getSchemaServiceClient();
    if (schemaServiceClient == null) {
      return;
    }

    // Cache for the readers for different revision IDs. Every access is synchronized on the map.
    Map<String, SpecificDatumReader<State>> revisionReaders =
        new HashMap<String, SpecificDatumReader<State>>();

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Get the schema name and revision ID the message was published with.
          String name = message.getAttributesMap().get("googclient_schemaname");
          String revision = message.getAttributesMap().get("googclient_schemarevisionid");

          SpecificDatumReader<State> reader = null;
          synchronized (revisionReaders) {
            reader = revisionReaders.get(revision);
          }
          if (reader == null) {
            // This is the first time we are seeing this revision. We need to
            // fetch the schema and cache its decoder. It would be more typical
            // to do this asynchronously, but is shown here in a synchronous
            // way to ease readability.
            try {
              Schema schema = schemaServiceClient.getSchema(name + "@" + revision);
              org.apache.avro.Schema avroSchema =
                  new org.apache.avro.Schema.Parser().parse(schema.getDefinition());
              // Writer schema is the fetched revision; reader schema is the generated class.
              reader = new SpecificDatumReader<State>(avroSchema, State.getClassSchema());
              synchronized (revisionReaders) {
                revisionReaders.put(revision, reader);
              }
            } catch (Exception e) {
              System.out.println("Could not get schema: " + e);
              // Without the schema, we cannot read the message, so nack it.
              consumer.nack();
              return;
            }
          }

          ByteString data = message.getData();
          // Send the message data to a byte[] input stream.
          InputStream inputStream = new ByteArrayInputStream(data.toByteArray());

          String encoding = message.getAttributesMap().get("googclient_schemaencoding");
          if (encoding == null) {
            // A switch on a null string would throw; treat a missing attribute as unknown.
            System.out.println("Unknown message type; nacking.");
            consumer.nack();
            return;
          }

          // Prepare an appropriate decoder for the message data in the input stream
          // based on the schema encoding type.
          try {
            final Decoder decoder;
            switch (encoding) {
              case "BINARY":
                decoder = DecoderFactory.get().directBinaryDecoder(inputStream, /* reuse= */ null);
                System.out.println("Receiving a binary-encoded message:");
                break;
              case "JSON":
                // The payload was written with the writer schema, so the JSON decoder must parse
                // it with that schema; the reader then resolves it into State.
                decoder = DecoderFactory.get().jsonDecoder(reader.getSchema(), inputStream);
                System.out.println("Receiving a JSON-encoded message:");
                break;
              default:
                System.out.println("Unknown message type; nacking.");
                consumer.nack();
                // Stop here: there is no decoder, and the message is already nacked.
                return;
            }

            // Obtain an object of the generated Avro class using the decoder.
            State state = reader.read(null, decoder);
            System.out.println(state.getName() + " is abbreviated as " + state.getPostAbbr());

            // Ack the message.
            consumer.ack();
          } catch (IOException | RuntimeException e) {
            // Avro reports malformed data with unchecked exceptions as well as IOException.
            System.err.println(e);
            // If we failed to process the message, nack it.
            consumer.nack();
          }
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // The timeout is the normal way this sample ends; stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Ruby

Lesen Sie unter Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden die Anleitung für die Einrichtung von Ruby, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Pub/Sub Ruby API Referenzdokumentation.

Richten Sie zur Authentifizierung bei Pub/Sub die Standardanmeldedaten für Anwendungen (ADC) ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id

# Parsed Avro schemas keyed by revision ID. The mutex guards reads and writes
# of the hash only; the schema fetch itself happens outside the lock.
schemas_by_revision = {}
schemas_lock = Mutex.new

listener = subscriber.listen do |received_message|
  attributes = received_message.attributes
  schema_name = attributes["googclient_schemaname"]
  revision_id = attributes["googclient_schemarevisionid"]
  encoding = attributes["googclient_schemaencoding"]

  parsed_schema = schemas_lock.synchronize { schemas_by_revision[revision_id] }

  # First message seen for this revision: fetch, parse and cache its schema.
  if parsed_schema.nil?
    begin
      require "avro"
      # The resource name format is projects/{project}/schemas/{schema}@{revision}.
      resource_name = "#{schema_name}@#{revision_id}"
      fetched = pubsub.schemas.get_schema name: resource_name

      parsed_schema = Avro::Schema.parse fetched.definition

      schemas_lock.synchronize { schemas_by_revision[revision_id] = parsed_schema }
    rescue StandardError => e
      # Without a schema the message cannot be read, so reject it.
      puts "Could not get schema for revision #{revision_id}: #{e.message}"
      received_message.reject!
      next
    end
  end

  begin
    if encoding == "BINARY"
      require "avro"
      payload = StringIO.new received_message.data
      binary_decoder = Avro::IO::BinaryDecoder.new payload
      datum_reader = Avro::IO::DatumReader.new parsed_schema
      message_data = datum_reader.read binary_decoder
      puts "Received a binary-encoded message:\n#{message_data}"
    elsif encoding == "JSON"
      require "json"
      message_data = JSON.parse received_message.data
      puts "Received a JSON-encoded message:\n#{message_data}"
    else
      puts "Unknown message encoding: #{encoding}. Rejecting message."
      received_message.reject!
      next
    end

    # Reached only when the message was decoded and printed.
    received_message.acknowledge!
  rescue StandardError => e
    puts "Failed to process message: #{e.message}"
    received_message.reject!
  end
end

listener.start

# Keep the main thread alive for 60 seconds so the listener's thread can
# keep receiving messages, then stop it and wait for it to finish.
sleep 60
listener.stop.wait!

Weitere Informationen

Wenn Sie nach Codebeispielen für andere Produkte von Google Cloud suchen und filtern möchten, können Sie den Beispielbrowser für Google Cloud verwenden.