国产成人精品18p,天天干成人网,无码专区狠狠躁天天躁,美女脱精光隐私扒开免费观看

如何在Spring Boot中使用MQTT

發(fā)布時(shí)間:2021-07-17 21:51 來(lái)源:腳本之家 閱讀:0 作者:Houtaroy 欄目: 編程語(yǔ)言 歡迎投稿:712375056

目錄

      為什么選擇MQTT

      MQTT的定義相信很多人都能講的頭頭是道,本文章也不討論什么高大上的東西,旨在用最簡(jiǎn)單直觀(guān)的方式讓每一位剛接觸的同行們可以最快的應用起來(lái)

      先從使用MQTT需要什么開(kāi)始分析:

      • 消息服務(wù)器
      • 不同應用/設備之間的頻繁交互
      • 可能涉及一對多的消息傳遞

      根據上面列舉的這三點(diǎn),我們大概可以了解到, MQTT最適合的場(chǎng)景是消息做為系統的重要組成部分,且參與著(zhù)系統關(guān)鍵業(yè)務(wù)邏輯的情形

      MQTT, 啟動(dòng)!

      既然決定使用它,我們首先要研究的是如何讓MQTT正常工作,畢竟它不是簡(jiǎn)單的在maven里加入個(gè)依賴(lài)就完事的

      我們總共需要干如下兩件事:

      • 下載消息服務(wù)器, 作為broker
      • 在maven中引入依賴(lài)
      <dependency>  
          <groupId>org.springframework.integration</groupId>  
          <artifactId>spring-integration-mqtt</artifactId>  
          <version>5.3.2.RELEASE</version>  
      </dependency>
      

      完成上面兩步后, 啟動(dòng)EMQX服務(wù)器, 正式進(jìn)入我們的MQTT旅途

      使用方式

      在Spring Boot中使用MQTT的代碼, 筆者總結了如下兩種方式:

      • 使用spring-integration的消息通道概念
      • 使用傳統的Client客戶(hù)端概念

      第一種會(huì )產(chǎn)生一定程度的心智負擔,但在筆者成功搭配(抄襲+造輪子)自動(dòng)注冊后, 比后者要方便許多

      在介紹具體代碼之前, 我們先簡(jiǎn)單整理下使用中最常見(jiàn)的概念:

      • 主題: MQTT消息的主要傳播途徑, 我們向主題發(fā)布消息, 訂閱主題, 從主題中讀取消息并進(jìn)行業(yè)務(wù)邏輯處理, 主題是消息的通道
      • 生產(chǎn)者: MQTT消息的發(fā)送者, 他們向主題發(fā)送消息
      • 消費者: MQTT消息的接收者, 他們訂閱自己需要的主題, 并從中獲取消息
      • broker: 消息轉發(fā)器, 消息是通過(guò)它來(lái)承載的, EMQX就是我們的broker, 在使用中我們不用關(guān)心它的具體實(shí)現

      其實(shí), MQTT的使用流程就是: 生產(chǎn)者給主題發(fā)消息->broker進(jìn)行消息的傳遞->訂閱該主題的消費者拿到消息并進(jìn)行相應的業(yè)務(wù)邏輯

      Client模式

      本模式和傳統的數據鏈接,Redis鏈接基本一致,有開(kāi)發(fā)經(jīng)驗的小伙伴們可以很輕松的駕馭,我們需要考慮的就是如果創(chuàng )建對應的工廠(chǎng),是單例模式,還是原型,亦或是造個(gè)池子呢?

      我們使用單例模式來(lái)進(jìn)行本次的介紹

      創(chuàng )建工廠(chǎng)類(lèi)

      首先, 我們創(chuàng )造一個(gè)工廠(chǎng)(就不承認設計模式中毒)

      public class MqttFactory {  
      
          private static MqttProperties configuration;  
          
          private static MqttClient client;  
       
          /**
          *   獲取客戶(hù)端實(shí)例
          *   單例模式, 存在則返回, 不存在則初始化
          */
          public static MqttClient getInstance() {    
              if (client == null) {      
                  init();    
              }    
              return client;  
          }  
          
          /**
          *   初始化客戶(hù)端
          */
          public static void init() {    
              try {      
                  client = new MqttClient(configuration.getAddress(), "client-" + System.currentTimeMillis());      
                  // MQTT配置對象
                  MqttConnectOptions options = new MqttConnectOptions();      
                  // 設置自動(dòng)重連, 其它具體參數可以查看MqttConnectOptions
                  options.setAutomaticReconnect(true);      
                  if (!client.isConnected()) {        
                  client.connect(options);      
                  }    
              } catch (MqttException e) {      
                  LOGGER.error(String.format("MQTT: 連接消息服務(wù)器[%s]失敗", configuration.getAddress()));    
              }  
          }
          
      }
      

      關(guān)于MQTT的具體配置可以查看, 在這里就不做說(shuō)明了

      多嘴一句, 文檔永遠比某些博客給力!!!

      創(chuàng )建工具類(lèi)

      接下來(lái), 我們創(chuàng )建MqttUtil, 用于消息的發(fā)送以及主題的訂閱

      public class MqttUtil {  
      
          /**
          *   發(fā)送消息
          *   @param topic 主題
          *   @param data 消息內容
          */
          public static void send(String topic, Object data) {    
              // 獲取客戶(hù)端實(shí)例
              MqttClient client = MqttFactory.getInstance();    
              ObjectMapper mapper = new ObjectMapper();    
              try {
                  // 轉換消息為json字符串
                  String json = mapper.writeValueAsString(data);      
                  client.publish(topic, new MqttMessage(json.getBytes(StandardCharsets.UTF_8)));    
              } catch (JsonProcessingException e) {      
                  LOGGER.error(String.format("MQTT: 主題[%s]發(fā)送消息轉換json失敗", topic));    
              } catch (MqttException e) {      
                  LOGGER.error(String.format("MQTT: 主題[%s]發(fā)送消息失敗", topic));    
              }  
          }
          
          /** 
          * 訂閱主題 
          * @param topic 主題 
          * @param listener 消息監聽(tīng)處理器 
          */
          public static void subscribe(String topic, IMqttMessageListener listener) {  
              MqttClient client = MqttFactory.getInstance();  
              try {    
                  client.subscribe(topic, listener);  
              } catch (MqttException e) {    
                  LOGGER.error(String.format("MQTT: 訂閱主題[%s]失敗", topic));  
              }
          }
          
      }
      

      相信小伙伴們注意到了IMqttMessageListener這個(gè)東西, 我們只需要創(chuàng )建一個(gè)監聽(tīng)類(lèi), 實(shí)現IMqttMessageListener接口, 就可以處理消息啦, 代碼如下:

      public class MessageListener implements IMqttMessageListener {  
      
          /** 
          * 處理消息
          * @param topic 主題 
          * @param mqttMessage 消息 
          */
          @Override  
          public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {   
              LOGGER.info(String.format("MQTT: 訂閱主題[%s]發(fā)來(lái)消息[%s]", topic, new String(mqttMessage.getPayload())));  
          }
          
          public static void main(String[] args) {  
              //訂閱主題test01, 使用MessageListener來(lái)處理它的消息
              MqttUtil.subscribe("test01", new MessageListener());
          }
      
      }
      

      無(wú)論是發(fā)送還是訂閱,是不是都很好理解?

      舒服的事情結束后, 帶來(lái)的是無(wú)盡的折磨和空虛, 來(lái)吧, 讓我們挑戰下心智負擔大的第二種模式!

      Spring Integration

      什么是Spring Integration?對不起,我不知道,我也不想知道

      為什么使用Spring Integration?因為它真的很好維護

      網(wǎng)上大部分教程都是針對Spring Integration的, 可能是我第一次接觸, 千篇一律看的我莫名其妙, 所以我選擇放棄了他們, 選擇了大神的自動(dòng)配置方式,并在其基礎上,針對心智負擔進(jìn)行了相應的調整

      還記得我們之前討論過(guò)的概念嗎?主題/生產(chǎn)者/消費者

      在Spring Integration中,我們新加入一些概念, 并把之前的進(jìn)行微調:

      • 通道: 消息傳輸和接受的管道, 每一條消息都是通過(guò)它鉆進(jìn)鉆出
      • 客戶(hù)端工廠(chǎng): 用于創(chuàng )建MQTT客戶(hù)端, 和模式一中的類(lèi)似
      • 消息適配器: 用于接收MQTT消息, 進(jìn)行轉換, 但不參與業(yè)務(wù)邏輯
      • 入站通道: 搭配消息適配器, 消息進(jìn)入站臺的通道
      • 出站通道: 搭配客戶(hù)端工廠(chǎng), 消息發(fā)出站臺的通道
      • 主題: 還是主題, 它不變
      • 生產(chǎn)者: 擁有出站通道的家伙
      • 消費者: 擁有入站通道的家伙

      如果能漸漸理解上面定義的話(huà), 這種模式的流程其實(shí)可以變成這樣:

      • 生產(chǎn)者: 創(chuàng )建指定客戶(hù)端工廠(chǎng)的出站通道->發(fā)送消息
      • 消費者: 創(chuàng )建指定消息適配器的入站通道->接收消息->進(jìn)入消息攔截器->業(yè)務(wù)邏輯

      其實(shí)在筆者看來(lái), 這符合Spring Boot的理念, 約定優(yōu)于配置

      代碼已挪入公司私服, 待后續個(gè)人私服配置好后再補充筆記

      總結

      MQTT作為消息服務(wù), 能夠滿(mǎn)足我們大部分的開(kāi)發(fā)需求, 但還有一些遺留問(wèn)題筆者還沒(méi)進(jìn)行過(guò)深入思考和實(shí)踐:

      • 如何利用qos機制保證數據不會(huì )丟失
      • 消息的隊列和排序
      • 集群模式下的應用

      以上就是如何在Spring Boot中使用MQTT的詳細內容,更多關(guān)于在Spring Boot中使用MQTT的資料請關(guān)注腳本之家其它相關(guān)文章!

      免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自本網(wǎng)站內容采集于網(wǎng)絡(luò )互聯(lián)網(wǎng)轉載等其它媒體和分享為主,內容觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如侵犯了原作者的版權,請告知一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容,聯(lián)系我們QQ:712375056,同時(shí)歡迎投稿傳遞力量。

      最近中文字幕免费MV视频| 风流少妇又紧又爽又丰满| 亚洲第一无码XXXXXX| 99精品国产99久久久久久97| 欧美国产日韩A欧美在线视频| 色欲ΑV一区二区三区天美传媒|