RocketMQ 源码版本 4.9.1
概述
整体架构
各角色说明:
NameServer,负责提供路由服务
Producer,生产者,负责发送消息
Broker,消息队列,负责存储消息并提供相关的API操作
Consumer,消费者,负责消费消息
生产-消费模型
消息发送方式:同步,异步,单向
消息类型:普通消息(包含延迟消息),顺序消息(全局顺序与局部顺序),事务消息
本文通过同步发送普通消息的Demo,来了解消息发送的主要流程。
生产者
下面看到一个生产者发送消息的 demo
主要做了几件事:
初始化一个生产者(DefaultMQProducer)对象
设置 NameServer 的地址
启动生产者
发送消息
启动
启动过程中,主要的几个定时任务:
NameServer 地址定时更新任务(没有显示指定 NameServer 的场景),支持动态更新 NameServer 地址
本地路由信息定时更新任务,定时从 NameServer 拉取最新的路由信息更新到本地缓存
Broker心跳检测任务,定时向Broker集群发送心跳包,并清除已下线的Broker
注意:由于生产者与消费者底层都是通过 MQClientInstance 类与 Broker 服务通信,而消息拉取服务,重负载均衡服务都是针对消费者端的服务,因此即使生产者启动了这两个服务,实际也不会生效。
消息发送
消息发送过程中,主要包含两个核心步骤:
路由信息获取,根据消息的 Topic 拿到可用 Broker 的服务路由
负载均衡机制,即选择合适的消息队列
路由获取
步骤如下:
从本地缓存获取指定 Topic 的路由信息,如果获取到则返回结果
从 NameServer 获取指定 Topic 的路由信息,如果获取到则更新本地缓存,返回结果
从 NameServer 获取默认 Topic 的路由信息,如果获取到则更新本地缓存,返回结果
负载均衡
宗旨就是均匀地把消息发送到各个 Broker 中的消息队列。
负载均衡策略分两种:
默认的负载均衡策略,即通过轮询的方式选择消息队列,在线程级别维护了一个队列下标计数器
故障延迟的负载均衡策略,选择消息队列时,在一定时间内会规避掉故障的 Broker
故障延迟机制
普通的负载均衡策略虽然也有规避故障 Broker 的逻辑,但它只能作用在一次消息发送的重试场景。
实现逻辑如下:
在线程级别维护一个队列下标计数器
每次选择队列时会将计数器的值 + 1,再和当前可用队列总数取模,最终计算出目标队列的下标
在重试发送消息时,会传入上次发送失败的 BrokerName,如果发现目标队列所属的 Broker 与上次发送失败的 Broker 名称相同,则继续步骤1,2 重新选择队列(通过这种方式规避故障的 Broker)
如果希望在多次发送消息过程中,规避掉发生故障的Broker,则可通过 sendLatencyFaultEnable 配置开启故障延迟机制。
整体流程如下:
核心点:
在消息发送失败后,会根据消息发送的延迟时间将对应的 Broker 隔离一段时间,称为故障隔离期
故障隔离期内生产者认为该 Broker 不可用,即在下次发送消息时会规避掉该 Broker。
隔离时间依据上一次消息发送的延迟时间来定,延迟时间越长则相应的隔离时间也越久。如下是 RocketMQ 定义的延迟时间与隔离时间的对应关系。
总结
本文主要包含以下内容:
生产者启动流程
生产者消息发送主流程
路由信息获取流程
生产者负载均衡策略,并主要介绍了故障延迟机制
通过阅读这篇文章,可以对 RocketMQ 消息发送流程有一个整体的认识,了解了它通过负载均衡策略,故障延迟机制实现了系统可扩展,高可用的特性。