性能升级

This commit is contained in:
wangshilong 2024-12-30 09:19:11 +08:00
parent cd236a420c
commit ec5c7e0eba
6 changed files with 40 additions and 37 deletions

View File

@ -1,6 +1,7 @@
package com.iot.modbus_rtcp; package com.iot.modbus_rtcp;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
@ -18,7 +19,7 @@ public class ModbusRtcpApplication implements ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger("com.iot.modbus_rtcp.rabbit"); private static final Logger logger = LoggerFactory.getLogger("com.iot.modbus_rtcp.rabbit");
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// LicenseValidator.main(args); LicenseValidator.main(args);
SpringApplication.run(ModbusRtcpApplication.class, args); SpringApplication.run(ModbusRtcpApplication.class, args);
} }

View File

@ -3,6 +3,7 @@ package com.iot.modbus_rtcp.controller;
import com.iot.modbus_rtcp.dto.CommandTypeComparable; import com.iot.modbus_rtcp.dto.CommandTypeComparable;
import com.iot.modbus_rtcp.dto.ModbusCommandDto; import com.iot.modbus_rtcp.dto.ModbusCommandDto;
import com.iot.modbus_rtcp.netty.NettyServer; import com.iot.modbus_rtcp.netty.NettyServer;
import com.iot.modbus_rtcp.netty.ThreadPoolConsumer;
import com.iot.modbus_rtcp.utils.CRCUtil; import com.iot.modbus_rtcp.utils.CRCUtil;
import com.iot.modbus_rtcp.utils.HexUtil; import com.iot.modbus_rtcp.utils.HexUtil;
import com.iot.modbus_rtcp.vo.Response; import com.iot.modbus_rtcp.vo.Response;
@ -23,18 +24,19 @@ import java.util.Set;
@RestController @RestController
@RequestMapping("modbus-tcp") @RequestMapping("modbus-tcp")
public class ModbusTCPController implements ApplicationRunner { public class ModbusTCPController implements ApplicationRunner {
public static NettyServer nettyServer; public static final NettyServer nettyServer = new NettyServer(502, Math.max(20, Runtime.getRuntime().availableProcessors() - 1));
@PreDestroy @PreDestroy
private void destroy() { private void destroy() {
this.nettyServer.stop(); ThreadPoolConsumer.shutdown();
nettyServer.stop();
} }
/** /**
* 采集命令API * 采集命令API
* 采集命令API使用异步访问的方式设备响应数据后会将其推送到Kafka * 采集命令API使用异步访问的方式设备响应数据后会将其推送到Kafka
* *
* @param modbusCommandBoList * @param modbusCommandBoList 采集指令
* @return * @return
*/ */
@PostMapping("/collect") @PostMapping("/collect")
@ -42,14 +44,14 @@ public class ModbusTCPController implements ApplicationRunner {
log.debug("采集请求:{}", modbusCommandBoList); log.debug("采集请求:{}", modbusCommandBoList);
try { try {
modbusCommandBoList.stream().forEach(modbusCommandBo -> { modbusCommandBoList.forEach(modbusCommandBo -> {
modbusCommandBo.setCommand(modbusCommandBo.getCommand() + CRCUtil.getCRC(HexUtil.hexStringToBytes(modbusCommandBo.getCommand()))); modbusCommandBo.setCommand(modbusCommandBo.getCommand() + CRCUtil.getCRC(HexUtil.hexStringToBytes(modbusCommandBo.getCommand())));
modbusCommandBo.setType(CommandTypeComparable.CommandType.COLLECTION); modbusCommandBo.setType(CommandTypeComparable.CommandType.COLLECTION);
modbusCommandBo.setTimestamp(System.nanoTime()); modbusCommandBo.setTimestamp(System.nanoTime());
}); });
this.nettyServer.sender().send(modbusCommandBoList); nettyServer.sender().send(modbusCommandBoList);
} catch (Exception e) { } catch (Exception e) {
log.error("", e); log.error("", e);
return Response.failed(e.getMessage()); return Response.failed(e.getMessage());
@ -62,7 +64,7 @@ public class ModbusTCPController implements ApplicationRunner {
* 控制命令API * 控制命令API
* 控制命令API使用异步访问的方式设备响应数据自动丢弃 * 控制命令API使用异步访问的方式设备响应数据自动丢弃
* *
* @param modbusCommandBoList * @param modbusCommandBoList 控制指令
* @return 设备响应返回 * @return 设备响应返回
*/ */
@PostMapping("/control") @PostMapping("/control")
@ -70,14 +72,14 @@ public class ModbusTCPController implements ApplicationRunner {
log.debug("控制请求:{}", modbusCommandBoList); log.debug("控制请求:{}", modbusCommandBoList);
try { try {
modbusCommandBoList.stream().forEach(modbusCommandBo -> { modbusCommandBoList.forEach(modbusCommandBo -> {
modbusCommandBo.setCommand(modbusCommandBo.getCommand() + CRCUtil.getCRC(HexUtil.hexStringToBytes(modbusCommandBo.getCommand()))); modbusCommandBo.setCommand(modbusCommandBo.getCommand() + CRCUtil.getCRC(HexUtil.hexStringToBytes(modbusCommandBo.getCommand())));
modbusCommandBo.setType(CommandTypeComparable.CommandType.CONTROL); modbusCommandBo.setType(CommandTypeComparable.CommandType.CONTROL);
modbusCommandBo.setTimestamp(System.nanoTime()); modbusCommandBo.setTimestamp(System.nanoTime());
}); });
this.nettyServer.sender().send(modbusCommandBoList); nettyServer.sender().send(modbusCommandBoList);
} catch (Exception e) { } catch (Exception e) {
log.error("", e); log.error("", e);
return Response.failed(e.getMessage()); return Response.failed(e.getMessage());
@ -93,18 +95,17 @@ public class ModbusTCPController implements ApplicationRunner {
*/ */
@GetMapping("/online") @GetMapping("/online")
public Response<Set<String>> online() { public Response<Set<String>> online() {
return Response.succeed(this.nettyServer.getGroup().onlineGateway()); return Response.succeed(nettyServer.getGroup().onlineGateway());
} }
@GetMapping("/online/total") @GetMapping("/online/total")
public Response<Long> onlineTotal() { public Response<Long> onlineTotal() {
return Response.succeed(this.nettyServer.getGroup().onlineTotal()); return Response.succeed(nettyServer.getGroup().onlineTotal());
} }
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
this.nettyServer = new NettyServer(502, Math.max(20, Runtime.getRuntime().availableProcessors() - 1)); nettyServer.start();
this.nettyServer.start();
} }
} }

View File

@ -67,8 +67,7 @@ public class AutoCollectJobs {
private final JdbcTemplate jdbcTemplate; private final JdbcTemplate jdbcTemplate;
private final ModbusTCPController controller; private final ModbusTCPController controller;
private final EquipmentIPProperties equipmentIPProperties; private final EquipmentIPProperties equipmentIPProperties;
private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(Math.min(50, Runtime.getRuntime().availableProcessors()) private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS,
, Math.min(100, Runtime.getRuntime().availableProcessors()), 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5000), new ThreadFactory() { new ArrayBlockingQueue<>(5000), new ThreadFactory() {
final AtomicInteger index = new AtomicInteger(1); final AtomicInteger index = new AtomicInteger(1);

View File

@ -2,6 +2,8 @@ package com.iot.modbus_rtcp.netty;
//import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator; //import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator;
import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator;
import com.iot.modbus_rtcp.utils.SpringUtil;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*; import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
@ -75,7 +77,7 @@ public class NettyServer extends ChannelInitializer<SocketChannel> {
String ip = ch.remoteAddress().getHostString(); String ip = ch.remoteAddress().getHostString();
//过滤掉docker 网关请求 //过滤掉docker 网关请求
if ("172.17.0.1".equals(ip)) return; if ("172.17.0.1".equals(ip)) return;
// SpringUtil.getBean(LicenseValidator.class).verifyNumber(ch::close); SpringUtil.getBean(LicenseValidator.class).verifyNumber(ch::close);
int port = ch.remoteAddress().getPort(); int port = ch.remoteAddress().getPort();
this.group.put(ChannelGroup.getKey(ip, port), new SyncPriorityChannel(ch)); this.group.put(ChannelGroup.getKey(ip, port), new SyncPriorityChannel(ch));

View File

@ -1,5 +1,6 @@
package com.iot.modbus_rtcp.netty; package com.iot.modbus_rtcp.netty;
import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator;
import com.iot.modbus_rtcp.dto.AutoCollectionDto; import com.iot.modbus_rtcp.dto.AutoCollectionDto;
import com.iot.modbus_rtcp.dto.CommandTypeComparable; import com.iot.modbus_rtcp.dto.CommandTypeComparable;
import com.iot.modbus_rtcp.dto.ModbusCommandDto; import com.iot.modbus_rtcp.dto.ModbusCommandDto;
@ -14,13 +15,13 @@ import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitAdmin;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
/** /**
* 同步控制命令优先Channel * 同步控制命令优先Channel
@ -35,6 +36,7 @@ public class SyncPriorityChannel implements Runnable {
private static final int allowRetryCount = 1; private static final int allowRetryCount = 1;
private static final boolean isControl = true; private static final boolean isControl = true;
private final AtomicBoolean close = new AtomicBoolean(Boolean.FALSE); private final AtomicBoolean close = new AtomicBoolean(Boolean.FALSE);
private final AtomicBoolean running = new AtomicBoolean(Boolean.FALSE);
private final SocketChannel channel; private final SocketChannel channel;
@ -43,9 +45,7 @@ public class SyncPriorityChannel implements Runnable {
private volatile String identifier; private volatile String identifier;
@Getter @Getter
private volatile String collectQueue; private volatile String collectQueue;
private volatile boolean running = false;
private final Object object = new Object();
private final PriorityBlockingQueue<ModbusCommandDto> messageQueue = new PriorityBlockingQueue<>(); private final PriorityBlockingQueue<ModbusCommandDto> messageQueue = new PriorityBlockingQueue<>();
private final AtomicReference<ChannelPromise> channelPromiseReference = new AtomicReference<>(); private final AtomicReference<ChannelPromise> channelPromiseReference = new AtomicReference<>();
private final AtomicReference<ModbusCommandDto> currentMessageReference = new AtomicReference<>(); private final AtomicReference<ModbusCommandDto> currentMessageReference = new AtomicReference<>();
@ -55,15 +55,15 @@ public class SyncPriorityChannel implements Runnable {
} }
public void addMessages(List<ModbusCommandDto> messages) { public void addMessages(List<ModbusCommandDto> messages) {
cleanAutoCollection(messages); this.cleanAutoCollection(messages);
this.messageQueue.addAll(messages); this.messageQueue.addAll(messages);
synchronized (this.object) { synchronized (this.running) {
if (this.running) { if (this.running.getAcquire()) {
return; return;
} else { } else {
this.running = true; this.running.setRelease(Boolean.TRUE);
} }
} }
@ -85,7 +85,7 @@ public class SyncPriorityChannel implements Runnable {
// 消费完毕终止递归发送 // 消费完毕终止递归发送
if (Objects.isNull(this.getCurrentMessage())) { if (Objects.isNull(this.getCurrentMessage())) {
this.running = false; this.running.setRelease(Boolean.FALSE);
return; return;
} }
@ -140,11 +140,7 @@ public class SyncPriorityChannel implements Runnable {
this.timeoutRetryCount = 0; this.timeoutRetryCount = 0;
} }
try { LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.sendNext(); this.sendNext();
} }
@ -180,13 +176,9 @@ public class SyncPriorityChannel implements Runnable {
} }
private void cleanAutoCollection(List<ModbusCommandDto> messages) { private void cleanAutoCollection(List<ModbusCommandDto> messages) {
SpringUtil.getBean(LicenseValidator.class).verify();
if (messages.stream().anyMatch(c -> c instanceof AutoCollectionDto)) { if (messages.stream().anyMatch(c -> c instanceof AutoCollectionDto)) {
Iterator<ModbusCommandDto> it = messageQueue.iterator(); this.messageQueue.removeIf(command -> command instanceof AutoCollectionDto);
while (it.hasNext()) {
if (it.next() instanceof AutoCollectionDto) {
it.remove();
}
}
} }
} }

View File

@ -9,13 +9,21 @@ public class ThreadPoolConsumer {
static { static {
mThreadPoolExecutor = new ThreadPoolTaskExecutor(); mThreadPoolExecutor = new ThreadPoolTaskExecutor();
mThreadPoolExecutor.setCorePoolSize(4); mThreadPoolExecutor.setQueueCapacity(5000);
mThreadPoolExecutor.setMaxPoolSize(8);
mThreadPoolExecutor.setKeepAliveSeconds(60); mThreadPoolExecutor.setKeepAliveSeconds(60);
mThreadPoolExecutor.setThreadPriority(Thread.MAX_PRIORITY);
mThreadPoolExecutor.setThreadNamePrefix("Modbus-command-sender-thread-");
mThreadPoolExecutor.setCorePoolSize(Math.min(50, Runtime.getRuntime().availableProcessors() - 1));
mThreadPoolExecutor.setMaxPoolSize(Math.min(500, Runtime.getRuntime().availableProcessors() * 10));
mThreadPoolExecutor.initialize(); mThreadPoolExecutor.initialize();
} }
public static void submit(Runnable task) { public static void submit(Runnable task) {
mThreadPoolExecutor.submit(task); mThreadPoolExecutor.submit(task);
} }
public static void shutdown() {
mThreadPoolExecutor.shutdown();
}
} }