本文介绍如何使用 Java 模拟 MQTT 客户端连接。
1.pom.xml
<!-- Spring Integration MQTT module, the only dependency declared in this snippet.
     NOTE(review): the Java classes in this article import org.eclipse.paho.client.mqttv3
     directly, so this artifact is presumably relied on to bring in the Paho client
     transitively. lombok and gson are also imported by the code but are not declared
     here; confirm against the complete pom.xml. -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.4.2</version>
</dependency>
2.Consumer
package com.chenk.mqconsumer;
import com.chenk.mqcommon.util.MyClient;
/**
 * Load-simulation entry point: starts a fixed number of threads, each of which
 * creates its own {@link MyClient} under a unique client id and subscribes it
 * to a single shared topic.
 *
 * @author chenk
 * @create 2021/1/10 17:43
 */
public class Consumer {

    /** Number of simulated consumers; client ids are suffixed 0 .. CLIENT_COUNT - 1. */
    private static final int CLIENT_COUNT = 2000;

    /** Prefix combined with the loop index to form each client id. */
    private static final String CLIENT_ID_PREFIX = "CKConsumerTest";

    /** Topic every simulated consumer subscribes to. */
    private static final String TOPIC = "CKTopicTest";

    /**
     * Spawns one plain thread per simulated consumer. Each thread builds a
     * {@code MyClient} (passing {@code true} for its {@code retained} flag)
     * and calls {@code consume} on the shared topic.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        for (int index = 0; index < CLIENT_COUNT; index++) {
            final String clientId = CLIENT_ID_PREFIX + index;
            Runnable subscribeTask = () -> new MyClient(clientId, true).consume(TOPIC);
            new Thread(subscribeTask).start();
        }
    }
}
3.MyClient
package com.chenk.mqcommon.util;
import com.chenk.mqcommon.listener.MyIMqttMessageListener;
import com.google.gson.Gson;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* @Author chenk
* @create 2020/12/23 22:37
*/
@Data
@Slf4j
public class MyClient {
static Gson GSON = new Gson();
private MqttClient client;
private String HOST = "tcp://127.0.0.1:1883";
public String clientid = "chenk";
private String userName = "admin";
private String passWord = "2188a3b0-c071-4159-922b-ac8ad1ab2f44";
public MyClient(String clientid, boolean retained) {
try {
this.client = retained ? new MqttClient(HOST, clientid, new MemoryPersistence()) : new MqttClient(HOST, clientid);
} catch (MqttException e) {
e.printStackTrace();
}
connect();
}
private void connect() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(60);
try {
client.setCallback(new PushCallback());
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
}
public void consume(String topic) {
try {
client.subscribeWithResponse(topic, new MyIMqttMessageListener());
} catch (MqttException e) {
e.printStackTrace();
}
}
}
4.PushCallback
package com.chenk.mqcommon.util;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
@Slf4j
public class PushCallback implements MqttCallback {
@Override
public void connectionLost(Throwable e) {
// 连接丢失后,在这里面进行重连
log.error("连接断开,可以做重连");
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.info("deliveryComplete---------" + token.isComplete());
}
@Override
public void messageArrived(String topic, MqttMessage message) {
// subscribe后得到的消息会执行到这里面
log.info("接收消息主题 : " + topic);
log.info("接收消息Qos : " + message.getQos());
log.info("接收消息内容 : " + new String(message.getPayload()));
}
}