使用java订阅mqtt服务器消息代码

使用java订阅mqtt服务器发送的消息。

yuniee
yuniee   Follow

# 使用java订阅mqtt服务器发送的消息。

# 🦟前提

有一个mqtt服务器,可以自行搭建或使用公共mqtt服务器。

使用前请先安装Paho模块

# 👾代码

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class App {
    public staticvoid main(String[] args) {

        String subTopic = "testtopic/#";   //订阅消息的主题
        String pubTopic= "testtopic/1";   //发布消息的主题
        String content ="测试测试";           //发布消息的内容
        int qos = 2;                        //服务等级,0:最多一次;
                                           //        1:至少一次;
                                           //        2:确保只有一次。
        String broker = "tcp://127.0.0.1"; //MQTT服务器地址和端口

        String clientId= "emqx_test";          //客户端ID标志码

        //约束内存持久化
        MemoryPersistencepersistence = new MemoryPersistence();

        try {
            //实例化一个客户端
            MqttClient client = new MqttClient(broker, clientId, persistence);

            //设置MQTT连接选项
            MqttConnectOptions connOpts = new MqttConnectOptions();
            //连接MQTT服务器账户和密码
            connOpts.setUserName("username");
            connOpts.setPassword("password".toCharArray());
            //保留会话
            connOpts.setCleanSession(true);

            //设置回调,收到订阅信息要执行的任务
            client.setCallback(new OnMessageCallback());

            //建立连接
            //System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);

            //System.out.println("Connected");
           //System.out.println("Publishing message: " + content);

            //订阅某个主题
            client.subscribe(subTopic);

            //消息发布所需参数
            //MqttMessage message = new MqttMessage(content.getBytes());
            //message.setQos(qos);
            //发布消息(主题和消息作为参数)
            //client.publish(pubTopic, message);
            //System.out.println("Messagepublished");

            //这里用一个死循环,保证程序始终运行着,上面的回调函数才能够不断得获取订阅信息,
            //否则程序结束,回调函数也停止监控订阅信息,就收不到订阅信息了。
            while(true){

            }
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }

    }
}


#回调的方法

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

import org.eclipse.paho.client.mqttv3.MqttCallback;

import org.eclipse.paho.client.mqttv3.MqttMessage;





//回调类继承MqttCallback类,覆盖下面的三个方法,public class OnMessageCallback implements MqttCallback {



    public void connectionLost(Throwable throwable) {

        // 连接丢失后,一般在这里面进行重连

        System.out.println("连接断开,可以做重连");

    }



    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {

        // subscribe后得到的消息会执行到这里面

        System.out.println("接收消息主题:" + s);

        System.out.println("接收消息Qos:" + mqttMessage.getQos());

        System.out.println("接收消息内容:" + new String(mqttMessage.getPayload()));

    }



    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

        System.out.println("deliveryComplete---------" + iMqttDeliveryToken.isComplete());

    }

}
记录一些基础但有用的玩机指南,然后我想写啥写啥,希望这里有适合你的内容