From d23f325579736ae57aa6fb35bffa0d4c9266c184 Mon Sep 17 00:00:00 2001 From: wangshilong Date: Tue, 10 Dec 2024 20:34:57 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=8C=E6=99=9F=E6=8C=87=E4=BB=A4=E8=A7=A3?= =?UTF-8?q?=E6=9E=90=E5=8F=91=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../data/Redis2DBPersistenceService.java | 3 + .../ModbusMessagePersistListener.java | 164 +++++++++--------- .../vo/command/scss/CasingMode.java | 36 +++- .../vo/command/scss/SamplingInterval.java | 4 +- .../vo/command/scss/TimeMode.java | 19 +- 5 files changed, 139 insertions(+), 87 deletions(-) diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java index c17f42c..f20b2fc 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java @@ -72,6 +72,9 @@ public class Redis2DBPersistenceService { if (Objects.isNull(deviceId)) { continue; } + if (!this.redisTemplate.hasKey(PersistenceHandler.DEVICE_INFO_CACHE + deviceId)) { + continue; + } if (Objects.nonNull(idGatewayMappingMap)) { operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, "online", String.valueOf(idGatewayMappingMap.containsKey(deviceId))); diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java index b3d3250..a1e2080 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java @@ -63,99 +63,103 @@ public class ModbusMessagePersistListener implements BatchMessageListener { Long commandId; String[] messageSplit; Map commandMap; - String messageString, collectionMessage, commandString; + String messageString = null, collectionMessage, commandString; Map messagePointMap; // 网关标识/设备标识/命令标识/采集下发时间/采集数据接收时间/采集数据体 // 4B454E454E4731343030303030333538/1/10000/1732365690000/1732365691000/01042C07D0000 for (Message message : messages) { - messageString = new String(message.getBody()); - messageSplit = StringUtils.split(messageString, "/"); - if (messageSplit.length < 6) { - log.error("非法数据: {}", messageString); - continue; - } - - messagePointMap = new HashMap<>(); - collectionMessage = messageSplit[5]; - commandId = NumberUtils.createLong(messageSplit[2]); - commandString = Objects.toString(this.redisTemplate.opsForValue().get(PersistenceHandler.COMMAND_CACHE + commandId), ""); - commandMap = StringUtils.isNotEmpty(commandString) ? JSONUtil.parseObj(commandString) : null; - if (ObjectUtils.isEmpty(commandMap)) { - commandMap = this.jdbcTemplate.queryForMap(COMMAND_SQL, commandId); - if (ObjectUtils.isEmpty(commandMap)) { - log.error("指令[{}]不存在,数据: {}", commandId, messageString); + try { + messageString = new String(message.getBody()); + messageSplit = StringUtils.split(messageString, "/"); + if (messageSplit.length < 6) { + log.error("非法数据: {}", messageString); continue; } - this.redisTemplate.opsForValue().setIfAbsent(PersistenceHandler.COMMAND_CACHE + commandId, JSONUtil.toJsonStr(commandMap), Duration.ofDays(1)); - } - try { - String address; - int startAddress = MapUtil.getInt(commandMap, "start_address"), index = 0, stepSize = 0; - ByteQueue byteQueue = new ByteQueue(collectionMessage); - RtuMessageParser masterParser = new RtuMessageParser(true); - RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue); - ModbusResponse modbusResponse = response.getModbusResponse(); - byte[] byteValues = null; - short[] shortValues = null; - switch (modbusResponse.getFunctionCode()) { - case FunctionCode.READ_COILS -> { - byteValues = ((ReadCoilsResponse) modbusResponse).getData(); - } - case FunctionCode.READ_DISCRETE_INPUTS -> { - byteValues = ((ReadDiscreteInputsResponse) modbusResponse).getData(); - } - case FunctionCode.READ_INPUT_REGISTERS -> { - shortValues = ((ReadInputRegistersResponse) modbusResponse).getShortData(); - } - case FunctionCode.READ_HOLDING_REGISTERS -> { - shortValues = ((ReadHoldingRegistersResponse) modbusResponse).getShortData(); - } - case FunctionCode.READ_EXCEPTION_STATUS -> { - shortValues = new short[]{((ReadExceptionStatusResponse) modbusResponse).getExceptionStatus()}; - } - default -> { - throw new RuntimeException("Funcetion code not supported: " + modbusResponse.getFunctionCode()); + messagePointMap = new HashMap<>(); + collectionMessage = messageSplit[5]; + commandId = NumberUtils.createLong(messageSplit[2]); + commandString = Objects.toString(this.redisTemplate.opsForValue().get(PersistenceHandler.COMMAND_CACHE + commandId), ""); + commandMap = StringUtils.isNotEmpty(commandString) ? JSONUtil.parseObj(commandString) : null; + if (ObjectUtils.isEmpty(commandMap)) { + commandMap = this.jdbcTemplate.queryForMap(COMMAND_SQL, commandId); + if (ObjectUtils.isEmpty(commandMap)) { + log.error("指令[{}]不存在,数据: {}", commandId, messageString); + continue; } + this.redisTemplate.opsForValue().setIfAbsent(PersistenceHandler.COMMAND_CACHE + commandId, JSONUtil.toJsonStr(commandMap), Duration.ofDays(1)); } - if (Objects.nonNull(shortValues)) { - for (short value : shortValues) { - stepSize = index * 4; - messagePointMap.put(StringUtils.leftPad(String.valueOf(startAddress), 4, '0'), - ModbusMessage.MessagePoint.builder() - .parseValue(String.valueOf(value)) - .originalValue(StringUtils.substring(collectionMessage, 6 + stepSize, 10 + stepSize)) - .build()); - index++; - startAddress++; + try { + String address; + int startAddress = MapUtil.getInt(commandMap, "start_address"), index = 0, stepSize = 0; + ByteQueue byteQueue = new ByteQueue(collectionMessage); + RtuMessageParser masterParser = new RtuMessageParser(true); + RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue); + ModbusResponse modbusResponse = response.getModbusResponse(); + + byte[] byteValues = null; + short[] shortValues = null; + switch (modbusResponse.getFunctionCode()) { + case FunctionCode.READ_COILS -> { + byteValues = ((ReadCoilsResponse) modbusResponse).getData(); + } + case FunctionCode.READ_DISCRETE_INPUTS -> { + byteValues = ((ReadDiscreteInputsResponse) modbusResponse).getData(); + } + case FunctionCode.READ_INPUT_REGISTERS -> { + shortValues = ((ReadInputRegistersResponse) modbusResponse).getShortData(); + } + case FunctionCode.READ_HOLDING_REGISTERS -> { + shortValues = ((ReadHoldingRegistersResponse) modbusResponse).getShortData(); + } + case FunctionCode.READ_EXCEPTION_STATUS -> { + shortValues = new short[]{((ReadExceptionStatusResponse) modbusResponse).getExceptionStatus()}; + } + default -> { + throw new RuntimeException("Funcetion code not supported: " + modbusResponse.getFunctionCode()); + } } - } else if (Objects.nonNull(byteValues)) { - for (byte value : byteValues) { - stepSize = index * 2; - messagePointMap.put(StringUtils.leftPad(String.valueOf(startAddress), 4, '0'), - ModbusMessage.MessagePoint.builder() - .parseValue(String.valueOf(value)) - .originalValue(StringUtils.substring(collectionMessage, 6 + stepSize, 8 + stepSize)) - .build()); - index++; - startAddress++; + if (Objects.nonNull(shortValues)) { + for (short value : shortValues) { + stepSize = index * 4; + messagePointMap.put(StringUtils.leftPad(String.valueOf(startAddress), 4, '0'), + ModbusMessage.MessagePoint.builder() + .parseValue(String.valueOf(value)) + .originalValue(StringUtils.substring(collectionMessage, 6 + stepSize, 10 + stepSize)) + .build()); + index++; + startAddress++; + } + } else if (Objects.nonNull(byteValues)) { + for (byte value : byteValues) { + stepSize = index * 2; + messagePointMap.put(StringUtils.leftPad(String.valueOf(startAddress), 4, '0'), + ModbusMessage.MessagePoint.builder() + .parseValue(String.valueOf(value)) + .originalValue(StringUtils.substring(collectionMessage, 6 + stepSize, 8 + stepSize)) + .build()); + index++; + startAddress++; + } } + } catch (Exception e) { + log.error("初始数据解析异常: {}", messageString, e); + continue; + } + if (ObjectUtils.isNotEmpty(messagePointMap)) { + decode(ModbusMessage.builder() + .commandId(commandId) + .message(collectionMessage) + .messagePointMap(messagePointMap) + .gatewayIdentifier(messageSplit[0]) + .deviceId(NumberUtils.createLong(messageSplit[1])) + .receiveTime(NumberUtils.createLong(messageSplit[4])) + .collectionTime(NumberUtils.createLong(messageSplit[3])) + .queueName(message.getMessageProperties().getConsumerQueue()) + .build()); } } catch (Exception e) { - log.error("初始数据解析异常: {}", messageString, e); - continue; - } - if (ObjectUtils.isNotEmpty(messagePointMap)) { - decode(ModbusMessage.builder() - .commandId(commandId) - .message(collectionMessage) - .messagePointMap(messagePointMap) - .gatewayIdentifier(messageSplit[0]) - .deviceId(NumberUtils.createLong(messageSplit[1])) - .receiveTime(NumberUtils.createLong(messageSplit[4])) - .collectionTime(NumberUtils.createLong(messageSplit[3])) - .queueName(message.getMessageProperties().getConsumerQueue()) - .build()); + log.error("初始数据解析异常: {}, {}", message.getMessageProperties().getConsumerQueue(), messageString, e); } } } diff --git a/src/main/java/com/isu/gaswellwatch/vo/command/scss/CasingMode.java b/src/main/java/com/isu/gaswellwatch/vo/command/scss/CasingMode.java index 1edaffc..bd9ad67 100644 --- a/src/main/java/com/isu/gaswellwatch/vo/command/scss/CasingMode.java +++ b/src/main/java/com/isu/gaswellwatch/vo/command/scss/CasingMode.java @@ -8,10 +8,13 @@ import jakarta.validation.constraints.NotNull; import lombok.Getter; import lombok.Setter; import lombok.experimental.SuperBuilder; +import org.apache.commons.lang3.StringUtils; import java.io.Serial; import java.math.BigDecimal; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; /** * 套压模式 @@ -27,21 +30,25 @@ public class CasingMode extends Command implements Timing { private static final long serialVersionUID = -6609483152571838191L; /** * 开井套压 + * 152 */ @NotNull(message = "开井套压不能为空") private BigDecimal wellOpenPressureValue = BigDecimal.ZERO; /** * 关井套压 + * 160 */ @NotNull(message = "关井套压不能为空") private BigDecimal wellClosePressureValue = BigDecimal.ZERO; /** * 最小关井时间 + * 166 */ @NotBlank(message = "最小关井时间不能为空,格式:HH:mm:ss") private String minWellCloseTimeDuration; /** * 最大关井时间 + * 168 */ @NotBlank(message = "最大关井时间不能为空,格式:HH:mm:ss") private String maxWellCloseTimeDuration; @@ -53,6 +60,33 @@ public class CasingMode extends Command implements Timing { @Override protected Collection builderModbusCommand() { - return null; + List resultList = new ArrayList(3); + // 地址码 功能码 起始地址 连续长度 连续字长 控制模式 + // 01 10 0032 0002 04 6(00000006, 6:套压(开)-套压(关)模式) + resultList.add(ModbusCommandDto.builder().command("0110003200020400000006").length(16).build()); + + StringBuilder command = new StringBuilder(250); + // 地址码 功能码 起始地址 连续长度 连续字长 + // 01 10 0098 0002 04 + command.append("01100098000204"); + command.append(StringUtils.leftPad(Long.toHexString(this.wellOpenPressureValue.multiply(ONE_HUNDRED).intValue()), 8, "0")); + resultList.add(ModbusCommandDto.builder().command(command.toString()).length(16).build()); + + command.setLength(0); + // 地址码 功能码 起始地址 连续长度 连续字长 + // 01 10 00A0 0002 04 + command.append("011000A0000204"); + command.append(StringUtils.leftPad(Long.toHexString(this.wellClosePressureValue.multiply(ONE_HUNDRED).intValue()), 8, "0")); + resultList.add(ModbusCommandDto.builder().command(command.toString()).length(16).build()); + + command.setLength(0); + // 地址码 功能码 起始地址 连续长度 连续字长 + // 01 10 00A6 0004 08 + command.append("011000A6000204"); + command.append(StringUtils.leftPad(Long.toHexString(this.toSeconds(this.minWellCloseTimeDuration)), 8, "0")); + command.append(StringUtils.leftPad(Long.toHexString(this.toSeconds(this.maxWellCloseTimeDuration)), 8, "0")); + resultList.add(ModbusCommandDto.builder().command(command.toString()).length(16).build()); + + return resultList; } } \ No newline at end of file diff --git a/src/main/java/com/isu/gaswellwatch/vo/command/scss/SamplingInterval.java b/src/main/java/com/isu/gaswellwatch/vo/command/scss/SamplingInterval.java index 205b296..fdcfb06 100644 --- a/src/main/java/com/isu/gaswellwatch/vo/command/scss/SamplingInterval.java +++ b/src/main/java/com/isu/gaswellwatch/vo/command/scss/SamplingInterval.java @@ -50,8 +50,8 @@ public class SamplingInterval extends Command { protected Collection builderModbusCommand() { StringBuilder command = new StringBuilder(250); // 地址码 功能码 起始地址 连续长度 连续字长 - // 01 10 0055 0004 08 - command.append("01100055000408"); + // 01 10 0054 0004 08 + command.append("01100054000408"); command.append(StringUtils.leftPad(Integer.toHexString(this.continuousSamplingIntervalDuration), 8, "0")); command.append(StringUtils.leftPad(Integer.toHexString(this.sensorSignalEffectiveLevel.multiply(ONE_HUNDRED).intValue()), 8, "0")); return List.of(ModbusCommandDto.builder().command(command.toString()).length(16).build()); diff --git a/src/main/java/com/isu/gaswellwatch/vo/command/scss/TimeMode.java b/src/main/java/com/isu/gaswellwatch/vo/command/scss/TimeMode.java index b9b7d47..e8dd84c 100644 --- a/src/main/java/com/isu/gaswellwatch/vo/command/scss/TimeMode.java +++ b/src/main/java/com/isu/gaswellwatch/vo/command/scss/TimeMode.java @@ -44,7 +44,7 @@ public class TimeMode extends Command implements Timing { /** * 柱塞延迟时间 - * 200 2 + * 180 2 */ @NotBlank(message = "柱塞延迟时间不能为空,格式:HH:mm:ss") private String plungerDelayDuration; @@ -64,6 +64,10 @@ public class TimeMode extends Command implements Timing { @Override protected Collection builderModbusCommand() { List resultList = new ArrayList(3); + // 地址码 功能码 起始地址 连续长度 连续字长 控制模式 + // 01 10 0032 0002 04 1(00000001, 1:时间(开)-时间(开)模式) + resultList.add(ModbusCommandDto.builder().command("0110003200020400000001").length(16).build()); + StringBuilder command = new StringBuilder(250); // 地址码 功能码 起始地址 连续长度 连续字长 // 01 10 0096 0002 04 @@ -73,9 +77,16 @@ public class TimeMode extends Command implements Timing { command.setLength(0); // 地址码 功能码 起始地址 连续长度 连续字长 - // 01 10 0096 0002 04 - command.append("01100096000204"); - command.append(StringUtils.leftPad(Long.toHexString(this.toSeconds(this.wellOpenTimeTimestamp)), 8, "0")); + // 01 10 009E 0002 04 + command.append("0110009E000204"); + command.append(StringUtils.leftPad(Long.toHexString(this.toSeconds(this.wellCloseTimeTimestamp)), 8, "0")); + resultList.add(ModbusCommandDto.builder().command(command.toString()).length(16).build()); + + command.setLength(0); + // 地址码 功能码 起始地址 连续长度 连续字长 + // 01 10 00B4 0002 04 + command.append("011000B4000204"); + command.append(StringUtils.leftPad(Long.toHexString(this.toSeconds(this.plungerDelayDuration)), 8, "0")); resultList.add(ModbusCommandDto.builder().command(command.toString()).length(16).build()); return resultList;