作者 钟来

数据服务修改成插件模式,支持http和mqll协议插件

正在显示 56 个修改的文件 包含 1610 行增加239 行删除
... ... @@ -106,7 +106,7 @@ public class ByteUtil {
}
public static void main(String[] args) {
System.out.println(ByteUtil.toHexString( ByteUtil.intToBytesASC(2011239256,4)));
System.out.println(ByteUtil.bytesToLongDESC(new byte[]{(byte)0x09,(byte) 0xF7},0,2));
}
/**
... ... @@ -216,4 +216,57 @@ public class ByteUtil {
return value;
}
/**
* 获取字节里面指定字节位的值
* @param b 字节
* @param position 字节位编号(从0开始计数)
* @return
*/
public static int getBitValue(byte b, int position) {
// 检查位置是否有效
if (position < 0 || position >= 8) {
throw new IllegalArgumentException("Position must be between 0 and 7 (inclusive).");
}
// 将1左移position位,以便创建一个只有指定位为1的掩码
int mask = 1 << position;
// 将字节与掩码进行位与运算,如果指定位为1,则结果不为0
int result = b & mask;
// 如果结果为0,说明指定位是0;否则,指定位是1
return result != 0 ? 1 : 0;
}
/**
* 获取字节里面指定多个字节位的值
* @param b 字节
* @param startBit 开始位的位置(从0开始计数)
* @param numberOfBits 需要获取的位的数量
* @return
*/
public static int getBitsValue(byte b, int startBit, int numberOfBits) {
// 检查参数的有效性
if (startBit < 0 || startBit >= 8 || numberOfBits <= 0 || startBit + numberOfBits > 8) {
throw new IllegalArgumentException("Invalid bit range.");
}
// 将字节转换为无符号整数,以便进行位操作
int unsignedByte = b & 0xFF;
// 计算掩码,用于提取指定位
int mask = (1 << numberOfBits) - 1;
// 将掩码左移,以便对齐到正确的位位置
mask <<= startBit;
// 应用掩码,提取指定位
int bits = unsignedByte & mask;
// 右移结果,以便将提取的位放在最低有效位上
bits >>= startBit;
return bits;
}
}
... ...
... ... @@ -6,6 +6,8 @@ import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.beans.support.ResourceEditorRegistrar;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
... ... @@ -143,4 +145,33 @@ public final class SpringUtils implements BeanFactoryPostProcessor, ApplicationC
final String[] activeProfiles = getActiveProfiles();
return StringUtils.isNotEmpty(activeProfiles) ? activeProfiles[0] : null;
}
public static void registerBean(String name, Object beanInstance) {
String beanName = name==null?beanInstance.getClass().getSimpleName():name;
if (containsBean(beanName))
{
return;
}
beanFactory.registerSingleton(beanName, beanInstance);
}
/**
* 删除指定名称的bean
*
* @param name
* @return boolean
*/
public static boolean deleteBean(String name)
{
if (containsBean(name))
{
beanFactory.destroyBean(name, getBean(name));
return true;
}
return false;
}
public static boolean isSpringBean()
{
return null != beanFactory;
}
}
... ...
... ... @@ -42,7 +42,7 @@ public class EnumModelOutput extends ThingsModelItemBase<String>
{
return null;
}
return getValue()+"";
return getValue().toString();
}
... ...
... ... @@ -67,24 +67,24 @@ public class IotProduct implements Serializable
private String sync_db;
@ApiModelProperty("解析服务")
private String analysis_clas; // varchar(100) DEFAULT 'com.zhonglai.luhui.device.protocol.factory.analysis.DefaultProtocolParserFactoryImpl' COMMENT '解析服务',
private Integer analysis_clas; // varchar(100) DEFAULT 'com.zhonglai.luhui.device.protocol.factory.analysis.DefaultProtocolParserFactoryImpl' COMMENT '解析服务',
@ApiModelProperty("清洗服务")
private String purification_clas; // varchar(100) DEFAULT 'com.zhonglai.luhui.device.protocol.factory.purification.DefaultProtocolPurificationFactoryImpl' COMMENT '清洗服务',
private Integer purification_clas; // varchar(100) DEFAULT 'com.zhonglai.luhui.device.protocol.factory.purification.DefaultProtocolPurificationFactoryImpl' COMMENT '清洗服务',
public String getPurification_clas() {
public Integer getPurification_clas() {
return purification_clas;
}
public void setPurification_clas(String purification_clas) {
public void setPurification_clas(Integer purification_clas) {
this.purification_clas = purification_clas;
}
public String getAnalysis_clas() {
public Integer getAnalysis_clas() {
return analysis_clas;
}
public void setAnalysis_clas(String analysis_clas) {
public void setAnalysis_clas(Integer analysis_clas) {
this.analysis_clas = analysis_clas;
}
... ...
package com.zhonglai.luhui.device.domain;
import com.ruoyi.common.annotation.PublicSQLConfig;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
@ApiModel("产品解析插件包")
public class IotProtocolClass implements Serializable {
@PublicSQLConfig(isSelect=false)
private static final long serialVersionUID = 1L;
@ApiModelProperty("主键id")
private Integer id;
@ApiModelProperty("创建时间")
private String create_time;
@ApiModelProperty("类型(1解析协议,2清洗协议,3数据库同步服务)")
private Integer type;
@ApiModelProperty("名称")
private String name;
@ApiModelProperty("模型举例")
private String case_model;
@ApiModelProperty("类名")
private String classname;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getCreate_time() {
return create_time;
}
public void setCreate_time(String create_time) {
this.create_time = create_time;
}
public Integer getType() {
return type;
}
public void setType(Integer type) {
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getCase_model() {
return case_model;
}
public void setCase_model(String case_model) {
this.case_model = case_model;
}
public String getClassname() {
return classname;
}
public void setClassname(String classname) {
this.classname = classname;
}
}
... ...
package com.zhonglai.luhui.admin.controller.data;
import com.ruoyi.common.core.domain.AjaxResult;
import com.zhonglai.luhui.dao.service.PublicService;
import com.zhonglai.luhui.device.analysis.comm.util.TableUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.Map;
@Api(tags = "地图轨迹")
@RestController
@RequestMapping("/data/atlasTrajectory")
public class AtlasTrajectoryController {
@Autowired
private PublicService publicService;
@ApiOperation("根据时间测试经纬度轨迹")
@ApiImplicitParams({
@ApiImplicitParam(value = "设备id",name = "deviceInfoId"),
@ApiImplicitParam(value = "经度数据类型",name = "latType"),
@ApiImplicitParam(value = "纬度数据类型",name = "lngType"),
@ApiImplicitParam(value = "日期(2024-06-05)",name = "day"),
@ApiImplicitParam(value = "开始时间(时间戳)",name = "startTime"),
@ApiImplicitParam(value = "结束时间(时间戳)",name = "endTime")
})
@GetMapping(value = "/oneDay/{deviceInfiId}/{day}")
public AjaxResult oneDay(@PathVariable String deviceInfoId,@PathVariable String day,String latType,String lngType,Integer startTime,Integer endTime)
{
String table = TableUtil.getTableName(day,"ly_sensor_data","device_sensor_data",3);
List<Map<String,Object>> list = publicService.getObjectListBySQL("select creat_time,data_value from `"+table+"` where device_info_id='"+deviceInfoId+"' and data_type='"+latType+"'");
return AjaxResult.success(list);
}
}
... ...
package com.zhonglai.luhui.admin.controller.data;
import com.ruoyi.common.core.domain.AjaxResult;
import com.zhonglai.luhui.dao.service.PublicService;
import com.zhonglai.luhui.device.analysis.comm.util.TableUtil;
import com.zhonglai.luhui.sys.service.ISysDictTypeService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.Map;
@Api(tags = "传感器数据管理")
@RestController
@RequestMapping("/data/senserData")
public class SenserDataController {
@Autowired
private PublicService publicService;
@ApiOperation("获取一天的原始数据")
@ApiImplicitParams({
@ApiImplicitParam(value = "设备id",name = "deviceInfoId"),
@ApiImplicitParam(value = "数据类型",name = "type"),
@ApiImplicitParam(value = "日期(2024-06-05)",name = "day")
})
@GetMapping(value = "/oneDay/{deviceInfiId}/{type}/{day}")
public AjaxResult oneDay(@PathVariable String deviceInfoId, @PathVariable String type, @PathVariable String day)
{
String table = TableUtil.getTableName(day,"ly_sensor_data","device_sensor_data",3);
List<Map<String,Object>> list = publicService.getObjectListBySQL("select creat_time,data_value from `"+table+"` where device_info_id='"+deviceInfoId+"' and data_type='"+type+"'");
return AjaxResult.success(list);
}
}
... ...
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.zhonglai.luhui</groupId>
<artifactId>lh-modules</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>lh-device-operation-service</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<description>
设备操作服务
</description>
<dependencies>
<!-- spring-boot-devtools -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional> <!-- 表示依赖不会传递 -->
</dependency>
<!-- SpringBoot Web容器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions></exclusions>
</dependency>
<!-- Spring框架基本的核心工具 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.zhonglai.luhui</groupId>
<artifactId>lh-jar-device-service</artifactId>
</dependency>
<dependency>
<groupId>com.zhonglai.luhui</groupId>
<artifactId>lh-device-protocol-factory</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
... ...
package com.zhonglai.luhui.device.operation.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
@ComponentScan({
"com.zhonglai.luhui.device.operation.service.clien",
})
@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})
public class DeviceOperationServiceApplication {
private static Logger log = LoggerFactory.getLogger(DeviceOperationServiceApplication.class);
public static void main(String[] args) {
log.info("启动服务");
SpringApplicationBuilder builder = new SpringApplicationBuilder(DeviceOperationServiceApplication.class);
builder.run( args);
}
}
... ...
package com.zhonglai.luhui.device.operation.service.clien;
import com.alibaba.fastjson.JSON;
import com.zhonglai.luhui.device.analysis.dto.Message;
import com.zhonglai.luhui.device.analysis.dto.MessageCode;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.group:}", topic = "${rocketmq.producer.send-topic:}",selectorType = SelectorType.TAG,selectorExpression = "${rocketmq.producer.send-tags:}",messageModel = MessageModel.BROADCASTING)
public class RocketMqService implements RocketMQReplyListener<MessageExt, Message> {
private static final Logger log = LoggerFactory.getLogger(RocketMqService.class);
@Override
public Message onMessage(MessageExt messageExt) {
log.info("监听到消息{}",messageExt);
// String clint = MessageUtil.getReplyToClient(messageExt);
String str = new String(messageExt.getBody());
log.info("消息body{}",str);
// DeviceCommandApi deviceCommandApi = JSON.parseObject(str, DeviceCommandApi.class);
// try {
// Message message = deviceCommandApi.invokeApi(deviceService);
// log.info("{} 指令执行完以后返回的结果{}",deviceCommandApi.getDeviceCommandApiParameter().getClient_id(),message);
// return message;
// } catch (Exception e) {
// log.error("执行异常",e);
// }
return new Message(MessageCode.DEFAULT_FAIL_CODE,"服务器玩脱了");
}
}
... ...
##服务器配置
server:
tomcat:
uri-encoding: UTF-8
port: 4887
servlet:
context-path: /
spring:
messages:
encoding: UTF-8
mvc:
#出现错误时, 直接抛出异常
throw-exception-if-no-handler-found: true
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
mqtt:
#链接地址
broker: tcp://175.24.61.68:1883
#唯一标识
clientId: ${random.uuid}
topicconfig: "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/{{messageid}}"
username: sysuser
password: "!@#1qaz"
client:
#客户端操作时间
operationTime: 10
# NameServer地址
rocketmq:
name-server: 47.115.144.179:9876
# 默认的消息组
producer:
group: lh-device-operation-service
send-message-timeout: 30000
send-topic: lh-device-operation-service
send-tags: 1
\ No newline at end of file
... ...
<configuration>
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>logs/output.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logs/output.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>5</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<appender name="myDataAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>logs/myData/myData.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logs/myData/myData.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>5</maxHistory>
</rollingPolicy >
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- Logger "com.example.first" 的配置 -->
<logger name="myDatalog" level="info" additivity="false">
<appender-ref ref="myDataAppender" />
</logger>
<root level="info">
<appender-ref ref="FILE" />
<appender-ref ref="CONSOLE" />
</root>
</configuration>
... ...
<?xml version="1.0" encoding="UTF-8"?>
<assembly>
<id>bin</id>
<!-- 最终打包成一个用于发布的zip文件 -->
<formats>
<format>zip</format>
</formats>
<!-- Adds dependencies to zip package under lib directory -->
<dependencySets>
<dependencySet>
<!--
不使用项目的artifact,第三方jar不要解压,打包进zip文件的lib目录
-->
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>lib</outputDirectory>
<unpack>false</unpack>
</dependencySet>
</dependencySets>
<fileSets>
<!-- 把项目相关的说明文件,打包进zip文件的根目录 -->
<fileSet>
<directory>${project.basedir}</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>README*</include>
<include>LICENSE*</include>
<include>NOTICE*</include>
</includes>
</fileSet>
<!-- 把项目的配置文件,打包进zip文件的config目录 -->
<fileSet>
<directory>${project.basedir}\src\main\resources\configs</directory>
<outputDirectory>../configs</outputDirectory>
<includes>
<include>*.properties</include>
</includes>
</fileSet>
<!-- 把项目的配置文件,提出来 -->
<fileSet>
<directory>${project.basedir}\src\main\resources</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>*.properties</include>
<include>*.yml</include>
</includes>
</fileSet>
<!-- 把项目的脚本文件目录( src/main/scripts )中的启动脚本文件,打包进zip文件的跟目录 -->
<fileSet>
<directory>${project.basedir}\bin</directory>
<outputDirectory></outputDirectory>
<includes>
<include>start.*</include>
<include>stop.*</include>
</includes>
</fileSet>
<!-- 把项目自己编译出来的jar文件,打包进zip文件的根目录 -->
<fileSet>
<directory>${project.build.directory}</directory>
<outputDirectory></outputDirectory>
<includes>
<include>*.jar</include>
</includes>
</fileSet>
</fileSets>
</assembly>
\ No newline at end of file
... ...
package com.zhonglai.luhui.device.protocol.http.analysis.analysis;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonObject;
import com.ruoyi.common.utils.GsonConstructor;
import com.zhonglai.luhui.device.analysis.comm.factory.Topic;
import com.zhonglai.luhui.device.analysis.util.TopicUtil;
import com.zhonglai.luhui.device.protocol.factory.analysis.ProtocolParserFactory;
import org.springframework.stereotype.Service;
import java.util.Map;
@Service
public class ProtocolParserServiceImpl implements ProtocolParserFactory<JsonObject> {
private static final String topicModel = "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{imei}}/{{topicType}}";
... ...
package com.zhonglai.luhui.device.protocol.http.analysis.purification;
import com.google.gson.JsonObject;
import com.ruoyi.common.utils.DateUtils;
import com.ruoyi.common.utils.GsonConstructor;
import com.zhonglai.luhui.device.analysis.comm.factory.Topic;
import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceHostDto;
import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceInfoDto;
import com.zhonglai.luhui.device.protocol.factory.dto.ProtocolPurificationModel;
import com.zhonglai.luhui.device.protocol.factory.purification.ProtocolPurificationFactory;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Service("http_purification")
public class ProtocolPurificationServiceImpl implements ProtocolPurificationFactory {
@Override
public ProtocolPurificationModel purification(Integer product_id, Topic topic, JsonObject analysisObject ) {
return null;
}
}
package com.zhonglai.luhui.device.protocol.http.analysis.service;
import com.zhonglai.luhui.device.analysis.comm.dao.BaseDao;
import com.zhonglai.luhui.device.domain.IotDevice;
import com.zhonglai.luhui.device.protocol.http.analysis.service.db.LsyDBFactoryImp;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
@Service
public class LsyDbService {
private BaseDao lsy_baseDao = new BaseDao(new LsyDBFactoryImp());
public IotDevice getLsyDeviceById(String id)
{
List<Map<String,Object>> list = lsy_baseDao.findListBysql("SELECT order_dtu_imei,device_type FROM `order_dtu` WHERE order_dtu_imei='"+id+"'");
if(null != list && list.size() != 0)
{
Map<String,Object> map = list.get(0);
IotDevice iotDevice = new IotDevice();
iotDevice.setClient_id(id);
iotDevice.setMqtt_username(map.get("device_type")+"");
return iotDevice;
}
return null;
}
}
package com.zhonglai.luhui.device.protocol.http.analysis.service.db;
import com.zhonglai.luhui.device.analysis.comm.dao.DBFactory;
import org.apache.commons.dbcp.BasicDataSourceFactory;
import javax.sql.DataSource;
import java.io.FileInputStream;
import java.util.Properties;
public class LsyDBFactoryImp implements DBFactory {
private static DataSource ds = null;
static {
try {
if(null==ds )
{
String dbPath = System.getProperty("dbPath");
String path = null != dbPath?dbPath:System.getProperty("user.dir")+"/configs/";
Properties p = new Properties();
System.out.println("》》》》》》》》》》》》》数据库配置文件地址:"+path+"lsy_dbcpconfig.properties");
p.load(new FileInputStream(path+"lsy_dbcpconfig.properties"));
// p.load(DBFactory.class
// .getClassLoader().getResourceAsStream("configs/dbcpconfig.properties"));
ds = BasicDataSourceFactory.createDataSource(p);
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public DataSource getDataSource() {
return ds;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.zhonglai.luhui</groupId>
<artifactId>lh-device-protocol-parser</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>lh-device-modbus</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.zhonglai.luhui</groupId>
<artifactId>lh-device-protocol-factory</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
... ...
package com.zhonglai.luhui.device.protocol.modbus.analysis;
import com.google.gson.JsonObject;
import com.zhonglai.luhui.device.analysis.comm.factory.Topic;
import com.zhonglai.luhui.device.analysis.util.TopicUtil;
import com.zhonglai.luhui.device.protocol.factory.analysis.ProtocolParserFactory;
import com.zhonglai.luhui.device.protocol.modbus.dto.ModbusDto;
public class ModbusProtocolParserFactoryImpl implements ProtocolParserFactory<byte[]> {
private static final String topicModel = "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{imei}}/{{topicType}}";
@Override
public Topic analysisTopic(String topicStr) {
Topic topic = TopicUtil.initTopicFromModelStr(topicStr,topicModel);
return topic;
}
@Override
public JsonObject analysisPayload(Topic topic, byte[] payload) {
ModbusDto modbusDto = new ModbusDto(payload);
String senserNumber = "1";
modbusDto.getCommdcode();
return null;
}
}
... ...
package com.zhonglai.luhui.device.protocol.modbus.dto;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.common.utils.ByteUtil;
import lombok.Data;
import org.apache.commons.lang3.ArrayUtils;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
* Modbus协议
*/
@Data
public class ModbusDto implements Serializable {
private static final long serialVersionUID = -6008279428004571734L;
private String hstr;
protected Integer address; //地址位
protected Integer commdcode; //功能码
protected byte[] data; //数据
protected String crc; //16CRC 码
public ModbusDto(Integer address, Integer commdcode, byte[] data)
{
toModbusDto(address,commdcode,data);
}
protected void toModbusDto(Integer address,Integer commdcode,byte[] data)
{
this.address = address;
this.commdcode = commdcode;
this.data = data;
byte[] heardbyte = new byte[2];
heardbyte[0] = address.byteValue();
heardbyte[1] = commdcode.byteValue();
byte[] notlrcdata = ArrayUtils.addAll(heardbyte,data);
this.crc = generateLRC(notlrcdata);
this.hstr = ByteUtil.toHexString(notlrcdata)+this.crc;
}
public ModbusDto()
{
}
public ModbusDto(byte[] bytes)
{
this.hstr = ByteUtil.toHexString(bytes);
byte[] headAndData = ArrayUtils.subarray(bytes,0,bytes.length-2);
byte[] crcByte = ArrayUtils.subarray(bytes,bytes.length-2,bytes.length);
this.crc = ByteUtil.toHexString(crcByte).toUpperCase();
// 校验CRC
if (!ByteUtil.getCRC16(headAndData).equals(crc)) {
System.out.println("CRC校验失败");
return ;
}
this.address = Math.toIntExact(ByteUtil.bytesToLongDESC(headAndData, 0, 1));
this.commdcode = Math.toIntExact(ByteUtil.bytesToLongDESC(headAndData, 1, 1));
this.data = ArrayUtils.subarray(headAndData,2,headAndData.length);;
}
public ModbusDto(String str) {
new ModbusDto(ByteUtil.hexStringToByte(str));
}
public byte[] generateCommd()
{
return ByteUtil.hexStringToByte(hstr);
}
// 计算CRC校验码
public static String generateLRC(byte[] data)
{
return ByteUtil.getCRC16(data);
}
public static void main(String[] args) {
String hexData = "01 03 00 0D 00 01".replace(" ","");
String crc = ByteUtil.getCRC16(ByteUtil.hexStringToByte(hexData));
System.out.println(crc);
Map<Integer,String> map = new HashMap<>();
map.put(1,"ssss");
map.put(2,"eae");
try {
System.out.println(new ObjectMapper().writeValueAsString(map));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
... ...
package com.zhonglai.luhui.device.protocol.factory;
import com.google.gson.JsonObject;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.common.utils.spring.SpringUtils;
import com.zhonglai.luhui.device.analysis.comm.factory.Topic;
import com.zhonglai.luhui.device.analysis.util.TopicUtil;
import com.zhonglai.luhui.device.domain.IotProduct;
import com.zhonglai.luhui.device.protocol.factory.analysis.DefaultProtocolParserFactoryImpl;
import com.zhonglai.luhui.device.protocol.factory.analysis.ProtocolParserFactory;
import com.zhonglai.luhui.device.protocol.factory.comm.DataLogType;
import com.zhonglai.luhui.device.protocol.factory.comm.DeviceDataLog;
import com.zhonglai.luhui.device.protocol.factory.config.PluginsClassLoader;
import com.zhonglai.luhui.device.protocol.factory.dto.ProtocolPurificationModel;
import com.zhonglai.luhui.device.protocol.factory.plugins.InitPlugins;
import com.zhonglai.luhui.device.protocol.factory.purification.DefaultProtocolPurificationFactoryImpl;
import com.zhonglai.luhui.device.protocol.factory.purification.ProtocolPurificationFactory;
import com.zhonglai.luhui.device.protocol.factory.service.PersistenceDBService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -21,6 +28,8 @@ import java.util.Map;
*/
@Component
public class ProtocolParserAndPurificationFactory<T> {
@Autowired
protected PersistenceDBService persistenceDBService;
/**
* 根据topic对payload数据进行解析和清洗
... ... @@ -28,14 +37,13 @@ public class ProtocolParserAndPurificationFactory<T> {
* @param payload 数据信息
* @return
*/
public ProtocolPurificationModel analysisAndPurification(Integer product_id,String topicStr, T payload ,String analysis_clas,String purification_clas) throws ClassNotFoundException {
public ProtocolPurificationModel analysisAndPurification(IotProduct iotProduct, String topicStr, T payload ) throws InstantiationException, IllegalAccessException {
Topic baseTopic = TopicUtil.initTopicFromModelStr(topicStr,"/{{roleid}}/{{username}}"); //我们定义的topic
//根据产品类型找到对应的解析服务
ProtocolParserFactory protocolParserFactory = (ProtocolParserFactory) SpringUtils.getBean(Class.forName(analysis_clas));
ProtocolParserFactory protocolParserFactory = getProtocolParserFactory(persistenceDBService.getClassnameFromIotProtocolClassId(iotProduct.getAnalysis_clas()));
//根据产品类型找到对应的清洗服务
ProtocolPurificationFactory protocolPurificationFactory = (ProtocolPurificationFactory) SpringUtils.getBean(Class.forName(purification_clas));
ProtocolPurificationFactory protocolPurificationFactory =getProtocolPurificationFactory(persistenceDBService.getClassnameFromIotProtocolClassId(iotProduct.getPurification_clas()));
if(null != protocolParserFactory && null != protocolPurificationFactory) //需要解析服务和清除服务都存在业务才能进行下去
{
... ... @@ -45,7 +53,7 @@ public class ProtocolParserAndPurificationFactory<T> {
JsonObject jsonObject = protocolParserFactory.analysisPayload(topic,payload);
//通过模式对解析结果进行清洗,获得到的数据就是业务数据
ProtocolPurificationModel protocolPurificationModel = protocolPurificationFactory.purification(product_id,topic,jsonObject);
ProtocolPurificationModel protocolPurificationModel = protocolPurificationFactory.purification(iotProduct.getId(),topic,jsonObject);
return protocolPurificationModel;
}
... ... @@ -54,4 +62,47 @@ public class ProtocolParserAndPurificationFactory<T> {
return null;
}
/**
* 获取解析服务
* @param analysis_clas
* @return
* @throws InstantiationException
* @throws IllegalAccessException
*/
private ProtocolParserFactory getProtocolParserFactory(String analysis_clas) throws InstantiationException, IllegalAccessException {
if(StringUtils.isEmpty(analysis_clas))
{
return new DefaultProtocolParserFactoryImpl();
}
ProtocolParserFactory protocolParserFactory = PluginsClassLoader.getJarClass(ProtocolParserFactory.class,analysis_clas);
if(null == protocolParserFactory)
{
System.out.println("没有找到解析服务使用默认的");
protocolParserFactory = new DefaultProtocolParserFactoryImpl();
}
return protocolParserFactory;
}
/**
* 获取清洗服务
* @param purification_clas
* @return
* @throws InstantiationException
* @throws IllegalAccessException
*/
private ProtocolPurificationFactory getProtocolPurificationFactory(String purification_clas) throws InstantiationException, IllegalAccessException {
if(StringUtils.isEmpty(purification_clas))
{
return new DefaultProtocolPurificationFactoryImpl();
}
ProtocolPurificationFactory protocolPurificationFactory = PluginsClassLoader.getJarClass(ProtocolPurificationFactory.class,purification_clas);
if(null == protocolPurificationFactory)
{
protocolPurificationFactory = new DefaultProtocolPurificationFactoryImpl();
}
return protocolPurificationFactory;
}
}
... ...
... ... @@ -6,13 +6,11 @@ import com.zhonglai.luhui.device.analysis.comm.factory.Topic;
import com.zhonglai.luhui.device.analysis.util.TopicUtil;
import org.springframework.stereotype.Service;
@Service
public class DefaultProtocolParserFactoryImpl implements ProtocolParserFactory<byte[]>{
private static final String topicModel = "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{imei}}/{{topicType}}";
private static final String topicModel = "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}";
@Override
public Topic analysisTopic(String topicStr) {
// /13/jiulin/476210165B365166812345678Userdata/Json/476210165B365166812345678/pub_data
Topic topic = TopicUtil.initTopicFromModelStr(topicStr,topicModel);
return topic;
}
... ...
package com.zhonglai.luhui.device.protocol.factory.config;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.common.utils.spring.SpringUtils;
import com.zhonglai.luhui.device.protocol.factory.analysis.ProtocolParserFactory;
import com.zhonglai.luhui.device.protocol.factory.plugins.InitPlugins;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.util.FileCopyUtils;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
/**
* 自定义的类加载器
*/
public class PluginsClassLoader extends URLClassLoader {
/**
* 存放类
*/
private static Map<String,PluginsClassLoader> jarMap = new HashMap<>();
private static Map<String,Class<?>> classMap = new HashMap<>();
private PluginsClassLoader(URL[] urls) {
super(urls,PluginsClassLoader.class.getClassLoader());
}
/**
* 卸载jar
* @throws IOException
*/
private static void unloadJar(String... filePaths) throws IOException {
for (String filePath:filePaths)
{
String key = InitPlugins.toJarPath(filePath);
if(jarMap.containsKey(key))
{
PluginsClassLoader pluginsClassLoader = jarMap.get(key);
jarMap.remove(key);
pluginsClassLoader.close();
}
File file = new File(key);
if(file.exists())
{
file.delete();
}
}
}
/**
* 更新jar
* @throws IOException
*/
public static void uploadJar(File... filePaths) {
try {
for (File file:filePaths)
{
String filePath = file.getAbsolutePath();
unloadJar(filePath);
String key = InitPlugins.toJarPath(filePath);
FileCopyUtils.copy(new File(filePath),new File(key));
PluginsClassLoader pluginsClassLoader = new PluginsClassLoader(new URL[]{new URL("file:"+key)});
jarMap.put(key,pluginsClassLoader);
laodJar(new File(key),pluginsClassLoader);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private static void laodJar(File jarfile,ClassLoader classLoader)
{
JarFile jarFile = null;
try {
jarFile = new JarFile(jarfile);
Enumeration<JarEntry> entries = jarFile.entries();
while (entries.hasMoreElements())
{
JarEntry jarEntry = entries.nextElement();
String entryName = jarEntry.getName();
if (entryName != null && entryName.endsWith(".class")) {
entryName = entryName.replace("/", ".").substring(0, entryName.lastIndexOf("."));
Class<?> clas = classLoader.loadClass(entryName);
System.out.println(ProtocolParserFactory.class.isAssignableFrom(clas));;
if(SpringUtils.isSpringBean())
{
Object object = loadBean(clas);
}
classMap.put(entryName,clas);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
private static <T> T loadBean(Class<T> clas)
{
if(isInstantiable(clas))
{
try {
Object object = clas.newInstance();
if(clas.isAnnotationPresent(Component.class) || clas.isAnnotationPresent(Service.class))
{
SpringUtils.registerBean(null,object);
return SpringUtils.getBean(clas);
}else {
return (T) object;
}
} catch (InstantiationException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
return null;
}
private static boolean isInstantiable(Class<?> clazz) {
// 检查类是否是抽象类或接口
if (Modifier.isAbstract(clazz.getModifiers()) || clazz.isInterface()) {
return false;
}
// 尝试查找并访问默认的无参构造器
try {
clazz.getDeclaredConstructor((Class[]) null);
return true;
} catch (NoSuchMethodException e) {
return false;
}
}
public static <T> T getJarClass(Class<T> tClass,String classname) {
Class clazz = classMap.get(classname);
if (null != clazz)
{
System.out.println("接口的类加载器:"+tClass.getClassLoader());
System.out.println("实现类的类加载器:"+clazz.getClassLoader());
try {
if (tClass.isAssignableFrom(clazz) && !clazz.equals(tClass)) {
return tClass.cast(clazz.getDeclaredConstructor().newInstance());
}else if(tClass.isAssignableFrom(clazz) && clazz.equals(tClass))
{
return tClass.newInstance();
}
} catch (InstantiationException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
}
return null;
}
}
... ...
package com.zhonglai.luhui.device.protocol.factory.control;
import com.zhonglai.luhui.device.analysis.comm.clien.ClienConnection;
import com.zhonglai.luhui.device.analysis.comm.clien.impl.ClienConnectionImpl;
import com.zhonglai.luhui.device.analysis.dto.Message;
import com.zhonglai.luhui.device.analysis.dto.MessageCode;
import net.jodah.expiringmap.ExpirationListener;
import net.jodah.expiringmap.ExpirationPolicy;
import net.jodah.expiringmap.ExpiringMap;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
* 控制工厂
*/
public abstract class ControlFactory implements RocketMQReplyListener<MessageExt, Message> {
private static final Logger log = LoggerFactory.getLogger(ControlFactory.class);
protected static ExpiringMap<String, ClienConnection> clienConnectionMap = ExpiringMap.builder().maxSize(20000).expiration(15, TimeUnit.SECONDS)
.asyncExpirationListener(new ExpirationListener<String, ClienConnection>() {
@Override
public void expired(String s, ClienConnection clienConnection) {
log.info("{} 通道消失了>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>",s);
clienConnection.close();
}
})
.expirationPolicy(ExpirationPolicy.CREATED).build();
@Override
public Message onMessage(MessageExt message) {
log.info("监听到消息{}",message);
String str = new String(message.getBody());
log.info("消息body{}",str);
try {
return sendMessage(str);
} catch (InterruptedException e) {
log.error("消息发送异常",e);
return new Message(MessageCode.DEFAULT_FAIL_CODE,"系统异常,请联系管理员");
}
}
private Message sendMessage(String str) throws InterruptedException {
//设置通知渠道
ClienConnection clienConnection = new ClienConnectionImpl();
String key = analysisMessageKey();
log.info("设置通知渠道 {} {}",key,str);
if(clienConnectionMap.containsKey(key))
{
return new Message(MessageCode.DEFAULT_FAIL_CODE,"有人正在操作,请稍后尝试");
}
clienConnectionMap.put(key,clienConnection);
if(transmitMessage(str))
{
synchronized(clienConnection)
{
log.info("{}等待通知",key);
clienConnection.wait(getOperationTime()+3000l);
}
//清楚通道
clienConnectionMap.remove(key);
log.info("{}收到通知{}",key,clienConnection.getReplyMessage().getMessage());
Message message = clienConnection.getReplyMessage();
log.info("{}返回通知{}",key,message);
return message;
}
return new Message(MessageCode.DEFAULT_FAIL_CODE,"消息转发失败,联系管理员");
}
/**
* 获取消息key,一般使用imei号
* @return
*/
protected abstract String analysisMessageKey();
/**
* 转发消息
* @param str
* @return
*/
protected abstract Boolean transmitMessage(String str);
/**
* 客户端操作等待时间,推荐10000
* @return
*/
protected abstract long getOperationTime(); //客户端操作时间
}
... ...
package com.zhonglai.luhui.device.protocol.factory.control.clien;
import com.zhonglai.luhui.device.analysis.comm.dto.ApiClientRePlyDto;
import com.zhonglai.luhui.device.analysis.dto.Message;
/**
* 客户端链接
*/
public interface ClienConnection {
public void close();
/**
* 回复
*/
public void reply(ApiClientRePlyDto apiClientRePlyDto);
/**
* 回复
*/
public Message getReplyMessage();
}
... ...
package com.zhonglai.luhui.device.protocol.factory.control.clien.impl;
import com.zhonglai.luhui.device.analysis.comm.dto.ApiClientRePlyDto;
import com.zhonglai.luhui.device.analysis.dto.Message;
import com.zhonglai.luhui.device.analysis.dto.MessageCode;
import com.zhonglai.luhui.device.analysis.dto.MessageCodeType;
import com.zhonglai.luhui.device.protocol.factory.control.clien.ClienConnection;
public class ClienConnectionImpl implements ClienConnection {
private Message message = new Message();
@Override
public void close() {
this.message.setCode(MessageCode.DEFAULT_FAIL_CODE);
this.message.setMessage("链接超时关闭");
this.notify();
}
@Override
public void reply(ApiClientRePlyDto apiClientRePlyDto) {
apiClientRePlyDto.setReplyMessage(message);
this.notify();
}
@Override
public Message getReplyMessage() {
return message;
}
public ClienConnectionImpl setCode(MessageCodeType code)
{
this.message.setCode(code);
return this;
}
public ClienConnectionImpl setData(Object data)
{
this.message.setData(data);
return this;
}
public ClienConnectionImpl setMessage(String message)
{
this.message.setMessage(message);
return this;
}
}
... ...
... ... @@ -21,21 +21,23 @@ public class AnalysisDto {
return this;
}
public ParserDeviceHostDto toParserDeviceHostDto(String id)
public ParserDeviceHostDto toParserDeviceHostDto(String id,Integer time)
{
ParserDeviceHostDto parserDeviceHostDto = new ParserDeviceHostDto();
parserDeviceHostDto.setData(things_model_value);
parserDeviceHostDto.setConfig(things_model_config);
parserDeviceHostDto.setId(id);
parserDeviceHostDto.setUpdateTime(time);
return parserDeviceHostDto;
}
public ParserDeviceInfoDto toParserDeviceInfoDto(String id)
public ParserDeviceInfoDto toParserDeviceInfoDto(String id,Integer time)
{
ParserDeviceInfoDto parserDeviceInfoDto = new ParserDeviceInfoDto();
parserDeviceInfoDto.setData(things_model_value);
parserDeviceInfoDto.setConfig(things_model_config);
parserDeviceInfoDto.setId(id);
parserDeviceInfoDto.setUpdateTime(time);
return parserDeviceInfoDto;
}
}
... ...
package com.zhonglai.luhui.device.protocol.factory.dto;
import com.google.gson.JsonObject;
import com.zhonglai.luhui.device.domain.IotProduct;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
... ... @@ -30,28 +31,6 @@ public class ParserDeviceHostDto {
* 数据类型
*/
private String device_type;
/**
*同步数据的
*/
private String sync_db;
/**
*是否同步数据
*/
private boolean is_sync_db;
/**
*产品id
*/
private Integer product_id;
/**
* 解析服务
*/
private String analysis_clas;
/**
* 清洗服务
*/
private String purification_clas;
/**
* 设备摘要,格式{{"name":"device"},{"chip":"esp8266"}}
... ... @@ -61,4 +40,6 @@ public class ParserDeviceHostDto {
private boolean is_config_up;
private boolean is_data_up;
private boolean is_summary_up;
private IotProduct iotProduct;
}
... ...
package com.zhonglai.luhui.device.protocol.factory.plugins;
import com.zhonglai.luhui.device.protocol.factory.analysis.ProtocolParserFactory;
import com.zhonglai.luhui.device.protocol.factory.config.PluginsClassLoader;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.util.ArrayList;
import java.util.List;
@Component
public class FileChangeListener {
private WatchService watchService;
private Thread watcherThread;
private volatile boolean running = true;
public void startFileWatcher() throws IOException {
//初始化插件
InitPlugins.init();
//注册文件监听
registerListener();
//启动监听线程
startThread();
System.out.println("启动成功");
ProtocolParserFactory protocolParserFactory = PluginsClassLoader.getJarClass(ProtocolParserFactory.class,"com.zhonglai.luhui.device.protocol.http.analysis.analysis.ProtocolParserServiceImpl");
System.out.println(protocolParserFactory.analysisTopic("/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{imei}}/{{topicType}}"));
}
@EventListener(ContextRefreshedEvent.class)
public void onApplicationEvent(ContextRefreshedEvent event) throws IOException {
startFileWatcher();
}
private void registerListener() throws IOException {
// 创建WatchService,它是对操作系统的文件监视器的封装,相对之前,不需要遍历文件目录,效率要高很多
watchService = FileSystems.getDefault().newWatchService();
// 注册指定目录使用的监听器,监视目录下文件的变化;
// PS:Path必须是目录,不能是文件;
// StandardWatchEventKinds.ENTRY_MODIFY,表示监视文件的修改事件
Path path = Paths.get(InitPlugins.getPluginsPath());
path.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);
}
private void startThread()
{
// 创建并启动监听线程
watcherThread = new Thread(this::watchFileChanges);
watcherThread.start();
}
private void watchFileChanges() {
while (running) {
try {
// 获取目录的变化:
// take()是一个阻塞方法,会等待监视器发出的信号才返回。
// 还可以使用watcher.poll()方法,非阻塞方法,会立即返回当时监视器中是否有信号。
// 返回结果WatchKey,是一个单例对象,与前面的register方法返回的实例是同一个;
WatchKey key = watchService.take();
// 处理文件变化事件:
// key.pollEvents()用于获取文件变化事件,只能获取一次,不能重复获取,类似队列的形式。
for (WatchEvent<?> event : key.pollEvents()) {
// event.kind():事件类型
if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
//事件可能lost or discarded
continue;
}
// 返回触发事件的文件或目录的路径(相对路径)
Path fileName = (Path) event.context();
System.out.println("文件更新: " + fileName);
PluginsClassLoader.uploadJar(new File(key.watchable().toString()+"/"+fileName));
}
// 每次调用WatchService的take()或poll()方法时需要通过本方法重置
if (!key.reset()) {
break;
}
} catch (Exception e) {
e.printStackTrace();
// 如果有必要,可以在这里优雅地处理异常,比如重新注册WatchKey
break;
}
}
}
@PreDestroy
public void stopFileWatcher() {
running = false; // 设置标志以停止线程
// 等待线程完成(可选,但可能不总是需要的,取决于你的需求)
try {
watcherThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 关闭WatchService
try {
if (watchService != null) {
watchService.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
FileChangeListener fileChangeListener = new FileChangeListener();
fileChangeListener.startFileWatcher();
System.out.println("启动成功");
}
}
... ...
package com.zhonglai.luhui.device.protocol.factory.plugins;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.common.utils.spring.SpringUtils;
import com.zhonglai.luhui.device.analysis.comm.factory.Topic;
import com.zhonglai.luhui.device.protocol.factory.analysis.DefaultProtocolParserFactoryImpl;
import com.zhonglai.luhui.device.protocol.factory.analysis.ProtocolParserFactory;
import com.zhonglai.luhui.device.protocol.factory.config.PluginsClassLoader;
import com.zhonglai.luhui.device.protocol.factory.purification.DefaultProtocolPurificationFactoryImpl;
import io.swagger.v3.oas.annotations.servers.Server;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.util.FileCopyUtils;
import javax.annotation.PostConstruct;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.*;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
public class InitPlugins {
private static final String pluginsPath = "plugins";
private static final String pluginsJarPath = "pluginsjar";
public static void init()
{
// //加载默认的class
// //默认协议解析
// classMap.put("com.zhonglai.luhui.device.protocol.factory.analysis.DefaultProtocolParserFactoryImpl", DefaultProtocolParserFactoryImpl.class);
// //默认协议清洗
// classMap.put("com.zhonglai.luhui.device.protocol.factory.purification.DefaultProtocolPurificationFactoryImpl", DefaultProtocolPurificationFactoryImpl.class);
//
// File[] files = getJarFiles(getPluginsPath());
// loaderJar(files);
File[] files = getJarFiles(getPluginsPath());
PluginsClassLoader.uploadJar(files);
}
public static String getPluginsPath()
{
String jarFilePath = System.getProperty("jarFilePath");
String path = null != jarFilePath?jarFilePath:System.getProperty("user.dir")+"/"+pluginsPath;
return path;
}
public static String getPluginsJarPath()
{
String path = System.getProperty("user.dir")+"/"+pluginsJarPath;
return path;
}
public static File[] getJarFiles(String jarFilePath)
{
File file = new File(jarFilePath);
if(file.exists() && file.isDirectory())
{
File[] files = file.listFiles((dir, name) -> {
if(name.endsWith(".jar"))
{
return true;
}
return false;
});
return files;
}
return null;
}
public static void main(String[] args) throws MalformedURLException {
File[] files = getJarFiles(getPluginsPath());
PluginsClassLoader.uploadJar(files);
// String jarFilePath = "E:\\work\\idea\\Luhui\\lh-modules\\lh-device-protocol-parser\\lh-device-xinjie\\target";
// File[] files = getJarFiles(jarFilePath);
// loaderJar(files);
// try {
// ProtocolParserFactory protocolParserFactory = getJarClass(ProtocolParserFactory.class,"com.zhonglai.luhui.device.protocol.xinjie.analysis.ProtocolParserServiceImpl");
// Topic topic = protocolParserFactory.analysisTopic("/13/jiulin/476210165B365166812345678Userdata/Json/476210165B365166812345678/pub_data");
// System.out.println(topic);
// } catch (InstantiationException e) {
// throw new RuntimeException(e);
// } catch (IllegalAccessException e) {
// throw new RuntimeException(e);
// }
}
// public static <T> T getJarClass(Class<T> clazz, String className) {
// try {
// Class<?> loadedClass = Class.forName(className);
// System.out.println("接口的类加载器:"+clazz.getClassLoader());
// System.out.println("实现类的类加载器:"+loadedClass.getClassLoader());
// if (clazz.isAssignableFrom(loadedClass)) {
// return clazz.cast(loadedClass.getDeclaredConstructor().newInstance());
// }
// } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
// // Handle exceptions here
// }
// return null;
// }
public static String toJarPath(String filePath)
{
return filePath.replace(pluginsPath,pluginsJarPath);
}
}
... ...
package com.zhonglai.luhui.device.protocol.factory.purification;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.ruoyi.common.utils.DateUtils;
import com.ruoyi.common.utils.GsonConstructor;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.common.utils.spring.SpringUtils;
import com.zhonglai.luhui.device.analysis.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.device.analysis.comm.dto.thingsmodels.ThingsModelBase;
import com.zhonglai.luhui.device.analysis.comm.dto.thingsmodels.ThingsModelDataTypeEnum;
import com.zhonglai.luhui.device.analysis.comm.dto.thingsmodels.ThingsModelItemBase;
import com.zhonglai.luhui.device.analysis.comm.factory.Topic;
import com.zhonglai.luhui.device.analysis.comm.util.DateUtils;
import com.zhonglai.luhui.device.domain.IotThingsModel;
import com.zhonglai.luhui.device.protocol.factory.dto.AnalysisDto;
import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceHostDto;
import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceInfoDto;
import com.zhonglai.luhui.device.protocol.factory.dto.ProtocolPurificationModel;
import com.zhonglai.luhui.device.protocol.factory.service.IotThingsModelService;
import com.zhonglai.luhui.device.protocol.factory.service.PersistenceDBService;
import org.apache.commons.lang3.EnumUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Service("default_purification")
/**
* 默认的清洗服务
*/
public class DefaultProtocolPurificationFactoryImpl implements ProtocolPurificationFactory{
@Autowired
private IotThingsModelService iotThingsModelService;
@Override
public ProtocolPurificationModel purification(Integer product_id,Topic topic, JsonObject protocolParserModel ) {
... ... @@ -40,7 +34,7 @@ public class DefaultProtocolPurificationFactoryImpl implements ProtocolPurificat
ParserDeviceHostDto parserDeviceHostDto = analysisHost(product_id,topic,protocolParserModel,protocolPurificationModel.getDeviceSensorDataList());
protocolPurificationModel.setParserDeviceHostDto(parserDeviceHostDto);
List<ParserDeviceInfoDto> parserDeviceInfoDtoList = analysisDeviceInfo(product_id,topic,protocolParserModel,protocolPurificationModel.getDeviceSensorDataList());
List<ParserDeviceInfoDto> parserDeviceInfoDtoList = analysisDeviceInfo(parserDeviceHostDto.getUpdateTime(),product_id,topic,protocolParserModel,protocolPurificationModel.getDeviceSensorDataList());
protocolPurificationModel.setParserDeviceInfoDtoList(parserDeviceInfoDtoList);
return protocolPurificationModel;
}
... ... @@ -59,10 +53,11 @@ public class DefaultProtocolPurificationFactoryImpl implements ProtocolPurificat
summary = jsonObjectData.get("summary").getAsJsonObject();
jsonObjectData.remove("summary");
}
int time = DateUtils.getNowTimeMilly();
ParserDeviceHostDto parserDeviceHostDto = null;
if(null != jsonObjectData && jsonObjectData.size() != 0)
{
parserDeviceHostDto = analysisJsonData(product_id,topic,jsonObjectData).pushDeviceSensorData(deviceSensorDataList).toParserDeviceHostDto(topic.getClientid());
parserDeviceHostDto = analysisJsonData(time,"0",product_id,topic,jsonObjectData).pushDeviceSensorData(deviceSensorDataList).toParserDeviceHostDto(topic.getClientid(),time);
}
if(null != summary && summary.size() != 0)
... ... @@ -74,12 +69,13 @@ public class DefaultProtocolPurificationFactoryImpl implements ProtocolPurificat
}
parserDeviceHostDto.setSummary(summary);
}
parserDeviceHostDto.setUpdateTime(time);
return parserDeviceHostDto;
}
return null;
}
private List<ParserDeviceInfoDto> analysisDeviceInfo(Integer product_id,Topic topic, JsonObject data,List<DeviceSensorData> deviceSensorDataList)
private List<ParserDeviceInfoDto> analysisDeviceInfo(Integer time,Integer product_id,Topic topic, JsonObject data,List<DeviceSensorData> deviceSensorDataList)
{
List<ParserDeviceInfoDto> list = new ArrayList<>();
... ... @@ -87,7 +83,7 @@ public class DefaultProtocolPurificationFactoryImpl implements ProtocolPurificat
{
if(!key.equals("0") && data.get(key).isJsonObject())
{
ParserDeviceInfoDto parserDeviceInfoDto = analysisJsonData(product_id,topic,data.get(key).getAsJsonObject()).pushDeviceSensorData(deviceSensorDataList).toParserDeviceInfoDto(topic.getClientid()+"_"+key);
ParserDeviceInfoDto parserDeviceInfoDto = analysisJsonData(time,key,product_id,topic,data.get(key).getAsJsonObject()).pushDeviceSensorData(deviceSensorDataList).toParserDeviceInfoDto(topic.getClientid()+"_"+key,time);
list.add(parserDeviceInfoDto);
}
}
... ... @@ -95,7 +91,7 @@ public class DefaultProtocolPurificationFactoryImpl implements ProtocolPurificat
return list;
}
private AnalysisDto analysisJsonData(Integer product_id,Topic topic, JsonObject jsonObjectData )
private AnalysisDto analysisJsonData(Integer time,String sensorNumber,Integer product_id,Topic topic, JsonObject jsonObjectData )
{
if(null != jsonObjectData && jsonObjectData.size() !=0)
{
... ... @@ -108,7 +104,7 @@ public class DefaultProtocolPurificationFactoryImpl implements ProtocolPurificat
analysisDto.setThings_model_config(things_model_config);
for (String key: jsonObjectData.keySet())
{
IotThingsModel thingsModel = iotThingsModelService.getIotThingsModel(product_id, key);
IotThingsModel thingsModel = SpringUtils.getBean(IotThingsModelService.class).getIotThingsModel(product_id, key);
JsonElement jsonElement = jsonObjectData.get(key);
if(null != jsonElement && !jsonElement.isJsonNull() )
... ... @@ -145,7 +141,7 @@ public class DefaultProtocolPurificationFactoryImpl implements ProtocolPurificat
deviceSensorDataList = new ArrayList<>();
analysisDto.setDeviceSensorDataList(deviceSensorDataList);
}
deviceSensorDataList.add(getDeviceSensorData(topic,thingsModel,thingsModelItemBase));
deviceSensorDataList.add(getDeviceSensorData(time,sensorNumber,topic,thingsModel,thingsModelItemBase));
}
}
}
... ... @@ -155,9 +151,6 @@ public class DefaultProtocolPurificationFactoryImpl implements ProtocolPurificat
return null;
}
private ThingsModelItemBase getThingsModelItemBase(IotThingsModel thingsModel,JsonElement jsonElement)
{
String data_type = thingsModel.getData_type().toUpperCase();
... ... @@ -168,14 +161,19 @@ public class DefaultProtocolPurificationFactoryImpl implements ProtocolPurificat
return ThingsModelItemBase.newhingsModelItem(Enum.valueOf(ThingsModelDataTypeEnum.class,data_type),thingsModel, jsonElement);
}
private DeviceSensorData getDeviceSensorData(Topic topic,IotThingsModel thingsModel,ThingsModelItemBase thingsModelItemBase)
private DeviceSensorData getDeviceSensorData(Integer time,String sensorNumber,Topic topic,IotThingsModel thingsModel,ThingsModelItemBase thingsModelItemBase)
{
DeviceSensorData sensorData = new DeviceSensorData();
sensorData.setDataType(thingsModel.getIdentifier());
sensorData.setDataValue(thingsModelItemBase.getSaveView());
sensorData.setCreatTime(com.zhonglai.luhui.device.analysis.comm.util.DateUtils.getNowTimeMilly());
sensorData.setCreatTime(time);
sensorData.setDeviceModel(topic.getUsername());
sensorData.setDeviceInfoId(topic.getClientid());
if("0".equals(sensorNumber))
{
sensorData.setDeviceInfoId(topic.getClientid());
}else {
sensorData.setDeviceInfoId(topic.getClientid()+"_"+sensorNumber);
}
return sensorData;
}
... ...
package com.zhonglai.luhui.device.protocol.factory.service;
import com.zhonglai.luhui.device.protocol.factory.ProtocolParserAndPurificationFactory;
import com.zhonglai.luhui.device.protocol.factory.control.ControlFactory;
import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceHostDto;
import com.zhonglai.luhui.device.protocol.factory.dto.ProtocolPurificationModel;
import com.zhonglai.luhui.device.protocol.factory.sync.ProtocolSyncFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -35,10 +37,11 @@ public abstract class BaseCallback<T> {
@Autowired
protected PersistenceDBService persistenceDBService;
/**
* 数据处理的工作流
*/
protected void messageArrived(String imei,String s,T payload) throws ClassNotFoundException {
protected void messageArrived(String imei,String s,T payload) throws ClassNotFoundException, InstantiationException, IllegalAccessException {
//判断网关是否存在
ParserDeviceHostDto oldparserDeviceHostDto = persistenceDBService.getOldParserDeviceHostDto(imei);
if(null == oldparserDeviceHostDto)
... ... @@ -48,7 +51,7 @@ public abstract class BaseCallback<T> {
}
//解析和清洗body
ProtocolPurificationModel protocolPurificationModel = protocolParserAndPurificationFactory.analysisAndPurification( oldparserDeviceHostDto.getProduct_id(),s,payload,oldparserDeviceHostDto.getAnalysis_clas(),oldparserDeviceHostDto.getPurification_clas());
ProtocolPurificationModel protocolPurificationModel = protocolParserAndPurificationFactory.analysisAndPurification( oldparserDeviceHostDto.getIotProduct(),s,payload);
//缓存更新
int i = deviceCashUpService.upProtocolPurificationModel(protocolPurificationModel,oldparserDeviceHostDto);
... ... @@ -59,5 +62,9 @@ public abstract class BaseCallback<T> {
//同步
persistenceDBService.syncDB(protocolPurificationModel);
//返回
}
}
... ...
... ... @@ -6,8 +6,10 @@ import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.common.utils.spring.SpringUtils;
import com.zhonglai.luhui.device.analysis.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.device.analysis.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.device.analysis.comm.dto.thingsmodels.ThingsModelDataTypeEnum;
import com.zhonglai.luhui.device.domain.IotDevice;
import com.zhonglai.luhui.device.domain.IotProduct;
import com.zhonglai.luhui.device.domain.IotProtocolClass;
import com.zhonglai.luhui.device.domain.IotThingsModel;
import com.zhonglai.luhui.device.protocol.factory.config.DeviceCach;
import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceHostDto;
... ... @@ -27,7 +29,7 @@ import java.util.Map;
*/
@Service
public class PersistenceDBService {
private static Map<Integer, IotProtocolClass> iotProtocolClassMap = new HashMap<>();
@Autowired
private DefaultDbService defaultDbService;
... ... @@ -60,9 +62,9 @@ public class PersistenceDBService {
public void syncDB(ProtocolPurificationModel protocolPurificationModel) throws ClassNotFoundException {
//主机
ParserDeviceHostDto parserDeviceHostDto = protocolPurificationModel.getParserDeviceHostDto();
if (parserDeviceHostDto.is_sync_db() && StringUtils.isNotEmpty(parserDeviceHostDto.getSync_db()))
if (parserDeviceHostDto.getIotProduct().getIs_sync_db()==1 && StringUtils.isNotEmpty(parserDeviceHostDto.getIotProduct().getSync_db()))
{
ProtocolSyncFactory protocolSyncFactory = getProtocolSyncService(parserDeviceHostDto.getSync_db());
ProtocolSyncFactory protocolSyncFactory = getProtocolSyncService(parserDeviceHostDto.getIotProduct().getSync_db());
if(null != protocolSyncFactory)
{
protocolSyncFactory.updateParserDeviceHostDto(parserDeviceHostDto);
... ... @@ -198,27 +200,29 @@ public class PersistenceDBService {
oldParserDeviceHostDto.setDevice_life(iotDevice.getDevice_life());
oldParserDeviceHostDto.setDevice_type(iotDevice.getMqtt_username());
oldParserDeviceHostDto.set_sync_db(iotProduct.getIs_sync_db()==1);
oldParserDeviceHostDto.setSync_db(iotProduct.getSync_db());
oldParserDeviceHostDto.setProduct_id(iotDevice.getProduct_id());
oldParserDeviceHostDto.setAnalysis_clas(iotProduct.getAnalysis_clas());
oldParserDeviceHostDto.setPurification_clas(iotProduct.getPurification_clas());
//设置同步设备
// if("1".equals(iotProduct.getIs_sync_db()) && StringUtils.isNotEmpty(iotProduct.getSync_db()))
// {
// ParserDeviceHostDto parserDeviceHostDto = getSyncDeviceById(iotDevice.getClient_id(),iotProduct.getSync_db());
// if(null != parserDeviceHostDto)
// {
// if(StringUtils.isNull(parserDeviceHostDto.getDevice_life()))
// {
// parserDeviceHostDto.setDevice_life(iotDevice.getDevice_life());
// }
// parserDeviceHostDto.setId(iotProduct.getSync_db()+"|"+parserDeviceHostDto.getId());
// DeviceCach.putDeviceHost(parserDeviceHostDto,parserDeviceHostDto.getDevice_life());
// }
// }
oldParserDeviceHostDto.setIotProduct(iotProduct);
}
return oldParserDeviceHostDto;
}
private IotProtocolClass getIotProtocolClass(Integer id)
{
if(!iotProtocolClassMap.containsKey(id))
{
IotProtocolClass iotProtocolClass = defaultDbService.getIotProtocolClass(id);
iotProtocolClass.setCase_model(null);
iotProtocolClassMap.put(id,iotProtocolClass);
}
return iotProtocolClassMap.get(id);
}
/**
* 获取产品解析插件包类名
* @param id
* @return
*/
public String getClassnameFromIotProtocolClassId(Integer id)
{
return getIotProtocolClass(id).getClassname();
}
}
... ...
... ... @@ -8,10 +8,7 @@ import com.zhonglai.luhui.device.analysis.comm.dao.BaseDao;
import com.zhonglai.luhui.device.analysis.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.device.analysis.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.device.analysis.comm.dto.TableGenerateSqlEnum;
import com.zhonglai.luhui.device.domain.IotDevice;
import com.zhonglai.luhui.device.domain.IotProduct;
import com.zhonglai.luhui.device.domain.IotTerminal;
import com.zhonglai.luhui.device.domain.IotThingsModel;
import com.zhonglai.luhui.device.domain.*;
import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceHostDto;
import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceInfoDto;
import org.springframework.stereotype.Service;
... ... @@ -39,6 +36,11 @@ public class DefaultDbService {
return rlist;
}
public IotProtocolClass getIotProtocolClass(Integer id)
{
return (IotProtocolClass) baseDao.get(IotProtocolClass.class,"id="+id+"","`mqtt_broker`.`iot_protocol_class`");
}
public IotDevice getDeviceById(String id)
{
return (IotDevice) baseDao.get(IotDevice.class,"client_id='"+id+"'","`mqtt_broker`.`iot_device`");
... ... @@ -67,25 +69,6 @@ public class DefaultDbService {
}
public ParserDeviceHostDto getParserDeviceHostDtoFromDb(String id) {
IotDevice iotDevice = getDeviceById(id);
ParserDeviceHostDto parserDeviceHostDto = new ParserDeviceHostDto();
parserDeviceHostDto.setId(id);
if(StringUtils.isNotEmpty(iotDevice.getSummary()))
{
parserDeviceHostDto.setData(GsonConstructor.get().fromJson(iotDevice.getSummary(),JsonObject.class));
}
parserDeviceHostDto.setUpdateTime(iotDevice.getData_update_time());
parserDeviceHostDto.setDevice_life(iotDevice.getDevice_life());
parserDeviceHostDto.setDevice_type(iotDevice.getMqtt_username());
IotProduct iotProduct = getIotProductById(iotDevice.getProduct_id());
parserDeviceHostDto.setSync_db(iotProduct.getSync_db());
parserDeviceHostDto.setProduct_id(iotDevice.getProduct_id());
return parserDeviceHostDto;
}
public ParserDeviceInfoDto getParserDeviceInfoDtoFromDb(String id,String name, ParserDeviceHostDto oldparserDeviceHostDto) {
IotTerminal iotTerminal = (IotTerminal) baseDao.get(IotTerminal.class,id);
if(null == iotTerminal)
... ... @@ -95,7 +78,7 @@ public class DefaultDbService {
iotTerminal.setDevice_id(oldparserDeviceHostDto.getId());
iotTerminal.setMqtt_username(oldparserDeviceHostDto.getDevice_type());
iotTerminal.setCreate_time(oldparserDeviceHostDto.getUpdateTime());
iotTerminal.setProduct_id(oldparserDeviceHostDto.getProduct_id());
iotTerminal.setProduct_id(oldparserDeviceHostDto.getIotProduct().getId());
if(StringUtils.isNotEmpty(name))
{
iotTerminal.setName(name);
... ...
package com.zhonglai.luhui.device.protocol.factory.sync;
import com.ruoyi.common.utils.StringUtils;
import com.zhonglai.luhui.device.analysis.comm.dao.BaseDao;
import com.zhonglai.luhui.device.analysis.comm.dto.DeviceSensorData;
import com.zhonglai.luhui.device.analysis.comm.dto.LogDeviceOperation;
import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceHostDto;
import com.zhonglai.luhui.device.protocol.factory.dto.ParserDeviceInfoDto;
import com.zhonglai.luhui.device.protocol.factory.dto.ProtocolPurificationModel;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class DefaultProtocolSyncFactoryImpl implements ProtocolSyncFactory{
private BaseDao lsy_baseDao = new BaseDao();
@Override
public void updateParserDeviceHostDto(ParserDeviceHostDto parserDeviceHostDto) {
}
@Override
public void updateParserDeviceInfoDtoList(List<ParserDeviceInfoDto> parserDeviceInfoDtoList) {
}
@Override
public void updateDeviceSensorDataList(List<DeviceSensorData> deviceSensorDataList) {
}
@Override
public void updateLogDeviceOperationList(List<LogDeviceOperation> logDeviceOperationList) {
}
}
... ...
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.zhonglai.luhui</groupId>
<artifactId>lh-device-protocol-parser</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>lh-device-uyu</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.zhonglai.luhui</groupId>
<artifactId>lh-device-protocol-factory</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
... ...
package com.zhonglai.luhui.device.protocol.uyu.analysis;
import com.google.gson.JsonObject;
import com.ruoyi.common.utils.ByteUtil;
import com.zhonglai.luhui.device.analysis.comm.factory.Topic;
import com.zhonglai.luhui.device.analysis.util.TopicUtil;
import com.zhonglai.luhui.device.protocol.factory.analysis.ProtocolParserFactory;
public class UyuProtocolParserFactoryImpl implements ProtocolParserFactory<byte[]> {
private static final String topicModel = "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{imei}}/{{topicType}}";
@Override
public Topic analysisTopic(String topicStr) {
Topic topic = TopicUtil.initTopicFromModelStr(topicStr,topicModel);
return topic;
}
@Override
public JsonObject analysisPayload(Topic topic, byte[] payload) {
JsonObject jsonObject = new JsonObject();
int addres = 1; //应该用第3字节:地址(目前定为0x80),但是下位机没有用到 我们现在也不用就写死
JsonObject senserData = new JsonObject();
jsonObject.add(addres+"",senserData);
byte b5 = payload[4];
int dangwei = ByteUtil.getBitsValue(b5,0,3);
int kaiguan = ByteUtil.getBitValue(b5,7);
int zhuanshu = new Long(ByteUtil.bytesToLongDESC(payload,5,2)).intValue();
int dianliu = new Long(ByteUtil.bytesToLongDESC(payload,7,2)).intValue();
int dianya = new Long(ByteUtil.bytesToLongDESC(payload,9,2)).intValue();
int gonglv = new Long(ByteUtil.bytesToLongDESC(payload,11,2)).intValue();
//第14字节:故障码 (转成2进制对应下表)
byte b14 = payload[13];
int qianyabaoh = ByteUtil.getBitValue(b14,0);
int duanlubaohu = ByteUtil.getBitValue(b14,1);
int qidongyichangbaohu = ByteUtil.getBitValue(b14,2);
int guowenbaohu = ByteUtil.getBitValue(b14,2);
int qiexiangbaohu = ByteUtil.getBitValue(b14,2);
jsonObject.addProperty("106",dangwei);
jsonObject.addProperty("3",kaiguan);
jsonObject.addProperty("67",zhuanshu);
jsonObject.addProperty("61",dianliu);
jsonObject.addProperty("57",dianya);
jsonObject.addProperty("59",gonglv);
jsonObject.addProperty("107",qianyabaoh);
jsonObject.addProperty("108",duanlubaohu);
jsonObject.addProperty("109",qidongyichangbaohu);
jsonObject.addProperty("110",guowenbaohu);
jsonObject.addProperty("111",qiexiangbaohu);
return jsonObject;
}
}
... ...
package com.zhonglai.luhui.device.protocol.uyu.control;
import com.zhonglai.luhui.device.protocol.factory.control.ControlFactory;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(consumerGroup = "lh-device-uyu", topic = "control-device-uyu",selectorType = SelectorType.TAG,selectorExpression = "1",messageModel = MessageModel.BROADCASTING)
public class UyuControlFactoryImpl extends ControlFactory {
@Override
protected String analysisMessageKey() {
return null;
}
@Override
protected Boolean transmitMessage(String str) {
return null;
}
@Override
protected long getOperationTime() {
return 10000;
}
}
... ...
... ... @@ -16,12 +16,12 @@ import com.zhonglai.luhui.device.protocol.xinjie.analysis.topic.WriteData;
import com.zhonglai.luhui.device.protocol.xinjie.analysis.topic.WriteReply;
import com.zhonglai.luhui.device.protocol.xinjie.dto.Variant;
import com.zhonglai.luhui.device.protocol.xinjie.dto.XinJieDto;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
@Service
public class ProtocolParserServiceImpl implements ProtocolParserFactory<byte[]> {
private static final String topicModel = "/{{roleid}}/{{username}}/{{password}}/{{payloadtype}}/{{clientid}}/{{topicType}}";
... ...
... ... @@ -17,7 +17,6 @@ import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class LsyProtocolSyncFactoryImpl implements ProtocolSyncFactory {
private BaseDao lsy_baseDao = new BaseDao(new LsyDBFactoryImp());
... ...
... ... @@ -24,6 +24,8 @@
<module>lh-device-xinjie</module>
<module>lh-device-protocol-factory</module>
<module>lh-device-http-public</module>
<module>lh-device-modbus</module>
<module>lh-device-uyu</module>
</modules>
<description>
设备协议解析工厂
... ...
... ... @@ -116,10 +116,6 @@
<groupId>com.zhonglai.luhui</groupId>
<artifactId>lh-device-protocol-factory</artifactId>
</dependency>
<dependency>
<groupId>com.zhonglai.luhui</groupId>
<artifactId>lh-device-http-public</artifactId>
</dependency>
</dependencies>
<build>
... ...
... ... @@ -36,6 +36,7 @@ public class ShutdownManager
{
logger.info("====关闭后台任务任务线程池====");
asyncManager.shutdown();
logger.info("====关闭成功====");
}
catch (Exception e)
{
... ...
... ... @@ -28,8 +28,8 @@ public class HttpCallback extends BaseCallback<JsonObject> {
{
try {
HttpCallback.super.messageArrived(imei,"/"+role+"/"+username+"/"+imei+"/Json",data);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
} catch (Exception e) {
log.error(imei+"解析数据失败",e);
}
// asyncManager.execute(new TimerTask() {
// @Override
... ...
... ... @@ -141,10 +141,6 @@
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.zhonglai.luhui</groupId>
<artifactId>lh-jar-device-service</artifactId>
</dependency>
... ... @@ -152,10 +148,15 @@
<groupId>com.zhonglai.luhui</groupId>
<artifactId>lh-device-protocol-factory</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.zhonglai.luhui</groupId>-->
<!-- <artifactId>lh-device-xinjie</artifactId>-->
<!-- </dependency>-->
<dependency>
<groupId>com.zhonglai.luhui</groupId>
<artifactId>lh-device-xinjie</artifactId>
<artifactId>lh-device-modbus</artifactId>
</dependency>
</dependencies>
<build>
... ...
... ... @@ -31,8 +31,23 @@ public class MqttServiceListenApplication {
public static void main(String[] args) {
log.info("启动服务");
System.setProperty("RunInIDEA",checkRunInIDEA());
SpringApplicationBuilder builder = new SpringApplicationBuilder(MqttServiceListenApplication.class);
builder.run( args);
}
/**
* 判断是否是idea里面启动
* @return true:是 false:否
*/
private static String checkRunInIDEA() {
try {
Class.forName("com.intellij.rt.execution.application.AppMainV2");
return "1";
} catch (ClassNotFoundException ignored) {
return null;
}
}
}
... ...
package com.zhonglai.luhui.mqtt.service.proxy.comm.service;
import com.ruoyi.common.utils.StringUtils;
import com.zhonglai.luhui.device.analysis.comm.config.SysParameter;
import com.zhonglai.luhui.device.protocol.factory.service.impl.DefaultDbService;
import org.eclipse.paho.client.mqttv3.MqttClient;
... ... @@ -13,6 +14,8 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
... ... @@ -34,13 +37,6 @@ public class TerminalService {
@Autowired
private SysParameter sysParameter;
//业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
private final static ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(50000));
@Value("${mqtt.broker}")
private String broker;
@Value("${mqtt.clientId}")
... ... @@ -49,6 +45,8 @@ public class TerminalService {
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.textTopic:null}")
private String textTopic;
private MqttClient mqttclient;
... ... @@ -84,6 +82,12 @@ public class TerminalService {
public List<String> getCompletionTopics()
{
if ("1".equals(System.getProperty("RunInIDEA")) && StringUtils.isNotEmpty(textTopic))
{
List<String> list = new ArrayList<>();
list.add(textTopic);
return list;
}
return dbService.getTopicFromRole();
}
... ... @@ -99,4 +103,13 @@ public class TerminalService {
log.info("-----------订阅{}成功--------------------",topics);
}
@PreDestroy
public void stop() throws MqttException {
if(null != mqttclient)
{
mqttclient.disconnect();
mqttclient.close();
}
}
}
... ...
... ... @@ -27,4 +27,5 @@ mqtt:
password: "!@#1qaz"
client:
#客户端操作时间
operationTime: 10
\ No newline at end of file
operationTime: 10
textTopic: "/2/6_WP/866520067451609/#"
\ No newline at end of file
... ...
## /{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/ 由代理服务器做权限控制,终端不需要传递
## 终端订阅
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/PUT/+ 写数据,需要返回执行结果
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/GET_REQ/+ 获取数据的返回结果
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/READ/+ 读数据,需要返回执行结果
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/PUT/+ 写数据,需要返回执行结果
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/GET_REQ/+ 获取数据的返回结果
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/READ/+ 读数据,需要返回执行结果
## 终端发布
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/ALL_POST/+ 全量上报数据,不需要返回
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/ADD_POST/+ 增量上报数据,不需要返回
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/PUT_REQ/+ 写数据的执行结果
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/GET/+ 获取数据
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/READ_REQ/+ 读数据的执行结果
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/ALL_POST/+ 全量上报数据,不需要返回
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/ADD_POST/+ 增量上报数据,不需要返回
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}//PUT_REQ/+ 写数据的执行结果
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/GET/+ 获取数据
/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/READ_REQ/+ 读数据的执行结果
##代理通知
/${{roleid}}/${{username}}/${{clientid}}/online 上下线通知
... ...
... ... @@ -12,6 +12,7 @@ import org.springframework.context.annotation.ComponentScan;
@ComponentScan(basePackages = {
"com.zhonglai.luhui.device.analysis",
"com.zhonglai.luhui.redis.service",
"com.zhonglai.luhui.mqtt.service",
"com.zhonglai.luhui.mqtt.comm.service",
"com.zhonglai.luhui.mqtt.comm.rocketMq",
... ...
... ... @@ -147,7 +147,7 @@ public class MqttDeviceService extends DeviceService{
ThingsModelItemBase thingsModelItemBase = ThingsModelItemBase.newhingsModelItem(Enum.valueOf(ThingsModelDataTypeEnum.class,data_type),thingsModel,gsonobject.get(skey));
jsonObject.put(skey,thingsModelItemBase.getCmdView(object));
thingsModelItemBase.setValue(thingsModelItemBase.getCmdView(object));
// thingsModelItemBase.setValue(thingsModelItemBase.getCmdView(object));
String id = clienid+"_"+key;
logDeviceOperationList.add(dviceLogService.newLogDeviceOperation(id,thingsModelItemBase.getSaveView(),null,"远程控制"+thingsModelItemBase.getName()+"为"+thingsModelItemBase.getView(),null));
... ...
... ... @@ -2,6 +2,9 @@ package com.zhonglai.luhui.mqtt.service.topic;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonElement;
import com.ruoyi.common.utils.GsonConstructor;
import com.zhonglai.luhui.device.analysis.comm.dto.thingsmodels.ThingsModelItemBase;
import com.zhonglai.luhui.device.analysis.comm.service.BusinessDataUpdateService;
import com.zhonglai.luhui.device.analysis.dto.topic.AddPostDto;
import com.zhonglai.luhui.device.domain.IotThingsModel;
... ... @@ -71,11 +74,13 @@ public class ReadReqTopic implements BusinessAgreement<ReadReqDto> {
{
data_type = ThingsModelDataTypeEnum.STRING.name();
}
Class<ThingsModelBase> aClass = Enum.valueOf(ThingsModelDataTypeEnum.class,data_type).getaClass();
ThingsModelBase thingsModelBase = JSON.parseObject(thingsModel.getSpecs(),aClass);
thingsModelBase.conversionThingsModel(thingsModel);
ThingsModelBase thingsModelBase = ThingsModelItemBase.newhingsModelItem(Enum.valueOf(ThingsModelDataTypeEnum.class,data_type),thingsModel, GsonConstructor.get().fromJson(jsData.get(key).toString(), JsonElement.class));
// Class<ThingsModelBase> aClass = Enum.valueOf(ThingsModelDataTypeEnum.class,data_type).getaClass();
// ThingsModelBase thingsModelBase = JSON.parseObject(thingsModel.getSpecs(),aClass);
// thingsModelBase.conversionThingsModel(thingsModel);
//
// thingsModelBase.addValue(jsData.get(key));
thingsModelBase.addValue(jsData.get(key));
jsData.put(key,thingsModelBase);
}
vjsonObject.put(vkey,jsData);
... ...
... ... @@ -49,7 +49,7 @@ mqtt:
mqtt_usernames: 6_WP
#订阅的topic
topics: ADD_POST,ALL_POST,DB_TOPIC_DISTRIBUTE,GET/+,online,PUT_REQ/+,READ_REQ/+
sub_clientid: '+'
sub_clientid: '863482065281251'
topicconfig: "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/{{messageid}}"
top_return_map: '{"PUT":"PUT_REQ","READ":"READ_REQ"}'
username: sysuser
... ...
... ... @@ -31,6 +31,7 @@
<module>lh-http-service-proxy</module>
<module>lh-mqtt-service-listen</module>
<module>lh-device-protocol-parser</module>
<module>lh-device-operation-service</module>
</modules>
<properties>
... ...
... ... @@ -373,6 +373,12 @@
</dependency>
<dependency>
<groupId>com.zhonglai.luhui</groupId>
<artifactId>lh-device-modbus</artifactId>
<version>${ruoyi.version}</version>
</dependency>
<dependency>
<groupId>com.zhonglai</groupId>
<artifactId>ServiceDao</artifactId>
<version>1.4.3</version>
... ...