解析代码

This commit is contained in:
wangshilong 2024-11-24 12:03:06 +08:00
parent 1e97ba7505
commit a183cf6103
9 changed files with 266 additions and 40 deletions

27
pom.xml
View File

@ -191,6 +191,33 @@
<version>3.0.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.34</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok-maven-plugin</artifactId>
<version>${lombok-maven-plugin.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok-mapstruct-binding</artifactId>
<version>${lombok-mapstruct-binding.version}</version>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct-processor</artifactId>
<version>${mapstruct.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<version>3.2.8</version>
</dependency>
</dependencies>
<!-- 配置文件管理 -->

View File

@ -0,0 +1,82 @@
package com.isu.gaswellwatch.modbus.data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.experimental.SuperBuilder;
import java.io.Serial;
import java.io.Serializable;
import java.util.Map;
/**
* 网关标识/设备标识/命令标识/采集下发时间/采集数据接收时间/采集数据体
* 4B454E454E4731343030303030333538/1/10000/1732365690000/1732365691000/01042C07D000010003000C002B0029000300010000001A0018003303CD002300080DAC0DAC0000000000000959091DF066
*
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/24 10:11
*/
@Getter
@Setter
@SuperBuilder
@NoArgsConstructor
public class ModbusMessage implements Serializable {
@Serial
private static final long serialVersionUID = 429189115651027028L;
/**
* 队列地址
*/
private String queueName;
/**
* 网关标识
*/
private String gatewayIdentifier;
/**
* 设备标识
*/
private Long deviceId;
/**
* 指令标识
*/
private Long commandId;
/**
* 采集指令下发时间
*/
private Long collectionTime;
/**
* 采集数据接收时间
*/
private Long receiveTime;
/**
* 采集原始字符串
*/
private String message;
/**
* key: 点位地址, value: 点位解析值
*/
private Map<String, MessagePoint> messagePointMap;
/**
* 点位解析数据
*/
@Getter
@Setter
@SuperBuilder
@NoArgsConstructor
public static class MessagePoint<T> implements Serializable {
@Serial
private static final long serialVersionUID = -3118301653064611676L;
/**
* 原始解析值
*/
private String originalValue;
/**
* 初始解析值
*/
private T parseValue;
/**
* 修正解析值
*/
private T value;
}
}

View File

@ -1,8 +1,8 @@
package com.isu.gaswellwatch.modbus.data.decode;
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
/**
* Modbus 数据解析器
@ -15,6 +15,6 @@ public interface DecodeHandler {
String MODBUS_DEVICE_TYPE_FIELD_NAME = "modbusDeviceType";
Logger logger = LoggerFactory.getLogger("com.isu.gaswellwatch.decode");
void decode(Message message) throws Exception;
void decode(ModbusMessage modbusMessage);
}

View File

@ -1,15 +1,12 @@
package com.isu.gaswellwatch.modbus.data.decode.impl;
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
import com.serotonin.modbus4j.msg.ReadInputRegistersResponse;
import com.serotonin.modbus4j.serial.rtu.RtuMessageParser;
import com.serotonin.modbus4j.serial.rtu.RtuMessageResponse;
import com.serotonin.modbus4j.sero.util.queue.ByteQueue;
import lombok.RequiredArgsConstructor;
import jakarta.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
/**
@ -18,35 +15,25 @@ import org.springframework.stereotype.Component;
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/23 11:23
*/
@RequiredArgsConstructor
@Component(Knpcv1DecodeHandler.MODBUS_DEVICE_TYPE)
public class Knpcv1DecodeHandler implements DecodeHandler {
public static final String MODBUS_DEVICE_TYPE = "KNPCV1";
private final RedisTemplate redisTemplate;
@Resource
private JdbcTemplate jdbcTemplate;
@Resource(name = "redisTemplate")
private RedisTemplate redisTemplate;
@Override
public void decode(Message message) throws Exception {
public void decode(ModbusMessage modbusMessage) {
// /device/4B454E454E4731343030303030333538/collect
String queueName = message.getMessageProperties().getConsumerQueue();
String messageString = new String(message.getBody());
HashOperations<String, Object, Object> hashOperations = this.redisTemplate.opsForHash();
String[] queuePath = StringUtils.split(queueName, "/");
String deviceId = queuePath[1];
String command = queuePath[2];
String[] queuePath = StringUtils.split(modbusMessage.getQueueName(), "/");
logger.info("Decode {}, Queue {}, Message {}", MODBUS_DEVICE_TYPE, queueName, messageString);
logger.info("Decode {}, Queue {}, Message {}", MODBUS_DEVICE_TYPE, modbusMessage.getQueueName(), modbusMessage.getMessage());
ByteQueue byteQueue = new ByteQueue(messageString.replaceAll(" ", ""));
RtuMessageParser masterParser = new RtuMessageParser(true);
RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue);
ReadInputRegistersResponse readInputRegistersResponse = (ReadInputRegistersResponse) response.getModbusResponse();
int index = 0;
hashOperations.put(queueName, DecodeHandler.MODBUS_DEVICE_TYPE_FIELD_NAME, MODBUS_DEVICE_TYPE);
for (short value : readInputRegistersResponse.getShortData()) {
hashOperations.put(queueName, StringUtils.leftPad(String.valueOf(index++), 4, '0'), value);
}
}
}

View File

@ -1,15 +1,16 @@
package com.isu.gaswellwatch.modbus.data.decode.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.BatchMessageListener;
import org.springframework.amqp.core.Message;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;
@ -28,18 +29,33 @@ public class ModbusMessageBackupListener implements BatchMessageListener {
@Override
public void onMessageBatch(List<Message> messages) {
Path filePath;
File file;
String messageString, queueName, backupFileName;
for (Message message : messages) {
messageString = new String(message.getBody());
queueName = message.getMessageProperties().getConsumerQueue();
backupFileName = StringUtils.replace(queueName, "/", "_");
messageString = new String(message.getBody());
filePath = Paths.get("D:\\modbus\\data\\" + backupFileName + ".data");
try (BufferedWriter writer = Files.newBufferedWriter(filePath, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
if (StringUtils.startsWith(backupFileName, "_")) {
backupFileName = StringUtils.substring(backupFileName, 1);
}
backupFileName = "D:\\backup\\modbus\\data\\" + backupFileName + ".data";
BufferedWriter writer = null;
try {
file = new File(backupFileName);
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
if (!file.exists()) {
file.createNewFile();
}
writer = Files.newBufferedWriter(Paths.get(backupFileName), StandardCharsets.UTF_8, StandardOpenOption.APPEND);
writer.write(messageString);
writer.write("\n");
} catch (IOException e) {
log.error("Backup message failed. QueueName {}, Message {}", queueName, messageString, e);
} finally {
IOUtils.closeQuietly(writer);
}
}
}

View File

@ -1,18 +1,47 @@
package com.isu.gaswellwatch.modbus.data.decode.listener;
import cn.hutool.core.map.MapUtil;
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
import com.serotonin.modbus4j.code.FunctionCode;
import com.serotonin.modbus4j.msg.ModbusResponse;
import com.serotonin.modbus4j.msg.ReadInputRegistersResponse;
import com.serotonin.modbus4j.serial.rtu.RtuMessageParser;
import com.serotonin.modbus4j.serial.rtu.RtuMessageResponse;
import com.serotonin.modbus4j.sero.util.queue.ByteQueue;
import jakarta.annotation.Resource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.amqp.core.BatchMessageListener;
import org.springframework.amqp.core.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/23 0:43
*/
@Slf4j
@RequiredArgsConstructor
public class ModbusMessagePersistListener implements BatchMessageListener {
private static final String COMMAND_SQL = "select * from commands where id = ?";
@Resource
private JdbcTemplate jdbcTemplate;
@Resource(name = "redisTemplate")
private RedisTemplate redisTemplate;
@Resource
private Map<String, DecodeHandler> decodeHandlerMap;
@Override
public void onMessage(Message message) {
this.onMessageBatch(List.of(message));
@ -20,7 +49,88 @@ public class ModbusMessagePersistListener implements BatchMessageListener {
@Override
public void onMessageBatch(List<Message> messages) {
for (Message message : messages) {
try {
Long commandId;
String[] messageSplit;
Map<String, Object> commandMap;
String messageString, collectionMessage;
Map<String, ModbusMessage.MessagePoint> messagePointMap;
// 网关标识/设备标识/命令标识/采集下发时间/采集数据接收时间/采集数据体
// 4B454E454E4731343030303030333538/1/10000/1732365690000/1732365691000/01042C07D000010003000C002B0029000300010000001A0018003303CD002300080DAC0DAC0000000000000959091DF066
for (Message message : messages) {
messageString = new String(message.getBody());
messageSplit = StringUtils.split(messageString, "/");
if (messageSplit.length < 6) {
log.error("非法数据: {}", messageString);
continue;
}
collectionMessage = messageSplit[5];
commandId = NumberUtils.createLong(messageSplit[2]);
commandMap = this.jdbcTemplate.queryForMap(COMMAND_SQL, commandId);
if (ObjectUtils.isNotEmpty(commandMap)) {
log.error("指令[{}]不存在,数据: {}", commandId, messageString);
continue;
}
int index = MapUtil.getInt(commandMap, "start_address"), stepSize = 0;
messagePointMap = new HashMap<>();
ByteQueue byteQueue = new ByteQueue(collectionMessage);
RtuMessageParser masterParser = new RtuMessageParser(true);
RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue);
ModbusResponse modbusResponse = response.getModbusResponse();
switch (modbusResponse.getFunctionCode()) {
case FunctionCode.READ_INPUT_REGISTERS: {
ReadInputRegistersResponse readInputRegistersResponse = (ReadInputRegistersResponse) modbusResponse;
for (short value : readInputRegistersResponse.getShortData()) {
stepSize = index * 4;
messagePointMap.put(StringUtils.leftPad(String.valueOf(index), 4, '0'),
ModbusMessage.MessagePoint.builder()
.parseValue(value)
.originalValue(StringUtils.substring(collectionMessage, 6 + stepSize, 10 + stepSize))
.build());
index++;
}
}
// case FunctionCode.READ_COILS: {
// ReadCoilsResponse readCoilsResponse = (ReadCoilsResponse) modbusResponse;
// }
// case FunctionCode.READ_DISCRETE_INPUTS: {
// ReadDiscreteInputsResponse readDiscreteInputsResponse = (ReadDiscreteInputsResponse) modbusResponse;
// }
// case FunctionCode.READ_HOLDING_REGISTERS: {
// ReadHoldingRegistersResponse readHoldingRegistersResponse = (ReadHoldingRegistersResponse) modbusResponse;
// }
// case FunctionCode.READ_EXCEPTION_STATUS: {
// ReadExceptionStatusResponse readExceptionStatusResponse = (ReadExceptionStatusResponse) modbusResponse;
// }
// case FunctionCode.REPORT_SLAVE_ID: {
// ReportSlaveIdResponse reportSlaveIdResponse = (ReportSlaveIdResponse) modbusResponse;
// }
default: {
log.error("FunctionCode {}, Message {}", modbusResponse.getFunctionCode(), collectionMessage);
}
}
Optional.ofNullable(commandMap.get("decode_handler_name"))
.map(this.decodeHandlerMap::get)
.ifPresent(decodeHandler -> decodeHandler.decode(ModbusMessage.builder()
.queueName(message.getMessageProperties().getConsumerQueue())
.commandId(commandId)
.message(collectionMessage)
.gatewayIdentifier(messageSplit[0])
.deviceId(NumberUtils.createLong(messageSplit[1]))
.collectionTime(NumberUtils.createLong(messageSplit[3]))
.receiveTime(NumberUtils.createLong(messageSplit[4]))
.messagePointMap(messagePointMap)
.build()));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

View File

@ -1,8 +1,9 @@
package com.isu.gaswellwatch.modbus.data.impl;
import com.isu.gaswellwatch.modbus.data.PersistenceHandler;
import lombok.RequiredArgsConstructor;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@ -11,10 +12,13 @@ import org.springframework.stereotype.Component;
* 2024/11/23 11:55
*/
@Component
@RequiredArgsConstructor
public class Redis2DBPersistenceHandlerImpl implements PersistenceHandler {
private final RedisTemplate redisTemplate;
@Resource
private JdbcTemplate jdbcTemplate;
@Resource(name = "redisTemplate")
private RedisTemplate redisTemplate;
@Scheduled(cron = "0/10 * * * * ? ")
public void write() {

View File

@ -1,7 +1,7 @@
spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://127.0.0.1:3306/gaswellwatch?characterEncoding=UTF-8&useUnicode=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
username: root
password: 1qaz@WSX
url: jdbc:mysql://localhost:3306/gas_well_watch?characterEncoding=UTF-8&useUnicode=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
username: cq
password: cq
driver-class-name: com.mysql.cj.jdbc.Driver

View File

@ -6,7 +6,7 @@ spring:
max-file-size: 100MB #-1 无限制
max-request-size: 200MB #指定为100MB -1无限制
profiles:
active: @environment@
active: dev
redis:
host: ${REDIS_HOST:localhost}
port: ${REDIS_PORT:6379}