使用 MirrorMaker 2.0 将 Kafka 数据迁移到 Google Cloud

本文档可帮助您将 Apache Kafka 工作负载迁移到 Google Cloud Managed Service for Apache Kafka( Google Cloud中的一项代管式服务)。

Managed Service for Apache Kafka 可帮助您在 Google Cloud上运行 Apache Kafka。在此解决方案文档中,您将数据从外部 Apache Kafka 集群迁移到 Managed Service for Apache Kafka 集群。

如需详细了解 Managed Service for Apache Kafka,请参阅 Managed Service for Apache Kafka 概览

我们建议您使用 Apache Kafka MirrorMaker 2.0 进行此迁移。

MirrorMaker 2.0 是一款用于在 Apache Kafka 集群之间实时复制数据的工具。它可用于数据迁移、灾难恢复、数据隔离和数据汇总。

如需详细了解 MirrorMaker 2.0,请参阅下一部分。

什么是 MirrorMaker 2.0

MirrorMaker 2.0 使用 Kafka Connect 框架在 Kafka 集群之间复制数据。Kafka Connect 是一个用于在 Kafka 集群和其他系统之间流式传输数据的框架。它充当可伸缩且可靠的流水线。 此框架通过使用现成的连接器,简化了 Kafka 与各种外部系统(例如数据库、消息队列和在线存储)的集成。以下列出了您可以使用 MirrorMaker 2.0 的一些可能场景:

  • 数据迁移:将 Kafka 工作负载迁移到新集群,如本指南所示。

  • 灾难恢复:创建备份集群,以确保在发生故障时业务能够持续运行。

  • 数据隔离:选择性地将主题复制到公共集群,同时在私有集群中确保敏感数据的安全。

  • 数据聚合:将多个 Kafka 集群中的数据整合到一个中央集群中,以用于分析。

MirrorMaker 2.0 支持 Kafka 2.4.0 及更高版本,并提供以下主要功能:

  • 全面复制:复制所有必需的组件,包括主题、数据和配置、具有偏移量的消费者群组以及 ACL。

  • 分区保留:在目标集群中保持相同的分区方案,从而简化应用的过渡。

  • 自动创建主题和分区:自动检测并复制新主题和分区,最大限度地减少手动配置。

  • 监控功能:提供端到端复制延迟时间等基本指标,让您能够跟踪复制过程的健康状况和性能。

  • 容错能力和可伸缩性:即使在数据量很大的情况下也能确保可靠运行,并且可以横向扩缩以应对不断增加的工作负载。

  • 用于提高鲁棒性的内部主题:利用内部主题进行偏移同步、检查点和心跳。这些主题具有可配置的复制因子(例如 offset.syncs.topic.replication.factor),以确保高可用性和容错性。

MirrorMaker 2.0 提供两种部署模式:

  • 专用集群模式:MirrorMaker 2.0 作为独立集群运行,管理自己的工作器。本文档重点介绍此模式,并提供了一个有关其部署和配置的实用示例。

  • Kafka Connect 集群模式:MirrorMaker 2.0 在现有 Kafka Connect 集群中作为连接器运行。

高级别工作流程

下图展示了使用 MirrorMaker 2.0 将数据从源 Apache Kafka 集群迁移到 Managed Service for Apache Kafka 集群的架构。

使用 MirrorMaker 2.0 迁移 Kafka 数据。
图 1. 使用 MirrorMaker 2.0 将数据从源 Apache Kafka 集群迁移到 Managed Service for Apache Kafka 集群。

以下是这些组件的协同工作方式:

  • 源集群:这表示您现有的 Apache Kafka 集群,该集群可以位于本地或其他云环境中。其中包含您要迁移的主题。在此图中,源 Apache Kafka 集群包含三个主题,分别是主题 A、B 和 C。

  • MirrorMaker 2.0:此核心组件部署在 Compute Engine 虚拟机上,作为专用 MirrorMaker 2.0 集群,可主动将数据从源 Apache Kafka 集群复制到目标 Managed Service for Apache Kafka 集群。重要的是,如果目标集群中不存在相应的主题和分区,该工具还会自动创建它们,从而镜像源集群设置。

  • 目标集群:这是您的 Managed Service for Apache Kafka 集群。 它将成为 Kafka 数据的新家,MirrorMaker 2.0 可确保创建的主题和分区与来源环境相匹配。

下面简要介绍了迁移流程。

  1. 初步评估

    • 记录现有的 Kafka 设置,包括集群大小、主题、吞吐量和使用方群组。

    • 规划迁移目标和策略,包括停机容忍度和割接方法。

    • 估算 Managed Service for Apache Kafka 集群所需的资源。

  2. 准备工作

    • 创建 Managed Service for Apache Kafka 集群。

    • 配置现有 Kafka 集群与您刚刚创建的 Managed Service for Apache Kafka 集群之间的网络连接。

    • 在 Google Cloud 虚拟机上部署 MirrorMaker 2.0。

  3. 迁移执行

    • 配置 MirrorMaker 2.0 以将数据从现有 Kafka 集群复制到 Managed Service for Apache Kafka 集群。

    • 使用 MirrorMaker 2.0 指标监控复制过程。

    • 将消费者和生产者逐步迁移到新的 Managed Service for Apache Kafka 集群。

  4. 验证和割接

    • 验证 Managed Service for Apache Kafka 集群中的数据完整性和应用功能。

    • 执行最终割接,将流量重定向到 Managed Service for Apache Kafka 集群。

    • 停用旧的 Kafka 集群。

  5. 迁移后

    • 持续监控 Managed Service for Apache Kafka 集群的性能。

    • 查看并更新文档以反映这些更改。

最大限度地减少迁移停机时间

本部分概述了使用 MirrorMaker 2.0 将开源 Kafka 数据迁移到 Managed Service for Apache Kafka 的一些注意事项。MirrorMaker 2.0 可帮助您复制数据和偏移量,让消费者能够从新集群中的正确位置继续使用。不过,仔细规划对于最大限度地减少迁移过程中的停机时间至关重要。不妨考虑以下策略:

  • 并行部署:为了在切换到新的 Managed Service for Apache Kafka 集群时最大限度地减少停机时间,您可以在旧集群和新集群上并行运行应用实例。在此过渡期间,暂时停用应用中必须针对每条消息只执行一次的任何操作,例如发送通知。停用这些副作用,以防止因两次处理同一消息而导致意外后果。在新实例完全赶上进度后,将所有流量重定向到新集群,然后重新启用所有功能。

  • 分阶段迁移:以更小、更易于管理的阶段进行迁移,从不太关键的应用开始。这种方法有助于隔离潜在问题,并最大限度地减少任何中断的影响。

  • 蓝绿部署:在现有生产环境(蓝色)旁边创建一个完整的生产环境副本(绿色)。逐步将流量从蓝色割接到绿色,以便在最终割接之前进行测试和验证。此方法可最大限度地减少停机时间,但需要提高资源利用率。

  • 消息处理要求:了解应用对重复或丢失消息的容忍程度,并相应地配置消费者。MirrorMaker 2.0 提供多种配置来处理消息传递语义。 例如,sync.group.offsets.enabled 支持消费者偏移量同步。 使用者可以使用同步的偏移量从源集群中上次停止的位置继续读取。这样做可以防止消息丢失或收到过多重复消息。

  • 沟通和协调:与应用团队进行有效沟通对于顺利迁移至关重要。建立清晰的沟通渠道,并协调切换时间。

将本地 Apache Kafka 连接到 Google Cloud

如果源 Apache Kafka 集群位于本地,您需要在本地网络与 Managed Service for Apache Kafka 集群所在的虚拟私有云 (VPC) 之间建立安全连接。使用 Google Cloud中的以下任一选项。

  • Cloud VPN:一种经济高效的解决方案,适合对带宽需求较低或进行初始迁移实验的场景。它在公共互联网上建立一条加密隧道。如需详细了解 Cloud VPN,请参阅 Cloud VPN 概览

  • Cloud Interconnect:在本地网络与 Google Cloud之间提供专用高带宽连接。这非常适合需要更高吞吐量和更低延迟的企业级部署。您可以选择专用互连(用于直接物理连接)或合作伙伴互连(通过受支持的服务提供商进行连接)。如需详细了解 Google Cloud互联文档,请参阅 Cloud Interconnect 概览

创建 Managed Service for Apache Kafka 集群时,您必须在 VPC 中选择至少一个子网。此子网提供集群用于与 VPC 中的其他资源通信的 IP 地址,从而使集群可在 VPC 网络中访问。

如需从本地网络或其他 VPC 网络安全地连接到 Managed Service for Apache Kafka 集群,您可以通过 Cloud VPN 或 Cloud Interconnect 使用 Private Service Connect (PSC)。您无需明确设置 PSC 端点。在集群创建期间选择子网时,Managed Service for Apache Kafka 服务会自动创建必要的 PSC 端点。这样一来,您就可以使用 VPC 内的内部 IP 地址访问集群,而无需管理复杂的防火墙规则或公共 IP 地址,从而简化网络配置。

如需详细了解 Managed Service for Apache Kafka 的网络设置,请参阅 Managed Service for Apache Kafka 的网络

准备工作

在开始创建迁移设置之前,您必须记录当前的 Apache Kafka 设置。您需要此信息,以便计算新的 Managed Service for Apache Kafka 集群所需的资源(例如 vCPU、内存和存储空间)。收集有关源 Apache Kafka 环境的以下信息:

  • 确保 Apache Kafka 版本为 2.4.0 或更高版本。

    如需检查 Apache Kafka 集群的版本,请前往 Kafka 安装目录并运行命令 bin/kafka-topics.sh --version

  • 确定需要迁移的集群和主题。

  • 确定与每个主题关联的提供方和使用方。

  • 确定所有消费群体。

  • 确定集群级层和主题级层的消息吞吐量。

  • 确定集群和主题的复制因子。

  • 记录消费者配置,尤其是安全协议以及与任何其他 Google Cloud 服务的集成。

为避免在迁移期间出现中断,请规划与源 Kafka 集群相关的所有应用依赖项。在迁移生产环境之前,请先在开发环境中使用非关键集群进行测试迁移。验证流程并确定任何潜在问题。最后,制定全面的回滚方案,以便在必要时恢复到原始集群。

计算目标集群大小

如需估算 Managed Service for Apache Kafka 集群所需的 vCPU 数量和内存大小,请参阅规划 Kafka 集群规模。 磁盘和代理配置是自动的,无法调整。

开源 Kafka 提供 JMX 指标。为了准确计算 Managed Service for Apache Kafka 所需的集群大小,您可以使用以下 JMX 指标。这些指标是在代理级报告的。您必须汇总所有代理中的数据,才能计算出集群吞吐量。

  • kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:此指标报告来自客户端的入站字节速率(涵盖所有主题)。 省略 topic={...} 参数即可获取所有主题的汇总费率。

  • kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec:此指标报告所有主题中向客户端发送的字节速率。 省略 topic={...} 参数即可获得总体比率。

通过在一段时间内监控这些 JMX 指标,您可以收集数据点来计算以下各项:

  • 平均数据传入速率(MB/秒):此指标表示数据被提取到 Kafka 集群中的平均速率。

  • 峰值数据传入速率(MB/秒):此指标表示数据被提取到 Kafka 集群中的最高速率。

  • 平均数据输出量(MB/秒):此指标表示从 Kafka 集群消耗数据的平均速率。

  • 峰值数据输出量(MB/秒):此指标表示从 Kafka 集群消耗数据的最高速率。

可能需要进行一些指标计算,才能汇总数据并将字节转换为 MB。使用这些计算出的值,您可以按如下方式估算写入等效速率:

Write-equivalent rate (Avg/Peak) = (total write bandwidth) + (total read bandwidth / 4)

此写入等效速率有助于确定集群的总体写入负载,这是适当调整 Managed Service for Apache Kafka 集群大小所必需的。

创建 Managed Service for Apache Kafka 集群

Managed Service for Apache Kafka 集群位于特定的Google Cloud 项目和区域中。可以使用任何虚拟私有云 (VPC) 中一个或多个子网内的一组 IP 地址来访问该服务。

集群的大小取决于您为其分配的 CPU 数量和总 RAM。在这种情况下,集群大小必须与源 Apache Kafka 集群的大小相同。如需详细了解如何执行此计算,请参阅计算目标集群大小

如需获得创建集群所需的权限,请让管理员向您或创建集群的服务账号授予项目的 Managed Kafka Admin (roles/managedkafka.admin) IAM 角色。如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

如需创建 Managed Service for Apache Kafka 集群,请按照通过 CLI 生成和使用消息中的快速入门说明操作。 创建集群通常需要 20-30 分钟。

以独立集群模式设置 MirrorMaker 2.0

如需查看概念验证文档和示例代码,了解如何使用 MirrorMaker 2.0 和 Terraform 将 Kafka 数据传输到 Google Cloud,请参阅此 GitHub 代码库

本部分将指导您在 Google Cloud 虚拟机上以独立集群模式安装和配置 MirrorMaker 2.0。通过此设置,您可以将数据从现有 Apache Kafka 集群复制到 Managed Service for Apache Kafka 集群。

  1. 在已获准访问 Managed Service for Apache Kafka 集群的同一网络中创建虚拟机。使用 gcloud compute instances create 命令。

    gcloud compute instances create VM_NAME\
     --zone=ZONE\
     [--image=IMAGE | --image-family=IMAGE_FAMILY]\
     --image-project=IMAGE_PROJECT\
     --machine-type=MACHINE_TYPE
    

    替换以下内容:

    • VM_NAME:您要创建的虚拟机的名称。
    • ZONE:您要在其中创建虚拟机的可用区。
    • IMAGEIMAGE_FAMILY:您要用于虚拟机的映像或映像系列。
    • IMAGE_PROJECT:映像所在的项目。
    • MACHINE_TYPE:您要用于虚拟机的机器类型。
  2. 如需访问新创建的虚拟机,您可以使用 SSH。

    如需详细了解 SSH 连接,请参阅关于 SSH 连接

  3. 如需下载 Kafka 并进行提取,请在新虚拟机的终端窗口中运行以下命令:

    wget https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz
    tar -xzvf kafka_2.13-3.7.1.tgz
    
  4. 下载 Java,提取软件包,然后设置 Java 路径。

    # Download Java
    wget https://download.java.net/java/GA/jdk11/9/GPL/openjdk-11.0.2_linux-x64_bin.tar.gz
    # Extract Java
    tar -xzvf openjdk-11.0.2_linux-x64_bin.tar.gz
    # Set Java path
    export PATH=$PATH:/java/jdk-11.0.2/bin/
    
  5. 修改 path/to/kafka/config/mm2.properties 文件并更新以下属性:

    clusters = source, target
    source.bootstrap.servers = <source_kafka_bootstrap_servers>
    target.bootstrap.servers = <target_kafka_bootstrap_servers>
    source.security.protocol = SASL_SSL
    source.sasl.mechanism = PLAIN
    source.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule
    required username="<source_kafka_username>" password="<source_kafka_password>";
    
    target.security.protocol = SASL_SSL
    target.sasl.mechanism = PLAIN
    target.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule
    required username="<target_kafka_username>" password="<target_kafka_password>";
    
    mirrors = source->target
    source->target.enabled=true
    
    topics = .*
    groups = .*
    
    offset.syncs.topic.replication.factor = 3
    checkpoints.topic.replication.factor = 3
    heartbeats.topic.replication.factor = 3
    emit.checkpoints.interval.seconds = 10
    
    • source_kafka_bootstrap_serverstarget_kafka_bootstrap_servers 分别替换为源 Kafka 集群和目标 Kafka 集群的引导服务器地址。您可以使用 managed-kafka clusters describe Google Cloud CLI 命令获取 Managed Service for Apache Kafka 的引导服务器地址。

    • source_kafka_usernamesource_kafka_password 替换为源 Kafka 集群的凭据。

    • target_kafka_usernametarget_kafka_password 替换为目标 Managed Service for Apache Kafka 集群的凭据。如需配置用户名和密码,请参阅 SASL/PLAIN 身份验证

    • topics = .\*groups = .\* 设置会复制所有主题和消费者群组。您可以根据需要修改这些设置,使其更加具体。

    • offset.syncs.topic.replication.factor = 3 设置用于设置 MirrorMaker 2.0 所用内部主题的复制因子,以在源集群和目标集群之间同步使用方偏移量。复制因子为 3 表示偏移量数据会复制到目标集群中的三个代理,从而确保更高的可用性和容错性。

    • checkpoints.topic.replication.factor = 3 设置用于设置 MirrorMaker 2.0 存储检查点的另一个内部主题的复制因子。检查点有助于 MirrorMaker 2.0 跟踪其进度,并在发生故障或重新启动时从正确的点恢复复制。

    • heartbeats.topic.replication.factor = 3 设置用于设置 MirrorMaker 2.0 发送检测信号所用内部主题的复制因子。检测信号表示 MirrorMaker 2.0 进程处于活跃状态。 较高的复制因子可确保可靠地存储这些心跳,并可用于监控复制过程的运行状况。

    • emit.checkpoints.interval.seconds = 10 设置用于控制 MirrorMaker 2.0 发出检查点的频率。在这种情况下,每 10 秒会发出一次检查点。此频率可在跟踪进度和最大限度地减少写入检查点的开销之间取得平衡。

  6. 启动 MirrorMaker 2.0。使用 connect-mirror-maker.sh 脚本启动该进程。

    该脚本以独立模式启动 MirrorMaker 2.0,并开始将数据从源 Kafka 集群复制到 Managed Service for Apache Kafka 集群。

其他注意事项:

  • 网络:确保您的 Google Cloud 虚拟机与源 Kafka 集群和目标 Managed Service for Apache Kafka 集群都具有网络连接。如果您的源集群位于本地,您可能需要配置 VPN 或互连。

  • 安全性:配置适当的安全协议和防火墙规则,以保护 MirrorMaker 2.0 实例和 Kafka 集群。

按照这些步骤操作,您可以在 Google Cloud 虚拟机上以独立集群模式成功安装和配置 MirrorMaker 2.0,从而轻松将 Kafka 数据迁移到 Managed Service for Apache Kafka。

监控

监控 MirrorMaker 2.0 进程,确保其正常运行并按预期复制数据。您可以使用 MirrorMaker 2 的内置指标或其他监控工具。迁移应用后,请监控以下内容以验证迁移是否成功:

  • 下游吞吐率:确保下游吞吐率没有显著变化。例如,如果您在下游使用 Dataflow,则与 Kafka 相关的吞吐量和指标必须保持一致。

  • CPU 和内存利用率:使用 Cloud Monitoring 监控 Managed Service for Apache Kafka 集群的 CPU 和内存利用率。 为确保最佳性能,利用率最好保持在 75% 以下。

  • 错误日志:定期检查 Cloud Logging 中是否有与 Managed Service for Apache Kafka 集群或应用相关的任何错误日志。请及时解决所有错误,以免造成中断。

限制

  • MirrorMaker 2.0 要求源 Apache Kafka 集群的版本为 2.4.0 或更高版本。