diff --git a/src/main/java/com/iot/modbus_rtcp/ModbusRtcpApplication.java b/src/main/java/com/iot/modbus_rtcp/ModbusRtcpApplication.java index cf9f437..c59a67a 100644 --- a/src/main/java/com/iot/modbus_rtcp/ModbusRtcpApplication.java +++ b/src/main/java/com/iot/modbus_rtcp/ModbusRtcpApplication.java @@ -1,6 +1,7 @@ package com.iot.modbus_rtcp; import cn.hutool.json.JSONUtil; +import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; @@ -18,7 +19,7 @@ public class ModbusRtcpApplication implements ApplicationContextAware { private static final Logger logger = LoggerFactory.getLogger("com.iot.modbus_rtcp.rabbit"); public static void main(String[] args) throws Exception { -// LicenseValidator.main(args); + LicenseValidator.main(args); SpringApplication.run(ModbusRtcpApplication.class, args); } diff --git a/src/main/java/com/iot/modbus_rtcp/controller/ModbusTCPController.java b/src/main/java/com/iot/modbus_rtcp/controller/ModbusTCPController.java index d9b5849..65e8830 100644 --- a/src/main/java/com/iot/modbus_rtcp/controller/ModbusTCPController.java +++ b/src/main/java/com/iot/modbus_rtcp/controller/ModbusTCPController.java @@ -3,6 +3,7 @@ package com.iot.modbus_rtcp.controller; import com.iot.modbus_rtcp.dto.CommandTypeComparable; import com.iot.modbus_rtcp.dto.ModbusCommandDto; import com.iot.modbus_rtcp.netty.NettyServer; +import com.iot.modbus_rtcp.netty.ThreadPoolConsumer; import com.iot.modbus_rtcp.utils.CRCUtil; import com.iot.modbus_rtcp.utils.HexUtil; import com.iot.modbus_rtcp.vo.Response; @@ -23,18 +24,19 @@ import java.util.Set; @RestController @RequestMapping("modbus-tcp") public class ModbusTCPController implements ApplicationRunner { - public static NettyServer nettyServer; + public static final NettyServer nettyServer = new NettyServer(502, Math.max(20, Runtime.getRuntime().availableProcessors() - 1)); @PreDestroy private void destroy() { - this.nettyServer.stop(); + ThreadPoolConsumer.shutdown(); + nettyServer.stop(); } /** * 采集命令API * 采集命令API使用异步访问的方式,设备响应数据后会将其推送到Kafka * - * @param modbusCommandBoList + * @param modbusCommandBoList 采集指令 * @return */ @PostMapping("/collect") @@ -42,14 +44,14 @@ public class ModbusTCPController implements ApplicationRunner { log.debug("采集请求:{}", modbusCommandBoList); try { - modbusCommandBoList.stream().forEach(modbusCommandBo -> { + modbusCommandBoList.forEach(modbusCommandBo -> { modbusCommandBo.setCommand(modbusCommandBo.getCommand() + CRCUtil.getCRC(HexUtil.hexStringToBytes(modbusCommandBo.getCommand()))); modbusCommandBo.setType(CommandTypeComparable.CommandType.COLLECTION); modbusCommandBo.setTimestamp(System.nanoTime()); }); - this.nettyServer.sender().send(modbusCommandBoList); + nettyServer.sender().send(modbusCommandBoList); } catch (Exception e) { log.error("", e); return Response.failed(e.getMessage()); @@ -62,7 +64,7 @@ public class ModbusTCPController implements ApplicationRunner { * 控制命令API * 控制命令API使用异步访问的方式,设备响应数据自动丢弃 * - * @param modbusCommandBoList + * @param modbusCommandBoList 控制指令 * @return 设备响应返回 */ @PostMapping("/control") @@ -70,14 +72,14 @@ public class ModbusTCPController implements ApplicationRunner { log.debug("控制请求:{}", modbusCommandBoList); try { - modbusCommandBoList.stream().forEach(modbusCommandBo -> { + modbusCommandBoList.forEach(modbusCommandBo -> { modbusCommandBo.setCommand(modbusCommandBo.getCommand() + CRCUtil.getCRC(HexUtil.hexStringToBytes(modbusCommandBo.getCommand()))); modbusCommandBo.setType(CommandTypeComparable.CommandType.CONTROL); modbusCommandBo.setTimestamp(System.nanoTime()); }); - this.nettyServer.sender().send(modbusCommandBoList); + nettyServer.sender().send(modbusCommandBoList); } catch (Exception e) { log.error("", e); return Response.failed(e.getMessage()); @@ -93,18 +95,17 @@ public class ModbusTCPController implements ApplicationRunner { */ @GetMapping("/online") public Response> online() { - return Response.succeed(this.nettyServer.getGroup().onlineGateway()); + return Response.succeed(nettyServer.getGroup().onlineGateway()); } @GetMapping("/online/total") public Response onlineTotal() { - return Response.succeed(this.nettyServer.getGroup().onlineTotal()); + return Response.succeed(nettyServer.getGroup().onlineTotal()); } @Override public void run(ApplicationArguments args) throws Exception { - this.nettyServer = new NettyServer(502, Math.max(20, Runtime.getRuntime().availableProcessors() - 1)); - this.nettyServer.start(); + nettyServer.start(); } } diff --git a/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java b/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java index d7e29c6..6b26406 100644 --- a/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java +++ b/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java @@ -67,8 +67,7 @@ public class AutoCollectJobs { private final JdbcTemplate jdbcTemplate; private final ModbusTCPController controller; private final EquipmentIPProperties equipmentIPProperties; - private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(Math.min(50, Runtime.getRuntime().availableProcessors()) - , Math.min(100, Runtime.getRuntime().availableProcessors()), 60, TimeUnit.SECONDS, + private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5000), new ThreadFactory() { final AtomicInteger index = new AtomicInteger(1); diff --git a/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java b/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java index 144d537..6ec8d49 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java @@ -2,6 +2,8 @@ package com.iot.modbus_rtcp.netty; //import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator; +import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator; +import com.iot.modbus_rtcp.utils.SpringUtil; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; @@ -75,7 +77,7 @@ public class NettyServer extends ChannelInitializer { String ip = ch.remoteAddress().getHostString(); //过滤掉docker 网关请求 if ("172.17.0.1".equals(ip)) return; -// SpringUtil.getBean(LicenseValidator.class).verifyNumber(ch::close); + SpringUtil.getBean(LicenseValidator.class).verifyNumber(ch::close); int port = ch.remoteAddress().getPort(); this.group.put(ChannelGroup.getKey(ip, port), new SyncPriorityChannel(ch)); diff --git a/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java b/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java index df896b5..ed9d759 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java @@ -1,5 +1,6 @@ package com.iot.modbus_rtcp.netty; +import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator; import com.iot.modbus_rtcp.dto.AutoCollectionDto; import com.iot.modbus_rtcp.dto.CommandTypeComparable; import com.iot.modbus_rtcp.dto.ModbusCommandDto; @@ -14,13 +15,13 @@ import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.core.RabbitAdmin; import java.net.InetSocketAddress; -import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; /** * 同步控制命令优先Channel @@ -35,6 +36,7 @@ public class SyncPriorityChannel implements Runnable { private static final int allowRetryCount = 1; private static final boolean isControl = true; private final AtomicBoolean close = new AtomicBoolean(Boolean.FALSE); + private final AtomicBoolean running = new AtomicBoolean(Boolean.FALSE); private final SocketChannel channel; @@ -43,9 +45,7 @@ public class SyncPriorityChannel implements Runnable { private volatile String identifier; @Getter private volatile String collectQueue; - private volatile boolean running = false; - private final Object object = new Object(); private final PriorityBlockingQueue messageQueue = new PriorityBlockingQueue<>(); private final AtomicReference channelPromiseReference = new AtomicReference<>(); private final AtomicReference currentMessageReference = new AtomicReference<>(); @@ -55,15 +55,15 @@ public class SyncPriorityChannel implements Runnable { } public void addMessages(List messages) { - cleanAutoCollection(messages); + this.cleanAutoCollection(messages); this.messageQueue.addAll(messages); - synchronized (this.object) { - if (this.running) { + synchronized (this.running) { + if (this.running.getAcquire()) { return; } else { - this.running = true; + this.running.setRelease(Boolean.TRUE); } } @@ -85,7 +85,7 @@ public class SyncPriorityChannel implements Runnable { // 消费完毕终止递归发送 if (Objects.isNull(this.getCurrentMessage())) { - this.running = false; + this.running.setRelease(Boolean.FALSE); return; } @@ -140,11 +140,7 @@ public class SyncPriorityChannel implements Runnable { this.timeoutRetryCount = 0; } - try { - Thread.sleep(10); - } catch (InterruptedException e) { - e.printStackTrace(); - } + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10)); this.sendNext(); } @@ -180,13 +176,9 @@ public class SyncPriorityChannel implements Runnable { } private void cleanAutoCollection(List messages) { + SpringUtil.getBean(LicenseValidator.class).verify(); if (messages.stream().anyMatch(c -> c instanceof AutoCollectionDto)) { - Iterator it = messageQueue.iterator(); - while (it.hasNext()) { - if (it.next() instanceof AutoCollectionDto) { - it.remove(); - } - } + this.messageQueue.removeIf(command -> command instanceof AutoCollectionDto); } } diff --git a/src/main/java/com/iot/modbus_rtcp/netty/ThreadPoolConsumer.java b/src/main/java/com/iot/modbus_rtcp/netty/ThreadPoolConsumer.java index 71f922e..9c8c22a 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/ThreadPoolConsumer.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/ThreadPoolConsumer.java @@ -9,13 +9,21 @@ public class ThreadPoolConsumer { static { mThreadPoolExecutor = new ThreadPoolTaskExecutor(); - mThreadPoolExecutor.setCorePoolSize(4); - mThreadPoolExecutor.setMaxPoolSize(8); + mThreadPoolExecutor.setQueueCapacity(5000); mThreadPoolExecutor.setKeepAliveSeconds(60); + mThreadPoolExecutor.setThreadPriority(Thread.MAX_PRIORITY); + mThreadPoolExecutor.setThreadNamePrefix("Modbus-command-sender-thread-"); + mThreadPoolExecutor.setCorePoolSize(Math.min(50, Runtime.getRuntime().availableProcessors() - 1)); + mThreadPoolExecutor.setMaxPoolSize(Math.min(500, Runtime.getRuntime().availableProcessors() * 10)); mThreadPoolExecutor.initialize(); } public static void submit(Runnable task) { mThreadPoolExecutor.submit(task); } + + public static void shutdown() { + mThreadPoolExecutor.shutdown(); + } + }