本指南介绍了如何使用 Strimzi Operator 部署 Apache Kafka 集群。
Kafka 是一个开源分布式消息传递系统,旨在处理大量、高吞吐量和实时的流式数据。您可以借助它构建流式传输数据流水线,以便在不同的系统和应用之间可靠地传输数据,从而为处理和分析任务提供支持。
操作器是使用自定义资源来管理应用及其组件的软件扩展程序。如需详细了解使用 Operator 的动机,请参阅开源 Kubernetes 文档中的 Operator 模式。Strimzi 操作器提供灵活的部署选项,您可以使用 Kubernetes 污点和容忍属性在专用节点上运行 Kafka。
本指南适用于有意在 GKE 上部署 Kafka 集群的平台管理员、云架构师和运营专家。
如果您想了解如何使用第三方操作器部署 Kafka 集群来自动执行管理并减少错误,此解决方案是一个很好的起点。如果您想要更精细的运营控制,请参阅在 GKE 上部署高可用性 Kafka 集群。
准备环境
在本教程中,您将使用 Cloud Shell 来管理 Google Cloud上托管的资源。Cloud Shell 预安装了本教程所需的软件,包括 kubectl
、gcloud CLI、Helm 和 Terraform。
如需使用 Cloud Shell 设置您的环境,请按照以下步骤操作:
点击Google Cloud 控制台中的
激活 Cloud Shell,从 Google Cloud 控制台启动 Cloud Shell 会话。此操作会在 Google Cloud 控制台的底部窗格中启动会话。
设置环境变量:
export PROJECT_ID=PROJECT_ID export KUBERNETES_CLUSTER_PREFIX=kafka export REGION=us-central1
将
PROJECT_ID
替换为您的 Google Cloud 项目 ID。克隆 GitHub 代码库:
git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
切换到工作目录:
cd kubernetes-engine-samples/streaming/
创建集群基础架构
在本部分中,您将运行 Terraform 脚本以创建可用性高的专用区域级 GKE 集群。通过执行以下步骤,可以公开访问控制平面。如需限制访问权限,请创建专用集群。
您可以使用 Standard 或 Autopilot 集群安装操作器。
标准
下图显示了部署在三个不同可用区中的专用区域级 Standard GKE 集群:
若要部署此基础架构,请从 Cloud Shell 运行以下命令:
export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-standard init
terraform -chdir=kafka/terraform/gke-standard apply -var project_id=${PROJECT_ID} \
-var region=${REGION} \
-var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
出现提示时,请输入 yes
。完成此命令并使集群显示就绪状态可能需要几分钟时间。
Terraform 会创建以下资源:
- Kubernetes 节点的 VPC 网络和专用子网。
- 用于通过 NAT 访问互联网的路由器。
- 专用 GKE 集群(在
us-central1
区域中)。 - 2 个启用了自动扩缩功能的节点池(每个可用区 1-2 个节点,每个可用区至少 1 个节点)
- 具有日志记录和监控权限的
ServiceAccount
。 - 用于灾难恢复的 Backup for GKE
- Google Cloud Managed Service for Prometheus,用于监控集群。
输出类似于以下内容:
...
Apply complete! Resources: 14 added, 0 changed, 0 destroyed.
Outputs:
kubectl_connection_command = "gcloud container clusters get-credentials strimzi-cluster --region us-central1"
Autopilot
下图显示了专用区域级 Autopilot GKE 集群:
若要部署此基础架构,请从 Cloud Shell 运行以下命令:
export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-autopilot init
terraform -chdir=kafka/terraform/gke-autopilot apply -var project_id=${PROJECT_ID} \
-var region=${REGION} \
-var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
出现提示时,请输入 yes
。完成此命令并使集群显示就绪状态可能需要几分钟时间。
Terraform 会创建以下资源:
- Kubernetes 节点的 VPC 网络和专用子网。
- 用于通过 NAT 访问互联网的路由器。
- 专用 GKE 集群(在
us-central1
区域中)。 - 具有日志记录和监控权限的
ServiceAccount
- Google Cloud Managed Service for Prometheus,用于监控集群。
输出类似于以下内容:
...
Apply complete! Resources: 12 added, 0 changed, 0 destroyed.
Outputs:
kubectl_connection_command = "gcloud container clusters get-credentials strimzi-cluster --region us-central1"
连接到集群
配置 kubectl
以与集群通信:
gcloud container clusters get-credentials ${KUBERNETES_CLUSTER_PREFIX}-cluster --region ${REGION}
将 Strimzi 操作器部署到您的集群
在本部分中,您将使用 Helm 图表部署 Strimzi 操作器。您还可以通过几种其他方式部署 Strimzi。
添加 Strimzi Helm 图表代码库:
helm repo add strimzi https://strimzi.io/charts/
为 Strimzi 操作器和 Kafka 集群添加命名空间:
kubectl create ns kafka
使用 Helm 部署 Strimzi Cluster Operator:
helm install strimzi-operator strimzi/strimzi-kafka-operator -n kafka
如需将 Strimzi Cluster Operator 和 Kafka 集群部署到不同的命名空间,请将参数
--set watchNamespaces="{kafka-namespace,kafka-namespace-2,...}"
添加到 helm 命令。使用 Helm 验证 Strimzi Cluster Operator 是否已成功部署:
helm ls -n kafka
输出类似于以下内容:
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION strimzi-operator kafka 1 2023-06-27 11:22:15.850545 +0200 CEST deployed strimzi-kafka-operator-0.35.0 0.35.0
部署 Kafka
将操作器部署到集群后,您就可以部署 Kafka 集群实例了。
在本部分中,您将在基本配置中部署 Kafka,然后尝试各种高级配置场景以满足可用性、安全性和可观测性要求。
基本配置
Kafka 实例的基本配置包括以下组成部分:
- Kafka 代理的三个副本,至少需要两个可用副本来实现集群一致性。
- ZooKeeper 节点的三个副本,形成一个集群。
- 两个 Kafka 监听器:一个不使用身份验证,另一个利用 TLS 身份验证(使用 Strimzi 生成的证书)。
- Java MaxHeapSize 和 MinHeapSize 设置为 4 GB(对于 Kafka)和 2 GB(对于 ZooKeeper)。
- CPU 资源分配:1 个 CPU 请求和 2 个 CPU 限制(对于 Kafka 和 ZooKeeper),同时 Kafka 的内存请求和限制为 5 GB(主服务为 4 GB,指标导出器为 0.5 GB),ZooKeeper 为 2.5 GB(主服务为 2 GB,指标导出为 0.5 GB)。
- 实体操作器,其请求和限制如下:
tlsSidecar
:100 m/500 m CPU 和 128 Mi 内存。topicOperator
:100 m/500 m CPU 和 512 Mi 内存。userOperator
:500 m CPU 和 2 Gi 内存。
- 100 GB 存储空间(使用
premium-rwo
storageClass
分配给每个 Pod)。 - 为每个工作负载配置容忍、nodeAffinity 和 podAntiAffinity,利用其各自的节点池和不同的可用区,确保跨节点适当分布。
- 集群内通信(由自签名证书保护):集群和客户端 (mTLS) 单独的证书授权机构 (CA)。也可以配置为使用其他证书授权机构。
此配置表示创建可用于生产用途的 Kafka 集群所需的最低设置。以下部分演示的自定义配置可解决集群安全、访问控制列表 (ACL)、主题管理、证书管理等方面的需要。
创建基本 Kafka 集群
使用基本配置创建新的 Kafka 集群:
kubectl apply -n kafka -f kafka-strimzi/manifests/01-basic-cluster/my-cluster.yaml
此命令会创建 Strimzi 操作器的 Kafka 自定义资源,该资源包含 CPU 及内存请求和限制、块存储请求以及污点和亲和性组合,用于在各 Kubernetes 节点之间分发预配的 Pod。
请等待几分钟,让 Kubernetes 启动所需的工作负载:
kubectl wait kafka/my-cluster --for=condition=Ready --timeout=600s -n kafka
验证是否已创建 Kafka 工作负载:
kubectl get pod,service,deploy,pdb -l=strimzi.io/cluster=my-cluster -n kafka
输出类似于以下内容:
NAME READY STATUS RESTARTS AGE pod/my-cluster-entity-operator-848698874f-j5m7f 3/3 Running 0 44m pod/my-cluster-kafka-0 1/1 Running 0 5m pod/my-cluster-kafka-1 1/1 Running 0 5m pod/my-cluster-kafka-2 1/1 Running 0 5m pod/my-cluster-zookeeper-0 1/1 Running 0 6m pod/my-cluster-zookeeper-1 1/1 Running 0 6m pod/my-cluster-zookeeper-2 1/1 Running 0 6m NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/my-cluster-kafka-bootstrap ClusterIP 10.52.8.80 <none> 9091/TCP,9092/TCP,9093/TCP 5m service/my-cluster-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,9092/TCP,9093/TCP 5m service/my-cluster-zookeeper-client ClusterIP 10.52.11.144 <none> 2181/TCP 6m service/my-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 6m NAME READY UP-TO-DATE AVAILABLE AGE deployment.apps/my-cluster-entity-operator 1/1 1 1 44m NAME MIN AVAILABLE MAX UNAVAILABLE ALLOWED DISRUPTIONS AGE poddisruptionbudget.policy/my-cluster-kafka 2 N/A 1 5m poddisruptionbudget.policy/my-cluster-zookeeper 2 N/A 1 6m
操作器创建以下资源:
- 两个
StrimziPodSets
,用于 Kafka 和 ZooKeeper。 - 用于 Kafka 代理副本的三个 Pod。
- 用于 ZooKeeper 副本的三个 Pod。
- 两个
PodDisruptionBudgets
,为实现集群一致性,确保最少两个副本可用。 - 名为
my-cluster-kafka-bootstrap
的 Service,用作从 Kubernetes 集群内连接的 Kafka 客户端的引导服务器。所有内部 Kafka 监听器在此 Service 中都可用。 - 名为
my-cluster-kafka-brokers
的无头 Service,用于直接对 Kafka 代理 Pod IP 地址进行 DNS 解析。此 Service 用于代理间通信。 - 名为
my-cluster-zookeeper-client
的 Service,允许 Kafka 代理作为客户端连接到 ZooKeeper 节点。 - 名为
my-cluster-zookeeper-nodes
的无头 Service,支持直接对 ZooKeeper Pod IP 地址进行DNS 解析。此服务用于在 ZooKeeper 副本之间建立连接。 - 名为
my-cluster-entity-operator
的 Deployment,其中包含 topic-operator 和 user-operator,并且有助于管理自定义资源KafkaTopics
和KafkaUsers
。
您还可以配置两个 NetworkPolicies
,以便从任何 Pod 和命名空间连接到 Kafka 监听器。这些政策还会限制 ZooKeeper 到代理的连接,并支持集群 Pod 与集群通信专用的内部 Service 端口之间的通信。
身份验证和用户管理
本部分介绍如何启用身份验证和授权,以保护 Kafka 监听器并与客户端共享凭据。
Strimzi 提供一种使用单独的 User Operator
及其相应的 Kubernetes 自定义资源 KafkaUser
(用于定义用户配置)来进行用户管理的 Kubernetes 原生方法。用户配置包括身份验证和授权设置,还包括在 Kafka 中预配相应的用户。
Strimzi 可以创建支持基于用户名和密码的身份验证 (SCRAM-SHA-512) 或 TLS 等多种身份验证机制的 Kafka 监听器和用户。您还可以使用 OAuth 2.0 身份验证,由于其安全性和外部凭据管理,通常认为这种方法比使用密码或证书进行身份验证更好。
部署 Kafka 集群
本部分介绍如何部署可演示用户管理功能的 Strimzi 操作器,包括:
- 一个 Kafka 集群,它在某个监听器上启用了基于密码的身份验证 (SCRAM-SHA-512)。
- 包含 3 个副本的
KafkaTopic
。 - 一个
KafkaUser
,其 ACL 指定该用户具有主题的读写权限。
将您的 Kafka 集群配置为使用在端口 9094 上采用基于密码的 SCRAM-SHA-512 身份验证的监听器:
kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/my-cluster.yaml
创建
Topic
、User
和客户端 Pod,以对 Kafka 集群执行命令:kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/topic.yaml kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/my-user.yaml
包含用户凭据的
Secret
my-user
作为卷装载到客户端 Pod 上。这些凭据会确认用户是否有权使用启用了基于密码的身份验证 (SCRAM-SHA-512) 的监听器向主题发布消息。
创建客户端 Pod:
kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/kafkacat.yaml
等待几分钟,让客户端 Pod 变为
Ready
状态,然后连接到该 Pod:kubectl wait --for=condition=Ready pod --all -n kafka --timeout=600s kubectl exec -it kafkacat -n kafka -- /bin/sh
生成具有
my-user
凭据的新消息,并尝试使用它:echo "Message from my-user" |kcat \ -b my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9094 \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=my-user \ -X sasl.password=$(cat /my-user/password) \ -t my-topic -P kcat -b my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9094 \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=my-user \ -X sasl.password=$(cat /my-user/password) \ -t my-topic -C
输出类似于以下内容:
Message from my-user % Reached end of topic my-topic [0] at offset 0 % Reached end of topic my-topic [2] at offset 1 % Reached end of topic my-topic [1] at offset 0
按
CTRL+C
以停止使用方进程。退出 Pod shell
exit
备份和灾难恢复
虽然 Strimzi 操作器不提供内置备份功能,但您可以按照以下模式实现高效的备份策略。
您可以使用 Backup for GKE 备份:
- Kubernetes 资源清单。
- 从正在备份的集群的 Kubernetes API 服务器中提取的 Strimzi API 自定义资源及其定义。
- 与清单中找到的 PersistentVolumeClaim 资源相对应的卷。
如需详细了解如何使用 Backup for GKE 备份和恢复 Kafka 集群,请参阅为灾难恢复做好准备。
您还可以对已使用 Strimzi 操作器部署的 Kafka 集群执行备份。您应该备份:
- Kafka 配置,其中包含 Stimzi API 的所有自定义资源,例如
KafkaTopics
和KafkaUsers
。 - 数据,存储在 Kafka 代理的 PersistentVolume 中。
通过将 Kubernetes 资源清单(包括 Strimzi 配置)存储在 Git 代码库中,无需单独备份对 Kafka 配置,因为可以根据需要将资源重新应用于新的 Kubernetes 集群。
为了在 Kafka 服务器实例或部署了 Kafka 的 Kubernetes 集群丢失的情况下保护 Kafka 数据恢复,我们建议您配置用于为 Kafka 代理预配卷的 Kubernetes 存储类别,并将 reclaimPolicy
选项设置为 Retain
。我们还建议您截取 Kafka 代理卷的快照。
以下清单描述了使用 reclaimPolicy
选项 Retain
的 StorageClass:
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: premium-rwo-retain
...
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer
以下示例展示了 StorageClass 被添加到了 Kafka 集群自定义资源的 spec
中:
# ...
spec:
kafka:
# ...
storage:
type: persistent-claim
size: 100Gi
class: premium-rwo-retain
使用此配置时,即使删除了相应的 PersistentVolumeClaim,使用该存储类别预配的 PersistentVolume 也不会被删除。
如需使用现有配置和代理实例数据在新的 Kubernetes 集群上恢复 Kafka 实例,请执行以下操作:
- 将现有 Strimzi Kafka 自定义资源(
Kakfa
、KafkaTopic
、KafkaUser
等)应用于新的 Kubernetes 集群 - 使用 PersistentVolumeClaim 上的
spec.volumeName
属性,将采用新 Kafka 代理实例名称的 PersistentVolumeClaim 更新为旧的 PersistentVolume。