高可用性 Kafka クラスタを GKE にデプロイする

Kafka は、オープンソースとして提供されているパブリッシュ / サブスクライブ型の分散メッセージング システムで、大規模なリアルタイム ストリーミング データを高スループットで処理します。Kafka を使用すると、システムやアプリケーション間でデータを確実に移動し、処理と分析を行うストリーミング データ パイプラインを構築できます。

このチュートリアルは、Google Kubernetes Engine(GKE)に高可用性 Kafka クラスタをデプロイすることに関心があるプラットフォーム管理者、クラウド アーキテクト、運用担当者を対象としています。

クラスタ インフラストラクチャを作成する

このセクションでは、Terraform スクリプトを実行して、2 つのリージョン GKE クラスタを作成します。プライマリ クラスタは us-central1 にデプロイされます。

クラスタの作成手順は次のとおりです。

Autopilot

Cloud Shell で、次のコマンドを実行します。

terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID

プロンプトが表示されたら、「yes」と入力します。

Standard

Cloud Shell で、次のコマンドを実行します。

terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID

プロンプトが表示されたら、「yes」と入力します。

Terraform 構成ファイルにより、インフラストラクチャのデプロイに必要な次のリソースが作成されます。

  • Docker イメージを保存するための Artifact Registry リポジトリを作成します。
  • VM のネットワーク インターフェース用に VPC ネットワークとサブネットを作成します。
  • 2 つの GKE クラスタを作成します。

Terraform は、2 つのリージョンに限定公開クラスタを作成し、障害復旧用に Backup for GKE を有効にします。

Kafka をクラスタにデプロイする

このセクションでは、Helm チャートを使用して GKE に Kafka をデプロイします。このオペレーションで次のリソースが作成されます。

Helm チャートを使用して Kafka をデプロイするには、次の操作を行います。

  1. Docker アクセスを構成します。

    gcloud auth configure-docker us-docker.pkg.dev
    
  2. Artifact Registry に Kafka と Zookeeper のイメージを登録します。

    ./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0
    ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52
    ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41
    ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
    
  3. プライマリ クラスタに対する kubectl コマンドライン アクセスを構成します。

    gcloud container clusters get-credentials gke-kafka-us-central1 \
        --location=${REGION} \
        --project=${PROJECT_ID}
    
  4. Namespace を作成します。

    export NAMESPACE=kafka
    kubectl create namespace $NAMESPACE
    
  5. Helm チャート バージョン 20.0.6 を使用して Kafka をインストールします。

    cd helm
    ../scripts/chart.sh kafka 20.0.6 && \
    rm -rf Chart.lock charts && \
    helm dependency update && \
    helm -n kafka upgrade --install kafka . \
    --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
    
    

    出力は次のようになります。

    NAME: kafka
    LAST DEPLOYED: Thu Feb 16 03:29:39 2023
    NAMESPACE: kafka
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    
  6. Kafka レプリカが実行されていることを確認します(数分かかる場合があります)。

    kubectl get all -n kafka
    

    出力は次のようになります。

    ---
    NAME                    READY   STATUS    RESTARTS        AGE
    pod/kafka-0             1/1     Running   2 (3m51s ago)   4m28s
    pod/kafka-1             1/1     Running   3 (3m41s ago)   4m28s
    pod/kafka-2             1/1     Running   2 (3m57s ago)   4m28s
    pod/kafka-zookeeper-0   1/1     Running   0               4m28s
    pod/kafka-zookeeper-1   1/1     Running   0               4m28s
    pod/kafka-zookeeper-2   1/1     Running   0               4m28s
    
    NAME                                   TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)                      AGE
    service/kafka                          ClusterIP   192.168.112.124   <none>        9092/TCP                     4m29s
    service/kafka-app                      ClusterIP   192.168.75.57     <none>        9092/TCP                     35m
    service/kafka-app-headless             ClusterIP   None              <none>        9092/TCP,9093/TCP            35m
    service/kafka-app-zookeeper            ClusterIP   192.168.117.102   <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-app-zookeeper-headless   ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-headless                 ClusterIP   None              <none>        9092/TCP,9093/TCP            4m29s
    service/kafka-zookeeper                ClusterIP   192.168.89.249    <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    service/kafka-zookeeper-headless       ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    
    NAME                               READY   AGE
    statefulset.apps/kafka             3/3     4m29s
    statefulset.apps/kafka-zookeeper   3/3     4m29s
    

テストデータを作成する

このセクションでは、Kafka アプリケーションをテストし、メッセージを生成します。

  1. Kafka アプリケーションを操作するコンシューマー クライアント Pod を作成します。

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
    
  2. 3 つのパーティションがあり、レプリケーション係数が 3 のトピックを topic1 という名前で作成します。

    kafka-topics.sh \
        --create \
        --topic topic1 \
        --partitions 3  \
        --replication-factor 3 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. トピック パーティションが 3 つのブローカーすべてに複製されていることを確認します。

    kafka-topics.sh \
        --describe \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    出力は次のようになります。

    Topic: topic1     TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
           Topic: topic1    Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
           Topic: topic1    Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
           Topic: topic1    Partition: 2    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
    

    出力例では、topic1 に 3 つのパーティションがあり、それぞれに異なるリーダーとレプリカのセットがあることに注意してください。これは、Kafka がパーティショニングを使用して複数のブローカー間でデータを分散するためで、より大規模なスケーラビリティやより強固なフォールト トレランスを実現できます。レプリケーション係数が 3 の場合、各パーティションには 3 つのレプリカが必ず存在します。この場合、1 つまたは 2 つのブローカーに障害が発生しても引き続きデータを使用できます。

  4. 次のコマンドを実行して、メッセージ番号を topic1 に一括で生成します。

    ALLOW_PLAINTEXT_LISTENER=yes
    for x in $(seq 0 200); do
      echo "$x: Message number $x"
    done | kafka-console-producer.sh \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \
        --property parse.key=true \
        --property key.separator=":"
    
  5. すべてのパーティションから topic1 を使用するには、次のコマンドを実行します。

    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    CTRL+C」と入力して、コンシューマー プロセスを停止します。

Kafka のベンチマーク

ユースケースを正確にモデル化するために、クラスタで予想される負荷のシミュレーションを実行できす。パフォーマンスをテストするには、Kafka パッケージに含まれるツール(bin フォルダの kafka-producer-perf-test.sh スクリプトと kafka-consumer-perf-test.sh スクリプト)を使用します。

  1. ベンチマーク用にトピックを作成します。

    kafka-topics.sh \
      --create \
      --topic topic-benchmark \
      --partitions 3  \
      --replication-factor 3 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  2. Kafka クラスタで負荷を発生させます。

    KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \
        --topic topic-benchmark \
        --num-records 10000000 \
        --throughput -1 \
        --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \
              batch.size=16384 \
              acks=all \
              linger.ms=500 \
              compression.type=none \
        --record-size 100 \
        --print-metrics
    

    プロデューサーが topic-benchmark で 10,000,000 件のレコードを生成します。出力は次のようになります。

    623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency.
    1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency.
    1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency.
    2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
    

    すべてのレコードが送信されると、次のような追加指標が出力に表示されます。

    producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark}     : 173316.233
    producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark}    : 10000000.000
    

    監視を終了するには、「CTRL + C」と入力します。

  3. Pod のシェルを終了します。

    exit
    

アップグレードを管理する

Kafka と Kubernetes のバージョンは定期的に更新されています。運用のベスト プラクティスに従って、ソフトウェア環境を定期的にアップグレードしてください。

Kafka バイナリ アップグレードを計画する

このセクションでは、Helm を使用して Kafka イメージを更新し、トピックがまだ利用できることを確認します。

クラスタに Kafka をデプロイするで使用した Helm チャートで以前の Kafka バージョンからアップグレードするには、次の操作を行います。

  1. Artifact Registry に次のイメージを登録します。

    ../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2
    ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61
    ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49
    ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
    
  2. 次の手順で、アップグレードされた Kafka と Zookeeper のイメージを使用して Helm チャートをデプロイします。バージョン固有のガイダンスについては、バージョン アップグレード用の Kafka の手順をご覧ください。

    1. Chart.yaml 依存関係のバージョンを更新します。
    ../scripts/chart.sh kafka 20.1.0
    
    
    1. 次の例のように、新しい Kafka と Zookeeper のイメージを使用して Helm チャートをデプロイします。

      rm -rf Chart.lock charts && \
      helm dependency update && \
      helm -n kafka upgrade --install kafka ./ \
            --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
      

    Kafka Pod のアップグレードを監視します。

    kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
    

    監視を終了するには、「CTRL + C」と入力します。

  3. クライアント Pod を使用して Kafka クラスタに接続します。

    kubectl run kafka-client -n kafka --rm -ti \
      --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
    
  4. topic1 からのメッセージにアクセスできることを確認します。

    kafka-console-consumer.sh \
      --topic topic1 \
      --from-beginning \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    出力には、前の手順で生成されたメッセージが表示されます。CTRL+C と入力してプロセスを終了します。

  5. クライアント Pod を終了します。

    exit
    

障害復旧に備える

サービス中断イベントの発生時に本番環境のワークロードの可用性を確保するには、障害復旧(DR)計画を準備する必要があります。DR 計画の詳細については、障害復旧計画ガイドをご覧ください。

GKE クラスタでワークロードをバックアップして復元するには、Backup for GKE を使用します。

Kafka のバックアップと復元の例

このセクションでは、クラスタのバックアップを gke-kafka-us-central1 から入手して、gke-kafka-us-west1 に復元します。ProtectedApplication カスタム リソースを使用して、アプリケーション スコープでバックアップと復元のオペレーションを行います。

次の図は、障害復旧ソリューションのコンポーネントと、それらのコンポーネントがどのように関係しているのかを示しています。

この図は、高可用性 Kafka クラスタ用のバックアップと復元のソリューションの例を示しています。
図 3: 高可用性 Kafka クラスタ用のバックアップと復元のソリューション。

Kafka クラスタのバックアップと復元を準備するには、次の操作を行います。

  1. 環境変数を設定します。

    export BACKUP_PLAN_NAME=kafka-protected-app
    export BACKUP_NAME=protected-app-backup-1
    export RESTORE_PLAN_NAME=kafka-protected-app
    export RESTORE_NAME=protected-app-restore-1
    export REGION=us-central1
    export DR_REGION=us-west1
    export CLUSTER_NAME=gke-kafka-$REGION
    export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
    
  2. クラスタが RUNNING 状態であることを確認します。

    gcloud container clusters describe $CLUSTER_NAME --location us-central1 --format='value(status)'
    
  3. バックアップ プランを作成します。

    gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --include-secrets \
        --include-volume-data \
        --cron-schedule="0 3 * * *" \
        --backup-retain-days=7 \
        --backup-delete-lock-days=0
    
  4. バックアップを手動で作成します。スケジュール バックアップは通常、バックアップ プランの cron スケジュールによって管理されますが、次の例は、1 回限りのバックアップ オペレーションを開始する方法を示しています。

    gcloud beta container backup-restore backups create $BACKUP_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=$BACKUP_PLAN_NAME \
        --wait-for-completion
    
  5. 復元プランを作成します。

    gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \
        --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \
        --cluster-resource-conflict-policy=use-existing-version \
        --namespaced-resource-restore-mode=delete-and-restore \
        --volume-data-restore-policy=restore-volume-data-from-backup \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
    
  6. バックアップから手動で復元します。

    gcloud beta container backup-restore restores create $RESTORE_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --restore-plan=$RESTORE_PLAN_NAME \
        --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
    
  7. バックアップ クラスタに復元されたアプリケーションを監視します。すべての Pod が実行されて準備が整うまでに数分かかることがあります。

    gcloud container clusters get-credentials gke-kafka-us-west1 \
        --location us-west1
    kubectl get pod -n kafka --watch
    

    CTRL+C」と入力して、すべての Pod が稼働状態になったら監視を終了します。

  8. コンシューマーが以前のトピックを取得できるかどうか確認します。

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    出力は次のようになります。

    192 :  Message number 192
    193 :  Message number 193
    197 :  Message number 197
    200 :  Message number 200
    Processed a total of 201 messages
    

    CTRL+C と入力してプロセスを終了します。

  9. Pod を終了します。

    exit
    

Kafka サービスの停止をシミュレートする

このセクションでは、ブローカーをホストしている Kubernetes ノードを置き換えて、ノード障害をシミュレートします。このセクションの内容は、Standard にのみ適用されます。Autopilot の場合、ノードは自動的に管理されるため、ノードの障害をシミュレートすることはできません。

  1. クライアント Pod を作成して Kafka アプリケーションに接続します。

    kubectl run kafka-client -n kafka --restart='Never' -it \
    --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
  2. トピック topic-failover-test を作成し、テスト トラフィックを生成します。

    kafka-topics.sh \
      --create \
      --topic topic-failover-test \
      --partitions 1  \
      --replication-factor 3  \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. topic-failover-test トピックのリーダー ブローカーを決めます。

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    出力は次のようになります。

    Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
    

    上記の出力で、Leader: 1topic-failover-test のリーダーがブローカー 1 であることを意味します。これは Pod kafka-1 に対応します。

  4. 新しいターミナルを開き、同じクラスタに接続します。

    gcloud container clusters get-credentials gke-kafka-us-west1 --location us-west1 --project PROJECT_ID
    
  5. Pod kafka-1 が実行されているノードを確認します。

    kubectl get pod -n kafka kafka-1 -o wide
    

    出力は次のようになります。

    NAME      READY   STATUS    RESTARTS      AGE   IP              NODE                                               NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   1 (35m ago)   36m   192.168.132.4   gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72   <none>           <none>
    

    上記の出力から、Pod kafka-1 がノード gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 で実行されていることがわかります。

  6. ノードをドレインして Pod を強制排除します。

    kubectl drain NODE \
      --delete-emptydir-data \
      --force \
      --ignore-daemonsets
    

    NODE は、Pod kafka-1 が実行されているノードに置き換えます。この例の場合、ノードは gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 です。

    出力は次のようになります。

    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned
    Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578
    evicting pod kafka/kafka-1
    evicting pod kube-system/kube-dns-7d48cb57b-j4d8f
    evicting pod kube-system/calico-typha-56995c8d85-5clph
    pod/calico-typha-56995c8d85-5clph evicted
    pod/kafka-1 evicted
    pod/kube-dns-7d48cb57b-j4d8f evicted
    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
    
  7. Pod kafka-1 が実行されているノードを確認します。

    kubectl get pod -n kafka kafka-1 -o wide
    

    出力は次のようになります。

    NAME      READY   STATUS    RESTARTS   AGE     IP              NODE                                              NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   0          2m49s   192.168.128.8   gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7   <none>           <none>
    

    上記の出力から、アプリケーションが新しいノードで実行されていることがわかります。

  8. kafka-client Pod に接続されているターミナルで、topic-failover-test のリーダー ブローカーを特定します。

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    出力は次のようになります。

    Topic: topic-failover-test     TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 0,2,1
    

    この出力例では、リーダーは 1 のままです。しかし、現在は新しいノードで実行されています。

Kafka リーダーの障害をテストする

  1. Cloud Shell で Kafka クライアントに接続し、describe を使用して topic1 の各パーティションに選出されているリーダーを確認します。

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    出力は次のようになります。

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: topic1   Partition: 1    Leader: 0       Replicas: 2,1,0 Isr: 0,2,1
        Topic: topic1   Partition: 2    Leader: 0       Replicas: 1,0,2 Isr: 0,2,1
    
  2. Kafka クライアントに接続していない Cloud Shell で、kafka-0 リーダー ブローカーを削除して、新しいリーダーを強制的に選択します。前の出力のリーダーの一つにマッピングされているインデックスを削除する必要があります。

    kubectl delete pod -n kafka kafka-0 --force
    

    出力は次のようになります。

    pod "kafka-0" force deleted
    
  3. Kafka クライアントに接続されている Cloud Shell で describe を使用して、選択されたリーダーを確認します。

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    出力は次のようになります。

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,0,1
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: topic1   Partition: 2    Leader: 2       Replicas: 1,2,0 Isr: 2,0,1
    

    出力で、中断されたリーダー(kafka-0)が割り当てられていた場合、各パーティションで新しいリーダーに変更されます。これは、Pod が削除されて再作成されたときに、元のリーダーが置き換えられたことを示します。