231 lines
11 KiB
Java
231 lines
11 KiB
Java
package com.isu.gaswellwatch.modbus.data;
|
|
|
|
import cn.hutool.core.map.MapUtil;
|
|
import cn.hutool.json.JSONUtil;
|
|
import com.isu.gaswellwatch.entity.Response;
|
|
import com.isu.gaswellwatch.modbus.data.listener.Queues;
|
|
import com.isu.gaswellwatch.utils.HexUtil;
|
|
import jakarta.annotation.Resource;
|
|
import lombok.RequiredArgsConstructor;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.commons.lang3.ObjectUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
import org.springframework.core.ParameterizedTypeReference;
|
|
import org.springframework.dao.DataAccessException;
|
|
import org.springframework.data.redis.core.*;
|
|
import org.springframework.http.RequestEntity;
|
|
import org.springframework.http.ResponseEntity;
|
|
import org.springframework.jdbc.core.JdbcTemplate;
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.web.client.RestTemplate;
|
|
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Objects;
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
|
import java.util.concurrent.ThreadFactory;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.stream.Collectors;
|
|
|
|
/**
|
|
* @author <a href="mailto:scwsl@foxmail.com">王仕龙</a>
|
|
* 2024/11/23 11:55
|
|
*/
|
|
@Slf4j
|
|
@Component
|
|
@SuppressWarnings("all")
|
|
public class Redis2DBPersistenceService {
|
|
|
|
public static final String DEFAULT_DATA_TABLE = "t_device_data_";
|
|
public static final String DEVICE_INFO_SQL = "SELECT d.*, dp.code as modbus_device_product_code from device d "
|
|
+ "join dictionary dp on dp.id = d.product where d.id = ";
|
|
private static final String EXISTS_TABLE_SQL = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA"
|
|
+ " IN ('gaswellwatch', 'gas_well_watch') AND TABLE_NAME='$TableName$'";
|
|
@Resource
|
|
private JdbcTemplate jdbcTemplate;
|
|
@Resource(name = "stringRedisTemplate")
|
|
private RedisTemplate redisTemplate;
|
|
@Resource
|
|
private RabbitTemplate rabbitTemplate;
|
|
@Resource
|
|
private Map<String, PersistenceHandler> persistenceHandlerMap;
|
|
private final RestTemplate restTemplate = new RestTemplate();
|
|
|
|
private final ThreadPoolExecutor persistenceThreadPool = new ThreadPoolExecutor(Math.min(50, Runtime.getRuntime().availableProcessors() - 1)
|
|
, Math.min(100, Runtime.getRuntime().availableProcessors() - 1), 5, TimeUnit.MINUTES,
|
|
new ArrayBlockingQueue<>(5000), new ThreadFactory() {
|
|
final AtomicInteger index = new AtomicInteger(1);
|
|
|
|
@Override
|
|
public Thread newThread(Runnable runnable) {
|
|
Thread thread = new Thread(runnable, "Modbus-auto-collect-thread-" + this.index.getAndIncrement());
|
|
thread.setDaemon(true);
|
|
thread.setPriority(Thread.MIN_PRIORITY);
|
|
return thread;
|
|
}
|
|
}, new ThreadPoolExecutor.CallerRunsPolicy());
|
|
|
|
@Scheduled(cron = "20/30 * * * * ? ")
|
|
public void write() {
|
|
writeOnlineGateway();
|
|
HashOperations operations = this.redisTemplate.opsForHash();
|
|
try (Cursor<String> cursor = this.redisTemplate.scan(ScanOptions.scanOptions()
|
|
.match(PersistenceHandler.DEVICE_DATA_CACHE + "*").build())) {
|
|
Long deviceId;
|
|
Map<String, Object> deviceMap;
|
|
PersistenceHandler persistenceHandler;
|
|
String cacheKey, tableName, queueName;
|
|
List<Map<String, Object>> existsTableList;
|
|
while (cursor.hasNext()) {
|
|
deviceMap = operations.entries(cacheKey = cursor.next());
|
|
if (ObjectUtils.isEmpty(deviceMap)) {
|
|
continue;
|
|
}
|
|
deviceId = MapUtil.getLong(deviceMap, "deviceId");
|
|
if (Objects.isNull(deviceId)) {
|
|
continue;
|
|
}
|
|
this.persistenceThreadPool.execute(new Worker(deviceId, cacheKey, operations, deviceMap));
|
|
}
|
|
}
|
|
}
|
|
|
|
private void persistenceDeviceData(Long deviceId,
|
|
String cacheKey,
|
|
HashOperations operations,
|
|
Map<String, Object> deviceMap) {
|
|
String tableName;
|
|
String queueName;
|
|
PersistenceHandler persistenceHandler;
|
|
List<Map<String, Object>> existsTableList;
|
|
try {
|
|
String modbusDeviceProductCode = (String) operations.get(
|
|
PersistenceHandler.DEVICE_INFO_CACHE + deviceId, "modbus_device_product_code");
|
|
if (StringUtils.isEmpty(modbusDeviceProductCode)) {
|
|
List<Map<String, Object>> deviceInfoList = this.jdbcTemplate.queryForList(DEVICE_INFO_SQL + deviceId);
|
|
if (ObjectUtils.isNotEmpty(deviceInfoList)) {
|
|
Map<String, Object> deviceInfo = deviceInfoList.get(0);
|
|
Map<String, Object> cacheDeviceInfo = new HashMap<>(deviceInfo.size());
|
|
deviceInfo.forEach((key, value) -> cacheDeviceInfo.put(key, Objects.isNull(value) ? null : String.valueOf(value)));
|
|
modbusDeviceProductCode = (String) cacheDeviceInfo.get("modbus_device_product_code");
|
|
operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo);
|
|
}
|
|
}
|
|
if (!this.redisTemplate.hasKey(PersistenceHandler.DEVICE_INFO_CACHE + deviceId)) {
|
|
return;
|
|
}
|
|
tableName = DEFAULT_DATA_TABLE + deviceId;
|
|
queueName = Queues.getDeviceEventsQueue(deviceId);
|
|
persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode);
|
|
existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName));
|
|
if (ObjectUtils.isEmpty(existsTableList)
|
|
|| ObjectUtils.isEmpty(existsTableList.get(0))
|
|
|| StringUtils.isBlank(MapUtil.getStr(existsTableList.get(0), "TABLE_NAME"))) {
|
|
persistenceHandler.createTable(tableName, deviceId);
|
|
}
|
|
Map<String, Map<String, Object>> changeData = persistenceHandler.insert(tableName, cacheKey);
|
|
if (ObjectUtils.isNotEmpty(changeData)) {
|
|
String rowDataJson = JSONUtil.toJsonStr(changeData);
|
|
try {
|
|
log.debug("推设备最新落库数据到MQ({}): {}", queueName, rowDataJson);
|
|
this.rabbitTemplate.convertAndSend(queueName, rowDataJson);
|
|
} catch (Exception e) {
|
|
log.error("推设备最新落库数据到MQ失败({}): {}", queueName, rowDataJson, e);
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
log.error("设备{}最新数据落库失败", deviceId, JSONUtil.toJsonStr(deviceMap), e);
|
|
}
|
|
}
|
|
|
|
private void writeOnlineGateway() {
|
|
try {
|
|
RequestEntity request = RequestEntity.get("http://localhost:9999/modbus-tcp/online").build();
|
|
ResponseEntity<Response<List<String>>> response = this.restTemplate.exchange(request,
|
|
new ParameterizedTypeReference<Response<List<String>>>() {
|
|
});
|
|
if (Objects.isNull(response) || Objects.isNull(response.getBody())) {
|
|
return;
|
|
}
|
|
if (ObjectUtils.isEmpty(response.getBody().getData())) {
|
|
allOffline();
|
|
return;
|
|
}
|
|
String onlineGatewaySns = response.getBody()
|
|
.getData()
|
|
.stream()
|
|
.filter(Objects::nonNull)
|
|
.map(value -> new String(HexUtil.hexStringToBytes(value)))
|
|
.filter(StringUtils::isNotBlank)
|
|
.map(value -> "'" + value + "'")
|
|
.collect(Collectors.joining(", "));
|
|
if (StringUtils.isBlank(onlineGatewaySns)) {
|
|
onlineGatewaySns = "''";
|
|
}
|
|
List<Map<String, Object>> idGatewayMappingList = this.jdbcTemplate.queryForList("SELECT a.id, " +
|
|
"IF(b.gateway_sn IS NOT NULL, 'true', 'false') AS status FROM device a LEFT JOIN (" +
|
|
"SELECT id, gateway_sn FROM device WHERE gateway_sn IN (" +
|
|
onlineGatewaySns + ")) b ON a.id = b.id ORDER BY a.id");
|
|
if (ObjectUtils.isEmpty(idGatewayMappingList)) {
|
|
allOffline();
|
|
return;
|
|
}
|
|
this.redisTemplate.executePipelined(new SessionCallback<Object>() {
|
|
@Override
|
|
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
|
|
String deviceId, status;
|
|
HashOperations hashOperations = operations.opsForHash();
|
|
for (Map<String, Object> map : idGatewayMappingList) {
|
|
deviceId = MapUtil.getStr(map, "id");
|
|
status = MapUtil.getStr(map, "status");
|
|
hashOperations.put(PersistenceHandler.ONLINE_DEVICE_CACHE, deviceId, status);
|
|
hashOperations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, "online", status);
|
|
}
|
|
return null;
|
|
}
|
|
});
|
|
} catch (Exception e) {
|
|
log.warn("Get online devices list fail", e);
|
|
}
|
|
}
|
|
|
|
private void allOffline() {
|
|
HashOperations hashOperations = this.redisTemplate.opsForHash();
|
|
Map<String, String> onlineDeviceStatusMap = hashOperations.entries(PersistenceHandler.ONLINE_DEVICE_CACHE);
|
|
this.redisTemplate.executePipelined(new SessionCallback<Object>() {
|
|
@Override
|
|
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
|
|
onlineDeviceStatusMap.forEach((deviceId, status) -> {
|
|
hashOperations.put(PersistenceHandler.ONLINE_DEVICE_CACHE, deviceId, "false");
|
|
hashOperations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, "online", "false");
|
|
});
|
|
return null;
|
|
}
|
|
});
|
|
}
|
|
|
|
private void updateDeviceOnlineStatus(HashOperations operations, Long deviceId, String onlineStatus) {
|
|
operations.put(PersistenceHandler.ONLINE_DEVICE_CACHE, String.valueOf(deviceId), onlineStatus);
|
|
operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, "online", onlineStatus);
|
|
}
|
|
|
|
@RequiredArgsConstructor
|
|
private class Worker implements Runnable {
|
|
private final Long deviceId;
|
|
private final String cacheKey;
|
|
private final HashOperations operations;
|
|
private final Map<String, Object> deviceMap;
|
|
|
|
@Override
|
|
public void run() {
|
|
Redis2DBPersistenceService.this.persistenceDeviceData(deviceId, cacheKey, operations, deviceMap);
|
|
}
|
|
}
|
|
|
|
}
|