diff --git a/pom.xml b/pom.xml
index 16c02e5..d8ccf78 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,6 +98,11 @@
org.aspectj
aspectjweaver
+
+ commons-io
+ commons-io
+ 2.18.0
+
org.apache.commons
commons-pool2
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/ModbusMessage.java b/src/main/java/com/isu/gaswellwatch/modbus/data/ModbusMessage.java
index d10de18..7bbc459 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/ModbusMessage.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/ModbusMessage.java
@@ -8,6 +8,7 @@ import lombok.experimental.SuperBuilder;
import java.io.Serial;
import java.io.Serializable;
import java.util.Map;
+import java.util.Objects;
/**
* 网关标识/设备标识/命令标识/采集下发时间/采集数据接收时间/采集数据体
@@ -63,7 +64,7 @@ public class ModbusMessage implements Serializable {
@Setter
@SuperBuilder
@NoArgsConstructor
- public static class MessagePoint implements Serializable {
+ public static class MessagePoint implements Serializable {
@Serial
private static final long serialVersionUID = -3118301653064611676L;
/**
@@ -73,10 +74,14 @@ public class ModbusMessage implements Serializable {
/**
* 初始解析值
*/
- private T parseValue;
+ private String parseValue;
/**
* 修正解析值
*/
- private T value;
+ private String value;
+
+ public String getValue() {
+ return Objects.isNull(this.value) ? this.parseValue : this.value;
+ }
}
}
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java
index 691ce17..3b176ea 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java
@@ -7,5 +7,11 @@ package com.isu.gaswellwatch.modbus.data;
* 2024/11/23 11:53
*/
public interface PersistenceHandler {
+ public static final String MODBUS_DEVICE_TYPE = "KNPCV1";
+ String DEVICE_DATA_CACHE = "data:device:";
+
+ void createTable(String tableName);
+
+ void insert(String tableName, String cacheKey);
}
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java
index 98e4454..4d65ef9 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java
@@ -12,9 +12,8 @@ import org.slf4j.LoggerFactory;
*/
public interface DecodeHandler {
- String MODBUS_DEVICE_TYPE_FIELD_NAME = "modbusDeviceType";
Logger logger = LoggerFactory.getLogger("com.isu.gaswellwatch.decode");
- void decode(ModbusMessage modbusMessage);
+ void decode(ModbusMessage.MessagePoint point);
}
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/Knpcv1DecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/Knpcv1DecodeHandler.java
index 4bf9df7..8c2394b 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/Knpcv1DecodeHandler.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/Knpcv1DecodeHandler.java
@@ -3,8 +3,7 @@ package com.isu.gaswellwatch.modbus.data.decode.impl;
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
import jakarta.annotation.Resource;
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.data.redis.core.HashOperations;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
@@ -15,10 +14,13 @@ import org.springframework.stereotype.Component;
* @author 王仕龙
* 2024/11/23 11:23
*/
-@Component(Knpcv1DecodeHandler.MODBUS_DEVICE_TYPE)
+@Slf4j
+@SuppressWarnings("all")
+@Component(Knpcv1DecodeHandler.MODBUS_DEVICE_TYPE + "_1")
public class Knpcv1DecodeHandler implements DecodeHandler {
public static final String MODBUS_DEVICE_TYPE = "KNPCV1";
+ private static final String COMMAND_POINT_SQL = "select * from command_points where command_id = ?";
@Resource
private JdbcTemplate jdbcTemplate;
@@ -27,12 +29,7 @@ public class Knpcv1DecodeHandler implements DecodeHandler {
private RedisTemplate redisTemplate;
@Override
- public void decode(ModbusMessage modbusMessage) {
-
- HashOperations hashOperations = this.redisTemplate.opsForHash();
- String[] queuePath = StringUtils.split(modbusMessage.getQueueName(), "/");
-
- logger.info("Decode {}, Queue {}, Message {}", MODBUS_DEVICE_TYPE, modbusMessage.getQueueName(), modbusMessage.getMessage());
+ public void decode(ModbusMessage.MessagePoint point) {
}
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessagePersistListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessagePersistListener.java
index 8c89723..a821e5a 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessagePersistListener.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessagePersistListener.java
@@ -1,10 +1,10 @@
package com.isu.gaswellwatch.modbus.data.decode.listener;
import cn.hutool.core.map.MapUtil;
+import cn.hutool.json.JSONUtil;
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
+import com.isu.gaswellwatch.modbus.data.PersistenceHandler;
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
-import com.serotonin.modbus4j.code.FunctionCode;
-import com.serotonin.modbus4j.msg.ModbusResponse;
import com.serotonin.modbus4j.msg.ReadInputRegistersResponse;
import com.serotonin.modbus4j.serial.rtu.RtuMessageParser;
import com.serotonin.modbus4j.serial.rtu.RtuMessageResponse;
@@ -17,13 +17,12 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.amqp.core.BatchMessageListener;
import org.springframework.amqp.core.Message;
+import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.time.Duration;
+import java.util.*;
/**
* @author 王仕龙
@@ -31,9 +30,12 @@ import java.util.Optional;
*/
@Slf4j
@RequiredArgsConstructor
+@SuppressWarnings("all")
public class ModbusMessagePersistListener implements BatchMessageListener {
+ public static final String COMMAND_CACHE = "modbus:command:";
private static final String COMMAND_SQL = "select * from commands where id = ?";
+ private static final String COMMAND_POINT_SQL = "select * from command_points where command_id = ?";
@Resource
private JdbcTemplate jdbcTemplate;
@@ -49,89 +51,137 @@ public class ModbusMessagePersistListener implements BatchMessageListener {
@Override
public void onMessageBatch(List messages) {
+ Long commandId;
+ String[] messageSplit;
+ Map commandMap;
+ String messageString, collectionMessage, commandString;
+ Map messagePointMap;
+ // 网关标识/设备标识/命令标识/采集下发时间/采集数据接收时间/采集数据体
+ // 4B454E454E4731343030303030333538/1/10000/1732365690000/1732365691000/01042C07D0000
+ for (Message message : messages) {
+ messageString = new String(message.getBody());
+ messageSplit = StringUtils.split(messageString, "/");
+ if (messageSplit.length < 6) {
+ log.error("非法数据: {}", messageString);
+ continue;
+ }
- try {
- Long commandId;
- String[] messageSplit;
- Map commandMap;
- String messageString, collectionMessage;
- Map messagePointMap;
-// 网关标识/设备标识/命令标识/采集下发时间/采集数据接收时间/采集数据体
-// 4B454E454E4731343030303030333538/1/10000/1732365690000/1732365691000/01042C07D000010003000C002B0029000300010000001A0018003303CD002300080DAC0DAC0000000000000959091DF066
- for (Message message : messages) {
- messageString = new String(message.getBody());
- messageSplit = StringUtils.split(messageString, "/");
- if (messageSplit.length < 6) {
- log.error("非法数据: {}", messageString);
- continue;
- }
-
- collectionMessage = messageSplit[5];
- commandId = NumberUtils.createLong(messageSplit[2]);
+ collectionMessage = messageSplit[5];
+ commandId = NumberUtils.createLong(messageSplit[2]);
+ commandString = Objects.toString(this.redisTemplate.opsForValue().get(COMMAND_CACHE + commandId), "");
+ commandMap = StringUtils.isNotEmpty(commandString) ? JSONUtil.parseObj(commandString) : null;
+ if (ObjectUtils.isEmpty(commandMap)) {
commandMap = this.jdbcTemplate.queryForMap(COMMAND_SQL, commandId);
if (ObjectUtils.isNotEmpty(commandMap)) {
log.error("指令[{}]不存在,数据: {}", commandId, messageString);
continue;
}
+ this.redisTemplate.opsForValue()
+ .setIfAbsent(COMMAND_CACHE + commandId, JSONUtil.toJsonStr(commandMap), Duration.ofDays(1));
+ }
+ DecodeHandler decodeHandler = Optional.ofNullable(MapUtil.getStr(commandMap, "decode_handler_name"))
+ .map(this.decodeHandlerMap::get)
+ .orElse(null);
+ if (Objects.isNull(decodeHandler)) {
+ log.error("指令[{}]未配置解析器,数据: {}", commandId, messageString);
+ continue;
+ }
+ messagePointMap = new HashMap<>();
+ try {
+ String address;
int index = MapUtil.getInt(commandMap, "start_address"), stepSize = 0;
-
- messagePointMap = new HashMap<>();
ByteQueue byteQueue = new ByteQueue(collectionMessage);
RtuMessageParser masterParser = new RtuMessageParser(true);
RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue);
- ModbusResponse modbusResponse = response.getModbusResponse();
- switch (modbusResponse.getFunctionCode()) {
- case FunctionCode.READ_INPUT_REGISTERS: {
- ReadInputRegistersResponse readInputRegistersResponse = (ReadInputRegistersResponse) modbusResponse;
- for (short value : readInputRegistersResponse.getShortData()) {
- stepSize = index * 4;
- messagePointMap.put(StringUtils.leftPad(String.valueOf(index), 4, '0'),
- ModbusMessage.MessagePoint.builder()
- .parseValue(value)
- .originalValue(StringUtils.substring(collectionMessage, 6 + stepSize, 10 + stepSize))
- .build());
- index++;
- }
- }
-// case FunctionCode.READ_COILS: {
-// ReadCoilsResponse readCoilsResponse = (ReadCoilsResponse) modbusResponse;
-// }
-// case FunctionCode.READ_DISCRETE_INPUTS: {
-// ReadDiscreteInputsResponse readDiscreteInputsResponse = (ReadDiscreteInputsResponse) modbusResponse;
-// }
-// case FunctionCode.READ_HOLDING_REGISTERS: {
-// ReadHoldingRegistersResponse readHoldingRegistersResponse = (ReadHoldingRegistersResponse) modbusResponse;
-// }
-// case FunctionCode.READ_EXCEPTION_STATUS: {
-// ReadExceptionStatusResponse readExceptionStatusResponse = (ReadExceptionStatusResponse) modbusResponse;
-// }
-// case FunctionCode.REPORT_SLAVE_ID: {
-// ReportSlaveIdResponse reportSlaveIdResponse = (ReportSlaveIdResponse) modbusResponse;
-// }
- default: {
- log.error("FunctionCode {}, Message {}", modbusResponse.getFunctionCode(), collectionMessage);
- }
+ ReadInputRegistersResponse readInputRegistersResponse = (ReadInputRegistersResponse) response.getModbusResponse();
+ for (short value : readInputRegistersResponse.getShortData()) {
+ stepSize = index * 4;
+ messagePointMap.put(StringUtils.leftPad(String.valueOf(index), 4, '0'),
+ ModbusMessage.MessagePoint.builder()
+ .parseValue(String.valueOf(value))
+ .originalValue(StringUtils.substring(collectionMessage, 6 + stepSize, 10 + stepSize))
+ .build());
+ index++;
}
-
-
- Optional.ofNullable(commandMap.get("decode_handler_name"))
- .map(this.decodeHandlerMap::get)
- .ifPresent(decodeHandler -> decodeHandler.decode(ModbusMessage.builder()
- .queueName(message.getMessageProperties().getConsumerQueue())
- .commandId(commandId)
- .message(collectionMessage)
- .gatewayIdentifier(messageSplit[0])
- .deviceId(NumberUtils.createLong(messageSplit[1]))
- .collectionTime(NumberUtils.createLong(messageSplit[3]))
- .receiveTime(NumberUtils.createLong(messageSplit[4]))
- .messagePointMap(messagePointMap)
- .build()));
-
-
+ } catch (Exception e) {
+ log.error("初始数据解析异常: {}", messageString, e);
+ continue;
+ }
+ if (ObjectUtils.isNotEmpty(messagePointMap)) {
+ decode(ModbusMessage.builder()
+ .commandId(commandId)
+ .message(collectionMessage)
+ .messagePointMap(messagePointMap)
+ .gatewayIdentifier(messageSplit[0])
+ .deviceId(NumberUtils.createLong(messageSplit[1]))
+ .receiveTime(NumberUtils.createLong(messageSplit[4]))
+ .collectionTime(NumberUtils.createLong(messageSplit[3]))
+ .queueName(message.getMessageProperties().getConsumerQueue())
+ .build());
}
- } catch (Exception e) {
- throw new RuntimeException(e);
-
}
}
+
+
+ public void decode(ModbusMessage modbusMessage) {
+ Long commandId = modbusMessage.getCommandId();
+ String cacheName = ModbusMessagePersistListener.COMMAND_CACHE + commandId + ":points";
+ String cachePoints = Objects.toString(this.redisTemplate.opsForValue().get(cacheName), "");
+
+ List