查看连接器

查看连接器的详细信息可让您检查其配置、运行状态、任务重启政策,并监控其效果指标。

如需查看 Connect 集群中连接器的详细信息,您可以使用 Google Cloud 控制台、gcloud CLI、Managed Service for Apache Kafka 客户端库或 Managed Kafka API。您无法使用开源 Apache Kafka API 查看连接器。

查看连接器所需的角色和权限

如需获得查看连接器所需的权限,请让管理员向您授予项目的 Managed Kafka Viewer (roles/managedkafka.viewer) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含查看连接器所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需查看连接器,您需要具备以下权限:

  • 在父级 Connect 集群上授予 list connectors 权限: managedkafka.connectors.list
  • 在父级 Connect 集群上授予“获取连接器详细信息”权限: managedkafka.connectors.get

您也可以使用自定义角色或其他预定义角色来获取这些权限。

如需详细了解 Managed Kafka Viewer 角色,请参阅 Managed Service for Apache Kafka 预定义角色

查看连接器详细信息

控制台

  1. 在 Google Cloud 控制台中,前往连接集群页面。

    前往“关联集群”

  2. 点击要查看的连接器所在的 Connect 集群。

    系统会显示连接集群详情页面。

  3. 资源标签页中,找到列表中的连接器,然后点击其名称。系统会将您重定向到连接器详情页面。

  4. 连接器详情页面会显示以下标签页:

    • 配置:显示连接器的配置,包括:
      • 名称:连接器的名称。
      • 状态:连接器的运行状态。例如,跑步。
      • 任务重启政策:用于重启失败任务的政策。 例如,是否重启失败的任务,以及使用哪种退避设置。
      • 配置属性:定义连接器配置的键值对列表。
    • 监控:提供用于监控连接器的图表,例如:
      • 任务错误数:遇到错误的任务数。
      • 有效任务数:当前有效任务的数量。

    此页面还包含用于修改删除暂停停止重新启动连接器的按钮。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 使用 gcloud managed-kafka connectors describe 命令描述连接器:

    gcloud managed-kafka connectors describe CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID
    

    替换以下内容:

    • CONNECTOR_ID:必填。您要描述的连接器的 ID。
    • LOCATION:必填。包含连接器的 Connect 集群的位置。
    • CONNECT_CLUSTER_ID:必填。包含连接器的 Connect 集群的 ID。
  3. 示例命令:

    gcloud managed-kafka connectors describe test-connector \
      --location=us-central1 \
      --connect-cluster=test-connect-cluster
    

    输出示例:

    config:
      connector.class: com.google.cloud.kafka.connect.pubsub.PubsubSinkConnector
      kafka.topic.regex: .*
      key.converter: org.apache.kafka.connect.storage.StringConverter
      project: test-project
      tasks.max: '1'
      topic: test-pubsub-topic
      value.converter: org.apache.kafka.connect.json.JsonConverter
      value.converter.schemas.enable: 'false'
    createTime: '2024-03-13T05:17:34.123456Z'
    labels:
      test-label-key: test-label-value
    name: projects/test-project/locations/us-central1/connectClusters/test-connect-cluster/connectors/test-connector
    state: RUNNING
    taskRestartPolicy: RESTART_WITH_EXPONENTIAL_BACKOFF
    updateTime: '2024-03-13T05:18:15.987654Z'
    

    Go

    在尝试此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Go API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。 如需了解详情,请参阅为本地开发环境设置 ADC

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func getConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	req := &managedkafkapb.GetConnectorRequest{
    		Name: connectorPath,
    	}
    	connector, err := client.GetConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.GetConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Got connector: %#v\n", connector)
    	return nil
    }
    

    Java

    在尝试此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Java API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅 为本地开发环境设置 ADC

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    
    public class GetConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        getConnector(projectId, region, clusterId, connectorId);
      }
    
      public static void getConnector(
          String projectId, String region, String clusterId, String connectorId) throws IOException {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          ConnectorName name = ConnectorName.of(projectId, region, clusterId, connectorId);
          // This operation is handled synchronously.
          Connector connector = managedKafkaConnectClient.getConnector(name);
          System.out.println(connector.getAllFields());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.getConnector got err: %s\n", e.getMessage());
        }
      }
    }

    Python

    在尝试此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Python API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置 ADC

    from google.api_core.exceptions import NotFound
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    
    connect_client = ManagedKafkaConnectClient()
    
    connector_path = connect_client.connector_path(
        project_id, region, connect_cluster_id, connector_id
    )
    request = managedkafka_v1.GetConnectorRequest(
        name=connector_path,
    )
    
    try:
        connector = connect_client.get_connector(request=request)
        print("Got connector:", connector)
    except NotFound as e:
        print(f"Failed to get connector {connector_id} with error: {e}")
    

Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。