From cd236a420cec456058405038984880596ae070de Mon Sep 17 00:00:00 2001 From: wangshilong Date: Sun, 22 Dec 2024 15:20:43 +0800 Subject: [PATCH] =?UTF-8?q?BUG=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../modbus_rtcp/ModbusRtcpApplication.java | 3 +- .../controller/ModbusTCPController.java | 2 +- .../modbus_rtcp/dto/AutoCollectionDto.java | 15 +++++++ .../dto/CommandTypeComparable.java | 2 + .../iot/modbus_rtcp/dto/ModbusCommandDto.java | 2 + .../iot/modbus_rtcp/jobs/AutoCollectJobs.java | 35 ++-------------- .../iot/modbus_rtcp/netty/ModbusDecoder.java | 4 +- .../iot/modbus_rtcp/netty/ModbusSender.java | 2 +- .../iot/modbus_rtcp/netty/NettyServer.java | 6 +-- .../iot/modbus_rtcp/netty/SyncHandler.java | 8 ++-- .../netty/SyncPriorityChannel.java | 42 +++++++++++++++---- 11 files changed, 68 insertions(+), 53 deletions(-) create mode 100644 src/main/java/com/iot/modbus_rtcp/dto/AutoCollectionDto.java diff --git a/src/main/java/com/iot/modbus_rtcp/ModbusRtcpApplication.java b/src/main/java/com/iot/modbus_rtcp/ModbusRtcpApplication.java index c59a67a..cf9f437 100644 --- a/src/main/java/com/iot/modbus_rtcp/ModbusRtcpApplication.java +++ b/src/main/java/com/iot/modbus_rtcp/ModbusRtcpApplication.java @@ -1,7 +1,6 @@ package com.iot.modbus_rtcp; import cn.hutool.json.JSONUtil; -import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; @@ -19,7 +18,7 @@ public class ModbusRtcpApplication implements ApplicationContextAware { private static final Logger logger = LoggerFactory.getLogger("com.iot.modbus_rtcp.rabbit"); public static void main(String[] args) throws Exception { - LicenseValidator.main(args); +// LicenseValidator.main(args); SpringApplication.run(ModbusRtcpApplication.class, args); } diff --git a/src/main/java/com/iot/modbus_rtcp/controller/ModbusTCPController.java b/src/main/java/com/iot/modbus_rtcp/controller/ModbusTCPController.java index 9f8f527..d9b5849 100644 --- a/src/main/java/com/iot/modbus_rtcp/controller/ModbusTCPController.java +++ b/src/main/java/com/iot/modbus_rtcp/controller/ModbusTCPController.java @@ -103,7 +103,7 @@ public class ModbusTCPController implements ApplicationRunner { @Override public void run(ApplicationArguments args) throws Exception { - this.nettyServer = new NettyServer(502, 10); + this.nettyServer = new NettyServer(502, Math.max(20, Runtime.getRuntime().availableProcessors() - 1)); this.nettyServer.start(); } diff --git a/src/main/java/com/iot/modbus_rtcp/dto/AutoCollectionDto.java b/src/main/java/com/iot/modbus_rtcp/dto/AutoCollectionDto.java new file mode 100644 index 0000000..a1cc3ec --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/dto/AutoCollectionDto.java @@ -0,0 +1,15 @@ +package com.iot.modbus_rtcp.dto; + +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + + +@Data +@SuperBuilder +@NoArgsConstructor +@ToString(callSuper = true) +public class AutoCollectionDto extends ModbusCommandDto { + +} diff --git a/src/main/java/com/iot/modbus_rtcp/dto/CommandTypeComparable.java b/src/main/java/com/iot/modbus_rtcp/dto/CommandTypeComparable.java index 9b3c387..3c244e1 100644 --- a/src/main/java/com/iot/modbus_rtcp/dto/CommandTypeComparable.java +++ b/src/main/java/com/iot/modbus_rtcp/dto/CommandTypeComparable.java @@ -3,11 +3,13 @@ package com.iot.modbus_rtcp.dto; import lombok.Data; import lombok.Getter; import lombok.NoArgsConstructor; +import lombok.ToString; import lombok.experimental.SuperBuilder; @Data @SuperBuilder @NoArgsConstructor +@ToString(callSuper = true) public class CommandTypeComparable implements Comparable { private CommandType type; diff --git a/src/main/java/com/iot/modbus_rtcp/dto/ModbusCommandDto.java b/src/main/java/com/iot/modbus_rtcp/dto/ModbusCommandDto.java index 6dd8142..31560ba 100644 --- a/src/main/java/com/iot/modbus_rtcp/dto/ModbusCommandDto.java +++ b/src/main/java/com/iot/modbus_rtcp/dto/ModbusCommandDto.java @@ -2,12 +2,14 @@ package com.iot.modbus_rtcp.dto; import lombok.Data; import lombok.NoArgsConstructor; +import lombok.ToString; import lombok.experimental.SuperBuilder; @Data @SuperBuilder @NoArgsConstructor +@ToString(callSuper = true) public class ModbusCommandDto extends CommandTypeComparable implements CommandDto { /** * 12位十六进制的命令 diff --git a/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java b/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java index 26253d2..d7e29c6 100644 --- a/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java +++ b/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java @@ -3,13 +3,14 @@ package com.iot.modbus_rtcp.jobs; import cn.hutool.core.map.MapUtil; import com.iot.modbus_rtcp.config.EquipmentIPProperties; import com.iot.modbus_rtcp.controller.ModbusTCPController; +import com.iot.modbus_rtcp.dto.AutoCollectionDto; import com.iot.modbus_rtcp.dto.CommandTypeComparable; -import com.iot.modbus_rtcp.dto.ModbusCommandDto; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.List; @@ -80,7 +81,7 @@ public class AutoCollectJobs { } }, new ThreadPoolExecutor.CallerRunsPolicy()); - // @Scheduled(cron = "0/30 * * * * ? ") + @Scheduled(cron = "0/30 * * * * ? ") public void autoCollect() { int pageIndex = 0; int pageSize = 1000; @@ -118,7 +119,7 @@ public class AutoCollectJobs { return deviceCommandList.stream() .map(item -> { String identifier = StringUtils.trim(MapUtil.getStr(item, "identifier")); - return ModbusCommandDto.builder() + return AutoCollectionDto.builder() .command(StringUtils.trim(MapUtil.getStr(item, "command"))) .identifier(identifier) .length(MapUtil.getInt(item, "messageLength")) @@ -133,34 +134,6 @@ public class AutoCollectJobs { return Stream.empty(); }) .collect(Collectors.toList())); -// .forEach((deviceId, commandList) -> { -// Map>> refTypeCommandListMap = commandList.stream() -// .collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "refType"))); -// List> deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_DEVICE); -// if (ObjectUtils.isEmpty(deviceCommandList)) { -// deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_PRODUCT_CODE); -// } -// if (ObjectUtils.isNotEmpty(deviceCommandList)) { -// List collectCommondList = deviceCommandList.stream() -// .map(item -> { -// String identifier = StringUtils.trim(MapUtil.getStr(item, "identifier")); -// return ModbusCommandDto.builder() -// .command(StringUtils.trim(MapUtil.getStr(item, "command"))) -// .identifier(identifier) -// .length(MapUtil.getInt(item, "messageLength")) -// .type(CommandTypeComparable.CommandType.COLLECTION) -// .key(StringUtils.joinWith("/", identifier, -// StringUtils.trim(MapUtil.getStr(item, "deviceId")), -// StringUtils.trim(MapUtil.getStr(item, "commandId")), -// timestamp)) -// .build(); -// }) -// .collect(Collectors.toList()); -// if (ObjectUtils.isNotEmpty(collectCommondList)) { -// this.collectThreadPool.execute(() -> this.controller.collect(collectCommondList)); -// } -// } -// }); }); if (resultList.size() < pageSize) { break; diff --git a/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java b/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java index 75f03bf..5067fcd 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java @@ -34,7 +34,7 @@ public class ModbusDecoder extends ByteToMessageDecoder { buffer.readBytes(b); String hex = HexUtil.bytesToHexString(b); - log.info("解析到:{}", hex); + log.debug("解析到:{}", hex); String currentAddress = ChannelGroup.getKey(ip, port); if (this.equipmentIPProperties.contains(hex.toUpperCase())) { // 心跳 @@ -67,7 +67,7 @@ public class ModbusDecoder extends ByteToMessageDecoder { ModbusCommandDto message = channel.getCurrentMessage(); if (message == null) { - log.warn("非法传输:{},原报文:{}", HexUtil.bytesToHexString(b), new String(b)); + log.warn("非法传输:{},原报文:{},IP:{}:{}", HexUtil.bytesToHexString(b), new String(b), ip, port); return; } diff --git a/src/main/java/com/iot/modbus_rtcp/netty/ModbusSender.java b/src/main/java/com/iot/modbus_rtcp/netty/ModbusSender.java index 5810123..613bbbe 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/ModbusSender.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/ModbusSender.java @@ -35,7 +35,7 @@ public class ModbusSender { throw new RuntimeException("设备" + identity + "的连接未配置或该设备已断开与服务器的连接,请稍后再试或联系管理员"); } - log.info("{}通道({})提交{}条命令:{}", this.equipmentIPProperties.get(identity), identity, v.size(), v); + log.debug("{}通道({})提交{}条命令:{}", this.equipmentIPProperties.get(identity), identity, v.size(), v); channel.addMessages(v); }); } diff --git a/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java b/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java index c558e2b..144d537 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java @@ -1,7 +1,7 @@ package com.iot.modbus_rtcp.netty; -import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator; -import com.iot.modbus_rtcp.utils.SpringUtil; +//import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator; + import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; @@ -75,7 +75,7 @@ public class NettyServer extends ChannelInitializer { String ip = ch.remoteAddress().getHostString(); //过滤掉docker 网关请求 if ("172.17.0.1".equals(ip)) return; - SpringUtil.getBean(LicenseValidator.class).verifyNumber(ch::close); +// SpringUtil.getBean(LicenseValidator.class).verifyNumber(ch::close); int port = ch.remoteAddress().getPort(); this.group.put(ChannelGroup.getKey(ip, port), new SyncPriorityChannel(ch)); diff --git a/src/main/java/com/iot/modbus_rtcp/netty/SyncHandler.java b/src/main/java/com/iot/modbus_rtcp/netty/SyncHandler.java index fb99b46..eb7aac0 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/SyncHandler.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/SyncHandler.java @@ -39,10 +39,10 @@ public class SyncHandler extends ChannelInboundHandlerAdapter { try { channel.getChannelPromise().setSuccess(); } catch (IllegalStateException e) { - log.info("数据返回慢了,已经超时!"); + log.warn("数据返回慢了,已经超时!"); return; } catch (Exception e) { - log.info("丢失数据:{}", msg); + log.warn("丢失数据:{}", msg); return; } @@ -64,7 +64,7 @@ public class SyncHandler extends ChannelInboundHandlerAdapter { } String json = message.getKey() + "/" + System.currentTimeMillis() + "/" + msg; - log.info("推数据到MQ({}): {}", channel.getCollectQueue(), json); + log.debug("推数据到MQ({}): {}", channel.getCollectQueue(), json); try { this.rabbitTemplate.convertAndSend(channel.getCollectQueue(), json); } catch (Exception e) { @@ -80,7 +80,7 @@ public class SyncHandler extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - log.info("异常: ", cause); + log.warn("异常: ", cause); InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); String ipKey = ChannelGroup.getKey(remoteAddress.getHostString(), remoteAddress.getPort()); this.channelGroup.remove(ipKey); diff --git a/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java b/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java index be25fc4..df896b5 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java @@ -1,5 +1,6 @@ package com.iot.modbus_rtcp.netty; +import com.iot.modbus_rtcp.dto.AutoCollectionDto; import com.iot.modbus_rtcp.dto.CommandTypeComparable; import com.iot.modbus_rtcp.dto.ModbusCommandDto; import com.iot.modbus_rtcp.utils.HexUtil; @@ -13,6 +14,7 @@ import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.core.RabbitAdmin; import java.net.InetSocketAddress; +import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.concurrent.PriorityBlockingQueue; @@ -53,6 +55,8 @@ public class SyncPriorityChannel implements Runnable { } public void addMessages(List messages) { + cleanAutoCollection(messages); + this.messageQueue.addAll(messages); synchronized (this.object) { @@ -91,12 +95,21 @@ public class SyncPriorityChannel implements Runnable { this.channelPromiseReference.setRelease(this.channel.newPromise()); if (isControl || Objects.equals(CommandTypeComparable.CommandType.COLLECTION, this.getCurrentMessage().getType())) { - log.info("发送命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送", - this.getCurrentMessage().getCommand(), - this.getCurrentMessage().getLength(), - IPGatewayRelation.get(ChannelGroup.getKey(ip, port)), - ChannelGroup.getKey(ip, port), - this.messageQueue.size()); + if (Objects.equals(CommandTypeComparable.CommandType.CONTROL, this.getCurrentMessage().getType())) { + log.info("发送命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送", + this.getCurrentMessage().getCommand(), + this.getCurrentMessage().getLength(), + IPGatewayRelation.get(ChannelGroup.getKey(ip, port)), + ChannelGroup.getKey(ip, port), + this.messageQueue.size()); + } else { + log.debug("发送命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送", + this.getCurrentMessage().getCommand(), + this.getCurrentMessage().getLength(), + IPGatewayRelation.get(ChannelGroup.getKey(ip, port)), + ChannelGroup.getKey(ip, port), + this.messageQueue.size()); + } this.channel.writeAndFlush(this.getCurrentMessage().getCommand()); } @@ -106,19 +119,19 @@ public class SyncPriorityChannel implements Runnable { boolean timeout; try { - timeout = !this.getChannelPromise().await(3, TimeUnit.SECONDS); + timeout = !this.getChannelPromise().await(1, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error("channelPromise.await发生异常,", e); return; } if (timeout && this.timeoutRetryCount == (allowRetryCount - 1)) { - log.warn("#超时命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送", + log.warn("#超时命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送,Command:{}", this.getCurrentMessage().getCommand(), this.getCurrentMessage().getLength(), IPGatewayRelation.get(ChannelGroup.getKey(ip, port)), ChannelGroup.getKey(ip, port), - this.messageQueue.size()); + this.messageQueue.size(), this.getCurrentMessage()); } if (timeout && this.timeoutRetryCount < (allowRetryCount - 1)) { @@ -166,4 +179,15 @@ public class SyncPriorityChannel implements Runnable { SpringUtil.getBean(RabbitAdmin.class).declareQueue(QueueBuilder.durable(this.collectQueue).build()); } + private void cleanAutoCollection(List messages) { + if (messages.stream().anyMatch(c -> c instanceof AutoCollectionDto)) { + Iterator it = messageQueue.iterator(); + while (it.hasNext()) { + if (it.next() instanceof AutoCollectionDto) { + it.remove(); + } + } + } + } + }