diff --git a/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java b/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java index c9c8dc8..1ce43aa 100644 --- a/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java +++ b/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java @@ -1,25 +1,43 @@ package com.iot.modbus_rtcp.config; -import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Configuration; +import cn.hutool.core.map.MapUtil; +import com.iot.modbus_rtcp.utils.HexUtil; +import lombok.RequiredArgsConstructor; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; -import java.util.HashSet; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; -@Configuration -@Data -@ConfigurationProperties(prefix = "server.netty") +@Service +@RequiredArgsConstructor public class EquipmentIPProperties { - private Map identifiers; - - public HashSet keys() { - return new HashSet<>(this.identifiers.keySet()); - } + private final JdbcTemplate jdbcTemplate; + private final Map identifiers = new ConcurrentHashMap<>(); public String get(String key) { return this.identifiers.get(key); } + public String put(String gatewaySn) { + String gatewayHeartbeat = HexUtil.bytesToHexString(gatewaySn.getBytes()).toUpperCase(); + this.identifiers.put(gatewayHeartbeat, gatewaySn); + return gatewayHeartbeat; + } + + public boolean contains(String gatewayHeartbeat) { + if (this.identifiers.containsKey(gatewayHeartbeat)) { + return true; + } + gatewayHeartbeat = gatewayHeartbeat.toUpperCase(); + String gatewaySn = new String(HexUtil.hexStringToBytes(gatewayHeartbeat)); + Map countMap = this.jdbcTemplate.queryForMap("select count(*) as ctn from device where gateway_sn = '" + gatewaySn + "'"); + if (MapUtil.getInt(countMap, "ctn") > 0) { + this.identifiers.put(gatewayHeartbeat, gatewaySn); + return true; + } + return false; + } + } diff --git a/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java b/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java index 9d3ebfa..6906a25 100644 --- a/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java +++ b/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java @@ -1,6 +1,7 @@ package com.iot.modbus_rtcp.jobs; import cn.hutool.core.map.MapUtil; +import com.iot.modbus_rtcp.config.EquipmentIPProperties; import com.iot.modbus_rtcp.controller.ModbusTCPController; import com.iot.modbus_rtcp.dto.CommandTypeComparable; import com.iot.modbus_rtcp.dto.ModbusCommandDto; @@ -64,6 +65,7 @@ public class AutoCollectJobs { private final JdbcTemplate jdbcTemplate; private final ModbusTCPController controller; + private final EquipmentIPProperties equipmentIPProperties; private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(Math.min(10, Runtime.getRuntime().availableProcessors()) , Math.min(100, Runtime.getRuntime().availableProcessors()), 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5000), new ThreadFactory() { @@ -91,11 +93,14 @@ public class AutoCollectJobs { } resultList.stream() .filter(item -> { - String identifier = MapUtil.getStr(item, "identifier"); - if (Objects.nonNull(ModbusTCPController.nettyServer.getGroup().get(identifier))) { + String gatewaySn = MapUtil.getStr(item, "identifier"); + String gatewayIdentifier = this.equipmentIPProperties.put(gatewaySn); + if (Objects.nonNull(ModbusTCPController.nettyServer.getGroup() + .get(gatewayIdentifier))) { + item.put("identifier", gatewayIdentifier); return true; } - log.warn("Gateway {} is disconnected and does not collect data", identifier); + log.warn("Gateway {} is disconnected and does not collect data", gatewaySn); return false; }) .collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "deviceId"))) diff --git a/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java b/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java index 81513a8..88d5ce7 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java @@ -14,20 +14,16 @@ import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.core.RabbitAdmin; import java.net.InetSocketAddress; -import java.util.HashSet; import java.util.List; -import java.util.Objects; @Slf4j public class ModbusDecoder extends ByteToMessageDecoder { private final ChannelGroup channelGroup; - private final HashSet identityList; private final EquipmentIPProperties equipmentIPProperties; public ModbusDecoder(ChannelGroup channelGroup) { this.channelGroup = channelGroup; this.equipmentIPProperties = SpringUtil.getBean(EquipmentIPProperties.class); - this.identityList = this.equipmentIPProperties.keys(); } @Override @@ -41,32 +37,28 @@ public class ModbusDecoder extends ByteToMessageDecoder { String hex = HexUtil.bytesToHexString(b); log.info("解析到:{}", hex); - if (this.identityList.contains(hex.toUpperCase())) { // 心跳 + if (this.equipmentIPProperties.contains(hex.toUpperCase())) { // 心跳 String msgUpperCase = hex.toUpperCase(); - for (String identity : this.identityList) { - if (Objects.equals(identity, msgUpperCase)) { - log.info("注册{}({}:{}):{}", this.equipmentIPProperties.get(msgUpperCase), ip, port, msgUpperCase); - String currentAddress = ChannelGroup.getKey(ip, port); - SyncPriorityChannel channel = this.channelGroup.get(currentAddress); - channel.setIdentifier(msgUpperCase); - this.channelGroup.put(msgUpperCase, channel); - SpringUtil.getBean(RabbitAdmin.class).declareQueue(QueueBuilder.durable(channel.getCollectQueue()).build()); + log.info("注册{}({}:{}):{}", this.equipmentIPProperties.get(msgUpperCase), ip, port, msgUpperCase); + String currentAddress = ChannelGroup.getKey(ip, port); + SyncPriorityChannel channel = this.channelGroup.get(currentAddress); + channel.setIdentifier(msgUpperCase); + this.channelGroup.put(msgUpperCase, channel); + SpringUtil.getBean(RabbitAdmin.class).declareQueue(QueueBuilder.durable(channel.getCollectQueue()).build()); - IPGatewayRelation.put(currentAddress, String.format("%s(%s)", this.equipmentIPProperties.get(msgUpperCase), msgUpperCase)); - String oldAddress = IPGatewayRelation.getIPAddress(msgUpperCase); - if (StringUtils.isNotBlank(currentAddress) && - StringUtils.isNotBlank(oldAddress) && - !currentAddress.equals(oldAddress)) { - //说明设备重连后IP+端口发生了改变 - String[] split = oldAddress.split(":"); - //删除老连接 - this.channelGroup.remove(split[0], Integer.parseInt(split[1])); - log.info("删除channelGroup通道:{}连接:{},新增该通道连接:{}", this.equipmentIPProperties.get(msgUpperCase), oldAddress, currentAddress); - } - IPGatewayRelation.putIPAddressMap(msgUpperCase, currentAddress); - return; - } + IPGatewayRelation.put(currentAddress, String.format("%s(%s)", this.equipmentIPProperties.get(msgUpperCase), msgUpperCase)); + String oldAddress = IPGatewayRelation.getIPAddress(msgUpperCase); + if (StringUtils.isNotBlank(currentAddress) && + StringUtils.isNotBlank(oldAddress) && + !currentAddress.equals(oldAddress)) { + //说明设备重连后IP+端口发生了改变 + String[] split = oldAddress.split(":"); + //删除老连接 + this.channelGroup.remove(split[0], Integer.parseInt(split[1])); + log.info("删除channelGroup通道:{}连接:{},新增该通道连接:{}", this.equipmentIPProperties.get(msgUpperCase), oldAddress, currentAddress); } + IPGatewayRelation.putIPAddressMap(msgUpperCase, currentAddress); + return; } SyncPriorityChannel channel = this.channelGroup.get(ip, port); diff --git a/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java b/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java index 40e4877..5f47cee 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java @@ -2,6 +2,7 @@ package com.iot.modbus_rtcp.netty; import com.iot.modbus_rtcp.dto.CommandTypeComparable; import com.iot.modbus_rtcp.dto.ModbusCommandDto; +import com.iot.modbus_rtcp.utils.HexUtil; import io.netty.channel.ChannelPromise; import io.netty.channel.socket.SocketChannel; import lombok.Getter; @@ -140,6 +141,7 @@ public class SyncPriorityChannel implements Runnable { } public void setCollectQueue() { - this.collectQueue = "/modbus/device/" + this.identifier + "/collect"; +// this.collectQueue = "/modbus/device/" + this.identifier + "/collect"; + this.collectQueue = "/modbus/collect/" + HexUtil.hashPartition(this.identifier, 10); } } diff --git a/src/main/java/com/iot/modbus_rtcp/utils/HexUtil.java b/src/main/java/com/iot/modbus_rtcp/utils/HexUtil.java index c7bb3a3..ffa3b83 100644 --- a/src/main/java/com/iot/modbus_rtcp/utils/HexUtil.java +++ b/src/main/java/com/iot/modbus_rtcp/utils/HexUtil.java @@ -22,6 +22,10 @@ public class HexUtil { return sb.toString().trim().toUpperCase(); } + public static int hashPartition(String key, int partition) { + return Math.abs(key.hashCode() % partition); + } + public static void main(String[] args) { System.out.println(hexStringToBytes("0D")); System.out.println(new byte[]{(byte) Integer.parseInt("0D", 16)}); diff --git a/src/test/java/com/iot/modbus_rtcp/GatewayTest.java b/src/test/java/com/iot/modbus_rtcp/GatewayTest.java new file mode 100644 index 0000000..d02d12c --- /dev/null +++ b/src/test/java/com/iot/modbus_rtcp/GatewayTest.java @@ -0,0 +1,40 @@ +package com.iot.modbus_rtcp; + +import com.iot.modbus_rtcp.utils.HexUtil; +import org.junit.jupiter.api.Test; + +/** + * @author 王仕龙 + * 2024/11/28 18:52 + */ +public class GatewayTest { + + @Test + public void testConverterHeartbeat() { + String s1 = "4B454E454E4731343030303030333538"; + String s2 = "KENENG1400000358"; + System.out.println(new String(HexUtil.hexStringToBytes(s1))); + System.out.println(HexUtil.bytesToHexString(s2.getBytes())); + + + String hex = "333538"; + int decimal = 0; + int power = 0; + for (int i = hex.length() - 1; i >= 0; i--) { + char digit = hex.charAt(i); + if (digit >= '0' && digit <= '9') { + decimal += (digit - '0') * Math.pow(16, power); + } else if (digit >= 'A' && digit <= 'F') { + decimal += (digit - 'A' + 10) * Math.pow(16, power); + } else if (digit >= 'a' && digit <= 'f') { + decimal += (digit - 'a' + 10) * Math.pow(16, power); + } else { + throw new IllegalArgumentException("Invalid hex digit: " + digit); + } + power++; + } + System.out.println(decimal); + } + + +} diff --git a/src/test/java/com/iot/modbus_rtcp/NonBlockingSocketTest.java b/src/test/java/com/iot/modbus_rtcp/NonBlockingSocketTest.java index 5510ad1..39982c8 100644 --- a/src/test/java/com/iot/modbus_rtcp/NonBlockingSocketTest.java +++ b/src/test/java/com/iot/modbus_rtcp/NonBlockingSocketTest.java @@ -1,5 +1,7 @@ package com.iot.modbus_rtcp; +import com.iot.modbus_rtcp.utils.HexUtil; + import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -15,7 +17,7 @@ import java.util.concurrent.locks.LockSupport; public class NonBlockingSocketTest { private static final String HOST = "127.0.0.1"; - private static final Integer PORT = 1200; + private static final Integer PORT = 502; public static void main(String[] args) throws IOException { SocketChannel socketChannel = SocketChannel.open(); @@ -48,7 +50,7 @@ public class NonBlockingSocketTest { lastSentHeartBeatTime = nowTime; socketChannel.write(heartBeatBuffer); } - LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500)); + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100)); continue; } String line = byteBufferToHexString(readBuffer).trim(); @@ -56,27 +58,21 @@ public class NonBlockingSocketTest { System.out.println(LocalDateTime.now() + "<==:收到服务器端请求:" + line); - LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); - // 接收到请求 - switch (line.toUpperCase()) { - case "01040000001531C5" -> - // 发送响应字符串 - socketChannel.write(ByteBuffer.wrap("01 04 2A 07 E8 00 0B 00 13 00 17 00 11 00 23 00 03 00 01 00 00 00 00 00 02 00 3B 03 E7 00 39 00 00 0D AC 0D AC 00 00 00 00 00 00 0A 14 4A 98".replaceAll(" ", "").getBytes())); + switch (line.toUpperCase().substring(0, line.length() - 4)) { + case "010200140004" -> + socketChannel.write(ByteBuffer.wrap(HexUtil.hexStringToBytes("01042C07D000010003000C002B0029000300010000001A0018003303CD002300080DAC0DAC0000000000000959091DF066"))); + case "0103006A002A" -> + socketChannel.write(ByteBuffer.wrap(HexUtil.hexStringToBytes("0103A400010000000000000000000000000000000000000001000100000000000A0005001E000B001E00000011002D00000012000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001001E00000017001E00000000000100230000000003E800640000000A0000000000000000000003E7003B003B00000001000103E7003B003B0000000100019014"))); + case "0104000A001A" -> + socketChannel.write(ByteBuffer.wrap(HexUtil.hexStringToBytes("01043400020016002B0034000100070001000F001E0016000A00000000000000000000000000000000000003080000153D000000001365B5E3"))); // 接收到开井请求 - case "01050001FF00DDFA" -> - // 发送响应字符串 - socketChannel.write(ByteBuffer.wrap("01050001FF00DDFA".getBytes())); - + case "01050000FF00" -> + socketChannel.write(ByteBuffer.wrap(HexUtil.hexStringToBytes("01050000FF00DDFA"))); // 接收到关井请求 - case "0105000100009C0A" -> - // 发送响应字符串 - socketChannel.write(ByteBuffer.wrap("0105000100009C0A".getBytes())); + case "01050001FF00" -> + socketChannel.write(ByteBuffer.wrap(HexUtil.hexStringToBytes("01050001FF00DDFA"))); - // 接收到读取运行模式请求 - case "010300640001C5D5" -> - // 发送响应字符串 - socketChannel.write(ByteBuffer.wrap("01 03 02 00 03 F8 45".replaceAll(" ", "").getBytes())); // 接收到退出请求 case "exit" -> socketChannel.close(); }