列出所有連接器

列出 Connect 叢集中執行的所有連接器,可概略瞭解已設定的資料整合。您可以監控連結器的健康狀態和狀態、找出潛在問題,並有效管理資料流程。

如要列出 Connect 叢集中的所有連接器,可以使用 Google Cloud 控制台、gcloud CLI、Managed Service for Apache Kafka 用戶端程式庫或 Managed Kafka API。您無法使用開放原始碼 Apache Kafka API 列出連接器。

列出所有連結器所需的角色和權限

如要取得列出所有連線器所需的權限,請要求管理員授予您專案的「代管 Kafka 檢視者 」(roles/managedkafka.viewer) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

這個預先定義的角色具備列出所有連接器所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要列出所有連接器,必須具備下列權限:

  • 在父項 Connect 叢集上授予 list connectors 權限: managedkafka.connectors.list
  • 在父項 Connect 叢集上授予「取得連結器詳細資料」權限: managedkafka.connectors.get

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

如要進一步瞭解 Managed Kafka Viewer 角色,請參閱「 Managed Service for Apache Kafka 預先定義的角色」。

查看所有連接器

這個檢視畫面可讓您快速監控連結器的狀態,並找出需要注意的連結器。然後,您可以深入瞭解個別連結器,視需要查看詳細資料和設定。

控制台

  1. 前往 Google Cloud 控制台的「Connect clusters」(連結叢集) 頁面。

    前往「Connect Clusters」(連結叢集)

  2. 按一下要列出連接器的 Connect 叢集。

    系統隨即會顯示「Connect cluster details」(連線叢集詳細資料) 頁面。

  3. 「資源」分頁會顯示叢集中執行的所有連接器清單。清單會顯示每個連結器的下列資訊:

    • 名稱:連接器的名稱。
    • 狀態:連接器的運作狀態。例如「執行中」、「失敗」。
    • 連接器類型:連接器外掛程式的類型。

    您可以使用「篩選器」選項,依名稱搜尋特定連結器。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 使用 gcloud managed-kafka connectors list 指令列出連接器:

    gcloud managed-kafka connectors list CONNECT_CLUSTER_ID \
        --location=LOCATION
    
  3. 如要進一步篩選連接器清單,可以使用其他旗標:

    gcloud managed-kafka connectors list CONNECT_CLUSTER_ID \
        --location=LOCATION \
        [--filter=EXPRESSION] \
        [--limit=LIMIT] \
        [--page-size=PAGE_SIZE] \
        [--sort-by=SORT_BY]
    

    更改下列內容:

    • CONNECT_CLUSTER_ID:必填,包含要列出連接器的 Connect 叢集 ID。
    • LOCATION:必填,要列出連接器的 Connect 叢集位置。
    • EXPRESSION:(選用) 要套用至清單的布林值篩選條件運算式。如果運算式評估結果為 True,系統就會在清單中加入該項目。如需更多詳細資料和範例,請執行 gcloud topic filters

      範例:

      • 如要只列出處於「RUNNING」狀態的連接器,請執行下列操作:

        --filter="state=RUNNING"
        
      • 如要只列出「Pub/Sub Sink」連接器,請按照下列步驟操作:

        --filter="connector_plugin='Pub/Sub Sink'"
        
      • 如要列出名稱含有「prod」的連接器,請執行下列指令:

        --filter="name ~ 'prod'"
        
      • 如要列出「FAILED」或「Pub/Sub Source」外掛程式的連接器,請執行下列指令:

        --filter="state=FAILED OR connector_plugin='Pub/Sub Source'"
        
    • LIMIT:(選用) 要顯示的連接器數量上限。如未指定,則會列出所有連結器。

    • PAGE_SIZE:(選用) 每頁顯示的結果數。如未指定,服務會決定合適的網頁大小。

    • SORT_BY:(選用) 以半形逗號分隔的欄位清單,做為排序依據。預設排序順序為遞增。如要遞減排序,請在欄位前面加上 ~。支援的欄位可能是 namestate

  4. 以下是排序範例指令:

    gcloud managed-kafka connectors list test-connect-cluster \
        --location=us-central1 \
        --sort-by=~state,name
    

    以下是篩選及限制範例指令:

    gcloud managed-kafka connectors list test-connect-cluster \
        --location=us-central1 \
        --filter="state=RUNNING AND connector_plugin='Pub/Sub Sink'" \
        --limit=5
    

    輸出內容範例:

    NAME                                    STATE     CONNECTOR_PLUGIN
    pubsub-sink-connector                   RUNNING   Pub/Sub Sink
    another-pubsub-sink                     RUNNING   Pub/Sub Sink
    prod-pubsub-sink                        RUNNING   Pub/Sub Sink
    

    Go

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/iterator"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func listConnectors(w io.Writer, projectID, region, connectClusterID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    	req := &managedkafkapb.ListConnectorsRequest{
    		Parent: parent,
    	}
    	connectorIter := client.ListConnectors(ctx, req)
    	for {
    		connector, err := connectorIter.Next()
    		if err == iterator.Done {
    			break
    		}
    		if err != nil {
    			return fmt.Errorf("connectorIter.Next() got err: %w", err)
    		}
    		fmt.Fprintf(w, "Got connector: %v", connector)
    	}
    	return nil
    }
    

    Java

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    
    public class ListConnectors {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        listConnectors(projectId, region, clusterId);
      }
    
      public static void listConnectors(String projectId, String region, String clusterId)
          throws IOException {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          ConnectClusterName parent = ConnectClusterName.of(projectId, region, clusterId);
          // This operation is handled synchronously.
          for (Connector connector : managedKafkaConnectClient.listConnectors(parent).iterateAll()) {
            System.out.println(connector.getAllFields());
          }
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.listConnectors got err: %s\n", e.getMessage());
        }
      }
    }

    Python

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。

    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.api_core.exceptions import GoogleAPICallError
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    
    connect_client = ManagedKafkaConnectClient()
    
    request = managedkafka_v1.ListConnectorsRequest(
        parent=connect_client.connect_cluster_path(project_id, region, connect_cluster_id),
    )
    
    try:
        response = connect_client.list_connectors(request=request)
        for connector in response:
            print("Got connector:", connector)
    except GoogleAPICallError as e:
        print(f"Failed to list connectors with error: {e}")
    

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。