改为并发落库

This commit is contained in:
wangshilong 2024-12-21 14:43:26 +08:00
parent 6ee2875c30
commit 042dbfcecf
1 changed files with 86 additions and 40 deletions

View File

@ -6,6 +6,7 @@ import com.isu.gaswellwatch.entity.Response;
import com.isu.gaswellwatch.modbus.data.listener.Queues; import com.isu.gaswellwatch.modbus.data.listener.Queues;
import com.isu.gaswellwatch.utils.HexUtil; import com.isu.gaswellwatch.utils.HexUtil;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -23,6 +24,11 @@ import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import java.util.*; import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -49,6 +55,20 @@ public class Redis2DBPersistenceService {
private Map<String, PersistenceHandler> persistenceHandlerMap; private Map<String, PersistenceHandler> persistenceHandlerMap;
private final RestTemplate restTemplate = new RestTemplate(); private final RestTemplate restTemplate = new RestTemplate();
private final ThreadPoolExecutor persistenceThreadPool = new ThreadPoolExecutor(Math.min(50, Runtime.getRuntime().availableProcessors() - 1)
, Math.min(100, Runtime.getRuntime().availableProcessors() - 1), 5, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(5000), new ThreadFactory() {
final AtomicInteger index = new AtomicInteger(1);
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "Modbus-auto-collect-thread-" + this.index.getAndIncrement());
thread.setDaemon(true);
thread.setPriority(Thread.MIN_PRIORITY);
return thread;
}
}, new ThreadPoolExecutor.CallerRunsPolicy());
@Scheduled(cron = "20/30 * * * * ? ") @Scheduled(cron = "20/30 * * * * ? ")
public void write() { public void write() {
Map<Long, String> idGatewayMappingMap = writeOnlineGateway(); Map<Long, String> idGatewayMappingMap = writeOnlineGateway();
@ -69,48 +89,60 @@ public class Redis2DBPersistenceService {
if (Objects.isNull(deviceId)) { if (Objects.isNull(deviceId)) {
continue; continue;
} }
try { this.persistenceThreadPool.execute(new Worker(deviceId, cacheKey, operations, deviceMap, idGatewayMappingMap));
if (Objects.nonNull(idGatewayMappingMap)) { }
updateDeviceOnlineStatus(operations, deviceId, String.valueOf(idGatewayMappingMap.containsKey(deviceId))); }
} }
String modbusDeviceProductCode = (String) operations.get(
PersistenceHandler.DEVICE_INFO_CACHE + deviceId, "modbus_device_product_code"); private void persistenceDeviceData(Long deviceId,
if (StringUtils.isEmpty(modbusDeviceProductCode)) { String cacheKey,
List<Map<String, Object>> deviceInfoList = this.jdbcTemplate.queryForList(DEVICE_INFO_SQL + deviceId); HashOperations operations,
if (ObjectUtils.isNotEmpty(deviceInfoList)) { Map<String, Object> deviceMap,
Map<String, Object> deviceInfo = deviceInfoList.get(0); Map<Long, String> idGatewayMappingMap) {
Map<String, Object> cacheDeviceInfo = new HashMap<>(deviceInfo.size()); String tableName;
deviceInfo.forEach((key, value) -> cacheDeviceInfo.put(key, Objects.isNull(value) ? null : String.valueOf(value))); String queueName;
modbusDeviceProductCode = (String) cacheDeviceInfo.get("modbus_device_product_code"); PersistenceHandler persistenceHandler;
operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo); List<Map<String, Object>> existsTableList;
} try {
} if (Objects.nonNull(idGatewayMappingMap)) {
if (!this.redisTemplate.hasKey(PersistenceHandler.DEVICE_INFO_CACHE + deviceId)) { updateDeviceOnlineStatus(operations, deviceId, String.valueOf(idGatewayMappingMap.containsKey(deviceId)));
continue; }
} String modbusDeviceProductCode = (String) operations.get(
tableName = DEFAULT_DATA_TABLE + deviceId; PersistenceHandler.DEVICE_INFO_CACHE + deviceId, "modbus_device_product_code");
queueName = Queues.getDeviceEventsQueue(deviceId); if (StringUtils.isEmpty(modbusDeviceProductCode)) {
persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode); List<Map<String, Object>> deviceInfoList = this.jdbcTemplate.queryForList(DEVICE_INFO_SQL + deviceId);
existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName)); if (ObjectUtils.isNotEmpty(deviceInfoList)) {
if (ObjectUtils.isEmpty(existsTableList) Map<String, Object> deviceInfo = deviceInfoList.get(0);
|| ObjectUtils.isEmpty(existsTableList.get(0)) Map<String, Object> cacheDeviceInfo = new HashMap<>(deviceInfo.size());
|| StringUtils.isBlank(MapUtil.getStr(existsTableList.get(0), "TABLE_NAME"))) { deviceInfo.forEach((key, value) -> cacheDeviceInfo.put(key, Objects.isNull(value) ? null : String.valueOf(value)));
persistenceHandler.createTable(tableName, deviceId); modbusDeviceProductCode = (String) cacheDeviceInfo.get("modbus_device_product_code");
} operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo);
Map<String, Map<String, Object>> changeData = persistenceHandler.insert(tableName, cacheKey);
if (ObjectUtils.isNotEmpty(changeData)) {
String rowDataJson = JSONUtil.toJsonStr(changeData);
try {
log.debug("推设备最新落库数据到MQ({}): {}", queueName, rowDataJson);
this.rabbitTemplate.convertAndSend(queueName, rowDataJson);
} catch (Exception e) {
log.error("推设备最新落库数据到MQ失败({}): {}", queueName, rowDataJson, e);
}
}
} catch (Exception e) {
log.error("设备{}最新数据落库失败", deviceId, JSONUtil.toJsonStr(deviceMap), e);
} }
} }
if (!this.redisTemplate.hasKey(PersistenceHandler.DEVICE_INFO_CACHE + deviceId)) {
return;
}
tableName = DEFAULT_DATA_TABLE + deviceId;
queueName = Queues.getDeviceEventsQueue(deviceId);
persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode);
existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName));
if (ObjectUtils.isEmpty(existsTableList)
|| ObjectUtils.isEmpty(existsTableList.get(0))
|| StringUtils.isBlank(MapUtil.getStr(existsTableList.get(0), "TABLE_NAME"))) {
persistenceHandler.createTable(tableName, deviceId);
}
Map<String, Map<String, Object>> changeData = persistenceHandler.insert(tableName, cacheKey);
if (ObjectUtils.isNotEmpty(changeData)) {
String rowDataJson = JSONUtil.toJsonStr(changeData);
try {
log.debug("推设备最新落库数据到MQ({}): {}", queueName, rowDataJson);
this.rabbitTemplate.convertAndSend(queueName, rowDataJson);
} catch (Exception e) {
log.error("推设备最新落库数据到MQ失败({}): {}", queueName, rowDataJson, e);
}
}
} catch (Exception e) {
log.error("设备{}最新数据落库失败", deviceId, JSONUtil.toJsonStr(deviceMap), e);
} }
} }
@ -156,4 +188,18 @@ public class Redis2DBPersistenceService {
operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, "online", onlineStatus); operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, "online", onlineStatus);
} }
@RequiredArgsConstructor
private class Worker implements Runnable {
private final Long deviceId;
private final String cacheKey;
private final HashOperations operations;
private final Map<String, Object> deviceMap;
private final Map<Long, String> idGatewayMappingMap;
@Override
public void run() {
Redis2DBPersistenceService.this.persistenceDeviceData(deviceId, cacheKey, operations, deviceMap, idGatewayMappingMap);
}
}
} }