1. Kafka介绍
Kafka是LinkedIn开发并开源的一套分布式的高性能消息引擎服务。
- 消息系统:Kafka具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等强大的功能。
- 存储系统:Kafka 的消息持久化功能和多副本机制,我们可以把Kafka作为长期的数据存储系统来使用。
- 流式处理平台:Kafka还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作,也是一个分布式流处理平台。
1.1 基础架构
- topic为单位,分区后有多块(Partition),每块又是主从复制的,每块是有个leader的(生成消费和leader打交道)。
- 每个服务叫做 Broker,可以认为部署了一个 kafka 实例。
- 消费者组,包含多个消费者。一个分区只能由一个消费者消费。
- 消费者组之间互不影响,可以同时消费一个 topic。
1.2 主题和分区
一个主题下面有多个分区,这些分区会存储到不同的服务器上面。
生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。
每个 Partition 可以设置多个副本。它们会选取一个副本作为 Leader,而其余的作为 Follower。
我们的生产者在发送数据的时候,是直接发送到 Leader Partition 里面,然后 Follower Partition 会去 Leader 那里自行同步数据,消费者消费数据的时候,也是从 Leader 那去消费数据的。
1.3 副本
- AR:分区中所有的副本。
- ISR: 和leader保持一定程度的副本。默认情况下,只有 ISR 晋升为 Leader(也可以通过修改相应的参数配置来改变)
- OSR: 落后过多的副本。
- HW:高水位,消费者只能拉取到这个offset之前的消息。
1.4 生产者
1. ACK 机制
0:发送完就认为成功,很可能发生数据丢失。
1: Kafka 默认的设置。表示 Producer 要 Leader 确认已成功接收数据才发送下一条(批)Message。
ALL: Leader 接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都确认消息已同步,Producer 才发送下一条(批)Message。
不过在消息被写入所有同步副本之前,Kafka不允许消费者读取它们。
2. 配置
为了提升集群的数据持久性,可以将min.insync.replicas设置为2,确保至少有两个副本跟生产者保持“同步”。生产者需要配合将ack设置为all,这样就可以确保至少有两个副本(首领和另一个副本)确认写入成功,从而防止在以下情况下丢失数据。
1.5 消费者
在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息偏移量保存在Zookeeper或Kafka上,如果消费者关闭或重启,它的读取状态不会丢失。
随着时间的推移,Kafka对ZooKeeper的依赖在减少。新版的Kafka消费者,并将偏移量提交到Kafka,消除对ZooKeeper的依赖。
session.timeout.ms指定了消费者可以在多长时间内不与服务器发生交互而仍然被认为还“活着”,默认是10秒。
1. 消费者组
- 一个主题的分区,只能被消费组内一个消费者消费。
- 不同消费者组,可以消费同一个Topic,互不影响。
2. 提交偏移量
那么消费者是如何提交偏移量的呢?消费者会向一个叫作 __consumer_offset的主题发送消息,消息里包含每个分区的偏移量。 enable.auto.commit,你可以决定让消费者自动提交偏移量,也可以在代码里手动提交偏移量。
3. 再均衡触发重复消费
如果触发再均衡,再均衡完成之后,每个消费者可能会被分配新的分区,而不是之前读取的那个。
假设我们使用默认的5秒提交时间间隔,并且消费者在最后一次提交偏移量之后3秒会发生崩溃。再均衡完成之后,接管分区的消费者将从最后一次提交的偏移量的位置开始读取消息。这个偏移量实际上落后了3秒,所以在这3秒内到达的消息会被重复处理。
可以通过修改提交时间间隔来更频繁地提交偏移量,缩小可能导致重复消息的时间窗口,但无法完全避免。
1.6 消息分区策略
轮询
如果键为null,并且使用了默认的分区器,那么记录将被随机发送给主题的分区。分区器使用轮询调度(round-robin)算法将消息均衡地分布到各个分区中。从Kafka 2.4开始,在处理键为null的记录时,默认分区器使用的轮询调度算法具备了黏性。
key 指定分区
如果键不为空且使用了默认的分区器,那么Kafka会对键进行哈希(使用Kafka自己的哈希算法,即使升级Java版本,哈希值也不会发生变化),然后根据哈希值把消息映射到特定的分区。这里的关键在于同一个键总是被映射到同一个分区,
自定义策略
实现 Partitioner 接口就能自定义分区策略。
指定 Partiton 发送
1.7 消息保留策略
要么保留一段时间(比如7天),要么保留到消息达到一定大小的字节数(比如1GB)。
保留多使用log.retention.hours参数来配置时间,默认为168小时,也就是1周。
参数log.retention.bytes来指定,对应的是每一个分区。也就是说,如果一个主题包含8个分区,并且log.retention.bytes被设置为1 GB,那么这个主题最多可以保留8 GB的数据。
1.8 稀疏索引
kafka 所面临的查询场景其实很简单:能按照 offset 或者 timestamp 查到消息即可。
消息的 offset 完全可以设计成有序的(实际上是一个单调递增 long 类型的字段),这样消息在日志文件中本身就是有序存放的了。可以将消息划分成若干个 block,只索引每个 block 第一条消息的 offset 即可。
当给定一个 offset 时,Kafka 采用的是二分查找来高效定位不大于 offset 的物理位移,然后找到目标消息。另外可以通过 mmap(memory mapped files) 读写上面提到的稀疏索引文件,进一步提高查询消息的速度。
因为索引文件是稀疏的,它们相对较小。将它们映射到内存中可以加快查找过程,这是内存映射文件提供的主要好处。
注意:mmap 和 page cache 是两个概念,网上很多资料把它们混淆在一起。此外,还有资料谈到 Kafka 在读 log 文件时也用到了 mmap,通过对 2.8.0 版本的源码分析,这个信息也是错误的,其实只有索引文件的读写才用到了 mmap。
2. 头脑风暴
2.1 kafka性能高的原因
生产消息的时候,批量发送,消息压缩。
存储消息的时候,IO对路复用,磁盘顺序写 ,利用 PageCache。
消费消息的时候,零拷贝技术。
2.2 怎么保证消息顺序
1. 发送者顺序
- 首先要考虑同步发送消息。 acks > 0
- 调用 send 方法返回的 Future 对象的 get 方式阻塞等待结果。
- 打开幂等性,设置
enable.idempotence = true
,可以给消息添加序列号,每次会把序列号递增 1。
2. 消费者顺序
只使用一个消费者。
消费端采用一个阻塞队列。
提高消费端的处理性能避免触发再均衡。
3. broker顺序
- 1个Topic(主题)只创建1个Partition(分区)。
- 同类别消息有同样的 key,就会被分配到同样的分区中,保证有序。
2.3 如何避免丢失消息
1. 发送端丢失
Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。
配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
2. 消费端丢失
- 确定消费完成后才提交消息。
- 如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移。
3. Broker 端丢失
Kafka 收到消息后会先存储在也缓存中(Page Cache)中,之后由操作系统根据自己的策略进行刷盘或者通过 fsync 命令强制刷盘。如果系统挂掉,在 PageCache 中的数据就会丢失。
- 设置 acks = all。acks 是 Producer 的一个参数,所有副本 Broker 都要接收到消息,该消息才算是“已提交”。
- 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,落后太多的不能成为新的 Leader。
- 设置 replication.factor >= 3。这也是 Broker 端的参数。将消息多保存几份冗余。
- 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。
2.4 如何避免重复消费
原因:
再平衡机制
在Kafka里面有一个Partition Balance机制,重新Rebalance之后,Consumer还是会从之前没提交的Offset位置开始消费,也会导致消息重复消费的问题。
自动提交间隔
Kafka消费端的自动提交逻辑有一个默认的5秒间隔,在Consumer消费的过程中,应用程序被强制kill掉或者宕机,可能会导致Offset没提交,从而产生重复提交的问题。
解决:
- 提高消费端的处理性能避免触发Balance,比如可以用异步的方式来处理消息,缩短单个消息消费的时间。或者还可以调整消息处理的超时时间。还可以减少一次性从Broker上拉取数据的条数。
- 可以针对消息生成md5然后保存到mysql或者redis里面,在处理消息之前先去mysql或者redis里面判断是否已经消费过。这个方案其实就是利用幂等性的思想。
3. 参考资料
- https://colobu.com/2019/09/27/install-Kafka-on-Mac/
- https://tonybai.com/2022/03/28/the-comparison-of-the-go-community-leading-kakfa-clients/
- https://cloud.tencent.com/developer/article/1541215
- https://www.lixueduan.com/posts/kafka/09-avoid-msg-lost/
- https://mp.weixin.qq.com/s/YJFltTP4J5si1Z5SbuMUJw
- https://www.cnblogs.com/youngchaolin/p/12641463.html
- kafka权威指南2.0