查看連接器

查看連接器的詳細資料時,您可以檢查其設定、運作狀態、工作重新啟動政策,以及監控其成效指標。

如要查看 Connect 叢集中連接器的詳細資料,可以使用 Google Cloud 控制台、gcloud CLI、Managed Service for Apache Kafka 用戶端程式庫或 Managed Kafka API。您無法使用開放原始碼 Apache Kafka API 查看連接器。

查看連結器所需的角色和權限

如要取得查看連線器所需的權限,請要求管理員授予您專案的「代管 Kafka 檢視者 」(roles/managedkafka.viewer) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

這個預先定義的角色具備查看連接器所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要查看連接器,必須具備下列權限:

  • 在父項 Connect 叢集上授予 list connectors 權限: managedkafka.connectors.list
  • 在父項 Connect 叢集上授予「取得連結器詳細資料」權限: managedkafka.connectors.get

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

如要進一步瞭解 Managed Kafka Viewer 角色,請參閱「Managed Service for Apache Kafka 預先定義的角色」。

查看連接器詳細資料

控制台

  1. 前往 Google Cloud 控制台的「Connect Clusters」(連結叢集) 頁面。

    前往「Connect Clusters」(連結叢集)

  2. 按一下要查看的連接器所屬的 Connect 叢集。

    系統隨即會顯示「Connect cluster details」(連線叢集詳細資料) 頁面。

  3. 在「資源」分頁中,找出清單中的連結器,然後按一下連結器名稱。系統會將您重新導向至「連接器詳細資料」頁面。

  4. 「連線器詳細資料」頁面會顯示下列分頁:

    • 設定:顯示連結器的設定,包括:
      • 名稱:連接器的名稱。
      • 狀態:連接器的運作狀態。例如「跑步」。
      • 工作重新啟動政策:重新啟動失敗工作的政策。 例如是否要重新啟動失敗的工作,以及使用哪些退避設定。
      • 設定屬性:定義連線器設定的鍵/值組合清單。
    • 監控:提供監控連接器的圖表,例如:
      • 工作錯誤數:發生錯誤的工作數量。
      • 有效工作數:目前有效的工作數。

    這個頁面也提供按鈕,可編輯刪除暫停停止重新啟動連結器。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 使用 gcloud managed-kafka connectors describe 指令描述連接器:

    gcloud managed-kafka connectors describe CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID
    

    更改下列內容:

    • CONNECTOR_ID:必填,要說明的連接器 ID。
    • LOCATION:必填,含有連接器的 Connect 叢集位置。
    • CONNECT_CLUSTER_ID:必填,包含連接器的 Connect 叢集 ID。
  3. 範例指令:

    gcloud managed-kafka connectors describe test-connector \
      --location=us-central1 \
      --connect-cluster=test-connect-cluster
    

    輸出內容範例:

    config:
      connector.class: com.google.cloud.kafka.connect.pubsub.PubsubSinkConnector
      kafka.topic.regex: .*
      key.converter: org.apache.kafka.connect.storage.StringConverter
      project: test-project
      tasks.max: '1'
      topic: test-pubsub-topic
      value.converter: org.apache.kafka.connect.json.JsonConverter
      value.converter.schemas.enable: 'false'
    createTime: '2024-03-13T05:17:34.123456Z'
    labels:
      test-label-key: test-label-value
    name: projects/test-project/locations/us-central1/connectClusters/test-connect-cluster/connectors/test-connector
    state: RUNNING
    taskRestartPolicy: RESTART_WITH_EXPONENTIAL_BACKOFF
    updateTime: '2024-03-13T05:18:15.987654Z'
    

    Go

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func getConnector(w io.Writer, projectID, region, connectClusterID, connectorID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "my-connector"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	connectorPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s/connectors/%s", projectID, region, connectClusterID, connectorID)
    	req := &managedkafkapb.GetConnectorRequest{
    		Name: connectorPath,
    	}
    	connector, err := client.GetConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.GetConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Got connector: %#v\n", connector)
    	return nil
    }
    

    Java

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    
    public class GetConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        String connectorId = "my-connector";
        getConnector(projectId, region, clusterId, connectorId);
      }
    
      public static void getConnector(
          String projectId, String region, String clusterId, String connectorId) throws IOException {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          ConnectorName name = ConnectorName.of(projectId, region, clusterId, connectorId);
          // This operation is handled synchronously.
          Connector connector = managedKafkaConnectClient.getConnector(name);
          System.out.println(connector.getAllFields());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.getConnector got err: %s\n", e.getMessage());
        }
      }
    }

    Python

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。

    from google.api_core.exceptions import NotFound
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "my-connector"
    
    connect_client = ManagedKafkaConnectClient()
    
    connector_path = connect_client.connector_path(
        project_id, region, connect_cluster_id, connector_id
    )
    request = managedkafka_v1.GetConnectorRequest(
        name=connector_path,
    )
    
    try:
        connector = connect_client.get_connector(request=request)
        print("Got connector:", connector)
    except NotFound as e:
        print(f"Failed to get connector {connector_id} with error: {e}")
    

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。