GKE에 고가용성 Kafka 클러스터 배포

Kafka는 높은 볼륨, 높은 처리량, 실시간 스트리밍 데이터를 처리하기 위한 오픈소스 분산 게시-구독 메시징 시스템입니다. Kafka를 사용하면 처리 및 분석을 위해 여러 시스템 및 애플리케이션에서 데이터를 안정적으로 이동하는 스트리밍 데이터 파이프라인을 빌드할 수 있습니다.

이 튜토리얼은 Google Kubernetes Engine(GKE)에 고가용성 Kafka 클러스터 배포에 관심이 있는 플랫폼 관리자, 클라우드 설계자, 운영 전문가를 대상으로 합니다.

클러스터 인프라 만들기

이 섹션에서는 Terraform 스크립트를 실행하여 2개의 리전 GKE 클러스터를 만듭니다. 기본 클러스터는 us-central1에 배포됩니다.

클러스터를 만들려면 다음 단계를 따르세요.

Autopilot

Cloud Shell에서 다음 명령어를 실행합니다.

terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID

메시지가 표시되면 yes를 입력합니다.

Standard

Cloud Shell에서 다음 명령어를 실행합니다.

terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID

메시지가 표시되면 yes를 입력합니다.

Terraform 구성 파일은 인프라를 배포하기 위해 다음 리소스를 만듭니다.

  • Docker 이미지를 저장할 Artifact Registry 저장소를 만듭니다.
  • VM의 네트워크 인터페이스에 대한 VPC 네트워크 및 서브넷을 만듭니다.
  • GKE 클러스터 2개를 만듭니다.

Terraform은 두 리전에 비공개 클러스터를 만들고 재해 복구를 위해 Backup for GKE를 사용 설정합니다.

클러스터에 Kafka 배포

이 섹션에서는 Helm 차트를 사용하여 GKE에 Kafka를 배포합니다. 이 작업은 다음 리소스를 생성합니다.

Helm 차트를 사용하여 Kafka를 배포하려면 다음 단계를 수행합니다.

  1. Docker 액세스를 구성합니다.

    gcloud auth configure-docker us-docker.pkg.dev
    
  2. Artifact Registry에 Kafka 및 Zookeeper 이미지를 채웁니다.

    ./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0
    ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52
    ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41
    ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
    
  3. 기본 클러스터에 대한 kubectl 명령줄 액세스를 구성합니다.

    gcloud container clusters get-credentials gke-kafka-us-central1 \
        --location=${REGION} \
        --project=${PROJECT_ID}
    
  4. 네임스페이스를 만듭니다.

    export NAMESPACE=kafka
    kubectl create namespace $NAMESPACE
    
  5. Helm 차트 버전 20.0.6을 사용하여 Kafka를 설치합니다.

    cd helm
    ../scripts/chart.sh kafka 20.0.6 && \
    rm -rf Chart.lock charts && \
    helm dependency update && \
    helm -n kafka upgrade --install kafka . \
    --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
    
    

    출력은 다음과 비슷합니다.

    NAME: kafka
    LAST DEPLOYED: Thu Feb 16 03:29:39 2023
    NAMESPACE: kafka
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    
  6. Kafka 복제본이 실행 중인지 확인합니다(몇 분 정도 걸릴 수 있음).

    kubectl get all -n kafka
    

    출력은 다음과 비슷합니다.

    ---
    NAME                    READY   STATUS    RESTARTS        AGE
    pod/kafka-0             1/1     Running   2 (3m51s ago)   4m28s
    pod/kafka-1             1/1     Running   3 (3m41s ago)   4m28s
    pod/kafka-2             1/1     Running   2 (3m57s ago)   4m28s
    pod/kafka-zookeeper-0   1/1     Running   0               4m28s
    pod/kafka-zookeeper-1   1/1     Running   0               4m28s
    pod/kafka-zookeeper-2   1/1     Running   0               4m28s
    
    NAME                                   TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)                      AGE
    service/kafka                          ClusterIP   192.168.112.124   <none>        9092/TCP                     4m29s
    service/kafka-app                      ClusterIP   192.168.75.57     <none>        9092/TCP                     35m
    service/kafka-app-headless             ClusterIP   None              <none>        9092/TCP,9093/TCP            35m
    service/kafka-app-zookeeper            ClusterIP   192.168.117.102   <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-app-zookeeper-headless   ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-headless                 ClusterIP   None              <none>        9092/TCP,9093/TCP            4m29s
    service/kafka-zookeeper                ClusterIP   192.168.89.249    <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    service/kafka-zookeeper-headless       ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    
    NAME                               READY   AGE
    statefulset.apps/kafka             3/3     4m29s
    statefulset.apps/kafka-zookeeper   3/3     4m29s
    

테스트 데이터 만들기

이 섹션에서는 Kafka 애플리케이션을 테스트하고 메시지를 생성합니다.

  1. Kafka 애플리케이션과 상호작용하기 위해 소비자 클라이언트 포드를 만듭니다.

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
    
  2. 파티션이 3개 있고 복제 계수가 3인 topic1이라는 주제를 만듭니다.

    kafka-topics.sh \
        --create \
        --topic topic1 \
        --partitions 3  \
        --replication-factor 3 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. 주제 파티션이 3개 브로커에 모두 복제되었는지 확인합니다.

    kafka-topics.sh \
        --describe \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    출력은 다음과 비슷합니다.

    Topic: topic1     TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
           Topic: topic1    Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
           Topic: topic1    Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
           Topic: topic1    Partition: 2    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
    

    출력 예시에서 topic1에는 3개 파티션이 있고, 각 파티션마다 서로 다른 리더 및 복제본 집합이 포함되어 있습니다. 이것은 확장성 및 내결함성 향상을 위해 Kafka가 파티션 나누기를 사용하여 여러 브로커에 데이터를 분산하기 때문입니다. 복제 계수 3은 각 파티션에 복제본을 3개 포함하여 한 두 개의 브로커가 실패하더라도 데이터를 계속 사용할 수 있게 보장합니다.

  4. 다음 명령어를 실행하여 메시지 번호를 topic1에 대량으로 생성합니다.

    ALLOW_PLAINTEXT_LISTENER=yes
    for x in $(seq 0 200); do
      echo "$x: Message number $x"
    done | kafka-console-producer.sh \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \
        --property parse.key=true \
        --property key.separator=":"
    
  5. 다음 명령어를 실행하여 모든 파티션에서 topic1을 소비합니다.

    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    CTRL+C를 입력하여 소비자 프로세스를 중지합니다.

Kafka 벤치마크

사용 사례를 정확하게 모델링하기 위해서는 클러스터에서 예상 부하에 대한 시뮬레이션을 실행할 수 있습니다. 성능을 테스트하려면 bin 폴더에 있는 kafka-producer-perf-test.shkafka-consumer-perf-test.sh 스크립트와 같이 Kafka 패키지에 포함된 도구를 사용합니다.

  1. 벤치마킹 주제를 만듭니다.

    kafka-topics.sh \
      --create \
      --topic topic-benchmark \
      --partitions 3  \
      --replication-factor 3 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  2. Kafka 클러스터에 부하를 만듭니다.

    KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \
        --topic topic-benchmark \
        --num-records 10000000 \
        --throughput -1 \
        --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \
              batch.size=16384 \
              acks=all \
              linger.ms=500 \
              compression.type=none \
        --record-size 100 \
        --print-metrics
    

    제작자가 topic-benchmark에서 10,000,000개 레코드를 생성합니다. 출력은 다음과 비슷합니다.

    623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency.
    1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency.
    1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency.
    2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
    

    모든 레코드가 전송되었으면 다음과 비슷하게 출력에 추가 측정항목이 표시됩니다.

    producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark}     : 173316.233
    producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark}    : 10000000.000
    

    확인을 종료하려면 CTRL + C를 입력합니다.

  3. 포드 셸을 종료합니다.

    exit
    

업그레이드 관리

Kafka 및 Kubernetes의 버전 업데이트는 정기적으로 출시됩니다. 작업 권장사항에 따라 소프트웨어 환경을 정기적으로 업데이트하세요.

Kafka 바이너리 업그레이드 계획

이 섹션에서는 Helm을 사용하여 Kafka 이미지를 업데이트하고 주제를 계속 사용할 수 있는지 확인합니다.

클러스터에 Kafka 배포에서 사용한 Helm 차트로부터 이전 Kafka 버전에서 업그레이드하려면 다음 단계를 수행합니다.

  1. Artifact Registry에 다음 이미지를 채웁니다.

    ../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2
    ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61
    ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49
    ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
    
  2. 업그레이드된 Kafka 및 Zookeeper 이미지로 Helm 차트를 배포하려면 다음 단계를 수행합니다. 버전 관련 안내는 Kafka 버전 업그레이드 안내를 참조하세요.

    1. Chart.yaml 종속 항목 버전을 업데이트합니다.
    ../scripts/chart.sh kafka 20.1.0
    
    
    1. 다음 예시에 표시된 것처럼 새 Kafka 및 Zookeeper 이미지를 사용하여 Helm 차트를 배포합니다.

      rm -rf Chart.lock charts && \
      helm dependency update && \
      helm -n kafka upgrade --install kafka ./ \
            --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
      

    Kafka 포드가 업그레이드되는 것을 확인합니다.

    kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
    

    확인을 종료하려면 CTRL + C를 입력합니다.

  3. 클라이언트 포드를 사용하여 Kafka 클러스터에 연결합니다.

    kubectl run kafka-client -n kafka --rm -ti \
      --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
    
  4. topic1에서 메시지에 액세스할 수 있는지 확인합니다.

    kafka-console-consumer.sh \
      --topic topic1 \
      --from-beginning \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    이전 단계에서 생성된 메시지가 출력에 표시됩니다. 프로세스를 종료하려면 CTRL+C를 입력합니다.

  5. 클라이언트 포드를 종료합니다.

    exit
    

재해 복구 대비

서비스 중단 이벤트 발생 시 프로덕션 워크로드를 계속 사용할 수 있도록 하려면 재해 복구(DR) 계획을 준비해야 합니다. DR 계획에 대한 자세한 내용은 재해 복구 계획 가이드를 참조하세요.

GKE 클러스터에서 워크로드를 백업 및 복원하려면 Backup for GKE를 사용하면 됩니다.

Kafka 백업 및 복원 시나리오 예시

이 섹션에서는 gke-kafka-us-central1에서 클러스터 백업을 가져오고 백업을 gke-kafka-us-west1에 복원합니다. ProtectedApplication 커스텀 리소스를 사용하여 애플리케이션 범위에서 백업 및 복원 작업을 수행합니다.

다음 다이어그램은 재해 복구 솔루션의 구성요소와 상호 관계를 보여줍니다.

가용성이 높은 Kafka 클러스터의 백업 및 복구 솔루션 예시를 보여주는 다이어그램
그림 3: 고가용성 Kafka 클러스터의 백업 및 복구 솔루션 예시

Kafka 클러스터 백업 및 복원을 준비하려면 다음 단계를 수행합니다.

  1. 환경 변수 설정

    export BACKUP_PLAN_NAME=kafka-protected-app
    export BACKUP_NAME=protected-app-backup-1
    export RESTORE_PLAN_NAME=kafka-protected-app
    export RESTORE_NAME=protected-app-restore-1
    export REGION=us-central1
    export DR_REGION=us-west1
    export CLUSTER_NAME=gke-kafka-$REGION
    export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
    
  2. 클러스터가 RUNNING 상태인지 확인합니다.

    gcloud container clusters describe $CLUSTER_NAME --location us-central1 --format='value(status)'
    
  3. 백업 계획을 만듭니다.

    gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --include-secrets \
        --include-volume-data \
        --cron-schedule="0 3 * * *" \
        --backup-retain-days=7 \
        --backup-delete-lock-days=0
    
  4. 수동으로 백업을 만듭니다. 예약된 백업은 일반적으로 백업 계획에서 크론 일정으로 제어되지만 다음 예시는 일회성 백업 작업을 시작하는 방법을 보여줍니다.

    gcloud beta container backup-restore backups create $BACKUP_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=$BACKUP_PLAN_NAME \
        --wait-for-completion
    
  5. 복원 계획을 만듭니다.

    gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \
        --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \
        --cluster-resource-conflict-policy=use-existing-version \
        --namespaced-resource-restore-mode=delete-and-restore \
        --volume-data-restore-policy=restore-volume-data-from-backup \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
    
  6. 백업에서 수동으로 복원합니다.

    gcloud beta container backup-restore restores create $RESTORE_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --restore-plan=$RESTORE_PLAN_NAME \
        --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
    
  7. 복원된 애플리케이션이 백업 클러스터에 표시되는 것을 확인합니다. 모든 포드가 실행되고 준비될 때까지 몇 분 정도 걸릴 수 있습니다.

    gcloud container clusters get-credentials gke-kafka-us-west1 \
        --location us-west1
    kubectl get pod -n kafka --watch
    

    모든 포드가 작동되어 실행되면 CTRL+C를 입력하여 확인을 종료합니다.

  8. 소비자가 이전 주제를 가져올 수 있는지 확인합니다.

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    출력은 다음과 비슷합니다.

    192 :  Message number 192
    193 :  Message number 193
    197 :  Message number 197
    200 :  Message number 200
    Processed a total of 201 messages
    

    프로세스를 종료하려면 CTRL+C를 입력합니다.

  9. 포드를 종료합니다.

    exit
    

Kafka 서비스 중단 시뮬레이션

이 섹션에서는 브로커를 호스팅하는 Kubernetes 노드를 교체하여 노드 오류를 시뮬레이션합니다. 이 섹션은 Standard 버전에만 적용됩니다. Autopilot에서는 노드가 자동으로 관리되므로 노드 오류를 시뮬레이션할 수 없습니다.

  1. Kafka 애플리케이션에 연결하기 위해 클라이언트 포드를 만듭니다.

    kubectl run kafka-client -n kafka --restart='Never' -it \
    --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
  2. topic-failover-test 주제를 만들고 테스트 트래픽을 생성합니다.

    kafka-topics.sh \
      --create \
      --topic topic-failover-test \
      --partitions 1  \
      --replication-factor 3  \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. topic-failover-test 주제 리더로 브로커를 결정합니다.

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    출력은 다음과 비슷합니다.

    Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
    

    위 출력에서 Leader: 1topic-failover-test의 리더가 브로커 1입니다. 이것은 포드 kafka-1에 해당합니다.

  4. 새 터미널을 열고 동일한 클러스터에 연결합니다.

    gcloud container clusters get-credentials gke-kafka-us-west1 --location us-west1 --project PROJECT_ID
    
  5. kafka-1 포드가 실행되는 노드를 찾습니다.

    kubectl get pod -n kafka kafka-1 -o wide
    

    출력은 다음과 비슷합니다.

    NAME      READY   STATUS    RESTARTS      AGE   IP              NODE                                               NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   1 (35m ago)   36m   192.168.132.4   gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72   <none>           <none>
    

    위 출력에서 kafka-1 포드는 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 노드에서 실행됩니다.

  6. 노드를 드레이닝하여 포드를 제거합니다.

    kubectl drain NODE \
      --delete-emptydir-data \
      --force \
      --ignore-daemonsets
    

    NODE를 kafka-1 포드가 실행되는 노드로 바꿉니다. 이 예시에서는 노드가 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72입니다.

    출력은 다음과 비슷합니다.

    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned
    Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578
    evicting pod kafka/kafka-1
    evicting pod kube-system/kube-dns-7d48cb57b-j4d8f
    evicting pod kube-system/calico-typha-56995c8d85-5clph
    pod/calico-typha-56995c8d85-5clph evicted
    pod/kafka-1 evicted
    pod/kube-dns-7d48cb57b-j4d8f evicted
    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
    
  7. kafka-1 포드가 실행되는 노드를 찾습니다.

    kubectl get pod -n kafka kafka-1 -o wide
    

    출력은 다음과 비슷하게 표시됩니다.

    NAME      READY   STATUS    RESTARTS   AGE     IP              NODE                                              NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   0          2m49s   192.168.128.8   gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7   <none>           <none>
    

    위 출력에서는 애플리케이션이 새 노드에서 실행됩니다.

  8. kafka-client 포드에 연결된 터미널에서 topic-failover-test의 리더인 브로커를 결정합니다.

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    출력은 다음과 비슷하게 표시됩니다.

    Topic: topic-failover-test     TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 0,2,1
    

    예시 출력에서 리더는 계속 1입니다. 하지만 이제 새 노드에서 실행됩니다.

Kafka 리더 오류 테스트

  1. Cloud Shell에서 Kafka 클라이언트에 연결하고 describe를 사용하여 topic1의 각 파티션에 대해 선택된 리더를 확인합니다.

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    출력은 다음과 비슷합니다.

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: topic1   Partition: 1    Leader: 0       Replicas: 2,1,0 Isr: 0,2,1
        Topic: topic1   Partition: 2    Leader: 0       Replicas: 1,0,2 Isr: 0,2,1
    
  2. Kafka 클라이언트에 연결되지 않은 Cloud Shell에서 새 리더 선택을 강제하기 위해 kafka-0 리더 브로커를 삭제합니다. 이전 출력의 리더 중 하나에 매핑되는 색인을 삭제해야 합니다.

    kubectl delete pod -n kafka kafka-0 --force
    

    출력은 다음과 비슷합니다.

    pod "kafka-0" force deleted
    
  3. Kafka 클라이언트에 연결된 Cloud Shell에서 describe를 사용하여 선택된 리더를 확인합니다.

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    출력은 다음과 비슷합니다.

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,0,1
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: topic1   Partition: 2    Leader: 2       Replicas: 1,2,0 Isr: 2,0,1
    

    출력에서 중단된 리더(kafka-0)에 할당된 경우 각 파티션의 새 리더가 변경됩니다. 이것은 포드가 삭제되고 다시 생성될 때 원래 리더가 교체되었음을 나타냅니다.