Java-pahoMQTT
基于Paho的MQTT使用
一.准备工作
1.Broker选择
在进行编写编程前,需要先准备MQTT Broker主要进行接收发布的所有消息,并将其过滤后分发给不同的消息订阅者。

这里推荐使用 EMQTT
下载解压后,进入/bin文件,命令行执行emqx start 打开http://ip:18083就可以进入管理后台
2.Spring boot
pom.xml文件
<!--mqtt依赖支持mqtt5-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>1.2.5</version>
</dependency>
二.编写
SpringBoot 项目中新建一个PahoMqttConfig文件进行Mqtt管理配置,注意加上@Configuration注解
@Configuration
public class PahoMqttConfig {
private static final byte[] WILL_DATA;
static {
WILL_DATA = "offline".getBytes();
}
}
添加MqttConnectionOptions进行连接服务器配置
@Configuration
public class PahoMqttConfig {
private static final byte[] WILL_DATA;
static {
WILL_DATA = "offline".getBytes();
}
@Bean
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
// 这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置连接的用户名
options.setUserName("admin");
// 设置连接的密码
options.setPassword("admin".toCharArray());
options.setServerURIs(StringUtils.split(url, ","));
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
options.setWill("willTopic", WILL_DATA, 2, false);
return options;
}
}
自定义MQTT发布订阅客户端
//主MQTT设备上报订阅客户端
@Bean(value = "pahoDeviceInfo")
public MqttAsyncClient pahoDeviceInfo() throws MqttException {
MemoryPersistence persistence = new MemoryPersistence();
MqttAsyncClient sampleClient = new MqttAsyncClient("tcp://127.0.0.1:1883", "deviceClientId", persistence);
sampleClient.setCallback(pahoDeviceCallBack); //设置响应回调
logger.info("pahoDeviceInfoClient Connecting to broker:{}" + serverURIs);
IMqttToken token = sampleClient.connect(getOptions());
token.waitForCompletion();
sampleClient.subscribe("deviceInfo",0);
return sampleClient;
}
注意到sampleClient.setCallback(pahoResponseCallBack);传入了一个pahoResponseCallBack在这个类中定义回调方法
@Component("pahoDeviceCallBack")
public class pahoDeviceCallBack implements MqttCallback {
private static Logger logger = LoggerFactory.getLogger(pahoDeviceCallBack.class);
@Override
public void disconnected(MqttDisconnectResponse disconnectResponse) {
logger.info("disconnected................");
}
@Override
public void mqttErrorOccurred(MqttException exception) {
logger.info("mqttErrorOccurred................");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String reust = new String(message.getPayload(), "utf-8");
logger.info("messageArrived 处理上报消息: {}",reust);
}
@Override
public void deliveryComplete(IMqttToken token) {
logger.info("deliveryComplete................");
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
logger.info("connectComplete................");
}
@Override
public void authPacketArrived(int reasonCode, MqttProperties properties) {
logger.info("authPacketArrived................");
}
}
三.测试运行
项目运行,在emqtt界面上看到已经订阅deivceInfo topic
emqtt提供websocket进行测试



总结,以上为简单的Springboot MQTT操作 paho mqtt 在2020年7月代码更新了mqttv5版本,支持mqtt5协议。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 Biliar!