diff --git a/pom.xml b/pom.xml
index 67decf2..16c02e5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -191,6 +191,33 @@
3.0.3
test
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.34
+
+
+ org.projectlombok
+ lombok-maven-plugin
+ ${lombok-maven-plugin.version}
+
+
+ org.projectlombok
+ lombok-mapstruct-binding
+ ${lombok-mapstruct-binding.version}
+
+
+ org.mapstruct
+ mapstruct-processor
+ ${mapstruct.version}
+
+
+ org.springframework.boot
+ spring-boot-configuration-processor
+ 3.2.8
+
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/ModbusMessage.java b/src/main/java/com/isu/gaswellwatch/modbus/data/ModbusMessage.java
new file mode 100644
index 0000000..d10de18
--- /dev/null
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/ModbusMessage.java
@@ -0,0 +1,82 @@
+package com.isu.gaswellwatch.modbus.data;
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.experimental.SuperBuilder;
+
+import java.io.Serial;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * 网关标识/设备标识/命令标识/采集下发时间/采集数据接收时间/采集数据体
+ * 4B454E454E4731343030303030333538/1/10000/1732365690000/1732365691000/01042C07D000010003000C002B0029000300010000001A0018003303CD002300080DAC0DAC0000000000000959091DF066
+ *
+ * @author 王仕龙
+ * 2024/11/24 10:11
+ */
+@Getter
+@Setter
+@SuperBuilder
+@NoArgsConstructor
+public class ModbusMessage implements Serializable {
+ @Serial
+ private static final long serialVersionUID = 429189115651027028L;
+ /**
+ * 队列地址
+ */
+ private String queueName;
+ /**
+ * 网关标识
+ */
+ private String gatewayIdentifier;
+ /**
+ * 设备标识
+ */
+ private Long deviceId;
+ /**
+ * 指令标识
+ */
+ private Long commandId;
+ /**
+ * 采集指令下发时间
+ */
+ private Long collectionTime;
+ /**
+ * 采集数据接收时间
+ */
+ private Long receiveTime;
+ /**
+ * 采集原始字符串
+ */
+ private String message;
+ /**
+ * key: 点位地址, value: 点位解析值
+ */
+ private Map messagePointMap;
+
+ /**
+ * 点位解析数据
+ */
+ @Getter
+ @Setter
+ @SuperBuilder
+ @NoArgsConstructor
+ public static class MessagePoint implements Serializable {
+ @Serial
+ private static final long serialVersionUID = -3118301653064611676L;
+ /**
+ * 原始解析值
+ */
+ private String originalValue;
+ /**
+ * 初始解析值
+ */
+ private T parseValue;
+ /**
+ * 修正解析值
+ */
+ private T value;
+ }
+}
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java
index b48d326..98e4454 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java
@@ -1,8 +1,8 @@
package com.isu.gaswellwatch.modbus.data.decode;
+import com.isu.gaswellwatch.modbus.data.ModbusMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.amqp.core.Message;
/**
* Modbus 数据解析器
@@ -15,6 +15,6 @@ public interface DecodeHandler {
String MODBUS_DEVICE_TYPE_FIELD_NAME = "modbusDeviceType";
Logger logger = LoggerFactory.getLogger("com.isu.gaswellwatch.decode");
- void decode(Message message) throws Exception;
+ void decode(ModbusMessage modbusMessage);
}
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/Knpcv1DecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/Knpcv1DecodeHandler.java
index d80d94c..4bf9df7 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/Knpcv1DecodeHandler.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/Knpcv1DecodeHandler.java
@@ -1,15 +1,12 @@
package com.isu.gaswellwatch.modbus.data.decode.impl;
+import com.isu.gaswellwatch.modbus.data.ModbusMessage;
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
-import com.serotonin.modbus4j.msg.ReadInputRegistersResponse;
-import com.serotonin.modbus4j.serial.rtu.RtuMessageParser;
-import com.serotonin.modbus4j.serial.rtu.RtuMessageResponse;
-import com.serotonin.modbus4j.sero.util.queue.ByteQueue;
-import lombok.RequiredArgsConstructor;
+import jakarta.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
-import org.springframework.amqp.core.Message;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
/**
@@ -18,35 +15,25 @@ import org.springframework.stereotype.Component;
* @author 王仕龙
* 2024/11/23 11:23
*/
-@RequiredArgsConstructor
@Component(Knpcv1DecodeHandler.MODBUS_DEVICE_TYPE)
public class Knpcv1DecodeHandler implements DecodeHandler {
public static final String MODBUS_DEVICE_TYPE = "KNPCV1";
- private final RedisTemplate redisTemplate;
+
+ @Resource
+ private JdbcTemplate jdbcTemplate;
+
+ @Resource(name = "redisTemplate")
+ private RedisTemplate redisTemplate;
@Override
- public void decode(Message message) throws Exception {
+ public void decode(ModbusMessage modbusMessage) {
- // /device/4B454E454E4731343030303030333538/collect
- String queueName = message.getMessageProperties().getConsumerQueue();
- String messageString = new String(message.getBody());
HashOperations hashOperations = this.redisTemplate.opsForHash();
- String[] queuePath = StringUtils.split(queueName, "/");
- String deviceId = queuePath[1];
- String command = queuePath[2];
+ String[] queuePath = StringUtils.split(modbusMessage.getQueueName(), "/");
- logger.info("Decode {}, Queue {}, Message {}", MODBUS_DEVICE_TYPE, queueName, messageString);
+ logger.info("Decode {}, Queue {}, Message {}", MODBUS_DEVICE_TYPE, modbusMessage.getQueueName(), modbusMessage.getMessage());
- ByteQueue byteQueue = new ByteQueue(messageString.replaceAll(" ", ""));
- RtuMessageParser masterParser = new RtuMessageParser(true);
- RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue);
- ReadInputRegistersResponse readInputRegistersResponse = (ReadInputRegistersResponse) response.getModbusResponse();
- int index = 0;
- hashOperations.put(queueName, DecodeHandler.MODBUS_DEVICE_TYPE_FIELD_NAME, MODBUS_DEVICE_TYPE);
- for (short value : readInputRegistersResponse.getShortData()) {
- hashOperations.put(queueName, StringUtils.leftPad(String.valueOf(index++), 4, '0'), value);
- }
}
}
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessageBackupListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessageBackupListener.java
index 9cd9b93..dd604c0 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessageBackupListener.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessageBackupListener.java
@@ -1,15 +1,16 @@
package com.isu.gaswellwatch.modbus.data.decode.listener;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.BatchMessageListener;
import org.springframework.amqp.core.Message;
import java.io.BufferedWriter;
+import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
-import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;
@@ -28,18 +29,33 @@ public class ModbusMessageBackupListener implements BatchMessageListener {
@Override
public void onMessageBatch(List messages) {
- Path filePath;
+ File file;
String messageString, queueName, backupFileName;
for (Message message : messages) {
+ messageString = new String(message.getBody());
queueName = message.getMessageProperties().getConsumerQueue();
backupFileName = StringUtils.replace(queueName, "/", "_");
- messageString = new String(message.getBody());
- filePath = Paths.get("D:\\modbus\\data\\" + backupFileName + ".data");
- try (BufferedWriter writer = Files.newBufferedWriter(filePath, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
+ if (StringUtils.startsWith(backupFileName, "_")) {
+ backupFileName = StringUtils.substring(backupFileName, 1);
+ }
+ backupFileName = "D:\\backup\\modbus\\data\\" + backupFileName + ".data";
+
+ BufferedWriter writer = null;
+ try {
+ file = new File(backupFileName);
+ if (!file.getParentFile().exists()) {
+ file.getParentFile().mkdirs();
+ }
+ if (!file.exists()) {
+ file.createNewFile();
+ }
+ writer = Files.newBufferedWriter(Paths.get(backupFileName), StandardCharsets.UTF_8, StandardOpenOption.APPEND);
writer.write(messageString);
writer.write("\n");
} catch (IOException e) {
log.error("Backup message failed. QueueName {}, Message {}", queueName, messageString, e);
+ } finally {
+ IOUtils.closeQuietly(writer);
}
}
}
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessagePersistListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessagePersistListener.java
index 5f6529e..8c89723 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessagePersistListener.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessagePersistListener.java
@@ -1,18 +1,47 @@
package com.isu.gaswellwatch.modbus.data.decode.listener;
+import cn.hutool.core.map.MapUtil;
+import com.isu.gaswellwatch.modbus.data.ModbusMessage;
+import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
+import com.serotonin.modbus4j.code.FunctionCode;
+import com.serotonin.modbus4j.msg.ModbusResponse;
+import com.serotonin.modbus4j.msg.ReadInputRegistersResponse;
+import com.serotonin.modbus4j.serial.rtu.RtuMessageParser;
+import com.serotonin.modbus4j.serial.rtu.RtuMessageResponse;
+import com.serotonin.modbus4j.sero.util.queue.ByteQueue;
+import jakarta.annotation.Resource;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.amqp.core.BatchMessageListener;
import org.springframework.amqp.core.Message;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.jdbc.core.JdbcTemplate;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
/**
* @author 王仕龙
* 2024/11/23 0:43
*/
@Slf4j
+@RequiredArgsConstructor
public class ModbusMessagePersistListener implements BatchMessageListener {
+ private static final String COMMAND_SQL = "select * from commands where id = ?";
+
+ @Resource
+ private JdbcTemplate jdbcTemplate;
+ @Resource(name = "redisTemplate")
+ private RedisTemplate redisTemplate;
+ @Resource
+ private Map decodeHandlerMap;
+
@Override
public void onMessage(Message message) {
this.onMessageBatch(List.of(message));
@@ -20,7 +49,88 @@ public class ModbusMessagePersistListener implements BatchMessageListener {
@Override
public void onMessageBatch(List messages) {
- for (Message message : messages) {
+
+ try {
+ Long commandId;
+ String[] messageSplit;
+ Map commandMap;
+ String messageString, collectionMessage;
+ Map messagePointMap;
+// 网关标识/设备标识/命令标识/采集下发时间/采集数据接收时间/采集数据体
+// 4B454E454E4731343030303030333538/1/10000/1732365690000/1732365691000/01042C07D000010003000C002B0029000300010000001A0018003303CD002300080DAC0DAC0000000000000959091DF066
+ for (Message message : messages) {
+ messageString = new String(message.getBody());
+ messageSplit = StringUtils.split(messageString, "/");
+ if (messageSplit.length < 6) {
+ log.error("非法数据: {}", messageString);
+ continue;
+ }
+
+ collectionMessage = messageSplit[5];
+ commandId = NumberUtils.createLong(messageSplit[2]);
+ commandMap = this.jdbcTemplate.queryForMap(COMMAND_SQL, commandId);
+ if (ObjectUtils.isNotEmpty(commandMap)) {
+ log.error("指令[{}]不存在,数据: {}", commandId, messageString);
+ continue;
+ }
+ int index = MapUtil.getInt(commandMap, "start_address"), stepSize = 0;
+
+ messagePointMap = new HashMap<>();
+ ByteQueue byteQueue = new ByteQueue(collectionMessage);
+ RtuMessageParser masterParser = new RtuMessageParser(true);
+ RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue);
+ ModbusResponse modbusResponse = response.getModbusResponse();
+ switch (modbusResponse.getFunctionCode()) {
+ case FunctionCode.READ_INPUT_REGISTERS: {
+ ReadInputRegistersResponse readInputRegistersResponse = (ReadInputRegistersResponse) modbusResponse;
+ for (short value : readInputRegistersResponse.getShortData()) {
+ stepSize = index * 4;
+ messagePointMap.put(StringUtils.leftPad(String.valueOf(index), 4, '0'),
+ ModbusMessage.MessagePoint.builder()
+ .parseValue(value)
+ .originalValue(StringUtils.substring(collectionMessage, 6 + stepSize, 10 + stepSize))
+ .build());
+ index++;
+ }
+ }
+// case FunctionCode.READ_COILS: {
+// ReadCoilsResponse readCoilsResponse = (ReadCoilsResponse) modbusResponse;
+// }
+// case FunctionCode.READ_DISCRETE_INPUTS: {
+// ReadDiscreteInputsResponse readDiscreteInputsResponse = (ReadDiscreteInputsResponse) modbusResponse;
+// }
+// case FunctionCode.READ_HOLDING_REGISTERS: {
+// ReadHoldingRegistersResponse readHoldingRegistersResponse = (ReadHoldingRegistersResponse) modbusResponse;
+// }
+// case FunctionCode.READ_EXCEPTION_STATUS: {
+// ReadExceptionStatusResponse readExceptionStatusResponse = (ReadExceptionStatusResponse) modbusResponse;
+// }
+// case FunctionCode.REPORT_SLAVE_ID: {
+// ReportSlaveIdResponse reportSlaveIdResponse = (ReportSlaveIdResponse) modbusResponse;
+// }
+ default: {
+ log.error("FunctionCode {}, Message {}", modbusResponse.getFunctionCode(), collectionMessage);
+ }
+ }
+
+
+ Optional.ofNullable(commandMap.get("decode_handler_name"))
+ .map(this.decodeHandlerMap::get)
+ .ifPresent(decodeHandler -> decodeHandler.decode(ModbusMessage.builder()
+ .queueName(message.getMessageProperties().getConsumerQueue())
+ .commandId(commandId)
+ .message(collectionMessage)
+ .gatewayIdentifier(messageSplit[0])
+ .deviceId(NumberUtils.createLong(messageSplit[1]))
+ .collectionTime(NumberUtils.createLong(messageSplit[3]))
+ .receiveTime(NumberUtils.createLong(messageSplit[4]))
+ .messagePointMap(messagePointMap)
+ .build()));
+
+
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Redis2DBPersistenceHandlerImpl.java b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Redis2DBPersistenceHandlerImpl.java
index ddb945a..fe27be0 100644
--- a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Redis2DBPersistenceHandlerImpl.java
+++ b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Redis2DBPersistenceHandlerImpl.java
@@ -1,8 +1,9 @@
package com.isu.gaswellwatch.modbus.data.impl;
import com.isu.gaswellwatch.modbus.data.PersistenceHandler;
-import lombok.RequiredArgsConstructor;
+import jakarta.annotation.Resource;
import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@@ -11,10 +12,13 @@ import org.springframework.stereotype.Component;
* 2024/11/23 11:55
*/
@Component
-@RequiredArgsConstructor
public class Redis2DBPersistenceHandlerImpl implements PersistenceHandler {
- private final RedisTemplate redisTemplate;
+ @Resource
+ private JdbcTemplate jdbcTemplate;
+
+ @Resource(name = "redisTemplate")
+ private RedisTemplate redisTemplate;
@Scheduled(cron = "0/10 * * * * ? ")
public void write() {
diff --git a/src/main/resources/application-dev.yaml b/src/main/resources/application-dev.yaml
index 46a7cdb..ed1feb9 100644
--- a/src/main/resources/application-dev.yaml
+++ b/src/main/resources/application-dev.yaml
@@ -1,7 +1,7 @@
spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
- url: jdbc:mysql://127.0.0.1:3306/gaswellwatch?characterEncoding=UTF-8&useUnicode=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
- username: root
- password: 1qaz@WSX
+ url: jdbc:mysql://localhost:3306/gas_well_watch?characterEncoding=UTF-8&useUnicode=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
+ username: cq
+ password: cq
driver-class-name: com.mysql.cj.jdbc.Driver
\ No newline at end of file
diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml
index 08dab3a..5a79c3a 100644
--- a/src/main/resources/application.yaml
+++ b/src/main/resources/application.yaml
@@ -6,7 +6,7 @@ spring:
max-file-size: 100MB #-1 无限制
max-request-size: 200MB #指定为100MB -1无限制
profiles:
- active: @environment@
+ active: dev
redis:
host: ${REDIS_HOST:localhost}
port: ${REDIS_PORT:6379}