使用 Confluent 將 Apache Kafka 部署至 GKE

本指南說明如何使用 Confluent for Kubernetes (CFK) 運算子,在 Google Kubernetes Engine (GKE) 上部署 Apache Kafka 叢集。

Kafka 是開放原始碼的分散式發布/訂閱訊息傳遞系統,可處理大量、高輸送量和即時串流資料。您可以使用 Kafka 建構串流資料管道,在不同系統和應用程式之間穩定移動資料,以利處理和分析。

本指南適用於有興趣在 GKE 上部署 Kafka 叢集的平台管理員、雲端架構師和營運專員。

您也可以使用 CFK 運算子部署 Confluent Platform 的其他元件,例如網頁版 Confluent Control Center、Schema Registry 或 KsqlDB。不過,本指南僅著重於 Kafka 部署作業。

準備環境

在本教學課程中,您將使用 Cloud Shell 管理託管在 Google Cloud上的資源。Cloud Shell 已預先安裝本教學課程所需的軟體,包括 kubectlgcloud CLIHelmTerraform

如要使用 Cloud Shell 設定環境,請按照下列步驟操作:

  1. 在 Google Cloud 控制台中,按一下Cloud Shell 啟用圖示Google Cloud 控制台中的「啟用 Cloud Shell」,即可啟動 Cloud Shell 工作階段。系統會在 Google Cloud 控制台的底部窗格啟動工作階段。

  2. 設定環境變數:

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=kafka
    export REGION=us-central1
    

    PROJECT_ID: your Google Cloud 替換為您的專案 ID

  3. 複製 GitHub 存放區:

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  4. 變更為工作目錄:

    cd kubernetes-engine-samples/streaming
    

建立叢集基礎架構

在本節中,您將執行 Terraform 指令碼,建立高可用性的地區 GKE 私人叢集。請按照下列步驟公開存取控制層。如要限制存取權,請建立私人叢集

您可以使用標準或 Autopilot叢集安裝運算子。

標準

下圖顯示部署在三個不同可用區的私有區域標準 GKE 叢集:

如要部署這項基礎架構,請在 Cloud Shell 中執行下列指令:

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-standard init
terraform -chdir=kafka/terraform/gke-standard apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

系統顯示提示訊息時,請輸入 yes。這個指令可能需要幾分鐘才能完成,叢集也會在這段時間內顯示就緒狀態。

Terraform 會建立下列資源:

  • Kubernetes 節點的虛擬私有雲網路和私有子網路。
  • 透過 NAT 存取網際網路的路由器。
  • us-central1 地區的私人 GKE 叢集。
  • 2 個啟用自動調度資源功能的節點集區 (每個可用區 1 到 2 個節點,每個可用區至少 1 個節點)
  • 具備記錄與監控權限的 ServiceAccount
  • Backup for GKE,用於災難復原。
  • Google Cloud Managed Service for Prometheus,用於監控叢集。

輸出結果會與下列內容相似:

...
Apply complete! Resources: 14 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"

Autopilot

下圖顯示私人區域 Autopilot GKE 叢集:

如要部署基礎架構,請透過 Cloud Shell 執行下列指令:

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-autopilot init
terraform -chdir=kafka/terraform/gke-autopilot apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

系統顯示提示訊息時,請輸入 yes。這個指令可能需要幾分鐘才能完成,叢集也會在這段時間內顯示就緒狀態。

Terraform 會建立下列資源:

  • Kubernetes 節點的虛擬私有雲網路和私人子網路。
  • 透過 NAT 存取網際網路的路由器。
  • us-central1 地區的私人 GKE 叢集。
  • 具備記錄與監控權限的 ServiceAccount
  • Google Cloud Managed Service for Prometheus,用於監控叢集。

輸出結果會與下列內容相似:

...
Apply complete! Resources: 12 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"

連線至叢集

設定 kubectl 與叢集通訊:

gcloud container clusters get-credentials ${KUBERNETES_CLUSTER_PREFIX}-cluster --region ${REGION}

將 CFK 運算子部署至叢集

在本節中,您將使用 Helm 圖表部署 Confluent for Kubernetes (CFK) 運算子,然後部署 Kafka 叢集。

  1. 新增 Confluent Helm 資訊套件存放區:

    helm repo add confluentinc https://packages.confluent.io/helm
    
  2. 為 CFK 運算子和 Kafka 叢集新增命名空間:

    kubectl create ns kafka
    
  3. 使用 Helm 部署 CFK 叢集運算子:

    helm install confluent-operator confluentinc/confluent-for-kubernetes -n kafka
    

    如要讓 CFK 管理所有命名空間的資源,請在 Helm 指令中加入 --set-namespaced=false 參數。

  4. 確認 Confluent 運算子已使用 Helm 成功部署:

    helm ls -n kafka
    

    輸出結果會與下列內容相似:

    NAME                  NAMESPACE  REVISION UPDATED                                  STATUS      CHART                                APP VERSION
    confluent-operator    kafka      1        2023-07-07 10:57:45.409158 +0200 CEST    deployed    confluent-for-kubernetes-0.771.13    2.6.0
    

部署 Kafka

在本節中,您將以基本設定部署 Kafka,然後嘗試各種進階設定情境,以滿足可用性、安全性及可觀測性需求。

基本設定

Kafka 執行個體的基本設定包含下列元件:

  • Kafka 代理程式的三項備用資源,至少須有兩項備用資源,才能確保叢集一致性。
  • 三個 ZooKeeper 節點副本,組成一個叢集。
  • 兩個 Kafka 監聽器:一個不含驗證,另一個使用 TLS 驗證和 CFK 產生的憑證。
  • Kafka 的 Java MaxHeapSizeMinHeapSize 設為 4 GB。
  • CPU 資源分配:1 個 CPU 要求和 2 個 CPU 限制,以及 Kafka 的 5 GB 記憶體要求和限制 (主要服務為 4 GB,指標匯出工具為 0.5 GB),Zookeeper 的 3 GB 記憶體要求和限制 (主要服務為 2 GB,指標匯出工具為 0.5 GB)。
  • 使用 premium-rwo storageClass 為每個 Pod 分配 100 GB 的儲存空間,其中 Kafka 資料為 100 GB,Zookeeper 資料/記錄為 90/10 GB。
  • 為每個工作負載設定容許條件、nodeAffinity 和 podAntiAffinity,確保工作負載在節點之間適當分配,並利用各自的節點集區和不同區域。
  • 使用您提供的憑證授權單位,透過自行簽署的憑證保護叢集內的通訊安全。

這項設定代表建立可供正式環境使用的 Kafka 叢集所需的基本設定。下列各節將示範自訂設定,以解決叢集安全、存取控制清單 (ACL)、主題管理、憑證管理等問題。

建立基本 Kafka 叢集

  1. 產生 CA 金鑰組:

    openssl genrsa -out ca-key.pem 2048
    openssl req -new -key ca-key.pem -x509 \
      -days 1000 \
      -out ca.pem \
      -subj "/C=US/ST=CA/L=Confluent/O=Confluent/OU=Operator/CN=MyCA"
    

    Confluent for Kubernetes 會為 Confluent Platform 元件提供自動產生的憑證,用於 TLS 網路加密。您必須產生並提供憑證授權單位 (CA)。

  2. 為憑證授權單位建立 Kubernetes 密鑰:

    kubectl create secret tls ca-pair-sslcerts --cert=ca.pem --key=ca-key.pem -n kafka
    

    Secret 名稱是 預先定義

  3. 使用基本設定建立新的 Kafka 叢集:

    kubectl apply -n kafka -f kafka-confluent/manifests/01-basic-cluster/my-cluster.yaml
    

    這個指令會建立 CFK 運算子的 Kafka 自訂資源和 Zookeeper 自訂資源,其中包含 CPU 和記憶體要求與限制、區塊儲存空間要求,以及用於在 Kubernetes 節點間分配佈建 Pod 的汙點和親和性。

  4. 請稍候片刻,等待 Kubernetes 啟動必要的工作負載:

    kubectl wait pods -l app=my-cluster --for condition=Ready --timeout=300s -n kafka
    
  5. 確認 Kafka 工作負載已建立:

    kubectl get pod,svc,statefulset,deploy,pdb -n kafka
    

    輸出結果會與下列內容相似:

    NAME                                    READY   STATUS  RESTARTS   AGE
    pod/confluent-operator-864c74d4b4-fvpxs   1/1   Running   0        49m
    pod/my-cluster-0                        1/1   Running   0        17m
    pod/my-cluster-1                        1/1   Running   0        17m
    pod/my-cluster-2                        1/1   Running   0        17m
    pod/zookeeper-0                         1/1   Running   0        18m
    pod/zookeeper-1                         1/1   Running   0        18m
    pod/zookeeper-2                         1/1   Running   0        18m
    
    NAME                          TYPE      CLUSTER-IP   EXTERNAL-IP   PORT(S)                                                        AGE
    service/confluent-operator    ClusterIP   10.52.13.164   <none>      7778/TCP                                                       49m
    service/my-cluster            ClusterIP   None         <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-0-internal   ClusterIP   10.52.2.242  <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-1-internal   ClusterIP   10.52.7.98   <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-2-internal   ClusterIP   10.52.4.226  <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/zookeeper             ClusterIP   None         <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-0-internal  ClusterIP   10.52.8.52   <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-1-internal  ClusterIP   10.52.12.44  <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-2-internal  ClusterIP   10.52.12.134   <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    
    NAME                        READY   AGE
    statefulset.apps/my-cluster   3/3   17m
    statefulset.apps/zookeeper  3/3   18m
    
    NAME                               READY   UP-TO-DATE   AVAILABLE   AGE
    deployment.apps/confluent-operator   1/1   1          1         49m
    
    NAME                                  MIN AVAILABLE   MAX UNAVAILABLE   ALLOWED DISRUPTIONS   AGE
    poddisruptionbudget.policy/my-cluster   N/A           1               1                   17m
    poddisruptionbudget.policy/zookeeper  N/A           1               1                   18m
    

運算子會建立下列資源:

  • Kafka 和 ZooKeeper 各有一個 StatefulSet。
  • Kafka 代理程式副本的三個 Pod。
  • 三個 ZooKeeper 副本的 Pod。
  • 兩個 PodDisruptionBudget 資源,確保叢集一致性最多只有一個無法使用的副本。
  • 這個 Service 可做為 Kafka 用戶端的啟動伺服器,從 Kubernetes 叢集內連線。my-cluster這個服務提供所有內部 Kafka 監聽器。
  • 這項服務 zookeeper 可讓 Kafka 代理程式以用戶端身分連線至 ZooKeeper 節點。

驗證和使用者管理

本節說明如何啟用驗證和授權,確保 Kafka 監聽器安全無虞,並與用戶端共用憑證。

Confluent for Kubernetes 支援多種 Kafka 驗證方法,例如:

限制

  • CFK 不提供使用者管理用的自訂資源。不過,您可以將憑證儲存在 Secret 中,並在接聽器規格中參照 Secret。
  • 雖然沒有可直接管理 ACL 的自訂資源,但官方的 Confluent for Kubernetes 提供使用 Kafka CLI 設定 ACL 的指南。

新增使用者

本節說明如何部署 CFK 運算子,示範使用者管理功能,包括:

  • 已在其中一個接聽程式上啟用密碼驗證 (SASL/PLAIN) 的 Kafka 叢集
  • 具有 3 個副本的 KafkaTopic
  • 具備讀取和寫入權限的使用者憑證
  1. 使用使用者憑證建立 Secret:

    export USERNAME=my-user
    export PASSWORD=$(openssl rand -base64 12)
    kubectl create secret generic my-user-credentials -n kafka \
      --from-literal=plain-users.json="{\"$USERNAME\":\"$PASSWORD\"}"
    

    憑證應採用下列格式儲存:

    {
    "username1": "password1",
    "username2": "password2",
    ...
    "usernameN": "passwordN"
    }
    
  2. 將 Kafka 叢集設定為在通訊埠 9094 上使用具備密碼驗證功能的監聽器,並採用 SCRAM-SHA-512 驗證:

    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-cluster.yaml
    
  3. 設定主題和用戶端 Pod,與 Kafka 叢集互動並執行 Kafka 指令:

    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-topic.yaml
    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/kafkacat.yaml
    

    GKE 會將 Secret my-user-credentials磁碟區的形式掛接至用戶端 Pod。

  4. 用戶端 Pod 準備就緒後,請連線至該 Pod,並使用提供的憑證開始產生及取用訊息:

    kubectl wait pod kafkacat --for=condition=Ready --timeout=300s -n kafka
    kubectl exec -it kafkacat -n kafka -- /bin/sh
    
  5. 使用 my-user 憑證產生訊息,然後使用該訊息來驗證是否已收到。

    export USERNAME=$(cat /my-user/plain-users.json|cut -d'"' -f 2)
    export PASSWORD=$(cat /my-user/plain-users.json|cut -d'"' -f 4)
    echo "Message from my-user" |kcat \
      -b my-cluster.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=PLAIN \
      -X sasl.username=$USERNAME \
      -X sasl.password=$PASSWORD  \
      -t my-topic -P
    kcat -b my-cluster.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=PLAIN \
      -X sasl.username=$USERNAME \
      -X sasl.password=$PASSWORD  \
      -t my-topic -C
    

    輸出結果會與下列內容相似:

    Message from my-user
    % Reached end of topic my-topic [1] at offset 1
    % Reached end of topic my-topic [2] at offset 0
    % Reached end of topic my-topic [0] at offset 0
    

    輸入 CTRL+C 即可停止消費者程序。如果收到 Connect refused 錯誤,請稍候幾分鐘,然後再試一次。

  6. 退出 Pod 殼層

    exit
    

備份與災難復原

使用 Confluent 運算子時,您可以按照特定模式實作有效率的備份策略。

您可以使用 GKE 備份服務備份下列項目:

  • Kubernetes 資源資訊清單。
  • 從備份叢集的 Kubernetes API 伺服器擷取的 Confluent API 自訂資源及其定義。
  • 與資訊清單中找到的 PersistentVolumeClaim 資源對應的磁碟區。

如要進一步瞭解如何使用 GKE 備份功能備份及還原 Kafka 叢集,請參閱「準備進行災難復原」。

您也可以手動備份 Kafka 叢集。建議備份的項目:

  • Kafka 設定,包括 Confluent API 的所有自訂資源,例如 KafkaTopicsConnect
  • 儲存在 Kafka 代理程式 PersistentVolumes 中的資料

將 Kubernetes 資源資訊清單 (包括 Confluent 設定) 儲存在 Git 存放區中,即可免除 Kafka 設定的個別備份需求,因為必要時可將資源重新套用至新的 Kubernetes 叢集。

為確保 Kafka 伺服器執行個體或部署 Kafka 的 Kubernetes 叢集遺失時,Kafka 資料能夠復原,建議您將用於為 Kafka 代理程式佈建磁碟區的 Kubernetes 儲存空間類別,設定為 reclaimPolicy 選項設為 Retain。我們也建議您為 Kafka 代理程式磁碟區建立快照

下列資訊清單說明使用 reclaimPolicy 選項 Retain 的 StorageClass:

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: premium-rwo-retain
...
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer

以下範例顯示新增至 Kafka 叢集自訂資源 spec 的 StorageClass:

...
spec:
  ...
  dataVolumeCapacity: 100Gi
  storageClass:
  name: premium-rwo-retain

完成這項設定後,即使刪除對應的 PersistentVolumeClaim,系統也不會刪除使用儲存空間類別佈建的 PersistentVolume。

如要使用現有設定和代理程式執行個體資料,在新 Kubernetes 叢集上復原 Kafka 執行個體,請按照下列步驟操作:

  1. 將現有的 Confluent 自訂資源 (KafkaKafkaTopicZookeeper 等) 套用至新的 Kubernetes 叢集
  2. 使用 PersistentVolumeClaim 的 spec.volumeName 屬性,以舊的 PersistentVolumes 更新 PersistentVolumeClaims,並使用新的 Kafka 代理程式執行個體名稱。