Compare commits
10 Commits
392ccee6ba
...
12f7292ec4
Author | SHA1 | Date |
---|---|---|
|
12f7292ec4 | |
|
0cacf26649 | |
|
ed1931cd98 | |
|
2acf6d2f7c | |
|
d2acd803f3 | |
|
43284cfe85 | |
|
ec5c7e0eba | |
|
cd236a420c | |
|
abcbd83bc2 | |
|
ee03c86167 |
5
pom.xml
5
pom.xml
|
@ -112,6 +112,11 @@
|
|||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.iot.modbus.rtcp.commons</groupId>
|
||||
<artifactId>modbus-rtcp-commons-keymgr</artifactId>
|
||||
<version>1.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-devtools</artifactId>
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package com.iot.modbus_rtcp;
|
||||
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
|
@ -12,12 +13,13 @@ import org.springframework.context.ApplicationContextAware;
|
|||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
@EnableScheduling
|
||||
@SpringBootApplication(scanBasePackages = {"com.iot.modbus_rtcp"})
|
||||
@SpringBootApplication(scanBasePackages = {"com.iot.modbus_rtcp", "com.iot.modbus.rtcp"})
|
||||
public class ModbusRtcpApplication implements ApplicationContextAware {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger("com.iot.modbus_rtcp.rabbit");
|
||||
|
||||
public static void main(String[] args) {
|
||||
public static void main(String[] args) throws Exception {
|
||||
LicenseValidator.main(null);
|
||||
SpringApplication.run(ModbusRtcpApplication.class, args);
|
||||
}
|
||||
|
||||
|
|
|
@ -36,6 +36,8 @@ public class EquipmentIPProperties {
|
|||
if (gatewayHeartbeat.length() > 100) {
|
||||
return false;
|
||||
}
|
||||
// 解决心跳包中可能存在的空格符和换行符
|
||||
gatewayHeartbeat = HexUtil.bytesToHexString(StringUtils.trim(new String(HexUtil.hexStringToBytes(gatewayHeartbeat))).getBytes()).toUpperCase();
|
||||
// 每分钟从数据库入库一次
|
||||
if (this.lastSyncEquipment.getAcquire() < System.currentTimeMillis() - 60_000) {
|
||||
this.jdbcTemplate.queryForList("select gateway_sn from device").forEach(map -> {
|
||||
|
|
|
@ -3,6 +3,7 @@ package com.iot.modbus_rtcp.controller;
|
|||
import com.iot.modbus_rtcp.dto.CommandTypeComparable;
|
||||
import com.iot.modbus_rtcp.dto.ModbusCommandDto;
|
||||
import com.iot.modbus_rtcp.netty.NettyServer;
|
||||
import com.iot.modbus_rtcp.netty.ThreadPoolConsumer;
|
||||
import com.iot.modbus_rtcp.utils.CRCUtil;
|
||||
import com.iot.modbus_rtcp.utils.HexUtil;
|
||||
import com.iot.modbus_rtcp.vo.Response;
|
||||
|
@ -27,14 +28,15 @@ public class ModbusTCPController implements ApplicationRunner {
|
|||
|
||||
@PreDestroy
|
||||
private void destroy() {
|
||||
this.nettyServer.stop();
|
||||
ThreadPoolConsumer.shutdown();
|
||||
nettyServer.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* 采集命令API
|
||||
* 采集命令API使用异步访问的方式,设备响应数据后会将其推送到Kafka
|
||||
*
|
||||
* @param modbusCommandBoList
|
||||
* @param modbusCommandBoList 采集指令
|
||||
* @return
|
||||
*/
|
||||
@PostMapping("/collect")
|
||||
|
@ -42,14 +44,14 @@ public class ModbusTCPController implements ApplicationRunner {
|
|||
log.debug("采集请求:{}", modbusCommandBoList);
|
||||
|
||||
try {
|
||||
modbusCommandBoList.stream().forEach(modbusCommandBo -> {
|
||||
modbusCommandBoList.forEach(modbusCommandBo -> {
|
||||
modbusCommandBo.setCommand(modbusCommandBo.getCommand() + CRCUtil.getCRC(HexUtil.hexStringToBytes(modbusCommandBo.getCommand())));
|
||||
|
||||
modbusCommandBo.setType(CommandTypeComparable.CommandType.COLLECTION);
|
||||
modbusCommandBo.setTimestamp(System.nanoTime());
|
||||
});
|
||||
|
||||
this.nettyServer.sender().send(modbusCommandBoList);
|
||||
nettyServer.sender().send(modbusCommandBoList);
|
||||
} catch (Exception e) {
|
||||
log.error("", e);
|
||||
return Response.failed(e.getMessage());
|
||||
|
@ -62,7 +64,7 @@ public class ModbusTCPController implements ApplicationRunner {
|
|||
* 控制命令API
|
||||
* 控制命令API使用异步访问的方式,设备响应数据自动丢弃
|
||||
*
|
||||
* @param modbusCommandBoList
|
||||
* @param modbusCommandBoList 控制指令
|
||||
* @return 设备响应返回
|
||||
*/
|
||||
@PostMapping("/control")
|
||||
|
@ -70,14 +72,14 @@ public class ModbusTCPController implements ApplicationRunner {
|
|||
log.debug("控制请求:{}", modbusCommandBoList);
|
||||
|
||||
try {
|
||||
modbusCommandBoList.stream().forEach(modbusCommandBo -> {
|
||||
modbusCommandBoList.forEach(modbusCommandBo -> {
|
||||
modbusCommandBo.setCommand(modbusCommandBo.getCommand() + CRCUtil.getCRC(HexUtil.hexStringToBytes(modbusCommandBo.getCommand())));
|
||||
|
||||
modbusCommandBo.setType(CommandTypeComparable.CommandType.CONTROL);
|
||||
modbusCommandBo.setTimestamp(System.nanoTime());
|
||||
});
|
||||
|
||||
this.nettyServer.sender().send(modbusCommandBoList);
|
||||
nettyServer.sender().send(modbusCommandBoList);
|
||||
} catch (Exception e) {
|
||||
log.error("", e);
|
||||
return Response.failed(e.getMessage());
|
||||
|
@ -93,13 +95,18 @@ public class ModbusTCPController implements ApplicationRunner {
|
|||
*/
|
||||
@GetMapping("/online")
|
||||
public Response<Set<String>> online() {
|
||||
return Response.succeed(this.nettyServer.getGroup().onlineGateway());
|
||||
return Response.succeed(nettyServer.getGroup().onlineGateway());
|
||||
}
|
||||
|
||||
@GetMapping("/online/total")
|
||||
public Response<Long> onlineTotal() {
|
||||
return Response.succeed(nettyServer.getGroup().onlineTotal());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) throws Exception {
|
||||
this.nettyServer = new NettyServer(502, 10);
|
||||
this.nettyServer.start();
|
||||
nettyServer = new NettyServer(502, Math.max(20, Runtime.getRuntime().availableProcessors() - 1));
|
||||
nettyServer.start();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
package com.iot.modbus_rtcp.dto;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
|
||||
@Data
|
||||
@SuperBuilder
|
||||
@NoArgsConstructor
|
||||
@ToString(callSuper = true)
|
||||
public class AutoCollectionDto extends ModbusCommandDto {
|
||||
|
||||
}
|
|
@ -3,11 +3,13 @@ package com.iot.modbus_rtcp.dto;
|
|||
import lombok.Data;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
@Data
|
||||
@SuperBuilder
|
||||
@NoArgsConstructor
|
||||
@ToString(callSuper = true)
|
||||
public class CommandTypeComparable implements Comparable {
|
||||
private CommandType type;
|
||||
|
||||
|
|
|
@ -2,12 +2,14 @@ package com.iot.modbus_rtcp.dto;
|
|||
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
|
||||
@Data
|
||||
@SuperBuilder
|
||||
@NoArgsConstructor
|
||||
@ToString(callSuper = true)
|
||||
public class ModbusCommandDto extends CommandTypeComparable implements CommandDto {
|
||||
/**
|
||||
* 12位十六进制的命令
|
||||
|
|
|
@ -3,8 +3,8 @@ package com.iot.modbus_rtcp.jobs;
|
|||
import cn.hutool.core.map.MapUtil;
|
||||
import com.iot.modbus_rtcp.config.EquipmentIPProperties;
|
||||
import com.iot.modbus_rtcp.controller.ModbusTCPController;
|
||||
import com.iot.modbus_rtcp.dto.AutoCollectionDto;
|
||||
import com.iot.modbus_rtcp.dto.CommandTypeComparable;
|
||||
import com.iot.modbus_rtcp.dto.ModbusCommandDto;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
|
@ -22,6 +22,7 @@ import java.util.concurrent.ThreadPoolExecutor;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:scwsl@foxmail.com">王仕龙</a>
|
||||
|
@ -66,10 +67,9 @@ public class AutoCollectJobs {
|
|||
private final JdbcTemplate jdbcTemplate;
|
||||
private final ModbusTCPController controller;
|
||||
private final EquipmentIPProperties equipmentIPProperties;
|
||||
private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(Math.min(10, Runtime.getRuntime().availableProcessors())
|
||||
, Math.min(100, Runtime.getRuntime().availableProcessors()), 60, TimeUnit.SECONDS,
|
||||
private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS,
|
||||
new ArrayBlockingQueue<>(5000), new ThreadFactory() {
|
||||
AtomicInteger index = new AtomicInteger(1);
|
||||
final AtomicInteger index = new AtomicInteger(1);
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable runnable) {
|
||||
|
@ -85,13 +85,13 @@ public class AutoCollectJobs {
|
|||
int pageIndex = 0;
|
||||
int pageSize = 1000;
|
||||
long timestamp = System.currentTimeMillis();
|
||||
List<Map<String, Object>> resultList = null;
|
||||
while (Objects.isNull(resultList) || resultList.size() >= pageSize) {
|
||||
resultList = this.jdbcTemplate.queryForList(SQL + " LIMIT " + (pageIndex++ * pageSize) + ", " + pageSize);
|
||||
while (true) {
|
||||
List<Map<String, Object>> resultList = this.jdbcTemplate.queryForList(SQL + " LIMIT " + (pageIndex++ * pageSize) + ", " + pageSize);
|
||||
if (ObjectUtils.isEmpty(resultList)) {
|
||||
continue;
|
||||
break;
|
||||
}
|
||||
resultList.stream()
|
||||
this.collectThreadPool.execute(() -> {
|
||||
this.controller.collect(resultList.stream()
|
||||
.filter(item -> {
|
||||
String gatewaySn = StringUtils.trim(MapUtil.getStr(item, "identifier"));
|
||||
String gatewayIdentifier = this.equipmentIPProperties.put(gatewaySn);
|
||||
|
@ -104,7 +104,10 @@ public class AutoCollectJobs {
|
|||
return false;
|
||||
})
|
||||
.collect(Collectors.groupingBy(item -> StringUtils.trim(MapUtil.getStr(item, "deviceId"))))
|
||||
.forEach((deviceId, commandList) -> {
|
||||
.entrySet()
|
||||
.stream()
|
||||
.flatMap(entry -> {
|
||||
List<Map<String, Object>> commandList = entry.getValue();
|
||||
Map<String, List<Map<String, Object>>> refTypeCommandListMap = commandList.stream()
|
||||
.collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "refType")));
|
||||
List<Map<String, Object>> deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_DEVICE);
|
||||
|
@ -112,10 +115,10 @@ public class AutoCollectJobs {
|
|||
deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_PRODUCT_CODE);
|
||||
}
|
||||
if (ObjectUtils.isNotEmpty(deviceCommandList)) {
|
||||
List<ModbusCommandDto> collectCommondList = deviceCommandList.stream()
|
||||
return deviceCommandList.stream()
|
||||
.map(item -> {
|
||||
String identifier = StringUtils.trim(MapUtil.getStr(item, "identifier"));
|
||||
return ModbusCommandDto.builder()
|
||||
return AutoCollectionDto.builder()
|
||||
.command(StringUtils.trim(MapUtil.getStr(item, "command")))
|
||||
.identifier(identifier)
|
||||
.length(MapUtil.getInt(item, "messageLength"))
|
||||
|
@ -125,13 +128,15 @@ public class AutoCollectJobs {
|
|||
StringUtils.trim(MapUtil.getStr(item, "commandId")),
|
||||
timestamp))
|
||||
.build();
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
if (ObjectUtils.isNotEmpty(collectCommondList)) {
|
||||
this.collectThreadPool.execute(() -> this.controller.collect(collectCommondList));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
return Stream.empty();
|
||||
})
|
||||
.collect(Collectors.toList()));
|
||||
});
|
||||
if (resultList.size() < pageSize) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,6 +72,13 @@ public class ChannelGroup {
|
|||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
public Long onlineTotal() {
|
||||
return this.mChannelMap.keySet()
|
||||
.stream()
|
||||
.filter(identifier -> !StringUtils.contains(identifier, ":"))
|
||||
.count();
|
||||
}
|
||||
|
||||
public void see() {
|
||||
log.info("当前连接有:{}", new ArrayList<>(this.mChannelMap.keySet()));
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ public class ModbusDecoder extends ByteToMessageDecoder {
|
|||
buffer.readBytes(b);
|
||||
|
||||
String hex = HexUtil.bytesToHexString(b);
|
||||
log.info("解析到:{}", hex);
|
||||
log.debug("解析到:{}", hex);
|
||||
|
||||
String currentAddress = ChannelGroup.getKey(ip, port);
|
||||
if (this.equipmentIPProperties.contains(hex.toUpperCase())) { // 心跳
|
||||
|
@ -67,7 +67,7 @@ public class ModbusDecoder extends ByteToMessageDecoder {
|
|||
ModbusCommandDto message = channel.getCurrentMessage();
|
||||
|
||||
if (message == null) {
|
||||
log.warn("非法传输:{},原报文:{}", HexUtil.bytesToHexString(b), new String(b));
|
||||
log.warn("非法传输:{},原报文:{},IP:{}:{}", HexUtil.bytesToHexString(b), new String(b), ip, port);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@ public class ModbusSender {
|
|||
throw new RuntimeException("设备" + identity + "的连接未配置或该设备已断开与服务器的连接,请稍后再试或联系管理员");
|
||||
}
|
||||
|
||||
log.info("{}通道({})提交{}条命令:{}", this.equipmentIPProperties.get(identity), identity, v.size(), v);
|
||||
log.debug("{}通道({})提交{}条命令:{}", this.equipmentIPProperties.get(identity), identity, v.size(), v);
|
||||
channel.addMessages(v);
|
||||
});
|
||||
}
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
package com.iot.modbus_rtcp.netty;
|
||||
|
||||
//import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator;
|
||||
|
||||
import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator;
|
||||
import com.iot.modbus_rtcp.utils.SpringUtil;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.*;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
|
@ -73,6 +77,7 @@ public class NettyServer extends ChannelInitializer<SocketChannel> {
|
|||
String ip = ch.remoteAddress().getHostString();
|
||||
//过滤掉docker 网关请求
|
||||
if ("172.17.0.1".equals(ip)) return;
|
||||
SpringUtil.getBean(LicenseValidator.class).verifyNumber(ch::close);
|
||||
int port = ch.remoteAddress().getPort();
|
||||
this.group.put(ChannelGroup.getKey(ip, port), new SyncPriorityChannel(ch));
|
||||
|
||||
|
|
|
@ -39,10 +39,10 @@ public class SyncHandler extends ChannelInboundHandlerAdapter {
|
|||
try {
|
||||
channel.getChannelPromise().setSuccess();
|
||||
} catch (IllegalStateException e) {
|
||||
log.info("数据返回慢了,已经超时!");
|
||||
log.warn("数据返回慢了,已经超时!");
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
log.info("丢失数据:{}", msg);
|
||||
log.warn("丢失数据:{}", msg);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -64,7 +64,7 @@ public class SyncHandler extends ChannelInboundHandlerAdapter {
|
|||
}
|
||||
|
||||
String json = message.getKey() + "/" + System.currentTimeMillis() + "/" + msg;
|
||||
log.info("推数据到MQ({}): {}", channel.getCollectQueue(), json);
|
||||
log.debug("推数据到MQ({}): {}", channel.getCollectQueue(), json);
|
||||
try {
|
||||
this.rabbitTemplate.convertAndSend(channel.getCollectQueue(), json);
|
||||
} catch (Exception e) {
|
||||
|
@ -80,7 +80,7 @@ public class SyncHandler extends ChannelInboundHandlerAdapter {
|
|||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
log.info("异常: ", cause);
|
||||
log.warn("异常: ", cause);
|
||||
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
|
||||
String ipKey = ChannelGroup.getKey(remoteAddress.getHostString(), remoteAddress.getPort());
|
||||
this.channelGroup.remove(ipKey);
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package com.iot.modbus_rtcp.netty;
|
||||
|
||||
import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator;
|
||||
import com.iot.modbus_rtcp.dto.AutoCollectionDto;
|
||||
import com.iot.modbus_rtcp.dto.CommandTypeComparable;
|
||||
import com.iot.modbus_rtcp.dto.ModbusCommandDto;
|
||||
import com.iot.modbus_rtcp.utils.HexUtil;
|
||||
|
@ -8,6 +10,7 @@ import io.netty.channel.ChannelPromise;
|
|||
import io.netty.channel.socket.SocketChannel;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.amqp.core.QueueBuilder;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
|
||||
|
@ -18,6 +21,7 @@ import java.util.concurrent.PriorityBlockingQueue;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
|
||||
/**
|
||||
* 同步控制命令优先Channel
|
||||
|
@ -32,6 +36,7 @@ public class SyncPriorityChannel implements Runnable {
|
|||
private static final int allowRetryCount = 1;
|
||||
private static final boolean isControl = true;
|
||||
private final AtomicBoolean close = new AtomicBoolean(Boolean.FALSE);
|
||||
private final AtomicBoolean running = new AtomicBoolean(Boolean.FALSE);
|
||||
|
||||
private final SocketChannel channel;
|
||||
|
||||
|
@ -40,9 +45,7 @@ public class SyncPriorityChannel implements Runnable {
|
|||
private volatile String identifier;
|
||||
@Getter
|
||||
private volatile String collectQueue;
|
||||
private volatile boolean running = false;
|
||||
|
||||
private final Object object = new Object();
|
||||
private final PriorityBlockingQueue<ModbusCommandDto> messageQueue = new PriorityBlockingQueue<>();
|
||||
private final AtomicReference<ChannelPromise> channelPromiseReference = new AtomicReference<>();
|
||||
private final AtomicReference<ModbusCommandDto> currentMessageReference = new AtomicReference<>();
|
||||
|
@ -52,13 +55,15 @@ public class SyncPriorityChannel implements Runnable {
|
|||
}
|
||||
|
||||
public void addMessages(List<ModbusCommandDto> messages) {
|
||||
this.cleanAutoCollection(messages);
|
||||
|
||||
this.messageQueue.addAll(messages);
|
||||
|
||||
synchronized (this.object) {
|
||||
if (this.running) {
|
||||
synchronized (this.running) {
|
||||
if (this.running.getAcquire()) {
|
||||
return;
|
||||
} else {
|
||||
this.running = true;
|
||||
this.running.setRelease(Boolean.TRUE);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -80,7 +85,7 @@ public class SyncPriorityChannel implements Runnable {
|
|||
|
||||
// 消费完毕终止递归发送
|
||||
if (Objects.isNull(this.getCurrentMessage())) {
|
||||
this.running = false;
|
||||
this.running.setRelease(Boolean.FALSE);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -90,12 +95,21 @@ public class SyncPriorityChannel implements Runnable {
|
|||
|
||||
this.channelPromiseReference.setRelease(this.channel.newPromise());
|
||||
if (isControl || Objects.equals(CommandTypeComparable.CommandType.COLLECTION, this.getCurrentMessage().getType())) {
|
||||
if (Objects.equals(CommandTypeComparable.CommandType.CONTROL, this.getCurrentMessage().getType())) {
|
||||
log.info("发送命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送",
|
||||
this.getCurrentMessage().getCommand(),
|
||||
this.getCurrentMessage().getLength(),
|
||||
IPGatewayRelation.get(ChannelGroup.getKey(ip, port)),
|
||||
ChannelGroup.getKey(ip, port),
|
||||
this.messageQueue.size());
|
||||
} else {
|
||||
log.debug("发送命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送",
|
||||
this.getCurrentMessage().getCommand(),
|
||||
this.getCurrentMessage().getLength(),
|
||||
IPGatewayRelation.get(ChannelGroup.getKey(ip, port)),
|
||||
ChannelGroup.getKey(ip, port),
|
||||
this.messageQueue.size());
|
||||
}
|
||||
|
||||
this.channel.writeAndFlush(this.getCurrentMessage().getCommand());
|
||||
}
|
||||
|
@ -112,12 +126,12 @@ public class SyncPriorityChannel implements Runnable {
|
|||
}
|
||||
|
||||
if (timeout && this.timeoutRetryCount == (allowRetryCount - 1)) {
|
||||
log.warn("#超时命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送",
|
||||
log.warn("#超时命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送,Command:{}",
|
||||
this.getCurrentMessage().getCommand(),
|
||||
this.getCurrentMessage().getLength(),
|
||||
IPGatewayRelation.get(ChannelGroup.getKey(ip, port)),
|
||||
ChannelGroup.getKey(ip, port),
|
||||
this.messageQueue.size());
|
||||
this.messageQueue.size(), this.getCurrentMessage());
|
||||
}
|
||||
|
||||
if (timeout && this.timeoutRetryCount < (allowRetryCount - 1)) {
|
||||
|
@ -126,11 +140,7 @@ public class SyncPriorityChannel implements Runnable {
|
|||
this.timeoutRetryCount = 0;
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
|
||||
|
||||
this.sendNext();
|
||||
}
|
||||
|
@ -150,8 +160,14 @@ public class SyncPriorityChannel implements Runnable {
|
|||
}
|
||||
|
||||
public void setIdentifier(String identifier) {
|
||||
if (StringUtils.isBlank(this.identifier)) {
|
||||
this.identifier = identifier;
|
||||
this.setCollectQueue();
|
||||
} else if (!StringUtils.equals(this.identifier, identifier)) {
|
||||
throw new RuntimeException("多个设备对应同一心跳包,当前已绑定心跳信息:"
|
||||
+ new String(HexUtil.hexStringToBytes(identifier)) + "[" + identifier + "] --> "
|
||||
+ this.channel.remoteAddress().getHostString() + ":" + this.channel.remoteAddress().getPort());
|
||||
}
|
||||
}
|
||||
|
||||
public void setCollectQueue() {
|
||||
|
@ -159,4 +175,11 @@ public class SyncPriorityChannel implements Runnable {
|
|||
SpringUtil.getBean(RabbitAdmin.class).declareQueue(QueueBuilder.durable(this.collectQueue).build());
|
||||
}
|
||||
|
||||
private void cleanAutoCollection(List<ModbusCommandDto> messages) {
|
||||
SpringUtil.getBean(LicenseValidator.class).verify();
|
||||
if (messages.stream().anyMatch(c -> c instanceof AutoCollectionDto)) {
|
||||
this.messageQueue.removeIf(command -> command instanceof AutoCollectionDto);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -9,13 +9,21 @@ public class ThreadPoolConsumer {
|
|||
|
||||
static {
|
||||
mThreadPoolExecutor = new ThreadPoolTaskExecutor();
|
||||
mThreadPoolExecutor.setCorePoolSize(4);
|
||||
mThreadPoolExecutor.setMaxPoolSize(8);
|
||||
mThreadPoolExecutor.setQueueCapacity(5000);
|
||||
mThreadPoolExecutor.setKeepAliveSeconds(60);
|
||||
mThreadPoolExecutor.setThreadPriority(Thread.MAX_PRIORITY);
|
||||
mThreadPoolExecutor.setThreadNamePrefix("Modbus-command-sender-thread-");
|
||||
mThreadPoolExecutor.setCorePoolSize(Math.min(50, Runtime.getRuntime().availableProcessors() - 1));
|
||||
mThreadPoolExecutor.setMaxPoolSize(Math.min(500, Runtime.getRuntime().availableProcessors() * 10));
|
||||
mThreadPoolExecutor.initialize();
|
||||
}
|
||||
|
||||
public static void submit(Runnable task) {
|
||||
mThreadPoolExecutor.submit(task);
|
||||
}
|
||||
|
||||
public static void shutdown() {
|
||||
mThreadPoolExecutor.shutdown();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
spring:
|
||||
datasource:
|
||||
driver-class-name: com.mysql.cj.jdbc.Driver
|
||||
type: com.alibaba.druid.pool.DruidDataSource
|
||||
url: jdbc:mysql://${MYSQL_HOST:172.17.0.1}:${MYSQL_PORT:3306}/gas_well_watch?characterEncoding=UTF-8&useUnicode=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
|
||||
username: ${MYSQL_USERNAME:root}
|
||||
password: ${MYSQL_USERNAME:@Root123}
|
||||
rabbitmq:
|
||||
host: ${RABBIT_MQ_HOST:172.17.0.1}
|
||||
port: ${RABBIT_MQ_PORT:5672}
|
||||
username: ${RABBIT_MQ_USERNAME:huaxia}
|
||||
password: ${RABBIT_MQ_PASSWORD:huaxia@2024}
|
||||
virtual-host: /gas
|
|
@ -11,7 +11,7 @@
|
|||
<!-- %highlight():转换说明符以粗体红色显示其级别为ERROR的事件,红色为WARN,BLUE为INFO,以及其他级别的默认颜色。 -->
|
||||
<property name="LOG_PATTERN"
|
||||
value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) --- [%15.15(%thread)] [%X{traceId}] %cyan(%-40.40(%logger{40})) : %msg%n"/>
|
||||
<property name="LOG_HOME" value="/Users/snow/log/venta/mtcp"/>
|
||||
<property name="LOG_HOME" value="./logs"/>
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<!--<immediateFlush>true</immediateFlush>-->
|
||||
<encoder>
|
||||
|
@ -54,7 +54,6 @@
|
|||
</encoder>
|
||||
</appender>
|
||||
|
||||
|
||||
<root level="INFO">
|
||||
<appender-ref ref="STDOUT"/>
|
||||
<appender-ref ref="LOCAL_INFO_LOG"/>
|
||||
|
|
Binary file not shown.
Loading…
Reference in New Issue