Kafka 系列 八 跨集群数据镜像
在之前系列文章中,我们讨论了一个 Kafka 集群的搭建、维护和使用,而在实际情况中我们往往拥有多个 Kafka 集群,而且这些 Kafka 集群很可能是相互隔离的。一般来说,这些集群之间不需要进行数据交流,但如果在某些情况下这些集群之间存在数据依赖,那么我们可能需要持续的将数据从一个集群复制到另一个集群。而由于“复制”这个术语已经被用于描述同一集群内的副本冗余,因此我们将跨集群的数据复制称为数据镜像(Mirroring)。另外,Kafka 中内置的跨集群数据复制器称为 MirrorMaker。
跨集群数据镜像的用户场景
以下为跨集群数据镜像的一些典型用户场景:
- 区域集群与中心集群:很多公司往往有多个数据中心,而且每个数据中心维护独立的 Kafka 集群。一般的应用可能只需要跟本地集群通信即可,但存在一些应用需要所有集群的数据。比如,一个公司在每个城市都有一个数据中心,并且该中心维护相应城市的产品供需数据以及价格数据;这些数据需要汇总到一个中心集群以便进行公司维度的营利分析。
- 数据冗余:为了防止一个集群故障导致应用不可用,我们需要把数据同步到另一个集群,这样当一个集群出现故障,可以把应用的流量切换到备份集群。
- 云迁移:一般公司都维护有自己的数据中心,但随着云设施越来越便宜,很多公司会选择将服务迁移到云上。数据迁移与复制也是其中一个重要部分,我们可以使用 Kafka Connect 将数据库更新同步到本地 Kafka 集群,然后再把数据从本地 Kafka 集群同步到云上的 Kafka 集群。
多集群架构
上面列举了多集群的用户场景,现在来看下多集群的常见架构。但在讨论架构前,先来了解跨集群通信的一些现实因素。
跨集群通信的现实因素
- 高延迟:由于集群间的距离较长以及网络拓扑节点增多,集群的通信延迟也会增加。
- 带宽有限:广域网(WAN)带宽通常比机房内带宽要小得多,并且可用带宽可能无时无刻都在变化。
- 高成本:无论是自己维护的集群还是云上的集群,集群间通信的成本都是非常高的。这是因为带宽有限并且增加带宽会带来昂贵的成本,而且服务提供商对于跨集群、跨区域、跨云的数据传输会额外收取费用。
Kafka 的 broker 和生产者/消费者客户端都是基于一个集群来进行性能调优的,也就是说在低延迟和高吞吐的假设前提下,经过测试与验证从而得到了 Kafka 的超时和缓冲区默认值。因此,一般我们不推荐同一个集群的不同 broker 处于多个数据中心。大多数情况下,由于高延迟和网络错误,最好避免生产数据到另一个集群。当然,我们可以通过提高重试次数、增加缓冲区大小等手段来处理这些问题。
这么看,broker 跨集群、生产者-broker 跨集群这两种方案都被否决了,那么对于跨集群数据镜像,我们只剩下一种方案:broker-消费者跨集群。这种方案是最安全的,因为即便存在网络分区导致消费者不能消费数据,这些数据仍然保留在 broker 中,当网络恢复后消费者仍然可以读取。也就是说,无论网络状况如何,都不会造成数据丢失。另外,如果存在多个应用需要读取另一个集群的数据,我们可以在每个数据中心都搭建一个 Kafka 集群,使用集群数据镜像来只同步一次数据,然后应用从本地集群中消费数据,避免重复读取数据浪费广域网带宽。
下面是跨集群架构设计的一些准则:
- 每个数据中心都应该至少有一个 Kafka 集群;
- 集群间尽可能只同步一次数据;
- 跨集群消费数据由于跨集群生产数据。
中心集群架构
下面是多个本地集群和一个中心集群的架构,简单情况下只存在两个集群,即主集群和副本集群。
这种架构适用于,数据分布在多个数据中心而某些应用需要访问整体数据集。另外每个数据中心的应用可以处理本地数据,但无法访问全量数据。这种架构的主要优点在于,数据生产到本地,而且跨集群只复制一次数据(到中心集群)。只依赖本地数据的应用可以部署在本地集群,而依赖多数据中心的应用则部署在中心集群。这种架构也非常简单,因为数据流向是单向的,这使得部署、运维和监控非常容易。
它的主要缺点在于,区域的集群不能访问另一个集群的数据。比如,我们在每个城市维护一个 Kafka 集群来保存银行的用户信息和账户历史,并且将这些数据同步到中心集群以便做银行的商业分析。当用户访问本地的银行分支网站时,这些请求可以被分发到本地集群处理;但如果用户访问异地的银行分支网站时,要么该异地集群跟中心集群通信(此种方式不建议),要么直接拒绝请求(是的非常尴尬)。
多活架构
这种架构适用于多个集群共享数据,此架构主要优点在于,每个集群都可以处理用户的任何请求并且不阉割产品功能(与前一种架构对比),而且就近处理用户请求,响应时间可以大大降低。其次,由于数据冗余与弹性控制,一个集群出现故障,可以把用户请求导流到别的集群进行处理。
此架构主要缺点在于,由于多个集群都可以处理用户请求,异步的数据读取和更新无法保证全局数据一致性。下面列举一些可能会遇到的挑战:
- 如果用户发送一个事件到一个集群,然后从其他集群读取事件信息,那么由于事件复制延迟,很有可能读取不到该事件。比如,用户添加一本书到心愿单后,访问心愿单却看不到添加的书。为了解决这个问题,研发人员可能会将用户与集群进行绑定,使用同一个集群来处理用户请求(当然在集群故障情况下会转移)。
- 一个集群包含用户订购 A 的事件,另一个集群包含用户订购 B 的事件,而且这两个事件是几乎同时的。经过数据镜像后,每个数据中心都有这两个事件,而这两个事件可能是冲突的。我们需要决定哪个事件才是目前正确的最终事件么?如果需要,那么我们得制定规则来使得多个集群的应用都能得出相同的结论。或者我们可以认为这两个事件都是正确的,认为用户同时订购了 A 和 B。亚马逊以前采取这种方式来处理冲突,但像证券交易这种机构不能采取这种方式。这个问题的解决方案是因地制宜的,我们需要知道的是一旦采取这种架构,冲突是无法避免的。
如果我们找到多集群异步读写的数据一致性问题,那么这种架构是最好的,因为它是可扩展的、弹性的,并且相对于冷热互备来说性价比也不错。
多活架构的另一个挑战是,如果存在多个数据中心,那么每一对中心都需要通信链路。也就是说,如果有 5 个数据中心,那么总共需要部署 20 个镜像进程来处理数据复制;如果考虑高可用,那么可能需要 40 个。
另外,我们需要避免事件被循环复制和处理。对于这个问题,我们可以将一个逻辑概念的主题拆分成多个物理主题,并且一个物理主题与一个数据中心对应。比如,users 这个逻辑主题可以拆分成 SF.users 和 NYC.users 这两个物理主题,每个主题对应一个数据中心;NYC 的镜像进程从 SF 的 SF.users 读取数据到本地,SF 的镜像进程从 NYC 的 NYC.users 读取数据到本地。因此每个事件都只会被复制一次,而且每个数据中心都包含 SF.users 和 NYC.users 主题,并且包含全量的 users 数据。消费者如果需要获取全量的 users 数据,那么需要消费所有本地.users 主题的数据。
需要提醒的是,Kafka 正在计划添加记录头部,允许我们添加标记信息。我们在生产消息时可以加上数据中心的标记,这样也可以避免循环数据复制。当然,我们也可以自己在消息体中增加标记信息进行过滤,但缺点是当前的镜像工具并不支持,我们得自己开发复制逻辑。
冷热互备架构
有时候,多集群是为了防止单点故障。比如说,我们可能有两个集群,其中集群 A 处于服务状态,另一个集群 B 通过数据镜像来接收集群 A 所有的事件,当集群 A 不可用时,B 可以启动服务。在这种场景中,集群 B 包含了数据的冷备份。
这种架构的优点在于搭建简单并且适用于多种场景。我们只需搭建第二个集群,设置一个镜像进程来将源集群的所有事件同步到该集群即可,并且不用担心发生数据冲突。缺点在于,我们浪费了一个集群资源,因为集群故障通常很少发生。一些公司会选择搭建低配的备份集群,但这样会存在一个风险,那就是无法保证出现紧急情况时该备份集群是否能支撑所有服务;另一些公司则选择适当利用备份集群,那就是把一些读取操作转移到备份集群。
集群故障转移也具有一些挑战性。但无论我们选择何种故障转移方案,SRE 团队都需要进行日常的故障演练。因为,即便今天故障转移是有效的,在进行系统升级之后很可能失效了。一个季度进行一次故障转移演练是最低限度,强大的 SRE 团队会演练更频繁,Netflix 著名的 Chaos Monkey 玩的更溜,它会随机制造故障,也就是说故障每天都可能发生。
下面来看下故障转移比较具有挑战性的地方。
数据损失与不一致
很多 Kafka 的数据镜像解决方案都是异步的,也就是说备份集群不会包含主集群最新的消息。在一个高并发的系统中,备份集群可能落后主集群几百甚至上千条消息。假如集群每秒处理 100 万条消息,备份集群与主集群之间有 5ms 的落后,那么在理想情况下备份集群也落后将近 5000 条消息。因此,我们需要对故障转移时的数据丢失做好准备。当然在故障演练时,我们停止主集群之后,可以等待数据镜像进程接收完剩余的消息,再进行故障转移,避免数据丢失。另外,Kafka 不支持事务,如果多个主题的数据存在关联性,那么在数据丢失的情况下可能会导致不一致,因此应用需要注意处理这种情况。
故障转移的开始消费位移
在故障转移中,其中一个挑战就是如何决定应用在备份集群的开始消费位移。下面来讨论几个可选的方案。
- 自动位移重置:Kafka 消费者可以配置没有已提交位移时的行为,要么从每个分区的起始端消费,要么从每个分区的最末端消费。如果我们的消费者提交位移到 Zookeeper,而且没有对 Zookeeper 中的位移数据进行镜像备份,那么我们需要从这两个选项中做出选择。选择从起始端开始消费的话,可能会存在大量重复的消息;选择从最末端消费的话,可能会存在消息丢失。如果这两种情况可以忍受的话,那么建议选择这种方案,因为这种方案非常简单。
- 复制位移主题:如果我们使用 0.9 或者更高版本的 Kafka 消费者,消费者会提交位移到一个特殊的主题,_consumer_offsets。如果我们复制这个主题到备份集群,那么备份集群的消费者可以从已提交的位移处开始消费。这种方案也很简单,但是有一些情况需要注意。首先,主集群和备份集群的消息位移不能保证是一样的。举个例子,我们在主集群中只保留 3 天的数据,在主题创建并且使用了一个星期之后,我们开始进行备份集群的数据镜像;在这个场景中,主集群的最新消息位移可能到达 57000000,而备份集群的最新消息位移是 0,并且由于主集群中老的数据已经被过期删除了,备份集群的消息位移跟主集群始终是不一样的。其次,即便我们在创建主题就进行数据镜像,由于生产者失败重试,仍然会导致不同集群的消息位移是不同的。最后,即便主集群和备份集群的消息位移完全一致,由于主集群和备份集群存在一定的消息落后并且 Kafka 不支持事务,消费者提交的消息位移可能在相应消息之前或之后到达。因此,在故障转移时消费者可能根据位移找不到匹配的消息,或者位移落后于主集群。总的来说,如果备份集群的提交位移比主集群的提交位移更老,或者由于重试导致备份集群的消息比主集群的消息多,那么会存在一定的数据重复消费;如果备份集群的提交位移没有匹配到相应的消息,那么我们可能仍然需要从主题起始端或者最末端进行消费。因此,这种方案能够减少数据重复消费或者数据丢失,但也不能完全避免。
- 基于时间的故障转移:如果我们使用 0.10.0 或者更高版本的 Kafka 消费者,每条消息都会包含发送到 Kafka 的时间戳。而且,0.10.1.0 或者更高版本的 broker 会建立一个索引,并且提供一个根据时间戳来查询位移的 API。因此,假如我们知道故障在某个时间发生,比如说为早上 4:05,那么我们可以让备份集群的消费者从早上 4:03 处开始消费数据,虽然这样会有两分钟的数据重复消费,但至少数据没有丢失。这个方案的唯一问题是,我们怎么告诉备份集群的消费者从特定时间点开始消费呢?一个解决思路是,我们在应用代码中支持指定开始消费的时间,然后使用 API 来获取该时间对应的位移,然后从该位移处开始消费处理。但如果应用代码没有支持这种功能,我们可以自己写一个小工具,该工具接收一个时间戳,然后使用 API 来获取所有主题分区的位移,最后提交这些位移,这样备份集群的消费组在启动时会自动获取位移,然后进行消费处理。这种方案是最优的。
- 外部位移映射:在上面讨论复制位移主题的时候,曾提到一个最大的挑战是主集群和备份集群的消息位移不一致。基于这个问题,一些公司选择开发自己的数据镜像工具,并且使用外部存储系统来存储集群间的消息位移映射。比如,主集群中位移为 495 的消息对应于备份集群中位移为 500 的消息,那么在外部存储系统中记录(495,500),这样在故障转移时我们可以基于主集群的已提交位移和映射来得到备份集群中的提交位移。但这种方案没有解决位移比消息提前到达备份集群的问题。这种方案比较复杂,升级集群然后使用基于时间的故障转移可能更便捷。
故障转移之后
假如故障顺利转移到备份集群,并且备份集群正常工作,那么原主集群应该怎么处理呢?可能需要将其转化为备份集群。你可能会想,能不能简单修改数据镜像工具,让其换个同步方向,从新的主集群同步数据到老的主集群?这样会导致两个问题:
- 我们如何得知从什么地方开始进行数据镜像呢?这个问题跟故障转移时消费者不知道消费位移的问题是一样的,而且解决方案也会存在消息重复或者丢失的问题。
- 如前所述,老的主集群可能会包含备份集群没有同步的数据更新,如果只是简单的将新主集群的数据同步回来,那么这两个集群又会发生不一致的情况。
因此,最简单的解决方案是,清除老主集群的所有状态和数据,然后重新与新主集群进行数据镜像,这样可以保证这两个集群的状态是一致的。
其他事项
故障转移还有一个需要注意的地方是,应用如何切换与备份集群进行通信?如果我们在代码中直接硬编码主集群的 broker,那么故障转移比较麻烦。因此,很多公司会创建一个 DNS 名称来解析到主集群的 broker,当故障转移时将 DNS 解析到备份集群的 broker。由于 Kafka 客户端只需要成功连接到集群的一个 broker 便可通过该 broker 发现整个集群,因此我们创建 3 个左右的 DNS 解析到 broker 即可。
延伸集群
延伸集群主要用来防止单个数据中心故障导致 Kafka 服务不可用,其解决方案为:将一个 Kafka 集群分布在多个数据中心。因此延伸集群与其他集群方案有本质的区别,它就是一个 Kafka 集群。在这种方案中,我们不需要数据镜像来同步,因为 Kafka 本身就有复制机制,并且是同步复制的。在生产者发送消息时,我们可以通过配置分区机架信息、min.isr、acks=all 来使得数据写入到至少两个数据中心副本后,才返回成功。
这种方案的优点是,多个数据中心的数据是实时同步的,而且不存在资源浪费问题。由于集群跨数据中心,为了得到最好的服务性能,数据中心间需要搭建高质量的通信设施以便得到低延迟和高吞吐,部分公司可能无法提供。
另外需要注意的是,一般需要 3 个数据中心,因为 Kafka 依赖的 Zookeeper 需要奇数的节点来保证服务可用性,只要有超过一半的节点存活,服务即可用。如果我们只有两个数据中心,那么肯定其中一个数据中心拥有多数的 Zookeeper 节点,那么该数据中心发生故障的话服务便不可用;如果拥有三个数据中心并且 Zookeeper 节点均匀分布,那么其中一个数据中心发生故障,服务仍然可用。
MirrorMaker
Kafka 内置了一个用于集群间做数据镜像的简单工具–MirrorMaker,它的核心是一个包含若干个消费者的消费组,该消费组从指定的主题中读取数据,然后使用生产者把这些消息推送到另一个集群。每个消费者负责一部分主题和分区,而生产者则只需要一个,被这些消费者共享;每隔 60 秒消费者会通知生产者发送消息数据,然后等待另一个集群的 Kafka 接收写入这些数据;最后这些消费者提交已写入消息的位移。MirrorMaker 保证数据不丢失,而且在发生故障时不超过 60 秒的数据重复。
如何配置
首先,MirrorMaker 依赖消费者和生产者,因此消费者和生产者的配置属性对 MirrorMaker 也适用。另外,MirrorMaker 也有自身的属性需要配置。先来看一个配置的代码样例:
bin/kafka-mirror-maker --consumer.config etc/kafka/consumer.properties --producer.config etc/kafka/producer.properties --new.consumer --num.streams=2 --whitelist ".\*"
- consumer.config:这个配置文件指定了所有消费者的属性,其中 bootstrap.servers 属性指定了源集群,group.id 指定了所有消费者的使用的消费组 ID。另外,auto.commit.enable=false 这个配置最好不要更改,因为 MirrorMaker 根据消息是否写入目标集群来决定是否提交位移,修改此属性可能会造成数据丢失。auto.offset.reset 这个属性默认为 latest,也就是说创建 MirrorMaker 时会从该时间点开始数据镜像,如果需要对历史数据进行数据镜像,可以设置成 earliest。
- producer.config:这个配置文件指定了 MirrorMaker 中生产者的属性,其中 bootstrap.servers 属性指定了目标写入集群。
- new.consumer:MirrorMaker 可以使用 0.8 版本或者新的 0.9 版本消费者,建议使用 0.9 版本消费者。
- num.streams:指定消费者的数量。
- whitelist:使用正则表达式来指定需要数据镜像的主题。上面的例子中指定对所有的主题进行数据镜像。
在生产环境中部署 MirrorMaker
上面的例子展示了如何使用命令行启动 MirrorMaker,当在生产环境中部署 MirrorMaker 时,你可能会使用 nohub 和输出重定向来将使得它在后台运行,不过 MirrorMaker 已经包含-daemon 参数来指定后台运行模式。很多公司都有自己的部署运维系统,比如 Ansible,Puppet,Chef,Salt 等等。一个更为高级的部署方案是使用 Docker 来运行 MirrorMaker,而且越来越流行。MirrorMaker 本身是无状态的,不需要任何磁盘存储,并且这种方案可以使一台机器运行多个 MirrorMaker(也就是说运行多个 Docker)。对于一个 MirrorMaker 来说,它的吞吐瓶颈在于只有一个生产者,因此使用多个 MirrorMaker 可以提高吞吐,而使用 Docker 部署多个 MirrorMaker 尤其方便。另外,Docker 也可以支持业务洪峰低谷的弹性伸缩。
如果允许的话,建议将 MirrorMaker 部署在目标集群内,这是因为如果一旦发生网络分区,消费者与源集群断开连接比生产者与目标集群断开连接要安全。如果消费者断开连接,那么只是当前读取不到数据,但是数据仍然在源集群内,并不会丢失;而生产者断开连接,MirrorMaker 便生产不了数据,如果 MirrorMaker 本身处理不当,可能会丢失数据。
但对于在集群间需要加密传输数据的场景来说,将 MirrorMaker 部署在源集群也是个可以考虑的方案。这是因为在 Kafka 中使用 SSL 进行加密传输时,消费者相比生产者来说性能受影响更大。因此我们可以在源集群内部 broker 到 MirrorMaker 的消费者间不使用 SSL 加密,而在 MirrorMaker 跨集群生产数据时使用 SSL 加密,这样可以将 SSL 的性能影响降到最低。另外,尽量配置 acks=all 和足够的重试次数来降低数据丢失的风险,而且如果 MirrorMaker 一旦发送消息失败最好让其暂时退出,避免丢失数据。
为了降低目标集群和源集群的消息延迟,建议将 MirrorMaker 部署在两台不同的机器上并且使用相同的消费组,这样一台发生故障另外一台仍然可以保证服务正常。
在生产环境中部署 MirrorMaker 时,监控是很重要的,下面是一些重要的监控指标:
延迟监控
延迟是指目标集群与源集群的消息落后间隔,间隔值通过计算源集群最新的消息与目标集群最新的消息来得到。下图中源集群最新的消息位移是 7,目标集群最新的消息位移是 5,延迟间隔为 2。
有两种方式来监控此指标,但各有优缺点:
- 检测 MirrorMaker 提交到源集群的位移。我们可以使用 kafka-consumer-groups 来检测分区的最新位移以及 MirrorMaker 提交的位移,通过计算差值得到落后间隔。但这种计算方式不是 100%准确的,因为 MirrorMaker 不是时刻提交位移的,默认情况下每分钟提交一次位移。因此你可能会看到间隔在一分钟内逐渐增长,然后突然降低。拿上面的例子来说,实际的间隔为 2,但由于 MirrorMaker 没有提交位移,kafka-consumer-groups 工具可能会检测到落后间隔为 4。LinkedIn 的 Burrow 工具相对来说更成熟,可以避免这种问题。
- 检测 MirrorMaker 读取到的消息位移(可能还没有提交)。MirrorMaker 的消费者会通过 JMX 来发布指标,其中一个指标就是消费者落后间隔(聚合所有分区)。但这个间隔也不是 100%准确的,因为它是根据消费者读取到的位移来计算的,并没有考虑是否已经写入目标集群。拿上面的例子来说,MirrorMaker 消费者可能会汇报落后间隔为 1 而不是 2,因为它已经读取到消息 6,即便这个消息仍未写入到目标集群。
指标监控
MirrorMaker 中包含消费者和生产者,它们都有许多指标,建议在生产环境中收集跟踪这些指标。 Kafka 文档 列举了所有可用的指标,下面是一些比较重要的指标:
- 消费者:fetch-size-avg, fetch-size-max, fetch-rate, fetch-throttle-time-avg, 和 fetch-throttle-time-max。
- 生产者:batch-size-avg, batch-size-max, requests-in-flight,和 record- retry-rate。
- 消费者和生产者:io-ratio 和 io-wait-ratio。
Canary
如果你已经监控了所有的指标,那么 Canary 不是必须的。但我们仍然推荐在生产环境中使用 Canary,因为它能提供整体的监控。Canary 每分钟发送一个事件到源集群,然后尝试从目标集群读取该事件,如果时间间隔超过阈值就会发出报警信息,因为这意味着 MirrorMaker 数据镜像存在问题。
MirrorMaker 性能调优
首先 MirrorMaker 的集群大小需要依赖所需要满足的吞吐量和延迟。如果不能忍受延迟,那么你可能需要尽可能部署多的 MirrorMaker 以便处理流量洪峰;如果能忍受一定的延迟,那么 MirrorMaker 处理洪峰的 75%-80%或者 95%-99%就可以了,洪峰的延迟会在低谷时慢慢降低。
现在我们来评估 MirrorMaker 的消费者线程数,也就是 num.streams 所指定的值。LinkedIn 的经验值是 8 个消费者线程可以达到 6MB/s 的处理速度,16 个消费者线程可以达到 12MB/s 的速度,但这个经验值不是通用的,因为它受硬件配置影响。因此我们需要自己做压力测试,Kafka 中内置有 kafka-performance-producer,可以使用它作为生产者来发送压测事件到源集群,然后测试 MirrorMaker 在 1,2,4,8,16,24,32 个线程下的性能,当增加线程数不能提高性能时即取得极值,配置的线程数需要小于这个极值即可。如果我们发送的消息是经过压缩的,那么 MirrorMaker 的消费者需要解压然后生产者重新压缩,这个过程会消耗 CPU,因此在测试过程中也需要关注 CPU 负载情况。这个过程可以测试单个 MirrorMaker 的性能,如果以集群形态部署,那么我们需要对多个 MirrorMaker 的集群进行性能压测。
另外,核心的主题可能需要尽可能降低延迟,对于这种情况建议在部署 MirrorMaker 时进行隔离,防止别的大流量主题影响到核心主题。
上面是基本的性能调优,一般能满足业务需求了。但我们其实还可以进一步提高 MirrorMaker 的性能。在使用 MirrorMaker 做跨集群数据镜像时,我们可以对网络参数进行性能调优:
- 增大 TCP 缓冲区(net.core.rmem_default, net.core.rmem_max, net.core.wmem_default, net.core.wmem_max, net.core.optmem_max)。
- 使用自动滑动窗口(sysctl –w net.ipv4.tcp_window_scaling=1,或者添加 net.ipv4.tcp_window_scaling=1 到/etc/sysctl.conf)。
- 降低 TCP 慢启动时间(设置/proc/sys/net/ipv4/ tcp_slow_start_after_idle 为 0)
网络性能调优是一个复杂的过程,感兴趣的可以参考《Performance Tuning for Linux Servers》这本书。
另外,如果需要对 MirrorMaker 的生产者和消费者进行性能调优的话,我们得首先了解性能瓶颈究竟是在于生产者还是消费者。一个方法是监控生产者和消费者指标,如果发现一个空闲而另一个负载非常高,那么就知道瓶颈在哪了。或者我们可以使用 jstack 来对线程栈进行多次采样,看 MirrorMaker 究竟主要耗费时间在 poll 消息还是 send 消息,然后再进行优化。
如果想优化生产者,那么下面是一些比较重要的属性配置:
- max.in.flight.requests.per.connection:默认情况下,MirrorMaker 生产者同时只发送一个请求,这意味着生产者等到目标集群 ack 后才发送下一个请求。这种方式可以保证在失败重试的情况下仍然保持消息顺序。不过如果集群间通信延迟较大,这种方式会降低发送性能,因此对于消息顺序不重要的场景,我们可以通过增加 max.in.flight.requests.per.connection 来提高吞吐。
- linger.ms 和 batch.size:如果检测到生产者发送的消息经常是很小的(比如说 batch-size-avg 和 batch-size-max 都小于配置的 batch.size),那么我们可以通过增加 linger.ms 来让生产者等待更多的消息然后再发送请求,但注意到这种方式也会增加延迟。而如果观测到生产者每次发送的消息都是满足 batch.size 的,而我们又有空余的内存,那么可以考虑增大 batch.size。
如果想优化消费者,下面是一些比较重要的属性配置:
- partition.assignment.strategy:MirrorMaker 默认的分区均衡策略为 range,这种方式有一定的好处,但是可能会导致分区不均衡分配。对于 MirrorMaker 来说,我们可以考虑设置成轮询策略(Round Robin),只需要将 partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor 添加到配置文件即可。
- fetch.max.bytes:如果检测到 fetch-size-avg 和 fetch-size-max 都跟 fetch.max.bytes 很接近,而我们又有空余的内存空间,那么可以考虑 fetch.max.bytes 来使得消费者在每个请求中读取更多的数据。
- fetch.min.bytes 和 fetch.max.wait:如果检测到 fetch-rate 指标很高,那么证明消费者频繁拉取消息,而且拉取的消息非常少,那么我们可以考虑增加 fetch.min.bytes 和 fetch.max.wait 来使得消费者每次可以等待拉取更多的消息。
其他跨集群数据镜像解决方案
上面深入讨论了 MirrorMaker 的方案,但如前所述 MirrorMaker 有自身的局限性和缺点,下面来看下 MirrorMaker 的替代方案以及它们是如何解决 MirrorMaker 所遇到的问题的。
Uber uReplicator
Uber 大规模使用 MirrorMaker,随着主题增多和集群规模增长,他们遇到了一些问题:
- 重平衡延迟:MirrorMaker 内部的消费者只是普通的 Kafka 消费者,因此增加消费者线程、增加 MirrorMaker 实例、增加主题等等都会引起分区的重平衡。在前面系列文章提到过,分区重平衡进行时,消费者不能消费数据,直至重平衡完毕。如果主题和分区数量很大,那么这个过程会需要一定时间,对于老的消费者来说时间则更长。在一些场景下,甚至会引起 5 到 10 分钟的停顿。
- 新增主题困难:使用正则表达式来指定主题列表意味着每新增一个主题都会引起上面所说的重平衡,因此为了避免不必要的重平衡,Uber 单独指明需要数据镜像的主题列表。但 Uber 在新增镜像主题时需要修改所有 MirrorMaker 的配置并且重启,这仍然会导致重平衡。不过这样可以控制重平衡次数,只是定期维护导致重平衡,而不是每次新增主题都进行重平衡。需要注意的是,如果配置出错导致 MirrorMaker 间的配置不同,那么 MirrorMaker 启动后会不断的重平衡,因为消费者间不能达成一致。
基于上述的问题,Uber 开发了 uReplicator 来替代 MirrorMaker,他们使用 Apache Helix 来管理分配到 uReplicator 的主题和分区,并且使用 REST API 来在 Helix 中新增主题。Uber 使用自身研发的 Helix 消费者来替代 MirrorMaker 中的消费者,Helix 消费者从 Helix 中获取分区,并且监听 Helix 的分区改动事件,以此来避免原生的消费者重平衡。
Uber 写了一篇 博客 来描述这个架构,并且详细说明了这种方案的改进之处。
Confluent Replicator
在 Uber 开发 uReplicator 的同时,Confluent 公司也在开发 Replicator。虽然这两者名称基本相同,但是它们的侧重点却是不一样的。Confluent 公司的 Replicator 主要是解决商业上遇到的多集群部署维护问题:
- 集群配置不一致:MirrorMaker 只是同步数据,但是不同集群的主题配置(分区数、冗余因子等)可能是不同的。如果我们在源集群增加了主题数据的保留时间但忘记在目标集群修改相同的配置,可能会导致在故障转移时,应用找不到历史数据。而且,手动同步主题配置非常容易出错。
- MirrorMaker 集群本身维护困难:上面说到 MirrorMaker 一般是以集群来部署的,本身也需要维护。MirrorMaker 除了配置生产者和消费者之外,本身也有许多属性需要配置。如果我们有多个数据中心需要相互同步数据,那么 MirrorMaker 数量会迅速膨胀。这些情况都导致了 MirrorMaker 集群的运维复杂性。
为了降低运维复杂性,Confluent 公司研发了 Replicator,它是 Kafka Connect 的一种 connector,与从数据库读取的 connector 不同的是,Replicator 从 Kafka 集群中读取数据。Kafka Connect 框架中的 connector 会将整体工作拆分成多个 task,其中每个任务是一对 <consumer, producer>
。
Connect 框架将 task 均衡分配到各个 worker 节点,因此我们不需要计算每个 MirrorMaker 需要多少个消费者或者一个机器上部署多少个 MirrorMaker 实例。另外,Connect 提供了 REST API 来管理 connector 和 task。通过使用使用基于 Kafka Connect 框架的方案,我们可以降低需要维护的集群数量。而且,Replicator 除了同步数据之外,也会同步 Zookeeper 中的主题配置。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论