1. 领域模型
RocketMQ 中消息的生命周期主要分为消息生产、消息存储、消息消费这三部分。

1.1 消息生产
1.2 消息存储
主题(Topic)
消息传输和存储的分组容器,主题内部由多个队列组成,消息的存储和水平扩展实际是通过主题内的队列实现的。
队列(MessageQueue)
类比于其他消息队列中的分区,消息在队列内具备顺序性存储特征。
消息(Message)
最小传输单元。消息具备不可变性,在初始化发送和完成存储后即不可变。
1.3 消息消费
消费者分组(ConsumerGroup)
同一个消费组的多个消费者必须保持消费逻辑和配置一致,共同分担该消费组订阅的消息,实现消费能力的水平扩展。
消费者(Consumer)
消费消息的运行实体,一般集成在业务调用链路的下游。消费者必须被指定到某一个消费组中。
订阅关系(Subscription)
消费组通过定义订阅关系控制指定消费组下的消费者如何实现消息过滤、消费重试及消费进度恢复等。
订阅关系除过滤表达式之外都是持久化的,即服务端重启或请求断开,订阅关系依然保留。
2. 消息类型
消息不可变性:消息一旦产生后,消息的内容不会发生改变。消费端获取的消息都是只读消息视图。
消息持久化:默认对消息进行持久化,即将接收到的消息存储到 RocketMQ 服务端的存储文件中,保证消息的可回溯性和系统故障场景下的可恢复性。
2.1 普通消息
普通消息一般应用于微服务解耦、事件驱动、数据集成[日志系统]等场景,这些场景大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。
生命周期

初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。
待消费:消息被发送到服务端,对消费者可见,等待消费者消费的状态。
消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 如果一定时间后没有收到消费者的响应, RocketMQ会对消息进行重试处理。
消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。逻辑标记已消费,消费者仍然可以回溯消息重新消费。
消息删除:RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| MessageBuilder messageBuilder = new MessageBuilderImpl(); Message message = messageBuilder.setTopic("topic") .setKeys("messageKey") .setTag("messageTag") .setBody("messageBody".getBytes()) .build(); try { SendReceipt sendReceipt = producer.send(message); System.out.println(sendReceipt.getMessageId()); } catch (ClientException e) { e.printStackTrace(); }
MessageListener messageListener = new MessageListener() { @Override public ConsumeResult consume(MessageView messageView) { System.out.println(messageView); return ConsumeResult.SUCCESS; } };
List<MessageView> messageViewList = null; try { messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30)); messageViewList.forEach(messageView -> { System.out.println(messageView); try { simpleConsumer.ack(messageView); } catch (ClientException e) { e.printStackTrace(); } }); } catch (ClientException e) { e.printStackTrace(); }
|
2.2 定时/延时消息
场景举例:
例如每天5点执行文件清理,每隔2分钟触发一次消息推送等需求。
订单下单后暂未支付,此时不可以直接关闭订单,而是需要等待一段时间后才能关闭订单。
规则:
- 定时消息设置的定时时间是一个预期触发的系统时间戳,而不是一段延时时长。
- 定时时长最大值默认为24小时,不支持自定义修改。
- 定时时长参数精确到毫秒级,但是默认精度为1000ms,即定时消息为秒级精度。
- 避免大量相同定时时刻的消息,造成系统压力过大。
生命周期

定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| MessageBuilder messageBuilder = new MessageBuilderImpl();;
Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000; Message message = messageBuilder.setTopic("topic") .setKeys("messageKey") .setTag("messageTag") .setDeliveryTimestamp(deliverTimeStamp) .setBody("messageBody".getBytes()) .build(); try { SendReceipt sendReceipt = producer.send(message); System.out.println(sendReceipt.getMessageId()); } catch (ClientException e) { e.printStackTrace(); }
MessageListener messageListener = new MessageListener() { @Override public ConsumeResult consume(MessageView messageView) { System.out.println(messageView.getDeliveryTimestamp()); return ConsumeResult.SUCCESS; } };
List<MessageView> messageViewList = null; try { messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30)); messageViewList.forEach(messageView -> { System.out.println(messageView); try { simpleConsumer.ack(messageView); } catch (ClientException e) { e.printStackTrace(); } }); } catch (ClientException e) { e.printStackTrace(); }
|
2.3 顺序消息
发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。

场景举例:
股票系统先出价先交易的原则。
数据库变更增量同步,例如binlog发送到rocketmq。
生产顺序性:
如需保证消息生产的顺序性,则必须满足以下条件:
- 单一生产者
- 串行发送
满足以上条件的生产者,将顺序消息发送至 RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。
看上图,消息组1和消息组4的消息混合存储在队列1中, RocketMQ 保证消息组1中的消息G1-M1、G1-M2、G1-M3是按发送顺序存储,且消息组4的消息G4-M1、G4-M2也是按顺序存储,但消息组1和消息组4中的消息不涉及顺序关系。
消费顺序性:
- 业务方消费消息时需要严格按照接收—处理—应答的语义处理消息,避免因异步处理导致消息乱序。
- 消费者有可能一次拉取多条消息。此时,消息消费的顺序性需要由业务方自行保证。
- 有限重试:一条消息如果一直重试失败,超过最大重试次数后将不再重试。
使用建议:
如果消息需要严格按照先进先出(FIFO)的原则处理,即先发送的先消费、后发送的后消费,则必须要同时满足生产顺序性和消费顺序性。
串行消费,避免批量消费导致乱序。消费顺序为1->23(批量处理,失败)->23(重试处理)->4,此时可能由于消息3的失败导致消息2被重复处理,最后导致消息消费乱序。
消息组尽可能打散,避免集中导致热点。例如,将订单ID、用户ID作为消息组关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| MessageBuilder messageBuilder = new MessageBuilderImpl();; Message message = messageBuilder.setTopic("topic") .setKeys("messageKey") .setTag("messageTag") .setMessageGroup("fifoGroup001") .setBody("messageBody".getBytes()) .build(); try { SendReceipt sendReceipt = producer.send(message); System.out.println(sendReceipt.getMessageId()); } catch (ClientException e) { e.printStackTrace(); }
MessageListener messageListener = new MessageListener() { @Override public ConsumeResult consume(MessageView messageView) { System.out.println(messageView); return ConsumeResult.SUCCESS; } };
List<MessageView> messageViewList = null; try { messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30)); messageViewList.forEach(messageView -> { System.out.println(messageView); try { simpleConsumer.ack(messageView); } catch (ClientException e) { e.printStackTrace(); } }); } catch (ClientException e) { e.printStackTrace(); }
|
2.4 事务消息
分布式事务消息在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。
场景举例:
用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。

事务消息处理流程

- 生产者将消息发送至 RocketMQ服务端。【这是一个半事务消息】
- RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为”暂不能投递”,这种状态下的消息即为半事务消息。
- 生产者开始执行本地事务逻辑。
- 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
- 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
- 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
- 若服务端一直未收到发送者提交的二次确认结果,服务端将对生产者集群中任一生产者实例发起消息回查。
- 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
生命周期

- 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。
- 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。
- 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。
- 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。
- 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应, RocketMQ会对消息进行重试处理。
- 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
- 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。
使用规则
- RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。
- 事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| private static boolean checkOrderById(String orderId) { return true; }
private static boolean doLocalTransaction() { return true; }
public static void main(String[] args) throws ClientException { ClientServiceProvider provider = new ClientServiceProvider(); MessageBuilder messageBuilder = new MessageBuilderImpl(); Producer producer = provider.newProducerBuilder() .setTransactionChecker(messageView -> {
final String orderId = messageView.getProperties().get("OrderId"); if (Strings.isNullOrEmpty(orderId)) { return TransactionResolution.ROLLBACK; } return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK; }) .build(); final Transaction transaction; try { transaction = producer.beginTransaction(); } catch (ClientException e) { e.printStackTrace(); return; } Message message = messageBuilder.setTopic("topic") .setKeys("messageKey") .setTag("messageTag") .addProperty("OrderId", "xxx") .setBody("messageBody".getBytes()) .build(); final SendReceipt sendReceipt; try { sendReceipt = producer.send(message, transaction); } catch (ClientException e) { return; }
boolean localTransactionOk = doLocalTransaction(); if (localTransactionOk) { try { transaction.commit(); } catch (ClientException e) { e.printStackTrace(); } } else { try { transaction.rollback(); } catch (ClientException e) { e.printStackTrace(); } } }
|
3. 头脑风暴
3.1 延时消息原理

会把消息投递到延时队列(Topic = SCHEDULE_TOPIC_XXXX),特殊的topic。
定时任务线程池会有 18 个线程来对延时队列进行调度,每个线程调度一个延时级别,调度任务把延时消息再投递到原始队列,这样 Consumer 就可以拉取到了。
为了弥补延时消息的不足,RocketMQ 5.0 引入了定时消息,定时消息引入了秒级的时间轮算法。时间轮算法的优势是不用去遍历所有的任务,每一个时间节点上的任务用链表串起来,当时间轮上的指针移动到当前的时间时,这个时间节点上的全部任务都执行。
可以用定时消息可以做定时任务吗?
可以。
3.2 顺序消息原理
- 发送消息时,指定消息组。
- 单一生产者串行发送,消费者也建议串行消费。
3.3 事务消息原理
半事务消息 + 二阶段提交。
消息存在事务存储系统中。
4. 参考资料