列出 Google Cloud Managed Service for Apache Kafka 主題

如要列出叢集中的主題,可以使用 Google Cloud 控制台、Google Cloud CLI、用戶端程式庫、Managed Kafka API 或開放原始碼 Apache Kafka API。

列出主題所需的角色和權限

如要取得列出主題所需的權限,請要求管理員授予您專案的「Managed Kafka 檢視者 」(roles/managedkafka.viewer) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

這個預先定義的角色具備列出主題所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要列出主題,必須具備下列權限:

  • 列出主題: managedkafka.topics.list
  • 取得主題: managedkafka.topics.get

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

如要進一步瞭解 Managed Kafka 檢視者 (roles/managedkafka.viewer) IAM 角色,請參閱「Managed Service for Apache Kafka 預先定義角色」。

列出主題

控制台

  1. 前往 Google Cloud 控制台的「Clusters」(叢集) 頁面。

    前往「Clusters」(叢集)

    系統會列出您在專案中建立的叢集。

  2. 按一下要查看主題的叢集。

    系統隨即會顯示叢集詳細資料頁面。在叢集詳細資料頁面中,主題會列在「資源」分頁中。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 執行 gcloud managed-kafka topics list 指令:

    gcloud managed-kafka topics list CLUSTER_ID \
        --location=LOCATION_ID \
        --limit=LIMIT
    

    這項指令會擷取指定 Managed Service for Apache Kafka 叢集內的所有主題清單。您可以使用選用旗標來篩選、限制及排序輸出內容。

    更改下列內容:

    • CLUSTER_ID:要列出主題的叢集名稱。
    • LOCATION_ID:叢集位置。
    • LIMIT:(選用) 要列出的主題數量上限。
  3. Kafka CLI

    執行這項指令前,請先在 Compute Engine VM 上安裝 Kafka 指令列工具。VM 必須能夠連上與 Managed Service for Apache Kafka 叢集連線的子網路。請按照「 使用 Kafka 指令列工具產生及取用訊息」中的操作說明進行操作。

    執行 kafka-topics.sh 指令,如下所示:

    kafka-topics.sh --list \
      --bootstrap-server=BOOTSTRAP_ADDRESS \
      --command-config client.properties
    

    更改下列內容:

    • BOOTSTRAP_ADDRESS:Managed Service for Apache Kafka 叢集的啟動位址

    REST

    使用任何要求資料之前,請先修改下列項目的值:

    • PROJECT_ID:您的 Google Cloud 專案 ID
    • LOCATION:叢集位置
    • CLUSTER_ID:叢集 ID

    HTTP 方法和網址:

    GET https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics

    請展開以下其中一個選項,以傳送要求:

    您應該會收到如下的 JSON 回覆:

    {
      "topics": [
        {
          "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/__remote_log_metadata",
          "partitionCount": 50,
          "replicationFactor": 3,
          "configs": {
            "remote.storage.enable": "false",
            "cleanup.policy": "delete",
            "retention.ms": "-1"
          }
        },
        {
          "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
          "partitionCount": 3,
          "replicationFactor": 3
        }
      ]
    }
    
    

    Go

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/iterator"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func listTopics(w io.Writer, projectID, region, clusterID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	req := &managedkafkapb.ListTopicsRequest{
    		Parent: clusterPath,
    	}
    	topicIter := client.ListTopics(ctx, req)
    	for {
    		res, err := topicIter.Next()
    		if err == iterator.Done {
    			break
    		}
    		if err != nil {
    			return fmt.Errorf("topicIter.Next() got err: %w", err)
    		}
    		fmt.Fprintf(w, "Got topic: %v", res)
    	}
    	return nil
    }
    

    Java

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import java.io.IOException;
    
    public class ListTopics {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        listTopics(projectId, region, clusterId);
      }
    
      public static void listTopics(String projectId, String region, String clusterId)
          throws Exception {
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          ClusterName clusterName = ClusterName.of(projectId, region, clusterId);
          // This operation is being handled synchronously.
          for (Topic topic : managedKafkaClient.listTopics(clusterName).iterateAll()) {
            System.out.println(topic.getAllFields());
          }
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.listTopics got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。

    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    request = managedkafka_v1.ListTopicsRequest(
        parent=client.cluster_path(project_id, region, cluster_id),
    )
    
    response = client.list_topics(request=request)
    for topic in response:
        print("Got topic:", topic)
    

後續步驟

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。