gasWellWatch/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService....

142 lines
7.1 KiB
Java
Raw Normal View History

2024-11-25 17:55:45 +08:00
package com.isu.gaswellwatch.modbus.data;
2024-11-24 18:51:30 +08:00
import cn.hutool.core.map.MapUtil;
import cn.hutool.json.JSONUtil;
2024-11-25 13:07:43 +08:00
import com.isu.gaswellwatch.entity.Response;
import com.isu.gaswellwatch.modbus.data.listener.Queues;
2024-11-24 18:51:30 +08:00
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
2024-11-25 13:07:43 +08:00
import org.springframework.core.ParameterizedTypeReference;
2024-11-24 18:51:30 +08:00
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
2024-11-25 13:07:43 +08:00
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
2024-11-24 18:51:30 +08:00
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
2024-11-25 13:07:43 +08:00
import java.util.Objects;
import java.util.stream.Collectors;
2024-11-24 18:51:30 +08:00
/**
2024-11-26 15:51:47 +08:00
* @author <a href="mailto:scwsl@foxmail.com">王仕龙</a>
2024-11-24 18:51:30 +08:00
* 2024/11/23 11:55
*/
@Slf4j
@Component
@SuppressWarnings("all")
public class Redis2DBPersistenceService {
2024-11-25 22:44:08 +08:00
public static final String DEFAULT_DATA_TABLE = "t_device_data_";
private static final String DEVICE_INFO_SQL = "SELECT d.*, dp.code as modbus_device_product_code from device d "
2024-11-26 18:46:34 +08:00
+ "join dictionary dp on dp.id = d.product where d.id = ";
private static final String EXISTS_TABLE_SQL = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA"
+ " IN ('gaswellwatch', 'gas_well_watch') AND TABLE_NAME='$TableName$'";
2024-11-24 18:51:30 +08:00
@Resource
private JdbcTemplate jdbcTemplate;
2024-11-25 13:07:43 +08:00
@Resource(name = "stringRedisTemplate")
2024-11-24 18:51:30 +08:00
private RedisTemplate redisTemplate;
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
2024-11-24 18:51:30 +08:00
private Map<String, PersistenceHandler> persistenceHandlerMap;
private final RestTemplate restTemplate = new RestTemplate();
2024-11-24 18:51:30 +08:00
2024-11-25 13:07:43 +08:00
@Scheduled(cron = "20/30 * * * * ? ")
2024-11-24 18:51:30 +08:00
public void write() {
2024-11-25 13:07:43 +08:00
Map<Long, String> idGatewayMappingMap = getOnlineGateway();
2024-11-24 18:51:30 +08:00
HashOperations operations = this.redisTemplate.opsForHash();
try (Cursor<String> cursor = this.redisTemplate.scan(ScanOptions.scanOptions()
.match(PersistenceHandler.DEVICE_DATA_CACHE + "*").build())) {
2024-11-24 18:51:30 +08:00
Long deviceId;
2024-11-25 13:07:43 +08:00
Map<String, Object> deviceMap;
2024-11-24 18:51:30 +08:00
PersistenceHandler persistenceHandler;
String cacheKey, tableName, queueName;
2024-11-25 13:07:43 +08:00
List<Map<String, Object>> existsTableList;
2024-11-24 18:51:30 +08:00
while (cursor.hasNext()) {
deviceMap = operations.entries(cacheKey = cursor.next());
2024-11-25 13:07:43 +08:00
if (ObjectUtils.isEmpty(deviceMap)) {
continue;
}
2024-11-24 18:51:30 +08:00
deviceId = MapUtil.getLong(deviceMap, "deviceId");
2024-11-25 13:07:43 +08:00
if (Objects.isNull(deviceId)) {
continue;
}
if (Objects.nonNull(idGatewayMappingMap)) {
operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId,
"online", String.valueOf(idGatewayMappingMap.containsKey(deviceId)));
2024-11-27 14:34:49 +08:00
operations.put(PersistenceHandler.ONLINE_DEVICE_CACHE, String.valueOf(deviceId),
String.valueOf(idGatewayMappingMap.containsKey(deviceId)));
2024-11-25 13:07:43 +08:00
}
String modbusDeviceProductCode = (String) operations.get(
PersistenceHandler.DEVICE_INFO_CACHE + deviceId, "modbus_device_product_code");
if (StringUtils.isEmpty(modbusDeviceProductCode)) {
2024-11-25 17:55:45 +08:00
Map<String, Object> deviceInfo = this.jdbcTemplate.queryForMap(DEVICE_INFO_SQL + deviceId);
if (ObjectUtils.isNotEmpty(deviceInfo)) {
Map<String, Object> cacheDeviceInfo = new HashMap<>(deviceInfo.size());
deviceInfo.forEach((key, value) -> cacheDeviceInfo.put(key, Objects.isNull(value) ? null : String.valueOf(value)));
modbusDeviceProductCode = (String) cacheDeviceInfo.get("modbus_device_product_code");
operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo);
}
2024-11-25 17:55:45 +08:00
}
2024-11-25 22:44:08 +08:00
tableName = DEFAULT_DATA_TABLE + deviceId;
queueName = String.format(Queues.DEVICE_EVENTS, deviceId);
persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode);
2024-11-25 13:07:43 +08:00
existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName));
if (ObjectUtils.isEmpty(existsTableList)
|| ObjectUtils.isEmpty(existsTableList.get(0))
|| StringUtils.isBlank(MapUtil.getStr(existsTableList.get(0), "TABLE_NAME"))) {
2024-11-25 13:07:43 +08:00
persistenceHandler.createTable(tableName, deviceId);
2024-11-24 18:51:30 +08:00
}
2024-11-26 15:51:47 +08:00
Map<String, Map<String, Object>> changeData = persistenceHandler.insert(tableName, cacheKey);
if (ObjectUtils.isNotEmpty(changeData)) {
String rowDataJson = JSONUtil.toJsonStr(changeData);
try {
log.debug("推设备最新落库数据到MQ({}): {}", queueName, rowDataJson);
this.rabbitTemplate.convertAndSend(queueName, rowDataJson);
} catch (Exception e) {
log.error("推设备最新落库数据到MQ失败({}): {}", queueName, rowDataJson, e);
}
}
2024-11-24 18:51:30 +08:00
}
}
}
2024-11-25 13:07:43 +08:00
private Map<Long, String> getOnlineGateway() {
try {
2024-11-27 02:00:58 +08:00
RequestEntity request = RequestEntity.get("http://localhost:9999/modbus-tcp/online").build();
ResponseEntity<Response<List<String>>> response = this.restTemplate.exchange(request,
new ParameterizedTypeReference<Response<List<String>>>() {
});
2024-11-25 17:55:45 +08:00
if (Objects.isNull(response) || Objects.isNull(response.getBody()) || ObjectUtils.isEmpty(response.getBody().getData())) {
2024-11-25 13:07:43 +08:00
return null;
}
List<Map<String, Object>> idGatewayMappingList = this.jdbcTemplate.queryForList("select id, gateway_sn"
+ " from device where gateway_sn in ("
+ response.getBody().getData().stream().map(value -> "'"
+ value + "'").collect(Collectors.joining(",")) + ")");
2024-11-25 13:07:43 +08:00
if (ObjectUtils.isEmpty(idGatewayMappingList)) {
return null;
}
Map<Long, String> idGatewayMappingMap = new HashMap<Long, String>();
for (Map<String, Object> map : idGatewayMappingList) {
idGatewayMappingMap.put(MapUtil.getLong(map, "id"), MapUtil.getStr(map, "gateway_sn"));
}
return idGatewayMappingMap;
} catch (Exception e) {
log.error("Get online devices list fail", e);
return null;
}
}
2024-11-24 18:51:30 +08:00
}