查看 Google Cloud Managed Service for Apache Kafka 主题

如需查看单个主题的详细信息,您可以使用Google Cloud 控制台、Google Cloud CLI、客户端库、Managed Kafka API 或开源 Apache Kafka API。

查看主题所需的角色和权限

如需获得查看主题所需的权限,请让您的管理员为您授予项目的 Managed Kafka Viewer (roles/managedkafka.viewer) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含查看主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需查看主题,您需要具备以下权限:

  • 列出主题: managedkafka.topics.list
  • 获取主题: managedkafka.topics.get

您也可以使用自定义角色或其他预定义角色来获取这些权限。

控制台中的主题属性

在控制台中,您可以查看以下主题属性:

  • 配置:此标签页提供有关主题的一般配置详细信息,包括以下内容:

    • 名称:集群内主题的唯一标识符。

    • 分区:主题中的分区数量。分区将主题的数据划分为多个段,以实现可伸缩性和并行性。

    • 副本数:为每个分区维护的副本数量,以确保数据冗余和可用性。

    • 集群:主题所属的 Managed Service for Apache Kafka 集群的名称。

    • 区域:集群和主题所在的 Google Cloud 区域。

    • 非默认主题参数:已为主题设置的任何主题级配置替换项,不同于集群范围的默认设置。

  • 监控:此标签页提供直观的图表,用于显示与主题活动和效果相关的关键指标。这些图表包括:

    • 字节数:一个时间序列图表,显示了字节生成或发送到主题的速率。这表示一段时间内发布到主题的数据量。相应指标为 managedkafka.googleapis.com/byte_in_count

    • 请求数:表示向主题发出的请求速率的时间序列图表。它反映了主题的总体活动和使用情况。相关指标为 managedkafka.googleapis.com/topic_request_count

    • 按分区划分的日志段:此图表显示了主题中每个分区的有效日志段数量。日志段是 Kafka 在磁盘上存储主题数据的物理文件。相关指标为 managedkafka.googleapis.com/log_segments

  • 使用方群组:此部分列出了订阅相应主题的使用方群组。消费者群组是一组协同工作以从主题中读取消息的消费者。

查看主题

控制台

  1. 在 Google Cloud 控制台中,前往集群页面。

    转到“集群”

    系统会列出您在项目中创建的集群。

  2. 点击要查看主题的聚类。

    系统会显示集群详情页面。在集群详情页面中,资源标签页会列出主题。

  3. 如需查看特定主题,请点击相应主题名称。

    系统会显示主题详情页面。

gcloud

  1. 在 Google Cloud 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在 Google Cloud 控制台的底部启动,并显示命令行提示符。Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境。该会话可能需要几秒钟时间来完成初始化。

  2. 运行 gcloud managed-kafka topics describe 命令:

    gcloud managed-kafka topics describe TOPIC_ID \
      --cluster=CLUSTER_ID --location=LOCATION_ID
    

    此命令会提取并显示有关指定主题的全面详细信息。此信息包括其配置设置,例如分区数、复制因子和任何主题级配置替换。

    替换以下内容:

    • TOPIC_ID:主题的 ID。
    • CLUSTER_ID:包含相应主题的集群的 ID。
    • LOCATION_ID:集群的位置。

gcloud managed-kafka topics describe 命令会显示有关主题的最少信息,例如分区数和复制因子。如需获取更详细的信息,包括分区分配和完整的配置设置,请使用 kafka-topics.sh 命令行工具。

Kafka CLI

运行此命令之前,请在 Compute Engine 虚拟机上安装 Kafka 命令行工具。虚拟机必须能够访问连接到 Managed Service for Apache Kafka 集群的子网。请按照 使用 Kafka 命令行工具生成和使用消息中的说明操作。

如需查看有关主题的详细信息,请运行 kafka-topics.sh --describe 命令:

kafka-topics.sh --describe \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --topic TOPIC_ID

替换以下内容:

  • BOOTSTRAP_ADDRESS:Managed Service for Apache Kafka 集群的引导地址
  • TOPIC_ID:主题的 ID。

此命令会返回主题的部分属性,包括以下属性:

  • 分区数
  • 复制因子
  • 分区分配
  • 动态配置(您明确设置的设置)
  • 静态配置(在集群启动时应用的设置)

如需查看主题的完整配置设置(包括具有默认值的设置),请运行 kafka-configs.sh --describe 命令:

kafka-configs.sh --describe \
--bootstrap-server=BOOTSTRAP_ADDRESS \
--command-config client.properties \
--entity-type topics \
--entity-name TOPIC_ID \
--all

输出是一系列以键值对形式表示的设置。--all 标志会返回所有配置设置。如需仅获取动态配置设置的列表,请省略 --all 标志。

REST

在使用任何请求数据之前,请先进行以下替换:

  • PROJECT_ID:您的 Google Cloud 项目 ID
  • LOCATION:集群的位置
  • CLUSTER_ID:集群的 ID
  • TOPIC_ID:主题的 ID

HTTP 方法和网址:

GET https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID

如需发送您的请求,请展开以下选项之一:

您应该收到类似以下内容的 JSON 响应:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Go

在尝试此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Go API 参考文档

如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。 如需了解详情,请参阅为本地开发环境设置 ADC

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func getTopic(w io.Writer, projectID, region, clusterID, topicID string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	req := &managedkafkapb.GetTopicRequest{
		Name: topicPath,
	}
	topic, err := client.GetTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.GetTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Got topic: %#v\n", topic)
	return nil
}

Java

在尝试此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Java API 参考文档

如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅 为本地开发环境设置 ADC

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import java.io.IOException;

public class GetTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    getTopic(projectId, region, clusterId, topicId);
  }

  public static void getTopic(String projectId, String region, String clusterId, String topicId)
      throws Exception {
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      // This operation is being handled synchronously.
      Topic topic =
          managedKafkaClient.getTopic(TopicName.of(projectId, region, clusterId, topicId));
      System.out.println(topic.getAllFields());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.getTopic got err: %s", e.getMessage());
    }
  }
}

Python

在尝试此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Python API 参考文档

如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置 ADC

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"

client = managedkafka_v1.ManagedKafkaClient()

topic_path = client.topic_path(project_id, region, cluster_id, topic_id)
request = managedkafka_v1.GetTopicRequest(
    name=topic_path,
)

try:
    topic = client.get_topic(request=request)
    print("Got topic:", topic)
except NotFound as e:
    print(f"Failed to get topic {topic_id} with error: {e.message}")

接下来怎么做?