教育行業(yè)A股IPO第一股(股票代碼 003032)

全國咨詢/投訴熱線:400-618-4000

Paho是什么?Paho實(shí)現(xiàn)消息收發(fā)的操作流程

更新時間:2023年10月13日15時28分 來源:傳智教育 瀏覽次數(shù):

Paho Java客戶端是用Java編寫的MQTT客戶端庫,用于開發(fā)在JVM或其他Java兼容平臺(例如Android)上運(yùn)行的應(yīng)用程序。

Paho不僅可以對接EMQ X Broker,還可以對接滿足符合MQTT協(xié)議規(guī)范的消息代理服務(wù)端,目前Paho可以支持到MQTT5.0以下版本。MQTT3.3.1協(xié)議版本基本能滿足百分之九十多的接入場景。

Paho Java客戶端提供了兩個API:

1:MqttAsyncClient提供了一個完全異步的API,其中活動的完成是通過注冊的回調(diào)通知的。

2:MqttClient是MqttAsyncClient周圍的同步包裝器,在這里,功能似乎與應(yīng)用程序同步。

Paho實(shí)現(xiàn)消息收發(fā)

(1)找到項(xiàng)目:emq-demo,添加坐標(biāo)依賴

<dependency>
     <groupId>org.eclipse.paho</groupId>
     <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
     <version>1.2.2</version>
</dependency>

(2)編寫客戶端封裝類的代碼:com.itheima.mqtt.client.EmqClient

/**
* Created by 傳智播客*黑馬程序員.
*/
@Component
public class EmqClient {

    private Logger log = LoggerFactory.getLogger(EmqClient.class);
    
  private IMqttClient mqttClient;
  
  @Autowired
  private MqttProperties mqttProperties;
  
  @Autowired
  private MqttCallback mqttCallback;
  
  @PostConstruct
  private void init(){
      //MqttClientPersistence是接口 實(shí)現(xiàn)類有:MqttDefaultFilePersistence;MemoryPersistence
      MqttClientPersistence memoryPersistence = new MemoryPersistence();
      try {
          mqttClient = new
MqttClient(mqttProperties.getBrokerUrl(),mqttProperties.getClientId(),memoryPersistence);
      } catch (MqttException e) {
           log.error("MqttClient初始化失敗,brokerurl={},clientId=
{}",mqttProperties.getBrokerUrl(),mqttProperties.getClientId());
      }
}

   /**
    * 連接broker
    * @param username
    * @param password
    */
public void connect(String username,String password){
    //創(chuàng)建MQTT連接選項(xiàng)對象--可配置mqtt連接相關(guān)選項(xiàng)
    MqttConnectOptions connectOptions = new MqttConnectOptions();
    
    //自動重連
    connectOptions.setAutomaticReconnect(true);
    /**
     * 設(shè)置為true后意味著:客戶端斷開連接后emq不保留會話保留會話,否則會產(chǎn)生訂閱共享隊(duì)列的存活
客戶端收不到消息的情況
     * 因?yàn)閿嚅_的連接還被保留的話,emq會將隊(duì)列中的消息負(fù)載到斷開但還保留的客戶端,導(dǎo)致存活的客戶
端收不到消息
     * 解決該問題有兩種方案:1.連接斷開后不要保持;2.保證每個客戶端有固定的clientId
     */
    connectOptions.setCleanSession(true);
    connectOptions.setUserName(username);
    connectOptions.setPassword(password.toCharArray());
    //設(shè)置mqtt消息回調(diào)
    mqttClient.setCallback(mqttCallback);
    //連接broker
    try {
         mqttClient.connect(connectOptions);
    } catch (MqttException e) {
        log.error("連接mqtt broker失敗,失敗原因:{}",e.getMessage());
    }
    
  }

   /**
    * 發(fā)布
    * @param topic
    * @param msg
    */
   public void publish(String topic, String msg, QosEnum qos, boolean retain){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos.value());
        mqttMessage.setRetained(retain);
        mqttMessage.setPayload(msg.getBytes());
        if(mqttClient.isConnected()){
                try {
                    mqttClient.publish(topic,mqttMessage);
                } catch (MqttException e) {
                    log.error("mqtt消息發(fā)布失敗,topic={},msg={},qos={},retain={},errormsg=
{}",topic,msg,qos,retain,e.getMessage());
                }
        }
   }
   
   /**
    * 訂閱
    * @param topicFilter
    * @return
    */
   public void subscribe(String topicFilter,QosEnum qos){
      try {
          mqttClient.subscribe(topicFilter,qos.value());
      } catch (MqttException e) {
      
          log.error("訂閱失敗,topicfilter={},qos={},errormsg=
{}",topicFilter,qos,e.getMessage());
        }
    }
    
    /**
     * 斷開連接
     */
    @PreDestroy
    public void disConnect(){
        try {
             mqttClient.disconnect();
        } catch (MqttException e) {
             log.error("斷開連接出現(xiàn)異常,errormsg={}",e.getMessage());
        }
    }
}

需要在application.yml中添加自定義的配置:

mqtt:
broker-url: tcp://192.168.200.129:1883
client-id: demo-client
username: user
password: 123456

同時需要創(chuàng)建屬性配置類來加載該配置數(shù)據(jù),創(chuàng)建:com.itheima.mqtt.properties.MqttProperties

/**
 * Created by 傳智播客*黑馬程序員.
 */
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {

    private String brokerUrl;

    private String clientId;

    private String username;

    private String password;


    public String getBrokerUrl() {
        return brokerUrl;
    }

    public void setBrokerUrl(String brokerUrl) {
         this.brokerUrl = brokerUrl;
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }
    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "MqttProperties{" +
                "brokerUrl='" + brokerUrl + '\'' +
                ", clientId='" + clientId + '\'' +
                ", username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
     }
}

還需創(chuàng)建QoS服務(wù)之類枚舉:com.itheima.mqtt.enums.QosEnum

/**
 * Created by 傳智播客*黑馬程序員.
 */
public enum QosEnum {

   QoS0(0),QoS1(1),QoS2(2);

   QosEnum(int qos) {

       this.value = qos;
   }

   private final int value;


   public int value(){
   return this.value;
 }
}

(3)在連接接收到消息之后,我們需要將消息傳入消息回調(diào):com.itheima.mqtt.client.MessageCallback

/**
 * Created by 傳智播客*黑馬程序員.
 */
@Component
public class MessageCallback implements MqttCallback {
    private Logger log = LoggerFactory.getLogger(MessageCallback.class);
    @Override
    public void connectionLost(Throwable cause) {
        //丟失對服務(wù)端的連接后觸發(fā)該方法回調(diào),此處可以做一些特殊處理,比如重連
        log.info("丟失了對broker的連接");
    }
    /**
    * 訂閱到消息后的回調(diào)
    * 該方法由mqtt客戶端同步調(diào)用,在此方法未正確返回之前,不會發(fā)送ack確認(rèn)消息到broker
    * 一旦該方法向外拋出了異常客戶端將異常關(guān)閉,當(dāng)再次連接時;所有QoS1,QoS2且客戶端未進(jìn)行ack確認(rèn)的
    消息都將由
    * broker服務(wù)器再次發(fā)送到客戶端
    * @param topic
    * @param message
    * @throws Exception
    */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("訂閱到了消息;topic={},messageid={},qos={},msg={}", topic, message.getId(), message.getQos(), new String(message.getPayload()));
    }
    /**
     * 消息發(fā)布完成且收到ack確認(rèn)后的回調(diào)
     * QoS0:消息被網(wǎng)絡(luò)發(fā)出后觸發(fā)一次
     * QoS1:當(dāng)收到broker的PUBACK消息后觸發(fā)
     * QoS2:當(dāng)收到broer的PUBCOMP消息后觸發(fā)
     * @param token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        int messageId = token.getMessageId();
        String[] topics = token.getTopics();
        log.info("消息發(fā)送完成,messageId={},topics={}", messageId, topics);
    }
}

(4)編寫消息發(fā)布和訂閱的測試,在啟動類中添加如下代碼。

@Autowired
private EmqClient emqClient;

@Autowired
private MqttProperties mqttProperties;

@PostConstruct
public void init(){
     emqClient.connect(mqttProperties.getUsername(),mqttProperties.getPassword());
     //訂閱某一主題
     emqClient.subscribe("testtopic/#", QosEnum.QoS2);
     //開啟一個新的線程向該主題發(fā)送消息
     new Thread(()->{
         while (true){
          emqClient.publish("testtopic/123","mqtt msg:"+
LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME),QosEnum.QoS2,false);
          try {
            TimeUnit.SECONDS.sleep(5);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }).start();
}

(5)測試:在Dashboard中開啟使用username進(jìn)行認(rèn)證的組件,其他組件停止即可,然后啟動項(xiàng)目,查看控制臺輸出即可。

0 分享到:
和我們在線交談!