列出您的 Google Cloud Managed Service for Apache Kafka 主题

如需列出集群中的主题,您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库、Managed Kafka API 或开源 Apache Kafka API。

列出主题所需的角色和权限

如需获得列出主题所需的权限,请让您的管理员为您授予项目的 Managed Kafka Viewer (roles/managedkafka.viewer) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含列出主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

您需要具备以下权限才能列出主题:

  • 列出主题: managedkafka.topics.list
  • 获取主题: managedkafka.topics.get

您也可以使用自定义角色或其他预定义角色来获取这些权限。

如需详细了解 Managed Kafka Viewer (roles/managedkafka.viewer) IAM 角色,请参阅 Managed Service for Apache Kafka 预定义角色

列出您的主题

控制台

  1. 在 Google Cloud 控制台中,前往集群页面。

    转到“集群”

    系统会列出您在项目中创建的集群。

  2. 点击要查看主题的聚类。

    系统会显示集群详情页面。在集群详情页面中,资源标签页会列出主题。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud managed-kafka topics list 命令:

    gcloud managed-kafka topics list CLUSTER_ID \
        --location=LOCATION_ID \
        --limit=LIMIT
    

    此命令会检索指定 Managed Service for Apache Kafka 集群中的所有主题的列表。您可以使用可选标志来过滤、限制和排序输出。

    替换以下内容:

    • CLUSTER_ID:您要列出其主题的集群的名称。
    • LOCATION_ID:集群的位置。
    • LIMIT:(可选)要列出的主题数上限。
  3. Kafka CLI

    运行此命令之前,请在 Compute Engine 虚拟机上安装 Kafka 命令行工具。虚拟机必须能够访问连接到 Managed Service for Apache Kafka 集群的子网。按照 使用 Kafka 命令行工具生成和使用消息中的说明操作。

    运行 kafka-topics.sh 命令,如下所示:

    kafka-topics.sh --list \
      --bootstrap-server=BOOTSTRAP_ADDRESS \
      --command-config client.properties
    

    替换以下内容:

    • BOOTSTRAP_ADDRESS:Managed Service for Apache Kafka 集群的引导地址

    REST

    在使用任何请求数据之前,请先进行以下替换:

    • PROJECT_ID:您的 Google Cloud 项目 ID
    • LOCATION:集群的位置
    • CLUSTER_ID:集群的 ID

    HTTP 方法和网址:

    GET https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics

    如需发送您的请求,请展开以下选项之一:

    您应该收到类似以下内容的 JSON 响应:

    {
      "topics": [
        {
          "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/__remote_log_metadata",
          "partitionCount": 50,
          "replicationFactor": 3,
          "configs": {
            "remote.storage.enable": "false",
            "cleanup.policy": "delete",
            "retention.ms": "-1"
          }
        },
        {
          "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
          "partitionCount": 3,
          "replicationFactor": 3
        }
      ]
    }
    
    

    Go

    在尝试此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Go API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。 如需了解详情,请参阅为本地开发环境设置 ADC

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/iterator"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func listTopics(w io.Writer, projectID, region, clusterID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	req := &managedkafkapb.ListTopicsRequest{
    		Parent: clusterPath,
    	}
    	topicIter := client.ListTopics(ctx, req)
    	for {
    		res, err := topicIter.Next()
    		if err == iterator.Done {
    			break
    		}
    		if err != nil {
    			return fmt.Errorf("topicIter.Next() got err: %w", err)
    		}
    		fmt.Fprintf(w, "Got topic: %v", res)
    	}
    	return nil
    }
    

    Java

    在尝试此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Java API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅 为本地开发环境设置 ADC

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import java.io.IOException;
    
    public class ListTopics {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        listTopics(projectId, region, clusterId);
      }
    
      public static void listTopics(String projectId, String region, String clusterId)
          throws Exception {
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          ClusterName clusterName = ClusterName.of(projectId, region, clusterId);
          // This operation is being handled synchronously.
          for (Topic topic : managedKafkaClient.listTopics(clusterName).iterateAll()) {
            System.out.println(topic.getAllFields());
          }
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.listTopics got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    在尝试此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Python API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置 ADC

    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    request = managedkafka_v1.ListTopicsRequest(
        parent=client.cluster_path(project_id, region, cluster_id),
    )
    
    response = client.list_topics(request=request)
    for topic in response:
        print("Got topic:", topic)
    

后续步骤

Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。