调整采集能力,支持按设备品牌采集数据

This commit is contained in:
wangshilong 2024-11-26 13:39:36 +08:00
parent 47041ca901
commit 8107bfdcda
1 changed files with 69 additions and 23 deletions

View File

@ -5,6 +5,7 @@ import com.iot.modbus_rtcp.controller.ModbusTCPController;
import com.iot.modbus_rtcp.dto.CommandTypeComparable;
import com.iot.modbus_rtcp.dto.ModbusCommandDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.jdbc.core.JdbcTemplate;
@ -25,13 +26,41 @@ import java.util.stream.Collectors;
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/23 18:33
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class AutoCollectJobs {
private static final String SQL = "SELECT c.id AS commandId,c.ref_id AS deviceId,c.command, " +
"c.message_length messageLength,d.gateway_sn AS identifier FROM commands c JOIN device d " +
"ON c.ref_id = d.id WHERE c.type = 'COLLECTION' AND c.ref_type = 'DEVICE' ORDER BY c.id ";
public static final String COMMAND_REF_TYPE_DEVICE = "DEVICE";
public static final String COMMAND_REF_TYPE_PRODUCT_CODE = "DEVICE_PRODUCT_CODE";
private static final String SQL = """
SELECT * FROM (
SELECT
c.id AS commandId,
d.id AS deviceId,
c.ref_type as refType,
c.command,
c.message_length messageLength,
d.gateway_sn AS identifier
FROM
commands c JOIN device d ON c.ref_id = d.id
WHERE
c.type = 'COLLECTION' AND c.ref_type = 'DEVICE'
UNION ALL
SELECT
c.id AS commandId,
d.id AS deviceId,
c.ref_type as refType,
c.command,
c.message_length messageLength,
d.gateway_sn AS identifier
FROM
commands c JOIN device d ON c.ref_id = d.product
WHERE
c.type = 'COLLECTION' AND c.ref_type = 'DEVICE_PRODUCT_CODE'
) a ORDER BY a.deviceId, a.commandId
""";
private final JdbcTemplate jdbcTemplate;
private final ModbusTCPController controller;
@ -42,7 +71,7 @@ public class AutoCollectJobs {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "Modbus-auto-collect-thread-" + index.getAndIncrement());
Thread thread = new Thread(runnable, "Modbus-auto-collect-thread-" + this.index.getAndIncrement());
thread.setDaemon(true);
thread.setPriority(Thread.MIN_PRIORITY);
return thread;
@ -60,9 +89,25 @@ public class AutoCollectJobs {
if (ObjectUtils.isEmpty(resultList)) {
continue;
}
List<ModbusCommandDto> collectCommondList = resultList.stream()
.filter(item -> Objects.nonNull(ModbusTCPController.nettyServer.getGroup()
.get((MapUtil.getStr(item, "identifier")))))
resultList.stream()
.filter(item -> {
String identifier = MapUtil.getStr(item, "identifier");
if (Objects.nonNull(ModbusTCPController.nettyServer.getGroup().get(identifier))) {
return true;
}
log.warn("Gateway {} is disconnected and does not collect data", identifier);
return false;
})
.collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "deviceId")))
.forEach((deviceId, commandList) -> {
Map<String, List<Map<String, Object>>> refTypeCommandListMap = commandList.stream()
.collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "refType")));
List<Map<String, Object>> deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_DEVICE);
if (ObjectUtils.isEmpty(deviceCommandList)) {
deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_PRODUCT_CODE);
}
if (ObjectUtils.isNotEmpty(deviceCommandList)) {
List<ModbusCommandDto> collectCommondList = deviceCommandList.stream()
.map(item -> {
String identifier = MapUtil.getStr(item, "identifier");
return ModbusCommandDto.builder()
@ -77,10 +122,11 @@ public class AutoCollectJobs {
.build();
})
.collect(Collectors.toList());
if (ObjectUtils.isEmpty(collectCommondList)) {
continue;
}
if (ObjectUtils.isNotEmpty(collectCommondList)) {
this.collectThreadPool.execute(() -> this.controller.collect(collectCommondList));
}
}
});
}
}
}