查看 Google Cloud Managed Service for Apache Kafka 主題

如要查看單一主題的詳細資訊,可以使用Google Cloud 控制台、Google Cloud CLI、用戶端程式庫、Managed Kafka API 或開放原始碼 Apache Kafka API。

查看主題所需的角色和權限

如要取得查看主題所需的權限,請要求管理員授予您專案的「Managed Kafka 檢視者 」(roles/managedkafka.viewer) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

這個預先定義的角色具備查看主題所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要查看主題,必須具備下列權限:

  • 列出主題: managedkafka.topics.list
  • 取得主題: managedkafka.topics.get

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

如要進一步瞭解 Managed Kafka 檢視者 (roles/managedkafka.viewer) IAM 角色,請參閱「Managed Service for Apache Kafka 預先定義角色」。

控制台中的主題屬性

在控制台中,您可以查看下列主題屬性:

  • 設定:這個分頁提供主題的一般設定詳細資料,包括:

    • 名稱:叢集內主題的專屬 ID。

    • 分區:主題中的分區數量。分區會將主題的資料分割成多個區段,以利擴充及平行處理。

    • 副本:為每個分區維護的副本數量,確保資料備援和可用性。

    • 叢集:主題所屬的 Managed Service for Apache Kafka 叢集名稱。

    • 地區:叢集和主題所在的 Google Cloud 地區。

    • 非預設主題參數:為主題設定的任何主題層級覆寫,與叢集範圍的預設值不同。

  • 監控:這個分頁會顯示視覺化圖表,呈現與主題活動和成效相關的重要指標。這些圖表包括:

    • 位元組計數:時間序列圖表,顯示產生或傳送至主題的位元組速率。這表示一段時間內發布至主題的資料量。對應的指標為 managedkafka.googleapis.com/byte_in_count

    • 要求數量:時間序列圖表,代表對主題提出的要求率。這項指標反映了主題的整體活動和使用情況。相關指標為 managedkafka.googleapis.com/topic_request_count

    • 依資料分割記錄區隔:這張圖表會顯示主題中每個資料分割的有效記錄區隔數量。記錄區段是磁碟上的實體檔案,Kafka 會在其中儲存主題資料。相關指標為 managedkafka.googleapis.com/log_segments

  • 消費者群組:這個部分會列出訂閱該主題的消費者群組。取用端群組是一組取用端,共同讀取主題中的訊息。

查看主題

控制台

  1. 前往 Google Cloud 控制台的「Clusters」(叢集) 頁面。

    前往「Clusters」(叢集)

    系統會列出您在專案中建立的叢集。

  2. 按一下要查看主題的叢集。

    系統隨即會顯示叢集詳細資料頁面。在叢集詳細資料頁面中,主題會列在「資源」分頁中。

  3. 如要查看特定主題,請按一下主題名稱。

    系統隨即會顯示主題詳細資料頁面。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 執行 gcloud managed-kafka topics describe 指令:

    gcloud managed-kafka topics describe TOPIC_ID \
      --cluster=CLUSTER_ID --location=LOCATION_ID
    

    這項指令會擷取並顯示指定主題的完整詳細資料。這類資訊包括設定,例如分割區數量、複寫因數,以及任何主題層級的設定覆寫。

    更改下列內容:

    • TOPIC_ID:主題 ID。
    • CLUSTER_ID:包含主題的叢集 ID。
    • LOCATION_ID:叢集位置。
  3. gcloud managed-kafka topics describe 指令會顯示主題的最低資訊量,例如分割區計數和複製因數。如要取得更詳細的資訊,包括分割區指派和完整設定,請使用 kafka-topics.sh 指令列工具。

    Kafka CLI

    執行這項指令前,請先在 Compute Engine VM 上安裝 Kafka 指令列工具。VM 必須能夠連上與 Managed Service for Apache Kafka 叢集連線的子網路。請按照「 使用 Kafka 指令列工具產生及取用訊息」中的操作說明進行操作。

    如要查看主題的詳細資料,請執行 kafka-topics.sh --describe 指令:

    kafka-topics.sh --describe \
      --bootstrap-server=BOOTSTRAP_ADDRESS \
      --command-config client.properties \
      --topic TOPIC_ID
    

    更改下列內容:

    • BOOTSTRAP_ADDRESS:Managed Service for Apache Kafka 叢集的啟動位址
    • TOPIC_ID:主題 ID。

    這項指令會傳回主題屬性的子集,包括下列屬性:

    • 分區數量
    • 複製係數
    • 分區指派
    • 動態設定 (您明確設定的設定)
    • 靜態設定 (叢集啟動時套用的設定)

    如要查看主題的完整設定 (包括預設值設定),請執行 kafka-configs.sh --describe 指令:

    kafka-configs.sh --describe \
    --bootstrap-server=BOOTSTRAP_ADDRESS \
    --command-config client.properties \
    --entity-type topics \
    --entity-name TOPIC_ID \
    --all
    

    輸出內容為鍵/值組合形式的設定清單。--all 旗標會傳回所有設定。如要只取得動態設定清單,請省略 --all 旗標。

    REST

    使用任何要求資料之前,請先修改下列項目的值:

    • PROJECT_ID:您的 Google Cloud 專案 ID
    • LOCATION:叢集位置
    • CLUSTER_ID:叢集 ID
    • TOPIC_ID:主題的 ID

    HTTP 方法和網址:

    GET https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID

    請展開以下其中一個選項,以傳送要求:

    您應該會收到如下的 JSON 回覆:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Go

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func getTopic(w io.Writer, projectID, region, clusterID, topicID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// topicID := "my-topic"
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
    	req := &managedkafkapb.GetTopicRequest{
    		Name: topicPath,
    	}
    	topic, err := client.GetTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.GetTopic got err: %w", err)
    	}
    	fmt.Fprintf(w, "Got topic: %#v\n", topic)
    	return nil
    }
    

    Java

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import com.google.cloud.managedkafka.v1.TopicName;
    import java.io.IOException;
    
    public class GetTopic {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        getTopic(projectId, region, clusterId, topicId);
      }
    
      public static void getTopic(String projectId, String region, String clusterId, String topicId)
          throws Exception {
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          // This operation is being handled synchronously.
          Topic topic =
              managedKafkaClient.getTopic(TopicName.of(projectId, region, clusterId, topicId));
          System.out.println(topic.getAllFields());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.getTopic got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # topic_id = "my-topic"
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    topic_path = client.topic_path(project_id, region, cluster_id, topic_id)
    request = managedkafka_v1.GetTopicRequest(
        name=topic_path,
    )
    
    try:
        topic = client.get_topic(request=request)
        print("Got topic:", topic)
    except NotFound as e:
        print(f"Failed to get topic {topic_id} with error: {e.message}")
    

後續步驟