作者 钟来

Default Changelist

... ... @@ -167,4 +167,7 @@
{
"1":"id1,id2,id3",
"0":"id1,id2,id3"
}
\ No newline at end of file
}
3、订阅主题 POST_REQ/+,下位机定时提交数据时,上位机返回数据解析情况
payload返回一个int数据:0 topic解析为空,不做消息解析;1 设备不存在,不做解析;2 消息正常解析;3 消息异常
\ No newline at end of file
... ...
... ... @@ -13,4 +13,5 @@ public interface ThingsModelBase<T> {
String getView();
String getSaveView();
Object getCmdView(Object value);
boolean checkValue();
}
... ...
... ... @@ -38,4 +38,9 @@ public class ArrayModelOutput extends ThingsModelItemBase<JSONArray>
public Object getCmdView(Object object) {
return object;
}
@Override
public boolean checkValue() {
return true;
}
}
... ...
... ... @@ -42,4 +42,9 @@ public class BoolModelOutput extends ThingsModelItemBase<Boolean>
public Object getCmdView(Object object) {
return object;
}
@Override
public boolean checkValue() {
return true;
}
}
... ...
... ... @@ -43,4 +43,14 @@ public class DecimalModelOutput extends ThingsModelItemBase<BigDecimal>
public Object getCmdView(Object object) {
return object;
}
@Override
public boolean checkValue() {
BigDecimal bigDecimal = getValue();
if(null != bigDecimal && bigDecimal.compareTo(min)>=0 && bigDecimal.compareTo(max)<=0)
{
return true;
}
return false;
}
}
... ...
... ... @@ -67,6 +67,11 @@ public class EnumModelOutput extends ThingsModelItemBase<Object>
return conversion(object,clas);
}
@Override
public boolean checkValue() {
return true;
}
private static Object conversion(Object data ,String clas) {
try {
return stringToTarget(String.valueOf(data),Class.forName(clas));
... ...
... ... @@ -57,4 +57,19 @@ public class IntegerModelOutput extends ThingsModelItemBase<Integer>
BigDecimal bigDecimal = new BigDecimal(object.toString());
return bigDecimal.multiply(new BigDecimal(acy)).intValue();
}
@Override
public boolean checkValue() {
if(null != getValue() )
{
BigDecimal bigDecimal = new BigDecimal(getValue());
if(bigDecimal.compareTo(min)>=0 && bigDecimal.compareTo(max)<=0)
{
return true;
}
}
return false;
}
}
... ...
... ... @@ -7,7 +7,7 @@ import lombok.Data;
* 字符串可以适配所有
*/
@Data
public class StringModelOutput extends ThingsModelItemBase<Object>
public class StringModelOutput extends ThingsModelItemBase<String>
{
private int maxLength;
... ... @@ -30,4 +30,13 @@ public class StringModelOutput extends ThingsModelItemBase<Object>
public Object getCmdView(Object object) {
return object;
}
@Override
public boolean checkValue() {
if(null != getValue() && getValue().length()<maxLength)
{
return true;
}
return false;
}
}
... ...
... ... @@ -97,6 +97,10 @@ public class DataModeAnalysisService {
thingsModelBase.addValue(jsData.get(key));
if(!thingsModelBase.checkValue())
{
continue;
}
ThingsModelItemBase thingsModelItemBase = (ThingsModelItemBase) thingsModelBase;
//记录数据日志
if(1==thingsModelItemBase.getIs_save_log() && null != serverDto.getDeviceSensorDataList())
... ...
... ... @@ -10,6 +10,7 @@ import com.zhonglai.luhui.mqtt.comm.factory.Topic;
import com.zhonglai.luhui.mqtt.comm.util.ByteUtil;
import com.zhonglai.luhui.mqtt.service.db.DeviceService;
import lombok.SneakyThrows;
import org.apache.commons.beanutils.BeanUtils;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttException;
... ... @@ -53,6 +54,9 @@ public class MqttCallback implements MqttCallbackExtended {
@Override
public void messageArrived(String s, MqttMessage mqttMessage) {
Topic desttopic = new Topic(s);
desttopic.setPayloadtype("POST_REQ");
//接收到消息
StringBuffer buffer = new StringBuffer();
buffer.append("topic:");
... ... @@ -65,30 +69,33 @@ public class MqttCallback implements MqttCallbackExtended {
buffer.append(ByteUtil.hexStringToSpace(ByteUtil.toHexString(mqttMessage.getPayload())));
buffer.append("\r\n");
buffer.append("\r\n");
Topic topic = new Topic(s);
if(null == topic)
{
log.error("消息{},topic为空,不做解析");
log.error("消息《"+s+"》解析为空 》》》内容:\r\n"+buffer.toString());
return;
}
//日志记录
log.info(buffer.toString());
//准备数据
byte[] data = mqttMessage.getPayload();
IotDevice iotDevice = deviceService.getDeviceById(topic.getClientid());
if(null == iotDevice)
{
log.info("设备{}不存在",topic.getClientid());
return;
}
if("ONLINE".equals(topic.getTopicType().toUpperCase()))
{
topic.setPayloadtype("String");
}
try {
Topic topic = new Topic(s);
if(null == topic)
{
log.error("消息{},topic为空,不做解析");
log.error("消息《"+s+"》解析为空 》》》内容:\r\n"+buffer.toString());
terminalService.publish(desttopic.generateSendMessageTopic(),"0");
return;
}
//日志记录
log.info(buffer.toString());
//准备数据
byte[] data = mqttMessage.getPayload();
IotDevice iotDevice = deviceService.getDeviceById(topic.getClientid());
if(null == iotDevice)
{
log.info("设备{}不存在",topic.getClientid());
terminalService.publish(desttopic.generateSendMessageTopic(),"1");
return;
}
if("ONLINE".equals(topic.getTopicType().toUpperCase()))
{
topic.setPayloadtype("String");
}
//转化为协议对象
BusinessDto businessDto = BusinessDtoClassNew.newBean(topic.getPayloadtype(),data).analyticalModel(iotDevice.getThings_model_value());
... ... @@ -106,8 +113,17 @@ public class MqttCallback implements MqttCallbackExtended {
//数据持久化
dataPersistenceService.persistence(topic,dto);
terminalService.publish(desttopic.generateSendMessageTopic(),"2");
log.info("{}payload解析完成",s);
} catch (Exception e) {
log.error(s+"消息解析异常",e);
try {
terminalService.publish(desttopic.generateSendMessageTopic(),"3");
} catch (MqttException ex) {
log.error(s+"消息解析异常时返回的执行结果消息异常",ex);
}
}
}
... ...
... ... @@ -45,10 +45,10 @@ mqtt:
clientId: ${random.uuid}
#公司id
roleid: 2
mqtt_usernames: 6_WP
mqtt_usernames: NWDB_2023
#订阅的topic
topics: ADD_POST,ALL_POST,DB_TOPIC_DISTRIBUTE,GET/+,online,PUT_REQ/+,READ_REQ/+
sub_clientid: '866569062386039'
sub_clientid: '+'
topicconfig: "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/{{messageid}}"
top_return_map: '{"PUT":"PUT_REQ","READ":"READ_REQ"}'
username: sysuser
... ...