|
|
|
package com.zhonglai.luhui.device.protocol.factory.control;
|
|
|
|
|
|
|
|
import cn.hutool.extra.spring.SpringUtil;
|
|
|
|
import com.google.gson.JsonObject;
|
|
|
|
import com.ruoyi.common.utils.GsonConstructor;
|
|
|
|
import com.zhonglai.luhui.device.analysis.comm.clien.ClienConnection;
|
|
|
|
import com.zhonglai.luhui.device.analysis.comm.clien.impl.ClienConnectionImpl;
|
|
|
|
import com.zhonglai.luhui.device.analysis.comm.config.SysParameter;
|
|
|
|
import com.zhonglai.luhui.device.analysis.comm.dto.ApiClientRePlyDto;
|
|
|
|
import com.zhonglai.luhui.device.analysis.comm.dto.TerminalClientRePlyDto;
|
|
|
|
import com.zhonglai.luhui.device.analysis.comm.factory.Topic;
|
|
...
|
...
|
@@ -23,17 +25,27 @@ import com.zhonglai.luhui.device.protocol.factory.service.PersistenceDBService; |
|
|
|
import net.jodah.expiringmap.ExpirationListener;
|
|
|
|
import net.jodah.expiringmap.ExpirationPolicy;
|
|
|
|
import net.jodah.expiringmap.ExpiringMap;
|
|
|
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
|
|
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
|
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
|
|
|
import org.apache.rocketmq.client.exception.MQClientException;
|
|
|
|
import org.apache.rocketmq.common.message.MessageExt;
|
|
|
|
import org.apache.rocketmq.spring.annotation.MessageModel;
|
|
|
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
|
|
import org.apache.rocketmq.spring.annotation.SelectorType;
|
|
|
|
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
|
|
|
|
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
|
|
|
|
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
|
import java.util.Arrays;
|
|
|
|
import java.util.List;
|
|
|
|
import java.util.Map;
|
|
|
|
import java.util.Set;
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
...
|
...
|
@@ -42,8 +54,8 @@ import java.util.concurrent.TimeUnit; |
|
|
|
* 设备指令监听服务
|
|
|
|
*/
|
|
|
|
@Service
|
|
|
|
@RocketMQMessageListener(consumerGroup = "deviceCommand", topic = "deviceCommandListen",messageModel = MessageModel.BROADCASTING)
|
|
|
|
public class DeviceCommandListenService implements RocketMQReplyListener<MessageExt, Message> {
|
|
|
|
@RocketMQMessageListener(consumerGroup = "deviceCommand", topic = "deviceCommandListen",selectorType = SelectorType.TAG,selectorExpression = "${rocketmq.operationToken}",messageModel = MessageModel.BROADCASTING)
|
|
|
|
public class DeviceCommandListenService implements RocketMQReplyListener<MessageExt, Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(DeviceCommandListenService.class);
|
|
|
|
|
|
|
|
private static ExpiringMap<String, ClienConnection> clienConnectionMap = ExpiringMap.builder().maxSize(20000).expiration(15, TimeUnit.SECONDS)
|
|
...
|
...
|
@@ -63,15 +75,11 @@ public class DeviceCommandListenService implements RocketMQReplyListener<Message |
|
|
|
@Autowired
|
|
|
|
private ClienNoticeServiceFactory clienNoticeServiceFactory;
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
private IotThingsModelService iotThingsModelService;
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
private MqttSubscribeService mqttSubscribeService;
|
|
|
|
|
|
|
|
@Value("${mqtt.client.operationTime}")
|
|
|
|
private long operationTime; //客户端操作时间
|
|
|
|
|
|
|
|
private String selectorExpression;
|
|
|
|
private DefaultMQPushConsumer defaultMQPushConsumer;
|
|
|
|
@Override
|
|
|
|
public Message onMessage(MessageExt messageExt) {
|
|
|
|
log.info("监听到消息{}",messageExt);
|
|
...
|
...
|
@@ -81,88 +89,60 @@ public class DeviceCommandListenService implements RocketMQReplyListener<Message |
|
|
|
|
|
|
|
DeviceCommand deviceCommand = GsonConstructor.get().fromJson(str, DeviceCommand.class);
|
|
|
|
|
|
|
|
if (deviceCommand.getCommandType().equals(CommandType.read) || deviceCommand.getCommandType().equals(CommandType.write) || deviceCommand.getCommandType().equals(CommandType.notice))
|
|
|
|
ParserDeviceHostDto parserDeviceHostDto = DeviceCach.getDeviceHost(deviceCommand.getDeviceId());
|
|
|
|
if(null == parserDeviceHostDto)
|
|
|
|
{
|
|
|
|
ParserDeviceHostDto parserDeviceHostDto = DeviceCach.getDeviceHost(deviceCommand.getDeviceId());
|
|
|
|
if(null == parserDeviceHostDto)
|
|
|
|
{
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"设备连接丢失,请稍后尝试");
|
|
|
|
}
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"设备连接丢失,请稍后尝试");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (clienConnectionMap.containsKey(deviceCommand.getDeviceId()))
|
|
|
|
{
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"有其他人在控制设备,请稍后重试!");
|
|
|
|
}
|
|
|
|
if (clienConnectionMap.containsKey(deviceCommand.getDeviceId()))
|
|
|
|
{
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"有其他人在控制设备,请稍后重试!");
|
|
|
|
}
|
|
|
|
|
|
|
|
String classname = persistenceDBService.getClassnameFromIotProtocolClassId(parserDeviceHostDto.getIotProduct().getAnalysis_clas());
|
|
|
|
try {
|
|
|
|
ProtocolParserFactory protocolParserFactory = PluginsClassLoader.getJarClass(ProtocolParserFactory.class,classname);
|
|
|
|
String controlCls = classname.replace("analysis."+protocolParserFactory.getClass().getSimpleName(),"control.DeviceCommandListenServiceImpl");
|
|
|
|
DeviceCommandServiceFactory deviceCommandServiceFactory = PluginsClassLoader.getJarClass(DeviceCommandServiceFactory.class,controlCls);
|
|
|
|
if(null != deviceCommandServiceFactory)
|
|
|
|
String classname = persistenceDBService.getClassnameFromIotProtocolClassId(parserDeviceHostDto.getIotProduct().getAnalysis_clas());
|
|
|
|
try {
|
|
|
|
ProtocolParserFactory protocolParserFactory = PluginsClassLoader.getJarClass(ProtocolParserFactory.class,classname);
|
|
|
|
String controlCls = classname.replace("analysis."+protocolParserFactory.getClass().getSimpleName(),"control.DeviceCommandListenServiceImpl");
|
|
|
|
DeviceCommandServiceFactory deviceCommandServiceFactory = PluginsClassLoader.getJarClass(DeviceCommandServiceFactory.class,controlCls);
|
|
|
|
if(null != deviceCommandServiceFactory)
|
|
|
|
{
|
|
|
|
switch (deviceCommand.getCommandType())
|
|
|
|
{
|
|
|
|
switch (deviceCommand.getCommandType())
|
|
|
|
{
|
|
|
|
case read:
|
|
|
|
NoticeMessageDto noticeMessageDomain = deviceCommandServiceFactory.read(deviceCommand.getDeviceId(), deviceCommand.getData());
|
|
|
|
if(null == noticeMessageDomain)
|
|
|
|
{
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"该设备不支持读取功能");
|
|
|
|
}
|
|
|
|
return sendMessage(noticeMessageDomain);
|
|
|
|
case write:
|
|
|
|
noticeMessageDomain = deviceCommandServiceFactory.write(deviceCommand.getDeviceId(), deviceCommand.getData());
|
|
|
|
if(null == noticeMessageDomain)
|
|
|
|
{
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"该设备不支持写入功能");
|
|
|
|
}
|
|
|
|
return sendMessage(noticeMessageDomain);
|
|
|
|
case notice:
|
|
|
|
if(deviceCommandServiceFactory.notice(deviceCommand.getDeviceId(), deviceCommand.getData()))
|
|
|
|
{
|
|
|
|
return new Message(MessageCode.DEFAULT_SUCCESS_CODE,"指令发送成功");
|
|
|
|
}else {
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"指令发送失败");
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"指令类型不存在,请联系管理员");
|
|
|
|
}
|
|
|
|
}else {
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"该设备的控制服务异常,请联系管理员");
|
|
|
|
case read:
|
|
|
|
NoticeMessageDto noticeMessageDomain = deviceCommandServiceFactory.read(deviceCommand.getDeviceId(), deviceCommand.getData());
|
|
|
|
if(null == noticeMessageDomain)
|
|
|
|
{
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"该设备不支持读取功能");
|
|
|
|
}
|
|
|
|
return sendMessage(noticeMessageDomain);
|
|
|
|
case write:
|
|
|
|
noticeMessageDomain = deviceCommandServiceFactory.write(deviceCommand.getDeviceId(), deviceCommand.getData());
|
|
|
|
if(null == noticeMessageDomain)
|
|
|
|
{
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"该设备不支持写入功能");
|
|
|
|
}
|
|
|
|
return sendMessage(noticeMessageDomain);
|
|
|
|
case notice:
|
|
|
|
if(deviceCommandServiceFactory.notice(deviceCommand.getDeviceId(), deviceCommand.getData()))
|
|
|
|
{
|
|
|
|
return new Message(MessageCode.DEFAULT_SUCCESS_CODE,"指令发送成功");
|
|
|
|
}else {
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"指令发送失败");
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"指令类型不存在,请联系管理员");
|
|
|
|
}
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
log.error("消息发送失败",e);
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"等待通知错误");
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
switch (deviceCommand.getCommandType())
|
|
|
|
{
|
|
|
|
case cleanDeviceHost:
|
|
|
|
DeviceCach.cleanDeviceHost(deviceCommand.getDeviceId());
|
|
|
|
return new Message(MessageCode.DEFAULT_SUCCESS_CODE,"指令发送成功");
|
|
|
|
case cleanDeviceInfo:
|
|
|
|
DeviceCach.cleanDeviceInfo(deviceCommand.getDeviceId()+"_"+deviceCommand.getData().get("sensor_number").getAsString());
|
|
|
|
return new Message(MessageCode.DEFAULT_SUCCESS_CODE,"指令发送成功");
|
|
|
|
case upIotThingsModel:
|
|
|
|
iotThingsModelService.upIotThingsModel(deviceCommand.getData().get("product_id").getAsInt());
|
|
|
|
return new Message(MessageCode.DEFAULT_SUCCESS_CODE,"指令发送成功");
|
|
|
|
case upIotThingsModelTranslate:
|
|
|
|
iotThingsModelService.upIotThingsModelTranslate(deviceCommand.getData().get("product_id").getAsInt());
|
|
|
|
return new Message(MessageCode.DEFAULT_SUCCESS_CODE,"指令发送成功");
|
|
|
|
case addSubscribe:
|
|
|
|
Set<Integer> okProduct = mqttSubscribeService.assignIpAddSubscribe(deviceCommand.getData().get("ip").getAsString(),deviceCommand.getData().get("product_ids").getAsString());
|
|
|
|
if (null == okProduct || okProduct.size() ==0)
|
|
|
|
{
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"订阅未成功请检查原因");
|
|
|
|
}
|
|
|
|
return new Message(MessageCode.DEFAULT_SUCCESS_CODE,"订阅成功", Arrays.toString(okProduct.toArray()));
|
|
|
|
default:
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"指令类型不存在,请联系管理员");
|
|
|
|
}else {
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"该设备的控制服务异常,请联系管理员");
|
|
|
|
}
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
log.error("消息发送失败",e);
|
|
|
|
return new Message(MessageCode.DEFAULT_FAIL_CODE,"等待通知错误");
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public Message sendMessage( NoticeMessageDto noticeMessageDomain) throws InterruptedException {
|
|
|
|
|
|
|
|
Topic topic = noticeMessageDomain.getTopic();
|
|
...
|
...
|
@@ -196,19 +176,6 @@ public class DeviceCommandListenService implements RocketMQReplyListener<Message |
|
|
|
return clienConnectionMap.containsKey(clientid);
|
|
|
|
}
|
|
|
|
|
|
|
|
private void log(Topic topic,JsonObject jsonObject)
|
|
|
|
{
|
|
|
|
// AddPostDto addPostDto = new AddPostDto();
|
|
|
|
// addPostDto.setData(JSON.parseObject(jsonObject.toString()));
|
|
|
|
// addPostDto.setIotTerminalList(new ArrayList<>());
|
|
|
|
// addPostDto.setDeviceSensorDataList(new ArrayList<>());
|
|
|
|
// addPostDto.setLogDeviceOperationList(new ArrayList<>());
|
|
|
|
// businessDataUpdateService.updataDta(BusinessDataUpdateService.Type.ADD,topic,addPostDto,"远程控制",true);
|
|
|
|
// cacheServiceImpl.updateCache(topic,addPostDto);
|
|
|
|
// addPostDto.setLogDeviceOperationList(logDeviceOperationList);
|
|
|
|
// dataPersistenceService.persistence(topic,addPostDto);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* 通知给api操作端
|
|
|
|
* @param clientid
|
|
...
|
...
|
@@ -230,4 +197,29 @@ public class DeviceCommandListenService implements RocketMQReplyListener<Message |
|
|
|
log.info("结束通知{}",clientid);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void prepareStart(DefaultMQPushConsumer consumer) {
|
|
|
|
//设置当前实例的名称
|
|
|
|
consumer.setInstanceName(this.getClass().getSimpleName());
|
|
|
|
this.defaultMQPushConsumer = consumer;
|
|
|
|
}
|
|
|
|
|
|
|
|
public String getSelectorExpression()
|
|
|
|
{
|
|
|
|
if(null == selectorExpression)
|
|
|
|
{
|
|
|
|
String[] beannames = SpringUtil.getBeanNamesForType(DefaultRocketMQListenerContainer.class);
|
|
|
|
if(null != beannames)
|
|
|
|
{
|
|
|
|
for (String beanname:beannames)
|
|
|
|
{
|
|
|
|
DefaultRocketMQListenerContainer container = SpringUtil.getBean(beanname);
|
|
|
|
if (container.getConsumer().equals(this.defaultMQPushConsumer)) {
|
|
|
|
selectorExpression = container.getSelectorExpression();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return selectorExpression;
|
|
|
|
}
|
|
|
|
} |
...
|
...
|
|