查看 Connect 集群

您可以查看 Connect 集群的详细信息,以详细了解其配置详细信息,包括主 Google Cloud Managed Service for Apache Kafka 集群的名称、Connect 集群的状态、资源分配、网络设置以及 Connect 集群托管的连接器。

如需查看您的 Connect 集群,您可以使用 Google Cloud 控制台、gcloud CLI、客户端库或 Managed Kafka API。您无法使用开源 Apache Kafka API 列出 Connect 集群。

查看 Connect 集群所需的角色和权限

如需获得列出 Connect 集群所需的权限,请让您的管理员为您授予项目的 Managed Kafka Viewer(roles/managedkafka.viewer) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含列出 Connect 集群所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

您需要具备以下权限才能列出您的 Connect 集群:

  • 在指定位置授予列出集群的权限: managedkafka.connectClusters.list
  • 在指定位置授予获取集群详情的权限: managedkafka.connectClusters.get

您也可以使用自定义角色或其他预定义角色来获取这些权限。

如需详细了解 Managed Kafka Viewer 角色,请参阅 Managed Service for Apache Kafka 预定义角色

查看 Connect 集群详情

控制台

  1. 在 Google Cloud 控制台中,前往连接集群页面。

    前往“关联集群”

  2. 点击要查看的 Connect 集群。

  3. 系统会显示“连接集群”页面,其中包含四个标签页:

    • 资源:集中显示集群中正在运行的连接器,包括其运行状态和类型。
    • 配置:显示 Connect 集群的基本属性和设置,例如主 Kafka 集群、状态、资源分配和网络设置。
    • 日志:提供来自 Connect 集群的日志条目的实时数据流,用于监控和问题排查。
    • 监控:提供指标,帮助您监控 Connect 集群的性能和资源利用率。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 使用 gcloud managed-kafka connect-clusters describe 命令查看 Connect 集群的详细信息:

    gcloud managed-kafka connect-clusters describe CONNECT_CLUSTER \
        --location=LOCATION
    
  3. 替换以下内容:

    • CONNECT_CLUSTER:您要查看的 Connect 集群的 ID。
    • LOCATION:Connect 集群的位置。
  4. 输出示例:

    capacityConfig:
    memoryBytes: '3221225472'
    vcpuCount: '3'
    createTime: '2025-03-05T15:19:17.998009888Z'
    gcpConfig:
    accessConfig:
    networkConfigs:
    -   primarySubnet: projects/sample-project/regions/us-central1/subnetworks/default
    kafkaCluster: projects/sample-project/locations/us-central1/clusters/kafka-test
    name: projects/sample-project/locations/us-central1/connectClusters/my-connect-cluster
    state: ACTIVE
    updateTime: '2025-03-05T15:24:40.861655595Z'
    

    Go

    在尝试此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Go API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。 如需了解详情,请参阅为本地开发环境设置 ADC

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func getConnectCluster(w io.Writer, projectID, region, clusterID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-connect-cluster"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, clusterID)
    	req := &managedkafkapb.GetConnectClusterRequest{
    		Name: clusterPath,
    	}
    	cluster, err := client.GetConnectCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.GetConnectCluster got err: %w", err)
    	}
    	fmt.Fprintf(w, "Got connect cluster: %#v\n", cluster)
    	return nil
    }
    

    Java

    在尝试此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Java API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅 为本地开发环境设置 ADC

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectCluster;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    
    public class GetConnectCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        getConnectCluster(projectId, region, clusterId);
      }
    
      public static void getConnectCluster(String projectId, String region, String clusterId)
          throws Exception {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          // This operation is being handled synchronously.
          ConnectCluster connectCluster = managedKafkaConnectClient
              .getConnectCluster(ConnectClusterName.of(projectId, region, clusterId));
          System.out.println(connectCluster.getAllFields());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.getConnectCluster got err: %s\n", 
              e.getMessage());
        }
      }
    }
    

    Python

    在尝试此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Python API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置 ADC

    from google.api_core.exceptions import NotFound
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    
    client = ManagedKafkaConnectClient()
    
    cluster_path = client.connect_cluster_path(project_id, region, connect_cluster_id)
    request = managedkafka_v1.GetConnectClusterRequest(
        name=cluster_path,
    )
    
    try:
        cluster = client.get_connect_cluster(request=request)
        print("Got Connect cluster:", cluster)
    except NotFound as e:
        print(f"Failed to get Connect cluster {connect_cluster_id} with error: {e}")
    

Connect 集群详情概览

系统会显示“连接集群”页面,其中包含四个标签页,分别是资源配置监控日志

资源

“连接集群”页面上的资源标签页会汇总已部署的连接器类型及其运行状态。系统会显示以下信息:

  • 连接器表:列出 Connect 集群上所有连接器的表。

    • 名称:每个连接器的指定名称。

    • 状态:连接器的当前运行状态。

    • 连接器类型:指明连接器的类型,例如 MirrorMaker 2.0 Source、BigQuery Sink、Cloud Storage Sink 或 Pub/Sub Sink。

  • 过滤条件:一个搜索栏,您可以使用它按属性过滤连接器。

如需监控连接器健康状况,请使用资源标签页。

配置

配置标签页会显示有关集群状态、资源分配、网络设置和 DNS 配置的信息。

  • Kafka 主集群:显示与此 Connect 集群关联的 Managed Service for Apache Kafka 集群的名称。Connect 集群将其配置和偏移量存储在 Managed Service for Apache Kafka 集群中。

  • State:显示 Connect 集群的当前状态。可能的状态包括 ActiveCreatingDeletingState_unspecified

  • 区域:指示 Connect 集群的区域。

  • vCPUs:显示分配给 Connect 集群的虚拟 CPU 数量。vCPU 数量越多,集群处理能力越强。

  • 内存:显示 Connect 集群的总预配内存。

  • 标签:显示附加到 Connect 集群的标签。

  • 密文资源:列出与 Connect 集群关联的密文。

  • 子网:列出与 Connect 集群关联的子网。 该表格包含“名称”“区域”和“项目”列。

  • DNS 名称:显示为 Connect 集群配置的所有自定义 DNS 名称。如果未配置任何自定义 DNS 名称,则该表为空。

监控

监控标签页提供的指标可帮助您了解连接器的性能和资源利用率。指标包括:

  • 工作器 CPU 用量:每个工作器的总 CPU 用量(以 vCPU 秒为单位)。 此指标有助于识别资源限制。

  • 工作器内存用量:每个工作器的当前内存用量(以字节为单位)。 此指标有助于防止出现内存不足错误。

  • 连接器传入字节速率:每个连接器的平均每秒传入字节速率(来自客户端连接)。此指标有助于评估数据提取率。

  • 连接器传出字节速率:每个连接器每秒从客户端连接到服务器的平均传出字节速率。此指标有助于监控数据传送速率。

日志

日志标签页会实时显示来自 Connect 集群的日志条目流。您可以使用此标签页监控连接器的运行状况、活动并排查问题。这些功能可帮助您有效地监控和调试 Connect 集群:

  • 按严重程度过滤:按严重程度过滤日志条目,以便快速找出严重问题。

  • 搜索:搜索日志条目中的特定关键字或短语,以查找与特定连接器或任务相关的事件。

  • 时间戳:每条日志条目上的时间戳有助于跟踪事件序列并确定操作发生的时间。

  • 摘要:日志条目提供有关各种事件的详细信息,例如连接器启动和关闭、任务执行和数据处理。

后续步骤

Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。