作者 钟来

U渔设备的档位bug修改

正在显示 16 个修改的文件 包含 417 行增加79 行删除
@@ -53,7 +53,7 @@ public class ProtocolParserAndPurificationFactory<T> { @@ -53,7 +53,7 @@ public class ProtocolParserAndPurificationFactory<T> {
53 Topic topic = protocolParserFactory.analysisTopic(topicStr); 53 Topic topic = protocolParserFactory.analysisTopic(topicStr);
54 //通过解析服务器将数据解析成键值对的形式存储到json对象里面,方便清洗 54 //通过解析服务器将数据解析成键值对的形式存储到json对象里面,方便清洗
55 AnalysisResult analysisResult = protocolParserFactory.analysisPayload(topic,payload); 55 AnalysisResult analysisResult = protocolParserFactory.analysisPayload(topic,payload);
56 - if(null == analysisResult.getJsonObject()) 56 + if(null == analysisResult || null == analysisResult.getJsonObject())
57 { 57 {
58 return null; 58 return null;
59 } 59 }
@@ -17,6 +17,7 @@ import com.zhonglai.luhui.device.protocol.factory.dto.DeviceCommand; @@ -17,6 +17,7 @@ import com.zhonglai.luhui.device.protocol.factory.dto.DeviceCommand;
17 import com.zhonglai.luhui.device.protocol.factory.dto.NoticeMessageDto; 17 import com.zhonglai.luhui.device.protocol.factory.dto.NoticeMessageDto;
18 import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceHostDto; 18 import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceHostDto;
19 import com.zhonglai.luhui.device.protocol.factory.service.IotThingsModelService; 19 import com.zhonglai.luhui.device.protocol.factory.service.IotThingsModelService;
  20 +import com.zhonglai.luhui.device.protocol.factory.service.MqttSubscribeService;
20 import com.zhonglai.luhui.device.protocol.factory.service.PersistenceDBService; 21 import com.zhonglai.luhui.device.protocol.factory.service.PersistenceDBService;
21 import net.jodah.expiringmap.ExpirationListener; 22 import net.jodah.expiringmap.ExpirationListener;
22 import net.jodah.expiringmap.ExpirationPolicy; 23 import net.jodah.expiringmap.ExpirationPolicy;
@@ -34,7 +35,9 @@ import org.springframework.beans.factory.annotation.Autowired; @@ -34,7 +35,9 @@ import org.springframework.beans.factory.annotation.Autowired;
34 import org.springframework.beans.factory.annotation.Value; 35 import org.springframework.beans.factory.annotation.Value;
35 import org.springframework.stereotype.Service; 36 import org.springframework.stereotype.Service;
36 37
  38 +import java.util.Arrays;
37 import java.util.Map; 39 import java.util.Map;
  40 +import java.util.Set;
38 import java.util.concurrent.TimeUnit; 41 import java.util.concurrent.TimeUnit;
39 42
40 /** 43 /**
@@ -65,6 +68,9 @@ public class DeviceCommandListenService implements RocketMQReplyListener<Message @@ -65,6 +68,9 @@ public class DeviceCommandListenService implements RocketMQReplyListener<Message
65 @Autowired 68 @Autowired
66 private IotThingsModelService iotThingsModelService; 69 private IotThingsModelService iotThingsModelService;
67 70
  71 + @Autowired
  72 + private MqttSubscribeService mqttSubscribeService;
  73 +
68 @Value("${mqtt.client.operationTime}") 74 @Value("${mqtt.client.operationTime}")
69 private long operationTime; //客户端操作时间 75 private long operationTime; //客户端操作时间
70 76
@@ -101,14 +107,14 @@ public class DeviceCommandListenService implements RocketMQReplyListener<Message @@ -101,14 +107,14 @@ public class DeviceCommandListenService implements RocketMQReplyListener<Message
101 NoticeMessageDto noticeMessageDomain = deviceCommandServiceFactory.read(deviceCommand.getDeviceId(), deviceCommand.getData()); 107 NoticeMessageDto noticeMessageDomain = deviceCommandServiceFactory.read(deviceCommand.getDeviceId(), deviceCommand.getData());
102 if(null == noticeMessageDomain) 108 if(null == noticeMessageDomain)
103 { 109 {
104 - new Message(MessageCode.DEFAULT_FAIL_CODE,"该设备不支持读取功能"); 110 + return new Message(MessageCode.DEFAULT_FAIL_CODE,"该设备不支持读取功能");
105 } 111 }
106 return sendMessage(noticeMessageDomain); 112 return sendMessage(noticeMessageDomain);
107 case write: 113 case write:
108 noticeMessageDomain = deviceCommandServiceFactory.write(deviceCommand.getDeviceId(), deviceCommand.getData()); 114 noticeMessageDomain = deviceCommandServiceFactory.write(deviceCommand.getDeviceId(), deviceCommand.getData());
109 if(null == noticeMessageDomain) 115 if(null == noticeMessageDomain)
110 { 116 {
111 - new Message(MessageCode.DEFAULT_FAIL_CODE,"该设备不支持写入功能"); 117 + return new Message(MessageCode.DEFAULT_FAIL_CODE,"该设备不支持写入功能");
112 } 118 }
113 return sendMessage(noticeMessageDomain); 119 return sendMessage(noticeMessageDomain);
114 case notice: 120 case notice:
@@ -130,6 +136,13 @@ public class DeviceCommandListenService implements RocketMQReplyListener<Message @@ -130,6 +136,13 @@ public class DeviceCommandListenService implements RocketMQReplyListener<Message
130 case upIotThingsModelTranslate: 136 case upIotThingsModelTranslate:
131 iotThingsModelService.upIotThingsModelTranslate(deviceCommand.getData().get("product_id").getAsInt()); 137 iotThingsModelService.upIotThingsModelTranslate(deviceCommand.getData().get("product_id").getAsInt());
132 return new Message(MessageCode.DEFAULT_SUCCESS_CODE,"指令发送成功"); 138 return new Message(MessageCode.DEFAULT_SUCCESS_CODE,"指令发送成功");
  139 + case addSubscribe:
  140 + Set<Integer> okProduct = mqttSubscribeService.assignIpAddSubscribe(deviceCommand.getData().get("ip").getAsString(),deviceCommand.getData().get("product_ids").getAsString());
  141 + if (null == okProduct || okProduct.size() ==0)
  142 + {
  143 + return new Message(MessageCode.DEFAULT_FAIL_CODE,"订阅为成功请检查原因");
  144 + }
  145 + return new Message(MessageCode.DEFAULT_SUCCESS_CODE,"订阅成功", Arrays.toString(okProduct.toArray()));
133 default: 146 default:
134 return new Message(MessageCode.DEFAULT_FAIL_CODE,"指令类型不存在,请联系管理员"); 147 return new Message(MessageCode.DEFAULT_FAIL_CODE,"指令类型不存在,请联系管理员");
135 } 148 }
@@ -195,14 +208,14 @@ public class DeviceCommandListenService implements RocketMQReplyListener<Message @@ -195,14 +208,14 @@ public class DeviceCommandListenService implements RocketMQReplyListener<Message
195 */ 208 */
196 public void replySendMessage(String clientid, ApiClientRePlyDto apiClientRePlyDto) 209 public void replySendMessage(String clientid, ApiClientRePlyDto apiClientRePlyDto)
197 { 210 {
198 - log.info("开始通知{},数据:{}",clientid,apiClientRePlyDto); 211 + log.info("开始通知{},数据:{}",clientid,GsonConstructor.get().toJson(apiClientRePlyDto));
199 //判断有没有需要回复的客户端,如果有就回复 212 //判断有没有需要回复的客户端,如果有就回复
200 ClienConnection clienConnection = getClienConnection(clientid); 213 ClienConnection clienConnection = getClienConnection(clientid);
201 if(null != clienConnection) 214 if(null != clienConnection)
202 { 215 {
203 synchronized(clienConnection) 216 synchronized(clienConnection)
204 { 217 {
205 - log.info("正在通知{},通知结果{}",clientid,apiClientRePlyDto); 218 + log.info("正在通知{},通知结果{}",clientid,GsonConstructor.get().toJson(apiClientRePlyDto));
206 clienConnection.reply(apiClientRePlyDto); 219 clienConnection.reply(apiClientRePlyDto);
207 } 220 }
208 } 221 }
1 package com.zhonglai.luhui.device.protocol.factory.dto; 1 package com.zhonglai.luhui.device.protocol.factory.dto;
2 2
3 public enum CommandType { 3 public enum CommandType {
  4 + /**
  5 + * 读
  6 + */
4 read, 7 read,
  8 + /**
  9 + * 写
  10 + */
5 write, 11 write,
  12 + /**
  13 + * 通知
  14 + */
6 notice, 15 notice,
  16 + /**
  17 + * 清除网关缓存
  18 + */
7 cleanDeviceHost, 19 cleanDeviceHost,
  20 + /**
  21 + * 清除终端缓存
  22 + */
8 cleanDeviceInfo, 23 cleanDeviceInfo,
  24 + /**
  25 + * 更新数据模型
  26 + */
9 upIotThingsModel, 27 upIotThingsModel,
10 - upIotThingsModelTranslate 28 + /**
  29 + * 更新数据翻译模型
  30 + */
  31 + upIotThingsModelTranslate,
  32 + /**
  33 + * 添加订阅
  34 + */
  35 + addSubscribe
11 } 36 }
  1 +package com.zhonglai.luhui.device.protocol.factory.dto;
  2 +
  3 +import lombok.Data;
  4 +
  5 +@Data
  6 +public class ProtocolSubTopics {
  7 + private Integer productid;
  8 + private Integer type;
  9 + private String sub_topics;
  10 + private Integer role_id;
  11 + private String mqtt_username;
  12 +
  13 +}
  1 +package com.zhonglai.luhui.device.protocol.factory.dto;
  2 +
  3 +import java.util.Set;
  4 +
  5 +public class SubTopicsAndproductDto {
  6 + private Set<String> tlist;
  7 + private Set<Integer> plist;
  8 +
  9 + private Set<String> untlist;
  10 + private boolean isUp;
  11 +
  12 + public Set<String> getUntlist() {
  13 + return untlist;
  14 + }
  15 +
  16 + public void setUntlist(Set<String> untlist) {
  17 + this.untlist = untlist;
  18 + }
  19 +
  20 + public Set<String> getTlist() {
  21 + return tlist;
  22 + }
  23 +
  24 + public void setTlist(Set<String> tlist) {
  25 + this.tlist = tlist;
  26 + }
  27 +
  28 + public Set<Integer> getPlist() {
  29 + return plist;
  30 + }
  31 +
  32 + public void setPlist(Set<Integer> plist) {
  33 + this.plist = plist;
  34 + }
  35 +
  36 + public boolean isUp() {
  37 + return isUp;
  38 + }
  39 +
  40 + public void setUp(boolean up) {
  41 + isUp = up;
  42 + }
  43 +}
  1 +package com.zhonglai.luhui.device.protocol.factory.service;
  2 +
  3 +import com.ruoyi.common.utils.StringUtils;
  4 +import com.zhonglai.luhui.device.analysis.comm.config.SysParameter;
  5 +import com.zhonglai.luhui.device.domain.IotProduct;
  6 +import com.zhonglai.luhui.device.domain.IotProtocolClass;
  7 +import com.zhonglai.luhui.device.protocol.factory.dto.ProtocolSubTopics;
  8 +import com.zhonglai.luhui.device.protocol.factory.dto.SubTopicsAndproductDto;
  9 +import com.zhonglai.luhui.device.protocol.factory.service.impl.DefaultDbService;
  10 +import org.eclipse.paho.client.mqttv3.MqttClient;
  11 +import org.eclipse.paho.client.mqttv3.MqttException;
  12 +import org.slf4j.Logger;
  13 +import org.slf4j.LoggerFactory;
  14 +import org.springframework.beans.factory.annotation.Autowired;
  15 +import org.springframework.beans.factory.annotation.Value;
  16 +import org.springframework.stereotype.Service;
  17 +
  18 +import java.util.ArrayList;
  19 +import java.util.HashSet;
  20 +import java.util.List;
  21 +import java.util.Set;
  22 +
  23 +@Service
  24 +public class MqttSubscribeService {
  25 + private final Logger log = LoggerFactory.getLogger(this.getClass());
  26 + private static Set<String> topics = new HashSet<>();
  27 +
  28 + private MqttClient mqttclient;
  29 +
  30 + @Autowired
  31 + private PersistenceDBService persistenceDBService;
  32 +
  33 + @Value("${mqtt.productids:#{null}}")
  34 + private String productids;
  35 +
  36 + public Set<Integer> subscribe(MqttClient mqttclient) throws MqttException
  37 + {
  38 + return subscribe(mqttclient,productids);
  39 + }
  40 +
  41 + public Set<Integer> subscribe(MqttClient mqttclient,String productids) throws MqttException {
  42 + this.mqttclient = mqttclient;
  43 + return addSubscribe(productids);
  44 + }
  45 +
  46 + /**
  47 + * 根据产品id订阅
  48 + * 如果产品id为空,就根据ip订阅
  49 + * @param productids
  50 + * @return
  51 + * @throws MqttException
  52 + */
  53 + public Set<Integer> addSubscribe(String productids) throws MqttException
  54 + {
  55 + if(null == mqttclient || !mqttclient.isConnected())
  56 + {
  57 + return null;
  58 + }
  59 + if (StringUtils.isEmpty(productids))
  60 + {
  61 + List<ProtocolSubTopics> list = persistenceDBService.getProtocolSubTopicsFromIp(SysParameter.service_ip);
  62 + SubTopicsAndproductDto subTopicsAndproductDto = getSubTopicsFromProtocolSubTopics(list);
  63 + return subscribeForProductids(subTopicsAndproductDto);
  64 + }
  65 + List<ProtocolSubTopics> list = persistenceDBService.getProtocolSubTopicsFromProductids(productids,"1".equals(System.getProperty("RunInIDEA"))?false:true); //如果是本地环境就不需要ip的null,因为后面也不会修改ip
  66 +
  67 + SubTopicsAndproductDto subTopicsAndproductDto = getSubTopicsFromProtocolSubTopics(list);
  68 + return subscribeForProductids(subTopicsAndproductDto);
  69 + }
  70 +
  71 + private Set<Integer> subscribeForProductids(SubTopicsAndproductDto subTopicsAndproductDto) throws MqttException {
  72 +
  73 + //如果卸载topic是存在的就先卸载
  74 + Set<String> untlist = subTopicsAndproductDto.getUntlist();
  75 + if(null != untlist && untlist.size()>0)
  76 + {
  77 + String[] sts = new String[untlist.size()];
  78 + int i=0;
  79 + for (String s:untlist)
  80 + {
  81 + sts[i++] = s;
  82 + }
  83 + mqttclient.unsubscribe(sts);
  84 + }
  85 + if(subTopicsAndproductDto.isUp())
  86 + {
  87 + Set<String> ts = subTopicsAndproductDto.getTlist();
  88 + String[] sts = new String[ts.size()];
  89 + int i=0;
  90 + for (String s:ts)
  91 + {
  92 + sts[i++] = s;
  93 + }
  94 + mqttclient.subscribe(sts);
  95 + log.info("-----------topic【{}】订阅成功--------------------",sts);
  96 +
  97 + if (!"1".equals(System.getProperty("RunInIDEA"))) //如果是本地环境不需要跟新服务器地址
  98 + {
  99 + //订阅成功,跟新订阅服务器地址
  100 + persistenceDBService.upSubscribeServiceIp(subTopicsAndproductDto.getPlist(), SysParameter.service_ip);
  101 + }
  102 +
  103 + }
  104 + return subTopicsAndproductDto.getPlist();
  105 + }
  106 +
  107 + /**
  108 + * 指定ip订阅产品id
  109 + * 如果产品已经被订阅就先取消订阅
  110 + * @param ip
  111 + * @param productids
  112 + * @return
  113 + */
  114 + public Set<Integer> assignIpAddSubscribe(String ip,String productids)
  115 + {
  116 + if(SysParameter.service_ip.equals(ip))
  117 + {
  118 + try {
  119 + List<ProtocolSubTopics> list = persistenceDBService.getProtocolSubTopicsFromProductids(productids,false);
  120 + SubTopicsAndproductDto subTopicsAndproductDto = getSubTopicsFromProtocolSubTopicsAndIp(list,ip);
  121 + return subscribeForProductids(subTopicsAndproductDto);
  122 + } catch (MqttException e) {
  123 + return null;
  124 + }
  125 + }
  126 +
  127 + return null;
  128 + }
  129 +
  130 + /**
  131 + * 根据产品订阅信息和ip,生成topic
  132 + * ip用户来判断是否和当前服务器ip一直,如果不一直就卸载topic,一致就订阅topic
  133 + * @param list
  134 + * @return
  135 + */
  136 + private SubTopicsAndproductDto getSubTopicsFromProtocolSubTopicsAndIp(List<ProtocolSubTopics> list,String ip)
  137 + {
  138 + SubTopicsAndproductDto subTopicsAndproductDto= new SubTopicsAndproductDto();
  139 + if(null != list && list.size()!=0)
  140 + {
  141 + Set<String> tlist = new HashSet<>();
  142 + Set<Integer> plist = new HashSet<>();
  143 + Set<String> untlist = new HashSet<>();
  144 + for (ProtocolSubTopics protocolSubTopics:list)
  145 + {
  146 + if (StringUtils.isNotEmpty(protocolSubTopics.getSub_topics()))
  147 + {
  148 + String[] ps = protocolSubTopics.getSub_topics().split(",");
  149 + for (String p:ps)
  150 + {
  151 + String topic = generateTopicFromSub_topics(protocolSubTopics.getRole_id(),protocolSubTopics.getMqtt_username(),p);
  152 + if(topics.contains(topic))
  153 + {
  154 + untlist.add(topic);
  155 + }
  156 + if(SysParameter.service_ip.equals(ip))
  157 + {
  158 + plist.add(protocolSubTopics.getProductid());
  159 + tlist.add(topic);
  160 + topics.add(topic);
  161 + subTopicsAndproductDto.setUp(true);
  162 + }
  163 + }
  164 + }
  165 + }
  166 + subTopicsAndproductDto.setPlist(plist);
  167 + subTopicsAndproductDto.setTlist(tlist);
  168 + subTopicsAndproductDto.setUntlist(untlist);
  169 + }
  170 + return subTopicsAndproductDto;
  171 + }
  172 + /**
  173 + * 根据产品订阅信息,生成topic
  174 + * @param list
  175 + * @return
  176 + */
  177 + private SubTopicsAndproductDto getSubTopicsFromProtocolSubTopics(List<ProtocolSubTopics> list)
  178 + {
  179 + SubTopicsAndproductDto subTopicsAndproductDto= new SubTopicsAndproductDto();
  180 + if(null != list && list.size()!=0)
  181 + {
  182 + Set<String> tlist = new HashSet<>();
  183 + Set<Integer> plist = new HashSet<>();
  184 + for (ProtocolSubTopics protocolSubTopics:list)
  185 + {
  186 + if (StringUtils.isNotEmpty(protocolSubTopics.getSub_topics()))
  187 + {
  188 + String[] ps = protocolSubTopics.getSub_topics().split(",");
  189 + for (String p:ps)
  190 + {
  191 + String topic = generateTopicFromSub_topics(protocolSubTopics.getRole_id(),protocolSubTopics.getMqtt_username(),p);
  192 + if(!topics.contains(topic))
  193 + {
  194 + plist.add(protocolSubTopics.getProductid());
  195 + tlist.add(topic);
  196 + topics.add(topic);
  197 + subTopicsAndproductDto.setUp(true);
  198 + }
  199 +
  200 + }
  201 + }
  202 + }
  203 + subTopicsAndproductDto.setPlist(plist);
  204 + subTopicsAndproductDto.setTlist(tlist);
  205 + }
  206 +
  207 + return subTopicsAndproductDto;
  208 + }
  209 +
  210 + private String generateTopicFromSub_topics(Integer roleid,String mqtt_username,String topicType)
  211 + {
  212 + StringBuffer topicStringBuffer = new StringBuffer("/");
  213 + topicStringBuffer.append(roleid);
  214 + topicStringBuffer.append("/");
  215 + topicStringBuffer.append(mqtt_username);
  216 + topicStringBuffer.append("/");
  217 + topicStringBuffer.append("+");
  218 + topicStringBuffer.append("/");
  219 + topicStringBuffer.append("+");
  220 + topicStringBuffer.append("/");
  221 + topicStringBuffer.append(topicType);
  222 +
  223 + String topic = topicStringBuffer.toString();
  224 + return topic;
  225 + }
  226 +}
1 package com.zhonglai.luhui.device.protocol.factory.service; 1 package com.zhonglai.luhui.device.protocol.factory.service;
2 2
  3 +import cn.hutool.core.bean.BeanUtil;
3 import com.google.gson.JsonObject; 4 import com.google.gson.JsonObject;
4 import com.ruoyi.common.utils.GsonConstructor; 5 import com.ruoyi.common.utils.GsonConstructor;
5 import com.ruoyi.common.utils.StringUtils; 6 import com.ruoyi.common.utils.StringUtils;
@@ -12,14 +13,13 @@ import com.zhonglai.luhui.device.protocol.factory.config.DeviceCach; @@ -12,14 +13,13 @@ import com.zhonglai.luhui.device.protocol.factory.config.DeviceCach;
12 import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceHostDto; 13 import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceHostDto;
13 import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceInfoDto; 14 import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceInfoDto;
14 import com.zhonglai.luhui.device.protocol.factory.dto.ProtocolPurificationModel; 15 import com.zhonglai.luhui.device.protocol.factory.dto.ProtocolPurificationModel;
  16 +import com.zhonglai.luhui.device.protocol.factory.dto.ProtocolSubTopics;
15 import com.zhonglai.luhui.device.protocol.factory.service.impl.DefaultDbService; 17 import com.zhonglai.luhui.device.protocol.factory.service.impl.DefaultDbService;
16 import com.zhonglai.luhui.device.protocol.factory.sync.ProtocolSyncFactory; 18 import com.zhonglai.luhui.device.protocol.factory.sync.ProtocolSyncFactory;
17 import org.springframework.beans.factory.annotation.Autowired; 19 import org.springframework.beans.factory.annotation.Autowired;
18 import org.springframework.stereotype.Service; 20 import org.springframework.stereotype.Service;
19 21
20 -import java.util.HashMap;  
21 -import java.util.List;  
22 -import java.util.Map; 22 +import java.util.*;
23 23
24 /** 24 /**
25 * 和数据库有个的操作都在这里 25 * 和数据库有个的操作都在这里
@@ -128,6 +128,34 @@ public class PersistenceDBService { @@ -128,6 +128,34 @@ public class PersistenceDBService {
128 return defaultDbService.getIotProductById(id); 128 return defaultDbService.getIotProductById(id);
129 } 129 }
130 130
  131 + /**
  132 + * 获取产品订阅列表
  133 + * @param productids
  134 + * @return
  135 + */
  136 + public List<ProtocolSubTopics> getProtocolSubTopicsFromProductids(String productids,boolean isNotIp)
  137 + {
  138 + List<ProtocolSubTopics> list = defaultDbService.getSubTopicsFromProtocolClassToProductids(productids,isNotIp);
  139 + return list;
  140 + }
  141 +
  142 + /**
  143 + * 同一个ip监听的产品列表
  144 + * @param ip
  145 + * @return
  146 + */
  147 + public List<ProtocolSubTopics> getProtocolSubTopicsFromIp(String ip)
  148 + {
  149 + List<ProtocolSubTopics> list = defaultDbService.getProtocolSubTopicsFromIp(ip);
  150 + return list;
  151 + }
  152 +
  153 +
  154 + public int upSubscribeServiceIp(Set<Integer> ids, String ip) {
  155 + String is = Arrays.toString(ids.toArray());
  156 + return defaultDbService.upSubscribeServiceIp(is,ip);
  157 + }
  158 +
131 public Map<String,IotThingsModel> getIotThingsModelMap(Integer product_id) 159 public Map<String,IotThingsModel> getIotThingsModelMap(Integer product_id)
132 { 160 {
133 List<IotThingsModel> list = defaultDbService.getIotThingsModelList(product_id); 161 List<IotThingsModel> list = defaultDbService.getIotThingsModelList(product_id);
@@ -11,6 +11,7 @@ import com.zhonglai.luhui.device.analysis.comm.dto.TableGenerateSqlEnum; @@ -11,6 +11,7 @@ import com.zhonglai.luhui.device.analysis.comm.dto.TableGenerateSqlEnum;
11 import com.zhonglai.luhui.device.domain.*; 11 import com.zhonglai.luhui.device.domain.*;
12 import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceHostDto; 12 import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceHostDto;
13 import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceInfoDto; 13 import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceInfoDto;
  14 +import com.zhonglai.luhui.device.protocol.factory.dto.ProtocolSubTopics;
14 import org.springframework.stereotype.Service; 15 import org.springframework.stereotype.Service;
15 16
16 import java.util.ArrayList; 17 import java.util.ArrayList;
@@ -206,4 +207,37 @@ public class DefaultDbService { @@ -206,4 +207,37 @@ public class DefaultDbService {
206 return 0; 207 return 0;
207 } 208 }
208 209
  210 + /**
  211 + * 根据产品id查询订阅信息
  212 + * @param productids
  213 + * @param isNotIp 是否需要subscribe_service_ip为空
  214 + * @return
  215 + */
  216 + public List<ProtocolSubTopics> getSubTopicsFromProtocolClassToProductids(String productids,boolean isNotIp)
  217 + {
  218 + StringBuffer stringBuffer = new StringBuffer("SELECT b.id productid,a.`type`,a.sub_topics,b.`role_id`,b.`mqtt_username` FROM `iot_protocol_class` a LEFT JOIN `iot_product` b ON b.`analysis_clas`=a.`id` WHERE a.`type`=1 "+(isNotIp?"AND b.`subscribe_service_ip` is null":"")+" AND b.`id` IN(");
  219 + stringBuffer.append(productids);
  220 + stringBuffer.append(")");
  221 + return baseDao.findBysql(stringBuffer.toString(), ProtocolSubTopics.class);
  222 + }
  223 +
  224 + public List<ProtocolSubTopics> getProtocolSubTopicsFromIp(String ip)
  225 + {
  226 + StringBuffer stringBuffer = new StringBuffer("SELECT b.id productid,a.`type`,a.sub_topics,b.`role_id`,b.`mqtt_username` FROM `iot_protocol_class` a LEFT JOIN `iot_product` b ON b.`analysis_clas`=a.`id` WHERE a.`type`=1 AND b.`subscribe_service_ip` = ?");
  227 + return baseDao.findBysql(stringBuffer.toString(), ProtocolSubTopics.class,ip);
  228 + }
  229 + /**
  230 + * 跟新订阅服务地址
  231 + * @param ids
  232 + * @param ip
  233 + * @return
  234 + */
  235 + public int upSubscribeServiceIp(String ids,String ip) {
  236 + return baseDao.updateBySql("UPDATE `iot_product` SET subscribe_service_ip=? WHERE id in ("+ids+")",ip);
  237 + }
  238 +
  239 + public String getProductIdsByIp(String ip)
  240 + {
  241 + return baseDao.findBysql("SELECT GROUP_CONCAT(id) ids FROM `iot_product` WHERE subscribe_service_ip = ? GROUP BY subscribe_service_ip",String.class,ip);
  242 + }
209 } 243 }
@@ -12,7 +12,7 @@ import com.zhonglai.luhui.device.protocol.uyu.analysis.topic.CkaDtSndR; @@ -12,7 +12,7 @@ import com.zhonglai.luhui.device.protocol.uyu.analysis.topic.CkaDtSndR;
12 import com.zhonglai.luhui.device.protocol.uyu.analysis.topic.CkaDtSndS; 12 import com.zhonglai.luhui.device.protocol.uyu.analysis.topic.CkaDtSndS;
13 13
14 public class UyuProtocolParserFactoryImpl implements ProtocolParserFactory<byte[]> { 14 public class UyuProtocolParserFactoryImpl implements ProtocolParserFactory<byte[]> {
15 - private static final String topicModel = "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/{{sorclientid}}"; 15 + private static final String topicModel = "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}";
16 @Override 16 @Override
17 public Topic analysisTopic(String topicStr) { 17 public Topic analysisTopic(String topicStr) {
18 Topic topic = TopicUtil.initTopicFromModelStr(topicStr,topicModel); 18 Topic topic = TopicUtil.initTopicFromModelStr(topicStr,topicModel);
1 package com.zhonglai.luhui.device.protocol.uyu.analysis.topic; 1 package com.zhonglai.luhui.device.protocol.uyu.analysis.topic;
2 2
  3 +import cn.hutool.core.map.MapUtil;
  4 +import cn.hutool.json.JSONUtil;
  5 +import com.alibaba.fastjson.JSONObject;
3 import com.google.gson.JsonObject; 6 import com.google.gson.JsonObject;
4 import com.ruoyi.common.constant.GenConstants; 7 import com.ruoyi.common.constant.GenConstants;
5 import com.ruoyi.common.utils.ByteUtil; 8 import com.ruoyi.common.utils.ByteUtil;
@@ -60,7 +63,7 @@ public class CkaDtSndS { @@ -60,7 +63,7 @@ public class CkaDtSndS {
60 JsonObject senserData = new JsonObject(); 63 JsonObject senserData = new JsonObject();
61 jsonObject.add(addres+"",senserData); 64 jsonObject.add(addres+"",senserData);
62 byte b5 = payload[4]; 65 byte b5 = payload[4];
63 - int dangwei = ByteUtil.getBitsValue(b5,0,3); 66 + int dangwei = ByteUtil.getBitsValue(b5,0,3)+1;
64 int kaiguan = ByteUtil.getBitValue(b5,7); 67 int kaiguan = ByteUtil.getBitValue(b5,7);
65 68
66 int zhuanshu = new Long(ByteUtil.bytesToLongDESC(payload,5,2)).intValue(); 69 int zhuanshu = new Long(ByteUtil.bytesToLongDESC(payload,5,2)).intValue();
@@ -93,7 +96,7 @@ public class CkaDtSndS { @@ -93,7 +96,7 @@ public class CkaDtSndS {
93 @Override 96 @Override
94 public void setReplyMessage(Message message) { 97 public void setReplyMessage(Message message) {
95 message.setCode(MessageCode.DEFAULT_SUCCESS_CODE); 98 message.setCode(MessageCode.DEFAULT_SUCCESS_CODE);
96 - message.setData(GsonConstructor.get().fromJson(jsonObject.toString(), HashMap.class)); 99 + message.setData(JSONObject.parseObject(jsonObject.toString(),HashMap.class));
97 } 100 }
98 }); 101 });
99 } 102 }
@@ -56,13 +56,7 @@ public class DeviceCommandListenServiceImpl implements DeviceCommandServiceFacto @@ -56,13 +56,7 @@ public class DeviceCommandListenServiceImpl implements DeviceCommandServiceFacto
56 private Topic getWriteTopic(String deviceId, IotProduct iotProduct) 56 private Topic getWriteTopic(String deviceId, IotProduct iotProduct)
57 { 57 {
58 Topic topic = new Topic(); 58 Topic topic = new Topic();
59 - if (deviceId.indexOf("_")>0)  
60 - {  
61 - topic.setTopicType("CKA-DT-SND_R/"+deviceId.split("_")[1]);  
62 - }else{  
63 - topic.setTopicType("CKA-DT-SND_R/"+deviceId);  
64 - }  
65 - 59 + topic.setTopicType("CKA-DT-SND_R");
66 topic.setClientid(deviceId); 60 topic.setClientid(deviceId);
67 topic.setRoleid(iotProduct.getRole_id()+""); 61 topic.setRoleid(iotProduct.getRole_id()+"");
68 topic.setUsername(iotProduct.getMqtt_username()); 62 topic.setUsername(iotProduct.getMqtt_username());
@@ -31,7 +31,7 @@ public class MqttServiceListenApplication { @@ -31,7 +31,7 @@ public class MqttServiceListenApplication {
31 31
32 public static void main(String[] args) { 32 public static void main(String[] args) {
33 log.info("启动服务"); 33 log.info("启动服务");
34 -// System.setProperty("RunInIDEA",checkRunInIDEA()); 34 + System.setProperty("RunInIDEA",checkRunInIDEA());
35 35
36 SpringApplicationBuilder builder = new SpringApplicationBuilder(MqttServiceListenApplication.class); 36 SpringApplicationBuilder builder = new SpringApplicationBuilder(MqttServiceListenApplication.class);
37 builder.run( args); 37 builder.run( args);
@@ -6,29 +6,32 @@ import com.zhonglai.luhui.device.analysis.util.TopicUtil; @@ -6,29 +6,32 @@ import com.zhonglai.luhui.device.analysis.util.TopicUtil;
6 import com.zhonglai.luhui.device.protocol.factory.comm.DataLogType; 6 import com.zhonglai.luhui.device.protocol.factory.comm.DataLogType;
7 import com.zhonglai.luhui.device.protocol.factory.comm.DeviceDataLog; 7 import com.zhonglai.luhui.device.protocol.factory.comm.DeviceDataLog;
8 import com.zhonglai.luhui.device.protocol.factory.service.BaseCallback; 8 import com.zhonglai.luhui.device.protocol.factory.service.BaseCallback;
  9 +import com.zhonglai.luhui.device.protocol.factory.service.MqttSubscribeService;
9 import lombok.SneakyThrows; 10 import lombok.SneakyThrows;
10 -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;  
11 -import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;  
12 -import org.eclipse.paho.client.mqttv3.MqttException;  
13 -import org.eclipse.paho.client.mqttv3.MqttMessage; 11 +import org.eclipse.paho.client.mqttv3.*;
14 import org.springframework.beans.factory.annotation.Autowired; 12 import org.springframework.beans.factory.annotation.Autowired;
  13 +import org.springframework.beans.factory.annotation.Value;
15 import org.springframework.stereotype.Component; 14 import org.springframework.stereotype.Component;
16 15
  16 +import java.util.List;
  17 +import java.util.Set;
  18 +
17 @Component 19 @Component
18 public class MqttCallback extends BaseCallback implements MqttCallbackExtended { 20 public class MqttCallback extends BaseCallback implements MqttCallbackExtended {
19 21
20 private static final String topicOnlineModel = "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}"; 22 private static final String topicOnlineModel = "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}";
  23 + private MqttClient mqttclient;
21 24
22 @Autowired 25 @Autowired
23 - private TerminalService terminalService; //客户端服务  
24 - 26 + private MqttSubscribeService mqttSubscribeService;
25 27
26 @SneakyThrows 28 @SneakyThrows
27 @Override 29 @Override
28 public void connectComplete(boolean b, String s) { 30 public void connectComplete(boolean b, String s) {
29 // 连接成功 31 // 连接成功
30 log.info("连接成功"); 32 log.info("连接成功");
31 - terminalService.subscribe(); 33 + Set<Integer> topics = mqttSubscribeService.subscribe(mqttclient);
  34 + log.info("-----------产品id{}订阅成功--------------------",topics);
32 } 35 }
33 36
34 @Override 37 @Override
@@ -46,16 +49,7 @@ public class MqttCallback extends BaseCallback implements MqttCallbackExtended { @@ -46,16 +49,7 @@ public class MqttCallback extends BaseCallback implements MqttCallbackExtended {
46 DeviceDataLog.info(baseTopic.getClientid(), DataLogType.发来数据,s,mqttMessage.toString().replaceAll("\\n","").replaceAll("\\s", "")); 49 DeviceDataLog.info(baseTopic.getClientid(), DataLogType.发来数据,s,mqttMessage.toString().replaceAll("\\n","").replaceAll("\\s", ""));
47 50
48 try { 51 try {
49 - //过滤在线消息  
50 -// if (s.endsWith("online"))  
51 -// {  
52 -// String state = new String(mqttMessage.getPayload());  
53 -// deviceCashUpService.upOnline(state);  
54 -// return ;  
55 -// }  
56 -  
57 messageArrived(baseTopic.getClientid(),s,mqttMessage.getPayload()); 52 messageArrived(baseTopic.getClientid(),s,mqttMessage.getPayload());
58 -  
59 }catch (Exception e) 53 }catch (Exception e)
60 { 54 {
61 log.error(s+"解析失败",e); 55 log.error(s+"解析失败",e);
@@ -72,9 +66,9 @@ public class MqttCallback extends BaseCallback implements MqttCallbackExtended { @@ -72,9 +66,9 @@ public class MqttCallback extends BaseCallback implements MqttCallbackExtended {
72 } 66 }
73 } 67 }
74 68
75 - public MqttCallback binldTerminalService(TerminalService terminalService) 69 + public MqttCallback binldTerminalService(MqttClient mqttclient)
76 { 70 {
77 - this.terminalService = terminalService; 71 + this.mqttclient = mqttclient;
78 return this; 72 return this;
79 } 73 }
80 74
1 package com.zhonglai.luhui.mqtt.service.proxy.comm.service; 1 package com.zhonglai.luhui.mqtt.service.proxy.comm.service;
2 2
3 -import com.zhonglai.luhui.device.analysis.comm.factory.Topic;  
4 -import com.zhonglai.luhui.device.analysis.dto.Message;  
5 -import com.zhonglai.luhui.device.analysis.dto.MessageCode;  
6 -import com.zhonglai.luhui.device.analysis.dto.MessageCodeType;  
7 import com.zhonglai.luhui.device.analysis.util.TopicUtil; 3 import com.zhonglai.luhui.device.analysis.util.TopicUtil;
8 import com.zhonglai.luhui.device.protocol.factory.control.ClienNoticeServiceFactory; 4 import com.zhonglai.luhui.device.protocol.factory.control.ClienNoticeServiceFactory;
9 import com.zhonglai.luhui.device.protocol.factory.dto.NoticeMessageDto; 5 import com.zhonglai.luhui.device.protocol.factory.dto.NoticeMessageDto;
@@ -18,7 +14,7 @@ import org.springframework.stereotype.Service; @@ -18,7 +14,7 @@ import org.springframework.stereotype.Service;
18 public class MqttClienNoticeServiceImpl implements ClienNoticeServiceFactory { 14 public class MqttClienNoticeServiceImpl implements ClienNoticeServiceFactory {
19 private static final Logger log = LoggerFactory.getLogger(MqttClienNoticeServiceImpl.class); 15 private static final Logger log = LoggerFactory.getLogger(MqttClienNoticeServiceImpl.class);
20 @Autowired 16 @Autowired
21 - private TerminalService terminalService; 17 + private TerminalServiceImpl terminalServiceImpl;
22 @Override 18 @Override
23 public boolean sendMessage( NoticeMessageDto noticeMessageDomain) { 19 public boolean sendMessage( NoticeMessageDto noticeMessageDomain) {
24 if(null == noticeMessageDomain || null == noticeMessageDomain.getCommd()) 20 if(null == noticeMessageDomain || null == noticeMessageDomain.getCommd())
@@ -28,7 +24,7 @@ public class MqttClienNoticeServiceImpl implements ClienNoticeServiceFactory { @@ -28,7 +24,7 @@ public class MqttClienNoticeServiceImpl implements ClienNoticeServiceFactory {
28 MqttMessage mqttMessage = new MqttMessage(); 24 MqttMessage mqttMessage = new MqttMessage();
29 mqttMessage.setPayload(noticeMessageDomain.getCommd()); 25 mqttMessage.setPayload(noticeMessageDomain.getCommd());
30 try { 26 try {
31 - terminalService.publish(TopicUtil.generateSendMessageTopic(noticeMessageDomain.getTopic(),noticeMessageDomain.getTopicconfig()),mqttMessage); 27 + terminalServiceImpl.publish(TopicUtil.generateSendMessageTopic(noticeMessageDomain.getTopic(),noticeMessageDomain.getTopicconfig()),mqttMessage);
32 return true; 28 return true;
33 } catch (MqttException e) { 29 } catch (MqttException e) {
34 log.error("转发消息异常",e); 30 log.error("转发消息异常",e);
1 package com.zhonglai.luhui.mqtt.service.proxy.comm.service; 1 package com.zhonglai.luhui.mqtt.service.proxy.comm.service;
2 2
3 -import com.ruoyi.common.utils.StringUtils;  
4 import com.zhonglai.luhui.device.analysis.comm.config.SysParameter; 3 import com.zhonglai.luhui.device.analysis.comm.config.SysParameter;
5 import com.zhonglai.luhui.device.analysis.comm.util.ByteUtil; 4 import com.zhonglai.luhui.device.analysis.comm.util.ByteUtil;
6 import com.zhonglai.luhui.device.analysis.util.TopicUtil; 5 import com.zhonglai.luhui.device.analysis.util.TopicUtil;
7 import com.zhonglai.luhui.device.protocol.factory.comm.DataLogType; 6 import com.zhonglai.luhui.device.protocol.factory.comm.DataLogType;
8 import com.zhonglai.luhui.device.protocol.factory.comm.DeviceDataLog; 7 import com.zhonglai.luhui.device.protocol.factory.comm.DeviceDataLog;
9 import com.zhonglai.luhui.device.protocol.factory.plugins.FileChangeListener; 8 import com.zhonglai.luhui.device.protocol.factory.plugins.FileChangeListener;
10 -import com.zhonglai.luhui.device.protocol.factory.service.impl.DefaultDbService;  
11 import org.eclipse.paho.client.mqttv3.MqttClient; 9 import org.eclipse.paho.client.mqttv3.MqttClient;
12 import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 10 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
13 import org.eclipse.paho.client.mqttv3.MqttException; 11 import org.eclipse.paho.client.mqttv3.MqttException;
@@ -22,19 +20,14 @@ import org.springframework.stereotype.Service; @@ -22,19 +20,14 @@ import org.springframework.stereotype.Service;
22 import javax.annotation.PostConstruct; 20 import javax.annotation.PostConstruct;
23 import javax.annotation.PreDestroy; 21 import javax.annotation.PreDestroy;
24 import java.io.IOException; 22 import java.io.IOException;
25 -import java.util.ArrayList;  
26 import java.util.List; 23 import java.util.List;
27 -import java.util.concurrent.ExecutorService;  
28 -import java.util.concurrent.LinkedBlockingQueue;  
29 -import java.util.concurrent.ThreadPoolExecutor;  
30 -import java.util.concurrent.TimeUnit;  
31 24
32 /** 25 /**
33 * 终端服务 26 * 终端服务
34 */ 27 */
35 @Service 28 @Service
36 -public class TerminalService {  
37 - private static final Logger log = LoggerFactory.getLogger(TerminalService.class); 29 +public class TerminalServiceImpl {
  30 + private static final Logger log = LoggerFactory.getLogger(TerminalServiceImpl.class);
38 @Autowired 31 @Autowired
39 private MqttCallback mqttCallback; 32 private MqttCallback mqttCallback;
40 33
@@ -52,8 +45,6 @@ public class TerminalService { @@ -52,8 +45,6 @@ public class TerminalService {
52 private String username; 45 private String username;
53 @Value("${mqtt.password}") 46 @Value("${mqtt.password}")
54 private String password; 47 private String password;
55 - @Value("${mqtt.topics}")  
56 - private List<String> topics;  
57 48
58 private MqttClient mqttclient; 49 private MqttClient mqttclient;
59 50
@@ -69,7 +60,7 @@ public class TerminalService { @@ -69,7 +60,7 @@ public class TerminalService {
69 options.setConnectionTimeout(15); 60 options.setConnectionTimeout(15);
70 //设置断开后重新连接 61 //设置断开后重新连接
71 options.setAutomaticReconnect(true); 62 options.setAutomaticReconnect(true);
72 - mqttclient.setCallback(mqttCallback.binldTerminalService(this)); 63 + mqttclient.setCallback(mqttCallback.binldTerminalService(mqttclient));
73 } 64 }
74 65
75 private void connect() throws MqttException { 66 private void connect() throws MqttException {
@@ -78,26 +69,6 @@ public class TerminalService { @@ -78,26 +69,6 @@ public class TerminalService {
78 mqttclient.connect(options); 69 mqttclient.connect(options);
79 } 70 }
80 71
81 - public List<String> subscribe() throws MqttException {  
82 - List<String> ts = getCompletionTopics();  
83 - if(null != ts && ts.size() !=0)  
84 - {  
85 - mqttclient.subscribe(ts.toArray(new String[ts.size()]));  
86 - }  
87 - return ts;  
88 - }  
89 -  
90 - public List<String> getCompletionTopics()  
91 - {  
92 -// if ("1".equals(System.getProperty("RunInIDEA")) && StringUtils.isNotEmpty(textTopic))  
93 -// {  
94 -// List<String> list = new ArrayList<>();  
95 -// list.add(textTopic);  
96 -// return list;  
97 -// }  
98 - return topics;  
99 - }  
100 -  
101 @PostConstruct 72 @PostConstruct
102 public void startMqttListenerService() throws MqttException, IOException { 73 public void startMqttListenerService() throws MqttException, IOException {
103 fileChangeListener.onApplicationEvent(); 74 fileChangeListener.onApplicationEvent();
@@ -107,8 +78,6 @@ public class TerminalService { @@ -107,8 +78,6 @@ public class TerminalService {
107 sysParameter.inittopicconfig(); 78 sysParameter.inittopicconfig();
108 connect(); 79 connect();
109 log.info("-----------mqtt连接服务器成功--------------------"); 80 log.info("-----------mqtt连接服务器成功--------------------");
110 - List<String> topics = subscribe();  
111 - log.info("-----------订阅{}成功--------------------",topics);  
112 81
113 } 82 }
114 83
@@ -28,7 +28,7 @@ mqtt: @@ -28,7 +28,7 @@ mqtt:
28 client: 28 client:
29 #客户端操作时间 29 #客户端操作时间
30 operationTime: 10 30 operationTime: 10
31 - topics: /2/UY_ZYJ_V001/# 31 + productids: 23
32 32
33 #rocketmq配置信息 33 #rocketmq配置信息
34 rocketmq: 34 rocketmq: