调整采集逻辑

This commit is contained in:
wangshilong 2024-12-20 22:57:24 +08:00
parent 392ccee6ba
commit ee03c86167
3 changed files with 91 additions and 48 deletions

View File

@ -36,6 +36,8 @@ public class EquipmentIPProperties {
if (gatewayHeartbeat.length() > 100) { if (gatewayHeartbeat.length() > 100) {
return false; return false;
} }
// 解决心跳包中可能存在的空格符和换行符
gatewayHeartbeat = HexUtil.bytesToHexString(StringUtils.trim(new String(HexUtil.hexStringToBytes(gatewayHeartbeat))).getBytes()).toUpperCase();
// 每分钟从数据库入库一次 // 每分钟从数据库入库一次
if (this.lastSyncEquipment.getAcquire() < System.currentTimeMillis() - 60_000) { if (this.lastSyncEquipment.getAcquire() < System.currentTimeMillis() - 60_000) {
this.jdbcTemplate.queryForList("select gateway_sn from device").forEach(map -> { this.jdbcTemplate.queryForList("select gateway_sn from device").forEach(map -> {

View File

@ -22,6 +22,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
/** /**
* @author <a href="mailto:scwsl@foxmail.com">王仕龙</a> * @author <a href="mailto:scwsl@foxmail.com">王仕龙</a>
@ -66,10 +67,10 @@ public class AutoCollectJobs {
private final JdbcTemplate jdbcTemplate; private final JdbcTemplate jdbcTemplate;
private final ModbusTCPController controller; private final ModbusTCPController controller;
private final EquipmentIPProperties equipmentIPProperties; private final EquipmentIPProperties equipmentIPProperties;
private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(Math.min(10, Runtime.getRuntime().availableProcessors()) private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(Math.min(50, Runtime.getRuntime().availableProcessors())
, Math.min(100, Runtime.getRuntime().availableProcessors()), 60, TimeUnit.SECONDS, , Math.min(100, Runtime.getRuntime().availableProcessors()), 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5000), new ThreadFactory() { new ArrayBlockingQueue<>(5000), new ThreadFactory() {
AtomicInteger index = new AtomicInteger(1); final AtomicInteger index = new AtomicInteger(1);
@Override @Override
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
@ -85,13 +86,13 @@ public class AutoCollectJobs {
int pageIndex = 0; int pageIndex = 0;
int pageSize = 1000; int pageSize = 1000;
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
List<Map<String, Object>> resultList = null; while (true) {
while (Objects.isNull(resultList) || resultList.size() >= pageSize) { List<Map<String, Object>> resultList = this.jdbcTemplate.queryForList(SQL + " LIMIT " + (pageIndex++ * pageSize) + ", " + pageSize);
resultList = this.jdbcTemplate.queryForList(SQL + " LIMIT " + (pageIndex++ * pageSize) + ", " + pageSize);
if (ObjectUtils.isEmpty(resultList)) { if (ObjectUtils.isEmpty(resultList)) {
continue; break;
} }
resultList.stream() this.collectThreadPool.execute(() -> {
this.controller.collect(resultList.stream()
.filter(item -> { .filter(item -> {
String gatewaySn = StringUtils.trim(MapUtil.getStr(item, "identifier")); String gatewaySn = StringUtils.trim(MapUtil.getStr(item, "identifier"));
String gatewayIdentifier = this.equipmentIPProperties.put(gatewaySn); String gatewayIdentifier = this.equipmentIPProperties.put(gatewaySn);
@ -104,7 +105,10 @@ public class AutoCollectJobs {
return false; return false;
}) })
.collect(Collectors.groupingBy(item -> StringUtils.trim(MapUtil.getStr(item, "deviceId")))) .collect(Collectors.groupingBy(item -> StringUtils.trim(MapUtil.getStr(item, "deviceId"))))
.forEach((deviceId, commandList) -> { .entrySet()
.stream()
.flatMap(entry -> {
List<Map<String, Object>> commandList = entry.getValue();
Map<String, List<Map<String, Object>>> refTypeCommandListMap = commandList.stream() Map<String, List<Map<String, Object>>> refTypeCommandListMap = commandList.stream()
.collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "refType"))); .collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "refType")));
List<Map<String, Object>> deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_DEVICE); List<Map<String, Object>> deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_DEVICE);
@ -112,7 +116,7 @@ public class AutoCollectJobs {
deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_PRODUCT_CODE); deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_PRODUCT_CODE);
} }
if (ObjectUtils.isNotEmpty(deviceCommandList)) { if (ObjectUtils.isNotEmpty(deviceCommandList)) {
List<ModbusCommandDto> collectCommondList = deviceCommandList.stream() return deviceCommandList.stream()
.map(item -> { .map(item -> {
String identifier = StringUtils.trim(MapUtil.getStr(item, "identifier")); String identifier = StringUtils.trim(MapUtil.getStr(item, "identifier"));
return ModbusCommandDto.builder() return ModbusCommandDto.builder()
@ -125,13 +129,43 @@ public class AutoCollectJobs {
StringUtils.trim(MapUtil.getStr(item, "commandId")), StringUtils.trim(MapUtil.getStr(item, "commandId")),
timestamp)) timestamp))
.build(); .build();
})
.collect(Collectors.toList());
if (ObjectUtils.isNotEmpty(collectCommondList)) {
this.collectThreadPool.execute(() -> this.controller.collect(collectCommondList));
}
}
}); });
} }
return Stream.empty();
})
.collect(Collectors.toList()));
// .forEach((deviceId, commandList) -> {
// Map<String, List<Map<String, Object>>> refTypeCommandListMap = commandList.stream()
// .collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "refType")));
// List<Map<String, Object>> deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_DEVICE);
// if (ObjectUtils.isEmpty(deviceCommandList)) {
// deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_PRODUCT_CODE);
// }
// if (ObjectUtils.isNotEmpty(deviceCommandList)) {
// List<ModbusCommandDto> collectCommondList = deviceCommandList.stream()
// .map(item -> {
// String identifier = StringUtils.trim(MapUtil.getStr(item, "identifier"));
// return ModbusCommandDto.builder()
// .command(StringUtils.trim(MapUtil.getStr(item, "command")))
// .identifier(identifier)
// .length(MapUtil.getInt(item, "messageLength"))
// .type(CommandTypeComparable.CommandType.COLLECTION)
// .key(StringUtils.joinWith("/", identifier,
// StringUtils.trim(MapUtil.getStr(item, "deviceId")),
// StringUtils.trim(MapUtil.getStr(item, "commandId")),
// timestamp))
// .build();
// })
// .collect(Collectors.toList());
// if (ObjectUtils.isNotEmpty(collectCommondList)) {
// this.collectThreadPool.execute(() -> this.controller.collect(collectCommondList));
// }
// }
// });
});
if (resultList.size() < pageSize) {
break;
}
}
} }
} }

View File

@ -8,6 +8,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitAdmin;
@ -150,8 +151,14 @@ public class SyncPriorityChannel implements Runnable {
} }
public void setIdentifier(String identifier) { public void setIdentifier(String identifier) {
if (StringUtils.isBlank(this.identifier)) {
this.identifier = identifier; this.identifier = identifier;
this.setCollectQueue(); this.setCollectQueue();
} else if (!StringUtils.equals(this.identifier, identifier)) {
throw new RuntimeException("多个设备对应同一心跳包,当前已绑定心跳信息:"
+ new String(HexUtil.hexStringToBytes(identifier)) + "[" + identifier + "] --> "
+ this.channel.remoteAddress().getHostString() + ":" + this.channel.remoteAddress().getPort());
}
} }
public void setCollectQueue() { public void setCollectQueue() {