我提交了版代码,你更新一下
This commit is contained in:
parent
69bc49c89d
commit
392ccee6ba
|
@ -33,7 +33,7 @@ public class EquipmentIPProperties {
|
||||||
if (this.identifiers.containsKey(gatewayHeartbeat)) {
|
if (this.identifiers.containsKey(gatewayHeartbeat)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if (gatewayHeartbeat.length() > 80) {
|
if (gatewayHeartbeat.length() > 100) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
// 每分钟从数据库入库一次
|
// 每分钟从数据库入库一次
|
||||||
|
|
|
@ -3,15 +3,17 @@ package com.iot.modbus_rtcp.netty;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class ChannelGroup {
|
public class ChannelGroup {
|
||||||
|
|
||||||
private final ConcurrentHashMap<String, SyncPriorityChannel> mChannelMap = new ConcurrentHashMap<>();
|
private final AtomicLong lastCleanupChannel = new AtomicLong(0);
|
||||||
|
private final ConcurrentHashMap<String, SyncPriorityChannel> mChannelMap = new ConcurrentHashMap<>(512);
|
||||||
|
|
||||||
public void put(String identity, SyncPriorityChannel channel) {
|
public void put(String identity, SyncPriorityChannel channel) {
|
||||||
this.mChannelMap.put(identity, channel);
|
this.mChannelMap.put(identity, channel);
|
||||||
|
@ -25,6 +27,36 @@ public class ChannelGroup {
|
||||||
return this.mChannelMap.get(getKey(ip, port));
|
return this.mChannelMap.get(getKey(ip, port));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SyncPriorityChannel getAssociatedChannel(String identity, String gatewayHeartbeat, String ip, int port) {
|
||||||
|
SyncPriorityChannel channel = this.get(gatewayHeartbeat);
|
||||||
|
if (Objects.isNull(channel)) {
|
||||||
|
List<String> sameIPKeys = this.mChannelMap.keySet().stream()
|
||||||
|
.filter(key -> StringUtils.contains(key, ":"))
|
||||||
|
.filter(key -> StringUtils.startsWith(key, ip + ":"))
|
||||||
|
.toList();
|
||||||
|
if (sameIPKeys.size() == 1) {
|
||||||
|
channel = this.get(sameIPKeys.getFirst());
|
||||||
|
} else {
|
||||||
|
Optional<String> keyOptional = sameIPKeys.stream().filter(key -> StringUtils.endsWith(key, ":" + port)).findFirst();
|
||||||
|
if (keyOptional.isPresent()) {
|
||||||
|
channel = this.get(keyOptional.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (Objects.isNull(channel)) {
|
||||||
|
if (this.lastCleanupChannel.get() == 0 || TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis() - this.lastCleanupChannel.get()) >= 1) {
|
||||||
|
log.error("存在心跳包{}[{}]对应{}:{}关系映射错误问题,距离上次清理链接已操作24小时,尝试清理链接修复", identity, gatewayHeartbeat, ip, port);
|
||||||
|
new HashSet<>(this.mChannelMap.keySet())
|
||||||
|
.stream()
|
||||||
|
.filter(key -> StringUtils.contains(key, ":"))
|
||||||
|
.forEach(key -> Optional.ofNullable(this.mChannelMap.remove(key)).ifPresent(SyncPriorityChannel::close));
|
||||||
|
} else {
|
||||||
|
log.error("存在心跳包{}[{}]对应{}:{}关系映射错误问题", identity, gatewayHeartbeat, ip, port);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return channel;
|
||||||
|
}
|
||||||
|
|
||||||
public void remove(String ip, int port) {
|
public void remove(String ip, int port) {
|
||||||
this.mChannelMap.remove(getKey(ip, port));
|
this.mChannelMap.remove(getKey(ip, port));
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,11 +10,10 @@ import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.springframework.amqp.core.QueueBuilder;
|
|
||||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class ModbusDecoder extends ByteToMessageDecoder {
|
public class ModbusDecoder extends ByteToMessageDecoder {
|
||||||
|
@ -37,35 +36,38 @@ public class ModbusDecoder extends ByteToMessageDecoder {
|
||||||
String hex = HexUtil.bytesToHexString(b);
|
String hex = HexUtil.bytesToHexString(b);
|
||||||
log.info("解析到:{}", hex);
|
log.info("解析到:{}", hex);
|
||||||
|
|
||||||
|
String currentAddress = ChannelGroup.getKey(ip, port);
|
||||||
if (this.equipmentIPProperties.contains(hex.toUpperCase())) { // 心跳
|
if (this.equipmentIPProperties.contains(hex.toUpperCase())) { // 心跳
|
||||||
String msgUpperCase = hex.toUpperCase();
|
String msgUpperCase = hex.toUpperCase();
|
||||||
log.info("注册{}({}:{}):{}", this.equipmentIPProperties.get(msgUpperCase), ip, port, msgUpperCase);
|
log.info("注册{}({}:{}):{}", this.equipmentIPProperties.get(msgUpperCase), ip, port, msgUpperCase);
|
||||||
String currentAddress = ChannelGroup.getKey(ip, port);
|
|
||||||
SyncPriorityChannel channel = this.channelGroup.get(currentAddress);
|
SyncPriorityChannel channel = this.channelGroup.get(currentAddress);
|
||||||
|
if (Objects.isNull(channel)) {
|
||||||
|
channel = this.channelGroup.getAssociatedChannel(this.equipmentIPProperties.get(msgUpperCase), msgUpperCase, ip, port);
|
||||||
|
if (Objects.isNull(channel)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
channel.setIdentifier(msgUpperCase);
|
channel.setIdentifier(msgUpperCase);
|
||||||
this.channelGroup.put(msgUpperCase, channel);
|
this.channelGroup.put(msgUpperCase, channel);
|
||||||
SpringUtil.getBean(RabbitAdmin.class).declareQueue(QueueBuilder.durable(channel.getCollectQueue()).build());
|
|
||||||
|
|
||||||
IPGatewayRelation.put(currentAddress, String.format("%s(%s)", this.equipmentIPProperties.get(msgUpperCase), msgUpperCase));
|
IPGatewayRelation.put(currentAddress, String.format("%s(%s)", this.equipmentIPProperties.get(msgUpperCase), msgUpperCase));
|
||||||
String oldAddress = IPGatewayRelation.getIPAddress(msgUpperCase);
|
String oldAddress = IPGatewayRelation.getIPAddress(msgUpperCase);
|
||||||
if (StringUtils.isNotBlank(currentAddress) &&
|
// 说明设备重连后IP+端口发生了改变
|
||||||
StringUtils.isNotBlank(oldAddress) &&
|
if (!StringUtils.equals(currentAddress, oldAddress)
|
||||||
!currentAddress.equals(oldAddress)) {
|
&& StringUtils.isNotBlank(currentAddress) && StringUtils.isNotBlank(oldAddress)) {
|
||||||
//说明设备重连后IP+端口发生了改变
|
// 删除老连接
|
||||||
String[] split = oldAddress.split(":");
|
this.channelGroup.remove(oldAddress);
|
||||||
//删除老连接
|
log.info("更新通道{}连接为:{},移除旧连接:{}", this.equipmentIPProperties.get(msgUpperCase), currentAddress, oldAddress);
|
||||||
this.channelGroup.remove(split[0], Integer.parseInt(split[1]));
|
|
||||||
log.info("删除channelGroup通道:{}连接:{},新增该通道连接:{}", this.equipmentIPProperties.get(msgUpperCase), oldAddress, currentAddress);
|
|
||||||
}
|
}
|
||||||
IPGatewayRelation.putIPAddressMap(msgUpperCase, currentAddress);
|
IPGatewayRelation.putIPAddressMap(msgUpperCase, currentAddress);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncPriorityChannel channel = this.channelGroup.get(ip, port);
|
SyncPriorityChannel channel = this.channelGroup.get(currentAddress);
|
||||||
ModbusCommandDto message = channel.getCurrentMessage();
|
ModbusCommandDto message = channel.getCurrentMessage();
|
||||||
|
|
||||||
if (message == null) {
|
if (message == null) {
|
||||||
log.warn("非法传输:{},原报文:{}", HexUtil.bytesToHexString(b),new String(b));
|
log.warn("非法传输:{},原报文:{}", HexUtil.bytesToHexString(b), new String(b));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -74,7 +74,7 @@ public class NettyServer extends ChannelInitializer<SocketChannel> {
|
||||||
//过滤掉docker 网关请求
|
//过滤掉docker 网关请求
|
||||||
if ("172.17.0.1".equals(ip)) return;
|
if ("172.17.0.1".equals(ip)) return;
|
||||||
int port = ch.remoteAddress().getPort();
|
int port = ch.remoteAddress().getPort();
|
||||||
this.group.put(ip + ":" + port, new SyncPriorityChannel(ch));
|
this.group.put(ChannelGroup.getKey(ip, port), new SyncPriorityChannel(ch));
|
||||||
|
|
||||||
pipeline.addLast(new ModbusEncoder());
|
pipeline.addLast(new ModbusEncoder());
|
||||||
pipeline.addLast("decoder", new ModbusDecoder(this.group));
|
pipeline.addLast("decoder", new ModbusDecoder(this.group));
|
||||||
|
|
|
@ -3,16 +3,20 @@ package com.iot.modbus_rtcp.netty;
|
||||||
import com.iot.modbus_rtcp.dto.CommandTypeComparable;
|
import com.iot.modbus_rtcp.dto.CommandTypeComparable;
|
||||||
import com.iot.modbus_rtcp.dto.ModbusCommandDto;
|
import com.iot.modbus_rtcp.dto.ModbusCommandDto;
|
||||||
import com.iot.modbus_rtcp.utils.HexUtil;
|
import com.iot.modbus_rtcp.utils.HexUtil;
|
||||||
|
import com.iot.modbus_rtcp.utils.SpringUtil;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.channel.socket.SocketChannel;
|
import io.netty.channel.socket.SocketChannel;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.amqp.core.QueueBuilder;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.PriorityBlockingQueue;
|
import java.util.concurrent.PriorityBlockingQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -27,6 +31,7 @@ public class SyncPriorityChannel implements Runnable {
|
||||||
|
|
||||||
private static final int allowRetryCount = 1;
|
private static final int allowRetryCount = 1;
|
||||||
private static final boolean isControl = true;
|
private static final boolean isControl = true;
|
||||||
|
private final AtomicBoolean close = new AtomicBoolean(Boolean.FALSE);
|
||||||
|
|
||||||
private final SocketChannel channel;
|
private final SocketChannel channel;
|
||||||
|
|
||||||
|
@ -62,6 +67,9 @@ public class SyncPriorityChannel implements Runnable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
if (this.close.getAcquire()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
this.sendNext();
|
this.sendNext();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,6 +135,12 @@ public class SyncPriorityChannel implements Runnable {
|
||||||
this.sendNext();
|
this.sendNext();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void close() {
|
||||||
|
this.close.setRelease(Boolean.TRUE);
|
||||||
|
this.messageQueue.clear();
|
||||||
|
this.channel.close();
|
||||||
|
}
|
||||||
|
|
||||||
public ChannelPromise getChannelPromise() {
|
public ChannelPromise getChannelPromise() {
|
||||||
return this.channelPromiseReference.getAcquire();
|
return this.channelPromiseReference.getAcquire();
|
||||||
}
|
}
|
||||||
|
@ -141,7 +155,8 @@ public class SyncPriorityChannel implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setCollectQueue() {
|
public void setCollectQueue() {
|
||||||
// this.collectQueue = "/modbus/device/" + this.identifier + "/collect";
|
|
||||||
this.collectQueue = "/modbus/collect/" + HexUtil.hashPartition(this.identifier, 10);
|
this.collectQueue = "/modbus/collect/" + HexUtil.hashPartition(this.identifier, 10);
|
||||||
|
SpringUtil.getBean(RabbitAdmin.class).declareQueue(QueueBuilder.durable(this.collectQueue).build());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue