From 539907bd0c52d2f16150f0272928715fbaf59a42 Mon Sep 17 00:00:00 2001 From: wangshilong Date: Mon, 25 Nov 2024 14:49:18 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E6=9E=90=E6=95=B0=E6=8D=AE=E8=90=BD?= =?UTF-8?q?=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 5 ++ .../controller/ModbusTCPController.java | 18 +++++-- .../iot/modbus_rtcp/jobs/AutoCollectJobs.java | 50 +++++++++++++++---- .../iot/modbus_rtcp/netty/ChannelGroup.java | 10 ++++ .../iot/modbus_rtcp/netty/ModbusDecoder.java | 15 ------ .../iot/modbus_rtcp/netty/NettyServer.java | 2 + .../com/iot/modbus_rtcp/utils/CRCUtil.java | 12 +++++ 7 files changed, 82 insertions(+), 30 deletions(-) diff --git a/pom.xml b/pom.xml index 5d0259c..426f6f7 100644 --- a/pom.xml +++ b/pom.xml @@ -65,6 +65,11 @@ cn.hutool hutool-all + + org.bouncycastle + bcprov-jdk15on + 1.70 + org.apache.commons commons-lang3 diff --git a/src/main/java/com/iot/modbus_rtcp/controller/ModbusTCPController.java b/src/main/java/com/iot/modbus_rtcp/controller/ModbusTCPController.java index 99b3ab0..70a73ac 100644 --- a/src/main/java/com/iot/modbus_rtcp/controller/ModbusTCPController.java +++ b/src/main/java/com/iot/modbus_rtcp/controller/ModbusTCPController.java @@ -10,12 +10,10 @@ import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import java.util.List; +import java.util.Set; /** * Modbus-TCP协议API @@ -25,7 +23,7 @@ import java.util.List; @RestController @RequestMapping("modbus-tcp") public class ModbusTCPController implements ApplicationRunner { - private NettyServer nettyServer; + public static NettyServer nettyServer; @PreDestroy private void destroy() { @@ -88,6 +86,16 @@ public class ModbusTCPController implements ApplicationRunner { return Response.succeed(); } + /** + * 在綫設備列表 + * + * @return + */ + @GetMapping("/online") + public Response> online() { + return Response.succeed(this.nettyServer.getGroup().onlineGateway()); + } + @Override public void run(ApplicationArguments args) throws Exception { this.nettyServer = new NettyServer(502, 10); diff --git a/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java b/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java index aa8c0b8..7d298f1 100644 --- a/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java +++ b/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java @@ -14,6 +14,11 @@ import org.springframework.stereotype.Component; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** @@ -24,11 +29,25 @@ import java.util.stream.Collectors; @RequiredArgsConstructor public class AutoCollectJobs { - private static final String SQL = "SELECT ref_id as deviceId, command, message_length messageLength FROM commands " + + private static final String SQL = "SELECT id as commandId, ref_id as deviceId, command, " + + "message_length messageLength, '4B454E454E4731343030303030333538' as identifier FROM commands " + "WHERE type = 'COLLECTION' and ref_type = 'DEVICE' and id < 10099 order by id"; private final JdbcTemplate jdbcTemplate; private final ModbusTCPController controller; + private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(Math.min(10, Runtime.getRuntime().availableProcessors()) + , Math.min(100, Runtime.getRuntime().availableProcessors()), 60, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(5000), new ThreadFactory() { + AtomicInteger index = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "Modbus-auto-collect-thread-" + index.getAndIncrement()); + thread.setDaemon(true); + thread.setPriority(Thread.MIN_PRIORITY); + return thread; + } + }, new ThreadPoolExecutor.CallerRunsPolicy()); @Scheduled(cron = "0/30 * * * * ? ") public void autoCollect() { @@ -38,19 +57,30 @@ public class AutoCollectJobs { List> resultList = null; while (Objects.isNull(resultList) || resultList.size() >= pageSize) { resultList = this.jdbcTemplate.queryForList(SQL + " LIMIT " + (pageIndex++) + "," + pageSize); - if (ObjectUtils.isNotEmpty(resultList)) { - this.controller.collect(resultList.stream() - .filter(item -> StringUtils.isNotEmpty(MapUtil.getStr(item, "identifier"))) - .map(item -> ModbusCommandDto.builder() + if (ObjectUtils.isEmpty(resultList)) { + continue; + } + List collectCommondList = resultList.stream() + .filter(item -> Objects.nonNull(ModbusTCPController.nettyServer.getGroup() + .get((MapUtil.getStr(item, "identifier"))))) + .map(item -> { + String identifier = MapUtil.getStr(item, "identifier"); + return ModbusCommandDto.builder() .command(MapUtil.getStr(item, "command")) - .identifier("4B454E454E4731343030303030333538") + .identifier(identifier) .length(MapUtil.getInt(item, "messageLength")) .type(CommandTypeComparable.CommandType.COLLECTION) - .key("4B454E454E4731343030303030333538/" + MapUtil.getStr(item, "deviceId") + "/" + timestamp) - .build()) - .collect(Collectors.toList())); + .key(StringUtils.joinWith("/", identifier, + MapUtil.getStr(item, "deviceId"), + MapUtil.getStr(item, "commandId"), + timestamp)) + .build(); + }) + .collect(Collectors.toList()); + if (ObjectUtils.isEmpty(collectCommondList)) { + continue; } + this.collectThreadPool.execute(() -> this.controller.collect(collectCommondList)); } - } } diff --git a/src/main/java/com/iot/modbus_rtcp/netty/ChannelGroup.java b/src/main/java/com/iot/modbus_rtcp/netty/ChannelGroup.java index b2ea8a4..617257f 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/ChannelGroup.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/ChannelGroup.java @@ -1,9 +1,12 @@ package com.iot.modbus_rtcp.netty; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; @Slf4j public class ChannelGroup { @@ -30,6 +33,13 @@ public class ChannelGroup { this.mChannelMap.remove(identity); } + public Set onlineGateway() { + return this.mChannelMap.keySet() + .stream() + .filter(identifier -> !StringUtils.contains(identifier, ":")) + .collect(Collectors.toSet()); + } + public void see() { log.info("当前连接有:{}", new ArrayList<>(this.mChannelMap.keySet())); } diff --git a/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java b/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java index cdd3fdf..81513a8 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java @@ -3,7 +3,6 @@ package com.iot.modbus_rtcp.netty; import cn.hutool.core.util.ArrayUtil; import com.iot.modbus_rtcp.config.EquipmentIPProperties; import com.iot.modbus_rtcp.dto.ModbusCommandDto; -import com.iot.modbus_rtcp.utils.CRCUtil; import com.iot.modbus_rtcp.utils.HexUtil; import com.iot.modbus_rtcp.utils.SpringUtil; import io.netty.buffer.ByteBuf; @@ -11,7 +10,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.bouncycastle.pqc.math.linearalgebra.ByteUtils; import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.core.RabbitAdmin; @@ -90,21 +88,8 @@ public class ModbusDecoder extends ByteToMessageDecoder { if (bytesCache.length < (message.getLength() / 2)) { DataCache.put(identity, bytesCache); } else if (bytesCache.length == (message.getLength() / 2)) { -// if (!verifyCRC(bytesCache)) { -// return; -// } - out.add(HexUtil.bytesToHexString(bytesCache)); } } - private boolean verifyCRC(byte[] bytes) { - byte[] crc = ByteUtils.subArray(bytes, bytes.length - 2, bytes.length); - byte[] data = ByteUtils.subArray(bytes, 0, bytes.length - 2); - - String generate = CRCUtil.getCRC(data); - String original = HexUtil.bytesToHexString(crc); - - return original.equalsIgnoreCase(generate); - } } diff --git a/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java b/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java index 6d8ab78..8289cb7 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java @@ -7,6 +7,7 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -16,6 +17,7 @@ public class NettyServer extends ChannelInitializer { private ChannelFuture future; private int port, nThread; + @Getter private ChannelGroup group; private ModbusSender sender; private ChannelHandler handler; diff --git a/src/main/java/com/iot/modbus_rtcp/utils/CRCUtil.java b/src/main/java/com/iot/modbus_rtcp/utils/CRCUtil.java index edae9f5..65cc5be 100644 --- a/src/main/java/com/iot/modbus_rtcp/utils/CRCUtil.java +++ b/src/main/java/com/iot/modbus_rtcp/utils/CRCUtil.java @@ -1,5 +1,7 @@ package com.iot.modbus_rtcp.utils; +import org.bouncycastle.pqc.math.linearalgebra.ByteUtils; + public class CRCUtil { public static String getCRC(byte[] bytes, boolean cvs) { int CRC = 0x0000ffff; @@ -29,6 +31,16 @@ public class CRCUtil { return getCRC(bytes, true); } + public static boolean verifyCRC(byte[] bytes) { + byte[] crc = ByteUtils.subArray(bytes, bytes.length - 2, bytes.length); + byte[] data = ByteUtils.subArray(bytes, 0, bytes.length - 2); + + String generate = CRCUtil.getCRC(data); + String original = HexUtil.bytesToHexString(crc); + + return original.equalsIgnoreCase(generate); + } + public static void main(String[] args) { String[] array = new String[]{"0A0300000019", "0A0303930023", "0A0301A4002D", "0A0301D6002D", "0A0300320064", "0A0300960064", "0A0300FA0064", "0A03015E0064", "0A0200000050"}; for (String str : array) {