In dieser Anleitung erfahren Sie, wie Sie mit dem Strimzi-Operator Apache Kafka-Cluster bereitstellen.
Kafka ist ein verteiltes Open-Source-Messaging-System für die Verarbeitung von großen Datenmengen mit hohem Durchsatz und Echtzeitstreaming. Damit können Sie Streamingdaten-Pipelines für die zuverlässige Datenübertragung zwischen verschiedenen Systemen und Anwendungen erstellen, um Verarbeitungs- und Analyseaufgaben zu unterstützen.
Operatoren sind Softwareerweiterungen, die benutzerdefinierte Ressourcen verwenden, um Anwendungen und ihre Komponenten zu verwalten. Weitere Informationen zur Motivation für die Verwendung von Operatoren finden Sie in der Open-Source-Dokumentation zu Kubernetes unter Operator Pattern. Der Strimzi-Operator bietet Flexibilität bei den Bereitstellungsoptionen und ermöglicht es Ihnen, Kubernetes-Taints und -Toleranzen zu verwenden, um Kafka auf dedizierten Knoten auszuführen.
Dieser Leitfaden richtet sich an Plattformadministratoren, Cloud-Architekten und Betriebsexperten, die an der Bereitstellung von Kafka-Clustern in GKE interessiert sind.
Diese Lösung ist ein guter Ausgangspunkt, wenn Sie lernen möchten, wie Sie Kafka-Cluster mit einem Drittanbieteroperator bereitstellen, um die Verwaltung zu automatisieren und Fehler zu reduzieren. Wenn Sie eine genauere operative Kontrolle wünschen, lesen Sie den Abschnitt Hochverfügbaren Kafka-Cluster in GKE bereitstellen.
Umgebung vorbereiten
In dieser Anleitung verwenden Sie Cloud Shell zum Verwalten von Ressourcen, die in Google Cloudgehostet werden. Die Software, die Sie für diese Anleitung benötigen, ist in Cloud Shell vorinstalliert, einschließlich kubectl
, gcloud CLI, Helm und Terraform.
So richten Sie Ihre Umgebung mit Cloud Shell ein:
Starten Sie eine Cloud Shell-Sitzung über die Google Cloud Console. Klicken Sie dazu in der Google Cloud Console auf
Cloud Shell aktivieren. Dadurch wird im unteren Bereich der Google Cloud Console eine Sitzung gestartet.
Legen Sie Umgebungsvariablen fest:
export PROJECT_ID=PROJECT_ID export KUBERNETES_CLUSTER_PREFIX=kafka export REGION=us-central1
Ersetzen Sie
PROJECT_ID
: Ihre Google Cloud durch Ihre Projekt-ID.Klonen Sie das GitHub-Repository:
git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
Wechseln Sie in das Arbeitsverzeichnis:
cd kubernetes-engine-samples/streaming/
Clusterinfrastruktur erstellen
In diesem Abschnitt führen Sie ein Terraform-Skript aus, um einen privaten, hochverfügbaren regionalen GKE-Cluster zu erstellen. Die folgenden Schritte ermöglichen einen öffentlichen Zugriff auf die Steuerungsebene. Erstellen Sie einen privaten Cluster, um den Zugriff einzuschränken.
Sie können den Operator mit einem Standard- oder Autopilot-Cluster installieren.
Standard
Das folgende Diagramm zeigt einen privaten regionalen Standard-GKE-Cluster, der in drei verschiedenen Zonen bereitgestellt wird:
Führen Sie die folgenden Befehle in der Cloud Shell aus, um diese Infrastruktur bereitzustellen:
export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-standard init
terraform -chdir=kafka/terraform/gke-standard apply -var project_id=${PROJECT_ID} \
-var region=${REGION} \
-var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
Geben Sie bei Aufforderung yes
ein. Es kann einige Minuten dauern, bis dieser Befehl abgeschlossen ist und der Cluster den Status „Bereit“ anzeigt.
Terraform erstellt die folgenden Ressourcen:
- Ein VPC-Netzwerk und ein privates Subnetz für die Kubernetes-Knoten.
- Ein Router für den Zugriff auf das Internet über NAT.
- Ein privater GKE-Cluster in der Region
us-central1
. - 2 Knotenpools mit aktiviertem Autoscaling (1-2 Knoten pro Zone, mindestens 1 Knoten pro Zone)
- Ein
ServiceAccount
mit Logging- und Monitoring-Berechtigungen. - Backup for GKE zur Notfallwiederherstellung.
- Google Cloud Managed Service for Prometheus für das Clustermonitoring.
Die Ausgabe sieht in etwa so aus:
...
Apply complete! Resources: 14 added, 0 changed, 0 destroyed.
Outputs:
kubectl_connection_command = "gcloud container clusters get-credentials strimzi-cluster --region us-central1"
Autopilot
Das folgende Diagramm zeigt einen privaten regionalen Autopilot-GKE-Cluster:
Führen Sie die folgenden Befehle in der Cloud Shell aus, um die Infrastruktur bereitzustellen:
export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-autopilot init
terraform -chdir=kafka/terraform/gke-autopilot apply -var project_id=${PROJECT_ID} \
-var region=${REGION} \
-var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
Geben Sie bei Aufforderung yes
ein. Es kann einige Minuten dauern, bis dieser Befehl abgeschlossen ist und der Cluster den Status „Bereit“ anzeigt.
Terraform erstellt die folgenden Ressourcen:
- VPC-Netzwerk und privates Subnetz für die Kubernetes-Knoten.
- Ein Router für den Zugriff auf das Internet über NAT.
- Ein privater GKE-Cluster in der Region
us-central1
. - Ein
ServiceAccount
mit Logging- und Monitoring-Berechtigungen. - Google Cloud Managed Service for Prometheus für das Clustermonitoring.
Die Ausgabe sieht in etwa so aus:
...
Apply complete! Resources: 12 added, 0 changed, 0 destroyed.
Outputs:
kubectl_connection_command = "gcloud container clusters get-credentials strimzi-cluster --region us-central1"
Verbindung zum Cluster herstellen
Konfigurieren Sie kubectl
für die Kommunikation mit dem Cluster:
gcloud container clusters get-credentials ${KUBERNETES_CLUSTER_PREFIX}-cluster --region ${REGION}
Stellen Sie den Strimzi-Operator in Ihrem Cluster bereit:
In diesem Abschnitt stellen Sie den Strimzi-Operator mit einem Helm-Diagramm bereit. Es gibt auch mehrere andere Möglichkeiten, Strimzi bereitzustellen.
Fügen Sie das Strimzi-Helm-Diagramm-Repository hinzu:
helm repo add strimzi https://strimzi.io/charts/
Fügen Sie einen Namespace für den Strimzi-Operator und den Kafka-Cluster hinzu:
kubectl create ns kafka
Stellen Sie den Strimzi-Cluster-Operator mit Helm bereit:
helm install strimzi-operator strimzi/strimzi-kafka-operator -n kafka
Wenn Sie den Strimzi Cluster Operator und Kafka-Cluster in verschiedenen Namespaces bereitstellen möchten, fügen Sie den Parameter
--set watchNamespaces="{kafka-namespace,kafka-namespace-2,...}"
zum Helm-Befehl hinzu.Prüfen Sie mit Helm, ob der Strimzi Cluster Operator erfolgreich bereitgestellt wurde:
helm ls -n kafka
Die Ausgabe sieht in etwa so aus:
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION strimzi-operator kafka 1 2023-06-27 11:22:15.850545 +0200 CEST deployed strimzi-kafka-operator-0.35.0 0.35.0
Kafka bereitstellen
Nachdem der Operator im Cluster bereitgestellt wurde, können Sie eine Kafka-Clusterinstanz bereitstellen.
In diesem Abschnitt stellen Sie Kafka in einer grundlegenden Konfiguration bereit und testen dann verschiedene erweiterte Konfigurationsszenarien, um Anforderungen an Verfügbarkeit, Sicherheit und Beobachtbarkeit zu erfüllen.
Grundlegende Konfiguration
Die grundlegende Konfiguration für die Kafka-Instanz umfasst die folgenden Komponenten:
- Drei Replikate von Kafka-Brokern, wobei mindestens zwei verfügbare Replikate für die Clusterkonsistenz erforderlich sind.
- Drei Replikate von ZooKeeper-Knoten, die einen Cluster bilden.
- Zwei Kafka-Listener: einer ohne Authentifizierung und einer mit TLS-Authentifizierung mit einem von Strimzi generierten Zertifikat.
- Java MaxHeapSize und MinHeapSize sind für Kafka auf 4 GB und für ZooKeeper auf 2 GB festgelegt.
- CPU-Ressourcenzuweisung von 1 CPU-Anfrage und 2 CPU-Limits für Kafka und ZooKeeper sowie 5 GB Arbeitsspeicheranfragen und -limits für Kafka (4 GB für den Hauptdienst und 0,5 GB für den Messwertexporter) und 2,5 GB für ZooKeeper (2 GB für den Hauptdienst und 0,5 GB für den Messwertexporter).
- Einheit mit den folgenden Anfragen und Limits:
tlsSidecar
: 100 m/500 m CPU und 128 Mi Arbeitsspeicher.topicOperator
: 100 m/500 m CPU und 512 Mi Arbeitsspeicher.userOperator
: 500 mCPU und 2 Gi Arbeitsspeicher.
- 100 GB Speicher für jeden Pod unter Verwendung der
premium-rwo
storageClass
. - Toleranzen, nodeAffinities und podAntiAffinities für jede Arbeitslast, wodurch eine ordnungsgemäße Verteilung zwischen Knoten gewährleistet wird. Dabei werden die entsprechenden Knotenpools und verschiedene Zonen verwendet.
- Kommunikation im Cluster, die durch selbst signierte Zertifikate gesichert ist: separate Zertifizierungsstellen (CAs) für Cluster und Clients (mTLS). Sie können auch eine andere Zertifizierungsstelle konfigurieren.
Diese Konfiguration stellt die minimale Einrichtung dar, die zum Erstellen eines produktionsfertigen Kafka-Clusters erforderlich ist. In den folgenden Abschnitten werden benutzerdefinierte Konfigurationen zu Aspekten wie Clustersicherheit, Access Control Lists (ACLs), Themenverwaltung, Zertifikatsverwaltung und mehr vorgestellt.
Einfachen Kafka-Cluster erstellen
Erstellen Sie mithilfe der grundlegenden Konfiguration einen neuen Kafka-Cluster:
kubectl apply -n kafka -f kafka-strimzi/manifests/01-basic-cluster/my-cluster.yaml
Mit diesem Befehl wird eine benutzerdefinierte Kafka-Ressource des Strimzi-Operators erstellt, die CPU- und Speicheranfragen und -limits, Blockspeicheranfragen sowie eine Kombination aus Markierungen und Affinitäten enthält, um die bereitgestellten Pods auf Kubernetes-Knoten zu verteilen.
Warten Sie einige Minuten, während Kubernetes die erforderlichen Arbeitslasten startet:
kubectl wait kafka/my-cluster --for=condition=Ready --timeout=600s -n kafka
Prüfen Sie, ob die Kafka-Arbeitslasten erstellt wurden:
kubectl get pod,service,deploy,pdb -l=strimzi.io/cluster=my-cluster -n kafka
Die Ausgabe sieht in etwa so aus:
NAME READY STATUS RESTARTS AGE pod/my-cluster-entity-operator-848698874f-j5m7f 3/3 Running 0 44m pod/my-cluster-kafka-0 1/1 Running 0 5m pod/my-cluster-kafka-1 1/1 Running 0 5m pod/my-cluster-kafka-2 1/1 Running 0 5m pod/my-cluster-zookeeper-0 1/1 Running 0 6m pod/my-cluster-zookeeper-1 1/1 Running 0 6m pod/my-cluster-zookeeper-2 1/1 Running 0 6m NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/my-cluster-kafka-bootstrap ClusterIP 10.52.8.80 <none> 9091/TCP,9092/TCP,9093/TCP 5m service/my-cluster-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,9092/TCP,9093/TCP 5m service/my-cluster-zookeeper-client ClusterIP 10.52.11.144 <none> 2181/TCP 6m service/my-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 6m NAME READY UP-TO-DATE AVAILABLE AGE deployment.apps/my-cluster-entity-operator 1/1 1 1 44m NAME MIN AVAILABLE MAX UNAVAILABLE ALLOWED DISRUPTIONS AGE poddisruptionbudget.policy/my-cluster-kafka 2 N/A 1 5m poddisruptionbudget.policy/my-cluster-zookeeper 2 N/A 1 6m
Der Operator erstellt die folgenden Ressourcen:
- Zwei
StrimziPodSets
für Kafka und ZooKeeper. - Drei Pods für Kafka-Broker-Replikate.
- Drei Pods für ZooKeeper-Replikate.
- Zwei
PodDisruptionBudgets
, die eine Mindestverfügbarkeit von zwei Replikaten für die Clusterkonsistenz gewährleisten. - Ein Dienst namens
my-cluster-kafka-bootstrap
, der als Bootstrap-Server für Kafka-Clients dient, die eine Verbindung innerhalb des Kubernetes-Clusters herstellen. In diesem Dienst sind alle internen Kafka-Listener verfügbar. - Ein monitorloser Dienst mit dem Namen
my-cluster-kafka-brokers
, der die direkte DNS-Auflösung von Kafka-Broker-Pod-IP-Adressen ermöglicht. Dieser Dienst wird für die Kommunikation zwischen Brokern verwendet. - Ein Dienst namens
my-cluster-zookeeper-client
, mit dem Kafka-Broker eine Verbindung zu ZooKeeper-Knoten als Clients herstellen können. - Ein monitorloser Dienst mit dem Namen
my-cluster-zookeeper-nodes
, der die direkte DNS-Auflösung von ZooKeeper-Pod-IP-Adressen ermöglicht. Dieser Dienst wird verwendet, um eine Verbindung zwischen ZooKeeper-Replikaten herzustellen. - Ein Deployment namens
my-cluster-entity-operator
, das den Themenoperator und den Nutzeroperator enthält und die Verwaltung der benutzerdefinierten RessourcenKafkaTopics
undKafkaUsers
erleichtert.
Sie können auch zwei NetworkPolicies
konfigurieren, um die Verbindung zu Kafka-Listenern von jedem Pod und Namespace aus zu ermöglichen. Diese Richtlinien würden auch Verbindungen zu ZooKeeper auf Broker beschränken und die Kommunikation zwischen den Cluster-Pods und internen Dienstports ermöglichen, die ausschließlich für die Clusterkommunikation vorgesehen sind.
Authentifizierung und Nutzerverwaltung
In diesem Abschnitt erfahren Sie, wie Sie die Authentifizierung und Autorisierung aktivieren, um Kafka-Listener zu sichern und Anmeldedaten mit Clients zu teilen.
Strimzi bietet eine Kubernetes-native Methode für die Nutzerverwaltung mit einem separaten User Operator
und der entsprechenden benutzerdefinierten Kubernetes-Ressource, KafkaUser
, zur Definition der Nutzerkonfiguration. Die Nutzerkonfiguration umfasst Einstellungen für die Authentifizierung und Autorisierung und stellt den entsprechenden Nutzer in Kafka bereit.
Strimzi kann Kafka-Listener und -Nutzer erstellen, die mehrere Authentifizierungsmechanismen wie die Authentifizierung auf Basis von Nutzername und Passwort (SCRAM-SHA-512) oder TLS unterstützen. Sie können auch die OAuth 2.0-Authentifizierung verwenden. Diese wird oft als besserer Ansatz im Vergleich zur Verwendung von Passwörtern oder Zertifikaten für die Authentifizierung angesehen, da sie sicherer ist und externe Anmeldedatenverwaltung bietet.
Kafka-Cluster bereitstellen
In diesem Abschnitt erfahren Sie, wie Sie einen Strimzi-Operator bereitstellen, der Funktionen zur Nutzerverwaltung zeigt, darunter:
- Einen Kafka-Cluster mit passwortbasierter Authentifizierung (SCRAM-SHA-512) für einen der Listener
- Ein
KafkaTopic
mit drei Replikaten. - Eine
KafkaUser
mit einer ACL, die angibt, dass der Nutzer Lese- und Schreibberechtigungen für das Thema hat.
Konfigurieren Sie Ihren Kafka-Cluster so, dass ein Listener mit passwortbasierter SCRAM-SHA-512-Authentifizierung an Port 9094 und einfacher Autorisierung verwendet wird:
kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/my-cluster.yaml
Erstellen Sie ein
Topic
, einUser
und einen Client-Pod, um Befehle für den Kafka-Cluster auszuführen:kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/topic.yaml kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/my-user.yaml
Das
Secret
my-user
mit den Nutzeranmeldedaten wird als Volume für den Client-Pod bereitgestellt.Mit diesen Anmeldedaten wird bestätigt, dass der Nutzer die Berechtigung hat, Nachrichten für das Thema über den Listener mit aktivierter passwortbasierter Authentifizierung (SCRAM-SHA-512) zu veröffentlichen.
Client-Pod erstellen:
kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/kafkacat.yaml
Warten Sie einige Minuten, bis der Client-Pod
Ready
ist, und stellen Sie dann eine Verbindung her:kubectl wait --for=condition=Ready pod --all -n kafka --timeout=600s kubectl exec -it kafkacat -n kafka -- /bin/sh
Erstellen Sie eine neue Nachricht mit
my-user
-Anmeldedaten und versuchen Sie, sie zu verarbeiten:echo "Message from my-user" |kcat \ -b my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9094 \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=my-user \ -X sasl.password=$(cat /my-user/password) \ -t my-topic -P kcat -b my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9094 \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=my-user \ -X sasl.password=$(cat /my-user/password) \ -t my-topic -C
Die Ausgabe sieht in etwa so aus:
Message from my-user % Reached end of topic my-topic [0] at offset 0 % Reached end of topic my-topic [2] at offset 1 % Reached end of topic my-topic [1] at offset 0
Geben Sie
CTRL+C
ein, um den Nutzervorgang zu beenden.Beenden Sie die Pod-Shell
exit
Sicherungen und Notfallwiederherstellung
Der Strimzi-Operator bietet zwar keine integrierte Sicherungsfunktion, Sie können jedoch effiziente Sicherungsstrategien anhand bestimmter Muster implementieren.
Sie können Backup for GKE verwenden, um Folgendes zu sichern:
- Kubernetes-Ressourcenmanifeste.
- Benutzerdefinierte API-Ressourcen von Strimzi und ihre Definitionen, die vom Kubernetes API-Server des Clusters extrahiert wurden, der die Sicherung ausführt.
- Volumes, die PersistentVolumeClaim-Ressourcen in den Manifesten entsprechen.
Weitere Informationen zum Sichern und Wiederherstellen von Kafka-Clustern mit Backup for GKE finden Sie unter Notfallwiederherstellung vorbereiten.
Sie können auch eine Sicherung eines Kafka-Clusters erstellen, der mit dem Strimzi-Operator bereitgestellt wurde. Sichern Sie Folgendes:
- Die Kafka-Konfiguration, die alle benutzerdefinierten Ressourcen der Strimzi API enthält, z. B.
KafkaTopics
undKafkaUsers
. - Die Daten, die in den PersistentVolumes der Kafka-Broker gespeichert werden.
Das Speichern von Kubernetes-Ressourcenmanifesten, einschließlich Strimzi-Konfigurationen, in Git-Repositories kann dazu führen, dass keine separate Sicherung für Kafka-Konfigurationen erforderlich ist, da die Ressourcen bei Bedarf auf einen neuen Kubernetes-Cluster angewendet werden können.
Um die Wiederherstellung von Kafka-Daten in Szenarien zu gewährleisten, in denen eine Kafka-Serverinstanz oder ein Kubernetes-Cluster, in dem Kafka bereitgestellt wird, verloren geht, sollten Sie die Kubernetes-Speicherklasse, die für die Bereitstellung von Volumes für Kafka-Broker verwendet wird, mit der Option reclaimPolicy
mit dem Wert Retain
konfigurieren. Außerdem empfehlen wir, Snapshots von Kafka-Broker-Volumes zu erstellen.
Das folgende Manifest beschreibt eine StorageClass, die die Option reclaimPolicy
mit dem Wert Retain
verwendet:
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: premium-rwo-retain
...
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer
Das folgende Beispiel zeigt die StorageClass, die der spec
einer benutzerdefinierten Kafka-Clusterressource hinzugefügt wird:
# ...
spec:
kafka:
# ...
storage:
type: persistent-claim
size: 100Gi
class: premium-rwo-retain
Bei dieser Konfiguration werden PersistentVolumes, die mit der Speicherklasse bereitgestellt werden, nicht gelöscht, auch wenn der entsprechende PersistentVolumeClaim gelöscht wird.
So stellen Sie die Kafka-Instanz auf einem neuen Kubernetes-Cluster mithilfe der vorhandenen Konfigurations- und Broker-Instanzdaten wieder her:
- Wenden Sie die vorhandenen benutzerdefinierten Strimzi Kafka-Ressourcen (
Kakfa
,KafkaTopic
,KafkaUser
usw.) auf einen neuen Kubernetes-Cluster an. - Aktualisieren Sie die PersistentVolumeClaims mit dem Namen der neuen Kafka-Broker-Instanzen auf die alten PersistentVolumes unter Verwendung des Attributs
spec.volumeName
im PersistentVolumeClaim.