如果某个设备数据落库失败,不影响其他设备数据落库

This commit is contained in:
wangshilong 2024-12-12 15:38:29 +08:00
parent 77781a8ef9
commit dcdf3e7a02
1 changed files with 41 additions and 37 deletions

View File

@ -72,45 +72,49 @@ public class Redis2DBPersistenceService {
if (Objects.isNull(deviceId)) { if (Objects.isNull(deviceId)) {
continue; continue;
} }
if (Objects.nonNull(idGatewayMappingMap)) { try {
operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, if (Objects.nonNull(idGatewayMappingMap)) {
"online", String.valueOf(idGatewayMappingMap.containsKey(deviceId))); operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId,
operations.put(PersistenceHandler.ONLINE_DEVICE_CACHE, String.valueOf(deviceId), "online", String.valueOf(idGatewayMappingMap.containsKey(deviceId)));
String.valueOf(idGatewayMappingMap.containsKey(deviceId))); operations.put(PersistenceHandler.ONLINE_DEVICE_CACHE, String.valueOf(deviceId),
} String.valueOf(idGatewayMappingMap.containsKey(deviceId)));
String modbusDeviceProductCode = (String) operations.get(
PersistenceHandler.DEVICE_INFO_CACHE + deviceId, "modbus_device_product_code");
if (StringUtils.isEmpty(modbusDeviceProductCode)) {
List<Map<String, Object>> deviceInfoList = this.jdbcTemplate.queryForList(DEVICE_INFO_SQL + deviceId);
if (ObjectUtils.isNotEmpty(deviceInfoList)) {
Map<String, Object> deviceInfo = deviceInfoList.get(0);
Map<String, Object> cacheDeviceInfo = new HashMap<>(deviceInfo.size());
deviceInfo.forEach((key, value) -> cacheDeviceInfo.put(key, Objects.isNull(value) ? null : String.valueOf(value)));
modbusDeviceProductCode = (String) cacheDeviceInfo.get("modbus_device_product_code");
operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo);
} }
} String modbusDeviceProductCode = (String) operations.get(
if (!this.redisTemplate.hasKey(PersistenceHandler.DEVICE_INFO_CACHE + deviceId)) { PersistenceHandler.DEVICE_INFO_CACHE + deviceId, "modbus_device_product_code");
continue; if (StringUtils.isEmpty(modbusDeviceProductCode)) {
} List<Map<String, Object>> deviceInfoList = this.jdbcTemplate.queryForList(DEVICE_INFO_SQL + deviceId);
tableName = DEFAULT_DATA_TABLE + deviceId; if (ObjectUtils.isNotEmpty(deviceInfoList)) {
queueName = Queues.getDeviceEventsQueue(deviceId); Map<String, Object> deviceInfo = deviceInfoList.get(0);
persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode); Map<String, Object> cacheDeviceInfo = new HashMap<>(deviceInfo.size());
existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName)); deviceInfo.forEach((key, value) -> cacheDeviceInfo.put(key, Objects.isNull(value) ? null : String.valueOf(value)));
if (ObjectUtils.isEmpty(existsTableList) modbusDeviceProductCode = (String) cacheDeviceInfo.get("modbus_device_product_code");
|| ObjectUtils.isEmpty(existsTableList.get(0)) operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo);
|| StringUtils.isBlank(MapUtil.getStr(existsTableList.get(0), "TABLE_NAME"))) { }
persistenceHandler.createTable(tableName, deviceId);
}
Map<String, Map<String, Object>> changeData = persistenceHandler.insert(tableName, cacheKey);
if (ObjectUtils.isNotEmpty(changeData)) {
String rowDataJson = JSONUtil.toJsonStr(changeData);
try {
log.debug("推设备最新落库数据到MQ({}): {}", queueName, rowDataJson);
this.rabbitTemplate.convertAndSend(queueName, rowDataJson);
} catch (Exception e) {
log.error("推设备最新落库数据到MQ失败({}): {}", queueName, rowDataJson, e);
} }
if (!this.redisTemplate.hasKey(PersistenceHandler.DEVICE_INFO_CACHE + deviceId)) {
continue;
}
tableName = DEFAULT_DATA_TABLE + deviceId;
queueName = Queues.getDeviceEventsQueue(deviceId);
persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode);
existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName));
if (ObjectUtils.isEmpty(existsTableList)
|| ObjectUtils.isEmpty(existsTableList.get(0))
|| StringUtils.isBlank(MapUtil.getStr(existsTableList.get(0), "TABLE_NAME"))) {
persistenceHandler.createTable(tableName, deviceId);
}
Map<String, Map<String, Object>> changeData = persistenceHandler.insert(tableName, cacheKey);
if (ObjectUtils.isNotEmpty(changeData)) {
String rowDataJson = JSONUtil.toJsonStr(changeData);
try {
log.debug("推设备最新落库数据到MQ({}): {}", queueName, rowDataJson);
this.rabbitTemplate.convertAndSend(queueName, rowDataJson);
} catch (Exception e) {
log.error("推设备最新落库数据到MQ失败({}): {}", queueName, rowDataJson, e);
}
}
} catch (Exception e) {
log.error("设备{}最新数据落库失败", deviceId, JSONUtil.toJsonStr(deviceMap), e);
} }
} }
} }