Merge remote-tracking branch 'origin/develop' into develop
This commit is contained in:
commit
dd3b0cd4b7
27
pom.xml
27
pom.xml
|
@ -191,6 +191,33 @@
|
|||
<version>3.0.3</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<version>1.18.34</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok-maven-plugin</artifactId>
|
||||
<version>${lombok-maven-plugin.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok-mapstruct-binding</artifactId>
|
||||
<version>${lombok-mapstruct-binding.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mapstruct</groupId>
|
||||
<artifactId>mapstruct-processor</artifactId>
|
||||
<version>${mapstruct.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-configuration-processor</artifactId>
|
||||
<version>3.2.8</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<!-- 配置文件管理 -->
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
package com.isu.gaswellwatch.modbus.data;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.Setter;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 网关标识/设备标识/命令标识/采集下发时间/采集数据接收时间/采集数据体
|
||||
* 4B454E454E4731343030303030333538/1/10000/1732365690000/1732365691000/01042C07D000010003000C002B0029000300010000001A0018003303CD002300080DAC0DAC0000000000000959091DF066
|
||||
*
|
||||
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
|
||||
* 2024/11/24 10:11
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
@SuperBuilder
|
||||
@NoArgsConstructor
|
||||
public class ModbusMessage implements Serializable {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 429189115651027028L;
|
||||
/**
|
||||
* 队列地址
|
||||
*/
|
||||
private String queueName;
|
||||
/**
|
||||
* 网关标识
|
||||
*/
|
||||
private String gatewayIdentifier;
|
||||
/**
|
||||
* 设备标识
|
||||
*/
|
||||
private Long deviceId;
|
||||
/**
|
||||
* 指令标识
|
||||
*/
|
||||
private Long commandId;
|
||||
/**
|
||||
* 采集指令下发时间
|
||||
*/
|
||||
private Long collectionTime;
|
||||
/**
|
||||
* 采集数据接收时间
|
||||
*/
|
||||
private Long receiveTime;
|
||||
/**
|
||||
* 采集原始字符串
|
||||
*/
|
||||
private String message;
|
||||
/**
|
||||
* key: 点位地址, value: 点位解析值
|
||||
*/
|
||||
private Map<String, MessagePoint> messagePointMap;
|
||||
|
||||
/**
|
||||
* 点位解析数据
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
@SuperBuilder
|
||||
@NoArgsConstructor
|
||||
public static class MessagePoint<T> implements Serializable {
|
||||
@Serial
|
||||
private static final long serialVersionUID = -3118301653064611676L;
|
||||
/**
|
||||
* 原始解析值
|
||||
*/
|
||||
private String originalValue;
|
||||
/**
|
||||
* 初始解析值
|
||||
*/
|
||||
private T parseValue;
|
||||
/**
|
||||
* 修正解析值
|
||||
*/
|
||||
private T value;
|
||||
}
|
||||
}
|
|
@ -1,8 +1,8 @@
|
|||
package com.isu.gaswellwatch.modbus.data.decode;
|
||||
|
||||
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.core.Message;
|
||||
|
||||
/**
|
||||
* Modbus 数据解析器
|
||||
|
@ -15,6 +15,6 @@ public interface DecodeHandler {
|
|||
String MODBUS_DEVICE_TYPE_FIELD_NAME = "modbusDeviceType";
|
||||
Logger logger = LoggerFactory.getLogger("com.isu.gaswellwatch.decode");
|
||||
|
||||
void decode(Message message) throws Exception;
|
||||
void decode(ModbusMessage modbusMessage);
|
||||
|
||||
}
|
||||
|
|
|
@ -1,15 +1,12 @@
|
|||
package com.isu.gaswellwatch.modbus.data.decode.impl;
|
||||
|
||||
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
|
||||
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
|
||||
import com.serotonin.modbus4j.msg.ReadInputRegistersResponse;
|
||||
import com.serotonin.modbus4j.serial.rtu.RtuMessageParser;
|
||||
import com.serotonin.modbus4j.serial.rtu.RtuMessageResponse;
|
||||
import com.serotonin.modbus4j.sero.util.queue.ByteQueue;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import jakarta.annotation.Resource;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.data.redis.core.HashOperations;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
|
@ -18,35 +15,25 @@ import org.springframework.stereotype.Component;
|
|||
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
|
||||
* 2024/11/23 11:23
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
@Component(Knpcv1DecodeHandler.MODBUS_DEVICE_TYPE)
|
||||
public class Knpcv1DecodeHandler implements DecodeHandler {
|
||||
|
||||
public static final String MODBUS_DEVICE_TYPE = "KNPCV1";
|
||||
private final RedisTemplate redisTemplate;
|
||||
|
||||
@Resource
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
|
||||
@Resource(name = "redisTemplate")
|
||||
private RedisTemplate redisTemplate;
|
||||
|
||||
@Override
|
||||
public void decode(Message message) throws Exception {
|
||||
public void decode(ModbusMessage modbusMessage) {
|
||||
|
||||
// /device/4B454E454E4731343030303030333538/collect
|
||||
String queueName = message.getMessageProperties().getConsumerQueue();
|
||||
String messageString = new String(message.getBody());
|
||||
HashOperations<String, Object, Object> hashOperations = this.redisTemplate.opsForHash();
|
||||
String[] queuePath = StringUtils.split(queueName, "/");
|
||||
String deviceId = queuePath[1];
|
||||
String command = queuePath[2];
|
||||
String[] queuePath = StringUtils.split(modbusMessage.getQueueName(), "/");
|
||||
|
||||
logger.info("Decode {}, Queue {}, Message {}", MODBUS_DEVICE_TYPE, queueName, messageString);
|
||||
logger.info("Decode {}, Queue {}, Message {}", MODBUS_DEVICE_TYPE, modbusMessage.getQueueName(), modbusMessage.getMessage());
|
||||
|
||||
ByteQueue byteQueue = new ByteQueue(messageString.replaceAll(" ", ""));
|
||||
RtuMessageParser masterParser = new RtuMessageParser(true);
|
||||
RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue);
|
||||
ReadInputRegistersResponse readInputRegistersResponse = (ReadInputRegistersResponse) response.getModbusResponse();
|
||||
int index = 0;
|
||||
hashOperations.put(queueName, DecodeHandler.MODBUS_DEVICE_TYPE_FIELD_NAME, MODBUS_DEVICE_TYPE);
|
||||
for (short value : readInputRegistersResponse.getShortData()) {
|
||||
hashOperations.put(queueName, StringUtils.leftPad(String.valueOf(index++), 4, '0'), value);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,15 +1,16 @@
|
|||
package com.isu.gaswellwatch.modbus.data.decode.listener;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.compress.utils.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.amqp.core.BatchMessageListener;
|
||||
import org.springframework.amqp.core.Message;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.List;
|
||||
|
@ -28,18 +29,33 @@ public class ModbusMessageBackupListener implements BatchMessageListener {
|
|||
|
||||
@Override
|
||||
public void onMessageBatch(List<Message> messages) {
|
||||
Path filePath;
|
||||
File file;
|
||||
String messageString, queueName, backupFileName;
|
||||
for (Message message : messages) {
|
||||
messageString = new String(message.getBody());
|
||||
queueName = message.getMessageProperties().getConsumerQueue();
|
||||
backupFileName = StringUtils.replace(queueName, "/", "_");
|
||||
messageString = new String(message.getBody());
|
||||
filePath = Paths.get("D:\\modbus\\data\\" + backupFileName + ".data");
|
||||
try (BufferedWriter writer = Files.newBufferedWriter(filePath, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
|
||||
if (StringUtils.startsWith(backupFileName, "_")) {
|
||||
backupFileName = StringUtils.substring(backupFileName, 1);
|
||||
}
|
||||
backupFileName = "D:\\backup\\modbus\\data\\" + backupFileName + ".data";
|
||||
|
||||
BufferedWriter writer = null;
|
||||
try {
|
||||
file = new File(backupFileName);
|
||||
if (!file.getParentFile().exists()) {
|
||||
file.getParentFile().mkdirs();
|
||||
}
|
||||
if (!file.exists()) {
|
||||
file.createNewFile();
|
||||
}
|
||||
writer = Files.newBufferedWriter(Paths.get(backupFileName), StandardCharsets.UTF_8, StandardOpenOption.APPEND);
|
||||
writer.write(messageString);
|
||||
writer.write("\n");
|
||||
} catch (IOException e) {
|
||||
log.error("Backup message failed. QueueName {}, Message {}", queueName, messageString, e);
|
||||
} finally {
|
||||
IOUtils.closeQuietly(writer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,18 +1,47 @@
|
|||
package com.isu.gaswellwatch.modbus.data.decode.listener;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
|
||||
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
|
||||
import com.serotonin.modbus4j.code.FunctionCode;
|
||||
import com.serotonin.modbus4j.msg.ModbusResponse;
|
||||
import com.serotonin.modbus4j.msg.ReadInputRegistersResponse;
|
||||
import com.serotonin.modbus4j.serial.rtu.RtuMessageParser;
|
||||
import com.serotonin.modbus4j.serial.rtu.RtuMessageResponse;
|
||||
import com.serotonin.modbus4j.sero.util.queue.ByteQueue;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
import org.springframework.amqp.core.BatchMessageListener;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
|
||||
* 2024/11/23 0:43
|
||||
*/
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class ModbusMessagePersistListener implements BatchMessageListener {
|
||||
|
||||
private static final String COMMAND_SQL = "select * from commands where id = ?";
|
||||
|
||||
@Resource
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
@Resource(name = "redisTemplate")
|
||||
private RedisTemplate redisTemplate;
|
||||
@Resource
|
||||
private Map<String, DecodeHandler> decodeHandlerMap;
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message) {
|
||||
this.onMessageBatch(List.of(message));
|
||||
|
@ -20,7 +49,88 @@ public class ModbusMessagePersistListener implements BatchMessageListener {
|
|||
|
||||
@Override
|
||||
public void onMessageBatch(List<Message> messages) {
|
||||
|
||||
try {
|
||||
Long commandId;
|
||||
String[] messageSplit;
|
||||
Map<String, Object> commandMap;
|
||||
String messageString, collectionMessage;
|
||||
Map<String, ModbusMessage.MessagePoint> messagePointMap;
|
||||
// 网关标识/设备标识/命令标识/采集下发时间/采集数据接收时间/采集数据体
|
||||
// 4B454E454E4731343030303030333538/1/10000/1732365690000/1732365691000/01042C07D000010003000C002B0029000300010000001A0018003303CD002300080DAC0DAC0000000000000959091DF066
|
||||
for (Message message : messages) {
|
||||
messageString = new String(message.getBody());
|
||||
messageSplit = StringUtils.split(messageString, "/");
|
||||
if (messageSplit.length < 6) {
|
||||
log.error("非法数据: {}", messageString);
|
||||
continue;
|
||||
}
|
||||
|
||||
collectionMessage = messageSplit[5];
|
||||
commandId = NumberUtils.createLong(messageSplit[2]);
|
||||
commandMap = this.jdbcTemplate.queryForMap(COMMAND_SQL, commandId);
|
||||
if (ObjectUtils.isNotEmpty(commandMap)) {
|
||||
log.error("指令[{}]不存在,数据: {}", commandId, messageString);
|
||||
continue;
|
||||
}
|
||||
int index = MapUtil.getInt(commandMap, "start_address"), stepSize = 0;
|
||||
|
||||
messagePointMap = new HashMap<>();
|
||||
ByteQueue byteQueue = new ByteQueue(collectionMessage);
|
||||
RtuMessageParser masterParser = new RtuMessageParser(true);
|
||||
RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue);
|
||||
ModbusResponse modbusResponse = response.getModbusResponse();
|
||||
switch (modbusResponse.getFunctionCode()) {
|
||||
case FunctionCode.READ_INPUT_REGISTERS: {
|
||||
ReadInputRegistersResponse readInputRegistersResponse = (ReadInputRegistersResponse) modbusResponse;
|
||||
for (short value : readInputRegistersResponse.getShortData()) {
|
||||
stepSize = index * 4;
|
||||
messagePointMap.put(StringUtils.leftPad(String.valueOf(index), 4, '0'),
|
||||
ModbusMessage.MessagePoint.builder()
|
||||
.parseValue(value)
|
||||
.originalValue(StringUtils.substring(collectionMessage, 6 + stepSize, 10 + stepSize))
|
||||
.build());
|
||||
index++;
|
||||
}
|
||||
}
|
||||
// case FunctionCode.READ_COILS: {
|
||||
// ReadCoilsResponse readCoilsResponse = (ReadCoilsResponse) modbusResponse;
|
||||
// }
|
||||
// case FunctionCode.READ_DISCRETE_INPUTS: {
|
||||
// ReadDiscreteInputsResponse readDiscreteInputsResponse = (ReadDiscreteInputsResponse) modbusResponse;
|
||||
// }
|
||||
// case FunctionCode.READ_HOLDING_REGISTERS: {
|
||||
// ReadHoldingRegistersResponse readHoldingRegistersResponse = (ReadHoldingRegistersResponse) modbusResponse;
|
||||
// }
|
||||
// case FunctionCode.READ_EXCEPTION_STATUS: {
|
||||
// ReadExceptionStatusResponse readExceptionStatusResponse = (ReadExceptionStatusResponse) modbusResponse;
|
||||
// }
|
||||
// case FunctionCode.REPORT_SLAVE_ID: {
|
||||
// ReportSlaveIdResponse reportSlaveIdResponse = (ReportSlaveIdResponse) modbusResponse;
|
||||
// }
|
||||
default: {
|
||||
log.error("FunctionCode {}, Message {}", modbusResponse.getFunctionCode(), collectionMessage);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Optional.ofNullable(commandMap.get("decode_handler_name"))
|
||||
.map(this.decodeHandlerMap::get)
|
||||
.ifPresent(decodeHandler -> decodeHandler.decode(ModbusMessage.builder()
|
||||
.queueName(message.getMessageProperties().getConsumerQueue())
|
||||
.commandId(commandId)
|
||||
.message(collectionMessage)
|
||||
.gatewayIdentifier(messageSplit[0])
|
||||
.deviceId(NumberUtils.createLong(messageSplit[1]))
|
||||
.collectionTime(NumberUtils.createLong(messageSplit[3]))
|
||||
.receiveTime(NumberUtils.createLong(messageSplit[4]))
|
||||
.messagePointMap(messagePointMap)
|
||||
.build()));
|
||||
|
||||
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
package com.isu.gaswellwatch.modbus.data.impl;
|
||||
|
||||
import com.isu.gaswellwatch.modbus.data.PersistenceHandler;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import jakarta.annotation.Resource;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
|
@ -11,10 +12,13 @@ import org.springframework.stereotype.Component;
|
|||
* 2024/11/23 11:55
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class Redis2DBPersistenceHandlerImpl implements PersistenceHandler {
|
||||
|
||||
private final RedisTemplate redisTemplate;
|
||||
@Resource
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
|
||||
@Resource(name = "redisTemplate")
|
||||
private RedisTemplate redisTemplate;
|
||||
|
||||
@Scheduled(cron = "0/10 * * * * ? ")
|
||||
public void write() {
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
spring:
|
||||
datasource:
|
||||
type: com.alibaba.druid.pool.DruidDataSource
|
||||
url: jdbc:mysql://127.0.0.1:3306/gaswellwatch?characterEncoding=UTF-8&useUnicode=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
|
||||
username: root
|
||||
password: 1qaz@WSX
|
||||
url: jdbc:mysql://localhost:3306/gas_well_watch?characterEncoding=UTF-8&useUnicode=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
|
||||
username: cq
|
||||
password: cq
|
||||
driver-class-name: com.mysql.cj.jdbc.Driver
|
|
@ -6,7 +6,7 @@ spring:
|
|||
max-file-size: 100MB #-1 无限制
|
||||
max-request-size: 200MB #指定为100MB -1无限制
|
||||
profiles:
|
||||
active: @environment@
|
||||
active: dev
|
||||
redis:
|
||||
host: ${REDIS_HOST:localhost}
|
||||
port: ${REDIS_PORT:6379}
|
||||
|
|
Loading…
Reference in New Issue