diff --git a/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java b/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java index c9c8dc8..1ce43aa 100644 --- a/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java +++ b/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java @@ -1,25 +1,43 @@ package com.iot.modbus_rtcp.config; -import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Configuration; +import cn.hutool.core.map.MapUtil; +import com.iot.modbus_rtcp.utils.HexUtil; +import lombok.RequiredArgsConstructor; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; -import java.util.HashSet; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; -@Configuration -@Data -@ConfigurationProperties(prefix = "server.netty") +@Service +@RequiredArgsConstructor public class EquipmentIPProperties { - private Map identifiers; - - public HashSet keys() { - return new HashSet<>(this.identifiers.keySet()); - } + private final JdbcTemplate jdbcTemplate; + private final Map identifiers = new ConcurrentHashMap<>(); public String get(String key) { return this.identifiers.get(key); } + public String put(String gatewaySn) { + String gatewayHeartbeat = HexUtil.bytesToHexString(gatewaySn.getBytes()).toUpperCase(); + this.identifiers.put(gatewayHeartbeat, gatewaySn); + return gatewayHeartbeat; + } + + public boolean contains(String gatewayHeartbeat) { + if (this.identifiers.containsKey(gatewayHeartbeat)) { + return true; + } + gatewayHeartbeat = gatewayHeartbeat.toUpperCase(); + String gatewaySn = new String(HexUtil.hexStringToBytes(gatewayHeartbeat)); + Map countMap = this.jdbcTemplate.queryForMap("select count(*) as ctn from device where gateway_sn = '" + gatewaySn + "'"); + if (MapUtil.getInt(countMap, "ctn") > 0) { + this.identifiers.put(gatewayHeartbeat, gatewaySn); + return true; + } + return false; + } + } diff --git a/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java b/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java index 9d3ebfa..6906a25 100644 --- a/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java +++ b/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java @@ -1,6 +1,7 @@ package com.iot.modbus_rtcp.jobs; import cn.hutool.core.map.MapUtil; +import com.iot.modbus_rtcp.config.EquipmentIPProperties; import com.iot.modbus_rtcp.controller.ModbusTCPController; import com.iot.modbus_rtcp.dto.CommandTypeComparable; import com.iot.modbus_rtcp.dto.ModbusCommandDto; @@ -64,6 +65,7 @@ public class AutoCollectJobs { private final JdbcTemplate jdbcTemplate; private final ModbusTCPController controller; + private final EquipmentIPProperties equipmentIPProperties; private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(Math.min(10, Runtime.getRuntime().availableProcessors()) , Math.min(100, Runtime.getRuntime().availableProcessors()), 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5000), new ThreadFactory() { @@ -91,11 +93,14 @@ public class AutoCollectJobs { } resultList.stream() .filter(item -> { - String identifier = MapUtil.getStr(item, "identifier"); - if (Objects.nonNull(ModbusTCPController.nettyServer.getGroup().get(identifier))) { + String gatewaySn = MapUtil.getStr(item, "identifier"); + String gatewayIdentifier = this.equipmentIPProperties.put(gatewaySn); + if (Objects.nonNull(ModbusTCPController.nettyServer.getGroup() + .get(gatewayIdentifier))) { + item.put("identifier", gatewayIdentifier); return true; } - log.warn("Gateway {} is disconnected and does not collect data", identifier); + log.warn("Gateway {} is disconnected and does not collect data", gatewaySn); return false; }) .collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "deviceId"))) diff --git a/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java b/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java index 81513a8..88d5ce7 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java @@ -14,20 +14,16 @@ import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.core.RabbitAdmin; import java.net.InetSocketAddress; -import java.util.HashSet; import java.util.List; -import java.util.Objects; @Slf4j public class ModbusDecoder extends ByteToMessageDecoder { private final ChannelGroup channelGroup; - private final HashSet identityList; private final EquipmentIPProperties equipmentIPProperties; public ModbusDecoder(ChannelGroup channelGroup) { this.channelGroup = channelGroup; this.equipmentIPProperties = SpringUtil.getBean(EquipmentIPProperties.class); - this.identityList = this.equipmentIPProperties.keys(); } @Override @@ -41,32 +37,28 @@ public class ModbusDecoder extends ByteToMessageDecoder { String hex = HexUtil.bytesToHexString(b); log.info("解析到:{}", hex); - if (this.identityList.contains(hex.toUpperCase())) { // 心跳 + if (this.equipmentIPProperties.contains(hex.toUpperCase())) { // 心跳 String msgUpperCase = hex.toUpperCase(); - for (String identity : this.identityList) { - if (Objects.equals(identity, msgUpperCase)) { - log.info("注册{}({}:{}):{}", this.equipmentIPProperties.get(msgUpperCase), ip, port, msgUpperCase); - String currentAddress = ChannelGroup.getKey(ip, port); - SyncPriorityChannel channel = this.channelGroup.get(currentAddress); - channel.setIdentifier(msgUpperCase); - this.channelGroup.put(msgUpperCase, channel); - SpringUtil.getBean(RabbitAdmin.class).declareQueue(QueueBuilder.durable(channel.getCollectQueue()).build()); + log.info("注册{}({}:{}):{}", this.equipmentIPProperties.get(msgUpperCase), ip, port, msgUpperCase); + String currentAddress = ChannelGroup.getKey(ip, port); + SyncPriorityChannel channel = this.channelGroup.get(currentAddress); + channel.setIdentifier(msgUpperCase); + this.channelGroup.put(msgUpperCase, channel); + SpringUtil.getBean(RabbitAdmin.class).declareQueue(QueueBuilder.durable(channel.getCollectQueue()).build()); - IPGatewayRelation.put(currentAddress, String.format("%s(%s)", this.equipmentIPProperties.get(msgUpperCase), msgUpperCase)); - String oldAddress = IPGatewayRelation.getIPAddress(msgUpperCase); - if (StringUtils.isNotBlank(currentAddress) && - StringUtils.isNotBlank(oldAddress) && - !currentAddress.equals(oldAddress)) { - //说明设备重连后IP+端口发生了改变 - String[] split = oldAddress.split(":"); - //删除老连接 - this.channelGroup.remove(split[0], Integer.parseInt(split[1])); - log.info("删除channelGroup通道:{}连接:{},新增该通道连接:{}", this.equipmentIPProperties.get(msgUpperCase), oldAddress, currentAddress); - } - IPGatewayRelation.putIPAddressMap(msgUpperCase, currentAddress); - return; - } + IPGatewayRelation.put(currentAddress, String.format("%s(%s)", this.equipmentIPProperties.get(msgUpperCase), msgUpperCase)); + String oldAddress = IPGatewayRelation.getIPAddress(msgUpperCase); + if (StringUtils.isNotBlank(currentAddress) && + StringUtils.isNotBlank(oldAddress) && + !currentAddress.equals(oldAddress)) { + //说明设备重连后IP+端口发生了改变 + String[] split = oldAddress.split(":"); + //删除老连接 + this.channelGroup.remove(split[0], Integer.parseInt(split[1])); + log.info("删除channelGroup通道:{}连接:{},新增该通道连接:{}", this.equipmentIPProperties.get(msgUpperCase), oldAddress, currentAddress); } + IPGatewayRelation.putIPAddressMap(msgUpperCase, currentAddress); + return; } SyncPriorityChannel channel = this.channelGroup.get(ip, port); diff --git a/src/test/java/com/iot/modbus_rtcp/GatewayTest.java b/src/test/java/com/iot/modbus_rtcp/GatewayTest.java new file mode 100644 index 0000000..4cd1449 --- /dev/null +++ b/src/test/java/com/iot/modbus_rtcp/GatewayTest.java @@ -0,0 +1,20 @@ +package com.iot.modbus_rtcp; + +import com.iot.modbus_rtcp.utils.HexUtil; +import org.junit.jupiter.api.Test; + +/** + * @author 王仕龙 + * 2024/11/28 18:52 + */ +public class GatewayTest { + + @Test + public void testConverterHeartbeat() { + String s1 = "4B454E454E4731343030303030333538"; + String s2 = "KENENG1400000358"; + System.out.println(new String(HexUtil.hexStringToBytes(s1))); + System.out.println(HexUtil.bytesToHexString(s2.getBytes())); + } + +} diff --git a/src/test/java/com/iot/modbus_rtcp/NonBlockingSocketTest.java b/src/test/java/com/iot/modbus_rtcp/NonBlockingSocketTest.java index 5510ad1..a1b8dc0 100644 --- a/src/test/java/com/iot/modbus_rtcp/NonBlockingSocketTest.java +++ b/src/test/java/com/iot/modbus_rtcp/NonBlockingSocketTest.java @@ -15,7 +15,7 @@ import java.util.concurrent.locks.LockSupport; public class NonBlockingSocketTest { private static final String HOST = "127.0.0.1"; - private static final Integer PORT = 1200; + private static final Integer PORT = 502; public static void main(String[] args) throws IOException { SocketChannel socketChannel = SocketChannel.open(); @@ -56,11 +56,9 @@ public class NonBlockingSocketTest { System.out.println(LocalDateTime.now() + "<==:收到服务器端请求:" + line); - LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); - // 接收到请求 switch (line.toUpperCase()) { - case "01040000001531C5" -> + case "01040000001671C4" -> // 发送响应字符串 socketChannel.write(ByteBuffer.wrap("01 04 2A 07 E8 00 0B 00 13 00 17 00 11 00 23 00 03 00 01 00 00 00 00 00 02 00 3B 03 E7 00 39 00 00 0D AC 0D AC 00 00 00 00 00 00 0A 14 4A 98".replaceAll(" ", "").getBytes())); // 接收到开井请求