列出所有连接器

列出在 Connect 集群中运行的所有连接器,可大致了解已配置的数据集成。您可以监控连接器的运行状况和状态,发现潜在问题,并有效管理数据流。

如需列出 Connect 集群中的所有连接器,您可以使用 Google Cloud 控制台、gcloud CLI、Managed Service for Apache Kafka 客户端库或 Managed Kafka API。您无法使用开源 Apache Kafka API 列出连接器。

列出所有连接器所需的角色和权限

如需获得列出所有连接器所需的权限,请让您的管理员为您授予项目的 Managed Kafka Viewer (roles/managedkafka.viewer) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含列出所有连接器所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

您需要拥有以下权限才能列出所有连接器:

  • 在父级 Connect 集群上授予 list connectors 权限: managedkafka.connectors.list
  • 在父级 Connect 集群上授予“获取连接器详细信息”权限: managedkafka.connectors.get

您也可以使用自定义角色或其他预定义角色来获取这些权限。

如需详细了解 Managed Kafka Viewer 角色,请参阅 Managed Service for Apache Kafka 预定义角色

查看所有连接器

通过此视图,您可以快速监控连接器的状态,并确定需要注意的连接器。然后,您可以深入了解各个连接器,根据需要查看其详细信息和配置。

控制台

  1. 在 Google Cloud 控制台中,前往连接集群页面。

    前往“关联集群”

  2. 点击要列出连接器的 Connect 集群。

    系统会显示连接集群详情页面。

  3. 资源标签页会显示集群中运行的所有连接器的列表。此列表包含每个连接器的以下信息:

    • 名称:连接器的名称。
    • 状态:连接器的运行状态。例如,正在运行、失败。
    • 连接器类型:连接器插件的类型。

    您可以使用过滤选项按名称搜索特定连接器。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 使用 gcloud managed-kafka connectors list 命令列出连接器:

    gcloud managed-kafka connectors list CONNECT_CLUSTER_ID \
        --location=LOCATION
    
  3. 如需进一步优化连接器列表,您可以使用其他标志:

    gcloud managed-kafka connectors list CONNECT_CLUSTER_ID \
        --location=LOCATION \
        [--filter=EXPRESSION] \
        [--limit=LIMIT] \
        [--page-size=PAGE_SIZE] \
        [--sort-by=SORT_BY]
    

    替换以下内容:

    • CONNECT_CLUSTER_ID:必填。包含您要列出的连接器的 Connect 集群的 ID。
    • LOCATION:必填。包含要列出的连接器的 Connect 集群的位置。
    • EXPRESSION:(可选)要应用于列表的布尔值过滤条件表达式。如果表达式的计算结果为 True,则相应项会包含在列表中。如需了解更多详情和示例,请运行 gcloud topic filters

      示例:

      • 如需仅列出处于“RUNNING”状态的连接器,请运行以下命令:

        --filter="state=RUNNING"
        
      • 如需仅列出“Pub/Sub 接收器”连接器,请执行以下操作:

        --filter="connector_plugin='Pub/Sub Sink'"
        
      • 如需列出名称中包含“prod”的连接器,请运行以下命令:

        --filter="name ~ 'prod'"
        
      • 如需列出状态为“FAILED”或属于“Pub/Sub Source”插件的连接器,请执行以下操作:

        --filter="state=FAILED OR connector_plugin='Pub/Sub Source'"
        
    • LIMIT:(可选)要显示的最大连接器数量。如果未指定,则列出所有连接器。

    • PAGE_SIZE:(可选)每页显示的结果数。如果未指定,服务将确定合适的页面大小。

    • SORT_BY:(可选)以英文逗号分隔的排序依据字段列表。默认排序顺序为升序。如需按降序排序,请在相应字段前面加上 ~。支持的字段可能为 namestate

  4. 包含排序的命令示例:

    gcloud managed-kafka connectors list test-connect-cluster \
        --location=us-central1 \
        --sort-by=~state,name
    

    包含过滤和限制的命令示例:

    gcloud managed-kafka connectors list test-connect-cluster \
        --location=us-central1 \
        --filter="state=RUNNING AND connector_plugin='Pub/Sub Sink'" \
        --limit=5
    

    输出示例:

    NAME                                    STATE     CONNECTOR_PLUGIN
    pubsub-sink-connector                   RUNNING   Pub/Sub Sink
    another-pubsub-sink                     RUNNING   Pub/Sub Sink
    prod-pubsub-sink                        RUNNING   Pub/Sub Sink
    

    Go

    在尝试此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Go API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。 如需了解详情,请参阅为本地开发环境设置 ADC

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/iterator"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func listConnectors(w io.Writer, projectID, region, connectClusterID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    	req := &managedkafkapb.ListConnectorsRequest{
    		Parent: parent,
    	}
    	connectorIter := client.ListConnectors(ctx, req)
    	for {
    		connector, err := connectorIter.Next()
    		if err == iterator.Done {
    			break
    		}
    		if err != nil {
    			return fmt.Errorf("connectorIter.Next() got err: %w", err)
    		}
    		fmt.Fprintf(w, "Got connector: %v", connector)
    	}
    	return nil
    }
    

    Java

    在尝试此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Java API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅 为本地开发环境设置 ADC

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    
    public class ListConnectors {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        listConnectors(projectId, region, clusterId);
      }
    
      public static void listConnectors(String projectId, String region, String clusterId)
          throws IOException {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          ConnectClusterName parent = ConnectClusterName.of(projectId, region, clusterId);
          // This operation is handled synchronously.
          for (Connector connector : managedKafkaConnectClient.listConnectors(parent).iterateAll()) {
            System.out.println(connector.getAllFields());
          }
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.listConnectors got err: %s\n", e.getMessage());
        }
      }
    }

    Python

    在尝试此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Python API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置 ADC

    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.api_core.exceptions import GoogleAPICallError
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    
    connect_client = ManagedKafkaConnectClient()
    
    request = managedkafka_v1.ListConnectorsRequest(
        parent=connect_client.connect_cluster_path(project_id, region, connect_cluster_id),
    )
    
    try:
        response = connect_client.list_connectors(request=request)
        for connector in response:
            print("Got connector:", connector)
    except GoogleAPICallError as e:
        print(f"Failed to list connectors with error: {e}")
    

Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。