このガイドでは、Confluent for Kubernetes(CFK)オペレーターを使用して、Apache Kafka クラスタを Google Kubernetes Engine(GKE)にデプロイする方法を説明します。
Kafka は、オープンソースとして提供されているパブリッシュ / サブスクライブ型の分散メッセージング システムで、大規模なリアルタイム ストリーミング データを高スループットで処理します。Kafka を使用すると、システムやアプリケーション間でデータを確実に移動し、処理と分析を行うストリーミング データ パイプラインを構築できます。
このガイドは、GKE に Kafka クラスタをデプロイすることを検討しているプラットフォーム管理者、クラウド アーキテクト、運用担当者を対象としています。
CFK オペレーターを使用して、ウェブベースの Confluent Control Center、Schema Registry、KsqlDB など、Confluent Platform のその他のコンポーネントをデプロイすることもできます。ただし、このガイドでは Kafka のデプロイについてのみ説明します。
環境を準備する
このチュートリアルでは、Cloud Shell を使用して Google Cloudでホストされているリソースを管理します。Cloud Shell には、このチュートリアルに必要なソフトウェア(kubectl
、gcloud CLI、Helm、Terraform など)がプリインストールされています。
Cloud Shell を使用して環境を設定するには、次の操作を行います。
Google Cloud コンソールで
(Cloud Shell をアクティブにする)をクリックして、 Google Cloud コンソールから Cloud Shell セッションを起動します。 Google Cloud コンソールの下部ペインでセッションが起動します。
環境変数を設定します。
export PROJECT_ID=PROJECT_ID export KUBERNETES_CLUSTER_PREFIX=kafka export REGION=us-central1
PROJECT_ID
: Google Cloud は実際のプロジェクト ID に置き換えます。GitHub リポジトリのクローンを作成します。
git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
作業ディレクトリを変更します。
cd kubernetes-engine-samples/streaming
クラスタ インフラストラクチャを作成する
このセクションでは、Terraform スクリプトを実行して、限定公開の高可用性リージョン GKE クラスタを作成します。次の手順では、コントロール プレーンへの公開アクセスを許可します。アクセスを制限するため、プライベート クラスタを作成します。
オペレーターは、Standard または Autopilot クラスタを使用してインストールできます。
Standard
次の図は、3 つの異なるゾーンにデプロイされた限定公開のリージョン GKE Standard クラスタを示しています。
このインフラストラクチャをデプロイするには、Cloud Shell から次のコマンドを実行します。
export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-standard init
terraform -chdir=kafka/terraform/gke-standard apply -var project_id=${PROJECT_ID} \
-var region=${REGION} \
-var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
プロンプトが表示されたら、「yes
」と入力します。このコマンドが完了し、クラスタが準備完了ステータスになるまでに数分かかることがあります。
Terraform が次のリソースを作成します。
- Kubernetes ノードの VPC ネットワークとプライベート サブネット。
- NAT 経由でインターネットにアクセスするためのルーター。
us-central1
リージョンの限定公開 GKE クラスタ。- 自動スケーリングが有効な 2 つのノードプール(ゾーンあたり 1~2 ノード、ゾーンあたり 1 ノード以上)
- ロギングとモニタリングの権限を持つ
ServiceAccount
。 - Backup for GKE(障害復旧用)。
- Google Cloud Managed Service for Prometheus(クラスタ モニタリング用)。
出力は次のようになります。
...
Apply complete! Resources: 14 added, 0 changed, 0 destroyed.
Outputs:
kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"
Autopilot
次の図は、限定公開のリージョン GKE Autopilot クラスタを示しています。
このインフラストラクチャをデプロイするには、Cloud Shell から次のコマンドを実行します。
export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-autopilot init
terraform -chdir=kafka/terraform/gke-autopilot apply -var project_id=${PROJECT_ID} \
-var region=${REGION} \
-var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
プロンプトが表示されたら、「yes
」と入力します。このコマンドが完了し、クラスタが準備完了ステータスになるまでに数分かかることがあります。
Terraform が次のリソースを作成します。
- Kubernetes ノードの VPC ネットワークとプライベート サブネット
- NAT 経由でインターネットにアクセスするためのルーター。
us-central1
リージョンの限定公開 GKE クラスタ。- ロギングとモニタリングの権限を持つ
ServiceAccount
- Google Cloud Managed Service for Prometheus(クラスタ モニタリング用)。
出力は次のようになります。
...
Apply complete! Resources: 12 added, 0 changed, 0 destroyed.
Outputs:
kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"
クラスタに接続する
クラスタと通信を行うように kubectl
を構成します。
gcloud container clusters get-credentials ${KUBERNETES_CLUSTER_PREFIX}-cluster --region ${REGION}
CFK オペレーターをクラスタにデプロイする
このセクションでは、Helm チャートを使用して Confluent for Kubernetes(CFK)オペレーターをデプロイしてから、Kafka クラスタをデプロイします。
Confluent Helm チャート リポジトリを追加します。
helm repo add confluentinc https://packages.confluent.io/helm
CFK オペレーターと Kafka クラスタの Namespace を追加します。
kubectl create ns kafka
Helm を使用して CFK クラスタ オペレーターをデプロイします。
helm install confluent-operator confluentinc/confluent-for-kubernetes -n kafka
CFK がすべての Namespace のリソースを管理できるようにするため、Helm コマンドにパラメータ
--set-namespaced=false
を追加します。Helm を使用して、Confluent オペレーターが正常にデプロイされたことを確認します。
helm ls -n kafka
出力は次のようになります。
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION confluent-operator kafka 1 2023-07-07 10:57:45.409158 +0200 CEST deployed confluent-for-kubernetes-0.771.13 2.6.0
Kafka をデプロイする
このセクションでは、Kafka を基本構成にデプロイしてから、可用性、セキュリティ、オブザーバビリティの要件を満たすため、さまざまな高度な構成シナリオを試してみます。
基本的な構成
Kafka インスタンスの基本構成には、次のコンポーネントが含まれています。
- Kafka ブローカーの 3 つのレプリカ。クラスタの整合性を確保するため、利用可能なレプリカが少なくとも 2 つ必要です。
- クラスタを形成する ZooKeeper ノードの 3 つのレプリカ。
- 2 つの Kafka リスナー: 1 つは認証を利用しません。もう 1 つは CFK によって生成された証明書を使用して TLS 認証を利用します。
- Kafka では Java の MaxHeapSize と MinHeapSize が 4 GB に設定されています。
- 1 CPU リクエストの CPU リソースの割り当ては 2 CPU に制限されています。メモリ リクエストの割り当ては、Kafka の場合 5 GB で、メインサービスに 4 GB、指標エクスポータに 0.5 GB という制限があります。ZooKeeper の場合は 3 GB で、メインサービスに 2 GB、指標エクスポータに 0.5 GB という制限があります。
premium-rwo
storageClass を使用して各 Pod に 100 GB のストレージが割り当てられます。Kafka データには 100 GB、Zookeeper データとログにはそれぞれ 90 GB、10 GB が割り当てられます。- 各ワークロードに構成された toleration、nodeAffinities、podAntiAffinities。それぞれのノードプールと異なるゾーンを使用して、ノード間で適切に分散されます。
- 指定した認証局を使用して自己署名証明書で保護されているクラスタ内の通信。
この構成は、本番環境に対応した Kafka クラスタの作成に必要な最小限の設定を表しています。以降のセクションでは、クラスタ セキュリティ、アクセス制御リスト(ACL)、トピック管理、証明書管理などに対処するためのカスタム構成を示します。
基本的な Kafka クラスタを作成する
CA ペアを生成します。
openssl genrsa -out ca-key.pem 2048 openssl req -new -key ca-key.pem -x509 \ -days 1000 \ -out ca.pem \ -subj "/C=US/ST=CA/L=Confluent/O=Confluent/OU=Operator/CN=MyCA"
Confluent for Kubernetes は、Confluent Platform コンポーネントが TLS ネットワーク暗号化に使用する証明書を自動生成します。ユーザーは認証局(CA)を生成して提供する必要があります。
認証局の Kubernetes Secret を作成します。
kubectl create secret tls ca-pair-sslcerts --cert=ca.pem --key=ca-key.pem -n kafka
Secret の名前は事前に定義されています。
基本構成を使用して新しい Kafka クラスタを作成します。
kubectl apply -n kafka -f kafka-confluent/manifests/01-basic-cluster/my-cluster.yaml
このコマンドは、CFK オペレーターの Kafka カスタム リソースと Zookeeper カスタム リソースを作成します。これには、CPU とメモリ リクエストと上限、ブロック ストレージ リクエスト、Kubernetes ノード間でプロビジョニングされた Pod を分散させる taint とアフィニティが含まれています。
Kubernetes が必要なワークロードを開始するまで数分待ちます。
kubectl wait pods -l app=my-cluster --for condition=Ready --timeout=300s -n kafka
Kafka ワークロードが作成されたことを確認します。
kubectl get pod,svc,statefulset,deploy,pdb -n kafka
出力は次のようになります。
NAME READY STATUS RESTARTS AGE pod/confluent-operator-864c74d4b4-fvpxs 1/1 Running 0 49m pod/my-cluster-0 1/1 Running 0 17m pod/my-cluster-1 1/1 Running 0 17m pod/my-cluster-2 1/1 Running 0 17m pod/zookeeper-0 1/1 Running 0 18m pod/zookeeper-1 1/1 Running 0 18m pod/zookeeper-2 1/1 Running 0 18m NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/confluent-operator ClusterIP 10.52.13.164 <none> 7778/TCP 49m service/my-cluster ClusterIP None <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 17m service/my-cluster-0-internal ClusterIP 10.52.2.242 <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 17m service/my-cluster-1-internal ClusterIP 10.52.7.98 <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 17m service/my-cluster-2-internal ClusterIP 10.52.4.226 <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 17m service/zookeeper ClusterIP None <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 18m service/zookeeper-0-internal ClusterIP 10.52.8.52 <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 18m service/zookeeper-1-internal ClusterIP 10.52.12.44 <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 18m service/zookeeper-2-internal ClusterIP 10.52.12.134 <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 18m NAME READY AGE statefulset.apps/my-cluster 3/3 17m statefulset.apps/zookeeper 3/3 18m NAME READY UP-TO-DATE AVAILABLE AGE deployment.apps/confluent-operator 1/1 1 1 49m NAME MIN AVAILABLE MAX UNAVAILABLE ALLOWED DISRUPTIONS AGE poddisruptionbudget.policy/my-cluster N/A 1 1 17m poddisruptionbudget.policy/zookeeper N/A 1 1 18m
オペレーターが次のリソースを作成します。
- Kafka と ZooKeeper 用の 2 つの StatefulSet。
- Kafka ブローカー レプリカ用に 3 つの Pod。
- ZooKeeper レプリカ用に 3 つの Pod。
- 2 つの
PodDisruptionBudget
リソース。クラスタの整合性を確保するため、使用できないレプリカを最大 1 つ確保します。 my-cluster
という名前の Service。Kubernetes クラスタ内から接続する Kafka クライアントのブートストラップ サーバーとして機能します。この Service では、すべての内部 Kafka リスナーを使用できます。zookeeper
という名前の Service。これにより、Kafka ブローカーはクライアントとして ZooKeeper ノードに接続できます。
認証とユーザー管理
このセクションでは、Kafka リスナーを保護し、クライアントと認証情報を共有するために認証と認可を有効にする方法について説明します。
Confluent for Kubernetes は、次のような Kafka 向けのさまざまな認証方法をサポートしています。
- SASL / PLAIN 認証: クライアントは認証にユーザー名とパスワードを使用します。ユーザー名とパスワードは Kubernetes Secret のサーバー側に保存されます。
- LDAP による SASL / PLAIN 認証: クライアントは認証にユーザー名とパスワードを使用します。認証情報は LDAP サーバーに保存されます。
- mTLS 認証: クライアントは認証に TLS 証明書を使用します。
制限事項
- CFK では、ユーザー管理用のカスタム リソースは提供されません。ただし、認証情報を Secret に格納し、リスナー仕様で Secret を参照することはできます。
- ACL を直接管理するためのカスタム リソースはありませんが、Kafka CLI を使用して ACL を構成する方法については、公式の Confluent for Kubernetes がガイダンスを提供しています。
ユーザーを作成する
このセクションでは、次のようなユーザー管理機能を実行する CFK オペレーターのデプロイ方法を示します。
- いずれかのリスナーでパスワード ベースの認証(SASL / PLAIN)が有効になっている Kafka クラスタ
- 3 つのレプリカを持つ
KafkaTopic
- 読み取り / 書き込み権限を含むユーザー認証情報
ユーザー認証情報で Secret を作成します。
export USERNAME=my-user export PASSWORD=$(openssl rand -base64 12) kubectl create secret generic my-user-credentials -n kafka \ --from-literal=plain-users.json="{\"$USERNAME\":\"$PASSWORD\"}"
認証情報は次の形式で保存する必要があります。
{ "username1": "password1", "username2": "password2", ... "usernameN": "passwordN" }
ポート 9094 でパスワード ベースの認証である SCRAM-SHA-512 認証でリスナーを使用するように、Kafka クラスタを構成します。
kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-cluster.yaml
Kafka クラスタとやり取りを行うトピックとクライアント Pod を設定し、Kafka コマンドを実行します。
kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-topic.yaml kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/kafkacat.yaml
GKE は、Secret
my-user-credentials
を Volume としてクライアント Pod にマウントします。クライアント Pod の準備ができたら接続して、提供された認証情報を使用してメッセージの生成と使用を開始します。
kubectl wait pod kafkacat --for=condition=Ready --timeout=300s -n kafka kubectl exec -it kafkacat -n kafka -- /bin/sh
my-user
認証情報を使用してメッセージを生成し、メッセージで受信を確認します。export USERNAME=$(cat /my-user/plain-users.json|cut -d'"' -f 2) export PASSWORD=$(cat /my-user/plain-users.json|cut -d'"' -f 4) echo "Message from my-user" |kcat \ -b my-cluster.kafka.svc.cluster.local:9094 \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=PLAIN \ -X sasl.username=$USERNAME \ -X sasl.password=$PASSWORD \ -t my-topic -P kcat -b my-cluster.kafka.svc.cluster.local:9094 \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=PLAIN \ -X sasl.username=$USERNAME \ -X sasl.password=$PASSWORD \ -t my-topic -C
出力は次のようになります。
Message from my-user % Reached end of topic my-topic [1] at offset 1 % Reached end of topic my-topic [2] at offset 0 % Reached end of topic my-topic [0] at offset 0
「
CTRL+C
」と入力して、コンシューマー プロセスを停止します。Connect refused
エラーが発生した場合は、数分待ってからもう一度試してください。Pod のシェルを終了します。
exit
バックアップと障害復旧
Confluent オペレーターを使用すると、特定のパターンに従って効率的なバックアップ戦略を実装できます。
Backup for GKE を使用して、次のものをバックアップできます。
- Kubernetes リソース マニフェスト。
- バックアップを行うクラスタの Kubernetes API サーバーから抽出された Confluent API カスタム リソースとその定義。
- マニフェスト内の PersistentVolumeClaim リソースに対応する Volume。
Backup for GKE を使用して Kafka クラスタのバックアップと復元を行う方法については、障害復旧の準備を行うをご覧ください。
Kafka クラスタの手動バックアップを実行することもできます。次のデータをバックアップする必要があります。
- Kafka 構成。
KafkaTopics
やConnect
など、Confluent API のすべてのカスタム リソースが含まれます。 - Kafka ブローカーの PersistentVolume に保存されているデータ。
Confluent の構成を含む Kubernetes リソース マニフェストを Git リポジトリに保存すると、必要に応じてリソースを新しい Kubernetes クラスタに再適用できるため、Kafka 構成のバックアップを別途行う必要がなくなります。
Kafka サーバー インスタンスまたは Kafka がデプロイされた Kubernetes クラスタが失われた場合に Kafka のデータを復旧できるようにするため、reclaimPolicy
オプションを Retain
に設定して、Kafka ブローカーのボリュームのプロビジョニングに使用される Kubernetes ストレージ クラスを構成することをおすすめします。また、Kafka ブローカー ボリュームのスナップショットを取得することもおすすめします。
次のマニフェストでは、reclaimPolicy
オプション Retain
を使用する StorageClass を記述しています。
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: premium-rwo-retain
...
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer
次の例は、Kafka クラスタのカスタム リソースの spec
に追加された StorageClass を示しています。
...
spec:
...
dataVolumeCapacity: 100Gi
storageClass:
name: premium-rwo-retain
この構成では、対応する PersistentVolumeClaim が削除されても、ストレージ クラスを使用してプロビジョニングされた PersistentVolume は削除されません。
既存の構成とブローカー インスタンス データを使用して、新しい Kubernetes クラスタに Kafka インスタンスを復元するには:
- 既存の Confluent カスタム リソース(
Kafka
、KafkaTopic
、Zookeeper
など)を新しい Kubernetes クラスタに適用します。 - PersistentVolumeClaim の
spec.volumeName
プロパティを使用して、新しい Kafka ブローカー インスタンスの名前を持つ PersistentVolumeClaim を古い PersistentVolume に戻します。