Merge remote-tracking branch 'origin/develop' into develop

This commit is contained in:
qinjie 2024-11-26 03:13:02 +08:00
commit fb3470ae67
25 changed files with 995 additions and 130 deletions

View File

@ -0,0 +1,40 @@
package com.isu.gaswellwatch.modbus.data;
import lombok.Getter;
import java.util.HashMap;
import java.util.Map;
@Getter
public enum ModbusDeviceTypeEnum {
KNPCV1("4", PersistenceHandler.KNPCV1_MODBUS_TYPE),
ETC("5", PersistenceHandler.ETC_MODBUS_TYPE),
SCSS("6", PersistenceHandler.SCSS_MODBUS_TYPE);
private final String id;
private final String code;
ModbusDeviceTypeEnum(String id, String code) {
this.id = id;
this.code = code;
}
@Override
public String toString() {
return getCode();
}
public static String getCodeById(String id) {
return enumMap.get(id);
}
private static final Map<String, String> enumMap = new HashMap<String, String>();
static {
for (ModbusDeviceTypeEnum type : ModbusDeviceTypeEnum.values()) {
enumMap.put(type.id, type.code);
}
}
}

View File

@ -1,5 +1,18 @@
package com.isu.gaswellwatch.modbus.data;
import cn.hutool.core.map.MapUtil;
import org.apache.commons.io.IOUtils;
import org.springframework.util.ResourceUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;
import java.util.Objects;
/**
* 数据持久化处理器
*
@ -7,11 +20,35 @@ package com.isu.gaswellwatch.modbus.data;
* 2024/11/23 11:53
*/
public interface PersistenceHandler {
public static final String MODBUS_DEVICE_TYPE = "KNPCV1";
String KNPCV1_MODBUS_TYPE = "knpc";
String ETC_MODBUS_TYPE = "etc";
String SCSS_MODBUS_TYPE = "scss";
String DEVICE_INFO_CACHE = "info:device:";
String DEVICE_DATA_CACHE = "data:device:";
void createTable(String tableName, Long deviceId);
void insert(String tableName, String cacheKey);
default void setValue(PreparedStatement ps, Map<String, Object> row, int index, String key, int sqlType) throws SQLException {
String value = MapUtil.getStr(row, key);
if (Objects.isNull(value)) {
ps.setNull(index, sqlType);
} else {
ps.setObject(index, value);
}
}
static String getResource(String classPath) {
try {
File file = ResourceUtils.getFile("classpath:" + classPath);
return IOUtils.toString(new FileInputStream(file), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,8 +1,7 @@
package com.isu.gaswellwatch.modbus.data.impl;
package com.isu.gaswellwatch.modbus.data;
import cn.hutool.core.map.MapUtil;
import com.isu.gaswellwatch.entity.Response;
import com.isu.gaswellwatch.modbus.data.PersistenceHandler;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
@ -34,8 +33,10 @@ import java.util.stream.Collectors;
@SuppressWarnings("all")
public class Redis2DBPersistenceService {
private static final String EXISTS_TABLE_SQL = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA" +
" IN ('gaswellwatch', 'gas_well_watch') AND TABLE_NAME='$TableName$'";
public static final String DEFAULT_DATA_TABLE = "t_device_data_";
private static final String DEVICE_INFO_SQL = "SELECT * from device where id = ";
private static final String EXISTS_TABLE_SQL = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA"
+ " IN ('gaswellwatch', 'gas_well_watch') AND TABLE_NAME='$TableName$'";
@Resource
private JdbcTemplate jdbcTemplate;
@Resource(name = "stringRedisTemplate")
@ -65,11 +66,20 @@ public class Redis2DBPersistenceService {
continue;
}
if (Objects.nonNull(idGatewayMappingMap)) {
operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId,
"online", String.valueOf(idGatewayMappingMap.containsKey(deviceId)));
operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, "online",
String.valueOf(idGatewayMappingMap.containsKey(deviceId)));
}
persistenceHandler = persistenceHandlerMap.get("KNPCV1");
tableName = "t_device_data_" + deviceId;
String modbusDeviceTypeId = (String) operations.get(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, "product");
if (StringUtils.isEmpty(modbusDeviceTypeId)) {
Map<String, Object> deviceInfo = this.jdbcTemplate.queryForMap(DEVICE_INFO_SQL + deviceId);
Map<String, Object> cacheDeviceInfo = new HashMap<>(deviceInfo.size());
deviceInfo.forEach((key, value) -> cacheDeviceInfo.put(key, Objects.isNull(value) ? null : String.valueOf(value)));
modbusDeviceTypeId = (String) cacheDeviceInfo.get("product");
operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo);
}
persistenceHandler = persistenceHandlerMap.get(ModbusDeviceTypeEnum.getCodeById(modbusDeviceTypeId));
tableName = DEFAULT_DATA_TABLE + deviceId;
existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName));
if (ObjectUtils.isEmpty(existsTableList)
|| ObjectUtils.isEmpty(existsTableList.get(0))
@ -84,18 +94,14 @@ public class Redis2DBPersistenceService {
private Map<Long, String> getOnlineGateway() {
try {
RequestEntity request = RequestEntity.get("http://127.0.0.1:9999/modbus-tcp/online").build();
ResponseEntity<Response<List<String>>> response = this.restTemplate.exchange(request,
new ParameterizedTypeReference<Response<List<String>>>() {
ResponseEntity<Response<List<String>>> response = this.restTemplate.exchange(request, new ParameterizedTypeReference<Response<List<String>>>() {
});
if (Objects.isNull(response)
|| Objects.isNull(response.getBody())
|| ObjectUtils.isEmpty(response.getBody().getData())) {
if (Objects.isNull(response) || Objects.isNull(response.getBody()) || ObjectUtils.isEmpty(response.getBody().getData())) {
return null;
}
List<Map<String, Object>> idGatewayMappingList = this.jdbcTemplate
.queryForList("select id, gateway_sn from devices where gateway_sn in (" +
response.getBody().getData().stream().map(value -> "'" + value + "'")
.collect(Collectors.joining(",")) + ")");
List<Map<String, Object>> idGatewayMappingList = this.jdbcTemplate.queryForList("select id, gateway_sn" +
" from device where gateway_sn in (" +
response.getBody().getData().stream().map(value -> "'" + value + "'").collect(Collectors.joining(",")) + ")");
if (ObjectUtils.isEmpty(idGatewayMappingList)) {
return null;
}

View File

@ -13,6 +13,7 @@ import java.util.Map;
* 2024/11/23 11:20
*/
public interface DecodeHandler {
String DECODE_NAME = "DecodeHandler";
Logger logger = LoggerFactory.getLogger("com.isu.gaswellwatch.decode");

View File

@ -0,0 +1,35 @@
package com.isu.gaswellwatch.modbus.data.decode.impl;
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
import java.util.Collection;
import java.util.Map;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/25 16:31
*/
public class ComposeDecodeHandler implements DecodeHandler {
public final Collection<DecodeHandler> collection;
public ComposeDecodeHandler(Collection<DecodeHandler> collection) {
this.collection = collection;
}
@Override
public String decode(Map<String, Object> commandPointMap, String value) {
for (DecodeHandler handler : this.collection) {
value = handler.decode(commandPointMap, value);
}
return value;
}
@Override
public void decode(Map<String, Object> commandPointMap, ModbusMessage.MessagePoint point) {
for (DecodeHandler handler : this.collection) {
handler.decode(commandPointMap, point);
}
}
}

View File

@ -3,7 +3,6 @@ package com.isu.gaswellwatch.modbus.data.decode.impl;
import cn.hutool.core.map.MapUtil;
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
@ -23,10 +22,10 @@ import java.util.Map;
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/24 21:23
*/
@Slf4j
@SuppressWarnings("all")
@Component("factorDecodeHandler")
@Component(FactorDecodeHandler.NAME + DecodeHandler.DECODE_NAME)
public class FactorDecodeHandler implements DecodeHandler {
public static final String NAME = "factor";
@Override
public String decode(Map<String, Object> commandPointMap, String value) {
@ -35,11 +34,11 @@ public class FactorDecodeHandler implements DecodeHandler {
@Override
public void decode(Map<String, Object> commandPointMap, ModbusMessage.MessagePoint point) {
point.setValue(decode(commandPointMap, point.getParseValue()));
point.setValue(this.decode(commandPointMap, point.getParseValue()));
String factorStr = MapUtil.getStr(commandPointMap, "factor");
if (StringUtils.isNotBlank(factorStr) && StringUtils.isNotBlank(point.getParseValue())) {
BigDecimal factor = new BigDecimal(factorStr);
BigDecimal value = new BigDecimal(point.getParseValue());
BigDecimal value = new BigDecimal(point.getValue());
int precision = MapUtil.getInt(commandPointMap, "precision", 0);
switch (factor.compareTo(BigDecimal.ZERO)) {
case -1 -> point.setValue(value.multiply(factor.abs()).toString());

View File

@ -0,0 +1,30 @@
package com.isu.gaswellwatch.modbus.data.decode.impl;
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 高低位4字节解析器
*
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/25 16:36
*/
@Component(HighLowBinDecodeHandler.NAME + DecodeHandler.DECODE_NAME)
public class HighLowBinDecodeHandler implements DecodeHandler {
public static final String NAME = "highLowBin";
@Override
public String decode(Map<String, Object> commandPointMap, String value) {
return String.valueOf(Integer.parseInt(value, 16));
}
@Override
public void decode(Map<String, Object> commandPointMap, ModbusMessage.MessagePoint point) {
throw new RuntimeException("not supported");
}
}

View File

@ -2,7 +2,6 @@ package com.isu.gaswellwatch.modbus.data.decode.impl;
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
@ -15,10 +14,10 @@ import java.util.Map;
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/25 11:51
*/
@Slf4j
@SuppressWarnings("all")
@Component("localDateDecodeHandler")
@Component(LocalDateDecodeHandler.NAME + DecodeHandler.DECODE_NAME)
public class LocalDateDecodeHandler implements DecodeHandler {
public static final String NAME = "localDate";
public static final DateTimeFormatter IN_FORMATTER = DateTimeFormatter.ofPattern("yyyy-M-d");
public static final DateTimeFormatter OUT_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
@ -30,7 +29,7 @@ public class LocalDateDecodeHandler implements DecodeHandler {
@Override
public void decode(Map<String, Object> commandPointMap, ModbusMessage.MessagePoint point) {
point.setValue(decode(commandPointMap, point.getParseValue()));
point.setValue(this.decode(commandPointMap, point.getValue()));
}
}

View File

@ -2,7 +2,6 @@ package com.isu.gaswellwatch.modbus.data.decode.impl;
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@ -15,10 +14,10 @@ import java.util.Map;
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/24 21:34
*/
@Slf4j
@SuppressWarnings("all")
@Component("localDateTimeDecodeHandler")
@Component(LocalDateTimeDecodeHandler.NAME + DecodeHandler.DECODE_NAME)
public class LocalDateTimeDecodeHandler implements DecodeHandler {
public static final String NAME = "fullYearLocalDateTime";
public static final DateTimeFormatter IN_FORMATTER = DateTimeFormatter.ofPattern("yyyy-M-d H:m:s");
public static final DateTimeFormatter OUT_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@ -30,7 +29,7 @@ public class LocalDateTimeDecodeHandler implements DecodeHandler {
@Override
public void decode(Map<String, Object> commandPointMap, ModbusMessage.MessagePoint point) {
point.setValue(decode(commandPointMap, point.getParseValue()));
point.setValue(this.decode(commandPointMap, point.getValue()));
}
}

View File

@ -2,7 +2,6 @@ package com.isu.gaswellwatch.modbus.data.decode.impl;
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
@ -15,10 +14,9 @@ import java.util.Map;
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/25 11:58
*/
@Slf4j
@SuppressWarnings("all")
@Component("localTimeDecodeHandler")
@Component(LocalTimeDecodeHandler.NAME + DecodeHandler.DECODE_NAME)
public class LocalTimeDecodeHandler implements DecodeHandler {
public static final String NAME = "localTime";
public static final DateTimeFormatter IN_FORMATTER = DateTimeFormatter.ofPattern("H:m:s");
public static final DateTimeFormatter OUT_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss");
@ -30,7 +28,7 @@ public class LocalTimeDecodeHandler implements DecodeHandler {
@Override
public void decode(Map<String, Object> commandPointMap, ModbusMessage.MessagePoint point) {
point.setValue(decode(commandPointMap, point.getParseValue()));
point.setValue(this.decode(commandPointMap, point.getValue()));
}
}

View File

@ -0,0 +1,34 @@
package com.isu.gaswellwatch.modbus.data.decode.impl;
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
import com.isu.gaswellwatch.utils.ReverseComplementCodeUtils;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 有符号整数转换
*
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/25 16:24
*/
@Component(SignedNumberDecodeHandler.NAME + DecodeHandler.DECODE_NAME)
public class SignedNumberDecodeHandler implements DecodeHandler {
public static final String NAME = "signedNumber";
@Override
public String decode(Map<String, Object> commandPointMap, String value) {
int decimalNumber = Integer.parseInt(value, 16);
String binaryNumber = Integer.toBinaryString(decimalNumber);
binaryNumber = ReverseComplementCodeUtils.binOriginalToReverse(binaryNumber);
binaryNumber = ReverseComplementCodeUtils.binReverseToComplement(binaryNumber, "1");
return ReverseComplementCodeUtils.binComplementToDec(binaryNumber);
}
@Override
public void decode(Map<String, Object> commandPointMap, ModbusMessage.MessagePoint point) {
point.setValue(this.decode(commandPointMap, point.getParseValue()));
}
}

View File

@ -0,0 +1,35 @@
package com.isu.gaswellwatch.modbus.data.decode.impl;
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
/**
* 本地日期时间类型解析器
*
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/24 21:34
*/
@Component(SimpleLocalDateTimeDecodeHandler.NAME + DecodeHandler.DECODE_NAME)
public class SimpleLocalDateTimeDecodeHandler implements DecodeHandler {
public static final String NAME = "simpleLocalDateTime";
public static final DateTimeFormatter IN_FORMATTER = DateTimeFormatter.ofPattern("yy-M-d H:m:s");
public static final DateTimeFormatter OUT_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Override
public String decode(Map<String, Object> commandPointMap, String value) {
return LocalDateTime.parse(value, IN_FORMATTER).format(OUT_FORMATTER);
}
@Override
public void decode(Map<String, Object> commandPointMap, ModbusMessage.MessagePoint point) {
point.setValue(this.decode(commandPointMap, point.getValue()));
}
}

View File

@ -2,7 +2,6 @@ package com.isu.gaswellwatch.modbus.data.decode.impl;
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
@ -17,10 +16,9 @@ import java.util.stream.Collectors;
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/24 21:38
*/
@Slf4j
@SuppressWarnings("all")
@Component("stringTimeDecodeHandler")
@Component(StringTimeDecodeHandler.NAME + DecodeHandler.DECODE_NAME)
public class StringTimeDecodeHandler implements DecodeHandler {
public static final String NAME = "stringTime";
@Override
public String decode(Map<String, Object> commandPointMap, String value) {
@ -34,7 +32,7 @@ public class StringTimeDecodeHandler implements DecodeHandler {
@Override
public void decode(Map<String, Object> commandPointMap, ModbusMessage.MessagePoint point) {
point.setValue(decode(commandPointMap, point.getParseValue()));
point.setValue(this.decode(commandPointMap, point.getValue()));
}
}

View File

@ -0,0 +1,106 @@
package com.isu.gaswellwatch.modbus.data.impl;
import com.isu.gaswellwatch.config.SnowflakeConfig;
import com.isu.gaswellwatch.modbus.data.PersistenceHandler;
import jakarta.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCallback;
import org.springframework.stereotype.Component;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Map;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/24 17:33
*/
@Component(PersistenceHandler.ETC_MODBUS_TYPE)
public class EtcPersistenceHandler implements PersistenceHandler {
@Resource
private JdbcTemplate jdbcTemplate;
@Resource
private SnowflakeConfig snowflakeConfig;
@Resource(name = "stringRedisTemplate")
private RedisTemplate redisTemplate;
@Override
public void createTable(String tableName, Long deviceId) {
String createTableSQL = PersistenceHandler.getResource("sql/CREATE_ETC.sql");
createTableSQL = StringUtils.replace(createTableSQL, "$TableName$", tableName);
createTableSQL = StringUtils.replace(createTableSQL, "$DeviceId$", String.valueOf(deviceId));
this.jdbcTemplate.execute(createTableSQL);
}
@Override
public void insert(String tableName, String cacheKey) {
String insertTableSQL = PersistenceHandler.getResource("sql/INSERT_ETC.sql");
insertTableSQL = StringUtils.replace(insertTableSQL, "$TableName$", tableName);
Map<String, Object> row = this.redisTemplate.opsForHash().entries(cacheKey);
this.jdbcTemplate.execute(insertTableSQL, new PreparedStatementCallback<Object>() {
@Override
public Object doInPreparedStatement(PreparedStatement ps) throws SQLException {
ps.setLong(1, EtcPersistenceHandler.this.snowflakeConfig.snowflakeId());
EtcPersistenceHandler.this.setValue(ps, row, 2, "deviceId", Types.BIGINT);
EtcPersistenceHandler.this.setValue(ps, row, 3, "collectionTime", Types.TIMESTAMP);
EtcPersistenceHandler.this.setValue(ps, row, 4, "receiveTime", Types.TIMESTAMP);
EtcPersistenceHandler.this.setValue(ps, row, 5, "deviceTime", Types.TIMESTAMP);
EtcPersistenceHandler.this.setValue(ps, row, 6, "runMode", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 7, "wellStatus", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 8, "plugStatus", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 9, "statusOpenTime", Types.VARCHAR);
EtcPersistenceHandler.this.setValue(ps, row, 10, "statusCloseTime", Types.VARCHAR);
EtcPersistenceHandler.this.setValue(ps, row, 11, "oilPressure", Types.DECIMAL);
EtcPersistenceHandler.this.setValue(ps, row, 12, "casPressure", Types.DECIMAL);
EtcPersistenceHandler.this.setValue(ps, row, 13, "prePressure", Types.DECIMAL);
EtcPersistenceHandler.this.setValue(ps, row, 14, "pipePressure", Types.DECIMAL);
EtcPersistenceHandler.this.setValue(ps, row, 15, "liquidLevel", Types.DECIMAL);
EtcPersistenceHandler.this.setValue(ps, row, 16, "temperature", Types.DECIMAL);
EtcPersistenceHandler.this.setValue(ps, row, 17, "humidity", Types.DECIMAL);
EtcPersistenceHandler.this.setValue(ps, row, 18, "opmode", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 19, "timer1", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 20, "timer2", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 21, "timer1Open", Types.VARCHAR);
EtcPersistenceHandler.this.setValue(ps, row, 22, "timer1Close", Types.VARCHAR);
EtcPersistenceHandler.this.setValue(ps, row, 23, "timer2Open", Types.VARCHAR);
EtcPersistenceHandler.this.setValue(ps, row, 24, "timer2Close", Types.VARCHAR);
EtcPersistenceHandler.this.setValue(ps, row, 25, "timingOpen", Types.VARCHAR);
EtcPersistenceHandler.this.setValue(ps, row, 26, "timingClose", Types.VARCHAR);
EtcPersistenceHandler.this.setValue(ps, row, 27, "timingCelay", Types.VARCHAR);
EtcPersistenceHandler.this.setValue(ps, row, 28, "presource", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 29, "pressureOpen", Types.DECIMAL);
EtcPersistenceHandler.this.setValue(ps, row, 30, "pressureClose", Types.DECIMAL);
EtcPersistenceHandler.this.setValue(ps, row, 31, "triggerType", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 32, "stabilityTime", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 33, "maxOpenWell", Types.VARCHAR);
EtcPersistenceHandler.this.setValue(ps, row, 34, "maxCloseWell", Types.VARCHAR);
EtcPersistenceHandler.this.setValue(ps, row, 35, "minOpenWell", Types.VARCHAR);
EtcPersistenceHandler.this.setValue(ps, row, 36, "minCloseWell", Types.VARCHAR);
EtcPersistenceHandler.this.setValue(ps, row, 37, "presproTect", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 38, "presproSource", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 39, "openWellLimitMax", Types.DECIMAL);
EtcPersistenceHandler.this.setValue(ps, row, 40, "openWellLimitMin", Types.DECIMAL);
EtcPersistenceHandler.this.setValue(ps, row, 41, "plugInitStatus", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 42, "plugSustainTime", Types.VARCHAR);
EtcPersistenceHandler.this.setValue(ps, row, 43, "plugCloseTime", Types.VARCHAR);
EtcPersistenceHandler.this.setValue(ps, row, 44, "tpInitStatus", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 45, "tpOpenSource", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 46, "tpOpenTrigger", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 47, "tpOpenPressure", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 48, "tpOpenTime", Types.VARCHAR);
EtcPersistenceHandler.this.setValue(ps, row, 49, "tpCloseSource", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 50, "tpCloseTrigger", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 51, "tpClosePressure", Types.INTEGER);
EtcPersistenceHandler.this.setValue(ps, row, 52, "tpCloseTime", Types.VARCHAR);
EtcPersistenceHandler.this.setValue(ps, row, 53, "tpStabilityTime", Types.INTEGER);
return ps.executeUpdate();
}
});
}
}

View File

@ -1,32 +1,24 @@
package com.isu.gaswellwatch.modbus.data.impl;
import cn.hutool.core.map.MapUtil;
import com.isu.gaswellwatch.config.SnowflakeConfig;
import com.isu.gaswellwatch.modbus.data.PersistenceHandler;
import jakarta.annotation.Resource;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCallback;
import org.springframework.stereotype.Component;
import org.springframework.util.ResourceUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Map;
import java.util.Objects;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/24 17:33
*/
@Component(PersistenceHandler.MODBUS_DEVICE_TYPE)
@Component(PersistenceHandler.KNPCV1_MODBUS_TYPE)
public class Knpcv1PersistenceHandler implements PersistenceHandler {
@Resource
@ -38,7 +30,7 @@ public class Knpcv1PersistenceHandler implements PersistenceHandler {
@Override
public void createTable(String tableName, Long deviceId) {
String createTableSQL = getResource("sql/CREATE_KNPCV1.sql");
String createTableSQL = PersistenceHandler.getResource("sql/CREATE_KNPCV1.sql");
createTableSQL = StringUtils.replace(createTableSQL, "$TableName$", tableName);
createTableSQL = StringUtils.replace(createTableSQL, "$DeviceId$", String.valueOf(deviceId));
this.jdbcTemplate.execute(createTableSQL);
@ -46,7 +38,7 @@ public class Knpcv1PersistenceHandler implements PersistenceHandler {
@Override
public void insert(String tableName, String cacheKey) {
String insertTableSQL = getResource("sql/INSERT_KNPCV1.sql");
String insertTableSQL = PersistenceHandler.getResource("sql/INSERT_KNPCV1.sql");
insertTableSQL = StringUtils.replace(insertTableSQL, "$TableName$", tableName);
Map<String, Object> row = this.redisTemplate.opsForHash().entries(cacheKey);
@ -62,8 +54,8 @@ public class Knpcv1PersistenceHandler implements PersistenceHandler {
Knpcv1PersistenceHandler.this.setValue(ps, row, 6, "runMode", Types.INTEGER);
Knpcv1PersistenceHandler.this.setValue(ps, row, 7, "wellStatus", Types.INTEGER);
Knpcv1PersistenceHandler.this.setValue(ps, row, 8, "plugStatus", Types.INTEGER);
Knpcv1PersistenceHandler.this.setValue(ps, row, 9, "statusOpenTime", Types.TIME);
Knpcv1PersistenceHandler.this.setValue(ps, row, 10, "statusCloseTime", Types.TIME);
Knpcv1PersistenceHandler.this.setValue(ps, row, 9, "statusOpenTime", Types.VARCHAR);
Knpcv1PersistenceHandler.this.setValue(ps, row, 10, "statusCloseTime", Types.VARCHAR);
Knpcv1PersistenceHandler.this.setValue(ps, row, 11, "oilPressure", Types.DECIMAL);
Knpcv1PersistenceHandler.this.setValue(ps, row, 12, "casPressure", Types.DECIMAL);
Knpcv1PersistenceHandler.this.setValue(ps, row, 13, "prePressure", Types.DECIMAL);
@ -74,10 +66,10 @@ public class Knpcv1PersistenceHandler implements PersistenceHandler {
Knpcv1PersistenceHandler.this.setValue(ps, row, 18, "opmode", Types.INTEGER);
Knpcv1PersistenceHandler.this.setValue(ps, row, 19, "timer1", Types.INTEGER);
Knpcv1PersistenceHandler.this.setValue(ps, row, 20, "timer2", Types.INTEGER);
Knpcv1PersistenceHandler.this.setValue(ps, row, 21, "timer1Open", Types.TIME);
Knpcv1PersistenceHandler.this.setValue(ps, row, 22, "timer1Close", Types.TIME);
Knpcv1PersistenceHandler.this.setValue(ps, row, 23, "timer2Open", Types.TIME);
Knpcv1PersistenceHandler.this.setValue(ps, row, 24, "timer2Close", Types.TIME);
Knpcv1PersistenceHandler.this.setValue(ps, row, 21, "timer1Open", Types.VARCHAR);
Knpcv1PersistenceHandler.this.setValue(ps, row, 22, "timer1Close", Types.VARCHAR);
Knpcv1PersistenceHandler.this.setValue(ps, row, 23, "timer2Open", Types.VARCHAR);
Knpcv1PersistenceHandler.this.setValue(ps, row, 24, "timer2Close", Types.VARCHAR);
Knpcv1PersistenceHandler.this.setValue(ps, row, 25, "timingOpen", Types.VARCHAR);
Knpcv1PersistenceHandler.this.setValue(ps, row, 26, "timingClose", Types.VARCHAR);
Knpcv1PersistenceHandler.this.setValue(ps, row, 27, "timingCelay", Types.VARCHAR);
@ -110,25 +102,5 @@ public class Knpcv1PersistenceHandler implements PersistenceHandler {
return ps.executeUpdate();
}
});
}
public void setValue(PreparedStatement ps, Map<String, Object> row, int index, String key, int sqlType) throws SQLException {
String value = MapUtil.getStr(row, key);
if (Objects.isNull(value)) {
ps.setNull(index, sqlType);
} else {
ps.setObject(index, value);
}
}
public static String getResource(String classPath) {
try {
File file = ResourceUtils.getFile("classpath:" + classPath);
return IOUtils.toString(new FileInputStream(file), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,138 @@
package com.isu.gaswellwatch.modbus.data.impl;
import com.isu.gaswellwatch.config.SnowflakeConfig;
import com.isu.gaswellwatch.modbus.data.PersistenceHandler;
import jakarta.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCallback;
import org.springframework.stereotype.Component;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Map;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/24 17:33
*/
@Component(PersistenceHandler.SCSS_MODBUS_TYPE)
public class ScssPersistenceHandler implements PersistenceHandler {
@Resource
private JdbcTemplate jdbcTemplate;
@Resource
private SnowflakeConfig snowflakeConfig;
@Resource(name = "stringRedisTemplate")
private RedisTemplate redisTemplate;
@Override
public void createTable(String tableName, Long deviceId) {
String createTableSQL = PersistenceHandler.getResource("sql/CREATE_SCSS.sql");
createTableSQL = StringUtils.replace(createTableSQL, "$TableName$", tableName);
createTableSQL = StringUtils.replace(createTableSQL, "$DeviceId$", String.valueOf(deviceId));
this.jdbcTemplate.execute(createTableSQL);
}
@Override
public void insert(String tableName, String cacheKey) {
String insertTableSQL = PersistenceHandler.getResource("sql/INSERT_SCSS.sql");
insertTableSQL = StringUtils.replace(insertTableSQL, "$TableName$", tableName);
Map<String, Object> row = this.redisTemplate.opsForHash().entries(cacheKey);
this.jdbcTemplate.execute(insertTableSQL, new PreparedStatementCallback<Object>() {
@Override
public Object doInPreparedStatement(PreparedStatement ps) throws SQLException {
ps.setLong(1, ScssPersistenceHandler.this.snowflakeConfig.snowflakeId());
ScssPersistenceHandler.this.setValue(ps, row, 2, "deviceId", Types.BIGINT);
ScssPersistenceHandler.this.setValue(ps, row, 3, "collectionTime", Types.TIMESTAMP);
ScssPersistenceHandler.this.setValue(ps, row, 4, "receiveTime", Types.TIMESTAMP);
ScssPersistenceHandler.this.setValue(ps, row, 5, "casPressure", Types.DECIMAL);
ScssPersistenceHandler.this.setValue(ps, row, 6, "oilPressure", Types.DECIMAL);
ScssPersistenceHandler.this.setValue(ps, row, 7, "firstSolenoidStatus", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 8, "batteryVoltage", Types.DECIMAL);
ScssPersistenceHandler.this.setValue(ps, row, 9, "solarVoltage", Types.DECIMAL);
ScssPersistenceHandler.this.setValue(ps, row, 10, "remainingTimeAction", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 11, "secondSolenoidStatus", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 12, "preTransmission", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 13, "internetTraffic", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 14, "loadFactor", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 15, "dataTime", Types.TIMESTAMP);
ScssPersistenceHandler.this.setValue(ps, row, 16, "showDelay", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 17, "openWellSamplingInterval", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 18, "closeWellSamplingInterval", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 19, "ctlModel", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 20, "minPressure", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 21, "maxPressure", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 22, "pressureMinVoltage", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 23, "pressureMaxVoltage", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 24, "oilMin", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 25, "oliMax", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 26, "oilMinVoltage", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 27, "oilMaxVoltage", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 28, "inputPressureMinValue", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 29, "inputPressureMaxValue", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 30, "inputVoltageMinValue", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 31, "inputVoltageMaxValue", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 32, "flowRateMinValue", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 33, "flowRateMaxValue", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 34, "flowVoltageMinValue", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 35, "flowVoltageMaxValue", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 36, "continuousSamplingIntervalDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 37, "sensorSignalEffectiveLevel", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 38, "pressureCompensationPolarityFlag", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 39, "pressureCompensationValueSetting", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 40, "oilPressureCompensationPolarityFlag", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 41, "oilPressureCompensationValueSetting", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 42, "inputPressureCompensationPolarityFlag", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 43, "inputPressureCompensationValueSetting", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 44, "flowCompensationPolarityFlag", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 45, "flowCompensationValueSetting", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 46, "wellOpenTimeTimestamp", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 47, "wellOpenPressureValue", Types.DECIMAL);
ScssPersistenceHandler.this.setValue(ps, row, 48, "wellOpenOilPressureValue", Types.DECIMAL);
ScssPersistenceHandler.this.setValue(ps, row, 49, "wellOpenLoadFactorPresets", Types.DECIMAL);
ScssPersistenceHandler.this.setValue(ps, row, 50, "wellCloseTimeTimestamp", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 51, "wellClosePressureValue", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 52, "wellCloseOilPressureValue", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 53, "wellCloseFlowValue", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 54, "minWellOpenTimeDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 55, "maxWellOpenTimeDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 56, "minWellCloseTimeDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 57, "maxWellCloseTimeDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 58, "pressureStabilizationDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 59, "flowStabilizationDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 60, "loadFactorStabilizationDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 61, "plungerDelayDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 62, "plungerRiseDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 63, "continuosFlowDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 64, "wellCloseTimeDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 65, "wellCloseTimeNotReachedDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 66, "wellCloseNotReachedCountValue", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 67, "plungerDelayDurationRepeat", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 68, "targetTimeTimestamp", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 69, "targetTimeRangeValue", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 70, "continuosFlowIncreaseDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 71, "continuosFlowDecreaseDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 72, "wellCloseIncreaseDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 73, "wellCloseDecreaseDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 74, "minWellCloseTimeDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 75, "maxWellCloseTimeDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 76, "minContinuosFlowTimeDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 77, "maxContinuosFlowTimeDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 78, "minWellOpenTimeDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 79, "maxWellOpenTimeDuration", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 80, "wellOpenPressureValueAtOpen", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 81, "wellOpenOilPressureValueAtOpen", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 82, "wellOpenLoadFactorPresetsAtOpen", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 83, "wellClosePressureValueAtClose", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 84, "wellCloseOilPressureValueAtClose", Types.INTEGER);
ScssPersistenceHandler.this.setValue(ps, row, 85, "wellCloseFlowValueAtClose", Types.INTEGER);
return ps.executeUpdate();
}
});
}
}

View File

@ -1,4 +1,4 @@
package com.isu.gaswellwatch.modbus.data.decode.listener;
package com.isu.gaswellwatch.modbus.data.listener;
import com.isu.gaswellwatch.utils.SpringUtil;
import org.springframework.amqp.core.BatchMessageListener;

View File

@ -1,10 +1,11 @@
package com.isu.gaswellwatch.modbus.data.decode.listener;
package com.isu.gaswellwatch.modbus.data.listener;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/23 0:32
*/
import cn.hutool.core.map.MapUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@ -12,17 +13,22 @@ import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
@Component
public class DynamicRabbitListener implements ApplicationRunner {
private static final String GATEWAY_SQL = "SELECT d.id, d.gateway_sn AS identifier FROM device d ORDER BY d.id ";
private final AmqpAdmin amqpAdmin;
private final JdbcTemplate jdbcTemplate;
private final ComposeModbusMessageListener composeListener;
private final SimpleMessageListenerContainer listenerContainer;
public DynamicRabbitListener(ConnectionFactory connectionFactory) {
public DynamicRabbitListener(ConnectionFactory connectionFactory, JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
this.amqpAdmin = new RabbitAdmin(connectionFactory);
this.listenerContainer = new SimpleMessageListenerContainer();
this.listenerContainer.setConnectionFactory(connectionFactory);
@ -35,17 +41,14 @@ public class DynamicRabbitListener implements ApplicationRunner {
public void run(ApplicationArguments args) throws Exception {
this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener());
this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener());
// TODO 根据设备自动绑定队列
this.addListenerQueue("/modbus/device/4B454E454E4731343030303030333538/collect");
this.jdbcTemplate.queryForList(GATEWAY_SQL).forEach(item -> {
this.addListenerQueue(String.format("/modbus/device/%s/collect", MapUtil.getStr(item, "identifier")), null, null);
});
}
// @RabbitListener(queues = "/modbus/device/4B454E454E4731343030303030333538/collect")
// public void modbusMessage(Message message) {
// System.out.println("modbusMessage:" + new String(message.getBody()));
// }
public void addListenerQueue(String queueName) {
this.addListenerQueue(queueName, null, null);
public void addListenerQueue(String gatewaySn) {
this.addListenerQueue(String.format("/modbus/device/%s/collect", gatewaySn), null, null);
}
public void addListenerQueue(String queueName, String exchangeName, String routingKey) {

View File

@ -1,4 +1,4 @@
package com.isu.gaswellwatch.modbus.data.decode.listener;
package com.isu.gaswellwatch.modbus.data.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.IOUtils;

View File

@ -1,10 +1,12 @@
package com.isu.gaswellwatch.modbus.data.decode.listener;
package com.isu.gaswellwatch.modbus.data.listener;
import cn.hutool.core.map.MapUtil;
import cn.hutool.json.JSONUtil;
import com.isu.gaswellwatch.modbus.data.ModbusMessage;
import com.isu.gaswellwatch.modbus.data.PersistenceHandler;
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
import com.isu.gaswellwatch.modbus.data.decode.impl.ComposeDecodeHandler;
import com.isu.gaswellwatch.modbus.data.decode.impl.HighLowBinDecodeHandler;
import com.isu.gaswellwatch.modbus.data.decode.impl.LocalDateTimeDecodeHandler;
import com.serotonin.modbus4j.code.FunctionCode;
import com.serotonin.modbus4j.msg.*;
@ -27,10 +29,7 @@ import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
@ -49,6 +48,9 @@ public class ModbusMessagePersistListener implements BatchMessageListener {
private JdbcTemplate jdbcTemplate;
@Resource(name = "stringRedisTemplate")
private RedisTemplate redisTemplate;
@Resource(name = HighLowBinDecodeHandler.NAME + DecodeHandler.DECODE_NAME)
private DecodeHandler highLowBinDecodeHandler;
@Resource
private Map<String, DecodeHandler> decodeHandlerMap;
@ -85,12 +87,11 @@ public class ModbusMessagePersistListener implements BatchMessageListener {
log.error("指令[{}]不存在,数据: {}", commandId, messageString);
continue;
}
this.redisTemplate.opsForValue()
.setIfAbsent(COMMAND_CACHE + commandId, JSONUtil.toJsonStr(commandMap), Duration.ofDays(1));
this.redisTemplate.opsForValue().setIfAbsent(COMMAND_CACHE + commandId, JSONUtil.toJsonStr(commandMap), Duration.ofDays(1));
}
try {
String address;
int index = MapUtil.getInt(commandMap, "start_address"), stepSize = 0;
int startAddress = MapUtil.getInt(commandMap, "start_address"), index = 0, stepSize = 0;
ByteQueue byteQueue = new ByteQueue(collectionMessage);
RtuMessageParser masterParser = new RtuMessageParser(true);
RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue);
@ -110,21 +111,22 @@ public class ModbusMessagePersistListener implements BatchMessageListener {
case FunctionCode.READ_HOLDING_REGISTERS -> {
values = ((ReadHoldingRegistersResponse) modbusResponse).getShortData();
}
// case FunctionCode.READ_EXCEPTION_STATUS -> {
// values = new short[]{((ReadExceptionStatusResponse) modbusResponse).getExceptionStatus()};
// }
case FunctionCode.READ_EXCEPTION_STATUS -> {
values = new short[]{((ReadExceptionStatusResponse) modbusResponse).getExceptionStatus()};
}
default -> {
throw new RuntimeException("Funcetion code not supported: " + modbusResponse.getFunctionCode());
}
}
for (short value : values) {
stepSize = index * 4;
messagePointMap.put(StringUtils.leftPad(String.valueOf(index), 4, '0'),
messagePointMap.put(StringUtils.leftPad(String.valueOf(startAddress), 4, '0'),
ModbusMessage.MessagePoint.builder()
.parseValue(String.valueOf(value))
.originalValue(StringUtils.substring(collectionMessage, 6 + stepSize, 10 + stepSize))
.build());
index++;
startAddress++;
}
} catch (Exception e) {
log.error("初始数据解析异常: {}", messageString, e);
@ -168,11 +170,11 @@ public class ModbusMessagePersistListener implements BatchMessageListener {
String deviceDataCacheName = PersistenceHandler.DEVICE_DATA_CACHE + modbusMessage.getDeviceId();
HashOperations<String, Object, Object> hashOperations = this.redisTemplate.opsForHash();
hashOperations.put(deviceDataCacheName, "deviceId", String.valueOf(modbusMessage.getDeviceId()));
hashOperations.put(deviceDataCacheName, "receiveTime", LocalDateTime.ofInstant(Instant
.ofEpochMilli(modbusMessage.getReceiveTime()), ZoneOffset.systemDefault())
hashOperations.put(deviceDataCacheName, "receiveTime", LocalDateTime
.ofInstant(Instant.ofEpochMilli(modbusMessage.getReceiveTime()), ZoneOffset.systemDefault())
.format(LocalDateTimeDecodeHandler.OUT_FORMATTER));
hashOperations.put(deviceDataCacheName, "collectionTime", LocalDateTime.ofInstant(Instant
.ofEpochMilli(modbusMessage.getCollectionTime()), ZoneOffset.systemDefault())
hashOperations.put(deviceDataCacheName, "collectionTime", LocalDateTime
.ofInstant(Instant.ofEpochMilli(modbusMessage.getCollectionTime()), ZoneOffset.systemDefault())
.format(LocalDateTimeDecodeHandler.OUT_FORMATTER));
for (Map<String, Object> point : commandPointList) {
fieldName = MapUtil.getStr(point, "field");
@ -180,13 +182,17 @@ public class ModbusMessagePersistListener implements BatchMessageListener {
decodeName = MapUtil.getStr(point, "decode_name");
startAddress = MapUtil.getInt(point, "start_address");
String value;
if (stepSize <= 1) {
if (StringUtils.equals(decodeName, HighLowBinDecodeHandler.NAME)) {
value = decodeHighLowCommandPoint(modbusMessage.getMessagePointMap(), decodeName, point, startAddress);
} else if (StringUtils.contains(decodeName, ",") && StringUtils.startsWith(decodeName, HighLowBinDecodeHandler.NAME)) {
value = decodeHighLowStepCommandPoint(modbusMessage.getMessagePointMap(), decodeName, point, startAddress, stepSize / 2);
} else if (stepSize <= 1) {
messagePoint = modbusMessage.getMessagePointMap()
.get(StringUtils.leftPad(String.valueOf(startAddress), 4, '0'));
decodeMessage(decodeName, point, messagePoint);
value = messagePoint.getValue();
} else {
value = decodeCommandPoint(modbusMessage.getMessagePointMap(), decodeName, point, startAddress, stepSize);
value = decodeStepCommandPoint(modbusMessage.getMessagePointMap(), decodeName, point, startAddress, stepSize);
}
hashOperations.put(deviceDataCacheName, fieldName, value);
}
@ -194,14 +200,44 @@ public class ModbusMessagePersistListener implements BatchMessageListener {
private void decodeMessage(String decodeName, Map<String, Object> commandPointMap, ModbusMessage.MessagePoint messagePoint) {
if (StringUtils.isNotBlank(decodeName)) {
DecodeHandler decodeHandler = this.decodeHandlerMap.get(decodeName + "DecodeHandler");
DecodeHandler decodeHandler;
if (StringUtils.contains(decodeName, ",")) {
decodeHandler = new ComposeDecodeHandler(Arrays.stream(StringUtils.split(decodeName, ","))
.map(name -> this.decodeHandlerMap.get(name + DecodeHandler.DECODE_NAME)).filter(Objects::nonNull).toList());
} else {
decodeHandler = this.decodeHandlerMap.get(decodeName + DecodeHandler.DECODE_NAME);
}
if (Objects.nonNull(decodeHandler)) {
decodeHandler.decode(commandPointMap, messagePoint);
}
}
}
private String decodeCommandPoint(Map<String, ModbusMessage.MessagePoint> pointMap, String decodeName,
private String decodeMessage(String decodeName, Map<String, Object> commandPointMap, String value) {
if (StringUtils.isNotBlank(decodeName)) {
DecodeHandler decodeHandler;
if (StringUtils.contains(decodeName, ",")) {
List<DecodeHandler> handlers = Arrays.stream(StringUtils.split(decodeName, ","))
.filter(name -> !StringUtils.equals(name, HighLowBinDecodeHandler.NAME))
.map(name -> this.decodeHandlerMap.get(name + DecodeHandler.DECODE_NAME))
.filter(Objects::nonNull)
.toList();
if (handlers.size() == 1) {
decodeHandler = handlers.get(0);
} else {
decodeHandler = new ComposeDecodeHandler(handlers);
}
} else {
decodeHandler = this.decodeHandlerMap.get(decodeName + DecodeHandler.DECODE_NAME);
}
if (Objects.nonNull(decodeHandler)) {
return decodeHandler.decode(commandPointMap, value);
}
}
return value;
}
private String decodeStepCommandPoint(Map<String, ModbusMessage.MessagePoint> pointMap, String decodeName,
Map<String, Object> commandPointMap, int startAddress, int stepSize) {
String[] values = new String[stepSize];
ModbusMessage.MessagePoint messagePoint;
@ -210,14 +246,36 @@ public class ModbusMessagePersistListener implements BatchMessageListener {
values[i] = messagePoint.getValue();
}
String format = MapUtil.getStr(commandPointMap, "format");
String result = StringUtils.isBlank(format) ? StringUtils.join(values) : String.format(format, values);
if (StringUtils.isNotBlank(decodeName)) {
DecodeHandler decodeHandler = this.decodeHandlerMap.get(decodeName + "DecodeHandler");
if (Objects.nonNull(decodeHandler)) {
return decodeHandler.decode(commandPointMap, result);
}
}
return result;
return decodeMessage(decodeName, commandPointMap,
StringUtils.isBlank(format) ? StringUtils.join(values) : String.format(format, values));
}
private String decodeHighLowCommandPoint(Map<String, ModbusMessage.MessagePoint> pointMap, String decodeName,
Map<String, Object> commandPointMap, int startAddress) {
String[] values = new String[2];
ModbusMessage.MessagePoint messagePoint;
for (int i = 0; i < 2; i++) {
messagePoint = pointMap.get(StringUtils.leftPad(String.valueOf(startAddress + i), 4, '0'));
values[i] = messagePoint.getOriginalValue();
}
return highLowBinDecodeHandler.decode(commandPointMap, StringUtils.join(values));
}
private String decodeHighLowStepCommandPoint(Map<String, ModbusMessage.MessagePoint> pointMap, String decodeName,
Map<String, Object> commandPointMap, int startAddress, int stepSize) {
String[] values = new String[stepSize];
ModbusMessage.MessagePoint highPoint;
ModbusMessage.MessagePoint lowPoint;
for (int i = 0; i < stepSize; i++) {
highPoint = pointMap.get(StringUtils.leftPad(String.valueOf(startAddress + i * 2), 4, '0'));
lowPoint = pointMap.get(StringUtils.leftPad(String.valueOf(startAddress + i * 2 + 1), 4, '0'));
values[i] = highLowBinDecodeHandler.decode(commandPointMap, highPoint.getOriginalValue() + lowPoint.getOriginalValue());
}
String format = MapUtil.getStr(commandPointMap, "format");
return decodeMessage(decodeName, commandPointMap,
StringUtils.isBlank(format) ? StringUtils.join(values) : String.format(format, values));
}
}

View File

@ -0,0 +1,102 @@
package com.isu.gaswellwatch.utils;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/25 16:07
*/
public class ReverseComplementCodeUtils {
/**
* 原码转反码
*
* @param source 二进制原码
* @return
*/
public static String binOriginalToReverse(String source) {
if (source.startsWith("0")) {
return source;
} else {
StringBuffer sbf = new StringBuffer();
sbf.append("1");
String f_str = source.substring(1);
for (int i = 0; i < f_str.length(); i++) {
String s_str = String.valueOf(f_str.charAt(i));
if (s_str.equals("0")) {
sbf.append("1");
} else if (s_str.equals("1")) {
sbf.append("0");
}
}
return sbf.toString();
}
}
/**
* 反码转补码, 默认补1
*
* @param reverse 二进制反码字符串
* @return
*/
public static String binReverseToComplement(String reverse) {
return binReverseToComplement(reverse, "1");
}
/**
* 反码转补码
*
* @param reverse 二进制反码字符串
* @param complement 二进制补数字符串
* @return
*/
public static String binReverseToComplement(String reverse, String complement) {
if (reverse.startsWith("0")) {
return reverse;
}
StringBuilder sb = new StringBuilder();
int x = 0;
int y = 0;
int pre = 0;//进位
int sum = 0;//存储进位和另两个位的和
while (reverse.length() != complement.length()) {//将两个二进制的数位数补齐,在短的前面添0
if (reverse.length() > complement.length()) {
complement = "0" + complement;
} else {
reverse = "0" + reverse;
}
}
for (int i = reverse.length() - 1; i >= 0; i--) {
x = reverse.charAt(i) - '0';
y = complement.charAt(i) - '0';
sum = x + y + pre;//从低位做加法
if (sum >= 2) {
pre = 1;//进位
sb.append(sum - 2);
} else {
pre = 0;
sb.append(sum);
}
}
if (pre == 1) {
sb.append("1");
}
// 翻转返回
return sb.reverse().toString();
}
/**
* 补码转十进制
*
* @param
* @return
*/
public static String binComplementToDec(String complement) {
String h_number = complement.substring(1);
if (complement.startsWith("0")) {
return "" + Long.valueOf(h_number, 2);
} else if (complement.startsWith("1")) {
return "-" + Long.valueOf(h_number, 2);
}
return null;
}
}

View File

@ -9,8 +9,8 @@ CREATE TABLE `$TableName$`
`run_mode` smallint NULL DEFAULT NULL COMMENT '运行模式:\r\n0手动模式 \r\n1定时器模式 \r\n2计时器模式 \r\n3压力模式\r\n4柱塞模式\r\n5时压模式',
`gas_status` smallint NULL DEFAULT NULL COMMENT '气井状态:\r\n0关闭 \r\n1打开',
`plug_status` smallint NULL DEFAULT NULL COMMENT '柱塞状态:\r\n0离开\r\n1上升中\r\n2到达',
`status_start_time` time NULL DEFAULT NULL COMMENT '当前状态开始时间',
`status_end_time` time NULL DEFAULT NULL COMMENT '当前状态结束时间',
`status_start_time` varchar(10) NULL DEFAULT NULL COMMENT '当前状态开始时间',
`status_end_time` varchar(10) NULL DEFAULT NULL COMMENT '当前状态结束时间',
`oil_pressure` decimal(10, 2) NULL DEFAULT NULL COMMENT '油压单位Mpa',
`cas_pressure` decimal(10, 2) NULL DEFAULT NULL COMMENT '套压单位Mpa',
`pre_pressure` decimal(10, 2) NULL DEFAULT NULL COMMENT '输压单位Mpa',
@ -21,10 +21,10 @@ CREATE TABLE `$TableName$`
`op_mode` smallint NULL DEFAULT NULL COMMENT '运行模式:\r\n0 手动模式 hand_mode\r\n1定时器模式 timer_mode\r\n2计时器模式 t2mode\r\n3压力模式 pressure_mode\r\n4柱塞模式 piston_mode\r\n5时压模式 tp_mode',
`timer_able1` smallint NULL DEFAULT NULL COMMENT '定时模式定时器1使能\r\n0禁止 disable\r\n1使能 enable',
`timer_able2` smallint NULL DEFAULT NULL COMMENT '定时模式定时器2使能\r\n0禁止 disable\r\n1使能 ',
`timer_open1` time NULL DEFAULT NULL COMMENT '时0~23分0~59秒0~59',
`timer_close1` time NULL DEFAULT NULL COMMENT '时0~23分0~59秒0~59',
`timer_open2` time NULL DEFAULT NULL COMMENT '时0~23分0~59秒0~59',
`timer_close2` time NULL DEFAULT NULL COMMENT '时0~23分0~59秒0~59',
`timer_open1` varchar(10) NULL DEFAULT NULL COMMENT '时0~23分0~59秒0~59',
`timer_close1` varchar(10) NULL DEFAULT NULL COMMENT '时0~23分0~59秒0~59',
`timer_open2` varchar(10) NULL DEFAULT NULL COMMENT '时0~23分0~59秒0~59',
`timer_close2` varchar(10) NULL DEFAULT NULL COMMENT '时0~23分0~59秒0~59',
`timing_open` varchar(10) NULL DEFAULT NULL COMMENT '时0~23分0~59秒0~59',
`timing_close` varchar(10) NULL DEFAULT NULL COMMENT '时0~23分0~59秒0~59',
`timing_delay` varchar(10) NULL DEFAULT NULL COMMENT '时0~23分0~59秒0~59',

View File

@ -0,0 +1,91 @@
CREATE TABLE `$TableName$`
(
`id` bigint NOT NULL COMMENT '主键',
`device_id` int NOT NULL COMMENT '设备标识',
`created_time` datetime NOT NULL COMMENT '数据落库时间',
`collection_time` datetime NOT NULL COMMENT '采集指令下发时间',
`receive_time` datetime NOT NULL COMMENT '接收到数据时间',
`cas_pressure` decimal(10, 2) NULL DEFAULT NULL COMMENT '套压',
`oil_pressure` decimal(10, 2) NULL DEFAULT NULL COMMENT '油压',
`first_solenoid_status` int NULL DEFAULT NULL COMMENT '当前第一个电磁阀状态',
`battery_voltage` decimal(10, 1) NULL DEFAULT NULL COMMENT '电池电压',
`solar_voltage` decimal(10, 1) NULL DEFAULT NULL COMMENT '太阳能电压',
`remaining_time_action` int NULL DEFAULT NULL COMMENT '动作剩余时间',
`second_solenoid_status` int NULL DEFAULT NULL COMMENT '第二个电磁阀状态',
`pre_transmission` int NULL DEFAULT NULL COMMENT '输压',
`internet_traffic` int NULL DEFAULT NULL COMMENT '流量',
`load_factor` int NULL DEFAULT NULL COMMENT '载荷因子',
`data_time` datetime NULL DEFAULT NULL COMMENT '日期时间',
`show_delay` int NULL DEFAULT NULL COMMENT '显示延时',
`open_well_sampling_interval` int NULL DEFAULT NULL COMMENT '开井采样间隔',
`close_well_sampling_interval` int NULL DEFAULT NULL COMMENT '关井采样间隔',
`ctl_model` int NULL DEFAULT NULL COMMENT '控制模式',
`min_pressure` int NULL DEFAULT NULL COMMENT '套压最小值',
`max_pressure` int NULL DEFAULT NULL COMMENT '套压最大值',
`pressure_min_voltage` int NULL DEFAULT NULL COMMENT '套压最小电压',
`pressure_max_voltage` int NULL DEFAULT NULL COMMENT '套压最大电压',
`oil_min` int NULL DEFAULT NULL COMMENT '油压最小值',
`oil_max` int NULL DEFAULT NULL COMMENT '油压最大值',
`oil_min_voltage` int NULL DEFAULT NULL COMMENT '油压最小电压',
`oil_max_voltage` int NULL DEFAULT NULL COMMENT '油压最大电压',
`input_pressure_min_value` int NULL DEFAULT NULL COMMENT '输压最小值',
`input_pressure_max_value` int NULL DEFAULT NULL COMMENT '输压最大值',
`input_voltage_min_value` int NULL DEFAULT NULL COMMENT '输压最小电压',
`input_voltage_max_value` int NULL DEFAULT NULL COMMENT '输压最大电压',
`flow_rate_min_value` int NULL DEFAULT NULL COMMENT '流量最小值',
`flow_rate_max_value` int NULL DEFAULT NULL COMMENT '流量最大值',
`flow_voltage_min_value` int NULL DEFAULT NULL COMMENT '流量最小电压',
`flow_voltage_max_value` int NULL DEFAULT NULL COMMENT '流量最大电压',
`continuous_sampling_interval_duration` int NULL DEFAULT NULL COMMENT '连续采样间隔',
`sensor_signal_effective_level` int NULL DEFAULT NULL COMMENT '到达传感器有效电平',
`pressure_compensation_polarity_flag` int NULL DEFAULT NULL COMMENT '套压补偿极性',
`pressure_compensation_value_setting` int NULL DEFAULT NULL COMMENT '套压补偿值',
`oil_pressure_compensation_polarity_flag` int NULL DEFAULT NULL COMMENT '油压补偿极性',
`oil_pressure_compensation_value_setting` int NULL DEFAULT NULL COMMENT '油压补偿值',
`input_pressure_compensation_polarity_flag` int NULL DEFAULT NULL COMMENT '输压补偿极性',
`input_pressure_compensation_value_setting` int NULL DEFAULT NULL COMMENT '输压补偿值',
`flow_compensation_polarity_flag` int NULL DEFAULT NULL COMMENT '流量补偿极性',
`flow_compensation_value_setting` int NULL DEFAULT NULL COMMENT '流量补偿值',
`well_open_time_timestamp` int NULL DEFAULT NULL COMMENT '开井时间',
`well_open_pressure_value` decimal(10, 2) NULL DEFAULT NULL COMMENT '开井套压',
`well_open_oil_pressure_value` decimal(10, 2) NULL DEFAULT NULL COMMENT '开井油压',
`well_open_load_factor_presets` decimal(10, 2) NULL DEFAULT NULL COMMENT '开井载荷因子预设值',
`well_close_time_timestamp` int NULL DEFAULT NULL COMMENT '关井时间时间戳',
`well_close_pressure_value` int NULL DEFAULT NULL COMMENT '关井压力值',
`well_close_oil_pressure_value` int NULL DEFAULT NULL COMMENT '关井油压值',
`well_close_flow_value` int NULL DEFAULT NULL COMMENT '关井流量值',
`min_well_open_time_duration` int NULL DEFAULT NULL COMMENT '最小开井时间持续时长',
`max_well_open_time_duration` int NULL DEFAULT NULL COMMENT '最大开井时间持续时长',
`min_well_close_time_duration` int NULL DEFAULT NULL COMMENT '最小关井时间持续时长',
`max_well_close_time_duration` int NULL DEFAULT NULL COMMENT '最大关井时间持续时长',
`pressure_stabilization_duration` int NULL DEFAULT NULL COMMENT '压力稳定持续时长',
`flow_stabilization_duration` int NULL DEFAULT NULL COMMENT '流量稳定持续时长',
`load_factor_stabilization_duration` int NULL DEFAULT NULL COMMENT '载荷因子稳定持续时长',
`plunger_delay_duration` int NULL DEFAULT NULL COMMENT '柱塞延迟时长',
`plunger_rise_duration` int NULL DEFAULT NULL COMMENT '柱塞上升时长',
`continuos_flow_duration` int NULL DEFAULT NULL COMMENT '连续流量持续时长',
`well_close_time_duration` int NULL DEFAULT NULL COMMENT '关井时间持续时长',
`well_close_time_not_reached_duration` int NULL DEFAULT NULL COMMENT '关井时间未达到持续时长',
`well_close_not_reached_count_value` int NULL DEFAULT NULL COMMENT '关井未达到次数值',
`plunger_delay_duration_repeat` int NULL DEFAULT NULL COMMENT '柱塞延迟时长重复次数',
`target_time_timestamp` int NULL DEFAULT NULL COMMENT '目标时间时间戳',
`target_time_range_value` int NULL DEFAULT NULL COMMENT '目标时间范围值',
`continuos_flow_increase_duration` int NULL DEFAULT NULL COMMENT '连续流量增加持续时长',
`continuos_flow_decrease_duration` int NULL DEFAULT NULL COMMENT '连续流量减少持续时长',
`well_close_increase_duration` int NULL DEFAULT NULL COMMENT '关井增加持续时长',
`well_close_decrease_duration` int NULL DEFAULT NULL COMMENT '关井减少持续时长',
`min_well_close_time_duration2` int NULL DEFAULT NULL COMMENT '最小关井时间持续时长',
`max_well_close_time_duration2` int NULL DEFAULT NULL COMMENT '最大关井时间持续时长',
`min_continous_flow_time_duration` int NULL DEFAULT NULL COMMENT '最小连续流量时间持续时长',
`max_continous_flow_time_duration` int NULL DEFAULT NULL COMMENT '最大连续流量时间持续时长',
`min_well_open_time_duration2` int NULL DEFAULT NULL COMMENT '最小开井时间持续时长',
`max_well_open_time_duration2` int NULL DEFAULT NULL COMMENT '最大开井时间持续时长',
`well_open_pressure_value_at_open` int NULL DEFAULT NULL COMMENT '开井时压力值(开井瞬间)',
`well_open_oil_pressure_value_at_open` int NULL DEFAULT NULL COMMENT '开井时油压值(开井瞬间)',
`well_open_load_factor_presets_at_open` int NULL DEFAULT NULL COMMENT '开井时载荷因子预设值(开井瞬间)',
`well_close_pressure_value_at_close` int NULL DEFAULT NULL COMMENT '关井时压力值(关井瞬间)',
`well_close_oil_ppressure_value_at_close` int NULL DEFAULT NULL COMMENT '关井时油压值(关井瞬间)',
`well_close_flow_value_at_close` int NULL DEFAULT NULL COMMENT '关井时流量值(关井瞬间)',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `udx_device_create_time` (`device_id` ASC, `collection_time` ASC) USING BTREE COMMENT '设备采集数据唯一键'
) ENGINE = InnoDB COMMENT = '设备ID$DeviceId$的采集数据'

View File

@ -0,0 +1,120 @@
INSERT INTO `$TableName$`(`id`, `device_id`, `created_time`, `collection_time`, `receive_time`, `cas_pressure`,
`oil_pressure`, `first_solenoid_status`, `battery_voltage`, `solar_voltage`,
`remaining_time_action`, `second_solenoid_status`, `pre_transmission`, `internet_traffic`,
`load_factor`, `data_time`, `show_delay`, `open_well_sampling_interval`,
`close_well_sampling_interval`, `ctl_model`, `min_pressure`,
`max_pressure`, `pressure_min_voltage`, `pressure_max_voltage`, `oil_min`, `oil_max`,
`oil_min_voltage`, `oil_max_voltage`, `input_pressure_min_value`, `input_pressure_max_value`,
`input_voltage_min_value`, `input_voltage_max_value`, `flow_rate_min_value`,
`flow_rate_max_value`,
`flow_voltage_min_value`, `flow_voltage_max_value`, `continuous_sampling_interval_duration`,
`sensor_signal_effective_level`, `pressure_compensation_polarity_flag`,
`pressure_compensation_value_setting`,
`oil_pressure_compensation_polarity_flag`, `oil_pressure_compensation_value_setting`,
`input_pressure_compensation_polarity_flag`, `input_pressure_compensation_value_setting`,
`flow_compensation_polarity_flag`, `flow_compensation_value_setting`,
`well_open_time_timestamp`,
`well_open_pressure_value`, `well_open_oil_pressure_value`, `well_open_load_factor_presets`,
`well_close_time_timestamp`, `well_close_pressure_value`, `well_close_oil_pressure_value`,
`well_close_flow_value`, `min_well_open_time_duration`, `max_well_open_time_duration`,
`min_well_close_time_duration`, `max_well_close_time_duration`,
`pressure_stabilization_duration`,
`flow_stabilization_duration`, `load_factor_stabilization_duration`, `plunger_delay_duration`,
`plunger_rise_duration`, `continuos_flow_duration`, `well_close_time_duration`,
`well_close_time_not_reached_duration`, `well_close_not_reached_count_value`,
`plunger_delay_duration_repeat`, `target_time_timestamp`, `target_time_range_value`,
`continuos_flow_increase_duration`, `continuos_flow_decrease_duration`,
`well_close_increase_duration`,
`well_close_decrease_duration`, `min_well_close_time_duration2`,
`max_well_close_time_duration2`,
`min_continous_flow_time_duration`, `max_continous_flow_time_duration`,
`min_well_open_time_duration2`,
`max_well_open_time_duration2`, `well_open_pressure_value_at_open`,
`well_open_oil_pressure_value_at_open`, `well_open_load_factor_presets_at_open`,
`well_close_pressure_value_at_close`, `well_close_oil_ppressure_value_at_close`,
`well_close_flow_value_at_close`)
VALUES (?, ?, NOW(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE receive_time=VALUES(receive_time),
cas_pressure=VALUES(cas_pressure),
oil_pressure=VALUES(oil_pressure),
first_solenoid_status=VALUES(first_solenoid_status),
battery_voltage=VALUES(battery_voltage),
solar_voltage=VALUES(solar_voltage),
remaining_time_action=VALUES(remaining_time_action),
second_solenoid_status=VALUES(second_solenoid_status),
pre_transmission=VALUES(pre_transmission),
internet_traffic=VALUES(internet_traffic),
load_factor=VALUES(load_factor),
data_time=VALUES(data_time),
show_delay=VALUES(show_delay),
open_well_sampling_interval=VALUES(open_well_sampling_interval),
close_well_sampling_interval=VALUES(close_well_sampling_interval),
ctl_model=VALUES(ctl_model),
min_pressure=VALUES(min_pressure),
max_pressure=VALUES(max_pressure),
pressure_min_voltage=VALUES(pressure_min_voltage),
pressure_max_voltage=VALUES(pressure_max_voltage),
oil_min=VALUES(oil_min),
oil_max=VALUES(oil_max),
oil_min_voltage=VALUES(oil_min_voltage),
oil_max_voltage=VALUES(oil_max_voltage),
input_pressure_min_value=VALUES(input_pressure_min_value),
input_pressure_max_value=VALUES(input_pressure_max_value),
input_voltage_min_value=VALUES(input_voltage_min_value),
input_voltage_max_value=VALUES(input_voltage_max_value),
flow_rate_min_value=VALUES(flow_rate_min_value),
flow_rate_max_value=VALUES(flow_rate_max_value),
flow_voltage_min_value=VALUES(flow_voltage_min_value),
flow_voltage_max_value=VALUES(flow_voltage_max_value),
continuous_sampling_interval_duration=VALUES(continuous_sampling_interval_duration),
sensor_signal_effective_level=VALUES(sensor_signal_effective_level),
pressure_compensation_polarity_flag=VALUES(pressure_compensation_polarity_flag),
pressure_compensation_value_setting=VALUES(pressure_compensation_value_setting),
oil_pressure_compensation_polarity_flag=VALUES(oil_pressure_compensation_polarity_flag),
oil_pressure_compensation_value_setting=VALUES(oil_pressure_compensation_value_setting),
input_pressure_compensation_polarity_flag=VALUES(input_pressure_compensation_polarity_flag),
input_pressure_compensation_value_setting=VALUES(input_pressure_compensation_value_setting),
flow_compensation_polarity_flag=VALUES(flow_compensation_polarity_flag),
flow_compensation_value_setting=VALUES(flow_compensation_value_setting),
well_open_time_timestamp=VALUES(well_open_time_timestamp),
well_open_pressure_value=VALUES(well_open_pressure_value),
well_open_oil_pressure_value=VALUES(well_open_oil_pressure_value),
well_open_load_factor_presets=VALUES(well_open_load_factor_presets),
well_close_time_timestamp=VALUES(well_close_time_timestamp),
well_close_pressure_value=VALUES(well_close_pressure_value),
well_close_oil_pressure_value=VALUES(well_close_oil_pressure_value),
well_close_flow_value=VALUES(well_close_flow_value),
min_well_open_time_duration=VALUES(min_well_open_time_duration),
max_well_open_time_duration=VALUES(max_well_open_time_duration),
min_well_close_time_duration=VALUES(min_well_close_time_duration),
max_well_close_time_duration=VALUES(max_well_close_time_duration),
pressure_stabilization_duration=VALUES(pressure_stabilization_duration),
flow_stabilization_duration=VALUES(flow_stabilization_duration),
load_factor_stabilization_duration=VALUES(load_factor_stabilization_duration),
plunger_delay_duration=VALUES(plunger_delay_duration),
plunger_rise_duration=VALUES(plunger_rise_duration),
continuos_flow_duration=VALUES(continuos_flow_duration),
well_close_time_duration=VALUES(well_close_time_duration),
well_close_time_not_reached_duration=VALUES(well_close_time_not_reached_duration),
well_close_not_reached_count_value=VALUES(well_close_not_reached_count_value),
plunger_delay_duration_repeat=VALUES(plunger_delay_duration_repeat),
target_time_timestamp=VALUES(target_time_timestamp),
target_time_range_value=VALUES(target_time_range_value),
continuos_flow_increase_duration=VALUES(continuos_flow_increase_duration),
continuos_flow_decrease_duration=VALUES(continuos_flow_decrease_duration),
well_close_increase_duration=VALUES(well_close_increase_duration),
well_close_decrease_duration=VALUES(well_close_decrease_duration),
min_well_close_time_duration2=VALUES(min_well_close_time_duration2),
max_well_close_time_duration2=VALUES(max_well_close_time_duration2),
min_continous_flow_time_duration=VALUES(min_continous_flow_time_duration),
max_continous_flow_time_duration=VALUES(max_continous_flow_time_duration),
min_well_open_time_duration2=VALUES(min_well_open_time_duration2),
max_well_open_time_duration2=VALUES(max_well_open_time_duration2),
well_open_pressure_value_at_open=VALUES(well_open_pressure_value_at_open),
well_open_oil_pressure_value_at_open=VALUES(well_open_oil_pressure_value_at_open),
well_open_load_factor_presets_at_open=VALUES(well_open_load_factor_presets_at_open),
well_close_pressure_value_at_close=VALUES(well_close_pressure_value_at_close),
well_close_oil_ppressure_value_at_close=VALUES(well_close_oil_ppressure_value_at_close),
well_close_flow_value_at_close=VALUES(well_close_flow_value_at_close)

View File

@ -0,0 +1,64 @@
package com.isu.gaswellwatch;
import com.serotonin.modbus4j.code.FunctionCode;
import com.serotonin.modbus4j.msg.*;
import com.serotonin.modbus4j.serial.rtu.RtuMessageParser;
import com.serotonin.modbus4j.serial.rtu.RtuMessageResponse;
import com.serotonin.modbus4j.sero.util.queue.ByteQueue;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
import java.util.Map;
public class ModbusTest {
public static void main(String[] args) throws Exception {
test();
}
public static void test() throws Exception {
// "01 03 40 0000 0960 0000 0258 0000 0258 0000 0064 000299A0 0000012C0000012C0000012C0000070800002A3000000E1000007E900000000A0000000A0000000A0000000044D0"
String collectionMessage = "01034000000960000002580000025800000064000299A00000012C0000012C0000012C0000070800002A3000000E1000007E900000000A0000000A0000000A0000000044D0";
ByteQueue byteQueue = new ByteQueue(collectionMessage);
RtuMessageParser masterParser = new RtuMessageParser(true);
RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue);
ModbusResponse modbusResponse = response.getModbusResponse();
short[] values = null;
switch (modbusResponse.getFunctionCode()) {
case FunctionCode.READ_COILS -> {
values = ((ReadCoilsResponse) modbusResponse).getShortData();
}
case FunctionCode.READ_DISCRETE_INPUTS -> {
values = ((ReadDiscreteInputsResponse) modbusResponse).getShortData();
}
case FunctionCode.READ_INPUT_REGISTERS -> {
values = ((ReadInputRegistersResponse) modbusResponse).getShortData();
}
case FunctionCode.READ_HOLDING_REGISTERS -> {
values = ((ReadHoldingRegistersResponse) modbusResponse).getShortData();
}
case FunctionCode.READ_EXCEPTION_STATUS -> {
values = new short[]{((ReadExceptionStatusResponse) modbusResponse).getExceptionStatus()};
}
default -> {
throw new RuntimeException("Funcetion code not supported: " + modbusResponse.getFunctionCode());
}
}
String address;
int startAddress = 150, index = 0, stepSize = 0;
Map<String, Object> messagePointMap = new HashMap<>();
for (short value : values) {
stepSize = index * 4;
address = StringUtils.leftPad(String.valueOf(startAddress), 4, '0');
System.out.println(address + ": " +
StringUtils.substring(collectionMessage, 6 + stepSize, 10 + stepSize) +
" -> " + String.valueOf(value));
startAddress++;
index++;
}
System.out.println();
}
}