基于Paho的MQTT使用

一.准备工作

1.Broker选择

在进行编写编程前,需要先准备MQTT Broker主要进行接收发布的所有消息,并将其过滤后分发给不同的消息订阅者。

image
image

这里推荐使用 EMQTT

下载解压后,进入/bin文件,命令行执行emqx start 打开http://ip:18083就可以进入管理后台 image

2.Spring boot

pom.xml文件

<!--mqtt依赖支持mqtt5-->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
            <version>1.2.5</version>
        </dependency>

二.编写

SpringBoot 项目中新建一个PahoMqttConfig文件进行Mqtt管理配置,注意加上@Configuration注解

@Configuration
public class PahoMqttConfig {
  private static final byte[] WILL_DATA;

  static {
    WILL_DATA = "offline".getBytes();
  }
}

添加MqttConnectionOptions进行连接服务器配置

@Configuration
public class PahoMqttConfig {
  private static final byte[] WILL_DATA;

  static {
    WILL_DATA = "offline".getBytes();
  }
  @Bean
  public MqttConnectOptions getMqttConnectOptions() {
    MqttConnectOptions options = new MqttConnectOptions();
    // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
    // 这里设置为true表示每次连接到服务器都以新的身份连接
    options.setCleanSession(true);
    // 设置连接的用户名
    options.setUserName("admin");
    // 设置连接的密码
    options.setPassword("admin".toCharArray());
    options.setServerURIs(StringUtils.split(url, ","));
    // 设置超时时间 单位为秒
    options.setConnectionTimeout(10);
    // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
    options.setKeepAliveInterval(20);
    // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
    options.setWill("willTopic", WILL_DATA, 2false);
    return options;
  }
}

自定义MQTT发布订阅客户端

//主MQTT设备上报订阅客户端
    @Bean(value = "pahoDeviceInfo")
    public MqttAsyncClient pahoDeviceInfo() throws MqttException {
        MemoryPersistence persistence = new MemoryPersistence();
        MqttAsyncClient sampleClient = new MqttAsyncClient("tcp://127.0.0.1:1883""deviceClientId", persistence);
        sampleClient.setCallback(pahoDeviceCallBack); //设置响应回调
        logger.info("pahoDeviceInfoClient Connecting to broker:{}" + serverURIs);
        IMqttToken token = sampleClient.connect(getOptions());
        token.waitForCompletion();
        sampleClient.subscribe("deviceInfo",0);
        return sampleClient;
    }

注意到sampleClient.setCallback(pahoResponseCallBack);传入了一个pahoResponseCallBack在这个类中定义回调方法

@Component("pahoDeviceCallBack")
public class pahoDeviceCallBack implements MqttCallback {
    private static Logger logger = LoggerFactory.getLogger(pahoDeviceCallBack.class);
    @Override
    public void disconnected(MqttDisconnectResponse disconnectResponse) {
        logger.info("disconnected................");
    }

    @Override
    public void mqttErrorOccurred(MqttException exception) {
        logger.info("mqttErrorOccurred................");
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String reust = new String(message.getPayload(), "utf-8");
        logger.info("messageArrived  处理上报消息: {}",reust);
    }

    @Override
    public void deliveryComplete(IMqttToken token) {
        logger.info("deliveryComplete................");
    }

    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        logger.info("connectComplete................");
    }

    @Override
    public void authPacketArrived(int reasonCode, MqttProperties properties) {
        logger.info("authPacketArrived................");
    }
}

三.测试运行

项目运行,在emqtt界面上看到已经订阅deivceInfo topic image

emqtt提供websocket进行测试

总结,以上为简单的Springboot MQTT操作 paho mqtt 在2020年7月代码更新了mqttv5版本,支持mqtt5协议。