深入解析kafka高性能背后的实现原理
在探究 Kafka 核心知识之前,我们先思考一个问题:什么场景会促使我们使用 Kafka? 说到这里,我们头脑中或多或少会蹦出异步解耦和削峰填谷等字样,是的,这就是 Kafka 最重要的落地场景。
异步解耦:同步调用转换成异步消息通知,实现生产者和消费者的解耦。想象一个场景,在商品交易时,在订单创建完成之后,需要触发一系列其他的操作,比如进行用户订单数据的统计、给用户发送短信、给用户发送邮件等等。如果所有操作都采用同步方式实现,将严重影响系统性能。针对此场景,我们可以利用消息中间件解耦订单创建操作和其他后续行为。
削峰填谷:利用 broker 缓冲上游生产者瞬时突发的流量,使消费者消费流量整体平滑。对于发送能力很强的上游系统,如果没有消息中间件的保护,下游系统可能会直接被压垮导致全链路服务雪崩。想象秒杀业务场景,上游业务发起下单请求,下游业务执行秒杀业务(库存检查,库存冻结,余额冻结,生成订单等等),下游业务处理的逻辑是相当复杂的,并发能力有限,如果上游服务不做限流策略,瞬时可能把下游服务压垮。针对此场景,我们可以利用 MQ 来做削峰填谷,让高峰流量填充低谷空闲资源,达到系统资源的合理利用。
通过上述例子可以发现交易、支付等场景常需要异步解耦和削峰填谷功能解决问题,而交易、支付等场景对性能、可靠性要求特别高。那么, Kafka 能否满足相应要求呢?下面我们来探讨下。
Kafka 宏观认知
在探究 Kafka 的高性能、高可靠性之前,我们从宏观上来看下 Kafka 的系统架构。
如上图所示,Kafka由Producer、Broker、Consumer 以及负责集群管理的 ZooKeeper 组成,各部分功能如下:
Producer:生产者,负责消息的创建并通过一定的路由策略发送消息到合适的 Broker;
Broker:服务实例,负责消息的持久化、中转等功能;
Consumer:消费者,负责从 Broker 中拉取(Pull)订阅的消息并进行消费,通常多个消费者构成一个分组,消息只能被同组中的一个消费者消费;
ZooKeeper:负责 broker、consumer 集群元数据的管理等;
上图消息流转过程中,还有几个特别重要的概念—主题(Topic)、分区(Partition)、分段(segment)、位移(offset)。
topic:消息主题。Kafka 按 topic 对消息进行分类,我们在收发消息时只需指定 topic。
partition:分区。为了提升系统的吞吐,一个 topic 下通常有多个 partition,partition 分布在不同的 Broker 上,用于存储 topic 的消息,这使 Kafka 可以在多台机器上处理、存储消息,给 kafka 提供给了并行的消息处理能力和横向扩容能力。另外,为了提升系统的可靠性,partition 通常会分组,且每组有一个主 partition、多个副本 partition,且分布在不同的 broker 上,从而起到容灾的作用。
segment:分段。宏观上看,一个 partition 对应一个日志(Log)。由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据检索效率低下,Kafka 采取了分段和索引机制,将每个 partition 分为多个 segment,同时也便于消息的维护和清理。每个 segment 包含一个 .log 日志文件、两个索引(.index、timeindex)文件以及其他可能的文件。每个 Segment 的数据文件以该段中最小的 offset 为文件名,当查找 offset 的 Message 的时候,通过二分查找快找到 Message 所处于的 Segment 中。
offset:消息在日志中的位置,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量。offset 是消息在分区中的唯一标识,是一个单调递增且不变的值。Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序。
在对 Kafka 的整体系统框架及相关概念简单了解后,下面我们来进一步深入探讨下高可靠性、高性能实现原理。
Tips:自建Kafka 集群,可能会面临一系列挑战,包括集群搭建、节点配置、分区与副本管理、安全性与监控等方面问题。选用腾讯云消息队列 CKafka 版(TDMQ for CKafka),100% 兼容开源 Kafka API 2.4、2.8、3.2 版本,同时享受高可用、数据压缩、同时支持离线和实时数据处理等优点,免去自建管理等各种问题。
Kafka 高可靠性探究
Kafka 高可靠性的核心是保证消息在传递过程中不丢失,涉及如下核心环节:
消息从生产者可靠地发送至 Broker;-- 网络、本地丢数据。
发送到 Broker 的消息可靠持久化;-- PageCache 缓存落盘、单点崩溃、主从同步跨网络。
消费者从 Broker 消费到消息且最好只消费一次 -- 跨网络消息传输。
1 消息从生产者可靠地发送至 Broker
为了保障消息从生产者可靠地发送至 Broker,我们需要确保两点
Producer 发送消息后,能够收到来自 Broker 的消息保存成功 ack;
Producer 发送消息后,能够捕获超时、失败 ack 等异常 ack 并做处理;
1.1 ack 策略
针对问题1,Kafka 为我们提供了三种 ack 策略
Request.required.acks = 0:请求发送即认为成功,不关心有没有写成功,常用于日志进行分析场景。
Request.required.acks = 1:当 leader partition 写入成功以后,才算写入成功,有丢数据的可能。
Request.required.acks= -1:ISR 列表里面的所有副本都写完以后,这条消息才算写入成功,强可靠性保证
为了实现强可靠的 kafka 系统,我们需要设置Request.required.acks= -1,同时还会设置集群中处于正常同步状态的副本 follower 数量min.insync.replicas>2,另外,设置 unclean.leader.election.enable=false 使得集群中 ISR 的 follower 才可变成新的 leader,避免特殊情况下消息截断的出现。
1.2 消息发送策略
针对问题2,kafka 提供两类消息发送方式:同步(sync)发送和异步(async)发送,trpc-go 框架中kafka插件相关参数如下:
trpc-kafka 底层基于开源的 sarama 实现,在消息发送的过程中,无论是同步发送还是异步发送都会涉及到两个协程--负责消息发送的主协程和负责消息分发的 dispatcher 协程。
异步发送
对于异步发送(ack != 0 场景,等于0时不关心写 kafka 结果,后文详细讲解)而言,其流程大概如下:
在主协程中调用异步发送 kafka 消息的时候,其本质是将消息体放进了一个 input 的 channel,只要入 channel 成功,则这个函数直接返回,不会产生任何阻塞。相反,如果入 channel 失败,则会返回错误信息。因此调用 async 写入的时候返回的错误信息是入 channel 的错误信息,至于具体最终消息有没有发送到 kafka 的 broker,我们无法从返回值得知。
当消息进入 input 的 channel 后,会有另一个 dispatcher 的协程负责遍历 input,来真正发送消息到特定 Broker 上的主 Partition 上。发送结果通过一个异步协程进行监听,循环处理 err channel 和 success channel,出现了 error 就记一个日志。因此异步写入场景时,写 kafka 的错误信息,我们暂时仅能够从这个错误日志来得知具体发生了什么错,并且也不支持我们自建函数进行兜底处理,这一点在 trpc-go 的官方也得到了承认。
同步发送
同步发送(ack != 0 场景)是在异步发送的基础上加以条件限制实现的。同步消息发送在 newSyncProducerFromAsyncProducer 中开启两个异步协程处理消息成功与失败的“回调”,并使用 waitGroup 进行等待,从而将异步操作转变为同步操作。其流程大概如下
通过上述分析可以发现,kafka 消息发送本质上都是异步的,不过同步发送通过 waitGroup 将异步操作转变为同步操作。同步发送在一定程度上确保了我们在跨网络向 Broker 传输消息时,消息一定可以可靠地传输到 Broker。因为在同步发送场景我们可以明确感知消息是否发送至 Broker,若因网络抖动、机器宕机等故障导致消息发送失败或结果不明,可通过重试等手段确保消息至少一次(at least once) 发送到 Broker。另外,Kafka(0.11.0.0版本后)还为 Producer 提供两种机制来实现精确一次(exactly once) 消息发送:幂等性(Idempotence)和事务(Transaction)。
1.3 小结
通过 ack 策略配置、同步发送、事务消息组合能力,我们可以实现 exactly once 语意跨网络向 Broker 传输消息。但是,Producer 收到 Broker 的成功 ack,消息一定不会丢失吗?为了搞清这个问题,我们首先要搞明白 Broker 在接收到消息后做了哪些处理。
需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享
2 发送到 Broker 的消息可靠持久化
为了确保 Producer 收到 Broker 的成功 ack 后,消息一定不在 Broker 环节丢失,我们核心要关注以下几点:
Broker 返回 Producer 成功 ack 时,消息是否已经落盘;
Broker 宕机是否会导致数据丢失,容灾机制是什么;
Replica 副本机制带来的多副本间数据同步一致性问题如何解决;
2.1 Broker 异步刷盘机制
kafka 为了获得更高吞吐,Broker 接收到消息后只是将数据写入 PageCache 后便认为消息已写入成功,而 PageCache 中的数据通过 linux 的 flusher 程序进行异步刷盘(刷盘触发条:主动调用 sync 或 fsync 函数、可用内存低于阀值、dirty data 时间达到阀值),将数据顺序写到磁盘。消息处理示意图如下:
由于消息是写入到 PageCache ,单机场景,如果还没刷盘 Broker 就宕机了,那么 Producer 产生的这部分数据就可能丢失。为了解决单机故障可能带来的数据丢失问题,Kafka 为分区引入了副本机制。
2.2 Replica 副本机制
Kafka 每组分区通常有多个副本,同组分区的不同副本分布在不同的 Broker 上,保存相同的消息(可能有滞后)。副本之间是“一主多从”的关系,其中 leader 副本负责处理读写请求,follower 副本负责从 leader 拉取消息进行同步。分区的所有副本统称为 AR(Assigned Replicas),其中所有与 leader 副本保持一定同步的副本(包括 leader 副本在内)组成ISR(In-Sync Replicas),与 leader 同步滞后过多的副本组成 OSR(Out-of-Sync Replicas),由此可见,AR=ISR+OSR。follower 副本是否与leader同步的判断标准取决于 Broker 端参数 replica.lag.time.max.ms(默认为10秒),follower 默认每隔 500ms 向 leader fetch 一次数据,只要一个 Follower 副本落后 Leader 副本的时间不连续超过10秒,那么 Kafka 就认为该 Follower 副本与 leader 是同步的。在正常情况下,所有的 follower 副本都应该与 leader 副本保持一定程度的同步,即 AR=ISR,OSR 集合为空。
当 leader 副本所在 Broker 宕机时,Kafka 会借助 ZK 从 follower 副本中选举新的 leader 继续对外提供服务,实现故障的自动转移,保证服务可用。为了使选举的新 leader 和旧 leader 数据尽可能一致,当 leader 副本发生故障时,默认情况下只有在 ISR 集合中的副本才有资格被选举为新的 leader,而在 OSR 集合中的副本则没有任何机会(可通过设置unclean.leader.election.enable改变)。
当 Kafka 通过多副本机制解决单机故障问题时,同时也带来了多副本间数据同步一致性问题。Kafka 通过高水位更新机制、副本同步机制、 Leader Epoch 等多种措施解决了多副本间数据同步一致性问题,下面我们来依次看下这几大措施。
HW 和 LEO
首先,我们来看下两个和Kafka中日志相关的重要概念 HW 和 LEO:
HW: High Watermark,高水位,表示已经提交(commit)的最大日志偏移量,Kafka 中某条日志“已提交”的意思是 ISR 中所有节点都包含了此条日志,并且消费者只能消费 HW 之前的数据;
LEO: Log End Offset,表示当前 log 文件中下一条待写入消息的 offset;
如上图所示,它代表一个日志文件,这个日志文件中有8条消息,0至5之间的消息为已提交消息,5至7的消息为未提交消息。日志文件的 HW 为5,表示消费者只能拉取到5之前的消息,而 offset 为5的消息对消费者而言是不可见的。日志文件的 LEO 为8,下一条消息将在此处写入。
注意:所有副本都有对应的 HW 和 LEO,只不过 Leader 副本比较特殊,Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其 Leader 副本的高水位。Leader 副本和 Follower 副本的 HW 有如下特点:
Leader HW:min(所有副本 LEO),为此 Leader 副本不仅要保存自己的 HW 和 LEO,还要保存 follower 副本的 HW 和 LEO,而 follower 副本只需保存自己的 HW 和 LEO;
Follower HW:min(follower 自身 LEO,leader HW)。
注意:为方便描述,下面 Leader HW 简记为HW L ,Follower HW 简记为HW F ,Leader LEO 简记为 LEO L ,Follower LEO 简记为 LEO F。
下面我们演示一次完整的 HW / LEO 更新流程:
1.初始状态 HW L =0,LEO L =0,HW F=0,LEO F=0。
2.Follower 第一次 fetch
Leader 收到 Producer 发来的一条消息完成存储, 更新 LEO L =1;
Follower从Leader fetch数据, Leader收到请求,记录follower的LEO F =0,并且尝试更新 HW L =min(全部副本 LEO)=0;
leade 返回 HW L=0和 LEO L=1给 Follower,Follower 存储消息并更新LEOF =1, HW=min(LEO F ,HW L )=0。
3.Follower 第二次fetch:
Follower 再次从 Leader fetch 数据, Leader 收到请求,记录 follower 的 LEO F =1,并且尝试更新 HW L =min(全部副本 LEO)=1;
leade 返回 HW L =1和 LEO L =1给 Follower,Leader 收到请求,更新自己的 HW=min(LEO F ,HW L )=1。
上述更新流程中 Follower 和 Leader 的 HW 更新有时间 GAP。如果 Leader 节点在此期间发生故障,则 Follower 的 HW 和 Leader 的 HW 可能会处于不一致状态,如果 Followe 被选为新的 Leader 并且以自己的 HW 为准对外提供服务,则可能带来数据丢失或数据错乱问题。
KIP-101 问题:数据丢失&数据错乱
数据丢失
第1步:
副本 B 作为 leader 收到 producer 的 m2 消息并写入本地文件,等待副本 A 拉取。
副本 A 发起消息拉取请求,请求中携带自己的最新的日志offset(LEO=1),B 收到后更新自己的 HW 为1,并将 HW=1的信息以及消息 m2 返回给 A。
A 收到拉取结果后更新本地的 HW 为1,并将 m2 写入本地文件。发起新一轮拉取请求(LEO=2),B收到A拉取请求后更新自己的 HW 为2,没有新数据只将 HW=2 的信息返回给 A,并且回复给 producer 写入成功。此处的状态就是图中第一步的状态。
第2步:
此时,如果没有异常,A 会收到 B 的回复,得知目前的 HW 为2,然后更新自身的 HW 为2。但在第2步 A 重启了,没有来得及收到 B 的回复,此时 B 仍然是 leader。A 重启之后会以 HW 为标准截断自己的日志,因为 A 作为 follower 不知道多出的日志是否是被提交过的,防止数据不一致从而截断多余的数据并尝试从 leader 那里重新同步。
第3步:
B 崩溃了,min.isr 设置的是1,所以 zookeeper 会从 ISR 中再选择一个作为 leader,也就是 A,但是 A 的数据不是完整的,从而出现了数据丢失现象。问题在哪里?在于 A 重启之后以 HW 为标准截断了多余的日志。不截断行不行?不行,因为这个日志可能没被提交过(也就是没有被 ISR 中的所有节点写入过),如果保留会导致日志错乱。
数据错乱
在分析日志错乱的问题之前,我们需要了解到 kafka 的副本可靠性保证有一个前提:在 ISR 中至少有一个节点。如果节点均宕机的情况下,是不保证可靠性的,在这种情况会出现数据丢失,数据丢失是可接受的。这里我们分析的问题比数据丢失更加槽糕,会引发日志错乱甚至导致整个系统异常,而这是不可接受的。
第1步:
A 和 B 均为 ISR 中的节点。副本 A 作为 leader,收到 producer 的消息 m2的请求后写入 PageCache 并在某个时刻刷新到本地磁盘。
副本 B 拉取到 m2 后写入 PageCage 后(尚未刷盘)再次去 A 中拉取新消息并告知 A 自己的 LEO=2,A收到更新自己的 HW 为1并回复给 producer 成功。
此时 A 和 B 同时宕机,B 的 m2 由于尚未刷盘,所以 m2 消息丢失。此时的状态就是第1步的状态。
第2步:
由于 A 和 B 均宕机,而 min.isr=1 并且 unclean.leader.election.enable=true(关闭 unclean 选择策略),所以 Kafka 会等到第一个 ISR 中的节点恢复并选为 leader,这里不幸的是 B 被选为 leader,而且还接收到 producer 发来的新消息 m3。注意,这里丢失 m2 消息是可接受的,毕竟所有节点都宕机了。
第3步:
A 恢复重启后发现自己是 follower,而且 HW 为2,并没有多余的数据需要截断,所以开始和 B 进行新一轮的同步。但此时 A 和 B 均没有意识到,offset 为1的消息不一致了。
问题在哪里?在于日志的写入是异步的,上面也提到 Kafka 的副本策略的一个设计是消息的持久化是异步的,这就会导致在场景二的情况下被选出的 leader 不一定包含所有数据,从而引发日志错乱的问题。
Leader Epoch
为了解决上述缺陷,Kafka 引入了 Leader Epoch 的概念。leader epoch 和 raft 中的任期号的概念很类似,每次重新选择leader的时候,用一个严格单调递增的 id 来标志,可以让所有 follower 意识到 leader 的变化。而 follower 也不再以 HW 为准,每次奔溃重启后都需要去 leader 那边确认下当前 leader 的日志是从哪个 offset 开始的。下面看下 Leader Epoch 是如何解决上面两个问题的。
数据丢失解决
这里的关键点在于副本 A 重启后作为 follower,不是忙着以 HW 为准截断自己的日志,而是先发起 LeaderEpochRequest 询问副本 B 第0代的最新的偏移量是多少,副本 B 会返回自己的 LEO 为2给副本 A,A 此时就知道消息 m2 不能被截断,所以 m2 得到了保留。当 A 选为 leader 的时候就保留了所有已提交的日志,日志丢失的问题得到解决。如果发起 LeaderEpochRequest 的时候就已经挂了怎么办?这种场景下,不会出现日志丢失,因为副本 A 被选为 leader 后不会截断自己的日志,日志截断只会发生在 follower 身上。
数据错乱解决
这里的关键点还是在第3步,副本 A 重启作为 follower 的第一步还是需要发起 LeaderEpochRequest 询问 leader 当前第0代最新的偏移量是多少,由于副本 B 已经经过换代,所以会返回给A第1代的起始偏移(也就是1),A发现冲突后会截断自己偏移量为1的日志,并重新开始和 leader 同步。副本 A 和副本 B 的日志达到了一致,解决了日志错乱。
2.3 小结
Broker 接收到消息后只是将数据写入 PageCache 后便认为消息已写入成功,但是,通过副本机制并结合 ACK 策略可以大概率规避单机宕机带来的数据丢失问题,并通过 HW、副本同步机制、 Leader Epoch 等多种措施解决了多副本间数据同步一致性问题,最终实现了 Broker 数据的可靠持久化。
3 消费者从 Broker 消费到消息且最好只消费一次
Consumer 在消费消息的过程中需要向 Kafka 汇报自己的位移数据,只有当 Consumer 向 Kafka 汇报了消息位移,该条消息才会被 Broker 认为已经被消费。因此,Consumer 端消息的可靠性主要和 offset 提交方式有关,Kafka 消费端提供了两种消息提交方式:
因为有重复消费的可能,正常情况下我们通常需要幂等控制方式重复消费。
Kafka 高性能探究
我们都知道Kafka是基于磁盘进行存储的,但 Kafka 官方又称其具有高性能、高吞吐、低延时的特点,其吞吐量动辄几十上百万。
在座的靓仔和靓女们是不是有点困惑了,一般认为在磁盘上读写数据是会降低性能的,因为寻址会比较消耗时间。那 Kafka 又是怎么做到其吞吐量动辄几十上百万的呢?
Kafka 通过无锁轻量级 offset 的设计,实现了高性能、高吞吐和低延时的目标。
其 Reactor I/O 网络模型、磁盘顺序写入、内存映射文件、零拷贝、数据压缩和批量处理等技术,为 Kafka 提供了强大的数据处理能力和高效的消息队列服务。
Reactor I/O 网络模型:通过 I/O 多路复用机制,Kafka 能够同时处理大量的网络连接请求,而不需要为每个连接创建一个线程,从而节省了系统资源。
顺序写入:Kafka 使用顺序写入的方式将消息追加到日志文件的末尾,避免了文件位置的频繁变动,从而减少了锁的使用。
MMAP 内存映射文件:Kafka 使用内存映射文件(Memory Mapped File)来访问日志数据和索引文件。这种方式使得文件数据可以直接映射到进程的虚拟地址空间中,从而减少了系统调用的开销,提高了数据访问的效率。
零拷贝:Kafka 使用零拷贝(Zero Copy)技术,将数据从磁盘直接传输到网络,绕过了用户态的复制过程,大大提高了数据传输的效率。
数据压缩和批量处理:数据压缩在 Kafka 中有助于减少磁盘空间的使用和网络带宽的消耗,从而提升整体性能。;Kafka 支持批量处理消息,在一个批次中同时处理多个消息,减少了网络和 I/O 的开销。
下面将详细说明这8点
1 Kafka Reactor I/O 网络模型
Reactor I/O 网络模型是一种非阻塞 I/O 模型,利用事件驱动机制来处理网络请求。
该模型通过 Reactor 模式实现,即一个或多个 I/O 多路复用器监听多个通道的事件,当某个通道准备好进行 I/O 操作时,触发相应的事件处理器进行处理。
这种模型在高并发场景下具有很高的效率,能够同时处理大量的网络连接请求,而不需要为每个连接创建一个线程,从而节省系统资源。
Reactor 线程模型如图所示。
Reacotr 模型主要分为三个角色。
Reactor:把 I/O 事件根据类型分配给分配给对应的 Handler 处理。
Acceptor:处理客户端连接事件。
Handler:处理读写等任务。
Kafka 基于 Reactor 模型架构如图所示。
Kafka 的网络通信模型基于 NIO(New Input/Output)库,通过 Reactor 模式实现,具体包括以下几个关键组件:
SocketServer:管理所有的网络连接,包括初始化 Acceptor 和 Processor 线程。
Acceptor:监听客户端的连接请求,并将其分配给 Processor 线程。Acceptor 使用 Java NIO 的 Selector 进行 I/O 多路复用,并注册 OP_ACCEPT 事件来监听新的连接请求。每当有新的连接到达时,Acceptor 会接受连接并创建一个 SocketChannel,然后将其分配给一个 Processor 线程进行处理。
Processor:处理具体的 I/O 操作,包括读取客户端请求和写入响应数据。Processor 同样使用 Selector 进行 I/O 多路复用,注册 OP_READ 和 OP_WRITE 事件来处理读写操作。每个 Processor 线程都有一个独立的 Selector,用于管理多个 SocketChannel。
RequestChannel:充当 Processor 和请求处理线程之间的缓冲区,存储请求和响应数据。Processor 将读取的请求放入 RequestChannel 的请求队列,而请求处理线程则从该队列中取出请求进行处理。
KafkaRequestHandler:请求处理线程,从 RequestChannel 中读取请求,调用 KafkaApis 进行业务逻辑处理,并将响应放回 RequestChannel 的响应队列。KafkaRequestHandler 线程池中的线程数量由配置参数 num.io.threads 决定。
注:该模型和如何提高 kafka 的性能和效率?
高并发处理能力:通过 I/O 多路复用机制,Kafka 能够同时处理大量的网络连接请求,而不需要为每个连接创建一个线程,从而节省了系统资源。
低延迟:非阻塞 I/O 操作避免了线程的阻塞等待,使得 I/O 操作能够更快地完成,从而降低了系统的响应延迟。
资源节省:通过减少线程的数量和上下文切换,Kafka 在处理高并发请求时能够更有效地利用 CPU 和内存资源。
扩展性强:Reactor 模式的分层设计使得 Kafka 的网络模块具有很好的扩展性,可以根据需要增加更多的 I/O 线程或调整事件处理器的逻辑。
2 零拷贝技术的运用
零拷贝技术是一种计算机操作系统技术,用于在内存和存储设备之间进行数据传输时,避免 CPU 的参与,从而减少 CPU 的负担并提高数据传输效率。
Kafka 使用零拷贝技术来优化数据传输,特别是在生产者将数据写入 Kafka 和消费者从 Kafka 读取数据的过程中。在 Kafka 中,零拷贝主要通过以下几种方式实现:
sendfile() 系统调用:在发送数据时,Kafka 使用操作系统的 sendfile() 系统调用直接将文件从磁盘发送到网络套接字,而无需将数据复制到应用程序的用户空间。这减少了数据复制次数,提高了传输效率。
文件内存映射(Memory-Mapped Files):Kafka 使用文件内存映射技术(mmap),将磁盘上的日志文件映射到内存中,使得读写操作可以在内存中直接进行,无需进行额外的数据复制。
比如 Broker 读取磁盘数据并把数据发送给 Consumer 的过程,传统 I/O 经历以下步骤。
读取数据:通过read 系统调用将磁盘数据通过 DMA copy 到内核空间缓冲区(Read buffer)。
拷贝数据:将数据从内核空间缓冲区(Read buffer) 通过 CPU copy 到用户空间缓冲区(Application buffer)。
写入数据:通过write()系统调用将数据从用户空间缓冲区(Application) CPU copy 到内核空间的网络缓冲区(Socket buffer)。
发送数据:将内核空间的网络缓冲区(Socket buffer)DMA copy 到网卡目标端口,通过网卡将数据发送到目标主机。
这一过程经过的四次 copy 如图所示。
注:零拷贝技术如何提高 Kakfa 的性能?
零拷贝技术通过减少 CPU 负担和内存带宽消耗,提高了 Kakfa 性能。
降低 CPU 使用率:由于数据不需要在内核空间和用户空间之间多次复制,CPU 的参与减少,从而降低了 CPU 使用率,腾出更多的 CPU 资源用于其他任务。
提高数据传输速度:直接从磁盘到网络的传输路径减少了中间步骤,使得数据传输更加高效,延迟更低。
减少内存带宽消耗:通过减少数据在内存中的复制次数,降低了内存带宽的消耗,使得系统能够处理更多的并发请求。
3 Partition 并发和分区负载均衡
3.1 Topic 主题
Topic 是 Kafka 中数据的逻辑分类单元,可以理解成一个队列。Broker 是所有队列部署的机器,Producer 将消息发送到特定的 Topic,而 Consumer 则从特定的 Topic 中消费消息。
3.2 Partition
为了提高并行处理能力和扩展性,Kafka 将一个 Topic 分为多个 Partition。每个 Partition 是一个有序的消息队列,消息在 Partition 内部是有序的,但在不同的 Partition 之间没有顺序保证。
Producer 可以并行地将消息发送到不同的 Partition,Consumer 也可以并行地消费不同的 Partition,从而提升整体处理能力。
因此,可以说,每增加一个 Paritition 就增加了一个消费并发。Partition的引入不仅提高了系统的可扩展性,还使得数据处理更加灵活。
3.3 Partition 分区策略
“生产者将消息发送到哪个分区是如何实现的?不合理的分配会导致消息集中在某些 Broker 上,岂不是完犊子。”
主要有以下几种分区策略:
轮询策略:也称Round-robin策略,即顺序分配。
随机策略:也称Randomness策略。所谓随机就是我们随意地将消息放置到任意一个分区上。
按消息键保序策略。
基于地理位置分区策略。
轮询策略
比如一个 Topic 下有 3个分区,那么第一条消息被发送到分区0,第二条被发送到分区1,第三条被发送到分区2,以此类推。
当生产第4条消息时又会重新开始,即将其分配到分区0,如图 5 所示。
轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。
随机策略
所谓随机就是我们随意地将消息放置到任意一个分区上。如图所示,9 条消息随机分配到不同分区。
按消息键分配策略
一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,比如订单 ID,那么绑定同一个 订单 ID 的消息都会发布到同一个分区,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略,如图所示。
基于地理位置
这种策略一般只针对那些大规模的 Kafka 集群,特别是跨城市、跨国家甚至是跨大洲的集群。
4 Segment 日志文件和稀疏索引
前面已经介绍过,Kafka 的 Topic 可以分为多个 Partition,每个 Partition 有多个副本,你可以理解为副本才是存储消息的物理存在。其实每个副本都是以日志(Log)的形式存储。
“日志文件过大怎么办?”
为了解决单一日志文件过大的问题,kafka采用了分段(Segment)的形式进行存储。
所谓 Segment,就是当一个日志文件大小到达一定条件之后,就新建一个新的 Segment,然后在新的Segment写入数据。Topic、Partition、和日志的关系如图所示。
一个 segment 对应磁盘上多个文件。
.index : 消息的 offset 索引文件。
.timeindex : 消息的时间索引文件(0.8版本加入的)。
.log : 存储实际的消息数据。
.snapshot : 记录了 producer 的事务信息。
.swap : 用于 Segment 恢复。
.txnindex 文件,记录了中断的事务信息。
.log 文件存储实际的 message,kafka为每一个日志文件添加了2 个索引文件 .index以及 .timeindex。
segment 文件命名规则:partition 第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。
“为什么要有 .index 文件?”
为了提高查找消息的性能。kafka 为消息数据建了两种稀疏索引,一种是方便 offset 查找的 .index 稀疏索引,还有一种是方便时间查找的 .timeindex 稀疏索引。
4.1 稀疏索引
“为什么不创建一个哈希索引,从 offset 到物理消息日志文件偏移量的映射关系?”
万万不可,Kafka 作为海量数据处理的中间件,每秒高达几百万的消息写入,这个哈希索引会把把内存撑爆炸。
稀疏索引不会为每个记录都保存索引,而是写入一定的记录之后才会增加一个索引值,具体这个间隔有多大则通过 log.index.interval.bytes 参数进行控制,默认大小为 4 KB,意味着 Kafka 至少写入 4KB 消息数据之后,才会在索引文件中增加一个索引项。
哈希稀疏索引把消息划分为多个 block ,只索引每个 block 第一条消息的 offset 即可 。
Offset 偏移量:表示第几个消息。
position:消息在磁盘的物理位置。
“如果消费者要查找 Offset 为 4 的消息,查找过程是怎样的?”
首先用二分法定位消息在哪个 Segment ,Segment 文件命名是 Partition 第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。
打开这个 Segment 对应的 index 索引文件,用二分法查找 offset 不大于 4 的索引条目,对应上图第二条条目,也就是 offset = 3 的那个索引。通过索引我们可以知道 offset 为 4 的消息所在的日志文件磁盘物理位置为 495。
打开日志文件,从 Position 为 495 位置开始开始顺序扫描文件,将扫描过程中每条消息的 offset 与 4 比较,直到找到 offset 为 4 的那条Message。
.timeindex 文件同理,只不过它的查找结果是 offset,之后还要在走一遍 .index 索引查找流程。
由于 kafka 设计为顺序读写磁盘,因此遍历区间的数据并对速度有太大的影响,而选择稀疏索引还能节约大量的磁盘空间。
4.2 mmap
有了稀疏索引,当给定一个 offset 时,Kafka 采用的是二分查找来扫描索引定位不大于 offset 的物理位移 position,再到日志文件找到目标消息。
利用稀疏索引,已经基本解决了高效查询的问题,但是这个过程中仍然有进一步的优化空间,那便是通过 mmap(memory mapped files) 读写上面提到的稀疏索引文件,进一步提高查询消息的速度。
进程通过调用mmap系统函数,将文件或物理内存的一部分映射到其虚拟地址空间。这个过程中,操作系统会为映射的内存区域分配一个虚拟地址,并将这个地址与文件或物理内存的实际内容关联起来。
一旦内存映射完成,进程就可以通过指针直接访问映射的内存区域。这种访问方式就像访问普通内存一样简单和高效。
5 顺序读写磁盘
“不管如何,Kafka 读写消息都要读写磁盘,如何变快呢?”
磁盘就一定很慢么?人们普遍错误地认为硬盘很慢。然而,存储介质的性能,很大程度上依赖于数据被访问的模式。
同样在一块普通的7200 RPM SATA硬盘上,随机I/O(random I/O)与顺序I/O相比,随机I/O的性能要比顺序I/O慢3到4个数量级。
合理的方式可以让磁盘写操作更加高效,减少了寻道时间和旋转延迟。
回顾一下磁盘的运行原理,磁盘的运行原理如图所示。
硬盘在逻辑上被划分为磁道、柱面以及扇区。硬盘的每个盘片的每个面都有一个读写磁头。
完成一次磁盘 I/O ,需要经过寻道、旋转和数据传输三个步骤。
寻道:首先必须找到柱面,即磁头需要移动到相应磁道,这个过程叫做寻道,所耗费时间叫做寻道时间。寻道时间越短,I/O 操作越快,目前磁盘的平均寻道时间一般在 3-15ms。
旋转:磁盘旋转将目标扇区旋转到磁头下。这个过程耗费的时间叫做旋转时间。旋转延迟取决于磁盘转速,通常用磁盘旋转一周所需时间的 1/2 表示。比如:7200rpm 的磁盘平均旋转延迟大约为 60*1000/7200/2 = 4.17ms,而转速为 15000rpm 的磁盘其平均旋转延迟为 2ms。
数据传输:数据在磁盘与内存之间的实际传输。
因此,如果在写磁盘的时候省去寻道、旋转可以极大地提高磁盘读写的性能。
Kafka 采用顺序写文件的方式来提高磁盘写入性能。顺序写文件,顺序 I/O 的时候,磁头几乎不用换道,或者换道的时间很短。减少了磁盘寻道和旋转的次数。磁头再也不用在磁道上乱舞了,而是一路向前飞速前行。
Kafka 中每个Partition 是一个有序的,不可变的消息序列,新的消息可以不断追加到 Partition 的末尾,在 Kafka 中 Partition 只是一个逻辑概念,每个Partition 划分为多个 Segment,每个 Segment 对应一个物理文件,Kafka 对 Segment 文件追加写,这就是顺序写文件。
每条消息在发送前会根据负载均衡策略计算出要发往的目标 Partition 中,broker 收到消息之后把该条消息按照追加的方式顺序写入 Partition 的日志文件中。
如下图所示,可以看到磁盘顺序写的性能远高于磁盘随机写,甚至比内存随机写还快。
6 PageCache
“使用稀疏索引和 mmap 内存映射技术提高读消息的性能;Topic Partition 加磁盘顺序写持久化消息的设计已经很快了,但是与内存顺序写还是慢了,还有优化空间么?”
作为快到令人发指的 Kafka,确实想到了一个方式来提高读写写磁盘文件的性能。这就是接下来的主角 Page Cache 。
简而言之:利用操作系统的缓存技术,在读写磁盘日志文件时,操作的是内存,而不是文件,由操作系统决定什么在某个时间将 Page Cache 的数据刷写到磁盘中。
Producer 发送消息到 Broker 时,Broker 会使用 pwrite() 系统调用写入数据,此时数据都会先写入page cache。
Consumer 消费消息时,Broker 使用 sendfile() 系统调用函数,通零拷贝技术地将 Page Cache 中的数据传输到 Broker 的 Socket buffer,再通过网络传输到 Consumer。
leader 与 follower 之间的同步,与上面 consumer 消费数据的过程是同理的。
Kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache,同时标记Page属性为Dirty。
当读操作发生时,先从PageCache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。
于是我们得到一个重要结论:如果Kafka producer的生产速率与consumer的消费速率相差不大,那么就能几乎只靠对broker page cache的读写完成整个生产-消费过程,磁盘访问非常少。
实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。
7 数据压缩和批量处理
数据压缩在 Kafka 中有助于减少磁盘空间的使用和网络带宽的消耗,从而提升整体性能。
通过减少消息的大小,压缩可以显著降低生产者和消费者之间的数据传输时间。
Kafka 支持的压缩算法有哪些?
在Kafka 2.1.0版本之前,Kafka支持3种压缩算法:GZIP、Snappy和LZ4。从2.1.0开始,Kafka正式支持Zstandard算法(简写为zstd)。
这么多压缩算法,我如何选择?
一个压缩算法的优劣,有两个重要的指标:压缩比,文件压缩前的大小与压缩后的大小之比,比如源文件占用 1000 M 内存,经过压缩后变成了 200 M,压缩比 = 1000 /200 = 5,压缩比越高越高;另一个指标是压缩/解压缩吞吐量,比如每秒能压缩或者解压缩多少 M 数据,吞吐量越高越好。
7.1 生产者压缩
Kafka 的数据压缩主要在生产者端进行。具体步骤如下:
生产者配置压缩方式:在 KafkaProducer 配置中设置 compression.type 参数,可以选择 gzip、snappy、lz4 或 zstd。
消息压缩:生产者将消息批量收集到一个 batch 中,然后对整个 batch 进行压缩。这种批量压缩方式可以获得更高的压缩率。
压缩消息存储:压缩后的 batch 以压缩格式存储在 Kafka 的主题(Topic)分区中。
消费者解压缩:消费者从 Kafka 主题中获取消息时,首先对接收到的 batch 进行解压缩,然后处理其中的每一条消息。
7.2 解压缩
有压缩,那必有解压缩。通常情况下,Producer 发送压缩后的消息到 Broker ,原样保存起来。
Consumer 消费这些消息的时候,Broker 原样发给 Consumer,由 Consumer 执行解压缩还原出原本的信息。
Consumer 咋知道用什么压缩算法解压缩?
Kafka会将启用了哪种压缩算法封装进消息集合中,这样当Consumer读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。
总之一句话:Producer端压缩、Broker端保持、Consumer端解压缩。
7.3 批量数据处理
Kafka Producer 向 Broker 发送消息不是一条消息一条消息的发送,将多条消息打包成一个批次发送。
批量数据处理可以显著提高 Kafka 的吞吐量并减少网络开销。
Kafka Producer 的执行流程如下图所示:
发送消息依次经过以下处理器:
Serialize:键和值都根据传递的序列化器进行序列化。优秀的序列化方式可以提高网络传输的效率。
Partition:决定将消息写入主题的哪个分区,默认情况下遵循 murmur2 算法。自定义分区程序也可以传递给生产者,以控制应将消息写入哪个分区。
Compression:默认情况下,在 Kafka 生产者中不启用压缩。Compression 不仅可以更快地从生产者传输到代理,还可以在复制过程中进行更快的传输。压缩有助于提高吞吐量,降低延迟并提高磁盘利用率。
Record Accumulator:Accumulate顾名思义,就是一个消息累计器。其内部为每个 Partition 维护一个Deque双端队列,队列保存将要发送的 Batch批次数据,Accumulate将数据累计到一定数量,或者在一定过期时间内,便将数据以批次的方式发送出去。记录被累积在主题每个分区的缓冲区中。根据生产者批次大小属性将记录分组。主题中的每个分区都有一个单独的累加器 / 缓冲区。
Group Send:记录累积器中分区的批次按将它们发送到的代理分组。批处理中的记录基于 batch.size 和 linger.ms 属性发送到代理。记录由生产者根据两个条件发送。当达到定义的批次大小或达到定义的延迟时间时。
Send Thread:发送线程,从 Accumulator 的队列取出待发送的 Batch 批次消息发送到 Broker。
Broker 端处理:Kafka Broker 接收到 batch 后,将其存储在对应的主题分区中。
消费者端的批量消费:消费者可以配置一次拉取多条消息的数量,通过 fetch.min.bytes 和 fetch.max.wait.ms 参数控制批量大小和等待时间。
8 无锁轻量级 offset
Offset 是 Kafka 中的一个重要概念,用于标识消息在分区中的位置。
每个分区中的消息都有一个唯一的 offset,消费者通过维护自己的 offset 来确保准确消费消息。offset 的高效管理对于 Kafka 的性能至关重要。
offset 是从 0 开始的,每当有新的消息写入分区时,offset 就会加 1。offset 是不可变的,即使消息被删除或过期,offset 也不会改变或重用。
Consumer需要向Kafka汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为Consumer能够同时消费多个partition的数据,所以位移的提交实际上是在partition粒度上进行的,即Consumer需要为分配给它的每个partition提交各自的位移数据。
提交位移主要是为了表征Consumer的消费进度,这样当Consumer发生故障重启之后,就能够从Kafka中读取之前提交的位移值,然后从相应的位移处继续消费。
在传统的消息队列系统中,offset 通常需要通过锁机制来保证一致性,但这会带来性能瓶颈。Kafka 的设计哲学是尽量减少锁的使用,以提升并发处理能力和整体性能。
8.1 无锁设计思想
Kafka 在 offset 设计中采用了一系列无锁的技术,使其能够在高并发的环境中保持高效。
顺序写入:Kafka 使用顺序写入的方式将消息追加到日志文件的末尾,避免了文件位置的频繁变动,从而减少了锁的使用。
MMAP 内存映射文件:Kafka 使用内存映射文件(Memory Mapped File)来访问日志数据和索引文件。这种方式使得文件数据可以直接映射到进程的虚拟地址空间中,从而减少了系统调用的开销,提高了数据访问的效率。
零拷贝:Kafka 使用零拷贝(Zero Copy)技术,将数据从磁盘直接传输到网络,绕过了用户态的复制过程,大大提高了数据传输的效率。
批量处理:Kafka 支持批量处理消息,在一个批次中同时处理多个消息,减少了网络和 I/O 的开销。
8.2 消费者 Offset 管理流程
启动消费者:消费者启动并订阅 Kafka 主题的某个分区。
从分区读取消息:消费者从指定分区中读取消息。
处理消息:消费者处理读取到的消息。
是否成功处理:判断消息是否成功处理。 如果成功处理,更新 Offset。 如果处理失败,记录失败原因并准备重新处理。
更新 Offset:成功处理消息后,更新 Offset 以记录已处理消息的位置。
提交 Offset:将更新后的 Offset 提交到 Kafka,以确保消息处理进度的持久化。
继续处理下一个消息:提交 Offset 后,继续读取并处理下一个消息。
Kafka 通过无锁轻量级 offset 的设计,实现了高性能、高吞吐和低延时的目标。