diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/ModbusDeviceTypeEnum.java b/src/main/java/com/isu/gaswellwatch/modbus/data/ModbusDeviceTypeEnum.java new file mode 100644 index 0000000..cb98cba --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/ModbusDeviceTypeEnum.java @@ -0,0 +1,40 @@ +package com.isu.gaswellwatch.modbus.data; + +import lombok.Getter; + +import java.util.HashMap; +import java.util.Map; + +@Getter +public enum ModbusDeviceTypeEnum { + + KNPCV1("4", PersistenceHandler.KNPCV1_MODBUS_TYPE), + ETC("5", PersistenceHandler.ETC_MODBUS_TYPE), + SCSS("6", PersistenceHandler.SCSS_MODBUS_TYPE); + + private final String id; + private final String code; + + ModbusDeviceTypeEnum(String id, String code) { + this.id = id; + this.code = code; + } + + @Override + public String toString() { + return getCode(); + } + + public static String getCodeById(String id) { + return enumMap.get(id); + } + + private static final Map enumMap = new HashMap(); + + static { + for (ModbusDeviceTypeEnum type : ModbusDeviceTypeEnum.values()) { + enumMap.put(type.id, type.code); + } + } + +} diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java index 1c5efd0..e6a8118 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java @@ -1,5 +1,18 @@ package com.isu.gaswellwatch.modbus.data; +import cn.hutool.core.map.MapUtil; +import org.apache.commons.io.IOUtils; +import org.springframework.util.ResourceUtils; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Map; +import java.util.Objects; + /** * 数据持久化处理器 * @@ -7,11 +20,35 @@ package com.isu.gaswellwatch.modbus.data; * 2024/11/23 11:53 */ public interface PersistenceHandler { - public static final String MODBUS_DEVICE_TYPE = "KNPCV1"; + String KNPCV1_MODBUS_TYPE = "knpc"; + String ETC_MODBUS_TYPE = "etc"; + String SCSS_MODBUS_TYPE = "scss"; + + + String DEVICE_INFO_CACHE = "info:device:"; String DEVICE_DATA_CACHE = "data:device:"; void createTable(String tableName, Long deviceId); void insert(String tableName, String cacheKey); + default void setValue(PreparedStatement ps, Map row, int index, String key, int sqlType) throws SQLException { + String value = MapUtil.getStr(row, key); + if (Objects.isNull(value)) { + ps.setNull(index, sqlType); + } else { + ps.setObject(index, value); + } + } + + static String getResource(String classPath) { + try { + File file = ResourceUtils.getFile("classpath:" + classPath); + return IOUtils.toString(new FileInputStream(file), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Redis2DBPersistenceService.java b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java similarity index 68% rename from src/main/java/com/isu/gaswellwatch/modbus/data/impl/Redis2DBPersistenceService.java rename to src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java index d999fc0..0c80e1b 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Redis2DBPersistenceService.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java @@ -1,8 +1,7 @@ -package com.isu.gaswellwatch.modbus.data.impl; +package com.isu.gaswellwatch.modbus.data; import cn.hutool.core.map.MapUtil; import com.isu.gaswellwatch.entity.Response; -import com.isu.gaswellwatch.modbus.data.PersistenceHandler; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ObjectUtils; @@ -34,8 +33,10 @@ import java.util.stream.Collectors; @SuppressWarnings("all") public class Redis2DBPersistenceService { - private static final String EXISTS_TABLE_SQL = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA" + - " IN ('gaswellwatch', 'gas_well_watch') AND TABLE_NAME='$TableName$'"; + public static final String DEFAULT_DATA_TABLE = "t_device_data_"; + private static final String DEVICE_INFO_SQL = "SELECT * from device where id = "; + private static final String EXISTS_TABLE_SQL = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA" + + " IN ('gaswellwatch', 'gas_well_watch') AND TABLE_NAME='$TableName$'"; @Resource private JdbcTemplate jdbcTemplate; @Resource(name = "stringRedisTemplate") @@ -65,11 +66,20 @@ public class Redis2DBPersistenceService { continue; } if (Objects.nonNull(idGatewayMappingMap)) { - operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, - "online", String.valueOf(idGatewayMappingMap.containsKey(deviceId))); + operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, "online", + String.valueOf(idGatewayMappingMap.containsKey(deviceId))); } - persistenceHandler = persistenceHandlerMap.get("KNPCV1"); - tableName = "t_device_data_" + deviceId; + String modbusDeviceTypeId = (String) operations.get(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, "product"); + if (StringUtils.isEmpty(modbusDeviceTypeId)) { + Map deviceInfo = this.jdbcTemplate.queryForMap(DEVICE_INFO_SQL + deviceId); + Map cacheDeviceInfo = new HashMap<>(deviceInfo.size()); + deviceInfo.forEach((key, value) -> cacheDeviceInfo.put(key, Objects.isNull(value) ? null : String.valueOf(value))); + modbusDeviceTypeId = (String) cacheDeviceInfo.get("product"); + operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo); + } + + persistenceHandler = persistenceHandlerMap.get(ModbusDeviceTypeEnum.getCodeById(modbusDeviceTypeId)); + tableName = DEFAULT_DATA_TABLE + deviceId; existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName)); if (ObjectUtils.isEmpty(existsTableList) || ObjectUtils.isEmpty(existsTableList.get(0)) @@ -84,18 +94,14 @@ public class Redis2DBPersistenceService { private Map getOnlineGateway() { try { RequestEntity request = RequestEntity.get("http://127.0.0.1:9999/modbus-tcp/online").build(); - ResponseEntity>> response = this.restTemplate.exchange(request, - new ParameterizedTypeReference>>() { - }); - if (Objects.isNull(response) - || Objects.isNull(response.getBody()) - || ObjectUtils.isEmpty(response.getBody().getData())) { + ResponseEntity>> response = this.restTemplate.exchange(request, new ParameterizedTypeReference>>() { + }); + if (Objects.isNull(response) || Objects.isNull(response.getBody()) || ObjectUtils.isEmpty(response.getBody().getData())) { return null; } - List> idGatewayMappingList = this.jdbcTemplate - .queryForList("select id, gateway_sn from devices where gateway_sn in (" + - response.getBody().getData().stream().map(value -> "'" + value + "'") - .collect(Collectors.joining(",")) + ")"); + List> idGatewayMappingList = this.jdbcTemplate.queryForList("select id, gateway_sn" + + " from device where gateway_sn in (" + + response.getBody().getData().stream().map(value -> "'" + value + "'").collect(Collectors.joining(",")) + ")"); if (ObjectUtils.isEmpty(idGatewayMappingList)) { return null; } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java index 070d73c..5164bb1 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java @@ -13,6 +13,7 @@ import java.util.Map; * 2024/11/23 11:20 */ public interface DecodeHandler { + String DECODE_NAME = "DecodeHandler"; Logger logger = LoggerFactory.getLogger("com.isu.gaswellwatch.decode"); diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/ComposeDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/ComposeDecodeHandler.java new file mode 100644 index 0000000..2504825 --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/ComposeDecodeHandler.java @@ -0,0 +1,35 @@ +package com.isu.gaswellwatch.modbus.data.decode.impl; + +import com.isu.gaswellwatch.modbus.data.ModbusMessage; +import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler; + +import java.util.Collection; +import java.util.Map; + +/** + * @author 王仕龙 + * 2024/11/25 16:31 + */ +public class ComposeDecodeHandler implements DecodeHandler { + + public final Collection collection; + + public ComposeDecodeHandler(Collection collection) { + this.collection = collection; + } + + @Override + public String decode(Map commandPointMap, String value) { + for (DecodeHandler handler : this.collection) { + value = handler.decode(commandPointMap, value); + } + return value; + } + + @Override + public void decode(Map commandPointMap, ModbusMessage.MessagePoint point) { + for (DecodeHandler handler : this.collection) { + handler.decode(commandPointMap, point); + } + } +} diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/FactorDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/FactorDecodeHandler.java index 7400676..e1d1f3b 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/FactorDecodeHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/FactorDecodeHandler.java @@ -3,7 +3,6 @@ package com.isu.gaswellwatch.modbus.data.decode.impl; import cn.hutool.core.map.MapUtil; import com.isu.gaswellwatch.modbus.data.ModbusMessage; import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler; -import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; @@ -23,10 +22,10 @@ import java.util.Map; * @author 王仕龙 * 2024/11/24 21:23 */ -@Slf4j -@SuppressWarnings("all") -@Component("factorDecodeHandler") +@Component(FactorDecodeHandler.NAME + DecodeHandler.DECODE_NAME) public class FactorDecodeHandler implements DecodeHandler { + public static final String NAME = "factor"; + @Override public String decode(Map commandPointMap, String value) { @@ -35,11 +34,11 @@ public class FactorDecodeHandler implements DecodeHandler { @Override public void decode(Map commandPointMap, ModbusMessage.MessagePoint point) { - point.setValue(decode(commandPointMap, point.getParseValue())); + point.setValue(this.decode(commandPointMap, point.getParseValue())); String factorStr = MapUtil.getStr(commandPointMap, "factor"); if (StringUtils.isNotBlank(factorStr) && StringUtils.isNotBlank(point.getParseValue())) { BigDecimal factor = new BigDecimal(factorStr); - BigDecimal value = new BigDecimal(point.getParseValue()); + BigDecimal value = new BigDecimal(point.getValue()); int precision = MapUtil.getInt(commandPointMap, "precision", 0); switch (factor.compareTo(BigDecimal.ZERO)) { case -1 -> point.setValue(value.multiply(factor.abs()).toString()); diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/HighLowBinDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/HighLowBinDecodeHandler.java new file mode 100644 index 0000000..70e1d4e --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/HighLowBinDecodeHandler.java @@ -0,0 +1,30 @@ +package com.isu.gaswellwatch.modbus.data.decode.impl; + +import com.isu.gaswellwatch.modbus.data.ModbusMessage; +import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * 高低位4字节解析器 + * + * @author 王仕龙 + * 2024/11/25 16:36 + */ +@Component(HighLowBinDecodeHandler.NAME + DecodeHandler.DECODE_NAME) +public class HighLowBinDecodeHandler implements DecodeHandler { + + public static final String NAME = "highLowBin"; + + @Override + public String decode(Map commandPointMap, String value) { + return String.valueOf(Integer.parseInt(value, 16)); + } + + @Override + public void decode(Map commandPointMap, ModbusMessage.MessagePoint point) { + throw new RuntimeException("not supported"); + } + +} diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateDecodeHandler.java index 4b78c13..52ea072 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateDecodeHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateDecodeHandler.java @@ -2,7 +2,6 @@ package com.isu.gaswellwatch.modbus.data.decode.impl; import com.isu.gaswellwatch.modbus.data.ModbusMessage; import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler; -import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.time.LocalDate; @@ -15,10 +14,10 @@ import java.util.Map; * @author 王仕龙 * 2024/11/25 11:51 */ -@Slf4j -@SuppressWarnings("all") -@Component("localDateDecodeHandler") +@Component(LocalDateDecodeHandler.NAME + DecodeHandler.DECODE_NAME) public class LocalDateDecodeHandler implements DecodeHandler { + public static final String NAME = "localDate"; + public static final DateTimeFormatter IN_FORMATTER = DateTimeFormatter.ofPattern("yyyy-M-d"); public static final DateTimeFormatter OUT_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); @@ -30,7 +29,7 @@ public class LocalDateDecodeHandler implements DecodeHandler { @Override public void decode(Map commandPointMap, ModbusMessage.MessagePoint point) { - point.setValue(decode(commandPointMap, point.getParseValue())); + point.setValue(this.decode(commandPointMap, point.getValue())); } } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateTimeDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateTimeDecodeHandler.java index 8c86993..ba81c52 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateTimeDecodeHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateTimeDecodeHandler.java @@ -2,7 +2,6 @@ package com.isu.gaswellwatch.modbus.data.decode.impl; import com.isu.gaswellwatch.modbus.data.ModbusMessage; import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler; -import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.time.LocalDateTime; @@ -15,10 +14,10 @@ import java.util.Map; * @author 王仕龙 * 2024/11/24 21:34 */ -@Slf4j -@SuppressWarnings("all") -@Component("localDateTimeDecodeHandler") +@Component(LocalDateTimeDecodeHandler.NAME + DecodeHandler.DECODE_NAME) public class LocalDateTimeDecodeHandler implements DecodeHandler { + public static final String NAME = "fullYearLocalDateTime"; + public static final DateTimeFormatter IN_FORMATTER = DateTimeFormatter.ofPattern("yyyy-M-d H:m:s"); public static final DateTimeFormatter OUT_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); @@ -30,7 +29,7 @@ public class LocalDateTimeDecodeHandler implements DecodeHandler { @Override public void decode(Map commandPointMap, ModbusMessage.MessagePoint point) { - point.setValue(decode(commandPointMap, point.getParseValue())); + point.setValue(this.decode(commandPointMap, point.getValue())); } } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalTimeDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalTimeDecodeHandler.java index 37ed01a..9474305 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalTimeDecodeHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalTimeDecodeHandler.java @@ -2,7 +2,6 @@ package com.isu.gaswellwatch.modbus.data.decode.impl; import com.isu.gaswellwatch.modbus.data.ModbusMessage; import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler; -import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.time.LocalTime; @@ -15,10 +14,9 @@ import java.util.Map; * @author 王仕龙 * 2024/11/25 11:58 */ -@Slf4j -@SuppressWarnings("all") -@Component("localTimeDecodeHandler") +@Component(LocalTimeDecodeHandler.NAME + DecodeHandler.DECODE_NAME) public class LocalTimeDecodeHandler implements DecodeHandler { + public static final String NAME = "localTime"; public static final DateTimeFormatter IN_FORMATTER = DateTimeFormatter.ofPattern("H:m:s"); public static final DateTimeFormatter OUT_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss"); @@ -30,7 +28,7 @@ public class LocalTimeDecodeHandler implements DecodeHandler { @Override public void decode(Map commandPointMap, ModbusMessage.MessagePoint point) { - point.setValue(decode(commandPointMap, point.getParseValue())); + point.setValue(this.decode(commandPointMap, point.getValue())); } } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/SignedNumberDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/SignedNumberDecodeHandler.java new file mode 100644 index 0000000..acb0ddc --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/SignedNumberDecodeHandler.java @@ -0,0 +1,34 @@ +package com.isu.gaswellwatch.modbus.data.decode.impl; + +import com.isu.gaswellwatch.modbus.data.ModbusMessage; +import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler; +import com.isu.gaswellwatch.utils.ReverseComplementCodeUtils; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * 有符号整数转换 + * + * @author 王仕龙 + * 2024/11/25 16:24 + */ +@Component(SignedNumberDecodeHandler.NAME + DecodeHandler.DECODE_NAME) +public class SignedNumberDecodeHandler implements DecodeHandler { + public static final String NAME = "signedNumber"; + + @Override + public String decode(Map commandPointMap, String value) { + int decimalNumber = Integer.parseInt(value, 16); + String binaryNumber = Integer.toBinaryString(decimalNumber); + binaryNumber = ReverseComplementCodeUtils.binOriginalToReverse(binaryNumber); + binaryNumber = ReverseComplementCodeUtils.binReverseToComplement(binaryNumber, "1"); + return ReverseComplementCodeUtils.binComplementToDec(binaryNumber); + } + + @Override + public void decode(Map commandPointMap, ModbusMessage.MessagePoint point) { + point.setValue(this.decode(commandPointMap, point.getParseValue())); + } + +} diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/SimpleLocalDateTimeDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/SimpleLocalDateTimeDecodeHandler.java new file mode 100644 index 0000000..b92cd7a --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/SimpleLocalDateTimeDecodeHandler.java @@ -0,0 +1,35 @@ +package com.isu.gaswellwatch.modbus.data.decode.impl; + +import com.isu.gaswellwatch.modbus.data.ModbusMessage; +import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Map; + +/** + * 本地日期时间类型解析器 + * + * @author 王仕龙 + * 2024/11/24 21:34 + */ +@Component(SimpleLocalDateTimeDecodeHandler.NAME + DecodeHandler.DECODE_NAME) +public class SimpleLocalDateTimeDecodeHandler implements DecodeHandler { + public static final String NAME = "simpleLocalDateTime"; + + public static final DateTimeFormatter IN_FORMATTER = DateTimeFormatter.ofPattern("yy-M-d H:m:s"); + public static final DateTimeFormatter OUT_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + + @Override + public String decode(Map commandPointMap, String value) { + return LocalDateTime.parse(value, IN_FORMATTER).format(OUT_FORMATTER); + } + + @Override + public void decode(Map commandPointMap, ModbusMessage.MessagePoint point) { + point.setValue(this.decode(commandPointMap, point.getValue())); + } + +} diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/StringTimeDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/StringTimeDecodeHandler.java index 3458295..b9e013f 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/StringTimeDecodeHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/StringTimeDecodeHandler.java @@ -2,7 +2,6 @@ package com.isu.gaswellwatch.modbus.data.decode.impl; import com.isu.gaswellwatch.modbus.data.ModbusMessage; import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler; -import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; @@ -17,10 +16,9 @@ import java.util.stream.Collectors; * @author 王仕龙 * 2024/11/24 21:38 */ -@Slf4j -@SuppressWarnings("all") -@Component("stringTimeDecodeHandler") +@Component(StringTimeDecodeHandler.NAME + DecodeHandler.DECODE_NAME) public class StringTimeDecodeHandler implements DecodeHandler { + public static final String NAME = "stringTime"; @Override public String decode(Map commandPointMap, String value) { @@ -34,7 +32,7 @@ public class StringTimeDecodeHandler implements DecodeHandler { @Override public void decode(Map commandPointMap, ModbusMessage.MessagePoint point) { - point.setValue(decode(commandPointMap, point.getParseValue())); + point.setValue(this.decode(commandPointMap, point.getValue())); } } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/EtcPersistenceHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/EtcPersistenceHandler.java new file mode 100644 index 0000000..b00e3df --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/EtcPersistenceHandler.java @@ -0,0 +1,106 @@ +package com.isu.gaswellwatch.modbus.data.impl; + +import com.isu.gaswellwatch.config.SnowflakeConfig; +import com.isu.gaswellwatch.modbus.data.PersistenceHandler; +import jakarta.annotation.Resource; +import org.apache.commons.lang3.StringUtils; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.PreparedStatementCallback; +import org.springframework.stereotype.Component; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; +import java.util.Map; + +/** + * @author 王仕龙 + * 2024/11/24 17:33 + */ +@Component(PersistenceHandler.ETC_MODBUS_TYPE) +public class EtcPersistenceHandler implements PersistenceHandler { + + @Resource + private JdbcTemplate jdbcTemplate; + @Resource + private SnowflakeConfig snowflakeConfig; + @Resource(name = "stringRedisTemplate") + private RedisTemplate redisTemplate; + + @Override + public void createTable(String tableName, Long deviceId) { + String createTableSQL = PersistenceHandler.getResource("sql/CREATE_ETC.sql"); + createTableSQL = StringUtils.replace(createTableSQL, "$TableName$", tableName); + createTableSQL = StringUtils.replace(createTableSQL, "$DeviceId$", String.valueOf(deviceId)); + this.jdbcTemplate.execute(createTableSQL); + } + + @Override + public void insert(String tableName, String cacheKey) { + String insertTableSQL = PersistenceHandler.getResource("sql/INSERT_ETC.sql"); + insertTableSQL = StringUtils.replace(insertTableSQL, "$TableName$", tableName); + + Map row = this.redisTemplate.opsForHash().entries(cacheKey); + + this.jdbcTemplate.execute(insertTableSQL, new PreparedStatementCallback() { + @Override + public Object doInPreparedStatement(PreparedStatement ps) throws SQLException { + ps.setLong(1, EtcPersistenceHandler.this.snowflakeConfig.snowflakeId()); + EtcPersistenceHandler.this.setValue(ps, row, 2, "deviceId", Types.BIGINT); + EtcPersistenceHandler.this.setValue(ps, row, 3, "collectionTime", Types.TIMESTAMP); + EtcPersistenceHandler.this.setValue(ps, row, 4, "receiveTime", Types.TIMESTAMP); + EtcPersistenceHandler.this.setValue(ps, row, 5, "deviceTime", Types.TIMESTAMP); + EtcPersistenceHandler.this.setValue(ps, row, 6, "runMode", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 7, "wellStatus", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 8, "plugStatus", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 9, "statusOpenTime", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, row, 10, "statusCloseTime", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, row, 11, "oilPressure", Types.DECIMAL); + EtcPersistenceHandler.this.setValue(ps, row, 12, "casPressure", Types.DECIMAL); + EtcPersistenceHandler.this.setValue(ps, row, 13, "prePressure", Types.DECIMAL); + EtcPersistenceHandler.this.setValue(ps, row, 14, "pipePressure", Types.DECIMAL); + EtcPersistenceHandler.this.setValue(ps, row, 15, "liquidLevel", Types.DECIMAL); + EtcPersistenceHandler.this.setValue(ps, row, 16, "temperature", Types.DECIMAL); + EtcPersistenceHandler.this.setValue(ps, row, 17, "humidity", Types.DECIMAL); + EtcPersistenceHandler.this.setValue(ps, row, 18, "opmode", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 19, "timer1", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 20, "timer2", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 21, "timer1Open", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, row, 22, "timer1Close", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, row, 23, "timer2Open", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, row, 24, "timer2Close", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, row, 25, "timingOpen", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, row, 26, "timingClose", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, row, 27, "timingCelay", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, row, 28, "presource", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 29, "pressureOpen", Types.DECIMAL); + EtcPersistenceHandler.this.setValue(ps, row, 30, "pressureClose", Types.DECIMAL); + EtcPersistenceHandler.this.setValue(ps, row, 31, "triggerType", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 32, "stabilityTime", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 33, "maxOpenWell", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, row, 34, "maxCloseWell", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, row, 35, "minOpenWell", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, row, 36, "minCloseWell", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, row, 37, "presproTect", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 38, "presproSource", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 39, "openWellLimitMax", Types.DECIMAL); + EtcPersistenceHandler.this.setValue(ps, row, 40, "openWellLimitMin", Types.DECIMAL); + EtcPersistenceHandler.this.setValue(ps, row, 41, "plugInitStatus", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 42, "plugSustainTime", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, row, 43, "plugCloseTime", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, row, 44, "tpInitStatus", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 45, "tpOpenSource", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 46, "tpOpenTrigger", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 47, "tpOpenPressure", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 48, "tpOpenTime", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, row, 49, "tpCloseSource", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 50, "tpCloseTrigger", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 51, "tpClosePressure", Types.INTEGER); + EtcPersistenceHandler.this.setValue(ps, row, 52, "tpCloseTime", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, row, 53, "tpStabilityTime", Types.INTEGER); + return ps.executeUpdate(); + } + }); + } +} diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Knpcv1PersistenceHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Knpcv1PersistenceHandler.java index 81e29a2..85da219 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Knpcv1PersistenceHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Knpcv1PersistenceHandler.java @@ -1,32 +1,24 @@ package com.isu.gaswellwatch.modbus.data.impl; -import cn.hutool.core.map.MapUtil; import com.isu.gaswellwatch.config.SnowflakeConfig; import com.isu.gaswellwatch.modbus.data.PersistenceHandler; import jakarta.annotation.Resource; -import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.PreparedStatementCallback; import org.springframework.stereotype.Component; -import org.springframework.util.ResourceUtils; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Types; import java.util.Map; -import java.util.Objects; /** * @author 王仕龙 * 2024/11/24 17:33 */ -@Component(PersistenceHandler.MODBUS_DEVICE_TYPE) +@Component(PersistenceHandler.KNPCV1_MODBUS_TYPE) public class Knpcv1PersistenceHandler implements PersistenceHandler { @Resource @@ -38,7 +30,7 @@ public class Knpcv1PersistenceHandler implements PersistenceHandler { @Override public void createTable(String tableName, Long deviceId) { - String createTableSQL = getResource("sql/CREATE_KNPCV1.sql"); + String createTableSQL = PersistenceHandler.getResource("sql/CREATE_KNPCV1.sql"); createTableSQL = StringUtils.replace(createTableSQL, "$TableName$", tableName); createTableSQL = StringUtils.replace(createTableSQL, "$DeviceId$", String.valueOf(deviceId)); this.jdbcTemplate.execute(createTableSQL); @@ -46,7 +38,7 @@ public class Knpcv1PersistenceHandler implements PersistenceHandler { @Override public void insert(String tableName, String cacheKey) { - String insertTableSQL = getResource("sql/INSERT_KNPCV1.sql"); + String insertTableSQL = PersistenceHandler.getResource("sql/INSERT_KNPCV1.sql"); insertTableSQL = StringUtils.replace(insertTableSQL, "$TableName$", tableName); Map row = this.redisTemplate.opsForHash().entries(cacheKey); @@ -62,8 +54,8 @@ public class Knpcv1PersistenceHandler implements PersistenceHandler { Knpcv1PersistenceHandler.this.setValue(ps, row, 6, "runMode", Types.INTEGER); Knpcv1PersistenceHandler.this.setValue(ps, row, 7, "wellStatus", Types.INTEGER); Knpcv1PersistenceHandler.this.setValue(ps, row, 8, "plugStatus", Types.INTEGER); - Knpcv1PersistenceHandler.this.setValue(ps, row, 9, "statusOpenTime", Types.TIME); - Knpcv1PersistenceHandler.this.setValue(ps, row, 10, "statusCloseTime", Types.TIME); + Knpcv1PersistenceHandler.this.setValue(ps, row, 9, "statusOpenTime", Types.VARCHAR); + Knpcv1PersistenceHandler.this.setValue(ps, row, 10, "statusCloseTime", Types.VARCHAR); Knpcv1PersistenceHandler.this.setValue(ps, row, 11, "oilPressure", Types.DECIMAL); Knpcv1PersistenceHandler.this.setValue(ps, row, 12, "casPressure", Types.DECIMAL); Knpcv1PersistenceHandler.this.setValue(ps, row, 13, "prePressure", Types.DECIMAL); @@ -74,10 +66,10 @@ public class Knpcv1PersistenceHandler implements PersistenceHandler { Knpcv1PersistenceHandler.this.setValue(ps, row, 18, "opmode", Types.INTEGER); Knpcv1PersistenceHandler.this.setValue(ps, row, 19, "timer1", Types.INTEGER); Knpcv1PersistenceHandler.this.setValue(ps, row, 20, "timer2", Types.INTEGER); - Knpcv1PersistenceHandler.this.setValue(ps, row, 21, "timer1Open", Types.TIME); - Knpcv1PersistenceHandler.this.setValue(ps, row, 22, "timer1Close", Types.TIME); - Knpcv1PersistenceHandler.this.setValue(ps, row, 23, "timer2Open", Types.TIME); - Knpcv1PersistenceHandler.this.setValue(ps, row, 24, "timer2Close", Types.TIME); + Knpcv1PersistenceHandler.this.setValue(ps, row, 21, "timer1Open", Types.VARCHAR); + Knpcv1PersistenceHandler.this.setValue(ps, row, 22, "timer1Close", Types.VARCHAR); + Knpcv1PersistenceHandler.this.setValue(ps, row, 23, "timer2Open", Types.VARCHAR); + Knpcv1PersistenceHandler.this.setValue(ps, row, 24, "timer2Close", Types.VARCHAR); Knpcv1PersistenceHandler.this.setValue(ps, row, 25, "timingOpen", Types.VARCHAR); Knpcv1PersistenceHandler.this.setValue(ps, row, 26, "timingClose", Types.VARCHAR); Knpcv1PersistenceHandler.this.setValue(ps, row, 27, "timingCelay", Types.VARCHAR); @@ -110,25 +102,5 @@ public class Knpcv1PersistenceHandler implements PersistenceHandler { return ps.executeUpdate(); } }); - } - - public void setValue(PreparedStatement ps, Map row, int index, String key, int sqlType) throws SQLException { - String value = MapUtil.getStr(row, key); - if (Objects.isNull(value)) { - ps.setNull(index, sqlType); - } else { - ps.setObject(index, value); - } - } - - public static String getResource(String classPath) { - try { - File file = ResourceUtils.getFile("classpath:" + classPath); - return IOUtils.toString(new FileInputStream(file), StandardCharsets.UTF_8); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/ScssPersistenceHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/ScssPersistenceHandler.java new file mode 100644 index 0000000..8654968 --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/ScssPersistenceHandler.java @@ -0,0 +1,138 @@ +package com.isu.gaswellwatch.modbus.data.impl; + +import com.isu.gaswellwatch.config.SnowflakeConfig; +import com.isu.gaswellwatch.modbus.data.PersistenceHandler; +import jakarta.annotation.Resource; +import org.apache.commons.lang3.StringUtils; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.PreparedStatementCallback; +import org.springframework.stereotype.Component; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; +import java.util.Map; + +/** + * @author 王仕龙 + * 2024/11/24 17:33 + */ +@Component(PersistenceHandler.SCSS_MODBUS_TYPE) +public class ScssPersistenceHandler implements PersistenceHandler { + + @Resource + private JdbcTemplate jdbcTemplate; + @Resource + private SnowflakeConfig snowflakeConfig; + @Resource(name = "stringRedisTemplate") + private RedisTemplate redisTemplate; + + @Override + public void createTable(String tableName, Long deviceId) { + String createTableSQL = PersistenceHandler.getResource("sql/CREATE_SCSS.sql"); + createTableSQL = StringUtils.replace(createTableSQL, "$TableName$", tableName); + createTableSQL = StringUtils.replace(createTableSQL, "$DeviceId$", String.valueOf(deviceId)); + this.jdbcTemplate.execute(createTableSQL); + } + + @Override + public void insert(String tableName, String cacheKey) { + String insertTableSQL = PersistenceHandler.getResource("sql/INSERT_SCSS.sql"); + insertTableSQL = StringUtils.replace(insertTableSQL, "$TableName$", tableName); + + Map row = this.redisTemplate.opsForHash().entries(cacheKey); + + this.jdbcTemplate.execute(insertTableSQL, new PreparedStatementCallback() { + @Override + public Object doInPreparedStatement(PreparedStatement ps) throws SQLException { + ps.setLong(1, ScssPersistenceHandler.this.snowflakeConfig.snowflakeId()); + ScssPersistenceHandler.this.setValue(ps, row, 2, "deviceId", Types.BIGINT); + ScssPersistenceHandler.this.setValue(ps, row, 3, "collectionTime", Types.TIMESTAMP); + ScssPersistenceHandler.this.setValue(ps, row, 4, "receiveTime", Types.TIMESTAMP); + ScssPersistenceHandler.this.setValue(ps, row, 5, "casPressure", Types.DECIMAL); + ScssPersistenceHandler.this.setValue(ps, row, 6, "oilPressure", Types.DECIMAL); + ScssPersistenceHandler.this.setValue(ps, row, 7, "firstSolenoidStatus", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 8, "batteryVoltage", Types.DECIMAL); + ScssPersistenceHandler.this.setValue(ps, row, 9, "solarVoltage", Types.DECIMAL); + ScssPersistenceHandler.this.setValue(ps, row, 10, "remainingTimeAction", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 11, "secondSolenoidStatus", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 12, "preTransmission", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 13, "internetTraffic", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 14, "loadFactor", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 15, "dataTime", Types.TIMESTAMP); + ScssPersistenceHandler.this.setValue(ps, row, 16, "showDelay", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 17, "openWellSamplingInterval", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 18, "closeWellSamplingInterval", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 19, "ctlModel", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 20, "minPressure", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 21, "maxPressure", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 22, "pressureMinVoltage", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 23, "pressureMaxVoltage", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 24, "oilMin", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 25, "oliMax", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 26, "oilMinVoltage", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 27, "oilMaxVoltage", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 28, "inputPressureMinValue", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 29, "inputPressureMaxValue", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 30, "inputVoltageMinValue", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 31, "inputVoltageMaxValue", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 32, "flowRateMinValue", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 33, "flowRateMaxValue", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 34, "flowVoltageMinValue", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 35, "flowVoltageMaxValue", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 36, "continuousSamplingIntervalDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 37, "sensorSignalEffectiveLevel", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 38, "pressureCompensationPolarityFlag", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 39, "pressureCompensationValueSetting", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 40, "oilPressureCompensationPolarityFlag", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 41, "oilPressureCompensationValueSetting", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 42, "inputPressureCompensationPolarityFlag", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 43, "inputPressureCompensationValueSetting", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 44, "flowCompensationPolarityFlag", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 45, "flowCompensationValueSetting", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 46, "wellOpenTimeTimestamp", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 47, "wellOpenPressureValue", Types.DECIMAL); + ScssPersistenceHandler.this.setValue(ps, row, 48, "wellOpenOilPressureValue", Types.DECIMAL); + ScssPersistenceHandler.this.setValue(ps, row, 49, "wellOpenLoadFactorPresets", Types.DECIMAL); + ScssPersistenceHandler.this.setValue(ps, row, 50, "wellCloseTimeTimestamp", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 51, "wellClosePressureValue", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 52, "wellCloseOilPressureValue", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 53, "wellCloseFlowValue", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 54, "minWellOpenTimeDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 55, "maxWellOpenTimeDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 56, "minWellCloseTimeDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 57, "maxWellCloseTimeDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 58, "pressureStabilizationDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 59, "flowStabilizationDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 60, "loadFactorStabilizationDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 61, "plungerDelayDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 62, "plungerRiseDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 63, "continuosFlowDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 64, "wellCloseTimeDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 65, "wellCloseTimeNotReachedDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 66, "wellCloseNotReachedCountValue", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 67, "plungerDelayDurationRepeat", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 68, "targetTimeTimestamp", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 69, "targetTimeRangeValue", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 70, "continuosFlowIncreaseDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 71, "continuosFlowDecreaseDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 72, "wellCloseIncreaseDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 73, "wellCloseDecreaseDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 74, "minWellCloseTimeDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 75, "maxWellCloseTimeDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 76, "minContinuosFlowTimeDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 77, "maxContinuosFlowTimeDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 78, "minWellOpenTimeDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 79, "maxWellOpenTimeDuration", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 80, "wellOpenPressureValueAtOpen", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 81, "wellOpenOilPressureValueAtOpen", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 82, "wellOpenLoadFactorPresetsAtOpen", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 83, "wellClosePressureValueAtClose", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 84, "wellCloseOilPressureValueAtClose", Types.INTEGER); + ScssPersistenceHandler.this.setValue(ps, row, 85, "wellCloseFlowValueAtClose", Types.INTEGER); + return ps.executeUpdate(); + } + }); + } +} diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ComposeModbusMessageListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ComposeModbusMessageListener.java similarity index 94% rename from src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ComposeModbusMessageListener.java rename to src/main/java/com/isu/gaswellwatch/modbus/data/listener/ComposeModbusMessageListener.java index 834ca26..f2f384b 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ComposeModbusMessageListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ComposeModbusMessageListener.java @@ -1,4 +1,4 @@ -package com.isu.gaswellwatch.modbus.data.decode.listener; +package com.isu.gaswellwatch.modbus.data.listener; import com.isu.gaswellwatch.utils.SpringUtil; import org.springframework.amqp.core.BatchMessageListener; diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/DynamicRabbitListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java similarity index 75% rename from src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/DynamicRabbitListener.java rename to src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java index 99891a9..39ab1f6 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/DynamicRabbitListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java @@ -1,10 +1,11 @@ -package com.isu.gaswellwatch.modbus.data.decode.listener; +package com.isu.gaswellwatch.modbus.data.listener; /** * @author 王仕龙 * 2024/11/23 0:32 */ +import cn.hutool.core.map.MapUtil; import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.ConnectionFactory; @@ -12,17 +13,22 @@ import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; @Component public class DynamicRabbitListener implements ApplicationRunner { + private static final String GATEWAY_SQL = "SELECT d.id, d.gateway_sn AS identifier FROM device d ORDER BY d.id "; + private final AmqpAdmin amqpAdmin; + private final JdbcTemplate jdbcTemplate; private final ComposeModbusMessageListener composeListener; private final SimpleMessageListenerContainer listenerContainer; - public DynamicRabbitListener(ConnectionFactory connectionFactory) { + public DynamicRabbitListener(ConnectionFactory connectionFactory, JdbcTemplate jdbcTemplate) { + this.jdbcTemplate = jdbcTemplate; this.amqpAdmin = new RabbitAdmin(connectionFactory); this.listenerContainer = new SimpleMessageListenerContainer(); this.listenerContainer.setConnectionFactory(connectionFactory); @@ -35,17 +41,14 @@ public class DynamicRabbitListener implements ApplicationRunner { public void run(ApplicationArguments args) throws Exception { this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener()); this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener()); - // TODO 根据设备自动绑定队列 - this.addListenerQueue("/modbus/device/4B454E454E4731343030303030333538/collect"); + + this.jdbcTemplate.queryForList(GATEWAY_SQL).forEach(item -> { + this.addListenerQueue(String.format("/modbus/device/%s/collect", MapUtil.getStr(item, "identifier")), null, null); + }); } -// @RabbitListener(queues = "/modbus/device/4B454E454E4731343030303030333538/collect") -// public void modbusMessage(Message message) { -// System.out.println("modbusMessage:" + new String(message.getBody())); -// } - - public void addListenerQueue(String queueName) { - this.addListenerQueue(queueName, null, null); + public void addListenerQueue(String gatewaySn) { + this.addListenerQueue(String.format("/modbus/device/%s/collect", gatewaySn), null, null); } public void addListenerQueue(String queueName, String exchangeName, String routingKey) { diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessageBackupListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessageBackupListener.java similarity index 97% rename from src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessageBackupListener.java rename to src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessageBackupListener.java index dd604c0..0e52b8b 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessageBackupListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessageBackupListener.java @@ -1,4 +1,4 @@ -package com.isu.gaswellwatch.modbus.data.decode.listener; +package com.isu.gaswellwatch.modbus.data.listener; import lombok.extern.slf4j.Slf4j; import org.apache.commons.compress.utils.IOUtils; diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessagePersistListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java similarity index 64% rename from src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessagePersistListener.java rename to src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java index 5ee9e8a..538acf9 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessagePersistListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java @@ -1,10 +1,12 @@ -package com.isu.gaswellwatch.modbus.data.decode.listener; +package com.isu.gaswellwatch.modbus.data.listener; import cn.hutool.core.map.MapUtil; import cn.hutool.json.JSONUtil; import com.isu.gaswellwatch.modbus.data.ModbusMessage; import com.isu.gaswellwatch.modbus.data.PersistenceHandler; import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler; +import com.isu.gaswellwatch.modbus.data.decode.impl.ComposeDecodeHandler; +import com.isu.gaswellwatch.modbus.data.decode.impl.HighLowBinDecodeHandler; import com.isu.gaswellwatch.modbus.data.decode.impl.LocalDateTimeDecodeHandler; import com.serotonin.modbus4j.code.FunctionCode; import com.serotonin.modbus4j.msg.*; @@ -27,10 +29,7 @@ import java.time.Duration; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; /** * @author 王仕龙 @@ -49,6 +48,9 @@ public class ModbusMessagePersistListener implements BatchMessageListener { private JdbcTemplate jdbcTemplate; @Resource(name = "stringRedisTemplate") private RedisTemplate redisTemplate; + + @Resource(name = HighLowBinDecodeHandler.NAME + DecodeHandler.DECODE_NAME) + private DecodeHandler highLowBinDecodeHandler; @Resource private Map decodeHandlerMap; @@ -85,12 +87,11 @@ public class ModbusMessagePersistListener implements BatchMessageListener { log.error("指令[{}]不存在,数据: {}", commandId, messageString); continue; } - this.redisTemplate.opsForValue() - .setIfAbsent(COMMAND_CACHE + commandId, JSONUtil.toJsonStr(commandMap), Duration.ofDays(1)); + this.redisTemplate.opsForValue().setIfAbsent(COMMAND_CACHE + commandId, JSONUtil.toJsonStr(commandMap), Duration.ofDays(1)); } try { String address; - int index = MapUtil.getInt(commandMap, "start_address"), stepSize = 0; + int startAddress = MapUtil.getInt(commandMap, "start_address"), index = 0, stepSize = 0; ByteQueue byteQueue = new ByteQueue(collectionMessage); RtuMessageParser masterParser = new RtuMessageParser(true); RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue); @@ -110,21 +111,22 @@ public class ModbusMessagePersistListener implements BatchMessageListener { case FunctionCode.READ_HOLDING_REGISTERS -> { values = ((ReadHoldingRegistersResponse) modbusResponse).getShortData(); } -// case FunctionCode.READ_EXCEPTION_STATUS -> { -// values = new short[]{((ReadExceptionStatusResponse) modbusResponse).getExceptionStatus()}; -// } + case FunctionCode.READ_EXCEPTION_STATUS -> { + values = new short[]{((ReadExceptionStatusResponse) modbusResponse).getExceptionStatus()}; + } default -> { throw new RuntimeException("Funcetion code not supported: " + modbusResponse.getFunctionCode()); } } for (short value : values) { stepSize = index * 4; - messagePointMap.put(StringUtils.leftPad(String.valueOf(index), 4, '0'), + messagePointMap.put(StringUtils.leftPad(String.valueOf(startAddress), 4, '0'), ModbusMessage.MessagePoint.builder() .parseValue(String.valueOf(value)) .originalValue(StringUtils.substring(collectionMessage, 6 + stepSize, 10 + stepSize)) .build()); index++; + startAddress++; } } catch (Exception e) { log.error("初始数据解析异常: {}", messageString, e); @@ -168,11 +170,11 @@ public class ModbusMessagePersistListener implements BatchMessageListener { String deviceDataCacheName = PersistenceHandler.DEVICE_DATA_CACHE + modbusMessage.getDeviceId(); HashOperations hashOperations = this.redisTemplate.opsForHash(); hashOperations.put(deviceDataCacheName, "deviceId", String.valueOf(modbusMessage.getDeviceId())); - hashOperations.put(deviceDataCacheName, "receiveTime", LocalDateTime.ofInstant(Instant - .ofEpochMilli(modbusMessage.getReceiveTime()), ZoneOffset.systemDefault()) + hashOperations.put(deviceDataCacheName, "receiveTime", LocalDateTime + .ofInstant(Instant.ofEpochMilli(modbusMessage.getReceiveTime()), ZoneOffset.systemDefault()) .format(LocalDateTimeDecodeHandler.OUT_FORMATTER)); - hashOperations.put(deviceDataCacheName, "collectionTime", LocalDateTime.ofInstant(Instant - .ofEpochMilli(modbusMessage.getCollectionTime()), ZoneOffset.systemDefault()) + hashOperations.put(deviceDataCacheName, "collectionTime", LocalDateTime + .ofInstant(Instant.ofEpochMilli(modbusMessage.getCollectionTime()), ZoneOffset.systemDefault()) .format(LocalDateTimeDecodeHandler.OUT_FORMATTER)); for (Map point : commandPointList) { fieldName = MapUtil.getStr(point, "field"); @@ -180,13 +182,17 @@ public class ModbusMessagePersistListener implements BatchMessageListener { decodeName = MapUtil.getStr(point, "decode_name"); startAddress = MapUtil.getInt(point, "start_address"); String value; - if (stepSize <= 1) { + if (StringUtils.equals(decodeName, HighLowBinDecodeHandler.NAME)) { + value = decodeHighLowCommandPoint(modbusMessage.getMessagePointMap(), decodeName, point, startAddress); + } else if (StringUtils.contains(decodeName, ",") && StringUtils.startsWith(decodeName, HighLowBinDecodeHandler.NAME)) { + value = decodeHighLowStepCommandPoint(modbusMessage.getMessagePointMap(), decodeName, point, startAddress, stepSize / 2); + } else if (stepSize <= 1) { messagePoint = modbusMessage.getMessagePointMap() .get(StringUtils.leftPad(String.valueOf(startAddress), 4, '0')); decodeMessage(decodeName, point, messagePoint); value = messagePoint.getValue(); } else { - value = decodeCommandPoint(modbusMessage.getMessagePointMap(), decodeName, point, startAddress, stepSize); + value = decodeStepCommandPoint(modbusMessage.getMessagePointMap(), decodeName, point, startAddress, stepSize); } hashOperations.put(deviceDataCacheName, fieldName, value); } @@ -194,15 +200,45 @@ public class ModbusMessagePersistListener implements BatchMessageListener { private void decodeMessage(String decodeName, Map commandPointMap, ModbusMessage.MessagePoint messagePoint) { if (StringUtils.isNotBlank(decodeName)) { - DecodeHandler decodeHandler = this.decodeHandlerMap.get(decodeName + "DecodeHandler"); + DecodeHandler decodeHandler; + if (StringUtils.contains(decodeName, ",")) { + decodeHandler = new ComposeDecodeHandler(Arrays.stream(StringUtils.split(decodeName, ",")) + .map(name -> this.decodeHandlerMap.get(name + DecodeHandler.DECODE_NAME)).filter(Objects::nonNull).toList()); + } else { + decodeHandler = this.decodeHandlerMap.get(decodeName + DecodeHandler.DECODE_NAME); + } if (Objects.nonNull(decodeHandler)) { decodeHandler.decode(commandPointMap, messagePoint); } } } - private String decodeCommandPoint(Map pointMap, String decodeName, - Map commandPointMap, int startAddress, int stepSize) { + private String decodeMessage(String decodeName, Map commandPointMap, String value) { + if (StringUtils.isNotBlank(decodeName)) { + DecodeHandler decodeHandler; + if (StringUtils.contains(decodeName, ",")) { + List handlers = Arrays.stream(StringUtils.split(decodeName, ",")) + .filter(name -> !StringUtils.equals(name, HighLowBinDecodeHandler.NAME)) + .map(name -> this.decodeHandlerMap.get(name + DecodeHandler.DECODE_NAME)) + .filter(Objects::nonNull) + .toList(); + if (handlers.size() == 1) { + decodeHandler = handlers.get(0); + } else { + decodeHandler = new ComposeDecodeHandler(handlers); + } + } else { + decodeHandler = this.decodeHandlerMap.get(decodeName + DecodeHandler.DECODE_NAME); + } + if (Objects.nonNull(decodeHandler)) { + return decodeHandler.decode(commandPointMap, value); + } + } + return value; + } + + private String decodeStepCommandPoint(Map pointMap, String decodeName, + Map commandPointMap, int startAddress, int stepSize) { String[] values = new String[stepSize]; ModbusMessage.MessagePoint messagePoint; for (int i = 0; i < stepSize; i++) { @@ -210,14 +246,36 @@ public class ModbusMessagePersistListener implements BatchMessageListener { values[i] = messagePoint.getValue(); } String format = MapUtil.getStr(commandPointMap, "format"); - String result = StringUtils.isBlank(format) ? StringUtils.join(values) : String.format(format, values); - if (StringUtils.isNotBlank(decodeName)) { - DecodeHandler decodeHandler = this.decodeHandlerMap.get(decodeName + "DecodeHandler"); - if (Objects.nonNull(decodeHandler)) { - return decodeHandler.decode(commandPointMap, result); - } - } - return result; + return decodeMessage(decodeName, commandPointMap, + StringUtils.isBlank(format) ? StringUtils.join(values) : String.format(format, values)); } + private String decodeHighLowCommandPoint(Map pointMap, String decodeName, + Map commandPointMap, int startAddress) { + String[] values = new String[2]; + ModbusMessage.MessagePoint messagePoint; + for (int i = 0; i < 2; i++) { + messagePoint = pointMap.get(StringUtils.leftPad(String.valueOf(startAddress + i), 4, '0')); + values[i] = messagePoint.getOriginalValue(); + } + return highLowBinDecodeHandler.decode(commandPointMap, StringUtils.join(values)); + } + + private String decodeHighLowStepCommandPoint(Map pointMap, String decodeName, + Map commandPointMap, int startAddress, int stepSize) { + String[] values = new String[stepSize]; + ModbusMessage.MessagePoint highPoint; + ModbusMessage.MessagePoint lowPoint; + for (int i = 0; i < stepSize; i++) { + highPoint = pointMap.get(StringUtils.leftPad(String.valueOf(startAddress + i * 2), 4, '0')); + lowPoint = pointMap.get(StringUtils.leftPad(String.valueOf(startAddress + i * 2 + 1), 4, '0')); + values[i] = highLowBinDecodeHandler.decode(commandPointMap, highPoint.getOriginalValue() + lowPoint.getOriginalValue()); + } + + String format = MapUtil.getStr(commandPointMap, "format"); + return decodeMessage(decodeName, commandPointMap, + StringUtils.isBlank(format) ? StringUtils.join(values) : String.format(format, values)); + } + + } diff --git a/src/main/java/com/isu/gaswellwatch/utils/ReverseComplementCodeUtils.java b/src/main/java/com/isu/gaswellwatch/utils/ReverseComplementCodeUtils.java new file mode 100644 index 0000000..7f99a71 --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/utils/ReverseComplementCodeUtils.java @@ -0,0 +1,102 @@ +package com.isu.gaswellwatch.utils; + +/** + * @author 王仕龙 + * 2024/11/25 16:07 + */ +public class ReverseComplementCodeUtils { + + /** + * 原码转反码 + * + * @param source 二进制原码 + * @return + */ + public static String binOriginalToReverse(String source) { + if (source.startsWith("0")) { + return source; + } else { + StringBuffer sbf = new StringBuffer(); + sbf.append("1"); + String f_str = source.substring(1); + for (int i = 0; i < f_str.length(); i++) { + String s_str = String.valueOf(f_str.charAt(i)); + if (s_str.equals("0")) { + sbf.append("1"); + } else if (s_str.equals("1")) { + sbf.append("0"); + } + } + return sbf.toString(); + } + } + + /** + * 反码转补码, 默认补1 + * + * @param reverse 二进制反码字符串 + * @return + */ + public static String binReverseToComplement(String reverse) { + return binReverseToComplement(reverse, "1"); + } + + /** + * 反码转补码 + * + * @param reverse 二进制反码字符串 + * @param complement 二进制补数字符串 + * @return + */ + public static String binReverseToComplement(String reverse, String complement) { + if (reverse.startsWith("0")) { + return reverse; + } + StringBuilder sb = new StringBuilder(); + int x = 0; + int y = 0; + int pre = 0;//进位 + int sum = 0;//存储进位和另两个位的和 + while (reverse.length() != complement.length()) {//将两个二进制的数位数补齐,在短的前面添0 + if (reverse.length() > complement.length()) { + complement = "0" + complement; + } else { + reverse = "0" + reverse; + } + } + for (int i = reverse.length() - 1; i >= 0; i--) { + x = reverse.charAt(i) - '0'; + y = complement.charAt(i) - '0'; + sum = x + y + pre;//从低位做加法 + if (sum >= 2) { + pre = 1;//进位 + sb.append(sum - 2); + } else { + pre = 0; + sb.append(sum); + } + } + if (pre == 1) { + sb.append("1"); + } + // 翻转返回 + return sb.reverse().toString(); + } + + /** + * 补码转十进制 + * + * @param + * @return + */ + + public static String binComplementToDec(String complement) { + String h_number = complement.substring(1); + if (complement.startsWith("0")) { + return "" + Long.valueOf(h_number, 2); + } else if (complement.startsWith("1")) { + return "-" + Long.valueOf(h_number, 2); + } + return null; + } +} diff --git a/src/main/resources/sql/CREATE_KNPCV1.sql b/src/main/resources/sql/CREATE_KNPCV1.sql index 3a4534b..1b60b07 100644 --- a/src/main/resources/sql/CREATE_KNPCV1.sql +++ b/src/main/resources/sql/CREATE_KNPCV1.sql @@ -9,8 +9,8 @@ CREATE TABLE `$TableName$` `run_mode` smallint NULL DEFAULT NULL COMMENT '运行模式:\r\n0:手动模式 \r\n1:定时器模式 \r\n2:计时器模式 \r\n3:压力模式\r\n4:柱塞模式\r\n5:时压模式', `gas_status` smallint NULL DEFAULT NULL COMMENT '气井状态:\r\n0:关闭 \r\n1:打开', `plug_status` smallint NULL DEFAULT NULL COMMENT '柱塞状态:\r\n0:离开\r\n1:上升中\r\n2:到达', - `status_start_time` time NULL DEFAULT NULL COMMENT '当前状态开始时间', - `status_end_time` time NULL DEFAULT NULL COMMENT '当前状态结束时间', + `status_start_time` varchar(10) NULL DEFAULT NULL COMMENT '当前状态开始时间', + `status_end_time` varchar(10) NULL DEFAULT NULL COMMENT '当前状态结束时间', `oil_pressure` decimal(10, 2) NULL DEFAULT NULL COMMENT '油压:单位Mpa', `cas_pressure` decimal(10, 2) NULL DEFAULT NULL COMMENT '套压:单位Mpa', `pre_pressure` decimal(10, 2) NULL DEFAULT NULL COMMENT '输压:单位Mpa', @@ -21,10 +21,10 @@ CREATE TABLE `$TableName$` `op_mode` smallint NULL DEFAULT NULL COMMENT '运行模式:\r\n0: 手动模式 hand_mode\r\n1:定时器模式 timer_mode\r\n2:计时器模式 t2mode\r\n3:压力模式 pressure_mode\r\n4:柱塞模式 piston_mode\r\n5:时压模式 tp_mode', `timer_able1` smallint NULL DEFAULT NULL COMMENT '定时模式定时器1使能:\r\n0:禁止 disable\r\n1:使能 enable', `timer_able2` smallint NULL DEFAULT NULL COMMENT '定时模式定时器2使能:\r\n0:禁止 disable\r\n1:使能 ', - `timer_open1` time NULL DEFAULT NULL COMMENT '时0~23:分0~59:秒0~59', - `timer_close1` time NULL DEFAULT NULL COMMENT '时0~23:分0~59:秒0~59', - `timer_open2` time NULL DEFAULT NULL COMMENT '时0~23:分0~59:秒0~59', - `timer_close2` time NULL DEFAULT NULL COMMENT '时0~23:分0~59:秒0~59', + `timer_open1` varchar(10) NULL DEFAULT NULL COMMENT '时0~23:分0~59:秒0~59', + `timer_close1` varchar(10) NULL DEFAULT NULL COMMENT '时0~23:分0~59:秒0~59', + `timer_open2` varchar(10) NULL DEFAULT NULL COMMENT '时0~23:分0~59:秒0~59', + `timer_close2` varchar(10) NULL DEFAULT NULL COMMENT '时0~23:分0~59:秒0~59', `timing_open` varchar(10) NULL DEFAULT NULL COMMENT '时0~23:分0~59:秒0~59', `timing_close` varchar(10) NULL DEFAULT NULL COMMENT '时0~23:分0~59:秒0~59', `timing_delay` varchar(10) NULL DEFAULT NULL COMMENT '时0~23:分0~59:秒0~59', diff --git a/src/main/resources/sql/CREATE_SCSS.sql b/src/main/resources/sql/CREATE_SCSS.sql new file mode 100644 index 0000000..4370088 --- /dev/null +++ b/src/main/resources/sql/CREATE_SCSS.sql @@ -0,0 +1,91 @@ +CREATE TABLE `$TableName$` +( + `id` bigint NOT NULL COMMENT '主键', + `device_id` int NOT NULL COMMENT '设备标识', + `created_time` datetime NOT NULL COMMENT '数据落库时间', + `collection_time` datetime NOT NULL COMMENT '采集指令下发时间', + `receive_time` datetime NOT NULL COMMENT '接收到数据时间', + `cas_pressure` decimal(10, 2) NULL DEFAULT NULL COMMENT '套压', + `oil_pressure` decimal(10, 2) NULL DEFAULT NULL COMMENT '油压', + `first_solenoid_status` int NULL DEFAULT NULL COMMENT '当前第一个电磁阀状态', + `battery_voltage` decimal(10, 1) NULL DEFAULT NULL COMMENT '电池电压', + `solar_voltage` decimal(10, 1) NULL DEFAULT NULL COMMENT '太阳能电压', + `remaining_time_action` int NULL DEFAULT NULL COMMENT '动作剩余时间', + `second_solenoid_status` int NULL DEFAULT NULL COMMENT '第二个电磁阀状态', + `pre_transmission` int NULL DEFAULT NULL COMMENT '输压', + `internet_traffic` int NULL DEFAULT NULL COMMENT '流量', + `load_factor` int NULL DEFAULT NULL COMMENT '载荷因子', + `data_time` datetime NULL DEFAULT NULL COMMENT '日期时间', + `show_delay` int NULL DEFAULT NULL COMMENT '显示延时', + `open_well_sampling_interval` int NULL DEFAULT NULL COMMENT '开井采样间隔', + `close_well_sampling_interval` int NULL DEFAULT NULL COMMENT '关井采样间隔', + `ctl_model` int NULL DEFAULT NULL COMMENT '控制模式', + `min_pressure` int NULL DEFAULT NULL COMMENT '套压最小值', + `max_pressure` int NULL DEFAULT NULL COMMENT '套压最大值', + `pressure_min_voltage` int NULL DEFAULT NULL COMMENT '套压最小电压', + `pressure_max_voltage` int NULL DEFAULT NULL COMMENT '套压最大电压', + `oil_min` int NULL DEFAULT NULL COMMENT '油压最小值', + `oil_max` int NULL DEFAULT NULL COMMENT '油压最大值', + `oil_min_voltage` int NULL DEFAULT NULL COMMENT '油压最小电压', + `oil_max_voltage` int NULL DEFAULT NULL COMMENT '油压最大电压', + `input_pressure_min_value` int NULL DEFAULT NULL COMMENT '输压最小值', + `input_pressure_max_value` int NULL DEFAULT NULL COMMENT '输压最大值', + `input_voltage_min_value` int NULL DEFAULT NULL COMMENT '输压最小电压', + `input_voltage_max_value` int NULL DEFAULT NULL COMMENT '输压最大电压', + `flow_rate_min_value` int NULL DEFAULT NULL COMMENT '流量最小值', + `flow_rate_max_value` int NULL DEFAULT NULL COMMENT '流量最大值', + `flow_voltage_min_value` int NULL DEFAULT NULL COMMENT '流量最小电压', + `flow_voltage_max_value` int NULL DEFAULT NULL COMMENT '流量最大电压', + `continuous_sampling_interval_duration` int NULL DEFAULT NULL COMMENT '连续采样间隔', + `sensor_signal_effective_level` int NULL DEFAULT NULL COMMENT '到达传感器有效电平', + `pressure_compensation_polarity_flag` int NULL DEFAULT NULL COMMENT '套压补偿极性', + `pressure_compensation_value_setting` int NULL DEFAULT NULL COMMENT '套压补偿值', + `oil_pressure_compensation_polarity_flag` int NULL DEFAULT NULL COMMENT '油压补偿极性', + `oil_pressure_compensation_value_setting` int NULL DEFAULT NULL COMMENT '油压补偿值', + `input_pressure_compensation_polarity_flag` int NULL DEFAULT NULL COMMENT '输压补偿极性', + `input_pressure_compensation_value_setting` int NULL DEFAULT NULL COMMENT '输压补偿值', + `flow_compensation_polarity_flag` int NULL DEFAULT NULL COMMENT '流量补偿极性', + `flow_compensation_value_setting` int NULL DEFAULT NULL COMMENT '流量补偿值', + `well_open_time_timestamp` int NULL DEFAULT NULL COMMENT '开井时间', + `well_open_pressure_value` decimal(10, 2) NULL DEFAULT NULL COMMENT '开井套压', + `well_open_oil_pressure_value` decimal(10, 2) NULL DEFAULT NULL COMMENT '开井油压', + `well_open_load_factor_presets` decimal(10, 2) NULL DEFAULT NULL COMMENT '开井载荷因子预设值', + `well_close_time_timestamp` int NULL DEFAULT NULL COMMENT '关井时间时间戳', + `well_close_pressure_value` int NULL DEFAULT NULL COMMENT '关井压力值', + `well_close_oil_pressure_value` int NULL DEFAULT NULL COMMENT '关井油压值', + `well_close_flow_value` int NULL DEFAULT NULL COMMENT '关井流量值', + `min_well_open_time_duration` int NULL DEFAULT NULL COMMENT '最小开井时间持续时长', + `max_well_open_time_duration` int NULL DEFAULT NULL COMMENT '最大开井时间持续时长', + `min_well_close_time_duration` int NULL DEFAULT NULL COMMENT '最小关井时间持续时长', + `max_well_close_time_duration` int NULL DEFAULT NULL COMMENT '最大关井时间持续时长', + `pressure_stabilization_duration` int NULL DEFAULT NULL COMMENT '压力稳定持续时长', + `flow_stabilization_duration` int NULL DEFAULT NULL COMMENT '流量稳定持续时长', + `load_factor_stabilization_duration` int NULL DEFAULT NULL COMMENT '载荷因子稳定持续时长', + `plunger_delay_duration` int NULL DEFAULT NULL COMMENT '柱塞延迟时长', + `plunger_rise_duration` int NULL DEFAULT NULL COMMENT '柱塞上升时长', + `continuos_flow_duration` int NULL DEFAULT NULL COMMENT '连续流量持续时长', + `well_close_time_duration` int NULL DEFAULT NULL COMMENT '关井时间持续时长', + `well_close_time_not_reached_duration` int NULL DEFAULT NULL COMMENT '关井时间未达到持续时长', + `well_close_not_reached_count_value` int NULL DEFAULT NULL COMMENT '关井未达到次数值', + `plunger_delay_duration_repeat` int NULL DEFAULT NULL COMMENT '柱塞延迟时长重复次数', + `target_time_timestamp` int NULL DEFAULT NULL COMMENT '目标时间时间戳', + `target_time_range_value` int NULL DEFAULT NULL COMMENT '目标时间范围值', + `continuos_flow_increase_duration` int NULL DEFAULT NULL COMMENT '连续流量增加持续时长', + `continuos_flow_decrease_duration` int NULL DEFAULT NULL COMMENT '连续流量减少持续时长', + `well_close_increase_duration` int NULL DEFAULT NULL COMMENT '关井增加持续时长', + `well_close_decrease_duration` int NULL DEFAULT NULL COMMENT '关井减少持续时长', + `min_well_close_time_duration2` int NULL DEFAULT NULL COMMENT '最小关井时间持续时长', + `max_well_close_time_duration2` int NULL DEFAULT NULL COMMENT '最大关井时间持续时长', + `min_continous_flow_time_duration` int NULL DEFAULT NULL COMMENT '最小连续流量时间持续时长', + `max_continous_flow_time_duration` int NULL DEFAULT NULL COMMENT '最大连续流量时间持续时长', + `min_well_open_time_duration2` int NULL DEFAULT NULL COMMENT '最小开井时间持续时长', + `max_well_open_time_duration2` int NULL DEFAULT NULL COMMENT '最大开井时间持续时长', + `well_open_pressure_value_at_open` int NULL DEFAULT NULL COMMENT '开井时压力值(开井瞬间)', + `well_open_oil_pressure_value_at_open` int NULL DEFAULT NULL COMMENT '开井时油压值(开井瞬间)', + `well_open_load_factor_presets_at_open` int NULL DEFAULT NULL COMMENT '开井时载荷因子预设值(开井瞬间)', + `well_close_pressure_value_at_close` int NULL DEFAULT NULL COMMENT '关井时压力值(关井瞬间)', + `well_close_oil_ppressure_value_at_close` int NULL DEFAULT NULL COMMENT '关井时油压值(关井瞬间)', + `well_close_flow_value_at_close` int NULL DEFAULT NULL COMMENT '关井时流量值(关井瞬间)', + PRIMARY KEY (`id`) USING BTREE, + UNIQUE INDEX `udx_device_create_time` (`device_id` ASC, `collection_time` ASC) USING BTREE COMMENT '设备采集数据唯一键' +) ENGINE = InnoDB COMMENT = '设备ID:$DeviceId$的采集数据' \ No newline at end of file diff --git a/src/main/resources/sql/INSERT_SCSS.sql b/src/main/resources/sql/INSERT_SCSS.sql new file mode 100644 index 0000000..1b13154 --- /dev/null +++ b/src/main/resources/sql/INSERT_SCSS.sql @@ -0,0 +1,120 @@ +INSERT INTO `$TableName$`(`id`, `device_id`, `created_time`, `collection_time`, `receive_time`, `cas_pressure`, + `oil_pressure`, `first_solenoid_status`, `battery_voltage`, `solar_voltage`, + `remaining_time_action`, `second_solenoid_status`, `pre_transmission`, `internet_traffic`, + `load_factor`, `data_time`, `show_delay`, `open_well_sampling_interval`, + `close_well_sampling_interval`, `ctl_model`, `min_pressure`, + `max_pressure`, `pressure_min_voltage`, `pressure_max_voltage`, `oil_min`, `oil_max`, + `oil_min_voltage`, `oil_max_voltage`, `input_pressure_min_value`, `input_pressure_max_value`, + `input_voltage_min_value`, `input_voltage_max_value`, `flow_rate_min_value`, + `flow_rate_max_value`, + `flow_voltage_min_value`, `flow_voltage_max_value`, `continuous_sampling_interval_duration`, + `sensor_signal_effective_level`, `pressure_compensation_polarity_flag`, + `pressure_compensation_value_setting`, + `oil_pressure_compensation_polarity_flag`, `oil_pressure_compensation_value_setting`, + `input_pressure_compensation_polarity_flag`, `input_pressure_compensation_value_setting`, + `flow_compensation_polarity_flag`, `flow_compensation_value_setting`, + `well_open_time_timestamp`, + `well_open_pressure_value`, `well_open_oil_pressure_value`, `well_open_load_factor_presets`, + `well_close_time_timestamp`, `well_close_pressure_value`, `well_close_oil_pressure_value`, + `well_close_flow_value`, `min_well_open_time_duration`, `max_well_open_time_duration`, + `min_well_close_time_duration`, `max_well_close_time_duration`, + `pressure_stabilization_duration`, + `flow_stabilization_duration`, `load_factor_stabilization_duration`, `plunger_delay_duration`, + `plunger_rise_duration`, `continuos_flow_duration`, `well_close_time_duration`, + `well_close_time_not_reached_duration`, `well_close_not_reached_count_value`, + `plunger_delay_duration_repeat`, `target_time_timestamp`, `target_time_range_value`, + `continuos_flow_increase_duration`, `continuos_flow_decrease_duration`, + `well_close_increase_duration`, + `well_close_decrease_duration`, `min_well_close_time_duration2`, + `max_well_close_time_duration2`, + `min_continous_flow_time_duration`, `max_continous_flow_time_duration`, + `min_well_open_time_duration2`, + `max_well_open_time_duration2`, `well_open_pressure_value_at_open`, + `well_open_oil_pressure_value_at_open`, `well_open_load_factor_presets_at_open`, + `well_close_pressure_value_at_close`, `well_close_oil_ppressure_value_at_close`, + `well_close_flow_value_at_close`) +VALUES (?, ?, NOW(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +ON DUPLICATE KEY UPDATE receive_time=VALUES(receive_time), + cas_pressure=VALUES(cas_pressure), + oil_pressure=VALUES(oil_pressure), + first_solenoid_status=VALUES(first_solenoid_status), + battery_voltage=VALUES(battery_voltage), + solar_voltage=VALUES(solar_voltage), + remaining_time_action=VALUES(remaining_time_action), + second_solenoid_status=VALUES(second_solenoid_status), + pre_transmission=VALUES(pre_transmission), + internet_traffic=VALUES(internet_traffic), + load_factor=VALUES(load_factor), + data_time=VALUES(data_time), + show_delay=VALUES(show_delay), + open_well_sampling_interval=VALUES(open_well_sampling_interval), + close_well_sampling_interval=VALUES(close_well_sampling_interval), + ctl_model=VALUES(ctl_model), + min_pressure=VALUES(min_pressure), + max_pressure=VALUES(max_pressure), + pressure_min_voltage=VALUES(pressure_min_voltage), + pressure_max_voltage=VALUES(pressure_max_voltage), + oil_min=VALUES(oil_min), + oil_max=VALUES(oil_max), + oil_min_voltage=VALUES(oil_min_voltage), + oil_max_voltage=VALUES(oil_max_voltage), + input_pressure_min_value=VALUES(input_pressure_min_value), + input_pressure_max_value=VALUES(input_pressure_max_value), + input_voltage_min_value=VALUES(input_voltage_min_value), + input_voltage_max_value=VALUES(input_voltage_max_value), + flow_rate_min_value=VALUES(flow_rate_min_value), + flow_rate_max_value=VALUES(flow_rate_max_value), + flow_voltage_min_value=VALUES(flow_voltage_min_value), + flow_voltage_max_value=VALUES(flow_voltage_max_value), + continuous_sampling_interval_duration=VALUES(continuous_sampling_interval_duration), + sensor_signal_effective_level=VALUES(sensor_signal_effective_level), + pressure_compensation_polarity_flag=VALUES(pressure_compensation_polarity_flag), + pressure_compensation_value_setting=VALUES(pressure_compensation_value_setting), + oil_pressure_compensation_polarity_flag=VALUES(oil_pressure_compensation_polarity_flag), + oil_pressure_compensation_value_setting=VALUES(oil_pressure_compensation_value_setting), + input_pressure_compensation_polarity_flag=VALUES(input_pressure_compensation_polarity_flag), + input_pressure_compensation_value_setting=VALUES(input_pressure_compensation_value_setting), + flow_compensation_polarity_flag=VALUES(flow_compensation_polarity_flag), + flow_compensation_value_setting=VALUES(flow_compensation_value_setting), + well_open_time_timestamp=VALUES(well_open_time_timestamp), + well_open_pressure_value=VALUES(well_open_pressure_value), + well_open_oil_pressure_value=VALUES(well_open_oil_pressure_value), + well_open_load_factor_presets=VALUES(well_open_load_factor_presets), + well_close_time_timestamp=VALUES(well_close_time_timestamp), + well_close_pressure_value=VALUES(well_close_pressure_value), + well_close_oil_pressure_value=VALUES(well_close_oil_pressure_value), + well_close_flow_value=VALUES(well_close_flow_value), + min_well_open_time_duration=VALUES(min_well_open_time_duration), + max_well_open_time_duration=VALUES(max_well_open_time_duration), + min_well_close_time_duration=VALUES(min_well_close_time_duration), + max_well_close_time_duration=VALUES(max_well_close_time_duration), + pressure_stabilization_duration=VALUES(pressure_stabilization_duration), + flow_stabilization_duration=VALUES(flow_stabilization_duration), + load_factor_stabilization_duration=VALUES(load_factor_stabilization_duration), + plunger_delay_duration=VALUES(plunger_delay_duration), + plunger_rise_duration=VALUES(plunger_rise_duration), + continuos_flow_duration=VALUES(continuos_flow_duration), + well_close_time_duration=VALUES(well_close_time_duration), + well_close_time_not_reached_duration=VALUES(well_close_time_not_reached_duration), + well_close_not_reached_count_value=VALUES(well_close_not_reached_count_value), + plunger_delay_duration_repeat=VALUES(plunger_delay_duration_repeat), + target_time_timestamp=VALUES(target_time_timestamp), + target_time_range_value=VALUES(target_time_range_value), + continuos_flow_increase_duration=VALUES(continuos_flow_increase_duration), + continuos_flow_decrease_duration=VALUES(continuos_flow_decrease_duration), + well_close_increase_duration=VALUES(well_close_increase_duration), + well_close_decrease_duration=VALUES(well_close_decrease_duration), + min_well_close_time_duration2=VALUES(min_well_close_time_duration2), + max_well_close_time_duration2=VALUES(max_well_close_time_duration2), + min_continous_flow_time_duration=VALUES(min_continous_flow_time_duration), + max_continous_flow_time_duration=VALUES(max_continous_flow_time_duration), + min_well_open_time_duration2=VALUES(min_well_open_time_duration2), + max_well_open_time_duration2=VALUES(max_well_open_time_duration2), + well_open_pressure_value_at_open=VALUES(well_open_pressure_value_at_open), + well_open_oil_pressure_value_at_open=VALUES(well_open_oil_pressure_value_at_open), + well_open_load_factor_presets_at_open=VALUES(well_open_load_factor_presets_at_open), + well_close_pressure_value_at_close=VALUES(well_close_pressure_value_at_close), + well_close_oil_ppressure_value_at_close=VALUES(well_close_oil_ppressure_value_at_close), + well_close_flow_value_at_close=VALUES(well_close_flow_value_at_close) diff --git a/src/test/java/com/isu/gaswellwatch/ModbusTest.java b/src/test/java/com/isu/gaswellwatch/ModbusTest.java new file mode 100644 index 0000000..9efbc27 --- /dev/null +++ b/src/test/java/com/isu/gaswellwatch/ModbusTest.java @@ -0,0 +1,64 @@ +package com.isu.gaswellwatch; + +import com.serotonin.modbus4j.code.FunctionCode; +import com.serotonin.modbus4j.msg.*; +import com.serotonin.modbus4j.serial.rtu.RtuMessageParser; +import com.serotonin.modbus4j.serial.rtu.RtuMessageResponse; +import com.serotonin.modbus4j.sero.util.queue.ByteQueue; +import org.apache.commons.lang3.StringUtils; + +import java.util.HashMap; +import java.util.Map; + +public class ModbusTest { + + public static void main(String[] args) throws Exception { + test(); + } + + public static void test() throws Exception { + // "01 03 40 0000 0960 0000 0258 0000 0258 0000 0064 000299A0 0000012C0000012C0000012C0000070800002A3000000E1000007E900000000A0000000A0000000A0000000044D0" + String collectionMessage = "01034000000960000002580000025800000064000299A00000012C0000012C0000012C0000070800002A3000000E1000007E900000000A0000000A0000000A0000000044D0"; + + ByteQueue byteQueue = new ByteQueue(collectionMessage); + RtuMessageParser masterParser = new RtuMessageParser(true); + RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue); + ModbusResponse modbusResponse = response.getModbusResponse(); + + short[] values = null; + switch (modbusResponse.getFunctionCode()) { + case FunctionCode.READ_COILS -> { + values = ((ReadCoilsResponse) modbusResponse).getShortData(); + } + case FunctionCode.READ_DISCRETE_INPUTS -> { + values = ((ReadDiscreteInputsResponse) modbusResponse).getShortData(); + } + case FunctionCode.READ_INPUT_REGISTERS -> { + values = ((ReadInputRegistersResponse) modbusResponse).getShortData(); + } + case FunctionCode.READ_HOLDING_REGISTERS -> { + values = ((ReadHoldingRegistersResponse) modbusResponse).getShortData(); + } + case FunctionCode.READ_EXCEPTION_STATUS -> { + values = new short[]{((ReadExceptionStatusResponse) modbusResponse).getExceptionStatus()}; + } + default -> { + throw new RuntimeException("Funcetion code not supported: " + modbusResponse.getFunctionCode()); + } + } + String address; + int startAddress = 150, index = 0, stepSize = 0; + Map messagePointMap = new HashMap<>(); + for (short value : values) { + stepSize = index * 4; + address = StringUtils.leftPad(String.valueOf(startAddress), 4, '0'); + System.out.println(address + ": " + + StringUtils.substring(collectionMessage, 6 + stepSize, 10 + stepSize) + + " -> " + String.valueOf(value)); + startAddress++; + index++; + } + + System.out.println(); + } +}