diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java index e6a8118..1a5414e 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java @@ -30,7 +30,7 @@ public interface PersistenceHandler { void createTable(String tableName, Long deviceId); - void insert(String tableName, String cacheKey); + Map insert(String tableName, String cacheKey); default void setValue(PreparedStatement ps, Map row, int index, String key, int sqlType) throws SQLException { String value = MapUtil.getStr(row, key); diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java index 0c80e1b..c4a6f09 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java @@ -1,11 +1,14 @@ package com.isu.gaswellwatch.modbus.data; import cn.hutool.core.map.MapUtil; +import cn.hutool.json.JSONUtil; import com.isu.gaswellwatch.entity.Response; +import com.isu.gaswellwatch.modbus.data.listener.Queues; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.core.ParameterizedTypeReference; import org.springframework.data.redis.core.Cursor; import org.springframework.data.redis.core.HashOperations; @@ -34,23 +37,23 @@ import java.util.stream.Collectors; public class Redis2DBPersistenceService { public static final String DEFAULT_DATA_TABLE = "t_device_data_"; - private static final String DEVICE_INFO_SQL = "SELECT * from device where id = "; - private static final String EXISTS_TABLE_SQL = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA" - + " IN ('gaswellwatch', 'gas_well_watch') AND TABLE_NAME='$TableName$'"; + private static final String DEVICE_INFO_SQL = "SELECT d.*, dp.code as modbus_device_product_code from device d " + "join dictionary dp on dp.id = d.product where id = "; + private static final String EXISTS_TABLE_SQL = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA" + " IN ('gaswellwatch', 'gas_well_watch') AND TABLE_NAME='$TableName$'"; @Resource private JdbcTemplate jdbcTemplate; @Resource(name = "stringRedisTemplate") private RedisTemplate redisTemplate; @Resource + private RabbitTemplate rabbitTemplate; + @Resource private Map persistenceHandlerMap; - private RestTemplate restTemplate = new RestTemplate(); + private final RestTemplate restTemplate = new RestTemplate(); @Scheduled(cron = "20/30 * * * * ? ") public void write() { Map idGatewayMappingMap = getOnlineGateway(); HashOperations operations = this.redisTemplate.opsForHash(); - try (Cursor cursor = this.redisTemplate.scan(ScanOptions.scanOptions() - .match(PersistenceHandler.DEVICE_DATA_CACHE + "*").build())) { + try (Cursor cursor = this.redisTemplate.scan(ScanOptions.scanOptions().match(PersistenceHandler.DEVICE_DATA_CACHE + "*").build())) { Long deviceId; String cacheKey, tableName; Map deviceMap; @@ -66,27 +69,35 @@ public class Redis2DBPersistenceService { continue; } if (Objects.nonNull(idGatewayMappingMap)) { - operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, "online", - String.valueOf(idGatewayMappingMap.containsKey(deviceId))); + operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, "online", String.valueOf(idGatewayMappingMap.containsKey(deviceId))); } - String modbusDeviceTypeId = (String) operations.get(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, "product"); - if (StringUtils.isEmpty(modbusDeviceTypeId)) { + String modbusDeviceProductCode = (String) operations.get(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, "modbus_device_product_code"); + if (StringUtils.isEmpty(modbusDeviceProductCode)) { Map deviceInfo = this.jdbcTemplate.queryForMap(DEVICE_INFO_SQL + deviceId); - Map cacheDeviceInfo = new HashMap<>(deviceInfo.size()); - deviceInfo.forEach((key, value) -> cacheDeviceInfo.put(key, Objects.isNull(value) ? null : String.valueOf(value))); - modbusDeviceTypeId = (String) cacheDeviceInfo.get("product"); - operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo); + if (ObjectUtils.isNotEmpty(deviceInfo)) { + Map cacheDeviceInfo = new HashMap<>(deviceInfo.size()); + deviceInfo.forEach((key, value) -> cacheDeviceInfo.put(key, Objects.isNull(value) ? null : String.valueOf(value))); + modbusDeviceProductCode = (String) cacheDeviceInfo.get("modbus_device_product_code"); + operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo); + } } - - persistenceHandler = persistenceHandlerMap.get(ModbusDeviceTypeEnum.getCodeById(modbusDeviceTypeId)); + persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode); tableName = DEFAULT_DATA_TABLE + deviceId; existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName)); - if (ObjectUtils.isEmpty(existsTableList) - || ObjectUtils.isEmpty(existsTableList.get(0)) - || StringUtils.isBlank(MapUtil.getStr(existsTableList.get(0), "TABLE_NAME"))) { + if (ObjectUtils.isEmpty(existsTableList) || ObjectUtils.isEmpty(existsTableList.get(0)) || StringUtils.isBlank(MapUtil.getStr(existsTableList.get(0), "TABLE_NAME"))) { persistenceHandler.createTable(tableName, deviceId); } - persistenceHandler.insert(tableName, cacheKey); + Map rowData = persistenceHandler.insert(tableName, cacheKey); + if (ObjectUtils.isNotEmpty(rowData)) { + String rowDataJson = JSONUtil.toJsonStr(rowData); + String queueName = String.format(Queues.DEVICE_EVENTS, deviceId); + try { + log.debug("推设备最新落库数据到MQ({}): {}", queueName, rowDataJson); + this.rabbitTemplate.convertAndSend(queueName, rowDataJson); + } catch (Exception e) { + log.error("推设备最新落库数据到MQ失败({}): {}", queueName, rowDataJson, e); + } + } } } } @@ -99,9 +110,7 @@ public class Redis2DBPersistenceService { if (Objects.isNull(response) || Objects.isNull(response.getBody()) || ObjectUtils.isEmpty(response.getBody().getData())) { return null; } - List> idGatewayMappingList = this.jdbcTemplate.queryForList("select id, gateway_sn" + - " from device where gateway_sn in (" + - response.getBody().getData().stream().map(value -> "'" + value + "'").collect(Collectors.joining(",")) + ")"); + List> idGatewayMappingList = this.jdbcTemplate.queryForList("select id, gateway_sn" + " from device where gateway_sn in (" + response.getBody().getData().stream().map(value -> "'" + value + "'").collect(Collectors.joining(",")) + ")"); if (ObjectUtils.isEmpty(idGatewayMappingList)) { return null; } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/EtcPersistenceHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/EtcPersistenceHandler.java index b00e3df..86ceb4d 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/EtcPersistenceHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/EtcPersistenceHandler.java @@ -37,7 +37,7 @@ public class EtcPersistenceHandler implements PersistenceHandler { } @Override - public void insert(String tableName, String cacheKey) { + public Map insert(String tableName, String cacheKey) { String insertTableSQL = PersistenceHandler.getResource("sql/INSERT_ETC.sql"); insertTableSQL = StringUtils.replace(insertTableSQL, "$TableName$", tableName); @@ -102,5 +102,6 @@ public class EtcPersistenceHandler implements PersistenceHandler { return ps.executeUpdate(); } }); + return row; } } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Knpcv1PersistenceHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Knpcv1PersistenceHandler.java index 85da219..4aa6503 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Knpcv1PersistenceHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Knpcv1PersistenceHandler.java @@ -37,7 +37,7 @@ public class Knpcv1PersistenceHandler implements PersistenceHandler { } @Override - public void insert(String tableName, String cacheKey) { + public Map insert(String tableName, String cacheKey) { String insertTableSQL = PersistenceHandler.getResource("sql/INSERT_KNPCV1.sql"); insertTableSQL = StringUtils.replace(insertTableSQL, "$TableName$", tableName); @@ -102,5 +102,6 @@ public class Knpcv1PersistenceHandler implements PersistenceHandler { return ps.executeUpdate(); } }); + return row; } } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/ScssPersistenceHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/ScssPersistenceHandler.java index 8654968..0920205 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/ScssPersistenceHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/ScssPersistenceHandler.java @@ -37,7 +37,7 @@ public class ScssPersistenceHandler implements PersistenceHandler { } @Override - public void insert(String tableName, String cacheKey) { + public Map insert(String tableName, String cacheKey) { String insertTableSQL = PersistenceHandler.getResource("sql/INSERT_SCSS.sql"); insertTableSQL = StringUtils.replace(insertTableSQL, "$TableName$", tableName); @@ -134,5 +134,6 @@ public class ScssPersistenceHandler implements PersistenceHandler { return ps.executeUpdate(); } }); + return row; } } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java index 39ab1f6..8dab454 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java @@ -38,17 +38,17 @@ public class DynamicRabbitListener implements ApplicationRunner { } @Override - public void run(ApplicationArguments args) throws Exception { + public void run(ApplicationArguments args) { this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener()); this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener()); this.jdbcTemplate.queryForList(GATEWAY_SQL).forEach(item -> { - this.addListenerQueue(String.format("/modbus/device/%s/collect", MapUtil.getStr(item, "identifier")), null, null); + this.addListenerQueue(String.format(Queues.MODBUS_COLLECT_DATA, MapUtil.getStr(item, "identifier")), null, null); }); } public void addListenerQueue(String gatewaySn) { - this.addListenerQueue(String.format("/modbus/device/%s/collect", gatewaySn), null, null); + this.addListenerQueue(String.format(Queues.MODBUS_COLLECT_DATA, gatewaySn), null, null); } public void addListenerQueue(String queueName, String exchangeName, String routingKey) { diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java index 538acf9..dd23f43 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java @@ -169,6 +169,7 @@ public class ModbusMessagePersistListener implements BatchMessageListener { ModbusMessage.MessagePoint messagePoint; String deviceDataCacheName = PersistenceHandler.DEVICE_DATA_CACHE + modbusMessage.getDeviceId(); HashOperations hashOperations = this.redisTemplate.opsForHash(); + hashOperations.put(deviceDataCacheName, "gatewaySn", modbusMessage.getGatewayIdentifier()); hashOperations.put(deviceDataCacheName, "deviceId", String.valueOf(modbusMessage.getDeviceId())); hashOperations.put(deviceDataCacheName, "receiveTime", LocalDateTime .ofInstant(Instant.ofEpochMilli(modbusMessage.getReceiveTime()), ZoneOffset.systemDefault()) diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/Queues.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/Queues.java new file mode 100644 index 0000000..bb3d13e --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/Queues.java @@ -0,0 +1,11 @@ +package com.isu.gaswellwatch.modbus.data.listener; + +/** + * @author 王仕龙 + * 2024/11/26 12:37 + */ +public final class Queues { + + public static final String DEVICE_EVENTS = "/device/%s/events"; + public static final String MODBUS_COLLECT_DATA = "/modbus/device/%s/collect"; +} diff --git a/src/main/java/com/isu/gaswellwatch/utils/ReverseComplementCodeUtils.java b/src/main/java/com/isu/gaswellwatch/utils/ReverseComplementCodeUtils.java index 7f99a71..18e4e9c 100644 --- a/src/main/java/com/isu/gaswellwatch/utils/ReverseComplementCodeUtils.java +++ b/src/main/java/com/isu/gaswellwatch/utils/ReverseComplementCodeUtils.java @@ -1,5 +1,7 @@ package com.isu.gaswellwatch.utils; +import java.util.Objects; + /** * @author 王仕龙 * 2024/11/25 16:07 @@ -7,7 +9,9 @@ package com.isu.gaswellwatch.utils; public class ReverseComplementCodeUtils { /** - * 原码转反码 + * 原码转反码
+ * 正数的原码,反码,补码都一样(三码合一), 0的反码,补码都是0
+ * 负数的反码=它的原码符号不变,其他位取反(0 => 1 1 => 0) * * @param source 二进制原码 * @return @@ -16,7 +20,7 @@ public class ReverseComplementCodeUtils { if (source.startsWith("0")) { return source; } else { - StringBuffer sbf = new StringBuffer(); + StringBuilder sbf = new StringBuilder(source.length()); sbf.append("1"); String f_str = source.substring(1); for (int i = 0; i < f_str.length(); i++) { @@ -32,7 +36,8 @@ public class ReverseComplementCodeUtils { } /** - * 反码转补码, 默认补1 + * 反码转补码, 默认补1
+ * 负数的补码 = 它的反码 + 1,负数的反码 = 负数的补码 - 1 * * @param reverse 二进制反码字符串 * @return @@ -52,7 +57,7 @@ public class ReverseComplementCodeUtils { if (reverse.startsWith("0")) { return reverse; } - StringBuilder sb = new StringBuilder(); + StringBuilder sb = new StringBuilder(reverse.length()); int x = 0; int y = 0; int pre = 0;//进位 @@ -89,14 +94,32 @@ public class ReverseComplementCodeUtils { * @param * @return */ - public static String binComplementToDec(String complement) { - String h_number = complement.substring(1); - if (complement.startsWith("0")) { - return "" + Long.valueOf(h_number, 2); - } else if (complement.startsWith("1")) { - return "-" + Long.valueOf(h_number, 2); + String binNumber = complement.substring(1); + String firstNumber = complement.substring(0, 1); + if (Objects.equals(firstNumber, "0")) { + return "" + Long.valueOf(binNumber, 2); + } else if (Objects.equals(firstNumber, "1")) { + return "-" + Long.valueOf(binNumber, 2); } return null; } + + public static void main(String[] args) { +// String hexNumber = "F7"; // 16进制负数表示,等同于-9 + String hexNumber = "F6"; // 16进制负数表示,等同于-10 +// String hexNumber = "E6"; // 16进制负数表示,等同于-26 + int decimalNumber = Integer.parseInt(hexNumber, 16); + + String binaryNumber = Integer.toBinaryString(decimalNumber); + System.out.println("有符号16进制数:" + hexNumber + ",对应的2进制数为:" + binaryNumber); + + binaryNumber = binOriginalToReverse(binaryNumber); + System.out.println("有符号16进制数:" + hexNumber + ",对应反码2进制为:" + binaryNumber + ",值为:" + Integer.parseInt(binaryNumber, 2)); + + binaryNumber = binReverseToComplement(binaryNumber, "1"); + System.out.println("有符号16进制数:" + hexNumber + ",对应反码的补码为:" + binaryNumber + ",值为:" + Integer.parseInt(binaryNumber, 2)); + + System.out.println("有符号16进制数:" + hexNumber + ",对应的10进制数为:" + binComplementToDec(binaryNumber)); + } }