申明:本篇博客,只是用于自我学习,提升,没有任何其他用途。不便贴出任何涉及代码层面的东西。只是学习其思想。
由于公司用的是自研中间件tmq(事务消息队列),看源码的时候,顺便总结一下。
tmq:除了支持普通的消息投递订阅以外,还支持事务消息。它的通信框架是nettey来实现的。同时,又可以通过自研的配置中心来实现自由配置。主要分为三个模块:tmq-broker,tmq-client,tmq-common。
tmq-broker:用来承接消息投递和订阅。其中的消息队列使用开源框架Disruptor中的RingBuffer来实现。同时实现了事务消息,消息DB存储,消息堆积报警,系统平滑扩容等特性。
tmq-client:是提供给业务方使用的二方包,其中有Producer和Customer两个模型,能够向broker投递消息以及消费broker推过来的消息。同时,向broker发送状态报文,从而让broker感知各个客户端的心跳。
tmq-common:是一个工具模块,其中定义了TCP传输中的一些协议以及一些常用的工具类。这三大模块将在之后的章节里进行详细的介绍。
tmq-common:主要是netty和protocol
PubMsgProtocol为pub向broker所投递的消息。
AckPubMsgProtocol为broker对pub投递消息状态的应答。
PubReportProtocol为pub向broker上报自身的状态,从而保持心跳。
TransactionStatusProtocol为pub向broker所投递的本地事务执行情况的信息。
CheckMsgProtocol为broker向producer发送check包,check本地事务是否成功。
SubReportProtocol为sub向broker上报自身状态,从而保持心跳。
SubMsgProtocol为broker向sub所投递的消息。
AckSubMsgProtocol为sub向broker消费msg状态的应答。
tmq-broker:包括消息队列、事务消息、消息DB存储、broker两两互备、心跳感应、消费的投递与消费、系统平滑扩容以及消息推挤报警等特性。而实现这些功能的核心是ProducerListener、CustomerListener、BrokerListener、DisruptorMsgQueue、ConfigManager、MsgGroupCfgManager这些类。
broker工作流程:
ProducerListener通过监听8085端口,感知pub所投递过来的msg,并将非half类型(这个将属于事务消息的范畴,将在之后的章节里进行详细介绍,这里讨论的是普通消息)的msg存储到DB中。
随后,将msg放入消息队列,即DisruptorMsgQueue中,等待消费者消费。
发送线程从消息队列中获取消息。
查阅订阅关系,获取相应的channel。
将msg通过channel投递给响应的消费者。
同时,在broker初始化的时候,会启动一个check线程,将broker两两互备,通过BrokerListener感知各自的状态,同时,会进行扫表,来处理消息,以防止产生消息堆积。除此之外,broker上还有一个CustomerListener,用来感知消费者消费msg后的响应,根据msg是否被消费成功,从而更新msg在DB中的状态。ProducerListener和CustomerListener还有一个比较重要的功能,它们会监听pub和sub发送过来的ReportProtocol,从而感知pub和sub的心跳。下边将分节对ProducerListener、CustomerListener和BrokerListener进行详细介绍。
事务消息
pub向broker投递事务消息,消息中的msg_type字段为2。
pub接下来执行本地事务。
broker接收到事务消息后,将消息存储到DB。
pub将本地事务执行结果传递给broker。
broker根据本地事务的执行结果做出不同的操作。成功则对消息进行投递,并更新msg_type字段为3;失败则删除DB中的消息。
check线程扫表,将msg_type=1或3的消息放到RingBuffer中等待向sub投递。