From a183cf61034968a485f3072df1a42836281bbb4d Mon Sep 17 00:00:00 2001 From: wangshilong Date: Sun, 24 Nov 2024 12:03:06 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E6=9E=90=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 27 +++++ .../modbus/data/ModbusMessage.java | 82 +++++++++++++ .../modbus/data/decode/DecodeHandler.java | 4 +- .../data/decode/impl/Knpcv1DecodeHandler.java | 37 ++---- .../listener/ModbusMessageBackupListener.java | 26 +++- .../ModbusMessagePersistListener.java | 112 +++++++++++++++++- .../impl/Redis2DBPersistenceHandlerImpl.java | 10 +- src/main/resources/application-dev.yaml | 6 +- src/main/resources/application.yaml | 2 +- 9 files changed, 266 insertions(+), 40 deletions(-) create mode 100644 src/main/java/com/isu/gaswellwatch/modbus/data/ModbusMessage.java diff --git a/pom.xml b/pom.xml index 67decf2..16c02e5 100644 --- a/pom.xml +++ b/pom.xml @@ -191,6 +191,33 @@ 3.0.3 test + + + + org.projectlombok + lombok + 1.18.34 + + + org.projectlombok + lombok-maven-plugin + ${lombok-maven-plugin.version} + + + org.projectlombok + lombok-mapstruct-binding + ${lombok-mapstruct-binding.version} + + + org.mapstruct + mapstruct-processor + ${mapstruct.version} + + + org.springframework.boot + spring-boot-configuration-processor + 3.2.8 + diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/ModbusMessage.java b/src/main/java/com/isu/gaswellwatch/modbus/data/ModbusMessage.java new file mode 100644 index 0000000..d10de18 --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/ModbusMessage.java @@ -0,0 +1,82 @@ +package com.isu.gaswellwatch.modbus.data; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.experimental.SuperBuilder; + +import java.io.Serial; +import java.io.Serializable; +import java.util.Map; + +/** + * 网关标识/设备标识/命令标识/采集下发时间/采集数据接收时间/采集数据体 + * 4B454E454E4731343030303030333538/1/10000/1732365690000/1732365691000/01042C07D000010003000C002B0029000300010000001A0018003303CD002300080DAC0DAC0000000000000959091DF066 + * + * @author 王仕龙 + * 2024/11/24 10:11 + */ +@Getter +@Setter +@SuperBuilder +@NoArgsConstructor +public class ModbusMessage implements Serializable { + @Serial + private static final long serialVersionUID = 429189115651027028L; + /** + * 队列地址 + */ + private String queueName; + /** + * 网关标识 + */ + private String gatewayIdentifier; + /** + * 设备标识 + */ + private Long deviceId; + /** + * 指令标识 + */ + private Long commandId; + /** + * 采集指令下发时间 + */ + private Long collectionTime; + /** + * 采集数据接收时间 + */ + private Long receiveTime; + /** + * 采集原始字符串 + */ + private String message; + /** + * key: 点位地址, value: 点位解析值 + */ + private Map messagePointMap; + + /** + * 点位解析数据 + */ + @Getter + @Setter + @SuperBuilder + @NoArgsConstructor + public static class MessagePoint implements Serializable { + @Serial + private static final long serialVersionUID = -3118301653064611676L; + /** + * 原始解析值 + */ + private String originalValue; + /** + * 初始解析值 + */ + private T parseValue; + /** + * 修正解析值 + */ + private T value; + } +} diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java index b48d326..98e4454 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java @@ -1,8 +1,8 @@ package com.isu.gaswellwatch.modbus.data.decode; +import com.isu.gaswellwatch.modbus.data.ModbusMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.amqp.core.Message; /** * Modbus 数据解析器 @@ -15,6 +15,6 @@ public interface DecodeHandler { String MODBUS_DEVICE_TYPE_FIELD_NAME = "modbusDeviceType"; Logger logger = LoggerFactory.getLogger("com.isu.gaswellwatch.decode"); - void decode(Message message) throws Exception; + void decode(ModbusMessage modbusMessage); } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/Knpcv1DecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/Knpcv1DecodeHandler.java index d80d94c..4bf9df7 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/Knpcv1DecodeHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/Knpcv1DecodeHandler.java @@ -1,15 +1,12 @@ package com.isu.gaswellwatch.modbus.data.decode.impl; +import com.isu.gaswellwatch.modbus.data.ModbusMessage; import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler; -import com.serotonin.modbus4j.msg.ReadInputRegistersResponse; -import com.serotonin.modbus4j.serial.rtu.RtuMessageParser; -import com.serotonin.modbus4j.serial.rtu.RtuMessageResponse; -import com.serotonin.modbus4j.sero.util.queue.ByteQueue; -import lombok.RequiredArgsConstructor; +import jakarta.annotation.Resource; import org.apache.commons.lang3.StringUtils; -import org.springframework.amqp.core.Message; import org.springframework.data.redis.core.HashOperations; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; /** @@ -18,35 +15,25 @@ import org.springframework.stereotype.Component; * @author 王仕龙 * 2024/11/23 11:23 */ -@RequiredArgsConstructor @Component(Knpcv1DecodeHandler.MODBUS_DEVICE_TYPE) public class Knpcv1DecodeHandler implements DecodeHandler { public static final String MODBUS_DEVICE_TYPE = "KNPCV1"; - private final RedisTemplate redisTemplate; + + @Resource + private JdbcTemplate jdbcTemplate; + + @Resource(name = "redisTemplate") + private RedisTemplate redisTemplate; @Override - public void decode(Message message) throws Exception { + public void decode(ModbusMessage modbusMessage) { - // /device/4B454E454E4731343030303030333538/collect - String queueName = message.getMessageProperties().getConsumerQueue(); - String messageString = new String(message.getBody()); HashOperations hashOperations = this.redisTemplate.opsForHash(); - String[] queuePath = StringUtils.split(queueName, "/"); - String deviceId = queuePath[1]; - String command = queuePath[2]; + String[] queuePath = StringUtils.split(modbusMessage.getQueueName(), "/"); - logger.info("Decode {}, Queue {}, Message {}", MODBUS_DEVICE_TYPE, queueName, messageString); + logger.info("Decode {}, Queue {}, Message {}", MODBUS_DEVICE_TYPE, modbusMessage.getQueueName(), modbusMessage.getMessage()); - ByteQueue byteQueue = new ByteQueue(messageString.replaceAll(" ", "")); - RtuMessageParser masterParser = new RtuMessageParser(true); - RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue); - ReadInputRegistersResponse readInputRegistersResponse = (ReadInputRegistersResponse) response.getModbusResponse(); - int index = 0; - hashOperations.put(queueName, DecodeHandler.MODBUS_DEVICE_TYPE_FIELD_NAME, MODBUS_DEVICE_TYPE); - for (short value : readInputRegistersResponse.getShortData()) { - hashOperations.put(queueName, StringUtils.leftPad(String.valueOf(index++), 4, '0'), value); - } } } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessageBackupListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessageBackupListener.java index 9cd9b93..dd604c0 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessageBackupListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessageBackupListener.java @@ -1,15 +1,16 @@ package com.isu.gaswellwatch.modbus.data.decode.listener; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.compress.utils.IOUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.core.BatchMessageListener; import org.springframework.amqp.core.Message; import java.io.BufferedWriter; +import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.List; @@ -28,18 +29,33 @@ public class ModbusMessageBackupListener implements BatchMessageListener { @Override public void onMessageBatch(List messages) { - Path filePath; + File file; String messageString, queueName, backupFileName; for (Message message : messages) { + messageString = new String(message.getBody()); queueName = message.getMessageProperties().getConsumerQueue(); backupFileName = StringUtils.replace(queueName, "/", "_"); - messageString = new String(message.getBody()); - filePath = Paths.get("D:\\modbus\\data\\" + backupFileName + ".data"); - try (BufferedWriter writer = Files.newBufferedWriter(filePath, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) { + if (StringUtils.startsWith(backupFileName, "_")) { + backupFileName = StringUtils.substring(backupFileName, 1); + } + backupFileName = "D:\\backup\\modbus\\data\\" + backupFileName + ".data"; + + BufferedWriter writer = null; + try { + file = new File(backupFileName); + if (!file.getParentFile().exists()) { + file.getParentFile().mkdirs(); + } + if (!file.exists()) { + file.createNewFile(); + } + writer = Files.newBufferedWriter(Paths.get(backupFileName), StandardCharsets.UTF_8, StandardOpenOption.APPEND); writer.write(messageString); writer.write("\n"); } catch (IOException e) { log.error("Backup message failed. QueueName {}, Message {}", queueName, messageString, e); + } finally { + IOUtils.closeQuietly(writer); } } } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessagePersistListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessagePersistListener.java index 5f6529e..8c89723 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessagePersistListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessagePersistListener.java @@ -1,18 +1,47 @@ package com.isu.gaswellwatch.modbus.data.decode.listener; +import cn.hutool.core.map.MapUtil; +import com.isu.gaswellwatch.modbus.data.ModbusMessage; +import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler; +import com.serotonin.modbus4j.code.FunctionCode; +import com.serotonin.modbus4j.msg.ModbusResponse; +import com.serotonin.modbus4j.msg.ReadInputRegistersResponse; +import com.serotonin.modbus4j.serial.rtu.RtuMessageParser; +import com.serotonin.modbus4j.serial.rtu.RtuMessageResponse; +import com.serotonin.modbus4j.sero.util.queue.ByteQueue; +import jakarta.annotation.Resource; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; import org.springframework.amqp.core.BatchMessageListener; import org.springframework.amqp.core.Message; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.jdbc.core.JdbcTemplate; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; /** * @author 王仕龙 * 2024/11/23 0:43 */ @Slf4j +@RequiredArgsConstructor public class ModbusMessagePersistListener implements BatchMessageListener { + private static final String COMMAND_SQL = "select * from commands where id = ?"; + + @Resource + private JdbcTemplate jdbcTemplate; + @Resource(name = "redisTemplate") + private RedisTemplate redisTemplate; + @Resource + private Map decodeHandlerMap; + @Override public void onMessage(Message message) { this.onMessageBatch(List.of(message)); @@ -20,7 +49,88 @@ public class ModbusMessagePersistListener implements BatchMessageListener { @Override public void onMessageBatch(List messages) { - for (Message message : messages) { + + try { + Long commandId; + String[] messageSplit; + Map commandMap; + String messageString, collectionMessage; + Map messagePointMap; +// 网关标识/设备标识/命令标识/采集下发时间/采集数据接收时间/采集数据体 +// 4B454E454E4731343030303030333538/1/10000/1732365690000/1732365691000/01042C07D000010003000C002B0029000300010000001A0018003303CD002300080DAC0DAC0000000000000959091DF066 + for (Message message : messages) { + messageString = new String(message.getBody()); + messageSplit = StringUtils.split(messageString, "/"); + if (messageSplit.length < 6) { + log.error("非法数据: {}", messageString); + continue; + } + + collectionMessage = messageSplit[5]; + commandId = NumberUtils.createLong(messageSplit[2]); + commandMap = this.jdbcTemplate.queryForMap(COMMAND_SQL, commandId); + if (ObjectUtils.isNotEmpty(commandMap)) { + log.error("指令[{}]不存在,数据: {}", commandId, messageString); + continue; + } + int index = MapUtil.getInt(commandMap, "start_address"), stepSize = 0; + + messagePointMap = new HashMap<>(); + ByteQueue byteQueue = new ByteQueue(collectionMessage); + RtuMessageParser masterParser = new RtuMessageParser(true); + RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue); + ModbusResponse modbusResponse = response.getModbusResponse(); + switch (modbusResponse.getFunctionCode()) { + case FunctionCode.READ_INPUT_REGISTERS: { + ReadInputRegistersResponse readInputRegistersResponse = (ReadInputRegistersResponse) modbusResponse; + for (short value : readInputRegistersResponse.getShortData()) { + stepSize = index * 4; + messagePointMap.put(StringUtils.leftPad(String.valueOf(index), 4, '0'), + ModbusMessage.MessagePoint.builder() + .parseValue(value) + .originalValue(StringUtils.substring(collectionMessage, 6 + stepSize, 10 + stepSize)) + .build()); + index++; + } + } +// case FunctionCode.READ_COILS: { +// ReadCoilsResponse readCoilsResponse = (ReadCoilsResponse) modbusResponse; +// } +// case FunctionCode.READ_DISCRETE_INPUTS: { +// ReadDiscreteInputsResponse readDiscreteInputsResponse = (ReadDiscreteInputsResponse) modbusResponse; +// } +// case FunctionCode.READ_HOLDING_REGISTERS: { +// ReadHoldingRegistersResponse readHoldingRegistersResponse = (ReadHoldingRegistersResponse) modbusResponse; +// } +// case FunctionCode.READ_EXCEPTION_STATUS: { +// ReadExceptionStatusResponse readExceptionStatusResponse = (ReadExceptionStatusResponse) modbusResponse; +// } +// case FunctionCode.REPORT_SLAVE_ID: { +// ReportSlaveIdResponse reportSlaveIdResponse = (ReportSlaveIdResponse) modbusResponse; +// } + default: { + log.error("FunctionCode {}, Message {}", modbusResponse.getFunctionCode(), collectionMessage); + } + } + + + Optional.ofNullable(commandMap.get("decode_handler_name")) + .map(this.decodeHandlerMap::get) + .ifPresent(decodeHandler -> decodeHandler.decode(ModbusMessage.builder() + .queueName(message.getMessageProperties().getConsumerQueue()) + .commandId(commandId) + .message(collectionMessage) + .gatewayIdentifier(messageSplit[0]) + .deviceId(NumberUtils.createLong(messageSplit[1])) + .collectionTime(NumberUtils.createLong(messageSplit[3])) + .receiveTime(NumberUtils.createLong(messageSplit[4])) + .messagePointMap(messagePointMap) + .build())); + + + } + } catch (Exception e) { + throw new RuntimeException(e); } } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Redis2DBPersistenceHandlerImpl.java b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Redis2DBPersistenceHandlerImpl.java index ddb945a..fe27be0 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Redis2DBPersistenceHandlerImpl.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Redis2DBPersistenceHandlerImpl.java @@ -1,8 +1,9 @@ package com.isu.gaswellwatch.modbus.data.impl; import com.isu.gaswellwatch.modbus.data.PersistenceHandler; -import lombok.RequiredArgsConstructor; +import jakarta.annotation.Resource; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -11,10 +12,13 @@ import org.springframework.stereotype.Component; * 2024/11/23 11:55 */ @Component -@RequiredArgsConstructor public class Redis2DBPersistenceHandlerImpl implements PersistenceHandler { - private final RedisTemplate redisTemplate; + @Resource + private JdbcTemplate jdbcTemplate; + + @Resource(name = "redisTemplate") + private RedisTemplate redisTemplate; @Scheduled(cron = "0/10 * * * * ? ") public void write() { diff --git a/src/main/resources/application-dev.yaml b/src/main/resources/application-dev.yaml index 46a7cdb..ed1feb9 100644 --- a/src/main/resources/application-dev.yaml +++ b/src/main/resources/application-dev.yaml @@ -1,7 +1,7 @@ spring: datasource: type: com.alibaba.druid.pool.DruidDataSource - url: jdbc:mysql://127.0.0.1:3306/gaswellwatch?characterEncoding=UTF-8&useUnicode=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai - username: root - password: 1qaz@WSX + url: jdbc:mysql://localhost:3306/gas_well_watch?characterEncoding=UTF-8&useUnicode=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai + username: cq + password: cq driver-class-name: com.mysql.cj.jdbc.Driver \ No newline at end of file diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 08dab3a..5a79c3a 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -6,7 +6,7 @@ spring: max-file-size: 100MB #-1 无限制 max-request-size: 200MB #指定为100MB -1无限制 profiles: - active: @environment@ + active: dev redis: host: ${REDIS_HOST:localhost} port: ${REDIS_PORT:6379}