diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/ModbusMessage.java b/src/main/java/com/isu/gaswellwatch/modbus/data/ModbusMessage.java
index 7bbc459..4293d67 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/ModbusMessage.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/ModbusMessage.java
@@ -14,7 +14,7 @@ import java.util.Objects;
* 网关标识/设备标识/命令标识/采集下发时间/采集数据接收时间/采集数据体
* 4B454E454E4731343030303030333538/1/10000/1732365690000/1732365691000/01042C07D000010003000C002B0029000300010000001A0018003303CD002300080DAC0DAC0000000000000959091DF066
*
- * @author 王仕龙
+ * @author 王仕龙
* 2024/11/24 10:11
*/
@Getter
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java
index 1a5414e..32501eb 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java
@@ -1,22 +1,11 @@
package com.isu.gaswellwatch.modbus.data;
-import cn.hutool.core.map.MapUtil;
-import org.apache.commons.io.IOUtils;
-import org.springframework.util.ResourceUtils;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
import java.util.Map;
-import java.util.Objects;
/**
* 数据持久化处理器
*
- * @author 王仕龙
+ * @author 王仕龙
* 2024/11/23 11:53
*/
public interface PersistenceHandler {
@@ -30,25 +19,6 @@ public interface PersistenceHandler {
void createTable(String tableName, Long deviceId);
- Map insert(String tableName, String cacheKey);
-
- default void setValue(PreparedStatement ps, Map row, int index, String key, int sqlType) throws SQLException {
- String value = MapUtil.getStr(row, key);
- if (Objects.isNull(value)) {
- ps.setNull(index, sqlType);
- } else {
- ps.setObject(index, value);
- }
- }
-
- static String getResource(String classPath) {
- try {
- File file = ResourceUtils.getFile("classpath:" + classPath);
- return IOUtils.toString(new FileInputStream(file), StandardCharsets.UTF_8);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
+ Map> insert(String tableName, String cacheKey);
}
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java
index b5424a9..c484a2f 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java
@@ -28,7 +28,7 @@ import java.util.Objects;
import java.util.stream.Collectors;
/**
- * @author 王仕龙
+ * @author 王仕龙
* 2024/11/23 11:55
*/
@Slf4j
@@ -94,9 +94,9 @@ public class Redis2DBPersistenceService {
|| StringUtils.isBlank(MapUtil.getStr(existsTableList.get(0), "TABLE_NAME"))) {
persistenceHandler.createTable(tableName, deviceId);
}
- Map rowData = persistenceHandler.insert(tableName, cacheKey);
- if (ObjectUtils.isNotEmpty(rowData)) {
- String rowDataJson = JSONUtil.toJsonStr(rowData);
+ Map> changeData = persistenceHandler.insert(tableName, cacheKey);
+ if (ObjectUtils.isNotEmpty(changeData)) {
+ String rowDataJson = JSONUtil.toJsonStr(changeData);
String queueName = String.format(Queues.DEVICE_EVENTS, deviceId);
try {
log.debug("推设备最新落库数据到MQ({}): {}", queueName, rowDataJson);
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java
index 5164bb1..07465a9 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java
@@ -9,7 +9,7 @@ import java.util.Map;
/**
* Modbus 数据解析器
*
- * @author 王仕龙
+ * @author 王仕龙
* 2024/11/23 11:20
*/
public interface DecodeHandler {
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/ComposeDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/ComposeDecodeHandler.java
index 2504825..8abaa8b 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/ComposeDecodeHandler.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/ComposeDecodeHandler.java
@@ -7,7 +7,7 @@ import java.util.Collection;
import java.util.Map;
/**
- * @author 王仕龙
+ * @author 王仕龙
* 2024/11/25 16:31
*/
public class ComposeDecodeHandler implements DecodeHandler {
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/FactorDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/FactorDecodeHandler.java
index e1d1f3b..b40a2c2 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/FactorDecodeHandler.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/FactorDecodeHandler.java
@@ -19,7 +19,7 @@ import java.util.Map;
* 例如:输入值:2580,系数(factor):100,精度(precision):2,输出值:2580 * 100 = 258000
*
*
- * @author 王仕龙
+ * @author 王仕龙
* 2024/11/24 21:23
*/
@Component(FactorDecodeHandler.NAME + DecodeHandler.DECODE_NAME)
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/HighLowBinDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/HighLowBinDecodeHandler.java
index 70e1d4e..1b9bb60 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/HighLowBinDecodeHandler.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/HighLowBinDecodeHandler.java
@@ -9,7 +9,7 @@ import java.util.Map;
/**
* 高低位4字节解析器
*
- * @author 王仕龙
+ * @author 王仕龙
* 2024/11/25 16:36
*/
@Component(HighLowBinDecodeHandler.NAME + DecodeHandler.DECODE_NAME)
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateDecodeHandler.java
index 52ea072..4477dcf 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateDecodeHandler.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateDecodeHandler.java
@@ -11,7 +11,7 @@ import java.util.Map;
/**
* 本地日期类型解析器
*
- * @author 王仕龙
+ * @author 王仕龙
* 2024/11/25 11:51
*/
@Component(LocalDateDecodeHandler.NAME + DecodeHandler.DECODE_NAME)
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateTimeDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateTimeDecodeHandler.java
index ba81c52..667e89f 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateTimeDecodeHandler.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateTimeDecodeHandler.java
@@ -11,7 +11,7 @@ import java.util.Map;
/**
* 本地日期时间类型解析器
*
- * @author 王仕龙
+ * @author 王仕龙
* 2024/11/24 21:34
*/
@Component(LocalDateTimeDecodeHandler.NAME + DecodeHandler.DECODE_NAME)
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalTimeDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalTimeDecodeHandler.java
index 9474305..4b82791 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalTimeDecodeHandler.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalTimeDecodeHandler.java
@@ -11,7 +11,7 @@ import java.util.Map;
/**
* 本地时间类型解析器
*
- * @author 王仕龙
+ * @author 王仕龙
* 2024/11/25 11:58
*/
@Component(LocalTimeDecodeHandler.NAME + DecodeHandler.DECODE_NAME)
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/SignedNumberDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/SignedNumberDecodeHandler.java
index acb0ddc..105f838 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/SignedNumberDecodeHandler.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/SignedNumberDecodeHandler.java
@@ -10,7 +10,7 @@ import java.util.Map;
/**
* 有符号整数转换
*
- * @author 王仕龙
+ * @author 王仕龙
* 2024/11/25 16:24
*/
@Component(SignedNumberDecodeHandler.NAME + DecodeHandler.DECODE_NAME)
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/SimpleLocalDateTimeDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/SimpleLocalDateTimeDecodeHandler.java
index b92cd7a..8db655f 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/SimpleLocalDateTimeDecodeHandler.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/SimpleLocalDateTimeDecodeHandler.java
@@ -11,7 +11,7 @@ import java.util.Map;
/**
* 本地日期时间类型解析器
*
- * @author 王仕龙
+ * @author 王仕龙
* 2024/11/24 21:34
*/
@Component(SimpleLocalDateTimeDecodeHandler.NAME + DecodeHandler.DECODE_NAME)
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/StringTimeDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/StringTimeDecodeHandler.java
index b9e013f..e30623c 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/StringTimeDecodeHandler.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/StringTimeDecodeHandler.java
@@ -13,7 +13,7 @@ import java.util.stream.Collectors;
* 时间类型解析器
* 支持解析:999:59:59, 实际使用中请不要转为LocalTime
*
- * @author 王仕龙
+ * @author 王仕龙
* 2024/11/24 21:38
*/
@Component(StringTimeDecodeHandler.NAME + DecodeHandler.DECODE_NAME)
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/AbstractPersistenceHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/AbstractPersistenceHandler.java
new file mode 100644
index 0000000..ae4faad
--- /dev/null
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/AbstractPersistenceHandler.java
@@ -0,0 +1,67 @@
+package com.isu.gaswellwatch.modbus.data.impl;
+
+import cn.hutool.core.map.MapUtil;
+import com.isu.gaswellwatch.config.SnowflakeConfig;
+import com.isu.gaswellwatch.modbus.data.PersistenceHandler;
+import jakarta.annotation.Resource;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.util.ResourceUtils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * @author 王仕龙
+ * 2024/11/26 15:42
+ */
+public abstract class AbstractPersistenceHandler implements PersistenceHandler {
+
+ @Resource
+ protected JdbcTemplate jdbcTemplate;
+ @Resource
+ protected SnowflakeConfig snowflakeConfig;
+ @Resource(name = "stringRedisTemplate")
+ protected RedisTemplate redisTemplate;
+
+ protected void createTable(String filePath, String tableName, Long deviceId) {
+ String createTableSQL = this.getResource(filePath);
+ createTableSQL = StringUtils.replace(createTableSQL, "$TableName$", tableName);
+ createTableSQL = StringUtils.replace(createTableSQL, "$DeviceId$", String.valueOf(deviceId));
+ this.jdbcTemplate.execute(createTableSQL);
+ }
+
+ protected Map getLastRow(String tableName) {
+ return this.jdbcTemplate.queryForList("SELECT * FROM " + tableName + " ORDER BY ID DESC LIMIT 1")
+ .stream()
+ .findFirst()
+ .orElse(null);
+ }
+
+ protected void setValue(PreparedStatement ps, Map row, int index, String key, int sqlType) throws SQLException {
+ String value = MapUtil.getStr(row, key);
+ if (Objects.isNull(value)) {
+ ps.setNull(index, sqlType);
+ } else {
+ ps.setObject(index, value);
+ }
+ }
+
+ protected String getResource(String classPath) {
+ try {
+ File file = ResourceUtils.getFile("classpath:" + classPath);
+ return IOUtils.toString(new FileInputStream(file), StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/EtcPersistenceHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/EtcPersistenceHandler.java
index 86ceb4d..0957618 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/EtcPersistenceHandler.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/EtcPersistenceHandler.java
@@ -13,13 +13,14 @@ import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Map;
+import java.util.Objects;
/**
- * @author 王仕龙
+ * @author 王仕龙
* 2024/11/24 17:33
*/
@Component(PersistenceHandler.ETC_MODBUS_TYPE)
-public class EtcPersistenceHandler implements PersistenceHandler {
+public class EtcPersistenceHandler extends AbstractPersistenceHandler {
@Resource
private JdbcTemplate jdbcTemplate;
@@ -30,78 +31,79 @@ public class EtcPersistenceHandler implements PersistenceHandler {
@Override
public void createTable(String tableName, Long deviceId) {
- String createTableSQL = PersistenceHandler.getResource("sql/CREATE_ETC.sql");
- createTableSQL = StringUtils.replace(createTableSQL, "$TableName$", tableName);
- createTableSQL = StringUtils.replace(createTableSQL, "$DeviceId$", String.valueOf(deviceId));
- this.jdbcTemplate.execute(createTableSQL);
+ this.createTable("sql/CREATE_ETC.sql", tableName, deviceId);
}
@Override
- public Map insert(String tableName, String cacheKey) {
- String insertTableSQL = PersistenceHandler.getResource("sql/INSERT_ETC.sql");
+ public Map> insert(String tableName, String cacheKey) {
+ String insertTableSQL = this.getResource("sql/INSERT_ETC.sql");
insertTableSQL = StringUtils.replace(insertTableSQL, "$TableName$", tableName);
- Map row = this.redisTemplate.opsForHash().entries(cacheKey);
+ Map oldRow = this.getLastRow(tableName);
+ Map newRow = this.redisTemplate.opsForHash().entries(cacheKey);
this.jdbcTemplate.execute(insertTableSQL, new PreparedStatementCallback