1. 基础原理
Kafka是LinkedIn开发并开源的一套分布式的高性能消息引擎服务。
- 消息系统:Kafka具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等强大的功能。
- 存储系统:Kafka 的消息持久化功能和多副本机制,我们可以把Kafka作为长期的数据存储系统来使用。
- 流式处理平台:Kafka还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作,也是一个分布式流处理平台。
1.1 基础架构

可以把topic想象成一个表,把 partition对应系统上就是一个或若干个目录,一个服务器节点是一个Broker。
- topic为单位,分区后有多块(Partition),每块又是主从复制的,每块是有个leader的(生成消费和leader打交道)。
- 每个服务叫做 Broker,可以认为部署了一个 kafka 实例。
- 消费者组,包含多个消费者。一个分区只能由一个消费者消费。
- 消费者组之间互不影响,可以同时消费一个 topic。
1.2 主题和分区
一个主题下面有多个分区,这些分区会存储到不同的服务器上面。
生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。
每个 Partition 可以设置多个副本。它们会选取一个副本作为 Leader,而其余的作为 Follower。
我们的生产者在发送数据的时候,是直接发送到 Leader Partition 里面,然后 Follower Partition 会去 Leader 那里自行同步数据,消费者消费数据的时候,也是从 Leader 那去消费数据的。
1.3 分区副本

- AR:分区中所有的副本。
- ISR: 和leader保持一定程度的副本。默认情况下,只有 ISR 晋升为 Leader(也可以通过修改相应的参数配置来改变)
- OSR: 落后过多的副本。
- HW:高水位,消费者只能拉取到这个offset之前的消息。

1.4 存储原理
- Kafka的消息是存在于文件系统之上的,Kafka高度依赖文件系统来存储和缓存消息。
- 操作系统还会将主内存剩余的所有空闲内存空间都用作磁盘缓存,所有的磁盘读写操作都会经过统一的磁盘缓存(除了直接/O会绕过磁盘缓存)
- Kafka正是利用顺序IO,以及Page Cache达成的超高吞吐。
- 任何发布到Partition的消息都会被追加到Partition数据文件的尾部,这样的顺序写磁盘操作让Kafka的效率非常高。


2. 生产和消费
2.1 生产者
Kafka消息就是一个个的键–值对,ProducerRecord对象可以只包含主题名称和值,键默认情况下是null。不过,大多数应用程序还是会用键来发送消息。
- 如果记录中指定了分区,则直接使用。
- 如果未指定分区,但指定了key值,则根据key的hash值选择一个分区(相同的key所发送到的Partition是同一个,可用来保证消息的局部有序性)
- 如果未指定分区,也未指定key值,则以 ‘黏性分区’ 策略(2.4版本以前使用轮询策略)选择一个分区。
发送消息 Partition 策略
指定 Partiton 发送
key hash 同一个分区
如果 key不为空,会对key 进行哈希,同一个key 总是被映射到同一个分区。
轮询
如果key为null,并且使用了默认的分区器,那么记录将被随机发送给主题的分区。分区器使用轮询调度(round-robin)算法将消息均衡地分布到各个分区中。从Kafka 2.4开始,在处理键为null的记录时,默认分区器使用的轮询调度算法具备了黏性。
自定义策略
实现 Partitioner 接口就能自定义分区策略。
2.2 消费者
在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息偏移量保存在Zookeeper或Kafka上,如果消费者关闭或重启,它的读取状态不会丢失。
随着时间的推移,Kafka对ZooKeeper的依赖在减少。新版的Kafka消费者,并将偏移量提交到Kafka,消除对ZooKeeper的依赖。
session.timeout.ms指定了消费者可以在多长时间内不与服务器发生交互而仍然被认为还“活着”,默认是10秒。
1. 消费者组
- 一个主题的分区,只能被消费组内一个消费者消费。
- 不同消费者组,可以消费同一个Topic,互不影响。
2. 提交偏移量
那么消费者是如何提交偏移量的呢?消费者会向一个叫作 __consumer_offset的主题发送消息,消息里包含每个分区的偏移量。 enable.auto.commit,你可以决定让消费者自动提交偏移量,也可以在代码里手动提交偏移量。
3. 再均衡触发重复消费
如果触发再均衡,再均衡完成之后,每个消费者可能会被分配新的分区,而不是之前读取的那个。
假设我们使用默认的5秒提交时间间隔,并且消费者在最后一次提交偏移量之后3秒会发生崩溃。再均衡完成之后,接管分区的消费者将从最后一次提交的偏移量的位置开始读取消息。这个偏移量实际上落后了3秒,所以在这3秒内到达的消息会被重复处理。
可以通过修改提交时间间隔来更频繁地提交偏移量,缩小可能导致重复消息的时间窗口,但无法完全避免。
2.3 消息保留策略
要么保留一段时间(比如7天),要么保留到消息达到一定大小的字节数(比如1GB)。
保留多使用log.retention.hours参数来配置时间,默认为168小时,也就是1周。
参数log.retention.bytes来指定,对应的是每一个分区。也就是说,如果一个主题包含8个分区,并且log.retention.bytes被设置为1 GB,那么这个主题最多可以保留8 GB的数据。
3. 消息保证
3.1 消息可靠性保证
- Kafka可以保证分区中的消息是有序的。
- 一条消息只有在被写入分区所有的同步副本时才被认为是“已提交”的(但不一定要冲刷到磁盘上)。
- 消费者只能读取已提交的消息。
1. Broker 配置(复制系数,不同步首领,最小同步副本)
Topic:replication.factor Broker: default.replication.factor,主题的复制系数。
复制系数是N,那么在N–1个broker失效的情况下,客户端仍然能够从主题读取数据或向主题写入数据。也会占用N倍的磁盘空间。
Broker: unclean.leader.election.enable, 默认值是 false。
如果允许不同步副本成为首领,那么就要承担丢失数据和消费者读取到不一致的数据的风险。如果不允许它们成为首领,那么就要接受较低的可用性,因为必须等待原先的首领恢复到可用状态。
Topic | Broker: min.insync.replica,最少同步副本
对于一个包含3个副本的主题,如果min.insync.replicas 被设置为2,那么至少需要有两个同步副本才能向分区写入数据。
2. 生产者配置(ack的值,send函数)
为保证Producer发送的数据,能可靠地发送到指定的Topic,Topic的每个Partition 收到Producer发送的数据后,都需要向Producer发送ACK。如果Producer收到ACK,就会进行下一轮的发送,否则重新发送数据。
根据可靠性需求配置恰当的 acks。
acks=0 发送完就认为成功,很可能发生数据丢失。acks=1 Kafka 默认的设置。表示 Producer 要 Leader 确认已成功接收数据才发送下一条(批)Message。
acks=all Leader 接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都确认消息已同步,Producer 才发送下一条(批)Message。
为了提升集群的数据持久性,可以将Broker的 min.insync.replicas设置为2,确保至少有两个副本跟生产者保持“同步”。
生产者需要配合将ack设置为all,这样就可以确保至少有两个副本(首领和另一个副本)确认写入成功,从而防止在以下情况下丢失数据。
不过在消息被写入所有同步副本之前,Kafka不允许消费者读取它们。
代码里正确处理异常
让生产者在遇到可重试错误时保持重试。
处理不可重试的broker错误,比如消息大小错误、身份验证错误等。
3. 消费者配置(2个,offset reset,自动提交)
group.id
如果两个消费者具有相同的群组ID,并订阅了同一个主题,那么每个消费者将分到主题分区的一个子集,也就是说它们只能读取到所有消息的一个子集(但整个群组可以读取到主题所有的消息)。如果你希望一个消费者可以读取主题所有的消息,那么就需要为它设置唯一的 group.id。
auto.offset.reset
没有有效的偏移量。一个是 earliest,从分区的开始位置读取数据,保证最少的数据丢失。另一个值是 latest,从分区的末尾位置读取数据,很有可能会错过一些消息。
enable.auto.commit
自动提交的主要缺点是我们无法控制应用程序可能重复处理的消息的数量,比如消费者在还没有触发自动提交之前处理了一些消息,然后被关闭。
如果应用程序的处理逻辑比较复杂(比如把消息交给另外一个后台线程去处理),那么就只能使用手动提交了,因为自动提交机制有可能会在还没有处理完消息时就提交偏移量。
auto.commit.interval.ms
如果选择使用自动提交,那么可以通过这个参数来控制提交的频率,默认每5秒提交一次。
在遇到可重试错误时,把消息写到另一个重试主题,并继续处理其他消息。另一个消费者群组负责处理重试主题中的消息,或者让一个消费者同时订阅主主题和重试主题。这种模式有点儿像其他消息系统中的死信队列。
3.2 怎么保证消息顺序
1. 发送者顺序
- 首先要考虑同步发送消息。 acks > 0
- 调用 send 方法返回的 Future 对象的 get 方式阻塞等待结果。
- 打开幂等性,设置
enable.idempotence = true
,可以给消息添加序列号,每次会把序列号递增 1。
2. 消费者顺序
只使用一个消费者。
消费端采用一个阻塞队列。
提高消费端的处理性能避免触发再均衡。
3. broker顺序
- 1个Topic(主题)只创建1个Partition(分区)。
- 同类别消息有同样的 key,就会被分配到同样的分区中,保证有序。
3.3 如何避免丢失消息
1. 生产者丢失
Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。
配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
设置 acks = all。acks 是 Producer 的一个参数,所有副本 Broker 都要接收到消息,该消息才算是“已提交”。
2. 消费者丢失
- 确定消费完成后才提交消息。
- 如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移。
3. Broker 端丢失
Kafka 收到消息后会先存储在也缓存中(Page Cache)中,之后由操作系统根据自己的策略进行刷盘或者通过 fsync 命令强制刷盘。如果系统挂掉,在 PageCache 中的数据就会丢失。
- 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,落后太多的不能成为新的 Leader。
- 设置 replication.factor >= 3。这也是 Broker 端的参数。将消息多保存几份冗余。
- 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。
3.4 如何避免重复消费
原因:
再平衡机制
在Kafka里面有一个Partition Balance机制,重新Rebalance之后,Consumer还是会从之前没提交的Offset位置开始消费,也会导致消息重复消费的问题。
自动提交间隔
Kafka消费端的自动提交逻辑有一个默认的5秒间隔,在Consumer消费的过程中,应用程序被强制kill掉或者宕机,可能会导致Offset没提交,从而产生重复提交的问题。
解决:
- 提高消费端的处理性能避免触发Balance,比如可以用异步的方式来处理消息,缩短单个消息消费的时间。或者还可以调整消息处理的超时时间。还可以减少一次性从Broker上拉取数据的条数。
- 可以针对消息生成md5然后保存到mysql或者redis里面,在处理消息之前先去mysql或者redis里面判断是否已经消费过。这个方案其实就是利用幂等性的思想。
4. 常见问题
4.1 性能高的原因

生产者:批量发送,消息压缩,高效序列化,内存池复用。
存储层:IO多路复用,磁盘顺序写,利用Page Cache,分区分段结构。
消费者:批量拉取,零拷贝技术,mmap,稀疏索引。
4.2 Kafka事务
1. 幂等生产者
幂等生产者使用起来非常简单,只需在生产者配置中加入enable.idempotence=true。
如果启用了幂等生产者,那么每条消息都将包含生产者ID(PID)和序列号。我们将它们与目标主题和分区组合在一起,用于唯一标识一条消息。
2. 事务如何保证精确一致性
Kafka事务使用两阶段提交和事务日志。
- 在开始第一个事务之前,生产者需要通过调用initTransaction()来注册自己。
- initTransaction()API注册了一个带有新事务ID的协调器或者增加现有事务ID的epoch,用以隔离变成“僵尸”的旧生产者。当epoch增加时,挂起的事务将被中止。
- 下一步是调用beginTransaction()。告诉生产者,现在有一个正在执行中的事务。
- 一旦生产者开始发送消息,告诉broker自己有一个执行中的事务,这些信息将被记录在事务日志中( __transaction_state主题)。
- 现在是提交或中止事务的时候了。commitTransaction()方法或abortTransaction()方法将向事务协调器发送一个EndTransactionRequest。事务协调器会把提交或中止事务的意图记录到事务日志中。
- 如果这个步骤执行成功,那么事务协调器将负责完成提交(或中止)过程。它会向所有涉及事务的分区写入一个提交标记,然后将提交成功的信息写入事务日志。
3. 如何使用事务
- 使用事务的最常见也最推荐的方式是在Streams中启用精确一次性保证。只需要将processing.guarantee设置为exactly_once或exactly_once_beta
- 如果想在不使用Streams的情况下获得精确一次性保证,该怎么办?这个时候,可以直接使用事务API。
5. 消息队列对比
5.1 Kafka vs RocketMQ
kafka设计初衷是用于日志传输,而RocketMQ的设计用于解决各类应用可靠的消息传输。
Kafka:
- 一个partition对应一个文件,每次消息来都是顺序写这个文件。
- 顺序写磁盘的方式存储消息,因此可以达到非常高的写入吞吐量。
- 定时刷盘,而不是每次写都刷盘,所以kafka的写非常高效。
- 当broker中的topic分区过多的时候,Kafka的性能不如rocketMq。
RocketMQ:
- Kafka是一个分区一个文件,当topic太多的时候,分区总数也会增加,在flush消息的时候,会产生文件对磁盘的竞争,出现性能下降的情况。
- rocketmq是把消息都写到一个CommitLog文件中,所以相当于一个文件的顺序写。kafka的partition存储的是整个消息, Rocketmq 不是仅仅把 partition改成了ConsumeQueue,ConsumeQueue存储的是每个消息在commitlog这个文件的地址,但是消息存在于commitlog中。
- Rocketmq 支持事务消息,定时消息,消费失败重试。
6. 参考资料
- https://colobu.com/2019/09/27/install-Kafka-on-Mac/
- https://tonybai.com/2022/03/28/the-comparison-of-the-go-community-leading-kakfa-clients/
- https://cloud.tencent.com/developer/article/1541215
- https://www.lixueduan.com/posts/kafka/09-avoid-msg-lost/
- https://mp.weixin.qq.com/s/YJFltTP4J5si1Z5SbuMUJw
- https://www.cnblogs.com/youngchaolin/p/12641463.html
- 《kafka权威指南2.0》
- https://blog.csdn.net/shijinghan1126/article/details/104724407
- https://www.cnblogs.com/xijiu/p/16917741.html
- https://forum.huawei.com/enterprise/en/kafka-certificate-authentication-mechanism/thread/667284205782056960-667213860102352896