diff --git a/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java b/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java index f00cddd..1280aab 100644 --- a/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java +++ b/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java @@ -33,7 +33,7 @@ public class EquipmentIPProperties { if (this.identifiers.containsKey(gatewayHeartbeat)) { return true; } - if (gatewayHeartbeat.length() > 80) { + if (gatewayHeartbeat.length() > 100) { return false; } // 每分钟从数据库入库一次 diff --git a/src/main/java/com/iot/modbus_rtcp/netty/ChannelGroup.java b/src/main/java/com/iot/modbus_rtcp/netty/ChannelGroup.java index 617257f..5a30fd5 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/ChannelGroup.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/ChannelGroup.java @@ -3,15 +3,17 @@ package com.iot.modbus_rtcp.netty; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import java.util.ArrayList; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @Slf4j public class ChannelGroup { - private final ConcurrentHashMap mChannelMap = new ConcurrentHashMap<>(); + private final AtomicLong lastCleanupChannel = new AtomicLong(0); + private final ConcurrentHashMap mChannelMap = new ConcurrentHashMap<>(512); public void put(String identity, SyncPriorityChannel channel) { this.mChannelMap.put(identity, channel); @@ -25,6 +27,36 @@ public class ChannelGroup { return this.mChannelMap.get(getKey(ip, port)); } + public SyncPriorityChannel getAssociatedChannel(String identity, String gatewayHeartbeat, String ip, int port) { + SyncPriorityChannel channel = this.get(gatewayHeartbeat); + if (Objects.isNull(channel)) { + List sameIPKeys = this.mChannelMap.keySet().stream() + .filter(key -> StringUtils.contains(key, ":")) + .filter(key -> StringUtils.startsWith(key, ip + ":")) + .toList(); + if (sameIPKeys.size() == 1) { + channel = this.get(sameIPKeys.getFirst()); + } else { + Optional keyOptional = sameIPKeys.stream().filter(key -> StringUtils.endsWith(key, ":" + port)).findFirst(); + if (keyOptional.isPresent()) { + channel = this.get(keyOptional.get()); + } + } + if (Objects.isNull(channel)) { + if (this.lastCleanupChannel.get() == 0 || TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis() - this.lastCleanupChannel.get()) >= 1) { + log.error("存在心跳包{}[{}]对应{}:{}关系映射错误问题,距离上次清理链接已操作24小时,尝试清理链接修复", identity, gatewayHeartbeat, ip, port); + new HashSet<>(this.mChannelMap.keySet()) + .stream() + .filter(key -> StringUtils.contains(key, ":")) + .forEach(key -> Optional.ofNullable(this.mChannelMap.remove(key)).ifPresent(SyncPriorityChannel::close)); + } else { + log.error("存在心跳包{}[{}]对应{}:{}关系映射错误问题", identity, gatewayHeartbeat, ip, port); + } + } + } + return channel; + } + public void remove(String ip, int port) { this.mChannelMap.remove(getKey(ip, port)); } diff --git a/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java b/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java index e532e17..75f03bf 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java @@ -10,11 +10,10 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.springframework.amqp.core.QueueBuilder; -import org.springframework.amqp.rabbit.core.RabbitAdmin; import java.net.InetSocketAddress; import java.util.List; +import java.util.Objects; @Slf4j public class ModbusDecoder extends ByteToMessageDecoder { @@ -37,35 +36,38 @@ public class ModbusDecoder extends ByteToMessageDecoder { String hex = HexUtil.bytesToHexString(b); log.info("解析到:{}", hex); + String currentAddress = ChannelGroup.getKey(ip, port); if (this.equipmentIPProperties.contains(hex.toUpperCase())) { // 心跳 String msgUpperCase = hex.toUpperCase(); log.info("注册{}({}:{}):{}", this.equipmentIPProperties.get(msgUpperCase), ip, port, msgUpperCase); - String currentAddress = ChannelGroup.getKey(ip, port); SyncPriorityChannel channel = this.channelGroup.get(currentAddress); + if (Objects.isNull(channel)) { + channel = this.channelGroup.getAssociatedChannel(this.equipmentIPProperties.get(msgUpperCase), msgUpperCase, ip, port); + if (Objects.isNull(channel)) { + return; + } + } channel.setIdentifier(msgUpperCase); this.channelGroup.put(msgUpperCase, channel); - SpringUtil.getBean(RabbitAdmin.class).declareQueue(QueueBuilder.durable(channel.getCollectQueue()).build()); IPGatewayRelation.put(currentAddress, String.format("%s(%s)", this.equipmentIPProperties.get(msgUpperCase), msgUpperCase)); String oldAddress = IPGatewayRelation.getIPAddress(msgUpperCase); - if (StringUtils.isNotBlank(currentAddress) && - StringUtils.isNotBlank(oldAddress) && - !currentAddress.equals(oldAddress)) { - //说明设备重连后IP+端口发生了改变 - String[] split = oldAddress.split(":"); - //删除老连接 - this.channelGroup.remove(split[0], Integer.parseInt(split[1])); - log.info("删除channelGroup通道:{}连接:{},新增该通道连接:{}", this.equipmentIPProperties.get(msgUpperCase), oldAddress, currentAddress); + // 说明设备重连后IP+端口发生了改变 + if (!StringUtils.equals(currentAddress, oldAddress) + && StringUtils.isNotBlank(currentAddress) && StringUtils.isNotBlank(oldAddress)) { + // 删除老连接 + this.channelGroup.remove(oldAddress); + log.info("更新通道{}连接为:{},移除旧连接:{}", this.equipmentIPProperties.get(msgUpperCase), currentAddress, oldAddress); } IPGatewayRelation.putIPAddressMap(msgUpperCase, currentAddress); return; } - SyncPriorityChannel channel = this.channelGroup.get(ip, port); + SyncPriorityChannel channel = this.channelGroup.get(currentAddress); ModbusCommandDto message = channel.getCurrentMessage(); if (message == null) { - log.warn("非法传输:{},原报文:{}", HexUtil.bytesToHexString(b),new String(b)); + log.warn("非法传输:{},原报文:{}", HexUtil.bytesToHexString(b), new String(b)); return; } diff --git a/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java b/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java index 8289cb7..cd4e8de 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java @@ -74,7 +74,7 @@ public class NettyServer extends ChannelInitializer { //过滤掉docker 网关请求 if ("172.17.0.1".equals(ip)) return; int port = ch.remoteAddress().getPort(); - this.group.put(ip + ":" + port, new SyncPriorityChannel(ch)); + this.group.put(ChannelGroup.getKey(ip, port), new SyncPriorityChannel(ch)); pipeline.addLast(new ModbusEncoder()); pipeline.addLast("decoder", new ModbusDecoder(this.group)); diff --git a/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java b/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java index 2ef8b31..5ae63a3 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java @@ -3,16 +3,20 @@ package com.iot.modbus_rtcp.netty; import com.iot.modbus_rtcp.dto.CommandTypeComparable; import com.iot.modbus_rtcp.dto.ModbusCommandDto; import com.iot.modbus_rtcp.utils.HexUtil; +import com.iot.modbus_rtcp.utils.SpringUtil; import io.netty.channel.ChannelPromise; import io.netty.channel.socket.SocketChannel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.amqp.rabbit.core.RabbitAdmin; import java.net.InetSocketAddress; import java.util.List; import java.util.Objects; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** @@ -27,6 +31,7 @@ public class SyncPriorityChannel implements Runnable { private static final int allowRetryCount = 1; private static final boolean isControl = true; + private final AtomicBoolean close = new AtomicBoolean(Boolean.FALSE); private final SocketChannel channel; @@ -62,6 +67,9 @@ public class SyncPriorityChannel implements Runnable { @Override public void run() { + if (this.close.getAcquire()) { + return; + } this.sendNext(); } @@ -127,6 +135,12 @@ public class SyncPriorityChannel implements Runnable { this.sendNext(); } + public void close() { + this.close.setRelease(Boolean.TRUE); + this.messageQueue.clear(); + this.channel.close(); + } + public ChannelPromise getChannelPromise() { return this.channelPromiseReference.getAcquire(); } @@ -141,7 +155,8 @@ public class SyncPriorityChannel implements Runnable { } public void setCollectQueue() { -// this.collectQueue = "/modbus/device/" + this.identifier + "/collect"; this.collectQueue = "/modbus/collect/" + HexUtil.hashPartition(this.identifier, 10); + SpringUtil.getBean(RabbitAdmin.class).declareQueue(QueueBuilder.durable(this.collectQueue).build()); } + }