package com.isu.gaswellwatch.modbus.data; import cn.hutool.core.map.MapUtil; import cn.hutool.json.JSONUtil; import com.isu.gaswellwatch.entity.Response; import com.isu.gaswellwatch.modbus.data.listener.Queues; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.core.ParameterizedTypeReference; import org.springframework.data.redis.core.Cursor; import org.springframework.data.redis.core.HashOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ScanOptions; import org.springframework.http.RequestEntity; import org.springframework.http.ResponseEntity; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; /** * @author 王仕龙 * 2024/11/23 11:55 */ @Slf4j @Component @SuppressWarnings("all") public class Redis2DBPersistenceService { public static final String DEFAULT_DATA_TABLE = "t_device_data_"; public static final String DEVICE_INFO_SQL = "SELECT d.*, dp.code as modbus_device_product_code from device d " + "join dictionary dp on dp.id = d.product where d.id = "; private static final String EXISTS_TABLE_SQL = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA" + " IN ('gaswellwatch', 'gas_well_watch') AND TABLE_NAME='$TableName$'"; @Resource private JdbcTemplate jdbcTemplate; @Resource(name = "stringRedisTemplate") private RedisTemplate redisTemplate; @Resource private RabbitTemplate rabbitTemplate; @Resource private Map persistenceHandlerMap; private final RestTemplate restTemplate = new RestTemplate(); @Scheduled(cron = "20/30 * * * * ? ") public void write() { Map idGatewayMappingMap = getOnlineGateway(); HashOperations operations = this.redisTemplate.opsForHash(); try (Cursor cursor = this.redisTemplate.scan(ScanOptions.scanOptions() .match(PersistenceHandler.DEVICE_DATA_CACHE + "*").build())) { Long deviceId; Map deviceMap; PersistenceHandler persistenceHandler; String cacheKey, tableName, queueName; List> existsTableList; while (cursor.hasNext()) { deviceMap = operations.entries(cacheKey = cursor.next()); if (ObjectUtils.isEmpty(deviceMap)) { continue; } deviceId = MapUtil.getLong(deviceMap, "deviceId"); if (Objects.isNull(deviceId)) { continue; } if (Objects.nonNull(idGatewayMappingMap)) { operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, "online", String.valueOf(idGatewayMappingMap.containsKey(deviceId))); operations.put(PersistenceHandler.ONLINE_DEVICE_CACHE, String.valueOf(deviceId), String.valueOf(idGatewayMappingMap.containsKey(deviceId))); } String modbusDeviceProductCode = (String) operations.get( PersistenceHandler.DEVICE_INFO_CACHE + deviceId, "modbus_device_product_code"); if (StringUtils.isEmpty(modbusDeviceProductCode)) { Map deviceInfo = this.jdbcTemplate.queryForMap(DEVICE_INFO_SQL + deviceId); if (ObjectUtils.isNotEmpty(deviceInfo)) { Map cacheDeviceInfo = new HashMap<>(deviceInfo.size()); deviceInfo.forEach((key, value) -> cacheDeviceInfo.put(key, Objects.isNull(value) ? null : String.valueOf(value))); modbusDeviceProductCode = (String) cacheDeviceInfo.get("modbus_device_product_code"); operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo); } } tableName = DEFAULT_DATA_TABLE + deviceId; queueName = Queues.DEVICE_EVENTS + (deviceId % 10); persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode); existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName)); if (ObjectUtils.isEmpty(existsTableList) || ObjectUtils.isEmpty(existsTableList.get(0)) || StringUtils.isBlank(MapUtil.getStr(existsTableList.get(0), "TABLE_NAME"))) { persistenceHandler.createTable(tableName, deviceId); } Map> changeData = persistenceHandler.insert(tableName, cacheKey); if (ObjectUtils.isNotEmpty(changeData)) { String rowDataJson = JSONUtil.toJsonStr(changeData); try { log.debug("推设备最新落库数据到MQ({}): {}", queueName, rowDataJson); this.rabbitTemplate.convertAndSend(queueName, rowDataJson); } catch (Exception e) { log.error("推设备最新落库数据到MQ失败({}): {}", queueName, rowDataJson, e); } } } } } private Map getOnlineGateway() { try { RequestEntity request = RequestEntity.get("http://localhost:9999/modbus-tcp/online").build(); ResponseEntity>> response = this.restTemplate.exchange(request, new ParameterizedTypeReference>>() { }); if (Objects.isNull(response) || Objects.isNull(response.getBody()) || ObjectUtils.isEmpty(response.getBody().getData())) { return null; } List> idGatewayMappingList = this.jdbcTemplate.queryForList("select id, gateway_sn" + " from device where gateway_sn in (" + response.getBody().getData().stream().map(value -> "'" + value + "'").collect(Collectors.joining(",")) + ")"); if (ObjectUtils.isEmpty(idGatewayMappingList)) { return null; } Map idGatewayMappingMap = new HashMap(); for (Map map : idGatewayMappingList) { idGatewayMappingMap.put(MapUtil.getLong(map, "id"), MapUtil.getStr(map, "gateway_sn")); } return idGatewayMappingMap; } catch (Exception e) { log.warn("Get online devices list fail", e); return null; } } }