建立主題

在 Pub/Sub 中,主題是具名資源,代表訊息來源。您必須先建立主題,然後才能發布或訂閱該主題。Pub/Sub 支援兩種主題:標準主題和匯入主題。

本文說明如何建立 Pub/Sub 標準主題。如要進一步瞭解匯入主題和如何建立匯入主題,請參閱「關於匯入主題」。

如要建立主題,可以使用 Google Cloud 控制台、Google Cloud CLI、用戶端程式庫或 Pub/Sub API。

事前準備

必要角色和權限

如要取得建立主題所需的權限,請要求管理員授予您專案的 Pub/Sub 編輯者(roles/pubsub.editor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

這個預先定義的角色具備建立主題所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要建立主題,您必須具備下列權限:

  • 授予這項權限,即可在專案中建立主題: pubsub.topics.create

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

您可以在專案層級和個別資源層級設定存取權控管。您可以在一個專案中建立訂閱項目,並將其附加至位於其他專案的主題。請確認您具備每個專案的必要權限。

主題的屬性

建立或更新主題時,必須指定主題的屬性。

新增預設訂閱項目

Adds a default subscription to the Pub/Sub topic. 主題建立後,您可以為該主題建立其他訂閱項目。 預設訂閱項目具有下列屬性:

  • -sub」的訂閱 ID
  • 提取傳送類型
  • 訊息保留時間為七天
  • 持續處於非活躍狀態達 31 天即失效
  • 確認期限為 10 秒
  • 立即重試政策

使用結構定義

結構定義是訊息資料欄位必須遵循的格式。結構定義是發布者和訂閱者之間的合約,Pub/Sub 會強制執行。主題結構定義有助於標準化訊息類型和權限,讓機構中的不同團隊都能使用這些訊息。Pub/Sub 會為訊息類型和權限建立中央授權單位。如要使用結構定義建立主題,請參閱結構定義總覽

啟用擷取功能

啟用這項屬性後,您就能將外部來源的串流資料擷取至主題,以便使用 Google Cloud的功能。如要建立匯入主題以供擷取,請參閱下列內容:

啟用訊息保留功能

指定 Pub/Sub 主題在發布訊息後保留訊息的時間長度。訊息保留期限過後,不論確認狀態為何,Pub/Sub 都可能會捨棄訊息。系統會儲存發布至主題的所有訊息,因此會產生訊息儲存費用

  • 預設值 = 未啟用
  • 最小值 = 10 分鐘
  • 最大值 = 31 天

將訊息資料匯出至 BigQuery

啟用這項屬性後,您就能建立 BigQuery 訂閱項目,在收到訊息時將訊息寫入現有的 BigQuery 資料表。您不需要設定個別的訂閱端。如要進一步瞭解 BigQuery 訂閱項目,請參閱「BigQuery 訂閱項目」。

將訊息資料備份至 Cloud Storage

啟用這項屬性後,您就能建立 Cloud Storage 訂閱項目,在收到訊息時將訊息寫入現有的 Cloud Storage 表格。您不需要設定個別的訂閱端。如要進一步瞭解 Cloud Storage 訂閱方案,請參閱「Cloud Storage 訂閱方案」。

轉換

主題 SMT 可直接在 Pub/Sub 中輕量修改訊息資料和屬性。這項功能可讓您在訊息發布至主題前,先清理、篩選資料或轉換格式。

如要進一步瞭解 SMT,請參閱 SMT 總覽

Google-owned and Google-managed encryption key

指出主題是使用Google-owned and Google-managed encryption keys加密。Pub/Sub 預設會使用 Google-owned and Google-managed encryption keys 加密訊息,因此選擇這個選項會維持預設行為。Google 會自動處理金鑰管理和輪替作業,確保訊息一律採用最強大的加密技術保護。這個選項不需要進一步設定。 如要進一步瞭解 Google-owned and Google-managed encryption keys,請參閱使用 Google-owned and Google-managed encryption keys進行預設加密

Cloud KMS 金鑰

指定主題是否使用客戶自行管理的加密金鑰 (CMEK) 加密。Pub/Sub 預設會使用 Google-owned and Google-managed encryption keys 加密訊息。如果您指定這個選項,Pub/Sub 會使用信封加密模式搭配 CMEK。採用這種做法時,Cloud KMS 不會加密訊息。而是加密 Pub/Sub 為每個主題建立的資料加密金鑰 (DEK)。Pub/Sub 會使用為主題產生的最新 DEK 加密訊息。Pub/Sub 會在訊息傳送給訂閱端不久前解密訊息。如要進一步瞭解如何建立金鑰,請參閱「設定訊息加密」。

建立主題

您必須先建立主題,然後才能發布或訂閱該主題。

控制台

如要建立主題,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的 Pub/Sub「建立主題」頁面。

    前往「建立主題」

  2. 在「主題 ID」欄位中,輸入主題的 ID。如要進一步瞭解如何命名主題,請參閱命名規範

  3. 如要為主題建立預設訂閱,請選取「新增預設訂閱」。這個選項預設為啟用。

  4. 選用。如要搭配主題使用結構定義,請按一下「使用結構定義」並提供結構定義。詳情請參閱「建立主題時建立及關聯結構定義」。

  5. 如果是標準主題,請取消選取「啟用擷取」

  6. 選用。如要在發布訊息後保留訊息,請選取「啟用訊息保留功能」。以天、小時和分鐘為單位選取保留期限。詳情請參閱「啟用訊息保留功能」。

  7. 選用。如要將已發布的訊息匯出至 BigQuery 資料表,請選取「將資料匯出至 BigQuery」,然後輸入資料表的詳細資料。詳情請參閱「建立 BigQuery 訂閱項目」。

  8. 選用。如要將已發布的訊息備份至 Cloud Storage 值區,請選取「將訊息資料備份至 Cloud Storage」,然後輸入 Cloud Storage 值區的詳細資料。詳情請參閱「建立 Cloud Storage 訂閱方案」。

  9. 選用。在「轉換」下方,新增一或多個單一訊息轉換 (SMT),以操控及篩選訊息資料。詳情請參閱「建立含有 SMT 的主題」。

  10. 選用。如要使用客戶自行管理的加密金鑰 (CMEK) 加密訊息,請選取「Cloud KMS key」(Cloud KMS 金鑰)。根據預設,Pub/Sub 會使用 Google 預設加密,不需要 CMEK。詳情請參閱「設定訊息加密」。

  11. 選用。如要管理與主題相關聯的金鑰,請按一下「管理金鑰」。詳情請參閱「標記總覽」。

  12. 按一下「建立主題」

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 如要建立主題,請執行 gcloud pubsub topics create 指令:

    gcloud pubsub topics create TOPIC_ID
    
  3. REST

    如要建立主題,請使用 projects.topics.create 方法:

    要求必須使用 Authorization 標頭中的存取權杖進行驗證。如要取得目前應用程式預設憑證的存取權杖,請執行:gcloud auth application-default print-access-token

    PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
    Authorization: Bearer ACCESS_TOKEN
    

    其中:

    • PROJECT_ID 是您的專案 ID。
    • TOPIC_ID 是主題 ID。

    回應:

    {
    "name": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
    

    C++

    在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C++ 設定操作說明進行操作。詳情請參閱 Pub/Sub C++ API 參考說明文件

    namespace pubsub = ::google::cloud::pubsub;
    namespace pubsub_admin = ::google::cloud::pubsub_admin;
    [](pubsub_admin::TopicAdminClient client, std::string project_id,
       std::string topic_id) {
      auto topic = client.CreateTopic(
          pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
      // Note that kAlreadyExists is a possible error when the library retries.
      if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
        std::cout << "The topic already exists\n";
        return;
      }
      if (!topic) throw std::move(topic).status();
    
      std::cout << "The topic was successfully created: " << topic->DebugString()
                << "\n";
    }

    C#

    在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C# 設定操作說明進行操作。詳情請參閱 Pub/Sub C# API 參考說明文件

    
    using Google.Cloud.PubSub.V1;
    using Grpc.Core;
    using System;
    
    public class CreateTopicSample
    {
        public Topic CreateTopic(string projectId, string topicId)
        {
            PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
            var topicName = TopicName.FromProjectTopic(projectId, topicId);
            Topic topic = null;
    
            try
            {
                topic = publisher.CreateTopic(topicName);
                Console.WriteLine($"Topic {topic.Name} created.");
            }
            catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
            {
                Console.WriteLine($"Topic {topicName} already exists.");
            }
            return topic;
        }
    }

    Go

    下列範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例

    在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的操作說明設定 Go 環境。詳情請參閱 Pub/Sub Go API 參考說明文件

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/pubsub/v2"
    	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
    )
    
    func create(w io.Writer, projectID, topicID string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	defer client.Close()
    
    	topic := &pubsubpb.Topic{
    		Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
    	}
    	t, err := client.TopicAdminClient.CreateTopic(ctx, topic)
    	if err != nil {
    		return fmt.Errorf("CreateTopic: %w", err)
    	}
    	fmt.Fprintf(w, "Topic created: %v\n", t)
    	return nil
    }
    

    Java

    在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件

    
    import com.google.cloud.pubsub.v1.TopicAdminClient;
    import com.google.pubsub.v1.Topic;
    import com.google.pubsub.v1.TopicName;
    import java.io.IOException;
    
    public class CreateTopicExample {
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String topicId = "your-topic-id";
    
        createTopicExample(projectId, topicId);
      }
    
      public static void createTopicExample(String projectId, String topicId) throws IOException {
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
          TopicName topicName = TopicName.of(projectId, topicId);
          Topic topic = topicAdminClient.createTopic(topicName);
          System.out.println("Created topic: " + topic.getName());
        }
      }
    }

    Node.js

    在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

    /**
     * TODO(developer): Uncomment this variable before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function createTopic(topicNameOrId) {
      // Creates a new topic
      await pubSubClient.createTopic(topicNameOrId);
      console.log(`Topic ${topicNameOrId} created.`);
    }

    Node.ts

    在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

    /**
     * TODO(developer): Uncomment this variable before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    import {PubSub} from '@google-cloud/pubsub';
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function createTopic(topicNameOrId: string) {
      // Creates a new topic
      await pubSubClient.createTopic(topicNameOrId);
      console.log(`Topic ${topicNameOrId} created.`);
    }

    PHP

    在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 PHP 設定說明進行操作。 詳情請參閱 Pub/Sub PHP API 參考說明文件

    use Google\Cloud\PubSub\PubSubClient;
    
    /**
     * Creates a Pub/Sub topic.
     *
     * @param string $projectId  The Google project ID.
     * @param string $topicName  The Pub/Sub topic name.
     */
    function create_topic($projectId, $topicName)
    {
        $pubsub = new PubSubClient([
            'projectId' => $projectId,
        ]);
        $topic = $pubsub->createTopic($topicName);
    
        printf('Topic created: %s' . PHP_EOL, $topic->name());
    }

    Python

    在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Python 設定操作說明來進行。詳情請參閱 Pub/Sub Python API 參考說明文件

    from google.cloud import pubsub_v1
    
    # TODO(developer)
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    
    topic = publisher.create_topic(request={"name": topic_path})
    
    print(f"Created topic: {topic.name}")

    Ruby

    以下範例使用 Ruby Pub/Sub 用戶端程式庫 v3。如果您仍在使用第 2 版程式庫,請參閱 第 3 版遷移指南。如要查看 Ruby 第 2 版程式碼範例清單,請參閱 已淘汰的程式碼範例

    在試用這個範例之前,請先按照「快速入門:使用用戶端程式庫」的操作說明設定 Ruby 環境。詳情請參閱 Pub/Sub Ruby API 參考說明文件

    # topic_id = "your-topic-id"
    
    pubsub = Google::Cloud::PubSub.new
    topic_admin = pubsub.topic_admin
    
    topic = topic_admin.create_topic name: pubsub.topic_path(topic_id)
    
    puts "Topic #{topic.name} created."

機構政策限制

組織政策可以限制主題建立作業,例如,政策可以限制在 Compute Engine 區域中儲存訊息。為避免建立主題時發生錯誤,請先檢查並視需要更新機構政策,再建立主題。

如果專案是新建立的,請稍候幾分鐘,等機構政策初始化後再建立主題。

前往「Organization Policies」(機構政策)

詳情請參閱「設定訊息儲存空間政策」。

後續步驟

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。