java模拟MQTT客户端连接

本文介绍使用java模拟MQTT客户端连接

1.pom.xml

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.4.2</version>
</dependency>

2.Consumer

package com.chenk.mqconsumer;

import com.chenk.mqcommon.util.MyClient;

/**
 * @Author chenk
 * @create 2021/1/10 17:43
 */
public class Consumer {

    public static void main(String[] args) {
        for (int i = 0; i < 2000; i++) {
            int finalI = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    new MyClient("CKConsumerTest" + finalI, true).consume("CKTopicTest");
                }
            }).start();
        }
    }
}

3.MyClient

package com.chenk.mqcommon.util;

import com.chenk.mqcommon.listener.MyIMqttMessageListener;
import com.google.gson.Gson;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * @Author chenk
 * @create 2020/12/23 22:37
 */
@Data
@Slf4j
public class MyClient {

    static Gson GSON = new Gson();

    private MqttClient client;
    private String HOST = "tcp://127.0.0.1:1883";
    public String clientid = "chenk";
    private String userName = "admin";
    private String passWord = "2188a3b0-c071-4159-922b-ac8ad1ab2f44";

    public MyClient(String clientid, boolean retained) {
        try {
            this.client = retained ? new MqttClient(HOST, clientid, new MemoryPersistence()) : new MqttClient(HOST, clientid);
        } catch (MqttException e) {
            e.printStackTrace();
        }
        connect();
    }

    private void connect() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        options.setConnectionTimeout(10);
        options.setKeepAliveInterval(60);
        try {
            client.setCallback(new PushCallback());
            client.connect(options);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void consume(String topic) {
        try {
            client.subscribeWithResponse(topic, new MyIMqttMessageListener());
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

4.PushCallBack

package com.chenk.mqcommon.util;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

@Slf4j
public class PushCallback implements MqttCallback {

    @Override
    public void connectionLost(Throwable e) {
        // 连接丢失后,在这里面进行重连
        log.error("连接断开,可以做重连");
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("deliveryComplete---------" + token.isComplete());
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) {
        // subscribe后得到的消息会执行到这里面
        log.info("接收消息主题 : " + topic);
        log.info("接收消息Qos : " + message.getQos());
        log.info("接收消息内容 : " + new String(message.getPayload()));
    }
}

5.启动Consumer的main方法即可

添加新评论