建立連結叢集

Connect 叢集提供連接器環境,可協助您將資料從現有的 Kafka 部署作業移至 Google Cloud Managed Service for Apache Kafka 叢集,或將資料從 Managed Service for Apache Kafka 叢集移至其他 Google Cloud 服務或 Kafka 叢集。次要 Kafka 叢集可以是另一個 Google Cloud Managed Service for Apache Kafka 叢集、自行管理的叢集,或是地端叢集。

事前準備

請確認您已建立 Managed Service for Apache Kafka 叢集。您需要 Connect 叢集要附加的 Managed Service for Apache Kafka 叢集名稱。

每個 Connect 叢集都會與一個 Managed Service for Apache Kafka 叢集相關聯。這個叢集會儲存在 Connect 叢集上執行的連接器狀態。

建立 Connect 叢集所需的角色和權限

如要取得建立 Connect 叢集所需的權限,請要求管理員授予您專案的代管 Kafka Connect 叢集編輯者 (roles/managedkafka.connectClusterEditor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

這個預先定義的角色具備建立 Connect 叢集所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要建立 Connect 叢集,您必須具備下列權限:

  • 在指定位置授予建立 Connect 叢集的權限: managedkafka.connectClusters.create

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

如要進一步瞭解這個角色,請參閱「Managed Service for Apache Kafka 預先定義的角色」。

必要的 ACL 主體

根據預設,如果未設定存取控制清單,Managed Service for Apache Kafka 叢集會允許 Connect 叢集存取資源。方法是將 allow.everyone.if.no.acl.found 設為 true,這是預設設定。

不過,如果 Managed Service for Apache Kafka 叢集已設定 ACL,Connect 叢集不會自動取得資源的讀寫權限。您必須手動授予。

在存取控制清單中做為主體的 Connect 叢集服務帳戶格式如下:User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com

如果您已在 Kafka 叢集上設定 ACL,請使用下列指令,將主題的讀寫權限和消費者群組的讀取權限授予 Connect 叢集:

/bin/kafka-acls.sh \
    --bootstrap-server BOOTSTRAP_ADDR \
    --command-config PATH_TO_CLIENT_PROPERTIES \
    --add \
    --allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --operation READ --operation WRITE --topic *
/bin/kafka-acls.sh \
    --bootstrap-server BOOTSTRAP_ADDR \
    --command-config PATH_TO_CLIENT_PROPERTIES \
    --add \
    --allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --operation READ --group *

如要進一步瞭解這些指令,請參閱「設定 Apache Kafka ACL,進行精細的存取權控管」。

在其他專案中建立連結叢集

建立 Connect 叢集時,該叢集會與同一專案中的 Managed Service for Apache Kafka 叢集共用服務代理。如果這個 Managed Service for Apache Kafka 叢集指定為附加至 Connect 叢集的主要 Kafka 叢集,則不需要其他權限。

服務代理的格式為 service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com。專案編號是指包含 Connect 叢集和 Managed Service for Apache Kafka 叢集的專案。

如果 Connect 叢集位於專案 A,而相關聯的 Managed Service for Apache Kafka 叢集位於專案 B,請按照下列步驟操作:

  1. 確認已為專案 A 和專案 B 啟用 Managed Kafka API。

    啟用 API

  2. 找出專案 A 中 Connect 叢集的服務代理程式。

    服務代理的格式為 service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com

  3. 在專案 B 中,將 Managed Kafka Client 角色 (roles/managedkafka.client) 授予 Connect 叢集的服務帳戶。

    這個角色會授予連線至 Managed Service for Apache Kafka 叢集所需的權限,並執行讀取和寫入資料等作業。

    如要進一步瞭解如何授予角色,請參閱「建立及授予服務代理程式角色」。

授予權限時,請務必遵循最小權限原則。只授予必要權限,確保安全性並防範未經授權的存取行為。

連結叢集的屬性

本節說明 Connect 叢集的屬性。

Connect 叢集名稱

您要建立的 Connect 叢集名稱。如要查看 Connect 叢集的命名準則,請參閱「Managed Service for Apache Kafka 資源命名指南」。叢集名稱無法變更。

主要 Kafka 叢集

與 Connect 叢集相關聯的 Managed Service for Apache Kafka 叢集。這個相關聯的叢集 (主要叢集) 會儲存 Connect 叢集上執行的連接器狀態。一般而言,主要 Managed Service for Apache Kafka 叢集也是所有來源連接器的目的地,以及在 Connect 叢集上執行的所有接收器連接器的輸入內容。

單一 Managed Service for Apache Kafka 叢集可以有多個 Connect 叢集。如果您選擇不同專案中的 Managed Service for Apache Kafka 叢集,請確保已設定適當的權限

建立 Connect 叢集後,就無法更新為其他 Kafka 叢集。

地區主機代管服務的延遲時間和網路費用優勢

在同一個區域中,將 Managed Service for Apache Kafka 和 Connect 叢集放在一起,可減少延遲和網路費用。舉例來說,假設您的 Managed Service for Apache Kafka 叢集位於 region-a,且您使用接收器連接器將資料從這個 Managed Service for Apache Kafka 叢集 (來源) 寫入同樣位於 region-a 的 BigQuery 資料表 (接收器)。如果您在 region-a 中部署 Connect 叢集,這個部署選項可將 BigQuery 寫入作業的延遲時間降至最低,並免除 Managed Service for Apache Kafka 叢集與 Connect 叢集之間的跨區域網路傳輸費用。

多系統延遲和成本考量

Kafka Connect 會使用連接器在系統之間移動資料。連接器的一端一律會與 Managed Service for Apache Kafka 叢集互動。單一 Kafka Connect 叢集可以執行多個連接器,每個連接器可做為「來源」 (從系統提取資料) 或「接收器」 (將資料推送至系統)。

雖然與 Managed Service for Apache Kafka 叢集位於同一區域的 Connect 叢集,可享有兩者之間較低的通訊延遲,但每個連接器也會與其他系統互動,例如 BigQuery 表格或其他 Kafka 叢集。即使 Connect 叢集和 Managed Service for Apache Kafka 叢集位於同一位置,該其他系統也可能位於不同區域。這會導致延遲時間和成本增加。整體管道延遲時間取決於三個系統的位置:Managed Service for Apache Kafka 叢集、Connect 叢集,以及來源或接收器系統。

舉例來說,如果您的 Managed Service for Apache Kafka 叢集位於 region-a,Connect 叢集位於 region-b,且您使用 Cloud Storage 連接器存取 region-c 中的 bucket,系統會針對兩個網路躍點向您收費 (region-aregion-b,然後 region-bregion-c,或反向,視連接器方向而定)。

規劃 Connect 叢集放置位置時,請仔細考量所有相關區域,盡量減少延遲時間和成本。

容量設定

請在「運算資源設定」專區,提供 Connect 叢集每個 vCPU 的 vCPU 數量和記憶體容量。建立 Connect 叢集後,您可以更新叢集容量。以下是容量設定的屬性:

  • vCPU:指派給 Connect 叢集的 vCPU 數量。最小值為 3 個 vCPU。

  • 記憶體:為每個 vCPU 指派的記憶體容量。每個 vCPU 必須佈建 1 GiB 至 8 GiB 的記憶體。叢集建立後,您可以在這些限制內增加或減少記憶體容量。

    舉例來說,如果您建立的叢集有 6 個 vCPU,則可分配給叢集的記憶體容量下限為 6 GiB (每個 vCPU 1 GiB),上限為 48 GiB (每個 vCPU 8 GiB)。

分配給 Connect 叢集中每個工作站的 vCPU 和記憶體,對叢集的效能、容量和成本有顯著影響。以下是 vCPU 和記憶體對 Connect 叢集的影響。

vCPU 數量

  • Kafka Connect 會將連接器的工作劃分為多項工作。每項工作都能平行處理資料。vCPU 越多,可同時執行的工作就越多,總處理量也會隨之提高。

  • 增加 vCPU 會提高 Connect 叢集的費用。

記憶體

  • Kafka Connect 會使用記憶體緩衝處理連接器和 Managed Service for Apache Kafka 之間流動的資料。記憶體越大,緩衝區就越大。記憶體容量越大,總處理量就越高,尤其是處理大量資料串流時。處理大型訊息或記錄的連接器需要足夠的記憶體,才能處理這些項目,不會發生 OutOfMemoryError 例外狀況。

  • 記憶體越多,Connect 叢集的費用就越高。

  • 如果您使用大量轉換邏輯,則需要更多記憶體分配。

您的目標是為 Connect 叢集選擇合適的容量設定。 如要執行這項操作,您必須瞭解 Connect 叢集可處理的輸送量。

工作站 (主要) 子網路

工作子網路 (也稱為主要子網路) 可將虛擬私有雲網路連線至 Connect 叢集。這個子網路可讓叢集工作站連上消費者網路中來源和接收器的端點,例如 Managed Service for Apache Kafka 叢集或自行代管的 Kafka 叢集。

設定工作站子網路時,請注意下列事項:

  • 必須提供工作站子網路。

  • 子網路必須與 Connect 叢集位於相同區域。

  • 子網路必須與主要 Kafka 叢集已連線子網路清單中,其中一個子網路位於相同的父項 VPC。

  • 子網路 CIDR 範圍的大小不得小於 /22 (1024 個位址)。

叢集工作站會使用 Private Service Connect 介面,在工作站子網路中指派 IP 位址。只要符合下列條件,工作站就能連線至子網路虛擬私有雲網路可存取的任何網路目的地:

  • 端點不得位於 172.16.0.0/14 CIDR 範圍內。這個範圍保留供 Managed Service for Apache Kafka Connect 內部使用。
  • 防火牆規則必須允許流量。請參閱「設定網路連結的安全防護機制」。
  • 如要處理網際網路流量,您必須設定 Cloud NAT。舉例來說,如果 Kafka 叢集可透過網際網路存取,您必須設定 Cloud NAT,MirrorMaker 連接器才能複製該叢集的資料。
  • 如要存取與工作站子網路虛擬私有雲不同的虛擬私有雲中的 Private Service Connect 端點,請務必使用支援的消費者設定 (例如 NCC)。詳情請參閱「透過端點存取已發布的服務」。

可解析的 DNS 網域

可解析的 DNS 網域 (也稱為 DNS 網域名稱) 可讓租戶 VPC 使用用戶 VPC 網路中的 DNS 位址。這項設定可讓 Connect 叢集將 DNS 名稱解析為 IP 位址,方便與其他服務通訊,包括 MirrorMaker 連接器的其他 Kafka 叢集。

對於可解析的 DNS 網域,您可以選取 Managed Service for Apache Kafka 叢集。您不需要為主要 Managed Service for Apache Kafka 叢集設定 DNS 網域名稱,因為啟動程序位址會自動納入可解析的 DNS 網域清單。

不過,您也可以手動指定 DNS 網域,如果選取外部 Kafka 叢集,就必須這麼做。系統會自動納入主要 Managed Service for Apache Kafka 叢集的 DNS 網域。其他 Kafka 叢集仍須設定 DNS 網域。

Secret Manager 資源

指定要載入工作站的 Secret Manager。 這些密鑰會安全地儲存在 Secret Manager 中,並提供給 Connect 叢集使用。

您也可以在連接器設定中使用 Secret Manager。舉例來說,您可以將金鑰檔案載入 Connect 叢集,並讓連接器讀取該檔案。Secret Manager 會以檔案形式掛接至工作人員。

Connect 叢集會直接與 Secret Manager 整合。 您必須使用 Secret Manager 儲存及管理密鑰。

指定密鑰的格式為:projects/{PROJECT_ID}/secrets/{SECRET_NAME}/versions/{VERSION_ID}

  • PROJECT_ID:Secret Manager Secret 所在的專案 ID。

  • SECRET_NAME:Secret Manager 中的 Secret 名稱。

  • VERSION_ID:Secret 的特定版本號碼。例如「1」、「2」、「3」。

單一 Connect 叢集最多可載入 32 個密鑰。

請確認執行 Connect 工作人員的服務代理程式,在您要使用的密鑰上具有 secretmanager.secretAccessor 角色 (Secret Manager 密鑰存取者)。這個角色可讓 Connect 叢集從 Secret Manager 擷取密鑰值。

標籤

標籤是鍵/值組合,可協助您整理及識別資源。 方便您整理 Connect 叢集。您可以為每個 Connect 叢集加上標籤,並根據標籤篩選資源。標籤範例包括 environment:prodapplication:web-app

建立連結叢集

建立叢集前,請先參閱連結叢集屬性的說明文件。

建立 Connect 叢集需要 20 到 30 分鐘。

控制台

  1. 前往 Google Cloud 控制台的「Connect Clusters」(連結叢集) 頁面。

    前往「Connect Clusters」(連結叢集)

  2. 點選「建立」

    「建立連結叢集」頁面隨即開啟。

  3. 在「Connect cluster name」(Connect 叢集名稱) 中輸入字串。

    如要進一步瞭解如何命名 Connect 叢集,請參閱「Managed Service for Apache Kafka 資源命名指南」。

  4. 在「Primary Kafka cluster」(主要 Kafka 叢集) 中,從選單選取 Managed Service for Apache Kafka 叢集。

    如要進一步瞭解這個 Managed Service for Apache Kafka 叢集執行的函式,請參閱「主要 Kafka 叢集」。

  5. 在「位置」部分,從「區域」選單中選取支援的位置,或保留預設值。

    如要進一步瞭解如何選取合適的位置,請參閱「主要 Kafka 叢集」。

  6. 在「運算資源設定」專區,輸入「vCPU」和「記憶體」的值,或保留預設值。

    在「vCPU」vCPUs欄位中,輸入叢集的虛擬 CPU 數量。

    在「記憶體」欄位中,輸入每個 CPU 的記憶體容量 (以 GiB 為單位)。如果每個 CPU 的記憶體大於 8 GiB,系統會顯示錯誤訊息。

    如要進一步瞭解如何調整 Managed Service for Apache Kafka 叢集大小,請參閱「運算資源設定」。

  7. 在「Network configuration」(網路設定) 部分,從「Network」(網路) 選單中,選取或保留主要 Managed Service for Apache Kafka 叢集的網路。

  8. 在「工作站子網路」部分,從選單中選取或保留子網路。

    「子網路 URI 路徑」欄位會自動填入資料。詳情請參閱「工作人員子網路」。

  9. 如果是可解析的 DNS 網域,系統會自動將主要 Kafka 叢集的 DNS 網域新增為可解析的 DNS 網域。

    如要新增其他 DNS 網域,請視需要展開這個部分。

  10. 按一下「新增 DNS 網域」

    從選單中選取 Kafka 叢集。

    系統會自動填入 DNS 網域。您也可以輸入外部 Kafka 叢集的 DNS 網域名稱。

    按一下 [完成]

  11. 如為 Secret Manager 資源,請視需要展開該部分。

  12. 按一下「新增密鑰資源」

  13. 從「Secret」選單選取 Secret,然後從「Secret version」選單選取版本。你也可以建立新的 Secret

    請確認執行 Connect 工作人員的服務代理程式,對您要使用的密鑰具有 Secret Manager 密鑰存取者角色。如要進一步瞭解 Secret Manager,請參閱「Secret Manager 資源」。

  14. 按一下 [完成]

  15. 如要新增其他 Secret,請按一下「Add secret resource」(新增 Secret 資源)

  16. 如要查看「標籤」,請視需要展開該部分。

    如要整理專案,請以鍵/值組合的形式為資源新增任意標籤。

    按一下「新增標籤」,加入不同的環境、服務、擁有者、團隊等。

  17. 點選「建立」

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 請執行 gcloud managed-kafka connect-clusters create 指令:

    gcloud managed-kafka connect-clusters create CONNECT_CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --primary-subnet=WORKER_SUBNET \
        --kafka-cluster=KAFKA_CLUSTER \
        [--project=PROJECT_ID] \
        [--secret=SECRET] \
        [--dns-name=DNS_DOMAIN_NAME] \
        [--config-file=CONFIG_FILE] \
        [--labels=LABELS]
        [--async]
    

    更改下列內容:

    • CONNECT_CLUSTER_ID:Connect 叢集的 ID 或名稱。如要查看 Connect 叢集的命名準則,請參閱 Managed Service for Apache Kafka 資源命名指南。 Connect 叢集的名稱無法變更。

    • LOCATION:您建立 Connect 叢集的位置。必須是支援的 Google Cloud區域。 Connect 叢集建立後即無法變更位置。如要查詢可用位置清單,請參閱「Managed Service for Apache Kafka 位置」。如要進一步瞭解位置建議,請參閱「主要 Kafka 叢集」。

    • CPU:Connect 叢集的 vCPU 數量。最小值為 3 個 vCPU。請參閱「vCPU 數量」。

    • MEMORY:Connect 叢集的記憶體量。請使用「MB」、「MiB」、「GB」、「GiB」、「TB」或「TiB」單位。例如「3GiB」。每個 vCPU 必須佈建 1 GiB 至 8 GiB 的記憶體量。 請參閱「記憶體」。

    • WORKER_SUBNET:Connect 叢集的 Worker 子網路。

      子網路的格式為 projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID

      worker 子網路必須與 Connect 叢集位於相同區域。

    • PROJECT_ID:(選用)Google Cloud 專案的 ID。如未提供,則會使用目前的專案。

    • KAFKA_CLUSTER:與 Connect 叢集相關聯的主要 Managed Service for Apache Kafka 叢集 ID 或完整名稱。請參閱 Kafka 叢集。Kafka 叢集的格式為 projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID

      建立 Connect 叢集後,就無法更新為其他 Kafka 叢集。

    • SECRET:(選用) 要載入工作人員的密鑰。 必須提供 Secret Manager 的確切 Secret 版本,不支援別名。一個叢集最多可載入 32 個密鑰。格式:projects/PROJECT_ID/secrets/SECRET_NAME/versions/VERSION_ID

    • DNS_DOMAIN_NAME:(選用) 子網路的 DNS 網域名稱,可供 Connect Cluster 顯示。Connect 叢集可使用網域名稱存取資源,不必依賴 IP 位址。請參閱「DNS 對等互連」。

    • LABELS:(選用) 要與叢集建立關聯的標籤。如要進一步瞭解標籤格式,請參閱「標籤」。要新增的標籤 KEY=VALUE 組合清單。鍵的開頭須為小寫字元,且只能包含連字號 (-)、底線 (_)、小寫字元和數字。值只能包含連字號 (-)、底線 (_)、小寫字元和數字。

    • CONFIG_FILE:(選用) 包含設定的 JSON 或 YAML 檔案路徑,這些設定會覆寫叢集或連接器預設值。這個檔案也支援內嵌 JSON 或 YAML。

    • --async:(選用) 立即返回,不必等待執行中的作業完成。使用 --async 旗標,您可以在背景建立叢集時繼續執行其他工作。如果您未使用這個標記,系統會等待作業完成,再傳回回應。您必須等到叢集完全更新,才能繼續執行其他工作。

    您會收到類似以下的回覆:

    Create request issued for: [sample-connectcluster]
    Check operation [projects/test-project/locations/us-east1/operations/operation-1753590328249-63ae19098cc06-64300a0a-06512d02] for status.
    

    儲存 OPERATION_ID 即可追蹤進度。舉例來說,這裡的值是 operation-1753590328249-63ae19098cc06-64300a0a-06512d02

  3. Terraform

    您可以使用 Terraform 資源建立 Connect 叢集

    resource "google_managed_kafka_connect_cluster" "default" {
      provider           = google-beta
      project            = data.google_project.default.project_id
      connect_cluster_id = "my-connect-cluster-id"
      location           = "us-central1"
      kafka_cluster      = google_managed_kafka_cluster.default.id
      capacity_config {
        vcpu_count   = 12
        memory_bytes = 12884901888 # 12 GiB
      }
      gcp_config {
        access_config {
          network_configs {
            primary_subnet = google_compute_subnetwork.default.id
          }
        }
      }
    }

    如要瞭解如何套用或移除 Terraform 設定,請參閱「基本 Terraform 指令」。

    Go

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createConnectCluster(w io.Writer, projectID, region, clusterID, kafkaCluster string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-connect-cluster"
    	// kafkaCluster := "projects/my-project-id/locations/us-central1/clusters/my-kafka-cluster"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
    	clusterPath := fmt.Sprintf("%s/connectClusters/%s", locationPath, clusterID)
    
    	// Capacity configuration with 12 vCPU and 12 GiB RAM
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		VcpuCount:   12,
    		MemoryBytes: 12884901888, // 12 GiB in bytes
    	}
    
    	// Optionally, you can also specify accessible subnets and resolvable DNS
    	// domains as part of your network configuration. For example:
    	// networkConfigs := []*managedkafkapb.ConnectNetworkConfig{
    	// 	{
    	// 		PrimarySubnet:      primarySubnet,
    	// 		AdditionalSubnets:  []string{"subnet-1", "subnet-2"},
    	// 		DnsDomainNames:     []string{"domain-1", "domain-2"},
    	// 	},
    	// }
    
    	connectCluster := &managedkafkapb.ConnectCluster{
    		Name:           clusterPath,
    		KafkaCluster:   kafkaCluster,
    		CapacityConfig: capacityConfig,
    	}
    
    	req := &managedkafkapb.CreateConnectClusterRequest{
    		Parent:           locationPath,
    		ConnectClusterId: clusterID,
    		ConnectCluster:   connectCluster,
    	}
    	op, err := client.CreateConnectCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnectCluster got err: %w", err)
    	}
    	// The duration of this operation can vary considerably, typically taking 5-15 minutes.
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created connect cluster: %s\n", resp.Name)
    	return nil
    }
    

    Java

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.RetryingFuture;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.ConnectAccessConfig;
    import com.google.cloud.managedkafka.v1.ConnectCluster;
    import com.google.cloud.managedkafka.v1.ConnectGcpConfig;
    import com.google.cloud.managedkafka.v1.ConnectNetworkConfig;
    import com.google.cloud.managedkafka.v1.CreateConnectClusterRequest;
    import com.google.cloud.managedkafka.v1.LocationName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class CreateConnectCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
        String kafkaCluster = "my-kafka-cluster"; // The Kafka cluster to connect to
        int cpu = 12;
        long memoryBytes = 12884901888L; // 12 GiB
        createConnectCluster(projectId, region, clusterId, subnet, kafkaCluster, cpu, memoryBytes);
      }
    
      public static void createConnectCluster(
          String projectId,
          String region,
          String clusterId,
          String subnet,
          String kafkaCluster,
          int cpu,
          long memoryBytes)
          throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setVcpuCount(cpu)
            .setMemoryBytes(memoryBytes).build();
        ConnectNetworkConfig networkConfig = ConnectNetworkConfig.newBuilder()
            .setPrimarySubnet(subnet)
            .build();
        // Optionally, you can also specify additional accessible subnets and resolvable
        // DNS domains as part of your network configuration. For example:
        // .addAllAdditionalSubnets(List.of("subnet-1", "subnet-2"))
        // .addAllDnsDomainNames(List.of("dns-1", "dns-2"))
        ConnectGcpConfig gcpConfig = ConnectGcpConfig.newBuilder()
            .setAccessConfig(ConnectAccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
            .build();
        ConnectCluster connectCluster = ConnectCluster.newBuilder()
            .setCapacityConfig(capacityConfig)
            .setGcpConfig(gcpConfig)
            .setKafkaCluster(kafkaCluster)
            .build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.createConnectClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient
            .create(settingsBuilder.build())) {
          CreateConnectClusterRequest request = CreateConnectClusterRequest.newBuilder()
              .setParent(LocationName.of(projectId, region).toString())
              .setConnectClusterId(clusterId)
              .setConnectCluster(connectCluster)
              .build();
    
          // The duration of this operation can vary considerably, typically taking
          // between 10-30 minutes.
          OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
              .createConnectClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf(
              "Connect cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(), operation.isDone(), future.getMetadata().get().toString());
    
          while (!future.isDone()) {
            // The pollingFuture gives us the most recent status of the operation
            RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
            OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
            System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
                currentOp.getName(),
                currentOp.isDone());
          }
    
          // NOTE: future.get() blocks completion until the operation is complete (isDone
          // = True)
          ConnectCluster response = future.get();
          System.out.printf("Created connect cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaConnectClient.createConnectCluster got err: %s\n", 
              e.getMessage());
          throw e;
        }
      }
    }

    Python

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient
    from google.cloud.managedkafka_v1.types import ConnectCluster, CreateConnectClusterRequest, ConnectNetworkConfig
    
    # TODO(developer): Update with your values.
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # kafka_cluster_id = "my-kafka-cluster"
    # primary_subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
    # cpu = 12
    # memory_bytes = 12884901888  # 12 GiB
    
    connect_client = ManagedKafkaConnectClient()
    kafka_client = managedkafka_v1.ManagedKafkaClient()
    
    parent = connect_client.common_location_path(project_id, region)
    kafka_cluster_path = kafka_client.cluster_path(project_id, region, kafka_cluster_id)
    
    connect_cluster = ConnectCluster()
    connect_cluster.name = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    connect_cluster.kafka_cluster = kafka_cluster_path
    connect_cluster.capacity_config.vcpu_count = cpu
    connect_cluster.capacity_config.memory_bytes = memory_bytes
    connect_cluster.gcp_config.access_config.network_configs = [ConnectNetworkConfig(primary_subnet=primary_subnet)]
    # Optionally, you can also specify accessible subnets and resolvable DNS domains as part of your network configuration.
    # For example:
    # connect_cluster.gcp_config.access_config.network_configs = [
    #     ConnectNetworkConfig(
    #         primary_subnet=primary_subnet,
    #         additional_subnets=additional_subnets,
    #         dns_domain_names=dns_domain_names,
    #     )
    # ]
    
    request = CreateConnectClusterRequest(
        parent=parent,
        connect_cluster_id=connect_cluster_id,
        connect_cluster=connect_cluster,
    )
    
    try:
        operation = connect_client.create_connect_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        # Creating a Connect cluster can take 10-40 minutes.
        response = operation.result(timeout=3000)
        print("Created Connect cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

監控叢集建立作業

只有在執行 gcloud CLI 建立 Connect 叢集時,才能執行下列指令。

  • 建立 Connect 叢集通常需要 20 到 30 分鐘。如要追蹤叢集建立作業的進度,gcloud managed-kafka connect-clusters create 指令會使用長時間執行的作業 (LRO),您可以透過下列指令監控這項作業:

    gcloud managed-kafka operations describe OPERATION_ID \
        --location=LOCATION
    

    更改下列內容:

    • OPERATION_ID 替換為上一節中的作業 ID 值。
    • LOCATION,並將其替換為上一節中的位置值。

後續步驟

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。