From ee03c86167cf4cc8c5826ef45065c5539b177e2e Mon Sep 17 00:00:00 2001 From: wangshilong Date: Fri, 20 Dec 2024 22:57:24 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E9=87=87=E9=9B=86=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/EquipmentIPProperties.java | 2 + .../iot/modbus_rtcp/jobs/AutoCollectJobs.java | 126 +++++++++++------- .../netty/SyncPriorityChannel.java | 11 +- 3 files changed, 91 insertions(+), 48 deletions(-) diff --git a/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java b/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java index 1280aab..27e5f8d 100644 --- a/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java +++ b/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java @@ -36,6 +36,8 @@ public class EquipmentIPProperties { if (gatewayHeartbeat.length() > 100) { return false; } + // 解决心跳包中可能存在的空格符和换行符 + gatewayHeartbeat = HexUtil.bytesToHexString(StringUtils.trim(new String(HexUtil.hexStringToBytes(gatewayHeartbeat))).getBytes()).toUpperCase(); // 每分钟从数据库入库一次 if (this.lastSyncEquipment.getAcquire() < System.currentTimeMillis() - 60_000) { this.jdbcTemplate.queryForList("select gateway_sn from device").forEach(map -> { diff --git a/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java b/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java index 4e2ff2d..702e837 100644 --- a/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java +++ b/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java @@ -22,6 +22,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * @author 王仕龙 @@ -66,10 +67,10 @@ public class AutoCollectJobs { private final JdbcTemplate jdbcTemplate; private final ModbusTCPController controller; private final EquipmentIPProperties equipmentIPProperties; - private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(Math.min(10, Runtime.getRuntime().availableProcessors()) + private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(Math.min(50, Runtime.getRuntime().availableProcessors()) , Math.min(100, Runtime.getRuntime().availableProcessors()), 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5000), new ThreadFactory() { - AtomicInteger index = new AtomicInteger(1); + final AtomicInteger index = new AtomicInteger(1); @Override public Thread newThread(Runnable runnable) { @@ -85,53 +86,86 @@ public class AutoCollectJobs { int pageIndex = 0; int pageSize = 1000; long timestamp = System.currentTimeMillis(); - List> resultList = null; - while (Objects.isNull(resultList) || resultList.size() >= pageSize) { - resultList = this.jdbcTemplate.queryForList(SQL + " LIMIT " + (pageIndex++ * pageSize) + ", " + pageSize); + while (true) { + List> resultList = this.jdbcTemplate.queryForList(SQL + " LIMIT " + (pageIndex++ * pageSize) + ", " + pageSize); if (ObjectUtils.isEmpty(resultList)) { - continue; + break; } - resultList.stream() - .filter(item -> { - String gatewaySn = StringUtils.trim(MapUtil.getStr(item, "identifier")); - String gatewayIdentifier = this.equipmentIPProperties.put(gatewaySn); - if (Objects.nonNull(ModbusTCPController.nettyServer.getGroup() - .get(gatewayIdentifier))) { - item.put("identifier", gatewayIdentifier); - return true; - } - log.warn("Gateway {} is disconnected and does not collect data", gatewaySn); - return false; - }) - .collect(Collectors.groupingBy(item -> StringUtils.trim(MapUtil.getStr(item, "deviceId")))) - .forEach((deviceId, commandList) -> { - Map>> refTypeCommandListMap = commandList.stream() - .collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "refType"))); - List> deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_DEVICE); - if (ObjectUtils.isEmpty(deviceCommandList)) { - deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_PRODUCT_CODE); - } - if (ObjectUtils.isNotEmpty(deviceCommandList)) { - List collectCommondList = deviceCommandList.stream() - .map(item -> { - String identifier = StringUtils.trim(MapUtil.getStr(item, "identifier")); - return ModbusCommandDto.builder() - .command(StringUtils.trim(MapUtil.getStr(item, "command"))) - .identifier(identifier) - .length(MapUtil.getInt(item, "messageLength")) - .type(CommandTypeComparable.CommandType.COLLECTION) - .key(StringUtils.joinWith("/", identifier, - StringUtils.trim(MapUtil.getStr(item, "deviceId")), - StringUtils.trim(MapUtil.getStr(item, "commandId")), - timestamp)) - .build(); - }) - .collect(Collectors.toList()); - if (ObjectUtils.isNotEmpty(collectCommondList)) { - this.collectThreadPool.execute(() -> this.controller.collect(collectCommondList)); + this.collectThreadPool.execute(() -> { + this.controller.collect(resultList.stream() + .filter(item -> { + String gatewaySn = StringUtils.trim(MapUtil.getStr(item, "identifier")); + String gatewayIdentifier = this.equipmentIPProperties.put(gatewaySn); + if (Objects.nonNull(ModbusTCPController.nettyServer.getGroup() + .get(gatewayIdentifier))) { + item.put("identifier", gatewayIdentifier); + return true; } - } - }); + log.warn("Gateway {} is disconnected and does not collect data", gatewaySn); + return false; + }) + .collect(Collectors.groupingBy(item -> StringUtils.trim(MapUtil.getStr(item, "deviceId")))) + .entrySet() + .stream() + .flatMap(entry -> { + List> commandList = entry.getValue(); + Map>> refTypeCommandListMap = commandList.stream() + .collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "refType"))); + List> deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_DEVICE); + if (ObjectUtils.isEmpty(deviceCommandList)) { + deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_PRODUCT_CODE); + } + if (ObjectUtils.isNotEmpty(deviceCommandList)) { + return deviceCommandList.stream() + .map(item -> { + String identifier = StringUtils.trim(MapUtil.getStr(item, "identifier")); + return ModbusCommandDto.builder() + .command(StringUtils.trim(MapUtil.getStr(item, "command"))) + .identifier(identifier) + .length(MapUtil.getInt(item, "messageLength")) + .type(CommandTypeComparable.CommandType.COLLECTION) + .key(StringUtils.joinWith("/", identifier, + StringUtils.trim(MapUtil.getStr(item, "deviceId")), + StringUtils.trim(MapUtil.getStr(item, "commandId")), + timestamp)) + .build(); + }); + } + return Stream.empty(); + }) + .collect(Collectors.toList())); +// .forEach((deviceId, commandList) -> { +// Map>> refTypeCommandListMap = commandList.stream() +// .collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "refType"))); +// List> deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_DEVICE); +// if (ObjectUtils.isEmpty(deviceCommandList)) { +// deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_PRODUCT_CODE); +// } +// if (ObjectUtils.isNotEmpty(deviceCommandList)) { +// List collectCommondList = deviceCommandList.stream() +// .map(item -> { +// String identifier = StringUtils.trim(MapUtil.getStr(item, "identifier")); +// return ModbusCommandDto.builder() +// .command(StringUtils.trim(MapUtil.getStr(item, "command"))) +// .identifier(identifier) +// .length(MapUtil.getInt(item, "messageLength")) +// .type(CommandTypeComparable.CommandType.COLLECTION) +// .key(StringUtils.joinWith("/", identifier, +// StringUtils.trim(MapUtil.getStr(item, "deviceId")), +// StringUtils.trim(MapUtil.getStr(item, "commandId")), +// timestamp)) +// .build(); +// }) +// .collect(Collectors.toList()); +// if (ObjectUtils.isNotEmpty(collectCommondList)) { +// this.collectThreadPool.execute(() -> this.controller.collect(collectCommondList)); +// } +// } +// }); + }); + if (resultList.size() < pageSize) { + break; + } } } } diff --git a/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java b/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java index 5ae63a3..be25fc4 100644 --- a/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java +++ b/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java @@ -8,6 +8,7 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.socket.SocketChannel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.core.RabbitAdmin; @@ -150,8 +151,14 @@ public class SyncPriorityChannel implements Runnable { } public void setIdentifier(String identifier) { - this.identifier = identifier; - this.setCollectQueue(); + if (StringUtils.isBlank(this.identifier)) { + this.identifier = identifier; + this.setCollectQueue(); + } else if (!StringUtils.equals(this.identifier, identifier)) { + throw new RuntimeException("多个设备对应同一心跳包,当前已绑定心跳信息:" + + new String(HexUtil.hexStringToBytes(identifier)) + "[" + identifier + "] --> " + + this.channel.remoteAddress().getHostString() + ":" + this.channel.remoteAddress().getPort()); + } } public void setCollectQueue() {