# Author: 钟来 (Zhong Lai)
#
# Historical data storage configuration

driverClassName=com.mysql.cj.jdbc.Driver
url=jdbc:mysql://rm-wz9740un21f09iokuao.mysql.rds.aliyuncs.com:3306/liu_yu_le?useUnicode=true&characterEncoding=utf8&autoReconnect=true
username=luhui
password=Luhui586
#url=jdbc:mysql://rm-wz9740un21f09iokuao.mysql.rds.aliyuncs.com:3306/liu_yu_le?useUnicode=true&characterEncoding=utf8&autoReconnect=true
#username=luhui
#password=Luhui586
# Local MySQL instance.  Note: url/username/password were already assigned
# above (Aliyun RDS); in Java .properties files the LAST occurrence of a
# duplicated key wins, so the values below are the effective ones.
url=jdbc:mysql://192.168.31.133:3306/ly_sensor_data_2020?useUnicode=true&characterEncoding=utf8&autoReconnect=true
username = luhui
password = Luhui586
#\u6700\u5927\u8FDE\u63A5\u6570\u91CF
maxActive=100
#\u6700\u5927\u7A7A\u95F2\u8FDE\u63A5
... ...
... ... @@ -57,5 +57,84 @@
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
</dependency>
<dependency>
<groupId>com.zendesk</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<exclusions>
<exclusion>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</exclusion>
<exclusion>
<groupId>com.squareup.okio</groupId>
<artifactId>okio-jvm</artifactId>
</exclusion>
<exclusion>
<groupId>com.squareup.okio</groupId>
<artifactId>okio</artifactId>
</exclusion>
<exclusion>
<groupId>com.squareup.okio</groupId>
<artifactId>okio-parent</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<finalName>lh-data-file-service</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<!--
生成的jar中,不要包含pom.xml和pom.properties这两个文件
-->
<addMavenDescriptor>false</addMavenDescriptor>
<manifest>
<!--
是否要把第三方jar放到manifest的classpath中
-->
<addClasspath>true</addClasspath>
<!--
生成的manifest中classpath的前缀,因为要把第三方jar放到lib目录下,所以classpath的前缀是lib/
-->
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.zhonglai.luhui.data.file.service.service.DataService</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<!-- The configuration of maven-assembly-plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptors>
<descriptor>${project.parent.parent.basedir}/configs/package.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
... ...
... ... @@ -2,6 +2,7 @@ package com.zhonglai.luhui.data.file.service.service;
import cn.hutool.core.io.IORuntimeException;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.io.file.FileReader;
import cn.hutool.core.io.file.FileWriter;
import cn.hutool.core.util.CharsetUtil;
import com.ruoyi.common.utils.DateUtils;
... ... @@ -14,6 +15,7 @@ import org.apache.commons.dbutils.BasicRowProcessor;
import org.apache.commons.dbutils.GenerousBeanProcessor;
import org.apache.commons.dbutils.handlers.BeanListHandler;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.util.FileCopyUtils;
import java.io.BufferedReader;
import java.io.BufferedWriter;
... ... @@ -25,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 数据服务
*/
... ... @@ -40,56 +43,77 @@ public class DataService {
String yea = day.substring(0,4);
String tableName = "`ly_sensor_data_"+yea+"`.`device_sensor_data_"+day+"`";
List<DeviceSensorData> ct = baseDao.findBysql(DeviceSensorData.class,"SELECT device_info_id,data_type FROM "+tableName+" GROUP BY device_info_id,data_type");
if(null != ct && ct.size() != 0)
List<Map<String, Object>> list = baseDao.findBysql("SELECT COUNT(*) ct FROM "+tableName);
if(null != list && list.size() !=0 )
{
for (DeviceSensorData deviceSensorData:ct)
long ct = (long) list.get(0).get("ct");
long wancheng = 1;
long jd = 0;
int pagesize = 100000;
int pageNo = 1;
System.out.println(tableName);
List<DeviceSensorData> deviceSensorDataList = getDeviceSensorDataList(tableName,pagesize,pageNo++);
while (null != deviceSensorDataList && deviceSensorDataList.size() != 0 )
{
ScheduledUtil.scheduler.schedule(() -> {
String imei = deviceSensorData.getDevice_info_id().split("_")[0];
String deviceInfoId = deviceSensorData.getDevice_info_id();
String dataType = deviceSensorData.getData_type();
String deviceType = FileUtil.deviceTypeMap.get(deviceSensorData.getDevice_info_id().split("_")[0]);
List<DeviceSensorData> device_data_type_list = baseDao.findBysql(DeviceSensorData.class,"SELECT `creat_time`,`data_value` FROM "+tableName+" where device_info_id='"+deviceSensorData.getDevice_info_id()+"' and data_type='"+deviceSensorData.getData_type()+"'");
Statistics statistics = map.get(Integer.parseInt(yea));
if(null != device_data_type_list && device_data_type_list.size() != 0 )
for (DeviceSensorData deviceSensorData:deviceSensorDataList)
{
String baiduPath = FileUtil.createBaiduWangPanPat(yea,null==deviceSensorData.getDevice_model()?"device_model":deviceSensorData.getDevice_model(),deviceSensorData.getDevice_info_id().split("_")[0],deviceSensorData.getDevice_info_id(),deviceSensorData.getData_type());
File file = new File(FileUtil.tempFilePath+baiduPath);
saveDataFile(deviceSensorData,file);
long dqjd = (wancheng++*100)/ct;
if(dqjd>jd)
{
saveDataFile(device_data_type_list,imei,deviceInfoId,dataType,deviceType,yea,day);
map.put(Integer.parseInt(yea),statistics.add());
jd = dqjd;
}
System.out.println(yea+"年完成进度:"+statistics.getProgress());
},0, TimeUnit.SECONDS);
System.out.print("\r"+"完成进度:"+jd+"%");
for (int i=0;i<jd;i++)
{
System.out.print("#");
}
System.out.print("#");
}
deviceSensorDataList = getDeviceSensorDataList(tableName,pagesize,pageNo++);
}
System.out.println("完成");
}
}
private void saveDataFile(List<DeviceSensorData> device_data_type_list, String imei,String deviceInfoId,String dataType, String deviceType,String yea,String day)
private List<DeviceSensorData> getDeviceSensorDataList(String tableName,int pagesize,int pageNo)
{
    // Fetch one page of rows (pageNo is 1-based) using MySQL's
    // "LIMIT offset,count" syntax.
    int offset = (pageNo - 1) * pagesize;
    String sql = "SELECT * FROM " + tableName + " limit " + offset + "," + pagesize;
    return baseDao.findBysql(DeviceSensorData.class, sql);
}
private void saveDataFile(DeviceSensorData deviceSensorData,File file)
{
String baiduPath = FileUtil.createBaiduWangPanPat(yea,null==deviceType?"device_model":deviceType,imei,deviceInfoId,dataType+"+"+day);
File file = new File(FileUtil.tempFilePath+baiduPath);
BufferedWriter bufferedWriter = null;
try {
bufferedWriter = FileWriter.create(file, CharsetUtil.CHARSET_UTF_8).getWriter(true);
if (null != bufferedWriter)
{
for (DeviceSensorData deviceSensorData:device_data_type_list)
{
StringBuffer line = new StringBuffer();
line.append(deviceSensorData.getCreat_time());
line.append(",");
line.append(deviceSensorData.getData_value());
bufferedWriter.write(line.toString());
//默认换行符
bufferedWriter.write(FileUtil.CRLF);
bufferedWriter.flush();
}
StringBuffer line = new StringBuffer();
line.append(deviceSensorData.getCreat_time());
line.append(",");
line.append(deviceSensorData.getData_value());
bufferedWriter.write(line.toString());
//默认换行符
bufferedWriter.write(FileUtil.CRLF);
bufferedWriter.flush();
}
} catch (IOException e) {
... ... @@ -99,33 +123,67 @@ public class DataService {
}
}
private static Map<Integer, Statistics> map = new HashMap<>();
public static void main(String[] args) {
FileUtil.initDeviceType();
DataService dataService = new DataService();
public void transformDataByDay(String dataday,int endyear)
{
Calendar calendar = Calendar.getInstance();
int year = 2020;
while (year<2024)
int startyear = Integer.parseInt(dataday.substring(0,4));
int month = Calendar.JANUARY;
if(dataday.length()>=6)
{
month = Integer.parseInt(dataday.substring(4,6))-1;
}
int date = 1;
if(dataday.length()>=8)
{
date = Integer.parseInt(dataday.substring(6,8));
}
while (startyear<endyear)
{
calendar.set(year,Calendar.JANUARY,1);
if (!map.containsKey(year))
calendar.set(startyear,month,date);
if (!map.containsKey(startyear))
{
Statistics statistics = new Statistics();
statistics.setTotal(calendar.getActualMaximum(Calendar.DAY_OF_YEAR));
statistics.setFinished(0);
map.put(year,statistics);
map.put(startyear,statistics);
}
while (calendar.get(Calendar.YEAR)==year)
while (calendar.get(Calendar.YEAR)==startyear)
{
String day = DateUtils.parseDateToStr("yyyyMMdd",calendar.getTime());
dataService.saveOneDayData(day);
// ScheduledUtil.scheduler.schedule(() ->,0,TimeUnit.SECONDS);
saveOneDayData(day);
calendar.add(Calendar.DAY_OF_MONTH, 1);
}
year++;
startyear++;
date = 1;
month = Calendar.JANUARY;
}
}
public void saveerr()
{
    // Re-run the day-tables recorded in errtable.txt.  The list is first
    // snapshotted to errtable1.txt and the original deleted, so failures
    // during the re-run start a fresh error file instead of appending to
    // the one currently being read.
    File file = new File(FileUtil.tempFilePath+"/errtable.txt");
    if (!file.exists())
    {
        // Nothing to retry.  Without this guard the copy below throws a
        // wrapped FileNotFoundException when no error file was written.
        return;
    }
    File copyfile = new File(FileUtil.tempFilePath+"/errtable1.txt");
    try {
        FileCopyUtils.copy(file,copyfile);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    // Best effort: if the delete fails, the next run simply re-reads the
    // same entries.
    file.delete();
    List<String> list= FileReader.create(copyfile).readLines();
    for (String tablename:list)
    {
        // Lines look like `...device_sensor_data_YYYYMMDD`; extract the
        // yyyyMMdd day between the last '_' and the closing back-tick.
        String day = tablename.substring(tablename.lastIndexOf("_")+1,tablename.lastIndexOf("`"));
        saveOneDayData(day);
    }
}
private static Map<Integer, Statistics> map = new HashMap<>();
public static void main(String[] args) {
    // Entry point: export per-day sensor data to files starting at
    // 2023-01-17 and stopping before year 2024 (the endyear argument is
    // exclusive: transformDataByDay loops while startyear < endyear).
    DataService dataService = new DataService();
    dataService.transformDataByDay("20230117",2024);
}
}
... ...
package com.zhonglai.luhui.data.file.service.service;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.Bucket;
import com.influxdb.client.domain.DeletePredicateRequest;
import com.influxdb.client.domain.Query;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import com.ruoyi.common.utils.DateUtils;
import com.ruoyi.common.utils.GsonConstructor;
import com.zhonglai.dao.BaseDao;
import com.zhonglai.luhui.data.file.service.dto.DeviceSensorData;
import com.zhonglai.luhui.data.file.service.util.InfluxDBFluxExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Migration and query helpers for moving per-day MySQL sensor tables into
 * InfluxDB 2.x (one bucket per device model, one measurement per IMEI).
 */
public class InfluxDB2Service {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private static BaseDao baseDao = new BaseDao(new DataDBFactoryImp());

    // NOTE(review): endpoint and API token are hard-coded; externalize them so
    // secrets are not committed to source control.
    private static final String token = "YjJgRuCDnypQV4pHlzoixvdoiv237ybVvZ8zzOBfLdbXPbzmYYRi2uWGzXONqqLllhVq3wm03lOF2pl0e3uQHQ==";
    private static final String orgID = "dfed6796541746a2";
    private static final String org = "luhui";
    private static final String url = "http://192.168.31.133:8086";

    // Compiled once — Pattern is thread-safe and compiling per call is wasted work.
    private static final Pattern INTEGER_PATTERN = Pattern.compile("^[-+]?\\d+$");
    private static final Pattern DECIMAL_PATTERN = Pattern.compile("^[-+]?\\d+\\.\\d+$");

    // Cache of bucket name -> bucket id, filled once at construction.
    private Map<String,String> bucketMap= new HashMap<>();

    // Instance initializer: preload every bucket of the organization so
    // checkAndCreateBucket() can usually answer from the cache.
    {
        InfluxDBClient influxDBClient = connect();
        List<Bucket> list = influxDBClient.getBucketsApi().findBucketsByOrgName(org);
        close(influxDBClient);
        if(null != list && list.size() != 0)
        {
            for (Bucket bucket:list)
            {
                bucketMap.put(bucket.getName(),bucket.getId());
            }
        }
    }

    /**
     * Open a client connection with the default write timeout.
     * @return a new client; callers must release it via {@link #close}
     */
    private InfluxDBClient connect()
    {
        return connect(null);
    }

    /**
     * Open a client connection.
     * @param writeTimeOut write timeout appended to the URL as a query
     *                     parameter, or null for the default
     * @return a new client; callers must release it via {@link #close}
     */
    private InfluxDBClient connect(Integer writeTimeOut )
    {
        String connectUrl = url;
        if(null != writeTimeOut)
        {
            connectUrl = url + "?writeTimeout="+writeTimeOut;
        }
        return InfluxDBClientFactory.create(connectUrl, token.toCharArray());
    }

    /**
     * Close a client connection, tolerating null.
     */
    private void close(InfluxDBClient client)
    {
        if(null != client)
        {
            client.close();
        }
    }

    /**
     * Ensure a bucket exists, creating it when missing.
     * @param bucketName bucket to check/create
     * @return always true
     */
    private boolean checkAndCreateBucket( String bucketName) {
        // Cache hit: nothing to do.  (The original condition
        // "get()!=null && containsKey()" checked the same thing twice.)
        if (null != bucketMap.get(bucketName))
        {
            return true;
        }
        InfluxDBClient client = connect();
        Bucket buckets = client.getBucketsApi().findBucketByName(bucketName);
        if (null == buckets) {
            // No retention rule is set, so data in this bucket is kept forever.
            buckets = client.getBucketsApi().createBucket(bucketName,orgID);
            logger.info("Bucket 创建成功: " + buckets.getName());
        }
        close(client);
        bucketMap.put(bucketName,buckets.getId());
        return true;
    }

    /**
     * Convert one MySQL row into an InfluxDB point.
     * Measurement = IMEI prefix of device_info_id, tags = serial ("sn") and
     * data type ("type"), timestamp = creat_time in seconds.  Numeric values
     * (integer or decimal text) are stored as a double field so the field
     * keeps one consistent type; everything else is stored as a string field.
     */
    private Point deviceSensorDataToPoint( DeviceSensorData deviceSensorData)
    {
        String[] separationstr = separationDeviceInfoId(deviceSensorData.getDevice_info_id());
        Point point = Point.measurement(separationstr[0])
                .addTag("sn",separationstr[1])
                .addTag("type",deviceSensorData.getData_type())
                .time(deviceSensorData.getCreat_time(),WritePrecision.S)
                ;
        // The original had two identical branches for integer and decimal
        // input; both parsed to double, so they are merged here.
        if(isInteger(deviceSensorData.getData_value()) || isDecimal(deviceSensorData.getData_value()))
        {
            point.addField("value",Double.parseDouble(deviceSensorData.getData_value()));
        }
        else {
            point.addField("value",deviceSensorData.getData_value());
        }
        return point;
    }

    /**
     * Split a device_info_id of the form "IMEI_serial" into
     * {measurementName, baseTag}.  Without an underscore the whole id becomes
     * the measurement name and the tag is empty.
     */
    private String[] separationDeviceInfoId(String deviceInfoId)
    {
        int i = deviceInfoId.indexOf("_");
        String measurementName;
        String baseTag = "";
        if(i>0)
        {
            measurementName = deviceInfoId.substring(0,i);
            baseTag = deviceInfoId.substring(i+1);
        }else {
            measurementName = deviceInfoId;
        }
        return new String[]{measurementName,baseTag};
    }

    /** @return true when the input is an optionally signed integer literal */
    public static boolean isInteger(String input) {
        return INTEGER_PATTERN.matcher(input).matches();
    }

    /** @return true when the input is an optionally signed "digits.digits" decimal literal */
    public static boolean isDecimal(String input) {
        return DECIMAL_PATTERN.matcher(input).matches();
    }

    /**
     * Write a batch of rows into the given bucket.
     * @param bucket   target bucket (device model)
     * @param dataList rows to write; null or empty is a no-op
     */
    public void writeData( String bucket,List<DeviceSensorData> dataList)
    {
        // Fixed: the original guard used &&, which throws NPE for a null list
        // and never skips an empty one.
        if(null == dataList || dataList.size()==0)
        {
            return ;
        }
        InfluxDBClient influxDBClient = connect(60000);
        WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
        List<Point> saveList = new ArrayList<>();
        for (DeviceSensorData deviceSensorData:dataList)
        {
            saveList.add(deviceSensorDataToPoint(deviceSensorData));
        }
        writeApi.writePoints(bucket, orgID, saveList);
        close(influxDBClient);
    }

    /**
     * Copy one day-table from MySQL into InfluxDB, one bucket per device
     * model, paging through the rows 10000 at a time.
     */
    private void mysqlToInfluxDB(String databaseName, String tableName)
    {
        logger.info("开始时间:"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        long time = System.currentTimeMillis();
        List<Map<String, Object>> device_modellist = baseDao.findBysql("SELECT DISTINCT device_model FROM "+databaseName+"."+tableName);
        logger.info("查询设备类型用时:"+(System.currentTimeMillis()-time)/1000+"s");
        if(null != device_modellist && device_modellist.size() != 0)
        {
            for (Map<String, Object> map:device_modellist)
            {
                String device_model = map.get("device_model")+"";
                checkAndCreateBucket(device_model);
                // device_model comes from our own database; if the source ever
                // becomes untrusted this concatenated SQL must be parameterized.
                String sql = "select * from "+databaseName+"."+tableName +" where device_model='"+device_model+"'";
                int pageNo = 1;
                int pageSize = 10000;
                // Page until a query returns an empty page (same sequence of
                // operations as the original fetch-then-while loop).
                List<DeviceSensorData> list;
                do {
                    time = System.currentTimeMillis();
                    list = baseDao.findBysql(DeviceSensorData.class,sql+getlimit(pageNo++,pageSize));
                    writeData(device_model,list);
                    logger.info("处理第"+(pageNo-1)+"页时间用时:"+(System.currentTimeMillis()-time)/1000+"s");
                } while (null != list && list.size()>0);
            }
        }
        logger.info("结束时间:"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
    }

    /**
     * Synchronize every day-table from the given start day (yyyyMMdd) forward,
     * stopping once the cursor is within one day of "now".
     */
    public void synchronousMysqlToInfluxDBByTime(String time)
    {
        Calendar calendar = Calendar.getInstance();
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat( "yyyyMMdd");
        try {
            calendar.setTime(simpleDateFormat.parse(time));
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
        while (System.currentTimeMillis()-calendar.getTime().getTime()>=86400000) // at least one day old
        {
            Integer yea = calendar.get(Calendar.YEAR);
            String day = simpleDateFormat.format(calendar.getTime());
            String databaseName = "`ly_sensor_data_"+yea+"`";
            String tableName = "`device_sensor_data_"+day+"`";
            logger.info(databaseName+"."+tableName);
            mysqlToInfluxDB(databaseName,tableName);
            calendar.add(Calendar.DATE,1);
        }
    }

    /** Build a MySQL LIMIT clause for 1-based page numbers. */
    private String getlimit(int pageNo,int pageSize)
    {
        String limit = " limit " + (pageNo - 1) * pageSize + "," + pageSize;
        logger.info(limit);
        return limit;
    }

    public String getSenserData(String deviceInfoId,String dataType,Integer startTime,Integer endTime)
    {
        // NOTE(review): QueryApi.query() expects Flux, but this builds an
        // SQL/InfluxQL-style statement, so the call fails at runtime.  It
        // needs rewriting as a Flux pipeline (from/range/filter); left as-is
        // because the intended bucket for this lookup is not visible here.
        InfluxDBClient influxDBClient = connect(60000);
        QueryApi queryApi = influxDBClient.getQueryApi();
        String[] separationstr = separationDeviceInfoId(deviceInfoId);
        List<FluxTable> list = queryApi.query("select * from `"+separationstr[0]+"` where sn='"+separationstr[1]+"' and type='"+dataType+"' and time>="+startTime*1000l*1000l*1000l+" and time<"+endTime*1000l*1000l*1000l,org);
        close(influxDBClient);
        return "";
    }

    /**
     * List all buckets visible to the token.
     */
    public List<Bucket> findBuckets()
    {
        InfluxDBClient influxDBClient = connect();
        List<Bucket> buckets = influxDBClient.getBucketsApi().findBuckets();
        close(influxDBClient);
        return buckets;
    }

    /**
     * List all measurements ("tables") of a bucket via the v1 compatibility
     * helper.
     */
    public List<FluxTable> findMeasurements(String bucket)
    {
        InfluxDBClient influxDBClient = connect();
        List<FluxTable> tables = influxDBClient.getQueryApi().query("import \"influxdata/influxdb/v1\"\n"
                + "v1.measurements(bucket:\""+bucket+"\")", org);
        close(influxDBClient);
        return tables;
    }

    /**
     * Print the last point of each series of a measurement, paged.
     * Fixed: the original format string contained a single %s, so the limit
     * and offset arguments passed to String.format were silently ignored.
     */
    public void queryMeasurementDataWithPaging(String bucket,String measurement, int limit, int offset) {
        InfluxDBClient influxDBClient = connect();
        QueryApi queryApi = influxDBClient.getQueryApi();
        String flux = String.format("from(bucket:\""+bucket+"\") |> range(start: -5y) |> filter(fn: (r) => r._measurement == \"%s\") |> last() |> limit(n: %d, offset: %d) ", measurement, limit, offset);
        List<FluxTable> tables = queryApi.query(flux, org);
        for (FluxTable table : tables) {
            List<FluxRecord> records = table.getRecords();
            for (FluxRecord record : records) {
                System.out.println(record.getTime() + ": " + record.getValue());
            }
        }
        close(influxDBClient);
    }

    /**
     * UTC format with zone information.
     */
    public static final String UTC_ZONE_FORMATER = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";

    /**
     * Query a measurement between two epoch-second instants and log each
     * record.
     */
    public void select(String bucketName, String tableName,Integer start,Integer stop){
        InfluxDBClient influxDBClient = connect();
        StringBuffer stringBuilder = new StringBuffer();
        InfluxDBFluxExpression.appendCommonFlux(stringBuilder, bucketName, tableName, Instant.ofEpochMilli(start*1000l).toString(), Instant.ofEpochMilli(stop*1000l).toString());
        logger.info("查询sql :{}", stringBuilder.toString());
        List<FluxTable> tables = influxDBClient.getQueryApi().query(stringBuilder.toString(),org);
        for (FluxTable table : tables) {
            List<FluxRecord> records = table.getRecords();
            for (FluxRecord record : records) {
                logger.info("{}---{}---{}---{}", record.getMeasurement(),record.getField(),record.getValue(),record.getTime());
            }
        }
        close(influxDBClient);
    }

    /**
     * Query raw data of a measurement between two epoch-second instants and
     * print each record.
     */
    public void queryData(String bucket,String tableName,Integer start, Integer stop) {
        InfluxDBClient influxDBClient = connect();
        // Fixed: the filter body must be a boolean Flux expression on r; the
        // original used the delete-predicate form `_measurement="x"`, which is
        // not valid inside filter().
        String predicate = "r._measurement == \""+tableName+"\"";
        QueryApi queryApi = influxDBClient.getQueryApi();
        String query = String.format("from(bucket:\"%s\") " +
                "|> range(start: %s, stop: %s) " +
                "|> filter(fn: (r) => %s)", bucket, OffsetDateTime.ofInstant(Instant.ofEpochMilli(start*1000l), ZoneOffset.UTC), OffsetDateTime.ofInstant(Instant.ofEpochMilli(stop*1000l), ZoneOffset.UTC), predicate);
        logger.info("查询:"+query);
        List<FluxTable> tables = queryApi.query(query, org);
        for (FluxTable table : tables) {
            List<FluxRecord> records = table.getRecords();
            for (FluxRecord record : records) {
                System.out.println(record.getTime() + ": " + record.getValue());
            }
        }
        close(influxDBClient);
    }

    /**
     * Delete a measurement's data between two epoch-second instants.
     * (The `_measurement="x"` form is the documented delete-predicate syntax.)
     */
    public void delete(String bucketName, String tableName,Integer start,Integer stop) {
        InfluxDBClient influxDBClient = connect();
        String predicate = "_measurement=\""+tableName+"\"";
        influxDBClient.getDeleteApi().delete( OffsetDateTime.ofInstant(Instant.ofEpochMilli(start*1000l), ZoneOffset.UTC),
                OffsetDateTime.ofInstant(Instant.ofEpochMilli(stop*1000l), ZoneOffset.UTC),
                predicate,bucketName, org);
        close(influxDBClient);
    }

    public static void main(String[] args) {
        InfluxDB2Service influxDB2Service = new InfluxDB2Service();
//        influxDB2Service.delete("6_W","865501049001200",1580918400,1581091200);
        influxDB2Service.select("6_W","865501049001200",1580745600,1580832000);
    }
}
... ...
package com.zhonglai.luhui.data.file.service.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
 * Tails the MySQL binlog and prints table-map, insert, update and delete
 * events; update rows are additionally rendered as JSON.
 */
public class MysqlBinlogService {

    public static void main(String[] args) {
        // The account must have replication privileges to read the binlog.
        // NOTE(review): credentials are hard-coded; externalize them.
        BinaryLogClient client = new BinaryLogClient("rm-wz9740un21f09iokuao.mysql.rds.aliyuncs.com", 3306, "luhui", "Luhui586");
        // Deserialization settings: represent DATE/TIME values as long.
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
                // EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
        );
        client.setEventDeserializer(eventDeserializer);
        // Identify this client to the server with its own server id.
        client.setServerId(3);
        // Optional: resume from a known binlog file name + position.
        // client.setBinlogFilename("master-bin.000080");
        // client.setBinlogPosition(219);
        client.registerEventListener(event -> {
            EventData data = event.getData();
            String tableName;
            if (data instanceof TableMapEventData) {
                System.out.println("Table:");
                TableMapEventData tableMapEventData = (TableMapEventData) data;
                System.out.println(tableMapEventData.getTableId() + ": [" + tableMapEventData.getDatabase() + "." + tableMapEventData.getTable() + "]");
                tableName = tableMapEventData.getTable();
                // Ignore tables we do not process.
                if (!Objects.equals(tableName, "student"))
                    return;
            }
            if (data instanceof UpdateRowsEventData) {
                // Print the after-image of every updated row as JSON.
                UpdateRowsEventData udata = (UpdateRowsEventData) data;
                List<Map.Entry<Serializable[], Serializable[]>> rows = udata.getRows();
                for (Map.Entry<Serializable[], Serializable[]> row : rows) {
                    List<Serializable> entries = Arrays.asList(row.getValue());
                    JSONObject dataObject = getDataObject(entries);
                    System.out.println(dataObject);
                }
            } else if (data instanceof WriteRowsEventData) {
                // Fixed: the original constructed a NEW empty WriteRowsEventData
                // instead of casting the event payload, so getIncludedColumns()
                // and getRows() were invoked on an object holding no data and
                // their results discarded.
                WriteRowsEventData wData = (WriteRowsEventData) data;
                System.out.println("Insert:");
                System.out.println(wData);
            } else if (data instanceof DeleteRowsEventData) {
                System.out.println("Delete:");
                System.out.println(data);
            }
        });
        try {
            client.connect();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Map a row's column values into a JSONObject using a fixed
     * column-index layout: id=0, name=1, age=2, code=3.
     */
    private static JSONObject getDataObject(List<Serializable> message) {
        JSONObject resultObject = new JSONObject();
        String format = "{\"id\":\"0\",\"name\":\"1\",\"age\":\"2\",\"code\":\"3\"}";
        JSONObject json = JSON.parseObject(format);
        for (String key : json.keySet()) {
            resultObject.put(key, message.get(json.getInteger(key)));
        }
        return resultObject;
    }
}
... ...
... ... @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
public class FileUtil {
private static String LvLianFilePath = "D:/data/ly_sensor_data";
private static String BaiDuWangPanFilePath = "/禄辉/ly_sensor_data";
public static String tempFilePath = "F:/data";
public static String tempFilePath = System.getProperty("user.dir")+"/data";
public static String CRLF = "\r\n";
/**
* 读取绿联云的文件
... ...
package com.zhonglai.luhui.data.file.service.util;
import org.apache.commons.lang3.StringUtils;
import java.util.*;
import java.util.Map.Entry;
/**
* @Description:
* @author: skies
* @date:
*/
public class InfluxDBFluxExpression {
/**
* 通用表达式
*
* @param buffer
* @param bucketName 名称
* @param tableName 表名
* @param start 开始时间
* @param stop 结束时间
*/
public static void appendCommonFlux(StringBuffer buffer, String bucketName, String tableName,
String start, String stop) {
appendBucketFlux(buffer, bucketName);
appendTimeRangeFlux(buffer, start, stop);
appendTableFlux(buffer, tableName);
// if(timestampFlag) {
// appendTimestampFlux(buffer);
// }
// if(dropDefaultFlag) {
// appendDropFlux(buffer);
// }
}
/**
* 数据源(桶)表达式
*
* @param buffer
* @param bucketName 名称
*/
public static void appendBucketFlux(StringBuffer buffer, String bucketName) {
buffer.append("from(bucket: \"" + bucketName + "\") ");
}
/**
* 表名表达式
*
* @param buffer
* @param tableName 名称
*/
public static void appendTableFlux(StringBuffer buffer, String tableName) {
buffer.append("|> filter(fn: (r) => r._measurement == \"" + tableName + "\") ");
}
/**
* 表名表达式
*
* @param buffer
* @param tag 名称
*/
public static void appendTagFlux(StringBuffer buffer, String tag) {
buffer.append("|> filter(fn: (r) => r.tag == \"" + tag + "\") ");
}
/**
* field表达式
*
* @param buffer
* @param field 名称
*/
public static void appendTagField(StringBuffer buffer, String field) {
buffer.append("|> filter(fn: (r) => r._field == \"" + field + "\") ");
}
/**
* 时间范围表达式 UTC时间
*
* @param buffer
* @param start 开始时间
* @param stop 结束时间
*/
public static void appendTimeRangeFlux(StringBuffer buffer, String start, String stop) {
if (StringUtils.isBlank(start)) {
start = "1970-01-01T00:00:00.000Z";
}
if (StringUtils.isBlank(stop)) {
buffer.append("|> range(start:" + start + ") ");
} else {
buffer.append("|> range(start:" + start + ", stop:" + stop + ") ");
}
}
/**
* 删除字段表达式
*
* @param buffer
* @param args 需要删除的字段【 参数为空的话删除host字段】
*/
public static void appendDropFlux(StringBuffer buffer, String... args) {
if (args.length == 0) {
buffer.append("|> drop(columns: [\"host\"]) ");
return;
}
buffer.append("|> drop(columns: [");
for (int i = 0; i < args.length; i++) {
if (i != 0) {
buffer.append(",");
}
buffer.append("\"" + args[i] + "\"");
}
buffer.append("]) ");
}
/**
* 复制属性列表达式
*
* @param buffer
* @param oldField 原来的字段名称
* @param newField 新的字段名称
*/
public static void appendDuplicateFlux(StringBuffer buffer, String oldField, String newField) {
buffer.append("|> duplicate(column: \"" + oldField + "\", as: \"" + newField + "\") ");
}
/**
* 重命名属性列表达式
*
* @param buffer
* @param oldField 原来的字段名称
* @param newField 新的字段名称
*/
public static void appendRenameFlux(StringBuffer buffer, String oldField, String newField) {
buffer.append(" |> rename(columns: {" + oldField + ": \"" + newField + "\"}) ");
}
/**
* 最新一条数据表达式
*
* @param buffer
*/
public static void appendLastFlux(StringBuffer buffer) {
buffer.append("|> last() ");
}
/**
* 分页查询
*
* @param buffer
* @param n
* @param offset
*/
public static void appendLimitFlux(StringBuffer buffer, int n, int offset) {
buffer.append("|> limit(n:" + n + ", offset: " + offset + ") ");
}
/**
* 分组表达式
*
* @param buffer
*/
public static void appendGroupFlux(StringBuffer buffer, String... columns) {
if (columns.length == 0) {
buffer.append("|> group() ");
} else {
buffer.append("|> group(columns:[ ");
for (int i = 0; i < columns.length; i++) {
if (i != 0) {
buffer.append(",");
}
buffer.append("\"" + columns[i] + "\"");
}
buffer.append("]) ");
}
}
/**
* 去重表达式
*
* @param buffer
*/
public static void appendDistinctFlux(StringBuffer buffer, String... columns) {
if (columns.length == 0) {
buffer.append("|> distinct() ");
} else {
buffer.append("|> distinct(column:\"" + columns[0] + "\") ");
}
}
/**
* 总数表达式
*
* @param buffer
*/
public static void appendCountFlux(StringBuffer buffer) {
buffer.append("|> count() ");
}
/**
* 前几条数据
*
* @param buffer
* @param n
*/
public static void appendTopFlux(StringBuffer buffer, int n) {
buffer.append("|> top(n:" + n + ") ");
}
public static void appendBottomFlux(StringBuffer buffer, int n) {
buffer.append("|> bottom(n:" + n + ") ");
}
/**
* 排序
*
* @param buffer
* @param descFlag true 降序 ;false 升序
* @param columns
*/
public static void appendSortFlux(StringBuffer buffer, boolean descFlag, String... columns) {
if (columns.length == 0) {
buffer.append("|> sort(columns: [\"_value\"], desc: " + descFlag + ")");
} else {
buffer.append("|> sort(columns:[ ");
for (int i = 0; i < columns.length; i++) {
if (i != 0) {
buffer.append(",");
}
buffer.append("\"" + columns[i] + "\"");
}
buffer.append("], desc: " + descFlag + ") ");
}
}
/**
* 时移八小时
*
* @param buffer
*/
public static void appendTimeShiftFlux(StringBuffer buffer) {
buffer.append("|> timeShift(duration: 8h) ");
}
/**
* 过滤单个字符表达式
*
* @param buffer
* @param list
* @param operator 【== != 】
* @param join 【and or】
* @param fieldName
*/
public static void appendFilterFlux(StringBuffer buffer, List<String> list, String operator, String join, String fieldName) {
if (list == null || list.size() == 0) {
return;
}
for (int i = 0, size = list.size(); i < size; i++) {
if (i == 0) {
buffer.append("|> filter(fn: (r) =>");
} else {
buffer.append(join);
}
buffer.append(" r." + fieldName + " " + operator + " \"" + list.get(i) + "\" ");
}
buffer.append(") ");
}
/**
* 过滤表达式
*
* @param buffer
* @param map
* @param operator 【== != 】
* @param join 【and or】
*/
public static void appendFilterFlux(StringBuffer buffer, Map<String, Object> map, String operator, String join) {
Set<Entry<String, Object>> entrySet = map.entrySet();
Iterator<Entry<String, Object>> iterator = entrySet.iterator();
boolean flag = true;
while (iterator.hasNext()) {
Entry<String, Object> next = iterator.next();
String key = next.getKey();
Object value = next.getValue();
if (flag) {
buffer.append("|> filter(fn: (r) =>");
flag = false;
} else {
buffer.append(join);
}
buffer.append(" r." + key + " " + operator + " \"" + value + "\" ");
}
if (!flag) {
buffer.append(") ");
}
}
/**
* 过滤多个字段表达式
*
* @param buffer
* @param list
* @param innerJoin 【and or】
* @param operator 【== != 】
* @param outerJoin 【and or】
*/
public static void appendMulFilterFlux(StringBuffer buffer, List<Map<String, Object>> list, String innerJoin, String operator, String outerJoin) {
if (list == null || list.size() == 0) {
return;
}
buffer.append("|> filter(fn: (r) => ");
boolean outerFlag = true;
for (int i = 0; i < list.size(); i++) {
Map<String, Object> map = list.get(i);
Set<Entry<String, Object>> entrySet = map.entrySet();
Iterator<Entry<String, Object>> iterator = entrySet.iterator();
boolean innerFlag = true;
while (iterator.hasNext()) {
Entry<String, Object> next = iterator.next();
String key = next.getKey();
Object value = next.getValue();
if (innerFlag) {
if (outerFlag) {
outerFlag = false;
} else {
buffer.append(outerJoin);
}
buffer.append(" ( ");
innerFlag = false;
} else {
buffer.append(innerJoin);
}
buffer.append(" r." + key + " " + operator + " \"" + value + "\" ");
}
if (!innerFlag) {
buffer.append(" ) ");
}
}
buffer.append(" ) ");
}
/**
* 时间窗口统计
*
* @param buffer
* @param step 步长值【10m,1h,1d...】
* @param aggType 统计类型【sum,count,min,max...)
*/
public static void appendAggregateWindowFlux(StringBuffer buffer, String step, String aggType) {
buffer.append("|> aggregateWindow(every: " + step + ", fn: " + aggType + ") ");
}
public static void appendWindowFlux(StringBuffer buffer, String step) {
buffer.append("|> window(every: " + step + ") ");
}
/**
* 不带时间窗口统计
*
* @param buffer
* @param aggType 统计类型【sum,count,min,max...)
*/
public static void appendAggregateFlux(StringBuffer buffer, String aggType) {
buffer.append("|> " + aggType + "() ");
}
/**
* 多个查询结果需要指定每个输出结果名称
*
* @param buffer
* @param name
*/
public static void appendYieldFlux(StringBuffer buffer, String name) {
buffer.append("|> yield(name: \"" + name + "\") ");
}
/**
 * Appends a stage that truncates the {@code _time} column to the given unit.
 *
 * @param buffer target buffer the Flux fragment is appended to
 * @param step   truncation unit, e.g. {@code 1m}, {@code 1h}
 */
public static void appendTruncateTimeColumn(StringBuffer buffer, String step) {
    buffer.append(String.format("|> truncateTimeColumn(unit: %s) ", step));
}
/**
 * Appends an {@code import} statement for a Flux package.
 *
 * @param buffer target buffer the Flux fragment is appended to
 * @param name   Flux package name, e.g. {@code date}
 */
public static void appendImportFlux(StringBuffer buffer, String name) {
    buffer.append("import \"").append(name).append("\" ");
}
/**
 * Appends a filter stage that drops records whose {@code _value} is absent.
 *
 * @param buffer target buffer the Flux fragment is appended to
 */
public static void appendExistsFlux(StringBuffer buffer) {
    String stage = "|> filter(fn: (r) => exists r._value ) ";
    buffer.append(stage);
}
/**
 * Appends a filter stage that keeps only records with {@code _value > 0}
 * (note: this also drops negative values, not just zero).
 *
 * @param buffer target buffer the Flux fragment is appended to
 */
public static void appendZeroFlux(StringBuffer buffer) {
    String stage = "|> filter(fn: (r) => r._value > 0) ";
    buffer.append(stage);
}
}
... ...
# 项目相关配置
jhlt:
  # 名称
  name: zhonglai
  # 版本
  version: 3.8.2
  # 版权年份
  copyrightYear: 2023

# 开发环境配置
server:
  # 服务器的HTTP端口,默认为8080
  port: 8067
  servlet:
    # 应用的访问路径
    context-path: /
  tomcat:
    # tomcat的URI编码
    uri-encoding: UTF-8
    # 连接数满后的排队数,默认为100
    accept-count: 1000
    threads:
      # tomcat最大线程数,默认为200
      max: 800
      # Tomcat启动初始化的线程数,默认值10
      min-spare: 100

# 日志配置
logging:
  level:
    com.ruoyi: debug
    org.springframework: warn

# Swagger配置
swagger:
  # 是否开启swagger
  enabled: true
  # 请求前缀
  pathMapping: /dev-api
\ No newline at end of file
# 项目相关配置
jhlt:
  # 名称
  name: zhonglai
  # 版本
  version: 3.8.2
  # 版权年份
  copyrightYear: 2023

# 开发环境配置
server:
  # 服务器的HTTP端口,默认为8080
  port: 8067
  servlet:
    # 应用的访问路径
    context-path: /
  tomcat:
    # tomcat的URI编码
    uri-encoding: UTF-8
    # 连接数满后的排队数,默认为100
    accept-count: 1000
    threads:
      # tomcat最大线程数,默认为200
      max: 800
      # Tomcat启动初始化的线程数,默认值10
      min-spare: 100

# 日志配置
logging:
  level:
    com.ruoyi: debug
    org.springframework: warn

# Swagger配置
swagger:
  # 是否开启swagger
  enabled: true
  # 请求前缀
  pathMapping: /dev-api

# canal配置
canal:
  client:
    instances:
      example:
        host: 192.168.94.186
        port: 11111
\ No newline at end of file
... ...
# 重复的 url 键会被下方第二个 url 覆盖,注释掉避免歧义
# url=jdbc:mysql://rm-wz9740un21f09iokuao.mysql.rds.aliyuncs.com:3306/liu_yu_le?useUnicode=true&characterEncoding=utf8&autoReconnect=true
# url=jdbc:mysql://rm-wz9740un21f09iokuao.mysql.rds.aliyuncs.com:3306/liu_yu_le?useUnicode=true&characterEncoding=utf8&autoReconnect=true
# user = luhui
# pass = Luhui586
url=jdbc:mysql://192.168.2.69:3306/ly_sensor_data_2020?useUnicode=true&characterEncoding=utf8&autoReconnect=true
user = luhui
pass = Luhui586
... ...
... ... @@ -56,6 +56,24 @@ public class DeviceProductProtocol {
deviceDataConfigList.add(new DeviceDataConfig(41,"SYS_ALARM",null,PLCDataType.故障代码,"00","设备故障报警","01","01"));
deviceDataConfigList.add(new DeviceDataConfig(73,"YC_AUTO_P002",PLCType.排污,PLCDataType.控制器状态码,"41","远程自动按钮",null,"01"));
deviceDataConfigList.add(new DeviceDataConfig(1,"C001_RUN",PLCType.曝气,PLCDataType.控制器状态码,"42","运行信号"));
deviceDataConfigList.add(new DeviceDataConfig(2,"C001_ALARM",PLCType.曝气,PLCDataType.故障代码,"42","故障信号","11","01"));
deviceDataConfigList.add(new DeviceDataConfig(1,"P003_RUN",PLCType.微滤机,PLCDataType.控制器状态码,"43","微滤反洗水泵运行信号"));
deviceDataConfigList.add(new DeviceDataConfig(2,"P003_ALARM",PLCType.微滤机,PLCDataType.故障代码,"43","微滤反洗水泵故障信号","11","01"));
deviceDataConfigList.add(new DeviceDataConfig(1,"M001_RUN",PLCType.微滤机,PLCDataType.控制器状态码,"44","微滤驱动电机运行信号"));
deviceDataConfigList.add(new DeviceDataConfig(2,"M001_ALARM",PLCType.微滤机,PLCDataType.故障代码,"44","微滤驱动电机故障信号","11","01"));
deviceDataConfigList.add(new DeviceDataConfig(1,"DV01_RUN",PLCType.蛋分,PLCDataType.控制器状态码,"45","蛋分电磁阀运行信号"));
deviceDataConfigList.add(new DeviceDataConfig(1,"DF_YW",PLCType.蛋分,PLCDataType.液位高低,"45","蛋分低液位"));
deviceDataConfigList.add(new DeviceDataConfig(1,"ZWX_RUN",PLCType.杀菌,PLCDataType.控制器状态码,"46","紫外线运行信号"));
deviceDataConfigList.add(new DeviceDataConfig(1,"JCY_RUN",PLCType.鱼儿乐,PLCDataType.控制器状态码,"47","水质监测仪运行信号"));
deviceDataConfigList.add(new DeviceDataConfig(1,"ZM_RUN",null,PLCDataType.控制器状态码,"00","照明运行信号"));
deviceDataConfigList.add(new DeviceDataConfig(1,"XHSC_YW_L",PLCType.循环水池,PLCDataType.低液位,"48","循环水池低液位"));
deviceDataConfigList.add(new DeviceDataConfig(1,"XHSC_YW_H",PLCType.循环水池,PLCDataType.高液位,"49","循环水池高液位"));
deviceDataConfigList.add(new DeviceDataConfig(1,"SWLT_YW_1",PLCType.生物滤筒,PLCDataType.液位高低,"50","生物滤筒液位1"));
deviceDataConfigList.add(new DeviceDataConfig(1,"SWLT_YW_2",PLCType.生物滤筒,PLCDataType.液位高低,"51","生物滤筒液位2"));
deviceDataConfigList.add(new DeviceDataConfig(1,"ZWX_YW",PLCType.杀菌,PLCDataType.液位高低,"52","紫外线液位"));
deviceDataConfigList.add(new DeviceDataConfig(1,"WL_YW",PLCType.微滤机,PLCDataType.高液位,"53","微滤机池高液位"));
deviceDataWriteConfigList.add(new DeviceDataConfig(27,"YC_ST_C001A",PLCType.推水机,PLCDataType.控制器状态码,"01","远程启动按钮",null,"01"));
deviceDataWriteConfigList.add(new DeviceDataConfig(28,"YC_STP_C001A",PLCType.推水机,PLCDataType.控制器状态码,"01","远程停止按钮",null,"00"));
deviceDataWriteConfigList.add(new DeviceDataConfig(29,"YC_ST_C001B",PLCType.推水机,PLCDataType.控制器状态码,"02","远程启动按钮",null,"01"));
... ...
... ... @@ -15,7 +15,9 @@ public enum PLCDataType {
PH("8"),
液位高低("100"),
复位("101"),
控制柜状态("9");
低液位("102"),
高液位("103"),
控制柜状态("104");
public String sensorDataType;
PLCDataType(String sensorDataType)
... ...
... ... @@ -11,6 +11,11 @@ public enum PLCType {
推水机(7),
中转泵(9),
中转池(10),
微滤机(11),
蛋分(12),
循环水池(13),
杀菌(14),
生物滤筒(15),
备用(8);
private Integer device_terminal_type_key;
... ...
... ... @@ -24,7 +24,7 @@ mqtt:
clientId: ${random.uuid}
#公司id
roleid: 2
mqtt_usernames: PLC_004
mqtt_usernames: PLC_006,PLC_004
#订阅的topic
topics: ALL_POST,PUT_REQ
sub_clientid: '#'
... ...
... ... @@ -148,6 +148,7 @@
<artifactId>lh-jar-device-analysis</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
... ...
package com.zhonglai.luhui.mqtt;
import com.ruoyi.common.utils.GsonConstructor;
import com.zhonglai.luhui.device.analysis.comm.service.redis.RedisService;
import com.zhonglai.luhui.mqtt.comm.service.TerminalService;
import com.zhonglai.luhui.mqtt.service.ClienNoticeService;
import org.slf4j.Logger;
... ... @@ -10,6 +12,10 @@ import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
@ComponentScan(basePackages = {
"com.zhonglai.luhui.device.analysis",
... ... @@ -22,11 +28,24 @@ import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})
public class MqttApplication {

    // Logger is a cached immutable — keep it static final.
    private static final Logger log = LoggerFactory.getLogger(MqttApplication.class);

    // NOTE(review): removed the commented-out @PostConstruct Redis smoke test
    // and the @Autowired RedisService field it was the only user of; restore
    // both together if the manual test is ever needed again.

    /**
     * Boots the Spring context for the MQTT service.
     *
     * <p>DataSource auto-configuration is excluded via the class annotation,
     * so no JDBC DataSource bean is created at startup.
     *
     * @param args command-line arguments, passed through to Spring
     */
    public static void main(String[] args) {
        log.info("启动服务");
        SpringApplicationBuilder builder = new SpringApplicationBuilder(MqttApplication.class);
        builder.run(args);
    }
}
... ...
package com.zhonglai.luhui.mqtt;
import com.ruoyi.common.utils.GsonConstructor;
import redis.clients.jedis.Jedis;
import java.util.HashMap;
import java.util.Map;
/**
 * Manual smoke test: connects to Redis, pings it, reads the stored
 * {@code deviceName} for one device hash, then overwrites it.
 */
public class Test {
    public static void main(String[] args) {
        // SECURITY: host, port and password are hard-coded credentials —
        // move them to configuration/environment before wider use.
        // try-with-resources guarantees the connection is closed even when a
        // call throws; the original only closed on the success path.
        try (Jedis jedis = new Jedis("47.112.163.61", 9527)) {
            jedis.auth("Luhui586");
            jedis.select(1);

            // Check the connection is alive before touching data.
            String pingResponse = jedis.ping();
            System.out.println("Ping Response: " + pingResponse);

            Map<String, String> map = new HashMap<>();
            map.put("deviceName", "SC-M8 号");
            String key = "ly:x6:devices:866838067734877_1_1";

            // Dump the current value, then overwrite it with the new name.
            System.out.println(GsonConstructor.get().toJson(jedis.hmget(key, "deviceName")));
            jedis.hmset(key, map);
        }
    }
}
... ...
... ... @@ -41,15 +41,15 @@ spring:
mqtt:
#链接地址
broker: tcp://175.24.61.68:1883
broker: tcp://127.0.0.1:1883
#唯一标识
clientId: ${random.uuid}
#公司id
roleid: 2
mqtt_usernames: 6_WP,12_BPQ,10_TLJ,NWDB_2023,WLJ_1,YWB_A700E,12_ZNZY
mqtt_usernames: 6_WP
#订阅的topic
topics: ADD_POST,ALL_POST,DB_TOPIC_DISTRIBUTE,GET/+,online,PUT_REQ/+,READ_REQ/+
sub_clientid: '866520063012785'
sub_clientid: '+'
topicconfig: "/{{roleid}}/{{username}}/{{clientid}}/{{payloadtype}}/{{topicType}}/{{messageid}}"
top_return_map: '{"PUT":"PUT_REQ","READ":"READ_REQ"}'
username: sysuser
... ... @@ -58,6 +58,7 @@ mqtt:
#客户端操作时间
operationTime: 10
sys:
redis:
field: "lh:mqttservice:"
... ...
... ... @@ -523,6 +523,16 @@
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>com.zendesk</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.25.0</version>
</dependency>
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>6.12.0</version>
</dependency>
</dependencies>
... ...