作者 钟来

模块整理

@@ -39,5 +39,22 @@ public class Topic { @@ -39,5 +39,22 @@ public class Topic {
39 this.payloadtype = payloadtype; 39 this.payloadtype = payloadtype;
40 } 40 }
41 41
  42 + public String getSendMessageStrigFromTopic(Object msg)
  43 + {
  44 + switch (this.topicType)
  45 + {
  46 + case "POST_REQ":
  47 + return msg+"";
  48 + case "GET_REQ":
  49 + return "";
  50 + case "PUT_REQ":
  51 + return "";
  52 + case "READ_REQ":
  53 + return "";
  54 + default:
  55 + break;
  56 + }
  57 + return null;
  58 + }
42 59
43 } 60 }
@@ -58,8 +58,7 @@ public class MqttCallback implements MqttCallbackExtended { @@ -58,8 +58,7 @@ public class MqttCallback implements MqttCallbackExtended {
58 @Override 58 @Override
59 public void messageArrived(String s, MqttMessage mqttMessage) { 59 public void messageArrived(String s, MqttMessage mqttMessage) {
60 Topic desttopic = TopicUtil.initTopic(s); 60 Topic desttopic = TopicUtil.initTopic(s);
61 - desttopic.setPayloadtype("POST_REQ");  
62 - 61 + desttopic.setTopicType("POST_REQ");
63 //接收到消息 62 //接收到消息
64 StringBuffer buffer = new StringBuffer(); 63 StringBuffer buffer = new StringBuffer();
65 buffer.append("topic:"); 64 buffer.append("topic:");
@@ -79,7 +78,6 @@ public class MqttCallback implements MqttCallbackExtended { @@ -79,7 +78,6 @@ public class MqttCallback implements MqttCallbackExtended {
79 { 78 {
80 log.error("消息{},topic为空,不做解析"); 79 log.error("消息{},topic为空,不做解析");
81 log.error("消息《"+s+"》解析为空 》》》内容:\r\n"+buffer.toString()); 80 log.error("消息《"+s+"》解析为空 》》》内容:\r\n"+buffer.toString());
82 - terminalService.publish(TopicUtil.generateSendMessageTopic(topic),"0");  
83 return; 81 return;
84 } 82 }
85 83
@@ -92,7 +90,7 @@ public class MqttCallback implements MqttCallbackExtended { @@ -92,7 +90,7 @@ public class MqttCallback implements MqttCallbackExtended {
92 if(null == iotDevice) 90 if(null == iotDevice)
93 { 91 {
94 log.info("设备{}不存在",topic.getClientid()); 92 log.info("设备{}不存在",topic.getClientid());
95 - terminalService.publish(TopicUtil.generateSendMessageTopic(topic),"1"); 93 + terminalService.publish(TopicUtil.generateSendMessageTopic(desttopic),"1");
96 return; 94 return;
97 } 95 }
98 if("ONLINE".equals(topic.getTopicType().toUpperCase())) 96 if("ONLINE".equals(topic.getTopicType().toUpperCase()))
@@ -115,7 +113,7 @@ public class MqttCallback implements MqttCallbackExtended { @@ -115,7 +113,7 @@ public class MqttCallback implements MqttCallbackExtended {
115 } 113 }
116 if(null == dto) 114 if(null == dto)
117 { 115 {
118 - terminalService.publish(TopicUtil.generateSendMessageTopic(topic),"2"); 116 + terminalService.publish(TopicUtil.generateSendMessageTopic(desttopic),"2");
119 return; 117 return;
120 } 118 }
121 log.info("{} 解析到的dto【{}】",dto); 119 log.info("{} 解析到的dto【{}】",dto);
@@ -127,7 +125,7 @@ public class MqttCallback implements MqttCallbackExtended { @@ -127,7 +125,7 @@ public class MqttCallback implements MqttCallbackExtended {
127 dataPersistenceService.persistence(topic,dto); 125 dataPersistenceService.persistence(topic,dto);
128 126
129 terminalService.publish(TopicUtil.generateSendMessageTopic(desttopic),"2"); 127 terminalService.publish(TopicUtil.generateSendMessageTopic(desttopic),"2");
130 - log.info("{}payload解析完成",s); 128 + log.info("{} payload解析完成",s);
131 } catch (Exception e) { 129 } catch (Exception e) {
132 log.error(s+"消息解析异常",e); 130 log.error(s+"消息解析异常",e);
133 try { 131 try {
@@ -149,37 +147,10 @@ public class MqttCallback implements MqttCallbackExtended { @@ -149,37 +147,10 @@ public class MqttCallback implements MqttCallbackExtended {
149 } 147 }
150 } 148 }
151 149
152 -// public static org.apache.log4j.Logger getLoggerByName(String name) {  
153 -// // 生成新的Logger  
154 -// // 如果已經有了一個Logger實例返回現有的  
155 -// org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(name);  
156 -// // 清空Appender。特別是不想使用現存實例時一定要初期化  
157 -// logger.removeAllAppenders();  
158 -// // 設定Logger級別。  
159 -//// logger.setLevel(Level.DEBUG);  
160 -// // 設定是否繼承父Logger。  
161 -// // 默認為true。繼承root輸出。  
162 -// // 設定false後將不輸出root。  
163 -// logger.setAdditivity(true);  
164 -// // 生成新的Appender  
165 -// FileAppender appender = new RollingFileAppender();  
166 -// PatternLayout layout = new PatternLayout();  
167 -// // log的输出形式  
168 -// String conversionPattern = "%d{HH:mm:ss} [%p] -%m%n";  
169 -// layout.setConversionPattern(conversionPattern);  
170 -// appender.setLayout(layout);  
171 -// // log输出路径  
172 -// // 这里使用了环境变量[catalina.home],只有在tomcat环境下才可以取到  
173 -//// String tomcatPath = java.lang.System.getProperty("catalina.home");  
174 -// appender.setFile( "logs/" + name+"_"+ DateUtils.parseDateToStr("yyyyMMdd",new Date()) + ".log");  
175 -// // log的文字码  
176 -// appender.setEncoding("UTF-8");  
177 -// // true:在已存在log文件后面追加 false:新log覆盖以前的log  
178 -// appender.setAppend(true);  
179 -// // 适用当前配置  
180 -// appender.activateOptions();  
181 -// // 将新的Appender加到Logger中  
182 -// logger.addAppender(appender);  
183 -// return logger;  
184 -// } 150 + public MqttCallback binldTerminalService(TerminalService terminalService)
  151 + {
  152 + this.terminalService = terminalService;
  153 + return this;
  154 + }
  155 +
185 } 156 }
@@ -78,7 +78,7 @@ public class TerminalService { @@ -78,7 +78,7 @@ public class TerminalService {
78 options.setConnectionTimeout(15); 78 options.setConnectionTimeout(15);
79 //设置断开后重新连接 79 //设置断开后重新连接
80 options.setAutomaticReconnect(true); 80 options.setAutomaticReconnect(true);
81 - mqttclient.setCallback(mqttCallback); 81 + mqttclient.setCallback(mqttCallback.binldTerminalService(this));
82 } 82 }
83 83
84 private void connect() throws MqttException { 84 private void connect() throws MqttException {
@@ -55,6 +55,7 @@ public class TopicUtil { @@ -55,6 +55,7 @@ public class TopicUtil {
55 */ 55 */
56 public static String generateSendMessageTopic(Topic topic) 56 public static String generateSendMessageTopic(Topic topic)
57 { 57 {
  58 +
58 return "/"+generate(topic,"/"); 59 return "/"+generate(topic,"/");
59 } 60 }
60 61
@@ -42,7 +42,7 @@ public class ClienNoticeService { @@ -42,7 +42,7 @@ public class ClienNoticeService {
42 // CREATED: 只在put和replace方法清零过期时间 42 // CREATED: 只在put和replace方法清零过期时间
43 // ACCESSED: 在CREATED策略基础上增加, 在还没过期时get方法清零过期时间。 43 // ACCESSED: 在CREATED策略基础上增加, 在还没过期时get方法清零过期时间。
44 // 清零过期时间也就是重置过期时间,重新计算过期时间. 44 // 清零过期时间也就是重置过期时间,重新计算过期时间.
45 - private static ExpiringMap<String, ClienConnection> clienConnectionMap = ExpiringMap.builder().maxSize(20000).expiration(5, TimeUnit.SECONDS) 45 + private static ExpiringMap<String, ClienConnection> clienConnectionMap = ExpiringMap.builder().maxSize(20000).expiration(15, TimeUnit.SECONDS)
46 .asyncExpirationListener(new ExpirationListener<String, ClienConnection>() { 46 .asyncExpirationListener(new ExpirationListener<String, ClienConnection>() {
47 @Override 47 @Override
48 public void expired(String s, ClienConnection clienConnection) { 48 public void expired(String s, ClienConnection clienConnection) {
@@ -49,7 +49,7 @@ mqtt: @@ -49,7 +49,7 @@ mqtt:
49 mqtt_usernames: 6_WP,12_BPQ,10_TLJ,NWDB_2023,WLJ_1,YWB_A700E,12_ZNZY 49 mqtt_usernames: 6_WP,12_BPQ,10_TLJ,NWDB_2023,WLJ_1,YWB_A700E,12_ZNZY
50 #订阅的topic 50 #订阅的topic
51 topics: ADD_POST,ALL_POST,DB_TOPIC_DISTRIBUTE,GET/+,online,PUT_REQ/+,READ_REQ/+ 51 topics: ADD_POST,ALL_POST,DB_TOPIC_DISTRIBUTE,GET/+,online,PUT_REQ/+,READ_REQ/+
52 - sub_clientid: '+' 52 + sub_clientid: '866838067732715'
53 topicconfig: "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/{{messageid}}" 53 topicconfig: "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/{{messageid}}"
54 top_return_map: '{"PUT":"PUT_REQ","READ":"READ_REQ"}' 54 top_return_map: '{"PUT":"PUT_REQ","READ":"READ_REQ"}'
55 username: sysuser 55 username: sysuser
@@ -67,5 +67,5 @@ sys: @@ -67,5 +67,5 @@ sys:
67 rocketmq: 67 rocketmq:
68 #nameservice服务器地址(多个以英文逗号隔开) 68 #nameservice服务器地址(多个以英文逗号隔开)
69 name-server: 47.115.144.179:9876 69 name-server: 47.115.144.179:9876
70 - send-topic: lh-mqtt-service-deviceCommand-test1 70 + send-topic: lh-mqtt-service-deviceCommand-test
71 send-tag: 1 71 send-tag: 1