一、简介
mica-mqtt 基于 t-io 实现的简单、低延迟、高性能 的 mqtt 物联网开源组件。使用详见 mica-mqtt gitee 源码 mica-mqtt-example 模块。
在多个朋友咨询 mica-mqtt 集群之后我添加了一个 mica-mqtt-broker 模块演示了基于 redis pub/sub 实现集群实现。
二、功能
[x] 支持 MQTT v3.1、v3.1.1 以及 v5.0 协议。
[x] 支持 websocket mqtt 子协议(支持 mqtt.js)。
[x] 支持 http rest api,http api 文档详见。
[x] 支持 MQTT client 客户端。
[x] 支持 MQTT server 服务端。
[x] 支持 MQTT 遗嘱消息。
[x] 支持 MQTT 保留消息。
[x] 支持自定义消息(mq)处理转发实现集群。
[x] MQTT 客户端 阿里云 mqtt 连接 demo。
[x] 支持 GraalVM 编译成本机可执行程序。
[x] 支持 Spring boot 项目快速接入(mica-mqtt-spring-boot-starter)。
[x] mica-mqtt-spring-boot-starter 支持对接 Prometheus + Grafana。
[x] 基于 redis pub/sub 实现集群,详见 mica-mqtt-broker 模块。
三、待办
[ ] 优化处理 mqtt session,以及支持部分 mqtt v5.0 新特性。
四、更新记录
✨ 添加 mica-mqtt-broker 模块,基于 redis pub/sub 实现 mqtt 集群。
✨ mica-mqtt-broker 基于 redis 实现客户端状态存储。
✨ mica-mqtt-broker 基于 redis 实现遗嘱、保留消息存储。
✨ mqtt-server http api 调整订阅和取消订阅,方便集群处理。
✨ mica-mqtt-spring-boot-example 添加 mqtt 和 http api 认证示例。
✨ 添加 mqtt 5 所有 ReasonCode。
✨ 优化解码 PacketNeededLength 计算。
? 修复遗嘱消息,添加消息类型。
? 修复 mqtt-server 保留消息匹配规则。
五、Spring boot 快速接入
5.1 添加依赖
<dependency><groupId>net.dreamlu</groupId><artifactId>mica-mqtt-spring-boot-starter</artifactId><version>1.1.2</version></dependency>
5.2 服务端配置示例
mqtt:server:enabled:true#是否开启,默认:trueip:127.0.0.1#服务端ip默认:127.0.0.1port:5883#端口,默认:1883name:Mica-Mqtt-Server#名称,默认:Mica-Mqtt-Serverbuffer-allocator:HEAP#堆内存和堆外内存,默认:堆内存heartbeat-timeout:120000#心跳超时,单位毫秒,默认:1000*120read-buffer-size:8092#接收数据的buffersize,默认:8092max-bytes-in-message:8092#消息解析最大bytes长度,默认:8092debug:true#如果开启prometheus指标收集建议关闭websocket-enable:true#开启websocket子协议,默认开启websocket-port:8083#websocket端口,默认:8083
5.3 服务端可实现接口(注册成 Spring Bean 即可)
5.4 Prometheus + Grafana 监控对接
得益于 t-io 良好的设计,监控指标直接对接的 t-iostat,目前支持下列指标,后期会不断完善。
关于 mica-mqtt-spring-boot-starter 更多请查看文档:https://gitee.com/596392912/mica-mqtt/tree/master/mica-mqtt-spring-boot-starter
六、普通 java 项目接入
6.1 maven 依赖
<dependency><groupId>net.dreamlu</groupId><artifactId>mica-mqtt-core</artifactId><version>1.1.2</version></dependency>
6.2 mica-mqtt 客户端
//初始化mqtt客户端MqttClientclient=MqttClient.create().ip("127.0.0.1").port(1883)//默认:1883.username("admin").password("123456").version(MqttVersion.MQTT_5)//默认:3_1_1.clientId("xxxxxx")//默认:MICA-MQTT-前缀和36进制的纳秒数.connect();//连接//消息订阅,同类方法subxxxclient.subQos0("/test/#",(topic,payload)->{logger.info(topic+'\t'+ByteBufferUtil.toString(payload));});//取消订阅client.unSubscribe("/test/#");//发送消息client.publish("/test/client",ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));//断开连接client.disconnect();//重连client.reconnect();//停止client.stop();
6.3 mica-mqtt 服务端
//注意:为了能接受更多链接(降低内存),请添加jvm参数-Xss129kMqttServermqttServer=MqttServer.create()//默认:127.0.0.1.ip("127.0.0.1")//默认:1883.port(1883)//默认为:8092(mqtt默认最大消息大小),为了降低内存可以减小小此参数,如果消息过大t-io会尝试解析多次(建议根据实际业务情况而定).readBufferSize(512)//自定义认证.authHandler((clientId,userName,password)->true)//消息监听.messageListener((clientId,topic,mqttQoS,payload)->{logger.info("clientId:{}topic:{}mqttQoS:{}message:{}",clientId,topic,mqttQoS,ByteBufferUtil.toString(payload));}).debug()//开启t-iodebug信息日志.start();//发送给某个客户端mqttServer.publish("clientId","/test/123",ByteBuffer.wrap("mica最牛皮".getBytes()));//发送给所有在线监听这个topic的客户端mqttServer.publishAll("/test/123",ByteBuffer.wrap("mica最牛皮".getBytes()));//停止服务mqttServer.stop();