diff --git a/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java b/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java index c6cb129..1aac1c0 100644 --- a/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java +++ b/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java @@ -5,6 +5,7 @@ import com.iot.modbus_rtcp.controller.ModbusTCPController; import com.iot.modbus_rtcp.dto.CommandTypeComparable; import com.iot.modbus_rtcp.dto.ModbusCommandDto; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.jdbc.core.JdbcTemplate; @@ -25,13 +26,41 @@ import java.util.stream.Collectors; * @author ็Ž‹ไป•้พ™ * 2024/11/23 18:33 */ +@Slf4j @Component @RequiredArgsConstructor public class AutoCollectJobs { - private static final String SQL = "SELECT c.id AS commandId,c.ref_id AS deviceId,c.command, " + - "c.message_length messageLength,d.gateway_sn AS identifier FROM commands c JOIN device d " + - "ON c.ref_id = d.id WHERE c.type = 'COLLECTION' AND c.ref_type = 'DEVICE' ORDER BY c.id "; + public static final String COMMAND_REF_TYPE_DEVICE = "DEVICE"; + public static final String COMMAND_REF_TYPE_PRODUCT_CODE = "DEVICE_PRODUCT_CODE"; + + private static final String SQL = """ + SELECT * FROM ( + SELECT + c.id AS commandId, + d.id AS deviceId, + c.ref_type as refType, + c.command, + c.message_length messageLength, + d.gateway_sn AS identifier + FROM + commands c JOIN device d ON c.ref_id = d.id + WHERE + c.type = 'COLLECTION' AND c.ref_type = 'DEVICE' + UNION ALL + SELECT + c.id AS commandId, + d.id AS deviceId, + c.ref_type as refType, + c.command, + c.message_length messageLength, + d.gateway_sn AS identifier + FROM + commands c JOIN device d ON c.ref_id = d.product + WHERE + c.type = 'COLLECTION' AND c.ref_type = 'DEVICE_PRODUCT_CODE' + ) a ORDER BY a.deviceId, a.commandId + """; private final JdbcTemplate jdbcTemplate; private final ModbusTCPController controller; @@ -42,7 +71,7 @@ public class AutoCollectJobs { @Override public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "Modbus-auto-collect-thread-" + index.getAndIncrement()); + Thread thread = new Thread(runnable, "Modbus-auto-collect-thread-" + this.index.getAndIncrement()); thread.setDaemon(true); thread.setPriority(Thread.MIN_PRIORITY); return thread; @@ -60,27 +89,44 @@ public class AutoCollectJobs { if (ObjectUtils.isEmpty(resultList)) { continue; } - List collectCommondList = resultList.stream() - .filter(item -> Objects.nonNull(ModbusTCPController.nettyServer.getGroup() - .get((MapUtil.getStr(item, "identifier"))))) - .map(item -> { + resultList.stream() + .filter(item -> { String identifier = MapUtil.getStr(item, "identifier"); - return ModbusCommandDto.builder() - .command(MapUtil.getStr(item, "command")) - .identifier(identifier) - .length(MapUtil.getInt(item, "messageLength")) - .type(CommandTypeComparable.CommandType.COLLECTION) - .key(StringUtils.joinWith("/", identifier, - MapUtil.getStr(item, "deviceId"), - MapUtil.getStr(item, "commandId"), - timestamp)) - .build(); + if (Objects.nonNull(ModbusTCPController.nettyServer.getGroup().get(identifier))) { + return true; + } + log.warn("Gateway {} is disconnected and does not collect data", identifier); + return false; }) - .collect(Collectors.toList()); - if (ObjectUtils.isEmpty(collectCommondList)) { - continue; - } - this.collectThreadPool.execute(() -> this.controller.collect(collectCommondList)); + .collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "deviceId"))) + .forEach((deviceId, commandList) -> { + Map>> refTypeCommandListMap = commandList.stream() + .collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "refType"))); + List> deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_DEVICE); + if (ObjectUtils.isEmpty(deviceCommandList)) { + deviceCommandList = refTypeCommandListMap.get(COMMAND_REF_TYPE_PRODUCT_CODE); + } + if (ObjectUtils.isNotEmpty(deviceCommandList)) { + List collectCommondList = deviceCommandList.stream() + .map(item -> { + String identifier = MapUtil.getStr(item, "identifier"); + return ModbusCommandDto.builder() + .command(MapUtil.getStr(item, "command")) + .identifier(identifier) + .length(MapUtil.getInt(item, "messageLength")) + .type(CommandTypeComparable.CommandType.COLLECTION) + .key(StringUtils.joinWith("/", identifier, + MapUtil.getStr(item, "deviceId"), + MapUtil.getStr(item, "commandId"), + timestamp)) + .build(); + }) + .collect(Collectors.toList()); + if (ObjectUtils.isNotEmpty(collectCommondList)) { + this.collectThreadPool.execute(() -> this.controller.collect(collectCommondList)); + } + } + }); } } }