package com.isu.gaswellwatch.modbus.data;

import cn.hutool.core.map.MapUtil;
import cn.hutool.json.JSONUtil;
import com.isu.gaswellwatch.entity.Response;
import com.isu.gaswellwatch.modbus.data.listener.Queues;
import com.isu.gaswellwatch.utils.HexUtil;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;

/**
 * Scheduled job that copies the latest per-device data cached in Redis into per-device
 * database tables and publishes the persisted change set to RabbitMQ.
 *
 * <p>On every run it also refreshes the "online" flag of devices, based on the list of
 * online gateways returned by the local {@code /modbus-tcp/online} HTTP endpoint.
 *
 * @author Wang Shilong (王仕龙)
 * 2024/11/23 11:55
 */
@Slf4j
@Component
@SuppressWarnings("all")
public class Redis2DBPersistenceService {

    /** Prefix of the per-device data table; the device id is appended to form the table name. */
    public static final String DEFAULT_DATA_TABLE = "t_device_data_";

    /** Device lookup query; the device id (or a bind placeholder) must be appended by the caller. */
    public static final String DEVICE_INFO_SQL = "SELECT d.*, dp.code as modbus_device_product_code from device d "
            + "join dictionary dp on dp.id = d.product where d.id = ";

    /** Table-existence probe; {@code $TableName$} is substituted with the concrete table name. */
    private static final String EXISTS_TABLE_SQL = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA"
            + " IN ('gaswellwatch', 'gas_well_watch') AND TABLE_NAME='$TableName$'";

    @Resource
    private JdbcTemplate jdbcTemplate;

    @Resource(name = "stringRedisTemplate")
    private RedisTemplate<String, String> redisTemplate;

    @Resource
    private RabbitTemplate rabbitTemplate;

    /** Persistence handlers, looked up by the device's {@code modbus_device_product_code}. */
    @Resource
    private Map<String, PersistenceHandler> persistenceHandlerMap;

    private final RestTemplate restTemplate = new RestTemplate();

    /**
     * Scans Redis for keys starting with {@link PersistenceHandler#DEVICE_DATA_CACHE} and persists
     * each device's cached data. Triggered at seconds 20 and 50 of every minute.
     *
     * <p>A failure while handling one device is logged and does not stop the scan.
     */
    @Scheduled(cron = "20/30 * * * * ?")
    public void write() {
        Map<Long, String> idGatewayMappingMap = writeOnlineGateway();
        HashOperations<String, String, Object> operations = this.redisTemplate.opsForHash();
        ScanOptions scanOptions = ScanOptions.scanOptions()
                .match(PersistenceHandler.DEVICE_DATA_CACHE + "*")
                .build();
        try (Cursor<String> cursor = this.redisTemplate.scan(scanOptions)) {
            while (cursor.hasNext()) {
                String cacheKey = cursor.next();
                Map<String, Object> deviceMap = operations.entries(cacheKey);
                if (ObjectUtils.isEmpty(deviceMap)) {
                    continue;
                }
                Long deviceId = MapUtil.getLong(deviceMap, "deviceId");
                if (Objects.isNull(deviceId)) {
                    continue;
                }
                try {
                    persistDevice(operations, idGatewayMappingMap, cacheKey, deviceId);
                } catch (Exception e) {
                    // Two placeholders so the cached data is actually part of the message;
                    // the trailing throwable is still logged with its stack trace.
                    log.error("设备{}最新数据落库失败: {}", deviceId, JSONUtil.toJsonStr(deviceMap), e);
                }
            }
        }
    }

    /**
     * Persists the cached data of a single device and publishes the resulting change set.
     *
     * @param operations          hash operations of the Redis template
     * @param idGatewayMappingMap device id to gateway serial number for online devices, or
     *                            {@code null} when the online list could not be determined
     * @param cacheKey            Redis key holding the device's cached data
     * @param deviceId            id of the device read from the cached data
     */
    private void persistDevice(HashOperations<String, String, Object> operations,
                               Map<Long, String> idGatewayMappingMap,
                               String cacheKey,
                               Long deviceId) {
        if (Objects.nonNull(idGatewayMappingMap)) {
            updateDeviceOnlineStatus(operations, deviceId,
                    String.valueOf(idGatewayMappingMap.containsKey(deviceId)));
        }

        String modbusDeviceProductCode = resolveProductCode(operations, deviceId);

        // hasKey may return null (e.g. inside a pipeline/transaction); treat that as "absent"
        // instead of failing on unboxing.
        if (!Boolean.TRUE.equals(this.redisTemplate.hasKey(PersistenceHandler.DEVICE_INFO_CACHE + deviceId))) {
            return;
        }

        String tableName = DEFAULT_DATA_TABLE + deviceId;
        String queueName = Queues.getDeviceEventsQueue(deviceId);
        PersistenceHandler persistenceHandler = modbusDeviceProductCode == null
                ? null
                : this.persistenceHandlerMap.get(modbusDeviceProductCode);
        if (persistenceHandler == null) {
            // Without this guard an unknown product code surfaced as a NullPointerException.
            log.warn("No persistence handler registered for product code '{}' (device {}), skipping",
                    modbusDeviceProductCode, deviceId);
            return;
        }

        if (!tableExists(tableName)) {
            persistenceHandler.createTable(tableName, deviceId);
        }

        Map<?, ?> changeData = persistenceHandler.insert(tableName, cacheKey);
        if (ObjectUtils.isNotEmpty(changeData)) {
            String rowDataJson = JSONUtil.toJsonStr(changeData);
            try {
                log.debug("推设备最新落库数据到MQ({}): {}", queueName, rowDataJson);
                this.rabbitTemplate.convertAndSend(queueName, rowDataJson);
            } catch (Exception e) {
                // Publishing is best effort: the row is already stored, so only log.
                log.error("推设备最新落库数据到MQ失败({}): {}", queueName, rowDataJson, e);
            }
        }
    }

    /**
     * Returns the device's product code from the Redis device-info hash, loading the device row
     * from the database and caching it in that hash when the code is not cached yet.
     *
     * @param operations hash operations of the Redis template
     * @param deviceId   id of the device
     * @return the product code, or the empty/{@code null} cached value when the device row
     *         is not found in the database
     */
    private String resolveProductCode(HashOperations<String, String, Object> operations, Long deviceId) {
        String deviceInfoKey = PersistenceHandler.DEVICE_INFO_CACHE + deviceId;
        String cachedCode = (String) operations.get(deviceInfoKey, "modbus_device_product_code");
        if (StringUtils.isNotEmpty(cachedCode)) {
            return cachedCode;
        }

        // Bind the id instead of concatenating it into the statement text.
        List<Map<String, Object>> deviceInfoList =
                this.jdbcTemplate.queryForList(DEVICE_INFO_SQL + "?", deviceId);
        if (ObjectUtils.isEmpty(deviceInfoList)) {
            return cachedCode;
        }

        Map<String, Object> deviceInfo = deviceInfoList.get(0);
        Map<String, Object> cacheDeviceInfo = new HashMap<>(deviceInfo.size());
        deviceInfo.forEach((key, value) ->
                cacheDeviceInfo.put(key, Objects.isNull(value) ? null : String.valueOf(value)));
        operations.putAll(deviceInfoKey, cacheDeviceInfo);
        return (String) cacheDeviceInfo.get("modbus_device_product_code");
    }

    /**
     * Checks INFORMATION_SCHEMA for the given table.
     *
     * @param tableName name of the per-device data table
     * @return {@code true} when the probe returns a row with a non-blank {@code TABLE_NAME}
     */
    private boolean tableExists(String tableName) {
        List<Map<String, Object>> existsTableList = this.jdbcTemplate.queryForList(
                StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName));
        return ObjectUtils.isNotEmpty(existsTableList)
                && ObjectUtils.isNotEmpty(existsTableList.get(0))
                && StringUtils.isNotBlank(MapUtil.getStr(existsTableList.get(0), "TABLE_NAME"));
    }

    /**
     * Fetches the online gateways from the local HTTP endpoint, resolves the devices attached to
     * them and marks those devices as online in Redis.
     *
     * @return device id to gateway serial number for devices on an online gateway; an empty map
     *         when the endpoint reports no gateways; {@code null} when the request fails, the
     *         response has no body, or no device matches
     */
    private Map<Long, String> writeOnlineGateway() {
        try {
            RequestEntity<Void> request = RequestEntity.get("http://localhost:9999/modbus-tcp/online").build();
            ResponseEntity<Response<List<String>>> response = this.restTemplate.exchange(request,
                    new ParameterizedTypeReference<Response<List<String>>>() {
                    });
            if (Objects.isNull(response) || Objects.isNull(response.getBody())) {
                return null;
            }
            List<String> onlineGateways = response.getBody().getData();
            if (ObjectUtils.isEmpty(onlineGateways)) {
                return Collections.emptyMap();
            }

            // Each reported value is hex text that is decoded and compared with device.gateway_sn.
            // Decode with an explicit charset rather than the platform default.
            List<String> gatewaySns = onlineGateways.stream()
                    .map(value -> new String(HexUtil.hexStringToBytes(value), StandardCharsets.UTF_8))
                    .collect(Collectors.toList());

            // The serial numbers come from outside this service: bind them as parameters
            // instead of splicing quoted text into the statement.
            String placeholders = String.join(",", Collections.nCopies(gatewaySns.size(), "?"));
            List<Map<String, Object>> idGatewayMappingList = this.jdbcTemplate.queryForList(
                    "select id, gateway_sn from device where gateway_sn in (" + placeholders + ")",
                    gatewaySns.toArray());
            if (ObjectUtils.isEmpty(idGatewayMappingList)) {
                // NOTE(review): returning null here makes write() skip the online-flag update,
                // whereas an empty gateway list above marks scanned devices offline - confirm
                // whether this difference is intended.
                return null;
            }

            HashOperations<String, String, Object> operations = this.redisTemplate.opsForHash();
            Map<Long, String> idGatewayMappingMap = new HashMap<>();
            for (Map<String, Object> row : idGatewayMappingList) {
                Long deviceId = MapUtil.getLong(row, "id");
                idGatewayMappingMap.put(deviceId, MapUtil.getStr(row, "gateway_sn"));
                // The device was just added to the map, so it is online by definition.
                updateDeviceOnlineStatus(operations, deviceId, String.valueOf(true));
            }
            return idGatewayMappingMap;
        } catch (Exception e) {
            log.warn("Get online devices list fail", e);
            return null;
        }
    }

    /**
     * Writes the online flag of a device to both the shared online-device hash and the
     * {@code online} field of the device's own data hash.
     *
     * @param operations   hash operations of the Redis template
     * @param deviceId     id of the device
     * @param onlineStatus flag value to store ({@code "true"} or {@code "false"})
     */
    private void updateDeviceOnlineStatus(HashOperations<String, String, Object> operations,
                                          Long deviceId,
                                          String onlineStatus) {
        operations.put(PersistenceHandler.ONLINE_DEVICE_CACHE, String.valueOf(deviceId), onlineStatus);
        operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, "online", onlineStatus);
    }
}