Compare commits

..

No commits in common. "12f7292ec42470877efe840d94c942fed1f47e81" and "392ccee6ba427edc99f330316d9cf208b6e7fef8" have entirely different histories.

18 changed files with 93 additions and 188 deletions

View File

@ -112,11 +112,6 @@
<artifactId>spring-boot-starter-test</artifactId> <artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.iot.modbus.rtcp.commons</groupId>
<artifactId>modbus-rtcp-commons-keymgr</artifactId>
<version>1.0</version>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId> <artifactId>spring-boot-devtools</artifactId>

View File

@ -1,7 +1,6 @@
package com.iot.modbus_rtcp; package com.iot.modbus_rtcp;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
@ -13,13 +12,12 @@ import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.EnableScheduling;
@EnableScheduling @EnableScheduling
@SpringBootApplication(scanBasePackages = {"com.iot.modbus_rtcp", "com.iot.modbus.rtcp"}) @SpringBootApplication(scanBasePackages = {"com.iot.modbus_rtcp"})
public class ModbusRtcpApplication implements ApplicationContextAware { public class ModbusRtcpApplication implements ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger("com.iot.modbus_rtcp.rabbit"); private static final Logger logger = LoggerFactory.getLogger("com.iot.modbus_rtcp.rabbit");
public static void main(String[] args) throws Exception { public static void main(String[] args) {
LicenseValidator.main(null);
SpringApplication.run(ModbusRtcpApplication.class, args); SpringApplication.run(ModbusRtcpApplication.class, args);
} }

View File

@ -36,8 +36,6 @@ public class EquipmentIPProperties {
if (gatewayHeartbeat.length() > 100) { if (gatewayHeartbeat.length() > 100) {
return false; return false;
} }
// 解决心跳包中可能存在的空格符和换行符
gatewayHeartbeat = HexUtil.bytesToHexString(StringUtils.trim(new String(HexUtil.hexStringToBytes(gatewayHeartbeat))).getBytes()).toUpperCase();
// 每分钟从数据库入库一次 // 每分钟从数据库入库一次
if (this.lastSyncEquipment.getAcquire() < System.currentTimeMillis() - 60_000) { if (this.lastSyncEquipment.getAcquire() < System.currentTimeMillis() - 60_000) {
this.jdbcTemplate.queryForList("select gateway_sn from device").forEach(map -> { this.jdbcTemplate.queryForList("select gateway_sn from device").forEach(map -> {

View File

@ -3,7 +3,6 @@ package com.iot.modbus_rtcp.controller;
import com.iot.modbus_rtcp.dto.CommandTypeComparable; import com.iot.modbus_rtcp.dto.CommandTypeComparable;
import com.iot.modbus_rtcp.dto.ModbusCommandDto; import com.iot.modbus_rtcp.dto.ModbusCommandDto;
import com.iot.modbus_rtcp.netty.NettyServer; import com.iot.modbus_rtcp.netty.NettyServer;
import com.iot.modbus_rtcp.netty.ThreadPoolConsumer;
import com.iot.modbus_rtcp.utils.CRCUtil; import com.iot.modbus_rtcp.utils.CRCUtil;
import com.iot.modbus_rtcp.utils.HexUtil; import com.iot.modbus_rtcp.utils.HexUtil;
import com.iot.modbus_rtcp.vo.Response; import com.iot.modbus_rtcp.vo.Response;
@ -28,15 +27,14 @@ public class ModbusTCPController implements ApplicationRunner {
@PreDestroy @PreDestroy
private void destroy() { private void destroy() {
ThreadPoolConsumer.shutdown(); this.nettyServer.stop();
nettyServer.stop();
} }
/** /**
* 采集命令API * 采集命令API
* 采集命令API使用异步访问的方式设备响应数据后会将其推送到Kafka * 采集命令API使用异步访问的方式设备响应数据后会将其推送到Kafka
* *
* @param modbusCommandBoList 采集指令 * @param modbusCommandBoList
* @return * @return
*/ */
@PostMapping("/collect") @PostMapping("/collect")
@ -44,14 +42,14 @@ public class ModbusTCPController implements ApplicationRunner {
log.debug("采集请求:{}", modbusCommandBoList); log.debug("采集请求:{}", modbusCommandBoList);
try { try {
modbusCommandBoList.forEach(modbusCommandBo -> { modbusCommandBoList.stream().forEach(modbusCommandBo -> {
modbusCommandBo.setCommand(modbusCommandBo.getCommand() + CRCUtil.getCRC(HexUtil.hexStringToBytes(modbusCommandBo.getCommand()))); modbusCommandBo.setCommand(modbusCommandBo.getCommand() + CRCUtil.getCRC(HexUtil.hexStringToBytes(modbusCommandBo.getCommand())));
modbusCommandBo.setType(CommandTypeComparable.CommandType.COLLECTION); modbusCommandBo.setType(CommandTypeComparable.CommandType.COLLECTION);
modbusCommandBo.setTimestamp(System.nanoTime()); modbusCommandBo.setTimestamp(System.nanoTime());
}); });
nettyServer.sender().send(modbusCommandBoList); this.nettyServer.sender().send(modbusCommandBoList);
} catch (Exception e) { } catch (Exception e) {
log.error("", e); log.error("", e);
return Response.failed(e.getMessage()); return Response.failed(e.getMessage());
@ -64,7 +62,7 @@ public class ModbusTCPController implements ApplicationRunner {
* 控制命令API * 控制命令API
* 控制命令API使用异步访问的方式设备响应数据自动丢弃 * 控制命令API使用异步访问的方式设备响应数据自动丢弃
* *
* @param modbusCommandBoList 控制指令 * @param modbusCommandBoList
* @return 设备响应返回 * @return 设备响应返回
*/ */
@PostMapping("/control") @PostMapping("/control")
@ -72,14 +70,14 @@ public class ModbusTCPController implements ApplicationRunner {
log.debug("控制请求:{}", modbusCommandBoList); log.debug("控制请求:{}", modbusCommandBoList);
try { try {
modbusCommandBoList.forEach(modbusCommandBo -> { modbusCommandBoList.stream().forEach(modbusCommandBo -> {
modbusCommandBo.setCommand(modbusCommandBo.getCommand() + CRCUtil.getCRC(HexUtil.hexStringToBytes(modbusCommandBo.getCommand()))); modbusCommandBo.setCommand(modbusCommandBo.getCommand() + CRCUtil.getCRC(HexUtil.hexStringToBytes(modbusCommandBo.getCommand())));
modbusCommandBo.setType(CommandTypeComparable.CommandType.CONTROL); modbusCommandBo.setType(CommandTypeComparable.CommandType.CONTROL);
modbusCommandBo.setTimestamp(System.nanoTime()); modbusCommandBo.setTimestamp(System.nanoTime());
}); });
nettyServer.sender().send(modbusCommandBoList); this.nettyServer.sender().send(modbusCommandBoList);
} catch (Exception e) { } catch (Exception e) {
log.error("", e); log.error("", e);
return Response.failed(e.getMessage()); return Response.failed(e.getMessage());
@ -95,18 +93,13 @@ public class ModbusTCPController implements ApplicationRunner {
*/ */
@GetMapping("/online") @GetMapping("/online")
public Response<Set<String>> online() { public Response<Set<String>> online() {
return Response.succeed(nettyServer.getGroup().onlineGateway()); return Response.succeed(this.nettyServer.getGroup().onlineGateway());
}
@GetMapping("/online/total")
public Response<Long> onlineTotal() {
return Response.succeed(nettyServer.getGroup().onlineTotal());
} }
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
nettyServer = new NettyServer(502, Math.max(20, Runtime.getRuntime().availableProcessors() - 1)); this.nettyServer = new NettyServer(502, 10);
nettyServer.start(); this.nettyServer.start();
} }
} }

View File

@ -1,15 +0,0 @@
package com.iot.modbus_rtcp.dto;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
@Data
@SuperBuilder
@NoArgsConstructor
@ToString(callSuper = true)
public class AutoCollectionDto extends ModbusCommandDto {
}

View File

@ -3,13 +3,11 @@ package com.iot.modbus_rtcp.dto;
import lombok.Data; import lombok.Data;
import lombok.Getter; import lombok.Getter;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
@Data @Data
@SuperBuilder @SuperBuilder
@NoArgsConstructor @NoArgsConstructor
@ToString(callSuper = true)
public class CommandTypeComparable implements Comparable { public class CommandTypeComparable implements Comparable {
private CommandType type; private CommandType type;

View File

@ -2,14 +2,12 @@ package com.iot.modbus_rtcp.dto;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
@Data @Data
@SuperBuilder @SuperBuilder
@NoArgsConstructor @NoArgsConstructor
@ToString(callSuper = true)
public class ModbusCommandDto extends CommandTypeComparable implements CommandDto { public class ModbusCommandDto extends CommandTypeComparable implements CommandDto {
/** /**
* 12位十六进制的命令 * 12位十六进制的命令

View File

@ -3,8 +3,8 @@ package com.iot.modbus_rtcp.jobs;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import com.iot.modbus_rtcp.config.EquipmentIPProperties; import com.iot.modbus_rtcp.config.EquipmentIPProperties;
import com.iot.modbus_rtcp.controller.ModbusTCPController; import com.iot.modbus_rtcp.controller.ModbusTCPController;
import com.iot.modbus_rtcp.dto.AutoCollectionDto;
import com.iot.modbus_rtcp.dto.CommandTypeComparable; import com.iot.modbus_rtcp.dto.CommandTypeComparable;
import com.iot.modbus_rtcp.dto.ModbusCommandDto;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.ObjectUtils;
@ -22,7 +22,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
/** /**
* @author <a href="mailto:scwsl@foxmail.com">王仕龙</a> * @author <a href="mailto:scwsl@foxmail.com">王仕龙</a>
@ -67,9 +66,10 @@ public class AutoCollectJobs {
private final JdbcTemplate jdbcTemplate; private final JdbcTemplate jdbcTemplate;
private final ModbusTCPController controller; private final ModbusTCPController controller;
private final EquipmentIPProperties equipmentIPProperties; private final EquipmentIPProperties equipmentIPProperties;
private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS, private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(Math.min(10, Runtime.getRuntime().availableProcessors())
, Math.min(100, Runtime.getRuntime().availableProcessors()), 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5000), new ThreadFactory() { new ArrayBlockingQueue<>(5000), new ThreadFactory() {
final AtomicInteger index = new AtomicInteger(1); AtomicInteger index = new AtomicInteger(1);
@Override @Override
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
@ -85,58 +85,53 @@ public class AutoCollectJobs {
int pageIndex = 0; int pageIndex = 0;
int pageSize = 1000; int pageSize = 1000;
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
while (true) { List<Map<String, Object>> resultList = null;
List<Map<String, Object>> resultList = this.jdbcTemplate.queryForList(SQL + " LIMIT " + (pageIndex++ * pageSize) + ", " + pageSize); while (Objects.isNull(resultList) || resultList.size() >= pageSize) {
resultList = this.jdbcTemplate.queryForList(SQL + " LIMIT " + (pageIndex++ * pageSize) + ", " + pageSize);
if (ObjectUtils.isEmpty(resultList)) { if (ObjectUtils.isEmpty(resultList)) {
break; continue;
} }
this.collectThreadPool.execute(() -> { resultList.stream()
this.controller.collect(resultList.stream() .filter(item -> {
.filter(item -> { String gatewaySn = StringUtils.trim(MapUtil.getStr(item, "identifier"));
String gatewaySn = StringUtils.trim(MapUtil.getStr(item, "identifier")); String gatewayIdentifier = this.equipmentIPProperties.put(gatewaySn);
String gatewayIdentifier = this.equipmentIPProperties.put(gatewaySn); if (Objects.nonNull(ModbusTCPController.nettyServer.getGroup()
if (Objects.nonNull(ModbusTCPController.nettyServer.getGroup() .get(gatewayIdentifier))) {
.get(gatewayIdentifier))) { item.put("identifier", gatewayIdentifier);
item.put("identifier", gatewayIdentifier); return true;
return true; }
log.warn("Gateway {} is disconnected and does not collect data", gatewaySn);
return false;
})
.collect(Collectors.groupingBy(item -> StringUtils.trim(MapUtil.getStr(item, "deviceId"))))
.forEach((deviceId, commandList) -> {
Map<String, List<Map<String, Object>>> refTypeCommandListMap = commandList.stream()
.collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "refType")));
List<Map<String, Object>> deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_DEVICE);
if (ObjectUtils.isEmpty(deviceCommandList)) {
deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_PRODUCT_CODE);
}
if (ObjectUtils.isNotEmpty(deviceCommandList)) {
List<ModbusCommandDto> collectCommondList = deviceCommandList.stream()
.map(item -> {
String identifier = StringUtils.trim(MapUtil.getStr(item, "identifier"));
return ModbusCommandDto.builder()
.command(StringUtils.trim(MapUtil.getStr(item, "command")))
.identifier(identifier)
.length(MapUtil.getInt(item, "messageLength"))
.type(CommandTypeComparable.CommandType.COLLECTION)
.key(StringUtils.joinWith("/", identifier,
StringUtils.trim(MapUtil.getStr(item, "deviceId")),
StringUtils.trim(MapUtil.getStr(item, "commandId")),
timestamp))
.build();
})
.collect(Collectors.toList());
if (ObjectUtils.isNotEmpty(collectCommondList)) {
this.collectThreadPool.execute(() -> this.controller.collect(collectCommondList));
} }
log.warn("Gateway {} is disconnected and does not collect data", gatewaySn); }
return false; });
})
.collect(Collectors.groupingBy(item -> StringUtils.trim(MapUtil.getStr(item, "deviceId"))))
.entrySet()
.stream()
.flatMap(entry -> {
List<Map<String, Object>> commandList = entry.getValue();
Map<String, List<Map<String, Object>>> refTypeCommandListMap = commandList.stream()
.collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "refType")));
List<Map<String, Object>> deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_DEVICE);
if (ObjectUtils.isEmpty(deviceCommandList)) {
deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_PRODUCT_CODE);
}
if (ObjectUtils.isNotEmpty(deviceCommandList)) {
return deviceCommandList.stream()
.map(item -> {
String identifier = StringUtils.trim(MapUtil.getStr(item, "identifier"));
return AutoCollectionDto.builder()
.command(StringUtils.trim(MapUtil.getStr(item, "command")))
.identifier(identifier)
.length(MapUtil.getInt(item, "messageLength"))
.type(CommandTypeComparable.CommandType.COLLECTION)
.key(StringUtils.joinWith("/", identifier,
StringUtils.trim(MapUtil.getStr(item, "deviceId")),
StringUtils.trim(MapUtil.getStr(item, "commandId")),
timestamp))
.build();
});
}
return Stream.empty();
})
.collect(Collectors.toList()));
});
if (resultList.size() < pageSize) {
break;
}
} }
} }
} }

View File

@ -72,13 +72,6 @@ public class ChannelGroup {
.collect(Collectors.toSet()); .collect(Collectors.toSet());
} }
public Long onlineTotal() {
return this.mChannelMap.keySet()
.stream()
.filter(identifier -> !StringUtils.contains(identifier, ":"))
.count();
}
public void see() { public void see() {
log.info("当前连接有:{}", new ArrayList<>(this.mChannelMap.keySet())); log.info("当前连接有:{}", new ArrayList<>(this.mChannelMap.keySet()));
} }

View File

@ -34,7 +34,7 @@ public class ModbusDecoder extends ByteToMessageDecoder {
buffer.readBytes(b); buffer.readBytes(b);
String hex = HexUtil.bytesToHexString(b); String hex = HexUtil.bytesToHexString(b);
log.debug("解析到:{}", hex); log.info("解析到:{}", hex);
String currentAddress = ChannelGroup.getKey(ip, port); String currentAddress = ChannelGroup.getKey(ip, port);
if (this.equipmentIPProperties.contains(hex.toUpperCase())) { // 心跳 if (this.equipmentIPProperties.contains(hex.toUpperCase())) { // 心跳
@ -67,7 +67,7 @@ public class ModbusDecoder extends ByteToMessageDecoder {
ModbusCommandDto message = channel.getCurrentMessage(); ModbusCommandDto message = channel.getCurrentMessage();
if (message == null) { if (message == null) {
log.warn("非法传输:{},原报文:{}IP{}:{}", HexUtil.bytesToHexString(b), new String(b), ip, port); log.warn("非法传输:{},原报文:{}", HexUtil.bytesToHexString(b), new String(b));
return; return;
} }

View File

@ -35,7 +35,7 @@ public class ModbusSender {
throw new RuntimeException("设备" + identity + "的连接未配置或该设备已断开与服务器的连接,请稍后再试或联系管理员"); throw new RuntimeException("设备" + identity + "的连接未配置或该设备已断开与服务器的连接,请稍后再试或联系管理员");
} }
log.debug("{}通道({})提交{}条命令:{}", this.equipmentIPProperties.get(identity), identity, v.size(), v); log.info("{}通道({})提交{}条命令:{}", this.equipmentIPProperties.get(identity), identity, v.size(), v);
channel.addMessages(v); channel.addMessages(v);
}); });
} }

View File

@ -1,9 +1,5 @@
package com.iot.modbus_rtcp.netty; package com.iot.modbus_rtcp.netty;
//import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator;
import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator;
import com.iot.modbus_rtcp.utils.SpringUtil;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*; import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
@ -77,7 +73,6 @@ public class NettyServer extends ChannelInitializer<SocketChannel> {
String ip = ch.remoteAddress().getHostString(); String ip = ch.remoteAddress().getHostString();
//过滤掉docker 网关请求 //过滤掉docker 网关请求
if ("172.17.0.1".equals(ip)) return; if ("172.17.0.1".equals(ip)) return;
SpringUtil.getBean(LicenseValidator.class).verifyNumber(ch::close);
int port = ch.remoteAddress().getPort(); int port = ch.remoteAddress().getPort();
this.group.put(ChannelGroup.getKey(ip, port), new SyncPriorityChannel(ch)); this.group.put(ChannelGroup.getKey(ip, port), new SyncPriorityChannel(ch));

View File

@ -39,10 +39,10 @@ public class SyncHandler extends ChannelInboundHandlerAdapter {
try { try {
channel.getChannelPromise().setSuccess(); channel.getChannelPromise().setSuccess();
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
log.warn("数据返回慢了,已经超时!"); log.info("数据返回慢了,已经超时!");
return; return;
} catch (Exception e) { } catch (Exception e) {
log.warn("丢失数据:{}", msg); log.info("丢失数据:{}", msg);
return; return;
} }
@ -64,7 +64,7 @@ public class SyncHandler extends ChannelInboundHandlerAdapter {
} }
String json = message.getKey() + "/" + System.currentTimeMillis() + "/" + msg; String json = message.getKey() + "/" + System.currentTimeMillis() + "/" + msg;
log.debug("推数据到MQ({}): {}", channel.getCollectQueue(), json); log.info("推数据到MQ({}): {}", channel.getCollectQueue(), json);
try { try {
this.rabbitTemplate.convertAndSend(channel.getCollectQueue(), json); this.rabbitTemplate.convertAndSend(channel.getCollectQueue(), json);
} catch (Exception e) { } catch (Exception e) {
@ -80,7 +80,7 @@ public class SyncHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.warn("异常: ", cause); log.info("异常: ", cause);
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
String ipKey = ChannelGroup.getKey(remoteAddress.getHostString(), remoteAddress.getPort()); String ipKey = ChannelGroup.getKey(remoteAddress.getHostString(), remoteAddress.getPort());
this.channelGroup.remove(ipKey); this.channelGroup.remove(ipKey);

View File

@ -1,7 +1,5 @@
package com.iot.modbus_rtcp.netty; package com.iot.modbus_rtcp.netty;
import com.iot.modbus.rtcp.commons.keymgr.LicenseValidator;
import com.iot.modbus_rtcp.dto.AutoCollectionDto;
import com.iot.modbus_rtcp.dto.CommandTypeComparable; import com.iot.modbus_rtcp.dto.CommandTypeComparable;
import com.iot.modbus_rtcp.dto.ModbusCommandDto; import com.iot.modbus_rtcp.dto.ModbusCommandDto;
import com.iot.modbus_rtcp.utils.HexUtil; import com.iot.modbus_rtcp.utils.HexUtil;
@ -10,7 +8,6 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitAdmin;
@ -21,7 +18,6 @@ import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
/** /**
* 同步控制命令优先Channel * 同步控制命令优先Channel
@ -36,7 +32,6 @@ public class SyncPriorityChannel implements Runnable {
private static final int allowRetryCount = 1; private static final int allowRetryCount = 1;
private static final boolean isControl = true; private static final boolean isControl = true;
private final AtomicBoolean close = new AtomicBoolean(Boolean.FALSE); private final AtomicBoolean close = new AtomicBoolean(Boolean.FALSE);
private final AtomicBoolean running = new AtomicBoolean(Boolean.FALSE);
private final SocketChannel channel; private final SocketChannel channel;
@ -45,7 +40,9 @@ public class SyncPriorityChannel implements Runnable {
private volatile String identifier; private volatile String identifier;
@Getter @Getter
private volatile String collectQueue; private volatile String collectQueue;
private volatile boolean running = false;
private final Object object = new Object();
private final PriorityBlockingQueue<ModbusCommandDto> messageQueue = new PriorityBlockingQueue<>(); private final PriorityBlockingQueue<ModbusCommandDto> messageQueue = new PriorityBlockingQueue<>();
private final AtomicReference<ChannelPromise> channelPromiseReference = new AtomicReference<>(); private final AtomicReference<ChannelPromise> channelPromiseReference = new AtomicReference<>();
private final AtomicReference<ModbusCommandDto> currentMessageReference = new AtomicReference<>(); private final AtomicReference<ModbusCommandDto> currentMessageReference = new AtomicReference<>();
@ -55,15 +52,13 @@ public class SyncPriorityChannel implements Runnable {
} }
public void addMessages(List<ModbusCommandDto> messages) { public void addMessages(List<ModbusCommandDto> messages) {
this.cleanAutoCollection(messages);
this.messageQueue.addAll(messages); this.messageQueue.addAll(messages);
synchronized (this.running) { synchronized (this.object) {
if (this.running.getAcquire()) { if (this.running) {
return; return;
} else { } else {
this.running.setRelease(Boolean.TRUE); this.running = true;
} }
} }
@ -85,7 +80,7 @@ public class SyncPriorityChannel implements Runnable {
// 消费完毕终止递归发送 // 消费完毕终止递归发送
if (Objects.isNull(this.getCurrentMessage())) { if (Objects.isNull(this.getCurrentMessage())) {
this.running.setRelease(Boolean.FALSE); this.running = false;
return; return;
} }
@ -95,21 +90,12 @@ public class SyncPriorityChannel implements Runnable {
this.channelPromiseReference.setRelease(this.channel.newPromise()); this.channelPromiseReference.setRelease(this.channel.newPromise());
if (isControl || Objects.equals(CommandTypeComparable.CommandType.COLLECTION, this.getCurrentMessage().getType())) { if (isControl || Objects.equals(CommandTypeComparable.CommandType.COLLECTION, this.getCurrentMessage().getType())) {
if (Objects.equals(CommandTypeComparable.CommandType.CONTROL, this.getCurrentMessage().getType())) { log.info("发送命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送",
log.info("发送命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送", this.getCurrentMessage().getCommand(),
this.getCurrentMessage().getCommand(), this.getCurrentMessage().getLength(),
this.getCurrentMessage().getLength(), IPGatewayRelation.get(ChannelGroup.getKey(ip, port)),
IPGatewayRelation.get(ChannelGroup.getKey(ip, port)), ChannelGroup.getKey(ip, port),
ChannelGroup.getKey(ip, port), this.messageQueue.size());
this.messageQueue.size());
} else {
log.debug("发送命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送",
this.getCurrentMessage().getCommand(),
this.getCurrentMessage().getLength(),
IPGatewayRelation.get(ChannelGroup.getKey(ip, port)),
ChannelGroup.getKey(ip, port),
this.messageQueue.size());
}
this.channel.writeAndFlush(this.getCurrentMessage().getCommand()); this.channel.writeAndFlush(this.getCurrentMessage().getCommand());
} }
@ -126,12 +112,12 @@ public class SyncPriorityChannel implements Runnable {
} }
if (timeout && this.timeoutRetryCount == (allowRetryCount - 1)) { if (timeout && this.timeoutRetryCount == (allowRetryCount - 1)) {
log.warn("#超时命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送,Command:{}", log.warn("#超时命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送",
this.getCurrentMessage().getCommand(), this.getCurrentMessage().getCommand(),
this.getCurrentMessage().getLength(), this.getCurrentMessage().getLength(),
IPGatewayRelation.get(ChannelGroup.getKey(ip, port)), IPGatewayRelation.get(ChannelGroup.getKey(ip, port)),
ChannelGroup.getKey(ip, port), ChannelGroup.getKey(ip, port),
this.messageQueue.size(), this.getCurrentMessage()); this.messageQueue.size());
} }
if (timeout && this.timeoutRetryCount < (allowRetryCount - 1)) { if (timeout && this.timeoutRetryCount < (allowRetryCount - 1)) {
@ -140,7 +126,11 @@ public class SyncPriorityChannel implements Runnable {
this.timeoutRetryCount = 0; this.timeoutRetryCount = 0;
} }
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10)); try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.sendNext(); this.sendNext();
} }
@ -160,14 +150,8 @@ public class SyncPriorityChannel implements Runnable {
} }
public void setIdentifier(String identifier) { public void setIdentifier(String identifier) {
if (StringUtils.isBlank(this.identifier)) { this.identifier = identifier;
this.identifier = identifier; this.setCollectQueue();
this.setCollectQueue();
} else if (!StringUtils.equals(this.identifier, identifier)) {
throw new RuntimeException("多个设备对应同一心跳包,当前已绑定心跳信息:"
+ new String(HexUtil.hexStringToBytes(identifier)) + "[" + identifier + "] --> "
+ this.channel.remoteAddress().getHostString() + ":" + this.channel.remoteAddress().getPort());
}
} }
public void setCollectQueue() { public void setCollectQueue() {
@ -175,11 +159,4 @@ public class SyncPriorityChannel implements Runnable {
SpringUtil.getBean(RabbitAdmin.class).declareQueue(QueueBuilder.durable(this.collectQueue).build()); SpringUtil.getBean(RabbitAdmin.class).declareQueue(QueueBuilder.durable(this.collectQueue).build());
} }
private void cleanAutoCollection(List<ModbusCommandDto> messages) {
SpringUtil.getBean(LicenseValidator.class).verify();
if (messages.stream().anyMatch(c -> c instanceof AutoCollectionDto)) {
this.messageQueue.removeIf(command -> command instanceof AutoCollectionDto);
}
}
} }

View File

@ -9,21 +9,13 @@ public class ThreadPoolConsumer {
static { static {
mThreadPoolExecutor = new ThreadPoolTaskExecutor(); mThreadPoolExecutor = new ThreadPoolTaskExecutor();
mThreadPoolExecutor.setQueueCapacity(5000); mThreadPoolExecutor.setCorePoolSize(4);
mThreadPoolExecutor.setMaxPoolSize(8);
mThreadPoolExecutor.setKeepAliveSeconds(60); mThreadPoolExecutor.setKeepAliveSeconds(60);
mThreadPoolExecutor.setThreadPriority(Thread.MAX_PRIORITY);
mThreadPoolExecutor.setThreadNamePrefix("Modbus-command-sender-thread-");
mThreadPoolExecutor.setCorePoolSize(Math.min(50, Runtime.getRuntime().availableProcessors() - 1));
mThreadPoolExecutor.setMaxPoolSize(Math.min(500, Runtime.getRuntime().availableProcessors() * 10));
mThreadPoolExecutor.initialize(); mThreadPoolExecutor.initialize();
} }
public static void submit(Runnable task) { public static void submit(Runnable task) {
mThreadPoolExecutor.submit(task); mThreadPoolExecutor.submit(task);
} }
public static void shutdown() {
mThreadPoolExecutor.shutdown();
}
} }

View File

@ -1,13 +0,0 @@
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://${MYSQL_HOST:172.17.0.1}:${MYSQL_PORT:3306}/gas_well_watch?characterEncoding=UTF-8&useUnicode=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
username: ${MYSQL_USERNAME:root}
password: ${MYSQL_USERNAME:@Root123}
rabbitmq:
host: ${RABBIT_MQ_HOST:172.17.0.1}
port: ${RABBIT_MQ_PORT:5672}
username: ${RABBIT_MQ_USERNAME:huaxia}
password: ${RABBIT_MQ_PASSWORD:huaxia@2024}
virtual-host: /gas

View File

@ -11,7 +11,7 @@
<!-- %highlight():转换说明符以粗体红色显示其级别为ERROR的事件红色为WARNBLUE为INFO以及其他级别的默认颜色。 --> <!-- %highlight():转换说明符以粗体红色显示其级别为ERROR的事件红色为WARNBLUE为INFO以及其他级别的默认颜色。 -->
<property name="LOG_PATTERN" <property name="LOG_PATTERN"
value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) --- [%15.15(%thread)] [%X{traceId}] %cyan(%-40.40(%logger{40})) : %msg%n"/> value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) --- [%15.15(%thread)] [%X{traceId}] %cyan(%-40.40(%logger{40})) : %msg%n"/>
<property name="LOG_HOME" value="./logs"/> <property name="LOG_HOME" value="/Users/snow/log/venta/mtcp"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!--<immediateFlush>true</immediateFlush>--> <!--<immediateFlush>true</immediateFlush>-->
<encoder> <encoder>
@ -53,7 +53,8 @@
<charset>UTF-8</charset> <charset>UTF-8</charset>
</encoder> </encoder>
</appender> </appender>
<root level="INFO"> <root level="INFO">
<appender-ref ref="STDOUT"/> <appender-ref ref="STDOUT"/>
<appender-ref ref="LOCAL_INFO_LOG"/> <appender-ref ref="LOCAL_INFO_LOG"/>

Binary file not shown.