Publish messages to topics with a schema

This document shows you how to publish messages to a topic that has a schema.

Before you begin

Before you configure the publish workflow, make sure that you have completed the following tasks:

Required roles

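To get the permissions that you need to publish messages to a topic, ask your administrator to grant you the Pub/Sub Publisher (roles/pubsub.publisher) IAM role on the topic. For more information about granting roles, see Manage access to projects, folders, and organizations.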

You might also be able to get the required permissions through custom roles or other predefined roles.

You need additional permissions to create or update topics and subscriptions.

Publish messages with a schema

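You can publish messages to a topic that is associated with a schema. You must encode each message in the schema and format that you specified when you created the topic. A message matches the schema associated with the topic if it matches any revision of that schema within the allowed range of revisions. Pub/Sub evaluates the message against the revisions in order, from the newest allowed revision to the oldest allowed revision, until it finds a match or reaches the oldest allowed revision.

Pub/Sub adds the following attributes to a message that is successfully published to a topic associated with a schema: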

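  • googclient_schemaname: The name of the schema used for validation.

  • googclient_schemaencoding: The encoding of the message, either JSON or binary.

  • googclient_schemarevisionid: The revision ID of the schema used to parse and validate the message. Each revision has a unique revision ID associated with it. The revision ID is an auto-generated eight-character UUID.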
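As a rough illustration of how a consumer might read these attributes, the following Python sketch pulls the three keys out of a plain attribute mapping. The `SchemaInfo` type, the `schema_info` helper, and the example values are hypothetical; only the attribute keys come from the list above.

```python
from typing import Mapping, NamedTuple, Optional

# Attribute keys copied from the list above.
SCHEMA_NAME_KEY = "googclient_schemaname"
SCHEMA_ENCODING_KEY = "googclient_schemaencoding"
SCHEMA_REVISION_ID_KEY = "googclient_schemarevisionid"


class SchemaInfo(NamedTuple):
    """Hypothetical container for the schema attributes of one message."""

    name: str
    encoding: str
    revision_id: str


def schema_info(attributes: Mapping[str, str]) -> Optional[SchemaInfo]:
    """Return the schema attributes, or None if any of them is missing."""
    try:
        return SchemaInfo(
            name=attributes[SCHEMA_NAME_KEY],
            encoding=attributes[SCHEMA_ENCODING_KEY],
            revision_id=attributes[SCHEMA_REVISION_ID_KEY],
        )
    except KeyError:
        return None


# Illustrative values only; real values are set by Pub/Sub at publish time.
example = {
    SCHEMA_NAME_KEY: "projects/my-project/schemas/my-schema",
    SCHEMA_ENCODING_KEY: "JSON",
    SCHEMA_REVISION_ID_KEY: "0a1b2c3d",
}
print(schema_info(example))
```

Returning None when a key is missing lets the same helper handle messages from topics that have no schema.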

If a message doesn't match any of the schema revisions that the topic allows, Pub/Sub returns an INVALID_ARGUMENT error to the publish request.
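The evaluation order described above can be sketched as a small local simulation. This is not how Pub/Sub is implemented: the `Revision` type, the `has_fields` toy validator, the revision IDs, and the `capital` field are assumptions made for illustration, and a `ValueError` stands in for the INVALID_ARGUMENT error.

```python
import json
from typing import Callable, List, Tuple

# Hypothetical revision: (revision_id, validator). Real validation is done by Pub/Sub.
Revision = Tuple[str, Callable[[object], bool]]


def has_fields(*fields: str) -> Callable[[object], bool]:
    """Build a toy validator: the record must have exactly these string fields."""

    def validate(record: object) -> bool:
        return (
            isinstance(record, dict)
            and set(record) == set(fields)
            and all(isinstance(record[f], str) for f in fields)
        )

    return validate


def match_revision(data: bytes, allowed: List[Revision]) -> str:
    """Try the allowed revisions newest first and return the first matching ID.

    `allowed` is ordered from oldest to newest. Raises ValueError, standing in
    for INVALID_ARGUMENT, when no allowed revision matches.
    """
    record = json.loads(data)
    for revision_id, validate in reversed(allowed):
        if validate(record):
            return revision_id
    raise ValueError("INVALID_ARGUMENT: message matches no allowed revision")


allowed_revisions: List[Revision] = [
    ("rev-old", has_fields("name", "post_abbr")),
    ("rev-new", has_fields("name", "post_abbr", "capital")),
]

old_style = json.dumps({"name": "Alaska", "post_abbr": "AK"}).encode("utf-8")
# The newest revision is tried first and rejected, so the older one matches.
print(match_revision(old_style, allowed_revisions))
```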

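Pub/Sub evaluates messages against schema revisions only at publish time. If you commit a new schema revision or change the schema associated with the topic after a message is published, Pub/Sub doesn't re-evaluate that message or change any of the schema attributes attached to it.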

You can publish messages to a topic that has an associated schema in a Google Cloud project by using the Google Cloud console, the gcloud CLI, the Pub/Sub API, or the Cloud Client Libraries.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

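    A Cloud Shell session starts at the bottom of the console and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. The session can take a few seconds to initialize.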

  2. Use the gcloud pubsub topics publish command to publish a sample message.

    gcloud pubsub topics publish TOPIC_ID \
        --message=MESSAGE
    

    Replace the following:

  • TOPIC_ID: The name of the topic that you already created.

  • MESSAGE: The message to publish to the topic. A sample message could be {"name": "Alaska", "post_abbr": "AK"}.
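Because a JSON message contains quotes and spaces, it needs shell quoting when passed to `--message`. The following Python sketch only builds the command string and doesn't run it. The `build_publish_command` helper and the topic ID `my-topic` are hypothetical, and the sketch assumes the topic uses JSON encoding.

```python
import json
import shlex


def build_publish_command(topic_id: str, record: dict) -> str:
    """Return a shell-safe gcloud command that publishes `record` as JSON."""
    message = json.dumps(record)
    args = [
        "gcloud", "pubsub", "topics", "publish", topic_id,
        f"--message={message}",
    ]
    # shlex.join quotes each argument so the JSON survives the shell intact.
    return shlex.join(args)


print(build_publish_command("my-topic", {"name": "Alaska", "post_abbr": "AK"}))
```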

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.

Avro
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  auto constexpr kNewYork =
      R"js({ "name": "New York", "post_abbr": "NY" })js";
  auto constexpr kPennsylvania =
      R"js({ "name": "Pennsylvania", "post_abbr": "PA" })js";
  std::vector<future<void>> done;
  auto handler = [](future<StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
  };
  for (auto const* data : {kNewYork, kPennsylvania}) {
    done.push_back(
        publisher.Publish(pubsub::MessageBuilder{}.SetData(data).Build())
            .then(handler));
  }
  // Block until all messages are published.
  for (auto& d : done) d.get();
}
Proto
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  std::vector<std::pair<std::string, std::string>> states{
      {"New York", "NY"},
      {"Pennsylvania", "PA"},
  };
  std::vector<future<void>> done;
  auto handler = [](future<StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
  };
  for (auto& data : states) {
    google::cloud::pubsub::samples::State state;
    state.set_name(data.first);
    state.set_post_abbr(data.second);
    done.push_back(publisher
                       .Publish(pubsub::MessageBuilder{}
                                    .SetData(state.SerializeAsString())
                                    .Build())
                       .then(handler));
  }
  // Block until all messages are published.
  for (auto& d : done) d.get();
}

C#

Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C# API reference documentation.

Avro

using Avro.IO;
using Avro.Specific;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishAvroMessagesAsyncSample
{
    public async Task<int> PublishAvroMessagesAsync(string projectId, string topicId, IEnumerable<AvroUtilities.State> messageStates)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
        var topic = publishApi.GetTopic(topicName);

        int publishedMessageCount = 0;
        var publishTasks = messageStates.Select(async state =>
        {

            try
            {
                string messageId = null;
                switch (topic.SchemaSettings.Encoding)
                {
                    case Encoding.Binary:
                        using (var ms = new MemoryStream())
                        {
                            var encoder = new BinaryEncoder(ms);
                            var writer = new SpecificDefaultWriter(state.Schema);
                            writer.Write(state, encoder);
                            messageId = await publisher.PublishAsync(ms.ToArray());
                        }
                        break;
                    case Encoding.Json:
                        var jsonMessage = AvroUtilities.StateUtils.StateToJsonString(state);
                        messageId = await publisher.PublishAsync(jsonMessage);
                        break;
                }
                Console.WriteLine($"Published message {messageId}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {state}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        // PublisherClient instance should be shutdown after use.
        // The TimeSpan specifies for how long to attempt to publish locally queued messages.
        await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));
        return publishedMessageCount;
    }
}
Proto

using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishProtoMessagesAsyncSample
{
    public async Task<int> PublishProtoMessagesAsync(string projectId, string topicId, IEnumerable<Utilities.State> messageStates)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
        var topic = publishApi.GetTopic(topicName);

        int publishedMessageCount = 0;
        var publishTasks = messageStates.Select(async state =>
        {
            try
            {
                string messageId = null;
                switch (topic.SchemaSettings.Encoding)
                {
                    case Encoding.Binary:
                        var binaryMessage = state.ToByteString();
                        messageId = await publisher.PublishAsync(binaryMessage);
                        break;
                    case Encoding.Json:
                        var jsonMessage = JsonFormatter.Default.Format(state);
                        messageId = await publisher.PublishAsync(jsonMessage);
                        break;
                }
                Console.WriteLine($"Published message {messageId}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {state}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        // PublisherClient instance should be shutdown after use.
        // The TimeSpan specifies for how long to attempt to publish locally queued messages.
        await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));
        return publishedMessageCount;
    }
}

Go

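The following sample uses the major version (v2) of the Go Pub/Sub client library. If you are still using the v1 library, see the guide for migrating to v2. To see a list of v1 code samples, see the deprecated code samples.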

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.

Avro
import (
	"context"
	"fmt"
	"io"
	"os"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"github.com/linkedin/goavro/v2"
)

func publishAvroRecords(w io.Writer, projectID, topicID, avscFile string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	// Release the client's connections when the function returns.
	defer client.Close()

	avroSource, err := os.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("os.ReadFile err: %w", err)
	}
	codec, err := goavro.NewCodec(string(avroSource))
	if err != nil {
		return fmt.Errorf("goavro.NewCodec err: %w", err)
	}
	record := map[string]interface{}{"name": "Alaska", "post_abbr": "AK"}

	// Get the topic encoding type.
	req := &pubsubpb.GetTopicRequest{
		Topic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
	}
	t, err := client.TopicAdminClient.GetTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("got err in GetTopic: %w", err)
	}
	encoding := t.SchemaSettings.Encoding

	var msg []byte
	switch encoding {
	case pubsubpb.Encoding_BINARY:
		msg, err = codec.BinaryFromNative(nil, record)
		if err != nil {
			return fmt.Errorf("codec.BinaryFromNative err: %w", err)
		}
	case pubsubpb.Encoding_JSON:
		msg, err = codec.TextualFromNative(nil, record)
		if err != nil {
			return fmt.Errorf("codec.TextualFromNative err: %w", err)
		}
	default:
		return fmt.Errorf("invalid encoding: %v", encoding)
	}

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	publisher := client.Publisher(topicID)
	result := publisher.Publish(ctx, &pubsub.Message{
		Data: msg,
	})
	_, err = result.Get(ctx)
	if err != nil {
		return fmt.Errorf("result.Get: %w", err)
	}
	fmt.Fprintf(w, "Published avro record: %s\n", string(msg))
	return nil
}
Proto
import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	statepb "github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
)

func publishProtoMessages(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	// Release the client's connections when the function returns.
	defer client.Close()

	state := &statepb.State{
		Name:     "Alaska",
		PostAbbr: "AK",
	}

	// Get the topic encoding type.
	req := &pubsubpb.GetTopicRequest{
		Topic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
	}
	t, err := client.TopicAdminClient.GetTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("got err in GetTopic: %w", err)
	}
	encoding := t.SchemaSettings.Encoding

	var msg []byte
	switch encoding {
	case pubsubpb.Encoding_BINARY:
		msg, err = proto.Marshal(state)
		if err != nil {
			return fmt.Errorf("proto.Marshal err: %w", err)
		}
	case pubsubpb.Encoding_JSON:
		msg, err = protojson.Marshal(state)
		if err != nil {
			return fmt.Errorf("protojson.Marshal err: %w", err)
		}
	default:
		return fmt.Errorf("invalid encoding: %v", encoding)
	}

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	publisher := client.Publisher(topicID)
	result := publisher.Publish(ctx, &pubsub.Message{
		Data: msg,
	})
	_, err = result.Get(ctx)
	if err != nil {
		return fmt.Errorf("result.Get: %w", err)
	}
	fmt.Fprintf(w, "Published proto message with %#v encoding: %s\n", encoding, string(msg))
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.

Avro

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import utilities.State;

public class PublishAvroRecordsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use a topic created with an Avro schema.
    String topicId = "your-topic-id";

    publishAvroRecordsExample(projectId, topicId);
  }

  public static void publishAvroRecordsExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {

    Encoding encoding = null;

    TopicName topicName = TopicName.of(projectId, topicId);

    // Get the topic encoding type.
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
    }

    // Instantiate an avro-tools-generated class defined in `us-states.avsc`.
    State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();

    Publisher publisher = null;

    block:
    try {
      publisher = Publisher.newBuilder(topicName).build();

      // Prepare to serialize the object to the output stream.
      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();

      Encoder encoder = null;

      // Prepare an appropriate encoder for publishing to the topic.
      switch (encoding) {
        case BINARY:
          System.out.println("Preparing a BINARY encoder...");
          encoder = EncoderFactory.get().directBinaryEncoder(byteStream, /* reuse= */ null);
          break;

        case JSON:
          System.out.println("Preparing a JSON encoder...");
          encoder = EncoderFactory.get().jsonEncoder(State.getClassSchema(), byteStream);
          break;

        default:
          break block;
      }

      // Encode the object and write it to the output stream.
      state.customEncode(encoder);
      encoder.flush();

      // Publish the encoded object as a Pub/Sub message.
      ByteString data = ByteString.copyFrom(byteStream.toByteArray());
      PubsubMessage message = PubsubMessage.newBuilder().setData(data).build();
      System.out.println("Publishing message: " + message);

      ApiFuture<String> future = publisher.publish(message);
      System.out.println("Published message ID: " + future.get());

    } finally {
      if (publisher != null) {
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}
Proto

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.JsonFormat;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import utilities.StateProto.State;

public class PublishProtobufMessagesExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use a topic created with a proto schema.
    String topicId = "your-topic-id";

    publishProtobufMessagesExample(projectId, topicId);
  }

  public static void publishProtobufMessagesExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {

    Encoding encoding = null;

    TopicName topicName = TopicName.of(projectId, topicId);

    // Get the topic encoding type.
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
    }

    Publisher publisher = null;

    // Instantiate a protoc-generated class defined in `us-states.proto`.
    State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();

    block:
    try {
      publisher = Publisher.newBuilder(topicName).build();

      PubsubMessage.Builder message = PubsubMessage.newBuilder();

      // Prepare an appropriately formatted message based on topic encoding.
      switch (encoding) {
        case BINARY:
          message.setData(state.toByteString());
          System.out.println("Publishing a BINARY-formatted message:\n" + message);
          break;

        case JSON:
          String jsonString = JsonFormat.printer().omittingInsignificantWhitespace().print(state);
          message.setData(ByteString.copyFromUtf8(jsonString));
          System.out.println("Publishing a JSON-formatted message:\n" + message);
          break;

        default:
          break block;
      }

      // Publish the message.
      ApiFuture<String> future = publisher.publish(message.build());
      System.out.println("Published message ID: " + future.get());

    } finally {
      if (publisher != null) {
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.

Avro
/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');

// And the Apache Avro library
const avro = require('avro-js');
const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishAvroRecords(topicNameOrId) {
  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  // Get the topic metadata to learn about its schema encoding.
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Make an encoder using the official avro-js library.
  const definition = fs
    .readFileSync('system-test/fixtures/provinces.avsc')
    .toString();
  const type = avro.parse(definition);

  // Encode the message.
  const province = {
    name: 'Ontario',
    post_abbr: 'ON',
  };
  let dataBuffer;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = type.toBuffer(province);
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(type.toString(province));
      break;
    default:
      console.log(`Unknown schema encoding: ${schemaEncoding}`);
      break;
  }
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Avro record ${messageId} published.`);
}
Proto
/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');

// And the protobufjs library
const protobuf = require('protobufjs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishProtobufMessages(topicNameOrId) {
  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  // Get the topic metadata to learn about its schema.
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Encode the message.
  const province = {
    name: 'Ontario',
    postAbbr: 'ON',
  };

  // Make an encoder using the protobufjs library.
  //
  // Since we're providing the test message for a specific schema here, we'll
  // also code in the path to a sample proto definition.
  const root = await protobuf.load('system-test/fixtures/provinces.proto');
  const Province = root.lookupType('utilities.Province');
  const message = Province.create(province);

  let dataBuffer;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = Buffer.from(Province.encode(message).finish());
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
      break;
    default:
      console.log(`Unknown schema encoding: ${schemaEncoding}`);
      break;
  }
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Protobuf message ${messageId} published.`);
}

Node.js (TypeScript)

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.

Avro
/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub, Encodings} from '@google-cloud/pubsub';

// And the Apache Avro library
import * as avro from 'avro-js';
import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

interface ProvinceObject {
  name: string;
  post_abbr: string;
}

async function publishAvroRecords(topicNameOrId: string) {
  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  // Get the topic metadata to learn about its schema encoding.
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Make an encoder using the official avro-js library.
  const definition = fs
    .readFileSync('system-test/fixtures/provinces.avsc')
    .toString();
  const type = avro.parse(definition);

  // Encode the message.
  const province: ProvinceObject = {
    name: 'Ontario',
    post_abbr: 'ON',
  };
  let dataBuffer: Buffer | undefined;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = type.toBuffer(province);
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(type.toString(province));
      break;
    default:
      console.log(`Unknown schema encoding: ${schemaEncoding}`);
      break;
  }
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Avro record ${messageId} published.`);
}
Proto
/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub, Encodings} from '@google-cloud/pubsub';

// And the protobufjs library
import * as protobuf from 'protobufjs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

interface ProvinceObject {
  name: string;
  postAbbr: string;
}

async function publishProtobufMessages(topicNameOrId: string) {
  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  // Get the topic metadata to learn about its schema.
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Encode the message.
  const province: ProvinceObject = {
    name: 'Ontario',
    postAbbr: 'ON',
  };

  // Make an encoder using the protobufjs library.
  //
  // Since we're providing the test message for a specific schema here, we'll
  // also code in the path to a sample proto definition.
  const root = await protobuf.load('system-test/fixtures/provinces.proto');
  const Province = root.lookupType('utilities.Province');
  const message = Province.create(province);

  let dataBuffer: Buffer | undefined;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = Buffer.from(Province.encode(message).finish());
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
      break;
    default:
      console.log(`Unknown schema encoding: ${schemaEncoding}`);
      break;
  }
  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Protobuf message ${messageId} published.`);
}

PHP

Before trying this sample, follow the PHP setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub PHP API reference documentation.

Avro
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;

use AvroStringIO;
use AvroSchema;
use AvroIODatumWriter;
use AvroIOBinaryEncoder;

/**
 * Publish a message using an AVRO schema.
 *
 * This sample uses `wikimedia/avro` for AVRO encoding.
 *
 * @param string $projectId
 * @param string $topicId
 * @param string $definitionFile
 */
function publish_avro_records($projectId, $topicId, $definitionFile)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $definition = (string) file_get_contents($definitionFile);

    $messageData = [
        'name' => 'Alaska',
        'post_abbr' => 'AK',
    ];

    $topic = $pubsub->topic($topicId);

    // get the encoding type.
    $topicInfo = $topic->info();
    $encoding = '';
    if (isset($topicInfo['schemaSettings']['encoding'])) {
        $encoding = $topicInfo['schemaSettings']['encoding'];
    }

    // if encoding is not set, we can't continue.
    if ($encoding === '') {
        printf('Topic %s does not have schema enabled', $topicId);
        return;
    }

    // If you are using gRPC, encoding may be an integer corresponding to an
    // enum value on Google\Cloud\PubSub\V1\Encoding.
    if (!is_string($encoding)) {
        $encoding = Encoding::name($encoding);
    }

    $encodedMessageData = '';
    if ($encoding == 'BINARY') {
        // encode as AVRO binary.
        $io = new AvroStringIO();
        $schema = AvroSchema::parse($definition);
        $writer = new AvroIODatumWriter($schema);
        $encoder = new AvroIOBinaryEncoder($io);
        $writer->write($messageData, $encoder);

        $encodedMessageData = $io->string();
    } else {
        // encode as JSON.
        $encodedMessageData = json_encode($messageData);
    }

    $topic->publish(['data' => $encodedMessageData]);

    printf('Published message with %s encoding', $encoding);
}
Proto
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;

use Utilities\StateProto;

/**
 * Publish a message using a protocol buffer schema.
 *
 * Relies on a proto message of the following form:
 * ```
 * syntax = "proto3";
 *
 * package utilities;
 *
 * message StateProto {
 *   string name = 1;
 *   string post_abbr = 2;
 * }
 * ```
 *
 * @param string $projectId
 * @param string $topicId
 * @return void
 */
function publish_proto_messages($projectId, $topicId)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $messageData = new StateProto([
        'name' => 'Alaska',
        'post_abbr' => 'AK',
    ]);

    $topic = $pubsub->topic($topicId);

    // get the encoding type.
    $topicInfo = $topic->info();
    $encoding = '';
    if (isset($topicInfo['schemaSettings']['encoding'])) {
        $encoding = $topicInfo['schemaSettings']['encoding'];
    }

    // if encoding is not set, we can't continue.
    if ($encoding === '') {
        printf('Topic %s does not have schema enabled', $topicId);
        return;
    }

    // If you are using gRPC, encoding may be an integer corresponding to an
    // enum value on Google\Cloud\PubSub\V1\Encoding.
    if (!is_string($encoding)) {
        $encoding = Encoding::name($encoding);
    }

    $encodedMessageData = '';
    if ($encoding == 'BINARY') {
        // encode as protobuf binary.
        $encodedMessageData = $messageData->serializeToString();
    } else {
        // encode as JSON.
        $encodedMessageData = $messageData->serializeToJsonString();
    }

    $topic->publish(['data' => $encodedMessageData]);

    printf('Published message with %s encoding', $encoding);
}

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.

Avro
from avro.io import BinaryEncoder, DatumWriter
import avro.schema as schema
import io
import json
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.pubsub_v1.types import Encoding

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

# Prepare to write Avro records to the binary output stream.
with open(avsc_file, "rb") as file:
    avro_schema = schema.parse(file.read())
writer = DatumWriter(avro_schema)
bout = io.BytesIO()

# Prepare some data using a Python dictionary that matches the Avro schema
record = {"name": "Alaska", "post_abbr": "AK"}

try:
    # Get the topic encoding type.
    topic = publisher_client.get_topic(request={"topic": topic_path})
    encoding = topic.schema_settings.encoding

    # Encode the data according to the message serialization type.
    if encoding == Encoding.BINARY:
        encoder = BinaryEncoder(bout)
        writer.write(record, encoder)
        data = bout.getvalue()
        print(f"Preparing a binary-encoded message:\n{data.decode()}")
    elif encoding == Encoding.JSON:
        data_str = json.dumps(record)
        print(f"Preparing a JSON-encoded message:\n{data_str}")
        data = data_str.encode("utf-8")
    else:
        print(f"No encoding specified in {topic_path}. Abort.")
        exit(0)

    future = publisher_client.publish(topic_path, data)
    print(f"Published message ID: {future.result()}")

except NotFound:
    print(f"{topic_id} not found.")
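To make the difference between the two encodings concrete, the following sketch encodes the same record by hand with only the standard library. It assumes the Avro schema is a record with two string fields in the order name, post_abbr; the schema file itself isn't shown in this document, so treat that as an assumption. The helper names are hypothetical, and real code would normally use an Avro library as in the sample above.

```python
import json


def zigzag_varint(n: int) -> bytes:
    """Encode an integer the way Avro encodes a long: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def avro_string(s: str) -> bytes:
    """An Avro string is its UTF-8 byte length (as a long) followed by the bytes."""
    raw = s.encode("utf-8")
    return zigzag_varint(len(raw)) + raw


def encode_state_binary(record: dict) -> bytes:
    """Assumed schema: a record with string fields `name`, then `post_abbr`."""
    return avro_string(record["name"]) + avro_string(record["post_abbr"])


def encode_state_json(record: dict) -> bytes:
    """For a record of plain strings, the Avro JSON encoding is ordinary JSON."""
    return json.dumps(record).encode("utf-8")


record = {"name": "Alaska", "post_abbr": "AK"}
print(encode_state_binary(record))  # b'\x0cAlaska\x04AK'
print(encode_state_json(record))
```

The binary form carries no field names, which is why it can be decoded only with the schema at hand.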
Proto
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.protobuf.json_format import MessageToJson
from google.pubsub_v1.types import Encoding

from utilities import us_states_pb2  # type: ignore

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

try:
    # Get the topic encoding type.
    topic = publisher_client.get_topic(request={"topic": topic_path})
    encoding = topic.schema_settings.encoding

    # Instantiate a protoc-generated class defined in `us-states.proto`.
    state = us_states_pb2.StateProto()
    state.name = "Alaska"
    state.post_abbr = "AK"

    # Encode the data according to the message serialization type.
    if encoding == Encoding.BINARY:
        data = state.SerializeToString()
        print(f"Preparing a binary-encoded message:\n{data}")
    elif encoding == Encoding.JSON:
        json_object = MessageToJson(state)
        data = str(json_object).encode("utf-8")
        print(f"Preparing a JSON-encoded message:\n{data}")
    else:
        print(f"No encoding specified in {topic_path}. Abort.")
        exit(0)

    future = publisher_client.publish(topic_path, data)
    print(f"Published message ID: {future.result()}")

except NotFound:
    print(f"{topic_id} not found.")
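For comparison, here is a hand-rolled sketch of what the two protocol buffer encodings look like on the wire, using only the standard library. The field numbers (name = 1, post_abbr = 2) come from the StateProto definition quoted in the PHP sample. The helper names are hypothetical, and real code would use the protoc-generated class as in the sample above.

```python
import json


def varint(n: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def string_field(field_number: int, value: str) -> bytes:
    """Encode one proto3 string field; empty strings are omitted as defaults."""
    if not value:
        return b""
    raw = value.encode("utf-8")
    tag = (field_number << 3) | 2  # wire type 2 = length-delimited
    return varint(tag) + varint(len(raw)) + raw


def encode_state_binary(name: str, post_abbr: str) -> bytes:
    """Binary form of StateProto { string name = 1; string post_abbr = 2; }."""
    return string_field(1, name) + string_field(2, post_abbr)


def encode_state_json(name: str, post_abbr: str) -> bytes:
    """The proto3 JSON mapping uses lowerCamelCase field names by default."""
    return json.dumps({"name": name, "postAbbr": post_abbr}).encode("utf-8")


print(encode_state_binary("Alaska", "AK").hex())  # 0a06416c61736b611202414b
print(encode_state_json("Alaska", "AK"))
```

Note that the JSON form uses postAbbr where the Avro JSON form uses post_abbr, which matches the difference between the two Node.js samples.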

Ruby

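The following sample uses Ruby Pub/Sub client library v3. If you are still using the v2 library, see the guide for migrating to v3. To see a list of Ruby v2 code samples, see the deprecated code samples.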

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.

Avro
# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

pubsub = Google::Cloud::PubSub.new
topic_admin = pubsub.topic_admin
publisher = pubsub.publisher topic_id
record = { "name" => "Alaska", "post_abbr" => "AK" }

topic = topic_admin.get_topic topic: pubsub.topic_path(topic_id)
encoding = topic.schema_settings.encoding

case encoding
when :BINARY
  require "avro"
  avro_schema = Avro::Schema.parse File.read(avsc_file)
  writer = Avro::IO::DatumWriter.new avro_schema
  buffer = StringIO.new
  encoder = Avro::IO::BinaryEncoder.new buffer
  writer.write record, encoder
  publisher.publish buffer
  puts "Published binary-encoded AVRO message."
when :JSON
  require "json"
  publisher.publish record.to_json
  puts "Published JSON-encoded AVRO message."
else
  raise "No encoding specified in #{topic.name}."
end
Proto
# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new
topic_admin = pubsub.topic_admin
publisher = pubsub.publisher topic_id
state = Utilities::StateProto.new name: "Alaska", post_abbr: "AK"

topic = topic_admin.get_topic topic: pubsub.topic_path(topic_id)
encoding = topic.schema_settings.encoding

case encoding
when :BINARY
  publisher.publish Utilities::StateProto.encode(state)
  puts "Published binary-encoded protobuf message."
when :JSON
  publisher.publish Utilities::StateProto.encode_json(state)
  puts "Published JSON-encoded protobuf message."
else
  raise "No encoding specified in #{topic.name}."
end

What's next