乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      springboot如何集成mqtt消息推送

       賈朋亮博客 2019-07-19
      1.需求分析
          近期筆者項目需要用到mqtt實現消息推送,筆者選擇emq作為mqtt服務器載體,上篇筆者講解了如何在linux中安裝mqtt服務,安裝鏈接:https://blog.csdn.net/zhangxing52077/article/details/80567270 ,接下來筆者將講解如何在springboot中集成mqtt。

      2.實現方案
      ①pom依賴

      <!-- MQTT support: the Spring Boot integration starter plus the Spring Integration stream and MQTT modules. -->
      <!-- NOTE(review): no <version> is declared on any of these, so versions must be managed elsewhere (presumably the Spring Boot parent/BOM; confirm). -->
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-integration</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.integration</groupId>
         <artifactId>spring-integration-stream</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.integration</groupId>
         <artifactId>spring-integration-mqtt</artifactId>
      </dependency>
      ②yml中配置mqtt(自定義配置)

      # MQTT settings (custom keys; MqttConfiguration binds everything under the com.mqtt prefix)
      com:
        mqtt:
          # Broker URI. NOTE(review): "ip" looks like a placeholder for the real broker address; confirm.
          host: tcp://ip:1883
          clientid: mqttjs_e8022a4d0b
          # NOTE(review): presumably a comma-separated list of topics; confirm against the consumer of this value.
          topic: good,test,yes
          # NOTE(review): credentials are stored here in plain text; consider externalising them.
          username: zhangxing
          password: zxp52077
          # NOTE(review): units are not stated here; presumably seconds. Confirm.
          timeout: 10
          keepalive: 20
      ③創建mqtt消息屬性配置類

      @Component
      @ConfigurationProperties(prefix = "com.mqtt")
      @Setter
      @Getter
      public class MqttConfiguration {

          private String host;

          private String clientid;

          private String topic;

          private String username;

          private String password;

          private int timeout;

          private int keepalive;

      }
      ④創建mqtt消息推送實體

      @Slf4j
      @Setter
      @Getter
      public class PushPayload {
          //推送類型
          private String type;
          //推送對象
          private String mobile;
          //標題
          private String title;
          //內(nèi)容
          private String content;
          //數(shù)量
          private Integer badge = 1;
          //鈴聲
          private String sound = "default";


          public PushPayload(String type, String mobile, String title, String content, Integer badge , String sound){
              this.type = type;
              this.mobile = mobile;
              this.title = title;
              this.content = content;
              this.badge = badge;
              this.sound = sound;
          }

          public static class Builder{
              //推送類型
              private String type;
              //推送對象
              private String mobile;
              //標題
              private String title;
              //內(nèi)容
              private String content;
              //數(shù)量
              private Integer badge = 1;
              //鈴聲
              private String sound = "default";

              public Builder setType(String type) {
                  this.type = type;
                  return this;
              }

              public Builder setMobile(String mobile) {
                  this.mobile = mobile;
                  return this;
              }

              public Builder setTitle(String title) {
                  this.title = title;
                  return this;
              }

              public Builder setContent(String content) {
                  this.content = content;
                  return this;
              }

              public Builder setBadge(Integer badge) {
                  this.badge = badge;
                  return this;
              }

              public Builder setSound(String sound) {
                  this.sound = sound;
                  return this;
              }

              public PushPayload bulid(){
                 return new PushPayload(type,mobile,title,content,badge,sound);
              }
          }


          public static Builder getPushPayloadBuider(){
              return new Builder();
          }


          @Override
          public String toString() {
              return JSON.toJSONString(this, SerializerFeature.DisableCircularReferenceDetect);
          }

        
      }
      ⑤創建mqtt消息推送或訂閱客戶端

      @Slf4j
      public class MqttPushClient {

          private MqttClient client;

          private static volatile MqttPushClient mqttPushClient = null;

          public static MqttPushClient getInstance(){

              if(null == mqttPushClient){
                  synchronized (MqttPushClient.class){
                      if(null == mqttPushClient){
                          mqttPushClient = new MqttPushClient();
                      }
                  }

              }
              return mqttPushClient;

          }

          private MqttPushClient() {
              connect();
          }

          private void connect(){
              try {
                  client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENTID, new MemoryPersistence());
                  MqttConnectOptions options = new MqttConnectOptions();
                  options.setCleanSession(false);
                  options.setUserName(PropertiesUtil.MQTT_USER_NAME);
                  options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());
                  options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);
                  options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);
                  try {
                      client.setCallback(new PushCallback());
                      client.connect(options);
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }

          /**
           * 發(fā)布,默認qos為0,非持久化
           * @param topic
           * @param pushMessage
           */
          public void publish(String topic,PushPayload pushMessage){
              publish(0, false, topic, pushMessage);
          }

          /**
           * 發(fā)布
           * @param qos
           * @param retained
           * @param topic
           * @param pushMessage
           */
          public void publish(int qos,boolean retained,String topic,PushPayload pushMessage){
              MqttMessage message = new MqttMessage();
              message.setQos(qos);
              message.setRetained(retained);
              message.setPayload(pushMessage.toString().getBytes());
              MqttTopic mTopic = client.getTopic(topic);
              if(null == mTopic){
                  log.error("topic not exist");
              }
              MqttDeliveryToken token;
              try {
                  token = mTopic.publish(message);
                  token.waitForCompletion();
              } catch (MqttPersistenceException e) {
                  e.printStackTrace();
              } catch (MqttException e) {
                  e.printStackTrace();
              }
          }

          /**
           * 訂閱某個主題,qos默認為0
           * @param topic
           */
          public void subscribe(String topic){
              subscribe(topic,0);
          }

          /**
           * 訂閱某個主題
           * @param topic
           * @param qos
           */
          public void subscribe(String topic,int qos){
              try {
                  client.subscribe(topic, qos);
              } catch (MqttException e) {
                  e.printStackTrace();
              }
          }


          public static void main(String[] args) throws Exception {
              String kdTopic = "good";
              PushPayload pushMessage = PushPayload.getPushPayloadBuider().setMobile("15345715326")
                       .setContent("designModel")
                      .bulid();
              MqttPushClient.getInstance().publish(0, false, kdTopic, pushMessage);
          }
      }
      ⑥配置獲取類的編寫

      /**
       * Exposes MQTT and Elasticsearch settings as static fields, read once from
       * classpath resources when the class is initialised.
       *
       * <p>If a resource is missing or unreadable, or a numeric key is absent or
       * malformed, class initialisation fails.
       */
      public class PropertiesUtil {

         public static String MQTT_HOST;
         public static String MQTT_CLIENTID;
         public static String MQTT_USER_NAME;
         public static String MQTT_PASSWORD;
         public static int MQTT_TIMEOUT;
         public static int MQTT_KEEP_ALIVE;


         public static final String ELASTIC_SEARCH_HOST;

         public static final int ELASTIC_SEARCH_PORT;

         public static final String ELASTIC_SEARCH_CLUSTER_NAME;

         static {
            // Read the file once instead of re-opening it for every key.
            Properties mqtt = loadMqttProperties();
            MQTT_HOST = mqtt.getProperty("MQTT_HOST");
            MQTT_CLIENTID = mqtt.getProperty("MQTT_CLIENTID");
            MQTT_USER_NAME = mqtt.getProperty("MQTT_USER_NAME");
            MQTT_PASSWORD = mqtt.getProperty("MQTT_PASSWORD");
            MQTT_TIMEOUT = Integer.parseInt(mqtt.getProperty("MQTT_TIMEOUT"));
            MQTT_KEEP_ALIVE = Integer.parseInt(mqtt.getProperty("MQTT_KEEP_ALIVE"));
         }

         static {
            Properties es = loadEsProperties();
            ELASTIC_SEARCH_HOST = es.getProperty("ES_HOST");
            ELASTIC_SEARCH_PORT = Integer.parseInt(es.getProperty("ES_PORT"));
            ELASTIC_SEARCH_CLUSTER_NAME = es.getProperty("ES_CLUSTER_NAME");
         }

         /**
          * Loads {@code /mqtt.yml} from the classpath.
          *
          * <p>NOTE(review): the file is parsed with {@link Properties#load}, so it must
          * consist of flat {@code KEY: value} or {@code KEY=value} lines; nested YAML
          * will not yield the expected keys.
          */
         private static Properties loadMqttProperties() {
            return loadProperties("/mqtt.yml");
         }

         /** Loads {@code /elasticsearch.properties} from the classpath. */
         private static Properties loadEsProperties() {
            return loadProperties("/elasticsearch.properties");
         }

         /**
          * Reads a classpath resource into a {@link Properties} object.
          *
          * @param resource absolute classpath location, e.g. {@code /mqtt.yml}
          * @return the parsed properties
          * @throws IllegalStateException if the resource is not on the classpath
          * @throws java.io.UncheckedIOException if reading the resource fails
          */
         private static Properties loadProperties(String resource) {
            try (InputStream inputstream = PropertiesUtil.class.getResourceAsStream(resource)) {
               if (inputstream == null) {
                  // Fail with the resource name rather than an anonymous NPE from Properties.load(null).
                  throw new IllegalStateException("Classpath resource not found: " + resource);
               }
               Properties properties = new Properties();
               properties.load(inputstream);
               return properties;
            } catch (IOException e) {
               throw new java.io.UncheckedIOException("Failed to load " + resource, e);
            }
         }


      }
      ⑦mqtt推送回調類

      /**
       * @auther zx
       * @date 2018/5/28 9:20
       */
      public class PushCallback implements MqttCallback {

          public void connectionLost(Throwable cause) {
              // 連接丟失后,一般在這里面進行重連
              System.out.println("連接斷開,可以做重連");
          }

          public void deliveryComplete(IMqttDeliveryToken token) {
              System.out.println("deliveryComplete---------" + token.isComplete());
          }

          public void messageArrived(String topic, MqttMessage message) throws Exception {
              // subscribe后得到的消息會執(zhí)行到這里面
              System.out.println("接收消息主題 : " + topic);
              System.out.println("接收消息Qos : " + message.getQos());
              System.out.println("接收消息內(nèi)容 : " + new String(message.getPayload()));
          }
      }
      3.效果測試
      @Test
      public void test() {

         PushPayload pushPayload = PushPayload.getPushPayloadBuider().setContent("test")
               .setMobile("119")
               .setType("2018")
               .bulid();
         mqttClientComponent.push("yes",pushPayload);

      }
      mqtt客戶端效果顯示

        本站是提供個人知識管理的網絡存儲空間,所有內容均由用戶發布,不代表本站觀點。請注意甄別內容中的聯系方式、誘導購買等信息,謹防詐騙。如發現有害或侵權內容,請點擊一鍵舉報。
        轉藏 分享 獻花(0)

        0條評論

        發表

        請遵守用戶 評論公約

        類似文章 更多