解析数据落库

This commit is contained in:
wangshilong 2024-11-24 18:51:30 +08:00
parent dd3b0cd4b7
commit 05c2c43e54
12 changed files with 472 additions and 122 deletions

View File

@ -98,6 +98,11 @@
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>

View File

@ -8,6 +8,7 @@ import lombok.experimental.SuperBuilder;
import java.io.Serial;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
/**
* 网关标识/设备标识/命令标识/采集下发时间/采集数据接收时间/采集数据体
@ -63,7 +64,7 @@ public class ModbusMessage implements Serializable {
@Setter
@SuperBuilder
@NoArgsConstructor
public static class MessagePoint<T> implements Serializable {
public static class MessagePoint implements Serializable {
@Serial
private static final long serialVersionUID = -3118301653064611676L;
/**
@ -73,10 +74,14 @@ public class ModbusMessage implements Serializable {
/**
* 初始解析值
*/
private T parseValue;
private String parseValue;
/**
* 修正解析值
*/
private T value;
private String value;
public String getValue() {
return Objects.isNull(this.value) ? this.parseValue : this.value;
}
}
}

View File

@ -7,5 +7,11 @@ package com.isu.gaswellwatch.modbus.data;
* 2024/11/23 11:53
*/
public interface PersistenceHandler {
public static final String MODBUS_DEVICE_TYPE = "KNPCV1";
String DEVICE_DATA_CACHE = "data:device:";
void createTable(String tableName);
void insert(String tableName, String cacheKey);
}

View File

@ -12,9 +12,8 @@ import org.slf4j.LoggerFactory;
*/
public interface DecodeHandler {
String MODBUS_DEVICE_TYPE_FIELD_NAME = "modbusDeviceType";
Logger logger = LoggerFactory.getLogger("com.isu.gaswellwatch.decode");
void decode(ModbusMessage modbusMessage);
void decode(ModbusMessage.MessagePoint point);
}

View File

@ -3,8 +3,7 @@ package com.isu.gaswellwatch.modbus.data.decode.impl;
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
import jakarta.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.HashOperations;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
@ -15,10 +14,13 @@ import org.springframework.stereotype.Component;
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/23 11:23
*/
@Component(Knpcv1DecodeHandler.MODBUS_DEVICE_TYPE)
@Slf4j
@SuppressWarnings("all")
@Component(Knpcv1DecodeHandler.MODBUS_DEVICE_TYPE + "_1")
public class Knpcv1DecodeHandler implements DecodeHandler {
public static final String MODBUS_DEVICE_TYPE = "KNPCV1";
private static final String COMMAND_POINT_SQL = "select * from command_points where command_id = ?";
@Resource
private JdbcTemplate jdbcTemplate;
@ -27,12 +29,7 @@ public class Knpcv1DecodeHandler implements DecodeHandler {
private RedisTemplate redisTemplate;
@Override
public void decode(ModbusMessage modbusMessage) {
HashOperations<String, Object, Object> hashOperations = this.redisTemplate.opsForHash();
String[] queuePath = StringUtils.split(modbusMessage.getQueueName(), "/");
logger.info("Decode {}, Queue {}, Message {}", MODBUS_DEVICE_TYPE, modbusMessage.getQueueName(), modbusMessage.getMessage());
public void decode(ModbusMessage.MessagePoint point) {
}

View File

@ -1,10 +1,10 @@
package com.isu.gaswellwatch.modbus.data.decode.listener;
import cn.hutool.core.map.MapUtil;
import cn.hutool.json.JSONUtil;
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
import com.isu.gaswellwatch.modbus.data.PersistenceHandler;
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
import com.serotonin.modbus4j.code.FunctionCode;
import com.serotonin.modbus4j.msg.ModbusResponse;
import com.serotonin.modbus4j.msg.ReadInputRegistersResponse;
import com.serotonin.modbus4j.serial.rtu.RtuMessageParser;
import com.serotonin.modbus4j.serial.rtu.RtuMessageResponse;
@ -17,13 +17,12 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.amqp.core.BatchMessageListener;
import org.springframework.amqp.core.Message;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.time.Duration;
import java.util.*;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
@ -31,9 +30,12 @@ import java.util.Optional;
*/
@Slf4j
@RequiredArgsConstructor
@SuppressWarnings("all")
public class ModbusMessagePersistListener implements BatchMessageListener {
public static final String COMMAND_CACHE = "modbus:command:";
private static final String COMMAND_SQL = "select * from commands where id = ?";
private static final String COMMAND_POINT_SQL = "select * from command_points where command_id = ?";
@Resource
private JdbcTemplate jdbcTemplate;
@ -49,89 +51,137 @@ public class ModbusMessagePersistListener implements BatchMessageListener {
@Override
public void onMessageBatch(List<Message> messages) {
Long commandId;
String[] messageSplit;
Map<String, Object> commandMap;
String messageString, collectionMessage, commandString;
Map<String, ModbusMessage.MessagePoint> messagePointMap;
// 网关标识/设备标识/命令标识/采集下发时间/采集数据接收时间/采集数据体
// 4B454E454E4731343030303030333538/1/10000/1732365690000/1732365691000/01042C07D0000
for (Message message : messages) {
messageString = new String(message.getBody());
messageSplit = StringUtils.split(messageString, "/");
if (messageSplit.length < 6) {
log.error("非法数据: {}", messageString);
continue;
}
try {
Long commandId;
String[] messageSplit;
Map<String, Object> commandMap;
String messageString, collectionMessage;
Map<String, ModbusMessage.MessagePoint> messagePointMap;
// 网关标识/设备标识/命令标识/采集下发时间/采集数据接收时间/采集数据体
// 4B454E454E4731343030303030333538/1/10000/1732365690000/1732365691000/01042C07D000010003000C002B0029000300010000001A0018003303CD002300080DAC0DAC0000000000000959091DF066
for (Message message : messages) {
messageString = new String(message.getBody());
messageSplit = StringUtils.split(messageString, "/");
if (messageSplit.length < 6) {
log.error("非法数据: {}", messageString);
continue;
}
collectionMessage = messageSplit[5];
commandId = NumberUtils.createLong(messageSplit[2]);
collectionMessage = messageSplit[5];
commandId = NumberUtils.createLong(messageSplit[2]);
commandString = Objects.toString(this.redisTemplate.opsForValue().get(COMMAND_CACHE + commandId), "");
commandMap = StringUtils.isNotEmpty(commandString) ? JSONUtil.parseObj(commandString) : null;
if (ObjectUtils.isEmpty(commandMap)) {
commandMap = this.jdbcTemplate.queryForMap(COMMAND_SQL, commandId);
if (ObjectUtils.isNotEmpty(commandMap)) {
log.error("指令[{}]不存在,数据: {}", commandId, messageString);
continue;
}
this.redisTemplate.opsForValue()
.setIfAbsent(COMMAND_CACHE + commandId, JSONUtil.toJsonStr(commandMap), Duration.ofDays(1));
}
DecodeHandler decodeHandler = Optional.ofNullable(MapUtil.getStr(commandMap, "decode_handler_name"))
.map(this.decodeHandlerMap::get)
.orElse(null);
if (Objects.isNull(decodeHandler)) {
log.error("指令[{}]未配置解析器,数据: {}", commandId, messageString);
continue;
}
messagePointMap = new HashMap<>();
try {
String address;
int index = MapUtil.getInt(commandMap, "start_address"), stepSize = 0;
messagePointMap = new HashMap<>();
ByteQueue byteQueue = new ByteQueue(collectionMessage);
RtuMessageParser masterParser = new RtuMessageParser(true);
RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue);
ModbusResponse modbusResponse = response.getModbusResponse();
switch (modbusResponse.getFunctionCode()) {
case FunctionCode.READ_INPUT_REGISTERS: {
ReadInputRegistersResponse readInputRegistersResponse = (ReadInputRegistersResponse) modbusResponse;
for (short value : readInputRegistersResponse.getShortData()) {
stepSize = index * 4;
messagePointMap.put(StringUtils.leftPad(String.valueOf(index), 4, '0'),
ModbusMessage.MessagePoint.builder()
.parseValue(value)
.originalValue(StringUtils.substring(collectionMessage, 6 + stepSize, 10 + stepSize))
.build());
index++;
}
}
// case FunctionCode.READ_COILS: {
// ReadCoilsResponse readCoilsResponse = (ReadCoilsResponse) modbusResponse;
// }
// case FunctionCode.READ_DISCRETE_INPUTS: {
// ReadDiscreteInputsResponse readDiscreteInputsResponse = (ReadDiscreteInputsResponse) modbusResponse;
// }
// case FunctionCode.READ_HOLDING_REGISTERS: {
// ReadHoldingRegistersResponse readHoldingRegistersResponse = (ReadHoldingRegistersResponse) modbusResponse;
// }
// case FunctionCode.READ_EXCEPTION_STATUS: {
// ReadExceptionStatusResponse readExceptionStatusResponse = (ReadExceptionStatusResponse) modbusResponse;
// }
// case FunctionCode.REPORT_SLAVE_ID: {
// ReportSlaveIdResponse reportSlaveIdResponse = (ReportSlaveIdResponse) modbusResponse;
// }
default: {
log.error("FunctionCode {}, Message {}", modbusResponse.getFunctionCode(), collectionMessage);
}
ReadInputRegistersResponse readInputRegistersResponse = (ReadInputRegistersResponse) response.getModbusResponse();
for (short value : readInputRegistersResponse.getShortData()) {
stepSize = index * 4;
messagePointMap.put(StringUtils.leftPad(String.valueOf(index), 4, '0'),
ModbusMessage.MessagePoint.builder()
.parseValue(String.valueOf(value))
.originalValue(StringUtils.substring(collectionMessage, 6 + stepSize, 10 + stepSize))
.build());
index++;
}
Optional.ofNullable(commandMap.get("decode_handler_name"))
.map(this.decodeHandlerMap::get)
.ifPresent(decodeHandler -> decodeHandler.decode(ModbusMessage.builder()
.queueName(message.getMessageProperties().getConsumerQueue())
.commandId(commandId)
.message(collectionMessage)
.gatewayIdentifier(messageSplit[0])
.deviceId(NumberUtils.createLong(messageSplit[1]))
.collectionTime(NumberUtils.createLong(messageSplit[3]))
.receiveTime(NumberUtils.createLong(messageSplit[4]))
.messagePointMap(messagePointMap)
.build()));
} catch (Exception e) {
log.error("初始数据解析异常: {}", messageString, e);
continue;
}
if (ObjectUtils.isNotEmpty(messagePointMap)) {
decode(ModbusMessage.builder()
.commandId(commandId)
.message(collectionMessage)
.messagePointMap(messagePointMap)
.gatewayIdentifier(messageSplit[0])
.deviceId(NumberUtils.createLong(messageSplit[1]))
.receiveTime(NumberUtils.createLong(messageSplit[4]))
.collectionTime(NumberUtils.createLong(messageSplit[3]))
.queueName(message.getMessageProperties().getConsumerQueue())
.build());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void decode(ModbusMessage modbusMessage) {
Long commandId = modbusMessage.getCommandId();
String cacheName = ModbusMessagePersistListener.COMMAND_CACHE + commandId + ":points";
String cachePoints = Objects.toString(this.redisTemplate.opsForValue().get(cacheName), "");
List<Map<String, Object>> commandPointList = StringUtils.isNotEmpty(cachePoints) ? (List) JSONUtil.parseArray(cachePoints) : null;
if (ObjectUtils.isEmpty(commandPointList)) {
commandPointList = this.jdbcTemplate.queryForList(COMMAND_POINT_SQL, commandId);
if (ObjectUtils.isNotEmpty(commandPointList)) {
log.error("指令[{}]点表配置为空", commandId, cachePoints);
throw new RuntimeException("指令[" + commandId + "]点表未配置");
}
this.redisTemplate.opsForValue().setIfAbsent(cacheName, JSONUtil.toJsonStr(commandPointList), Duration.ofDays(1));
}
DecodeHandler decodeHandler;
String fieldName, decodeName;
int startAddress = 0, stepSize = 0;
ModbusMessage.MessagePoint messagePoint;
String deviceDataCacheName = PersistenceHandler.DEVICE_DATA_CACHE + modbusMessage.getDeviceId();
HashOperations<String, Object, Object> hashOperations = this.redisTemplate.opsForHash();
for (Map<String, Object> point : commandPointList) {
fieldName = MapUtil.getStr(point, "field");
stepSize = MapUtil.getInt(point, "step_size");
decodeName = MapUtil.getStr(point, "decode_name");
startAddress = MapUtil.getInt(point, "start_address");
String value;
if (stepSize <= 1) {
messagePoint = modbusMessage.getMessagePointMap()
.get(StringUtils.leftPad(String.valueOf(startAddress), 4, '0'));
decodeMessage(decodeName, messagePoint);
value = messagePoint.getValue();
} else {
value = decodeCommandPoint(modbusMessage.getMessagePointMap(), decodeName,
MapUtil.getStr(point, "format"), startAddress, stepSize);
}
hashOperations.put(deviceDataCacheName, fieldName, value);
}
}
private void decodeMessage(String decodeName, ModbusMessage.MessagePoint messagePoint) {
if (StringUtils.isNotBlank(decodeName)) {
DecodeHandler decodeHandler = this.decodeHandlerMap.get(decodeName);
if (Objects.nonNull(decodeHandler)) {
decodeHandler.decode(messagePoint);
}
}
}
private String decodeCommandPoint(Map<String, ModbusMessage.MessagePoint> pointMap,
String decodeName, String format, int startAddress, int stepSize) {
String[] values = new String[stepSize];
ModbusMessage.MessagePoint messagePoint;
for (int i = 0; i < stepSize; i++) {
messagePoint = pointMap.get(StringUtils.leftPad(String.valueOf(startAddress + i), 4, '0'));
decodeMessage(decodeName, messagePoint);
values[i] = messagePoint.getValue();
}
return String.format(format, values);
}
}

View File

@ -0,0 +1,135 @@
package com.isu.gaswellwatch.modbus.data.impl;
import cn.hutool.core.map.MapUtil;
import com.isu.gaswellwatch.config.SnowflakeConfig;
import com.isu.gaswellwatch.modbus.data.PersistenceHandler;
import jakarta.annotation.Resource;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCallback;
import org.springframework.stereotype.Component;
import org.springframework.util.ResourceUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Map;
import java.util.Objects;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/24 17:33
*/
@Component(PersistenceHandler.MODBUS_DEVICE_TYPE)
public class Knpcv1PersistenceHandler implements PersistenceHandler {
@Resource
private JdbcTemplate jdbcTemplate;
@Resource
private SnowflakeConfig snowflakeConfig;
@Resource(name = "redisTemplate")
private RedisTemplate redisTemplate;
@Override
public void createTable(String tableName) {
String createTableSQL = getResource("sql/CREATE_KNPCV1.sql");
this.jdbcTemplate.execute(StringUtils.replace(createTableSQL, "$TableName$", tableName));
}
@Override
public void insert(String tableName, String cacheKey) {
String insertTableSQL = getResource("sql/INSERT_KNPCV1.sql");
insertTableSQL = StringUtils.replace(insertTableSQL, "$TableName$", tableName);
Map<String, Object> row = this.redisTemplate.opsForHash().entries(cacheKey);
this.jdbcTemplate.execute(insertTableSQL, new PreparedStatementCallback<Object>() {
@Override
public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
ps.setLong(1, Knpcv1PersistenceHandler.this.snowflakeConfig.snowflakeId());
Knpcv1PersistenceHandler.this.setValue(ps, row, 2, "deviceId", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 3, "collectionTime", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 4, "receiveTime", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 5, "status", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 6, "device_time", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 7, "runMode", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 8, "wellStatus", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 9, "plugStatus", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 10, "statusOpenTime", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 11, "statusCloseTime", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 12, "oilPressure", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 13, "casPressure", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 14, "prePressure", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 15, "pipePressure", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 16, "liquidLevel", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 17, "temperature", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 18, "humidity", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 19, "opmode", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 20, "well_ctl", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 21, "timer1", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 22, "timer2", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 23, "timer1Open", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 24, "timer1Close", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 25, "timer2Open", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 26, "timer2Close", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 27, "timingOpen", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 28, "timingClose", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 29, "timingCelay", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 30, "presource", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 31, "pressureOpen", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 32, "pressureClose", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 33, "triggerType", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 34, "stabilityTime", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 35, "maxOpenWell", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 36, "maxCloseWell", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 37, "minOpenWell", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 38, "minCloseWell", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 39, "presproTect", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 40, "presproSource", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 41, "openWellLimitMax", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 42, "openWellLimitMin", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 43, "plugInitStatus", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 44, "plugSustainTime", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 45, "plugCloseTime", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 46, "tpInitStatus", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 47, "tpOpenSource", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 48, "tpOpenTrigger", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 49, "tpOpenPressure", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 50, "tpOpenTime", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 51, "tpCloseSource", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 52, "tpCloseTrigger", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 53, "tpClosePressure", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 54, "tpCloseTime", Types.BIGINT);
Knpcv1PersistenceHandler.this.setValue(ps, row, 55, "tpStabilityTime", Types.BIGINT);
return ps.executeUpdate();
}
});
}
public void setValue(PreparedStatement ps, Map<String, Object> row, int index, String key, int sqlType) throws SQLException {
String value = MapUtil.getStr(row, key);
if (Objects.isNull(value)) {
ps.setNull(index, sqlType);
} else {
ps.setObject(index, value);
}
}
public static String getResource(String classPath) {
try {
File file = ResourceUtils.getFile("classpath:" + classPath);
return IOUtils.toString(new FileInputStream(file), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,28 +0,0 @@
package com.isu.gaswellwatch.modbus.data.impl;
import com.isu.gaswellwatch.modbus.data.PersistenceHandler;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/23 11:55
*/
@Component
public class Redis2DBPersistenceHandlerImpl implements PersistenceHandler {
@Resource
private JdbcTemplate jdbcTemplate;
@Resource(name = "redisTemplate")
private RedisTemplate redisTemplate;
@Scheduled(cron = "0/10 * * * * ? ")
public void write() {
this.redisTemplate.getClientList();
}
}

View File

@ -0,0 +1,87 @@
package com.isu.gaswellwatch.modbus.data.impl;
import cn.hutool.core.map.MapUtil;
import com.isu.gaswellwatch.modbus.data.PersistenceHandler;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/23 11:55
*/
@Slf4j
@Component
@SuppressWarnings("all")
public class Redis2DBPersistenceService {
private static final String EXISTS_TABLE_SQL = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='gaswellwatch' AND TABLE_NAME=?";
@Resource
private JdbcTemplate jdbcTemplate;
@Resource(name = "redisTemplate")
private RedisTemplate redisTemplate;
@Resource
private Map<String, PersistenceHandler> persistenceHandlerMap;
private RestTemplate restTemplate = new RestTemplate();
@Order(100)
@Scheduled(cron = "5/30 * * * * ? ")
public void write() {
Map<Long, String> idGatewayMappingMap = null;
try {
String onlineGateways = this.restTemplate.getForObject("http://127.0.0.1:9999/modbus-tcp/online", String.class);
if (StringUtils.isBlank(onlineGateways)) {
return;
}
List<Map<String, Object>> idGatewayMappingList = this.jdbcTemplate
.queryForList("select id, gateway_sn from devices where gateway_sn in (" + onlineGateways + ")");
if (ObjectUtils.isEmpty(idGatewayMappingList)) {
return;
}
idGatewayMappingMap = new HashMap<Long, String>();
for (Map<String, Object> map : idGatewayMappingList) {
idGatewayMappingMap.put(MapUtil.getLong(map, "id"), MapUtil.getStr(map, "gateway_sn"));
}
} catch (Exception e) {
log.error("Get online devices list fail", e);
}
HashOperations operations = this.redisTemplate.opsForHash();
try (Cursor<String> cursor = this.redisTemplate.scan(ScanOptions.scanOptions()
.match(PersistenceHandler.DEVICE_DATA_CACHE + "*").build())) {
Long deviceId;
String cacheKey, tableName;
Map<String, Object> deviceMap, temp;
PersistenceHandler persistenceHandler;
while (cursor.hasNext()) {
deviceMap = operations.entries(cacheKey = cursor.next());
deviceId = MapUtil.getLong(deviceMap, "deviceId");
operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId,
"online", idGatewayMappingMap.containsKey(deviceId));
persistenceHandler = persistenceHandlerMap.get("KNPCV1");
tableName = "t_device_data_" + deviceId;
temp = this.jdbcTemplate.queryForMap(EXISTS_TABLE_SQL, tableName); //$TableName$
if (StringUtils.isBlank(MapUtil.getStr(temp, "TABLE_NAME"))) {
persistenceHandler.createTable(tableName);
}
persistenceHandler.insert(tableName, cacheKey);
}
}
}
}

View File

@ -0,0 +1,61 @@
CREATE TABLE `$TableName$`
(
`id` bigint NOT NULL COMMENT '主键',
`device_id` int NOT NULL COMMENT '设备标识',
`created_time` datetime NOT NULL COMMENT '数据落库时间',
`collection_time` datetime NOT NULL COMMENT '采集指令下发时间',
`receive_time` datetime NOT NULL COMMENT '接收到数据时间',
`status` tinyint NULL DEFAULT NULL COMMENT '开关井。寄存器地址0:0001允许值0关井 1开井',
`device_time` datetime NULL DEFAULT NULL COMMENT '设备时间。\r\n寄存器地址3:00003:00013:00023:00033:00043:0005\r\n',
`run_mode` tinyint NULL DEFAULT NULL COMMENT '运行模式:\r\n0手动模式 \r\n1定时器模式 \r\n2计时器模式 \r\n3压力模式\r\n4柱塞模式\r\n5时压模式',
`gas_status` tinyint NULL DEFAULT NULL COMMENT '气井状态:\r\n0关闭 \r\n1打开',
`plug_status` tinyint NULL DEFAULT NULL COMMENT '柱塞状态:\r\n0离开\r\n1上升中\r\n2到达',
`status_start_time` time NULL DEFAULT NULL COMMENT '当前状态开始时间',
`status_end_time` time NULL DEFAULT NULL COMMENT '当前状态结束时间',
`oil_pressure` decimal(10, 2) NULL DEFAULT NULL COMMENT '油压单位Mpa',
`cas_pressure` decimal(10, 2) NULL DEFAULT NULL COMMENT '套压单位Mpa',
`pre_pressure` decimal(10, 2) NULL DEFAULT NULL COMMENT '输压单位Mpa',
`pipe_pressure` decimal(10, 2) NULL DEFAULT NULL COMMENT '管压单位Mpa',
`liquid_level` decimal(10, 2) NULL DEFAULT NULL COMMENT '流量单位mm',
`temperature` decimal(10, 2) NULL DEFAULT NULL COMMENT '温度:单位℃,*100最高位表示正负0正1负即零下的温度加32768',
`humidity` decimal(10, 2) NULL DEFAULT NULL COMMENT '湿度:单位%',
`op_mode` tinyint NULL DEFAULT NULL COMMENT '运行模式:\r\n0 手动模式 hand_mode\r\n1定时器模式 timer_mode\r\n2计时器模式 t2mode\r\n3压力模式 pressure_mode\r\n4柱塞模式 piston_mode\r\n5时压模式 tp_mode',
`well_ctl` tinyint NULL DEFAULT NULL COMMENT '开关井,保持现有的模式,同显示屏按钮:\r\n0关井 close_well\r\n1开井 open_well',
`timer_able1` tinyint NULL DEFAULT NULL COMMENT '定时模式定时器1使能\r\n0禁止 disable\r\n1使能 enable',
`timer_able2` tinyint NULL DEFAULT NULL COMMENT '定时模式定时器2使能\r\n0禁止 disable\r\n1使能 ',
`timer_open1` time NULL DEFAULT NULL COMMENT '时0~23分0~59秒0~59',
`timer_close1` time NULL DEFAULT NULL COMMENT '时0~23分0~59秒0~59',
`timer_open2` time NULL DEFAULT NULL COMMENT '时0~23分0~59秒0~59',
`timer_close2` time NULL DEFAULT NULL COMMENT '时0~23分0~59秒0~59',
`timing_open` time NULL DEFAULT NULL COMMENT '时0~23分0~59秒0~59',
`timing_close` time NULL DEFAULT NULL COMMENT '时0~23分0~59秒0~59',
`timing_delay` time NULL DEFAULT NULL COMMENT '时0~23分0~59秒0~59',
`presource` tinyint NULL DEFAULT NULL COMMENT '触发压力源:\r\n0油压 oil_pressure\r\n1套压 cas_pressure\r\n2输压 pre_pressure\r\n3差压 diff_pressure',
`pressure_open` decimal(10, 2) NULL DEFAULT NULL COMMENT '开井压力单位Mpa*100',
`pressure_close` decimal(10, 2) NULL DEFAULT NULL COMMENT '关井压力: 单位Mpa*100',
`trigger_type` tinyint NULL DEFAULT NULL COMMENT '压力触发类型:\n0大于开井压力开井、小于关井压力关井 gre_less\n1大于开井压力开井、大于关井压力关井 gre_gre\n2小于开井压力开井、小于关井压力关井 less_less\n3小于开井压力开井、大于关井压力关井 less_gre',
`stability_time` smallint NULL DEFAULT NULL COMMENT '传感器压力稳定时间:秒',
`max_open_well` time NULL DEFAULT NULL COMMENT '时0~999分0~59秒0~59',
`max_close_well` time NULL DEFAULT NULL COMMENT '时0~999分0~59秒0~59',
`min_open_well` time NULL DEFAULT NULL COMMENT '时0~999分0~59秒0~59',
`min_close_well` time NULL DEFAULT NULL COMMENT '时0~999分0~59秒0~59',
`prespro_tect` tinyint NULL DEFAULT NULL COMMENT '开井压力保护使能:\r\n0禁止 disable\r\n1使能 enable',
`prespro_source` tinyint NULL DEFAULT NULL COMMENT '开井压力保护源:\r\n0油压 oil_pressure\r\n1套压 cas_pressure\r\n2输压 pre_pressure\r\n3差压 diff_pressure',
`open_well_limit_max` decimal(10, 2) NULL DEFAULT NULL COMMENT '开井压力限制上限单位MPa *100',
`open_well_limit_min` decimal(10, 2) NULL DEFAULT NULL COMMENT '开井压力限制下限单位MPa *100',
`plug_init_status` tinyint NULL DEFAULT NULL COMMENT '柱塞模式初始气井状态:\r\n0关井 close_well\r\n1开井 open_well',
`plug_sustain_time` time NULL DEFAULT NULL COMMENT '时0~999分0~59秒0~59',
`plug_close_time` time NULL DEFAULT NULL COMMENT '时0~999分0~59秒0~59',
`tp_init_status` tinyint NULL DEFAULT NULL COMMENT '时压模式初始气井状态:\r\n0关井 close_well\r\n1开井 open_well',
`tp_open_source` tinyint NULL DEFAULT NULL COMMENT '时压模式开井源:\r\n0油压 oil_pressure\r\n1套压 cas_pressure\r\n2输压 pre_pressure\r\n3差压 diff_pressure\r\n4时间 time_pressure',
`tp_open_trigger` tinyint NULL DEFAULT NULL COMMENT '时压模式开井压力触发模式:\r\n0大于压力触发\r\n1小于压力触发',
`tp_open_pressure` tinyint NULL DEFAULT NULL COMMENT '时压模式开井压力',
`tp_open_time` time NULL DEFAULT NULL COMMENT '时0~999分0~59秒0~59',
`tp_close_source` tinyint NULL DEFAULT NULL COMMENT '时压模式关井源:\r\n0油压 oil_pressure\r\n1套压 cas_pressure\r\n2输压 pre_pressure\r\n3差压 diff_pressure\r\n4时间 time_pressure',
`tp_close_trigger` tinyint NULL DEFAULT NULL COMMENT '时压模式关井压力触发模式:\r\n0大于压力触发 max_pressure\r\n1小于压力触发 min_pressure',
`tp_close_pressure` smallint NULL DEFAULT NULL COMMENT '时压模式关井压力',
`tp_close_time` time NULL DEFAULT NULL COMMENT '时0~999分0~59秒0~59',
`tp_stability_time` tinyint NULL DEFAULT NULL COMMENT '时压模式压力稳定时长:秒',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `udx_device_create_time` (`device_id` ASC, `created_time` ASC) USING BTREE COMMENT '设备采集数据唯一键'
) ENGINE = InnoDB COMMENT = '设备ID1的采集数据';

View File

@ -0,0 +1,14 @@
INSERT INTO `$TableName$` (`id`, `device_id`, `created_time`, `collection_time`, `receive_time`, `status`,
`device_time`, `run_mode`, `gas_status`, `plug_status`, `status_start_time`,
`status_end_time`, `oil_pressure`, `cas_pressure`, `pre_pressure`, `pipe_pressure`,
`liquid_level`, `temperature`, `humidity`, `op_mode`, `well_ctl`, `timer_able1`,
`timer_able2`, `timer_open1`, `timer_close1`, `timer_open2`, `timer_close2`,
`timing_open`, `timing_close`, `timing_delay`, `presource`, `pressure_open`,
`pressure_close`, `trigger_type`, `stability_time`, `max_open_well`, `max_close_well`,
`min_open_well`, `min_close_well`, `prespro_tect`, `prespro_source`,
`open_well_limit_max`, `open_well_limit_min`, `plug_init_status`, `plug_sustain_time`,
`plug_close_time`, `tp_init_status`, `tp_open_source`, `tp_open_trigger`,
`tp_open_pressure`, `tp_open_time`, `tp_close_source`, `tp_close_trigger`,
`tp_close_pressure`, `tp_close_time`, `tp_stability_time`)
VALUES (?, ?, NOW(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);

View File

@ -1,13 +1,32 @@
package com.isu.gaswellwatch;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.ResourceUtils;
@SpringBootTest
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
//@SpringBootTest
class GasWellWatchApplicationTests {
@Test
void contextLoads() {
void contextLoads() throws Exception {
String fileName = "sql/CREATE_KNPCV1.sql";
System.out.println(ResourceUtils.getURL(fileName));
System.out.println(getResource(fileName));
}
public static String getResource(String classPath) {
try {
File file = ResourceUtils.getFile("classpath:" + classPath);
return IOUtils.toString(new FileInputStream(file), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}