# 使用java订阅mqtt服务器发送的消息。
# 🦟前提
有一个mqtt服务器,可以自行搭建或使用公共mqtt服务器。
使用前请先安装Paho模块
# 👾代码
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class App {
public staticvoid main(String[] args) {
String subTopic = "testtopic/#"; //订阅消息的主题
String pubTopic= "testtopic/1"; //发布消息的主题
String content ="测试测试"; //发布消息的内容
int qos = 2; //服务等级,0:最多一次;
// 1:至少一次;
// 2:确保只有一次。
String broker = "tcp://127.0.0.1"; //MQTT服务器地址和端口
String clientId= "emqx_test"; //客户端ID标志码
//约束内存持久化
MemoryPersistencepersistence = new MemoryPersistence();
try {
//实例化一个客户端
MqttClient client = new MqttClient(broker, clientId, persistence);
//设置MQTT连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
//连接MQTT服务器账户和密码
connOpts.setUserName("username");
connOpts.setPassword("password".toCharArray());
//保留会话
connOpts.setCleanSession(true);
//设置回调,收到订阅信息要执行的任务
client.setCallback(new OnMessageCallback());
//建立连接
//System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
//System.out.println("Connected");
//System.out.println("Publishing message: " + content);
//订阅某个主题
client.subscribe(subTopic);
//消息发布所需参数
//MqttMessage message = new MqttMessage(content.getBytes());
//message.setQos(qos);
//发布消息(主题和消息作为参数)
//client.publish(pubTopic, message);
//System.out.println("Messagepublished");
//这里用一个死循环,保证程序始终运行着,上面的回调函数才能够不断得获取订阅信息,
//否则程序结束,回调函数也停止监控订阅信息,就收不到订阅信息了。
while(true){
}
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
#回调的方法
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
//回调类继承MqttCallback类,覆盖下面的三个方法,public class OnMessageCallback implements MqttCallback {
public void connectionLost(Throwable throwable) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题:" + s);
System.out.println("接收消息Qos:" + mqttMessage.getQos());
System.out.println("接收消息内容:" + new String(mqttMessage.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
}
}