添加数据持久化事件推送能力
This commit is contained in:
parent
fb3470ae67
commit
90ec96f7be
|
@ -30,7 +30,7 @@ public interface PersistenceHandler {
|
||||||
|
|
||||||
void createTable(String tableName, Long deviceId);
|
void createTable(String tableName, Long deviceId);
|
||||||
|
|
||||||
void insert(String tableName, String cacheKey);
|
Map<String, Object> insert(String tableName, String cacheKey);
|
||||||
|
|
||||||
default void setValue(PreparedStatement ps, Map<String, Object> row, int index, String key, int sqlType) throws SQLException {
|
default void setValue(PreparedStatement ps, Map<String, Object> row, int index, String key, int sqlType) throws SQLException {
|
||||||
String value = MapUtil.getStr(row, key);
|
String value = MapUtil.getStr(row, key);
|
||||||
|
|
|
@ -1,11 +1,14 @@
|
||||||
package com.isu.gaswellwatch.modbus.data;
|
package com.isu.gaswellwatch.modbus.data;
|
||||||
|
|
||||||
import cn.hutool.core.map.MapUtil;
|
import cn.hutool.core.map.MapUtil;
|
||||||
|
import cn.hutool.json.JSONUtil;
|
||||||
import com.isu.gaswellwatch.entity.Response;
|
import com.isu.gaswellwatch.entity.Response;
|
||||||
|
import com.isu.gaswellwatch.modbus.data.listener.Queues;
|
||||||
import jakarta.annotation.Resource;
|
import jakarta.annotation.Resource;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.ObjectUtils;
|
import org.apache.commons.lang3.ObjectUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
import org.springframework.core.ParameterizedTypeReference;
|
import org.springframework.core.ParameterizedTypeReference;
|
||||||
import org.springframework.data.redis.core.Cursor;
|
import org.springframework.data.redis.core.Cursor;
|
||||||
import org.springframework.data.redis.core.HashOperations;
|
import org.springframework.data.redis.core.HashOperations;
|
||||||
|
@ -34,23 +37,23 @@ import java.util.stream.Collectors;
|
||||||
public class Redis2DBPersistenceService {
|
public class Redis2DBPersistenceService {
|
||||||
|
|
||||||
public static final String DEFAULT_DATA_TABLE = "t_device_data_";
|
public static final String DEFAULT_DATA_TABLE = "t_device_data_";
|
||||||
private static final String DEVICE_INFO_SQL = "SELECT * from device where id = ";
|
private static final String DEVICE_INFO_SQL = "SELECT d.*, dp.code as modbus_device_product_code from device d " + "join dictionary dp on dp.id = d.product where id = ";
|
||||||
private static final String EXISTS_TABLE_SQL = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA"
|
private static final String EXISTS_TABLE_SQL = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA" + " IN ('gaswellwatch', 'gas_well_watch') AND TABLE_NAME='$TableName$'";
|
||||||
+ " IN ('gaswellwatch', 'gas_well_watch') AND TABLE_NAME='$TableName$'";
|
|
||||||
@Resource
|
@Resource
|
||||||
private JdbcTemplate jdbcTemplate;
|
private JdbcTemplate jdbcTemplate;
|
||||||
@Resource(name = "stringRedisTemplate")
|
@Resource(name = "stringRedisTemplate")
|
||||||
private RedisTemplate redisTemplate;
|
private RedisTemplate redisTemplate;
|
||||||
@Resource
|
@Resource
|
||||||
|
private RabbitTemplate rabbitTemplate;
|
||||||
|
@Resource
|
||||||
private Map<String, PersistenceHandler> persistenceHandlerMap;
|
private Map<String, PersistenceHandler> persistenceHandlerMap;
|
||||||
private RestTemplate restTemplate = new RestTemplate();
|
private final RestTemplate restTemplate = new RestTemplate();
|
||||||
|
|
||||||
@Scheduled(cron = "20/30 * * * * ? ")
|
@Scheduled(cron = "20/30 * * * * ? ")
|
||||||
public void write() {
|
public void write() {
|
||||||
Map<Long, String> idGatewayMappingMap = getOnlineGateway();
|
Map<Long, String> idGatewayMappingMap = getOnlineGateway();
|
||||||
HashOperations operations = this.redisTemplate.opsForHash();
|
HashOperations operations = this.redisTemplate.opsForHash();
|
||||||
try (Cursor<String> cursor = this.redisTemplate.scan(ScanOptions.scanOptions()
|
try (Cursor<String> cursor = this.redisTemplate.scan(ScanOptions.scanOptions().match(PersistenceHandler.DEVICE_DATA_CACHE + "*").build())) {
|
||||||
.match(PersistenceHandler.DEVICE_DATA_CACHE + "*").build())) {
|
|
||||||
Long deviceId;
|
Long deviceId;
|
||||||
String cacheKey, tableName;
|
String cacheKey, tableName;
|
||||||
Map<String, Object> deviceMap;
|
Map<String, Object> deviceMap;
|
||||||
|
@ -66,27 +69,35 @@ public class Redis2DBPersistenceService {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (Objects.nonNull(idGatewayMappingMap)) {
|
if (Objects.nonNull(idGatewayMappingMap)) {
|
||||||
operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, "online",
|
operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, "online", String.valueOf(idGatewayMappingMap.containsKey(deviceId)));
|
||||||
String.valueOf(idGatewayMappingMap.containsKey(deviceId)));
|
|
||||||
}
|
}
|
||||||
String modbusDeviceTypeId = (String) operations.get(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, "product");
|
String modbusDeviceProductCode = (String) operations.get(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, "modbus_device_product_code");
|
||||||
if (StringUtils.isEmpty(modbusDeviceTypeId)) {
|
if (StringUtils.isEmpty(modbusDeviceProductCode)) {
|
||||||
Map<String, Object> deviceInfo = this.jdbcTemplate.queryForMap(DEVICE_INFO_SQL + deviceId);
|
Map<String, Object> deviceInfo = this.jdbcTemplate.queryForMap(DEVICE_INFO_SQL + deviceId);
|
||||||
Map<String, Object> cacheDeviceInfo = new HashMap<>(deviceInfo.size());
|
if (ObjectUtils.isNotEmpty(deviceInfo)) {
|
||||||
deviceInfo.forEach((key, value) -> cacheDeviceInfo.put(key, Objects.isNull(value) ? null : String.valueOf(value)));
|
Map<String, Object> cacheDeviceInfo = new HashMap<>(deviceInfo.size());
|
||||||
modbusDeviceTypeId = (String) cacheDeviceInfo.get("product");
|
deviceInfo.forEach((key, value) -> cacheDeviceInfo.put(key, Objects.isNull(value) ? null : String.valueOf(value)));
|
||||||
operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo);
|
modbusDeviceProductCode = (String) cacheDeviceInfo.get("modbus_device_product_code");
|
||||||
|
operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode);
|
||||||
persistenceHandler = persistenceHandlerMap.get(ModbusDeviceTypeEnum.getCodeById(modbusDeviceTypeId));
|
|
||||||
tableName = DEFAULT_DATA_TABLE + deviceId;
|
tableName = DEFAULT_DATA_TABLE + deviceId;
|
||||||
existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName));
|
existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName));
|
||||||
if (ObjectUtils.isEmpty(existsTableList)
|
if (ObjectUtils.isEmpty(existsTableList) || ObjectUtils.isEmpty(existsTableList.get(0)) || StringUtils.isBlank(MapUtil.getStr(existsTableList.get(0), "TABLE_NAME"))) {
|
||||||
|| ObjectUtils.isEmpty(existsTableList.get(0))
|
|
||||||
|| StringUtils.isBlank(MapUtil.getStr(existsTableList.get(0), "TABLE_NAME"))) {
|
|
||||||
persistenceHandler.createTable(tableName, deviceId);
|
persistenceHandler.createTable(tableName, deviceId);
|
||||||
}
|
}
|
||||||
persistenceHandler.insert(tableName, cacheKey);
|
Map<String, Object> rowData = persistenceHandler.insert(tableName, cacheKey);
|
||||||
|
if (ObjectUtils.isNotEmpty(rowData)) {
|
||||||
|
String rowDataJson = JSONUtil.toJsonStr(rowData);
|
||||||
|
String queueName = String.format(Queues.DEVICE_EVENTS, deviceId);
|
||||||
|
try {
|
||||||
|
log.debug("推设备最新落库数据到MQ({}): {}", queueName, rowDataJson);
|
||||||
|
this.rabbitTemplate.convertAndSend(queueName, rowDataJson);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("推设备最新落库数据到MQ失败({}): {}", queueName, rowDataJson, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -99,9 +110,7 @@ public class Redis2DBPersistenceService {
|
||||||
if (Objects.isNull(response) || Objects.isNull(response.getBody()) || ObjectUtils.isEmpty(response.getBody().getData())) {
|
if (Objects.isNull(response) || Objects.isNull(response.getBody()) || ObjectUtils.isEmpty(response.getBody().getData())) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
List<Map<String, Object>> idGatewayMappingList = this.jdbcTemplate.queryForList("select id, gateway_sn" +
|
List<Map<String, Object>> idGatewayMappingList = this.jdbcTemplate.queryForList("select id, gateway_sn" + " from device where gateway_sn in (" + response.getBody().getData().stream().map(value -> "'" + value + "'").collect(Collectors.joining(",")) + ")");
|
||||||
" from device where gateway_sn in (" +
|
|
||||||
response.getBody().getData().stream().map(value -> "'" + value + "'").collect(Collectors.joining(",")) + ")");
|
|
||||||
if (ObjectUtils.isEmpty(idGatewayMappingList)) {
|
if (ObjectUtils.isEmpty(idGatewayMappingList)) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,7 @@ public class EtcPersistenceHandler implements PersistenceHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void insert(String tableName, String cacheKey) {
|
public Map<String, Object> insert(String tableName, String cacheKey) {
|
||||||
String insertTableSQL = PersistenceHandler.getResource("sql/INSERT_ETC.sql");
|
String insertTableSQL = PersistenceHandler.getResource("sql/INSERT_ETC.sql");
|
||||||
insertTableSQL = StringUtils.replace(insertTableSQL, "$TableName$", tableName);
|
insertTableSQL = StringUtils.replace(insertTableSQL, "$TableName$", tableName);
|
||||||
|
|
||||||
|
@ -102,5 +102,6 @@ public class EtcPersistenceHandler implements PersistenceHandler {
|
||||||
return ps.executeUpdate();
|
return ps.executeUpdate();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
return row;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,7 @@ public class Knpcv1PersistenceHandler implements PersistenceHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void insert(String tableName, String cacheKey) {
|
public Map<String, Object> insert(String tableName, String cacheKey) {
|
||||||
String insertTableSQL = PersistenceHandler.getResource("sql/INSERT_KNPCV1.sql");
|
String insertTableSQL = PersistenceHandler.getResource("sql/INSERT_KNPCV1.sql");
|
||||||
insertTableSQL = StringUtils.replace(insertTableSQL, "$TableName$", tableName);
|
insertTableSQL = StringUtils.replace(insertTableSQL, "$TableName$", tableName);
|
||||||
|
|
||||||
|
@ -102,5 +102,6 @@ public class Knpcv1PersistenceHandler implements PersistenceHandler {
|
||||||
return ps.executeUpdate();
|
return ps.executeUpdate();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
return row;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,7 @@ public class ScssPersistenceHandler implements PersistenceHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void insert(String tableName, String cacheKey) {
|
public Map<String, Object> insert(String tableName, String cacheKey) {
|
||||||
String insertTableSQL = PersistenceHandler.getResource("sql/INSERT_SCSS.sql");
|
String insertTableSQL = PersistenceHandler.getResource("sql/INSERT_SCSS.sql");
|
||||||
insertTableSQL = StringUtils.replace(insertTableSQL, "$TableName$", tableName);
|
insertTableSQL = StringUtils.replace(insertTableSQL, "$TableName$", tableName);
|
||||||
|
|
||||||
|
@ -134,5 +134,6 @@ public class ScssPersistenceHandler implements PersistenceHandler {
|
||||||
return ps.executeUpdate();
|
return ps.executeUpdate();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
return row;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,17 +38,17 @@ public class DynamicRabbitListener implements ApplicationRunner {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run(ApplicationArguments args) throws Exception {
|
public void run(ApplicationArguments args) {
|
||||||
this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener());
|
this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener());
|
||||||
this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener());
|
this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener());
|
||||||
|
|
||||||
this.jdbcTemplate.queryForList(GATEWAY_SQL).forEach(item -> {
|
this.jdbcTemplate.queryForList(GATEWAY_SQL).forEach(item -> {
|
||||||
this.addListenerQueue(String.format("/modbus/device/%s/collect", MapUtil.getStr(item, "identifier")), null, null);
|
this.addListenerQueue(String.format(Queues.MODBUS_COLLECT_DATA, MapUtil.getStr(item, "identifier")), null, null);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addListenerQueue(String gatewaySn) {
|
public void addListenerQueue(String gatewaySn) {
|
||||||
this.addListenerQueue(String.format("/modbus/device/%s/collect", gatewaySn), null, null);
|
this.addListenerQueue(String.format(Queues.MODBUS_COLLECT_DATA, gatewaySn), null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addListenerQueue(String queueName, String exchangeName, String routingKey) {
|
public void addListenerQueue(String queueName, String exchangeName, String routingKey) {
|
||||||
|
|
|
@ -169,6 +169,7 @@ public class ModbusMessagePersistListener implements BatchMessageListener {
|
||||||
ModbusMessage.MessagePoint messagePoint;
|
ModbusMessage.MessagePoint messagePoint;
|
||||||
String deviceDataCacheName = PersistenceHandler.DEVICE_DATA_CACHE + modbusMessage.getDeviceId();
|
String deviceDataCacheName = PersistenceHandler.DEVICE_DATA_CACHE + modbusMessage.getDeviceId();
|
||||||
HashOperations<String, Object, Object> hashOperations = this.redisTemplate.opsForHash();
|
HashOperations<String, Object, Object> hashOperations = this.redisTemplate.opsForHash();
|
||||||
|
hashOperations.put(deviceDataCacheName, "gatewaySn", modbusMessage.getGatewayIdentifier());
|
||||||
hashOperations.put(deviceDataCacheName, "deviceId", String.valueOf(modbusMessage.getDeviceId()));
|
hashOperations.put(deviceDataCacheName, "deviceId", String.valueOf(modbusMessage.getDeviceId()));
|
||||||
hashOperations.put(deviceDataCacheName, "receiveTime", LocalDateTime
|
hashOperations.put(deviceDataCacheName, "receiveTime", LocalDateTime
|
||||||
.ofInstant(Instant.ofEpochMilli(modbusMessage.getReceiveTime()), ZoneOffset.systemDefault())
|
.ofInstant(Instant.ofEpochMilli(modbusMessage.getReceiveTime()), ZoneOffset.systemDefault())
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
package com.isu.gaswellwatch.modbus.data.listener;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
|
||||||
|
* 2024/11/26 12:37
|
||||||
|
*/
|
||||||
|
public final class Queues {
|
||||||
|
|
||||||
|
public static final String DEVICE_EVENTS = "/device/%s/events";
|
||||||
|
public static final String MODBUS_COLLECT_DATA = "/modbus/device/%s/collect";
|
||||||
|
}
|
|
@ -1,5 +1,7 @@
|
||||||
package com.isu.gaswellwatch.utils;
|
package com.isu.gaswellwatch.utils;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
|
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
|
||||||
* 2024/11/25 16:07
|
* 2024/11/25 16:07
|
||||||
|
@ -7,7 +9,9 @@ package com.isu.gaswellwatch.utils;
|
||||||
public class ReverseComplementCodeUtils {
|
public class ReverseComplementCodeUtils {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 原码转反码
|
* 原码转反码<br/>
|
||||||
|
* 正数的原码,反码,补码都一样(三码合一), 0的反码,补码都是0<br/>
|
||||||
|
* 负数的反码=它的原码符号不变,其他位取反(0 => 1 1 => 0)
|
||||||
*
|
*
|
||||||
* @param source 二进制原码
|
* @param source 二进制原码
|
||||||
* @return
|
* @return
|
||||||
|
@ -16,7 +20,7 @@ public class ReverseComplementCodeUtils {
|
||||||
if (source.startsWith("0")) {
|
if (source.startsWith("0")) {
|
||||||
return source;
|
return source;
|
||||||
} else {
|
} else {
|
||||||
StringBuffer sbf = new StringBuffer();
|
StringBuilder sbf = new StringBuilder(source.length());
|
||||||
sbf.append("1");
|
sbf.append("1");
|
||||||
String f_str = source.substring(1);
|
String f_str = source.substring(1);
|
||||||
for (int i = 0; i < f_str.length(); i++) {
|
for (int i = 0; i < f_str.length(); i++) {
|
||||||
|
@ -32,7 +36,8 @@ public class ReverseComplementCodeUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 反码转补码, 默认补1
|
* 反码转补码, 默认补1<br/>
|
||||||
|
* 负数的补码 = 它的反码 + 1,负数的反码 = 负数的补码 - 1
|
||||||
*
|
*
|
||||||
* @param reverse 二进制反码字符串
|
* @param reverse 二进制反码字符串
|
||||||
* @return
|
* @return
|
||||||
|
@ -52,7 +57,7 @@ public class ReverseComplementCodeUtils {
|
||||||
if (reverse.startsWith("0")) {
|
if (reverse.startsWith("0")) {
|
||||||
return reverse;
|
return reverse;
|
||||||
}
|
}
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder(reverse.length());
|
||||||
int x = 0;
|
int x = 0;
|
||||||
int y = 0;
|
int y = 0;
|
||||||
int pre = 0;//进位
|
int pre = 0;//进位
|
||||||
|
@ -89,14 +94,32 @@ public class ReverseComplementCodeUtils {
|
||||||
* @param
|
* @param
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
|
|
||||||
public static String binComplementToDec(String complement) {
|
public static String binComplementToDec(String complement) {
|
||||||
String h_number = complement.substring(1);
|
String binNumber = complement.substring(1);
|
||||||
if (complement.startsWith("0")) {
|
String firstNumber = complement.substring(0, 1);
|
||||||
return "" + Long.valueOf(h_number, 2);
|
if (Objects.equals(firstNumber, "0")) {
|
||||||
} else if (complement.startsWith("1")) {
|
return "" + Long.valueOf(binNumber, 2);
|
||||||
return "-" + Long.valueOf(h_number, 2);
|
} else if (Objects.equals(firstNumber, "1")) {
|
||||||
|
return "-" + Long.valueOf(binNumber, 2);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
// String hexNumber = "F7"; // 16进制负数表示,等同于-9
|
||||||
|
String hexNumber = "F6"; // 16进制负数表示,等同于-10
|
||||||
|
// String hexNumber = "E6"; // 16进制负数表示,等同于-26
|
||||||
|
int decimalNumber = Integer.parseInt(hexNumber, 16);
|
||||||
|
|
||||||
|
String binaryNumber = Integer.toBinaryString(decimalNumber);
|
||||||
|
System.out.println("有符号16进制数:" + hexNumber + ",对应的2进制数为:" + binaryNumber);
|
||||||
|
|
||||||
|
binaryNumber = binOriginalToReverse(binaryNumber);
|
||||||
|
System.out.println("有符号16进制数:" + hexNumber + ",对应反码2进制为:" + binaryNumber + ",值为:" + Integer.parseInt(binaryNumber, 2));
|
||||||
|
|
||||||
|
binaryNumber = binReverseToComplement(binaryNumber, "1");
|
||||||
|
System.out.println("有符号16进制数:" + hexNumber + ",对应反码的补码为:" + binaryNumber + ",值为:" + Integer.parseInt(binaryNumber, 2));
|
||||||
|
|
||||||
|
System.out.println("有符号16进制数:" + hexNumber + ",对应的10进制数为:" + binComplementToDec(binaryNumber));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue