diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java index 752dc7b..10ff195 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java @@ -58,9 +58,9 @@ public class Redis2DBPersistenceService { try (Cursor cursor = this.redisTemplate.scan(ScanOptions.scanOptions() .match(PersistenceHandler.DEVICE_DATA_CACHE + "*").build())) { Long deviceId; - String cacheKey, tableName; Map deviceMap; PersistenceHandler persistenceHandler; + String cacheKey, tableName, queueName; List> existsTableList; while (cursor.hasNext()) { deviceMap = operations.entries(cacheKey = cursor.next()); @@ -88,8 +88,9 @@ public class Redis2DBPersistenceService { operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo); } } - persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode); tableName = DEFAULT_DATA_TABLE + deviceId; + queueName = String.format(Queues.DEVICE_EVENTS, deviceId); + persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode); existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName)); if (ObjectUtils.isEmpty(existsTableList) || ObjectUtils.isEmpty(existsTableList.get(0)) @@ -99,7 +100,6 @@ public class Redis2DBPersistenceService { Map> changeData = persistenceHandler.insert(tableName, cacheKey); if (ObjectUtils.isNotEmpty(changeData)) { String rowDataJson = JSONUtil.toJsonStr(changeData); - String queueName = String.format(Queues.DEVICE_EVENTS, deviceId); try { log.debug("推设备最新落库数据到MQ({}): {}", queueName, rowDataJson); this.rabbitTemplate.convertAndSend(queueName, rowDataJson); diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java index 2b1cbe6..2935dfd 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java @@ -16,10 +16,13 @@ import org.springframework.boot.ApplicationRunner; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; +import java.util.Map; + @Component public class DynamicRabbitListener implements ApplicationRunner { + private static final String DEVICE_SQL = "SELECT d.id, d.gateway_sn AS identifier FROM device d where d.id = "; private static final String GATEWAY_SQL = "SELECT d.id, d.gateway_sn AS identifier FROM device d ORDER BY d.id "; private final AmqpAdmin amqpAdmin; @@ -42,9 +45,12 @@ public class DynamicRabbitListener implements ApplicationRunner { this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener()); this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener()); - this.jdbcTemplate.queryForList(GATEWAY_SQL).forEach(item -> { - this.addListenerQueue(String.format(Queues.MODBUS_COLLECT_DATA, MapUtil.getStr(item, "identifier")), null, null); - }); + this.jdbcTemplate.queryForList(GATEWAY_SQL).forEach(item -> + this.addListenerQueue(this.registerDevice(item), null, null)); + } + + public void registerDevice(Long deviceId) { + this.jdbcTemplate.queryForList(DEVICE_SQL + deviceId).forEach(this::registerDevice); } public void addListenerQueue(String gatewaySn) { @@ -68,4 +74,12 @@ public class DynamicRabbitListener implements ApplicationRunner { this.listenerContainer.addQueues(queue); } + private String registerDevice(Map deviceMap) { + String eventQueue = String.format(Queues.MODBUS_COLLECT_DATA, MapUtil.getStr(deviceMap, "id")); + String collectDataQueue = String.format(Queues.MODBUS_COLLECT_DATA, MapUtil.getStr(deviceMap, "identifier")); + this.amqpAdmin.declareQueue(QueueBuilder.durable(eventQueue).build()); + this.amqpAdmin.declareQueue(QueueBuilder.durable(collectDataQueue).build()); + return collectDataQueue; + } + } \ No newline at end of file