创建主题

在 Pub/Sub 中,主题是代表消息信息流的命名资源。您必须先创建一个主题,然后才能向其发布消息或订阅该主题。Pub/Sub 支持两种类型的主题:标准主题和导入主题。

本文档介绍如何创建 Pub/Sub 标准主题。如果您想详细了解导入主题以及如何创建导入主题,请参阅 关于导入主题

如需创建主题,您可以使用 Google Cloud 控制台、Google Cloud CLI、 客户端库或 Pub/Sub API。

准备工作

所需的角色和权限

如需获得创建主题所需的权限,请让您的管理员为您授予项目的 Pub/Sub Editor(roles/pubsub.editor) IAM 角色。如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色可提供创建主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

您需要具备以下权限才能创建主题:

  • 授予此权限可在项目中创建主题: pubsub.topics.create

您也可以使用自定义角色或其他预定义角色来获取这些权限。

您可以在项目级层和个别资源级层配置访问权限控制。您可以在一个项目中创建订阅,并将其附加到位于不同项目中的主题。 确保您对每个项目都拥有所需的权限。

主题的属性

创建或更新主题时,您必须指定其属性。

添加默认订阅

向 Pub/Sub 主题添加默认订阅。 您可以在创建主题后为该主题创建另一个订阅。 默认订阅具有以下属性:

  • 订阅 ID 为 -sub
  • 拉取传送类型
  • 消息保留时长为 7 天
  • 在处于非活跃状态 31 天后过期
  • 确认时限为 10 秒
  • 立即重试政策

使用架构

架构是消息数据字段必须遵循的格式。 架构是发布者和订阅者之间 Pub/Sub 将强制执行的合同。主题架构有助于标准化消息类型和权限,以便组织中的不同团队使用它们。Pub/Sub 为消息类型和权限创建中央授权机构。如需创建具有架构的主题, 请参阅 架构概览

启用注入

启用此属性后,您可以将来自外部来源的流式数据 注入到主题中,以便使用的 功能 Google Cloud。如需创建用于注入的导入主题,请参阅以下内容:

启用消息保留功能

指定 Pub/Sub 主题在发布后保留消息的时间。超过消息保留时长之后,Pub/Sub 可以随意舍弃消息,无论其确认状态为何。会对发布到主题的所有消息收取消息存储费用

  • 默认值 = 未启用
  • 最小值 = 10 分钟
  • 最大值 = 31 天

将消息数据导出到 BigQuery 中

启用此属性后,您可以创建 BigQuery 订阅,以便在收到消息时将其写入现有 BigQuery 表。您无需配置单独的订阅者客户端。如需详细了解 BigQuery 订阅, 请参阅 BigQuery 订阅

将消息数据备份到 Cloud Storage

启用此属性后,您可以创建 Cloud Storage 订阅,以便在收到消息时将其写入现有 Cloud Storage 表。您无需配置单独的订阅者客户端。如需详细了解 Cloud Storage 订阅, 请参阅 Cloud Storage 订阅

转换

借助主题 SMT,您可以直接在 Pub/Sub 内对消息数据及属性进行轻量级修改。借助此功能,您可以在将消息发布到主题之前进行数据清理、过滤或转换格式。

如需详细了解 SMT,请参阅SMT 概览

Google-owned and Google-managed encryption key

指定使用 Google-owned and Google-managed encryption keys加密主题。默认情况下,Pub/Sub 使用 加密消息 Google-owned and Google-managed encryption keys ,因此选择 此选项会保留默认行为。Google 会自动处理密钥管理和轮替,确保您的消息始终受到最强大的可用加密保护。此选项无需进一步配置。 如需详细了解 Google-owned and Google-managed encryption keys, 请参阅使用进行默认加密 Google-owned and Google-managed encryption keys

Cloud KMS 密钥

指定主题是否使用客户管理的加密密钥 (CMEK) 进行加密。默认情况下,Pub/Sub 使用加密消息 Google-owned and Google-managed encryption keys 。如果您指定此选项,Pub/Sub 会通过 CMEK 使用信封加密模式。在这种方法中,Cloud KMS 不会加密消息。相反,Cloud KMS 会加密 Pub/Sub 为每个主题创建的数据加密密钥 (DEK)。Pub/Sub 使用系统为相应主题生成的最新 DEK 对消息进行加密。在消息即将传送给订阅者之前,Pub/Sub 会对消息进行解密。 如需详细了解如何创建密钥,请参阅配置消息加密

创建主题

您必须先创建一个主题,然后才能向其发布消息或订阅该主题。

控制台

如需创建主题,请按以下步骤操作:

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 创建主题 页面。

    前往“创建主题”

  2. 主题 ID 字段中,输入主题 ID。如需详细了解如何命名主题,请参阅 命名准则

  3. 如需为主题创建默认订阅,请选择添加默认订阅。此选项默认处于启用状态。

  4. 可选。如需将架构与主题搭配使用,请点击使用架构 并提供架构。如需了解详情,请参阅 在创建主题时创建和关联架构

  5. 对于标准主题,请将启用注入 保持为未选中状态。

  6. 可选。如需在发布消息后保留消息,请选择启用消息保留功能 。以天、小时和分钟为单位选择保留期限。如需了解详情,请参阅 启用消息保留功能

  7. 可选。如需将发布的消息导出到 BigQuery 表,请选择将数据导出到 BigQuery 并输入表的详细信息。 如需了解详情,请参阅 创建 BigQuery 订阅

  8. 可选。如需将发布的消息备份到 Cloud Storage 存储桶,请选择将消息数据备份到 Cloud Storage 并输入 Cloud Storage 存储桶的详细信息。如需了解详情,请参阅 创建 Cloud Storage 订阅

  9. 可选。在转换下,添加一个或多个 单条消息转换 (SMT),以 操作和过滤消息数据。如需了解详情,请参阅 创建具有 SMT 的主题

  10. 可选。如需使用客户管理的加密密钥 (CMEK) 加密消息,请选择 Cloud KMS 密钥 。默认情况下,Pub/Sub 使用 Google 默认加密,无需 CMEK。如需了解更多 信息,请参阅配置消息加密

  11. 可选。如需管理与主题关联的密钥,请点击 管理密钥。如需了解详情,请参阅 标记概览

  12. 点击创建主题

gcloud

  1. 在 Google Cloud 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在控制台的底部启动,并显示命令行提示符。 Google Cloud Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境 。该会话可能需要几秒钟来完成初始化。

  2. 如需创建主题,请运行 gcloud pubsub topics create 命令:

    gcloud pubsub topics create TOPIC_ID
    

REST

如需创建主题,请使用 projects.topics.create 方法:

必须使用 Authorization 标头中的访问令牌对请求进行身份验证。如需获取当前应用默认 凭据 的访问令牌,请运行以下命令:gcloud auth application-default print-access-token

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
Authorization: Bearer ACCESS_TOKEN

其中:

  • PROJECT_ID 是项目 ID。
  • TOPIC_ID 是主题 ID。

回答:

{
"name": "projects/PROJECT_ID/topics/TOPIC_ID"
}

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id) {
  auto topic = client.CreateTopic(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;

public class CreateTopicSample
{
    public Topic CreateTopic(string projectId, string topicId)
    {
        PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        Topic topic = null;

        try
        {
            topic = publisher.CreateTopic(topicName);
            Console.WriteLine($"Topic {topic.Name} created.");
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            Console.WriteLine($"Topic {topicName} already exists.");
        }
        return topic;
    }
}

Go

以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅 迁移到 v2 的指南。 如需查看 v1 代码示例列表,请参阅 已废弃的代码示例

在尝试此示例之前,请按照 《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func create(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	topic := &pubsubpb.Topic{
		Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
	}
	t, err := client.TopicAdminClient.CreateTopic(ctx, topic)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Topic created: %v\n", t)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    createTopicExample(projectId, topicId);
  }

  public static void createTopicExample(String projectId, String topicId) throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);
      Topic topic = topicAdminClient.createTopic(topicName);
      System.out.println("Created topic: " + topic.getName());
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopic(topicNameOrId) {
  // Creates a new topic
  await pubSubClient.createTopic(topicNameOrId);
  console.log(`Topic ${topicNameOrId} created.`);
}

Node.ts

在尝试此示例之前,请按照 《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopic(topicNameOrId: string) {
  // Creates a new topic
  await pubSubClient.createTopic(topicNameOrId);
  console.log(`Topic ${topicNameOrId} created.`);
}

PHP

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 */
function create_topic($projectId, $topicName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->createTopic($topicName);

    printf('Topic created: %s' . PHP_EOL, $topic->name());
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

topic = publisher.create_topic(request={"name": topic_path})

print(f"Created topic: {topic.name}")

Ruby

以下示例使用 Ruby Pub/Sub 客户端库 v3。如果您仍在使用 v2 库,请参阅 迁移到 v3 的指南。 如需查看 Ruby v2 代码示例列表,请参阅 已废弃的代码示例

在尝试此示例之前,请按照 《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new
topic_admin = pubsub.topic_admin

topic = topic_admin.create_topic name: pubsub.topic_path(topic_id)

puts "Topic #{topic.name} created."

组织政策限制条件

组织政策可能会限制主题创建,例如,某项政策可能会限制在 Compute Engine 区域中存储消息。为避免主题创建错误,请在创建主题之前检查并更新组织政策(如有必要)。

如果您的项目是新创建的,请等待几分钟,让组织政策初始化,然后再创建主题。

前往组织政策

如需了解详情,请参阅配置消息存储政策

后续步骤

Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。