查看連結叢集

您可以查看 Connect 叢集的詳細資料,進一步瞭解其設定詳細資料,包括主要 Google Cloud Managed Service for Apache Kafka 叢集的名稱、Connect 叢集的狀態、資源分配、網路設定,以及 Connect 叢集代管的連接器。

如要查看 Connect 叢集,可以使用 Google Cloud 控制台、gcloud CLI、用戶端程式庫或 Managed Kafka API。您無法使用開放原始碼 Apache Kafka API 列出 Connect 叢集。

查看 Connect 叢集所需的角色和權限

如要取得列出 Connect 叢集所需的權限,請要求管理員授予您專案的 Managed Kafka 檢視者(roles/managedkafka.viewer) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

這個預先定義的角色具備列出 Connect 叢集所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要列出 Connect 叢集,必須具備下列權限:

  • 在指定位置授予列出叢集的權限: managedkafka.connectClusters.list
  • 在指定位置授予取得叢集詳細資料的權限: managedkafka.connectClusters.get

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

如要進一步瞭解 Managed Kafka Viewer 角色,請參閱「Managed Service for Apache Kafka 預先定義角色」。

查看 Connect 叢集詳細資料

控制台

  1. 前往 Google Cloud 控制台的「Connect Clusters」(連結叢集) 頁面。

    前往「Connect Clusters」(連結叢集)

  2. 按一下要查看的 Connect 叢集。

  3. 「Connect cluster」(連結叢集) 頁面隨即顯示,並提供四個分頁:

    • 資源:集中顯示叢集上執行的連接器,包括運作狀態和類型。
    • 「設定」:顯示 Connect 叢集的重要屬性和設定,例如主要 Kafka 叢集、狀態、資源分配和網路設定。
    • 記錄:提供來自 Connect 叢集的記錄項目即時串流,用於監控和疑難排解。
    • 監控:提供指標,協助您監控 Connect 叢集的效能和資源用量。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 使用 gcloud managed-kafka connect-clusters describe 指令查看 Connect 叢集的詳細資料:

    gcloud managed-kafka connect-clusters describe CONNECT_CLUSTER \
        --location=LOCATION
    
  3. 更改下列內容:

    • CONNECT_CLUSTER:要查看的 Connect 叢集 ID。
    • LOCATION:Connect 叢集的位置。
  4. 輸出內容範例:

    capacityConfig:
    memoryBytes: '3221225472'
    vcpuCount: '3'
    createTime: '2025-03-05T15:19:17.998009888Z'
    gcpConfig:
    accessConfig:
    networkConfigs:
    -   primarySubnet: projects/sample-project/regions/us-central1/subnetworks/default
    kafkaCluster: projects/sample-project/locations/us-central1/clusters/kafka-test
    name: projects/sample-project/locations/us-central1/connectClusters/my-connect-cluster
    state: ACTIVE
    updateTime: '2025-03-05T15:24:40.861655595Z'
    

    Go

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func getConnectCluster(w io.Writer, projectID, region, clusterID string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-connect-cluster"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, clusterID)
    	req := &managedkafkapb.GetConnectClusterRequest{
    		Name: clusterPath,
    	}
    	cluster, err := client.GetConnectCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.GetConnectCluster got err: %w", err)
    	}
    	fmt.Fprintf(w, "Got connect cluster: %#v\n", cluster)
    	return nil
    }
    

    Java

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectCluster;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    
    public class GetConnectCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        getConnectCluster(projectId, region, clusterId);
      }
    
      public static void getConnectCluster(String projectId, String region, String clusterId)
          throws Exception {
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          // This operation is being handled synchronously.
          ConnectCluster connectCluster = managedKafkaConnectClient
              .getConnectCluster(ConnectClusterName.of(projectId, region, clusterId));
          System.out.println(connectCluster.getAllFields());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.getConnectCluster got err: %s\n", 
              e.getMessage());
        }
      }
    }
    

    Python

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。

    from google.api_core.exceptions import NotFound
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    
    client = ManagedKafkaConnectClient()
    
    cluster_path = client.connect_cluster_path(project_id, region, connect_cluster_id)
    request = managedkafka_v1.GetConnectClusterRequest(
        name=cluster_path,
    )
    
    try:
        cluster = client.get_connect_cluster(request=request)
        print("Got Connect cluster:", cluster)
    except NotFound as e:
        print(f"Failed to get Connect cluster {connect_cluster_id} with error: {e}")
    

Connect 叢集詳細資料總覽

系統會顯示「Connect cluster」(連結叢集) 頁面,其中包含「Resources」(資源)、「Configurations」(設定)、「Monitoring」(監控) 和「Logs」(記錄) 四個分頁。

資源

「連結叢集」頁面的「資源」分頁會匯總已部署的連接器類型及其運作狀態。系統會顯示下列資訊:

  • 連接器資料表:列出 Connect 叢集上所有連接器的資料表。

    • 名稱:各個連接器的指派名稱。

    • 狀態:連接器的目前運作狀態。

    • 連接器類型:指出連接器類型,例如 MirrorMaker 2.0 來源、BigQuery 接收器、Cloud Storage 接收器或 Pub/Sub 接收器。

  • 篩選器:搜尋列,可用於依連接器屬性篩選連接器。

如要監控連結器健康狀態,請使用「資源」分頁。

設定

「Configurations」(設定) 分頁標籤會顯示叢集狀態、資源分配、網路設定和 DNS 設定等資訊。

  • 主要 Kafka 叢集:顯示與這個 Connect 叢集相關聯的 Managed Service for Apache Kafka 叢集名稱。Connect 叢集會在 Managed Service for Apache Kafka 叢集中儲存設定和偏移。

  • 狀態:顯示 Connect 叢集的目前狀態。可能狀態包括 ActiveCreatingDeletingState_unspecified

  • 區域:指出 Connect 叢集的區域。

  • vCPUs:顯示分配給 Connect 叢集的虛擬 CPU 數量。vCPU 數量越多,叢集處理容量就越大。

  • 記憶體:顯示 Connect 叢集的佈建記憶體總量。

  • 標籤:顯示附加至 Connect 叢集的標籤。

  • Secret 資源:列出與 Connect 叢集相關聯的密鑰。

  • 子網路:列出與 Connect 叢集相關聯的子網路。 這個表格包含「名稱」、「區域」和「專案」的資料欄。

  • DNS 名稱:顯示為 Connect 叢集設定的任何自訂 DNS 名稱。如果未設定任何自訂 DNS 名稱,這個表格會是空白。

監控

「監控」分頁提供各項指標,協助您瞭解連接器的效能和資源使用率。指標包括:

  • 工作站 CPU 使用量:每個工作站的 CPU 使用量總計 (以 vCPU 秒為單位)。這項指標有助於找出資源限制。

  • 工作站記憶體用量:每個工作站目前的記憶體用量 (以位元組為單位)。 這項指標有助於避免記憶體不足的錯誤。

  • 連接器傳入位元組速率:每個連接器每秒從用戶端連線傳入的平均位元組速率。這項指標有助於評估資料擷取率。

  • 連接器傳出位元組速率:每個連接器從用戶端連線傳送至伺服器的每秒平均傳出位元組速率。這項指標有助於監控資料傳送率。

記錄

「記錄」分頁會即時串流顯示 Connect 叢集的記錄項目。您可以使用這個分頁監控連接器健康狀態、活動,以及排解問題。這些功能可協助您有效監控及偵錯 Connect 叢集:

  • 依嚴重性篩選:依嚴重性等級篩選記錄項目,快速找出重大問題。

  • 搜尋:搜尋特定關鍵字或詞組的記錄項目,找出與特定連接器或工作相關的事件。

  • 時間戳記:每筆記錄項目都有時間戳記,可協助追蹤事件順序,並判斷動作發生時間。

  • 摘要:記錄項目提供各種事件的詳細資料,例如連接器啟動和關閉、工作執行和資料處理。

後續步驟

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。