BUG修复

This commit is contained in:
wangshilong 2024-12-22 15:20:43 +08:00
parent abcbd83bc2
commit cd236a420c
11 changed files with 68 additions and 53 deletions

View File

@ -1,7 +1,6 @@
package com.iot.modbus_rtcp; package com.iot.modbus_rtcp;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
@ -19,7 +18,7 @@ public class ModbusRtcpApplication implements ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger("com.iot.modbus_rtcp.rabbit"); private static final Logger logger = LoggerFactory.getLogger("com.iot.modbus_rtcp.rabbit");
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
LicenseValidator.main(args); // LicenseValidator.main(args);
SpringApplication.run(ModbusRtcpApplication.class, args); SpringApplication.run(ModbusRtcpApplication.class, args);
} }

View File

@ -103,7 +103,7 @@ public class ModbusTCPController implements ApplicationRunner {
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
this.nettyServer = new NettyServer(502, 10); this.nettyServer = new NettyServer(502, Math.max(20, Runtime.getRuntime().availableProcessors() - 1));
this.nettyServer.start(); this.nettyServer.start();
} }

View File

@ -0,0 +1,15 @@
package com.iot.modbus_rtcp.dto;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
@Data
@SuperBuilder
@NoArgsConstructor
@ToString(callSuper = true)
public class AutoCollectionDto extends ModbusCommandDto {
}

View File

@ -3,11 +3,13 @@ package com.iot.modbus_rtcp.dto;
import lombok.Data; import lombok.Data;
import lombok.Getter; import lombok.Getter;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
@Data @Data
@SuperBuilder @SuperBuilder
@NoArgsConstructor @NoArgsConstructor
@ToString(callSuper = true)
public class CommandTypeComparable implements Comparable { public class CommandTypeComparable implements Comparable {
private CommandType type; private CommandType type;

View File

@ -2,12 +2,14 @@ package com.iot.modbus_rtcp.dto;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
@Data @Data
@SuperBuilder @SuperBuilder
@NoArgsConstructor @NoArgsConstructor
@ToString(callSuper = true)
public class ModbusCommandDto extends CommandTypeComparable implements CommandDto { public class ModbusCommandDto extends CommandTypeComparable implements CommandDto {
/** /**
* 12位十六进制的命令 * 12位十六进制的命令

View File

@ -3,13 +3,14 @@ package com.iot.modbus_rtcp.jobs;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import com.iot.modbus_rtcp.config.EquipmentIPProperties; import com.iot.modbus_rtcp.config.EquipmentIPProperties;
import com.iot.modbus_rtcp.controller.ModbusTCPController; import com.iot.modbus_rtcp.controller.ModbusTCPController;
import com.iot.modbus_rtcp.dto.AutoCollectionDto;
import com.iot.modbus_rtcp.dto.CommandTypeComparable; import com.iot.modbus_rtcp.dto.CommandTypeComparable;
import com.iot.modbus_rtcp.dto.ModbusCommandDto;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.List; import java.util.List;
@ -80,7 +81,7 @@ public class AutoCollectJobs {
} }
}, new ThreadPoolExecutor.CallerRunsPolicy()); }, new ThreadPoolExecutor.CallerRunsPolicy());
// @Scheduled(cron = "0/30 * * * * ? ") @Scheduled(cron = "0/30 * * * * ? ")
public void autoCollect() { public void autoCollect() {
int pageIndex = 0; int pageIndex = 0;
int pageSize = 1000; int pageSize = 1000;
@ -118,7 +119,7 @@ public class AutoCollectJobs {
return deviceCommandList.stream() return deviceCommandList.stream()
.map(item -> { .map(item -> {
String identifier = StringUtils.trim(MapUtil.getStr(item, "identifier")); String identifier = StringUtils.trim(MapUtil.getStr(item, "identifier"));
return ModbusCommandDto.builder() return AutoCollectionDto.builder()
.command(StringUtils.trim(MapUtil.getStr(item, "command"))) .command(StringUtils.trim(MapUtil.getStr(item, "command")))
.identifier(identifier) .identifier(identifier)
.length(MapUtil.getInt(item, "messageLength")) .length(MapUtil.getInt(item, "messageLength"))
@ -133,34 +134,6 @@ public class AutoCollectJobs {
return Stream.empty(); return Stream.empty();
}) })
.collect(Collectors.toList())); .collect(Collectors.toList()));
// .forEach((deviceId, commandList) -> {
// Map<String, List<Map<String, Object>>> refTypeCommandListMap = commandList.stream()
// .collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "refType")));
// List<Map<String, Object>> deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_DEVICE);
// if (ObjectUtils.isEmpty(deviceCommandList)) {
// deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_PRODUCT_CODE);
// }
// if (ObjectUtils.isNotEmpty(deviceCommandList)) {
// List<ModbusCommandDto> collectCommondList = deviceCommandList.stream()
// .map(item -> {
// String identifier = StringUtils.trim(MapUtil.getStr(item, "identifier"));
// return ModbusCommandDto.builder()
// .command(StringUtils.trim(MapUtil.getStr(item, "command")))
// .identifier(identifier)
// .length(MapUtil.getInt(item, "messageLength"))
// .type(CommandTypeComparable.CommandType.COLLECTION)
// .key(StringUtils.joinWith("/", identifier,
// StringUtils.trim(MapUtil.getStr(item, "deviceId")),
// StringUtils.trim(MapUtil.getStr(item, "commandId")),
// timestamp))
// .build();
// })
// .collect(Collectors.toList());
// if (ObjectUtils.isNotEmpty(collectCommondList)) {
// this.collectThreadPool.execute(() -> this.controller.collect(collectCommondList));
// }
// }
// });
}); });
if (resultList.size() < pageSize) { if (resultList.size() < pageSize) {
break; break;

View File

@ -34,7 +34,7 @@ public class ModbusDecoder extends ByteToMessageDecoder {
buffer.readBytes(b); buffer.readBytes(b);
String hex = HexUtil.bytesToHexString(b); String hex = HexUtil.bytesToHexString(b);
log.info("解析到:{}", hex); log.debug("解析到:{}", hex);
String currentAddress = ChannelGroup.getKey(ip, port); String currentAddress = ChannelGroup.getKey(ip, port);
if (this.equipmentIPProperties.contains(hex.toUpperCase())) { // 心跳 if (this.equipmentIPProperties.contains(hex.toUpperCase())) { // 心跳
@ -67,7 +67,7 @@ public class ModbusDecoder extends ByteToMessageDecoder {
ModbusCommandDto message = channel.getCurrentMessage(); ModbusCommandDto message = channel.getCurrentMessage();
if (message == null) { if (message == null) {
log.warn("非法传输:{},原报文:{}", HexUtil.bytesToHexString(b), new String(b)); log.warn("非法传输:{},原报文:{}IP{}:{}", HexUtil.bytesToHexString(b), new String(b), ip, port);
return; return;
} }

View File

@ -35,7 +35,7 @@ public class ModbusSender {
throw new RuntimeException("设备" + identity + "的连接未配置或该设备已断开与服务器的连接,请稍后再试或联系管理员"); throw new RuntimeException("设备" + identity + "的连接未配置或该设备已断开与服务器的连接,请稍后再试或联系管理员");
} }
log.info("{}通道({})提交{}条命令:{}", this.equipmentIPProperties.get(identity), identity, v.size(), v); log.debug("{}通道({})提交{}条命令:{}", this.equipmentIPProperties.get(identity), identity, v.size(), v);
channel.addMessages(v); channel.addMessages(v);
}); });
} }

View File

@ -1,7 +1,7 @@
package com.iot.modbus_rtcp.netty; package com.iot.modbus_rtcp.netty;
import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator; //import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator;
import com.iot.modbus_rtcp.utils.SpringUtil;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*; import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
@ -75,7 +75,7 @@ public class NettyServer extends ChannelInitializer<SocketChannel> {
String ip = ch.remoteAddress().getHostString(); String ip = ch.remoteAddress().getHostString();
//过滤掉docker 网关请求 //过滤掉docker 网关请求
if ("172.17.0.1".equals(ip)) return; if ("172.17.0.1".equals(ip)) return;
SpringUtil.getBean(LicenseValidator.class).verifyNumber(ch::close); // SpringUtil.getBean(LicenseValidator.class).verifyNumber(ch::close);
int port = ch.remoteAddress().getPort(); int port = ch.remoteAddress().getPort();
this.group.put(ChannelGroup.getKey(ip, port), new SyncPriorityChannel(ch)); this.group.put(ChannelGroup.getKey(ip, port), new SyncPriorityChannel(ch));

View File

@ -39,10 +39,10 @@ public class SyncHandler extends ChannelInboundHandlerAdapter {
try { try {
channel.getChannelPromise().setSuccess(); channel.getChannelPromise().setSuccess();
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
log.info("数据返回慢了,已经超时!"); log.warn("数据返回慢了,已经超时!");
return; return;
} catch (Exception e) { } catch (Exception e) {
log.info("丢失数据:{}", msg); log.warn("丢失数据:{}", msg);
return; return;
} }
@ -64,7 +64,7 @@ public class SyncHandler extends ChannelInboundHandlerAdapter {
} }
String json = message.getKey() + "/" + System.currentTimeMillis() + "/" + msg; String json = message.getKey() + "/" + System.currentTimeMillis() + "/" + msg;
log.info("推数据到MQ({}): {}", channel.getCollectQueue(), json); log.debug("推数据到MQ({}): {}", channel.getCollectQueue(), json);
try { try {
this.rabbitTemplate.convertAndSend(channel.getCollectQueue(), json); this.rabbitTemplate.convertAndSend(channel.getCollectQueue(), json);
} catch (Exception e) { } catch (Exception e) {
@ -80,7 +80,7 @@ public class SyncHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("异常: ", cause); log.warn("异常: ", cause);
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
String ipKey = ChannelGroup.getKey(remoteAddress.getHostString(), remoteAddress.getPort()); String ipKey = ChannelGroup.getKey(remoteAddress.getHostString(), remoteAddress.getPort());
this.channelGroup.remove(ipKey); this.channelGroup.remove(ipKey);

View File

@ -1,5 +1,6 @@
package com.iot.modbus_rtcp.netty; package com.iot.modbus_rtcp.netty;
import com.iot.modbus_rtcp.dto.AutoCollectionDto;
import com.iot.modbus_rtcp.dto.CommandTypeComparable; import com.iot.modbus_rtcp.dto.CommandTypeComparable;
import com.iot.modbus_rtcp.dto.ModbusCommandDto; import com.iot.modbus_rtcp.dto.ModbusCommandDto;
import com.iot.modbus_rtcp.utils.HexUtil; import com.iot.modbus_rtcp.utils.HexUtil;
@ -13,6 +14,7 @@ import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitAdmin;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
@ -53,6 +55,8 @@ public class SyncPriorityChannel implements Runnable {
} }
public void addMessages(List<ModbusCommandDto> messages) { public void addMessages(List<ModbusCommandDto> messages) {
cleanAutoCollection(messages);
this.messageQueue.addAll(messages); this.messageQueue.addAll(messages);
synchronized (this.object) { synchronized (this.object) {
@ -91,12 +95,21 @@ public class SyncPriorityChannel implements Runnable {
this.channelPromiseReference.setRelease(this.channel.newPromise()); this.channelPromiseReference.setRelease(this.channel.newPromise());
if (isControl || Objects.equals(CommandTypeComparable.CommandType.COLLECTION, this.getCurrentMessage().getType())) { if (isControl || Objects.equals(CommandTypeComparable.CommandType.COLLECTION, this.getCurrentMessage().getType())) {
log.info("发送命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送", if (Objects.equals(CommandTypeComparable.CommandType.CONTROL, this.getCurrentMessage().getType())) {
this.getCurrentMessage().getCommand(), log.info("发送命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送",
this.getCurrentMessage().getLength(), this.getCurrentMessage().getCommand(),
IPGatewayRelation.get(ChannelGroup.getKey(ip, port)), this.getCurrentMessage().getLength(),
ChannelGroup.getKey(ip, port), IPGatewayRelation.get(ChannelGroup.getKey(ip, port)),
this.messageQueue.size()); ChannelGroup.getKey(ip, port),
this.messageQueue.size());
} else {
log.debug("发送命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送",
this.getCurrentMessage().getCommand(),
this.getCurrentMessage().getLength(),
IPGatewayRelation.get(ChannelGroup.getKey(ip, port)),
ChannelGroup.getKey(ip, port),
this.messageQueue.size());
}
this.channel.writeAndFlush(this.getCurrentMessage().getCommand()); this.channel.writeAndFlush(this.getCurrentMessage().getCommand());
} }
@ -106,19 +119,19 @@ public class SyncPriorityChannel implements Runnable {
boolean timeout; boolean timeout;
try { try {
timeout = !this.getChannelPromise().await(3, TimeUnit.SECONDS); timeout = !this.getChannelPromise().await(1, TimeUnit.SECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("channelPromise.await发生异常,", e); log.error("channelPromise.await发生异常,", e);
return; return;
} }
if (timeout && this.timeoutRetryCount == (allowRetryCount - 1)) { if (timeout && this.timeoutRetryCount == (allowRetryCount - 1)) {
log.warn("#超时命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送", log.warn("#超时命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送,Command:{}",
this.getCurrentMessage().getCommand(), this.getCurrentMessage().getCommand(),
this.getCurrentMessage().getLength(), this.getCurrentMessage().getLength(),
IPGatewayRelation.get(ChannelGroup.getKey(ip, port)), IPGatewayRelation.get(ChannelGroup.getKey(ip, port)),
ChannelGroup.getKey(ip, port), ChannelGroup.getKey(ip, port),
this.messageQueue.size()); this.messageQueue.size(), this.getCurrentMessage());
} }
if (timeout && this.timeoutRetryCount < (allowRetryCount - 1)) { if (timeout && this.timeoutRetryCount < (allowRetryCount - 1)) {
@ -166,4 +179,15 @@ public class SyncPriorityChannel implements Runnable {
SpringUtil.getBean(RabbitAdmin.class).declareQueue(QueueBuilder.durable(this.collectQueue).build()); SpringUtil.getBean(RabbitAdmin.class).declareQueue(QueueBuilder.durable(this.collectQueue).build());
} }
private void cleanAutoCollection(List<ModbusCommandDto> messages) {
if (messages.stream().anyMatch(c -> c instanceof AutoCollectionDto)) {
Iterator<ModbusCommandDto> it = messageQueue.iterator();
while (it.hasNext()) {
if (it.next() instanceof AutoCollectionDto) {
it.remove();
}
}
}
}
} }