diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java index 1df1416..7409045 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java @@ -72,45 +72,49 @@ public class Redis2DBPersistenceService { if (Objects.isNull(deviceId)) { continue; } - if (Objects.nonNull(idGatewayMappingMap)) { - operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, - "online", String.valueOf(idGatewayMappingMap.containsKey(deviceId))); - operations.put(PersistenceHandler.ONLINE_DEVICE_CACHE, String.valueOf(deviceId), - String.valueOf(idGatewayMappingMap.containsKey(deviceId))); - } - String modbusDeviceProductCode = (String) operations.get( - PersistenceHandler.DEVICE_INFO_CACHE + deviceId, "modbus_device_product_code"); - if (StringUtils.isEmpty(modbusDeviceProductCode)) { - List> deviceInfoList = this.jdbcTemplate.queryForList(DEVICE_INFO_SQL + deviceId); - if (ObjectUtils.isNotEmpty(deviceInfoList)) { - Map deviceInfo = deviceInfoList.get(0); - Map cacheDeviceInfo = new HashMap<>(deviceInfo.size()); - deviceInfo.forEach((key, value) -> cacheDeviceInfo.put(key, Objects.isNull(value) ? null : String.valueOf(value))); - modbusDeviceProductCode = (String) cacheDeviceInfo.get("modbus_device_product_code"); - operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo); + try { + if (Objects.nonNull(idGatewayMappingMap)) { + operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, + "online", String.valueOf(idGatewayMappingMap.containsKey(deviceId))); + operations.put(PersistenceHandler.ONLINE_DEVICE_CACHE, String.valueOf(deviceId), + String.valueOf(idGatewayMappingMap.containsKey(deviceId))); } - } - if (!this.redisTemplate.hasKey(PersistenceHandler.DEVICE_INFO_CACHE + deviceId)) { - continue; - } - tableName = DEFAULT_DATA_TABLE + deviceId; - queueName = Queues.getDeviceEventsQueue(deviceId); - persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode); - existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName)); - if (ObjectUtils.isEmpty(existsTableList) - || ObjectUtils.isEmpty(existsTableList.get(0)) - || StringUtils.isBlank(MapUtil.getStr(existsTableList.get(0), "TABLE_NAME"))) { - persistenceHandler.createTable(tableName, deviceId); - } - Map> changeData = persistenceHandler.insert(tableName, cacheKey); - if (ObjectUtils.isNotEmpty(changeData)) { - String rowDataJson = JSONUtil.toJsonStr(changeData); - try { - log.debug("推设备最新落库数据到MQ({}): {}", queueName, rowDataJson); - this.rabbitTemplate.convertAndSend(queueName, rowDataJson); - } catch (Exception e) { - log.error("推设备最新落库数据到MQ失败({}): {}", queueName, rowDataJson, e); + String modbusDeviceProductCode = (String) operations.get( + PersistenceHandler.DEVICE_INFO_CACHE + deviceId, "modbus_device_product_code"); + if (StringUtils.isEmpty(modbusDeviceProductCode)) { + List> deviceInfoList = this.jdbcTemplate.queryForList(DEVICE_INFO_SQL + deviceId); + if (ObjectUtils.isNotEmpty(deviceInfoList)) { + Map deviceInfo = deviceInfoList.get(0); + Map cacheDeviceInfo = new HashMap<>(deviceInfo.size()); + deviceInfo.forEach((key, value) -> cacheDeviceInfo.put(key, Objects.isNull(value) ? null : String.valueOf(value))); + modbusDeviceProductCode = (String) cacheDeviceInfo.get("modbus_device_product_code"); + operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo); + } } + if (!this.redisTemplate.hasKey(PersistenceHandler.DEVICE_INFO_CACHE + deviceId)) { + continue; + } + tableName = DEFAULT_DATA_TABLE + deviceId; + queueName = Queues.getDeviceEventsQueue(deviceId); + persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode); + existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName)); + if (ObjectUtils.isEmpty(existsTableList) + || ObjectUtils.isEmpty(existsTableList.get(0)) + || StringUtils.isBlank(MapUtil.getStr(existsTableList.get(0), "TABLE_NAME"))) { + persistenceHandler.createTable(tableName, deviceId); + } + Map> changeData = persistenceHandler.insert(tableName, cacheKey); + if (ObjectUtils.isNotEmpty(changeData)) { + String rowDataJson = JSONUtil.toJsonStr(changeData); + try { + log.debug("推设备最新落库数据到MQ({}): {}", queueName, rowDataJson); + this.rabbitTemplate.convertAndSend(queueName, rowDataJson); + } catch (Exception e) { + log.error("推设备最新落库数据到MQ失败({}): {}", queueName, rowDataJson, e); + } + } + } catch (Exception e) { + log.error("设备{}最新数据落库失败", deviceId, JSONUtil.toJsonStr(deviceMap), e); } } }