设备数据持久化事件推送队列动态生成
This commit is contained in:
parent
442ae6e4bf
commit
a7f4dcd000
|
@ -58,9 +58,9 @@ public class Redis2DBPersistenceService {
|
||||||
try (Cursor<String> cursor = this.redisTemplate.scan(ScanOptions.scanOptions()
|
try (Cursor<String> cursor = this.redisTemplate.scan(ScanOptions.scanOptions()
|
||||||
.match(PersistenceHandler.DEVICE_DATA_CACHE + "*").build())) {
|
.match(PersistenceHandler.DEVICE_DATA_CACHE + "*").build())) {
|
||||||
Long deviceId;
|
Long deviceId;
|
||||||
String cacheKey, tableName;
|
|
||||||
Map<String, Object> deviceMap;
|
Map<String, Object> deviceMap;
|
||||||
PersistenceHandler persistenceHandler;
|
PersistenceHandler persistenceHandler;
|
||||||
|
String cacheKey, tableName, queueName;
|
||||||
List<Map<String, Object>> existsTableList;
|
List<Map<String, Object>> existsTableList;
|
||||||
while (cursor.hasNext()) {
|
while (cursor.hasNext()) {
|
||||||
deviceMap = operations.entries(cacheKey = cursor.next());
|
deviceMap = operations.entries(cacheKey = cursor.next());
|
||||||
|
@ -88,8 +88,9 @@ public class Redis2DBPersistenceService {
|
||||||
operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo);
|
operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode);
|
|
||||||
tableName = DEFAULT_DATA_TABLE + deviceId;
|
tableName = DEFAULT_DATA_TABLE + deviceId;
|
||||||
|
queueName = String.format(Queues.DEVICE_EVENTS, deviceId);
|
||||||
|
persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode);
|
||||||
existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName));
|
existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName));
|
||||||
if (ObjectUtils.isEmpty(existsTableList)
|
if (ObjectUtils.isEmpty(existsTableList)
|
||||||
|| ObjectUtils.isEmpty(existsTableList.get(0))
|
|| ObjectUtils.isEmpty(existsTableList.get(0))
|
||||||
|
@ -99,7 +100,6 @@ public class Redis2DBPersistenceService {
|
||||||
Map<String, Map<String, Object>> changeData = persistenceHandler.insert(tableName, cacheKey);
|
Map<String, Map<String, Object>> changeData = persistenceHandler.insert(tableName, cacheKey);
|
||||||
if (ObjectUtils.isNotEmpty(changeData)) {
|
if (ObjectUtils.isNotEmpty(changeData)) {
|
||||||
String rowDataJson = JSONUtil.toJsonStr(changeData);
|
String rowDataJson = JSONUtil.toJsonStr(changeData);
|
||||||
String queueName = String.format(Queues.DEVICE_EVENTS, deviceId);
|
|
||||||
try {
|
try {
|
||||||
log.debug("推设备最新落库数据到MQ({}): {}", queueName, rowDataJson);
|
log.debug("推设备最新落库数据到MQ({}): {}", queueName, rowDataJson);
|
||||||
this.rabbitTemplate.convertAndSend(queueName, rowDataJson);
|
this.rabbitTemplate.convertAndSend(queueName, rowDataJson);
|
||||||
|
|
|
@ -16,10 +16,13 @@ import org.springframework.boot.ApplicationRunner;
|
||||||
import org.springframework.jdbc.core.JdbcTemplate;
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
public class DynamicRabbitListener implements ApplicationRunner {
|
public class DynamicRabbitListener implements ApplicationRunner {
|
||||||
|
|
||||||
|
private static final String DEVICE_SQL = "SELECT d.id, d.gateway_sn AS identifier FROM device d where d.id = ";
|
||||||
private static final String GATEWAY_SQL = "SELECT d.id, d.gateway_sn AS identifier FROM device d ORDER BY d.id ";
|
private static final String GATEWAY_SQL = "SELECT d.id, d.gateway_sn AS identifier FROM device d ORDER BY d.id ";
|
||||||
|
|
||||||
private final AmqpAdmin amqpAdmin;
|
private final AmqpAdmin amqpAdmin;
|
||||||
|
@ -42,9 +45,12 @@ public class DynamicRabbitListener implements ApplicationRunner {
|
||||||
this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener());
|
this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener());
|
||||||
this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener());
|
this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener());
|
||||||
|
|
||||||
this.jdbcTemplate.queryForList(GATEWAY_SQL).forEach(item -> {
|
this.jdbcTemplate.queryForList(GATEWAY_SQL).forEach(item ->
|
||||||
this.addListenerQueue(String.format(Queues.MODBUS_COLLECT_DATA, MapUtil.getStr(item, "identifier")), null, null);
|
this.addListenerQueue(this.registerDevice(item), null, null));
|
||||||
});
|
}
|
||||||
|
|
||||||
|
public void registerDevice(Long deviceId) {
|
||||||
|
this.jdbcTemplate.queryForList(DEVICE_SQL + deviceId).forEach(this::registerDevice);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addListenerQueue(String gatewaySn) {
|
public void addListenerQueue(String gatewaySn) {
|
||||||
|
@ -68,4 +74,12 @@ public class DynamicRabbitListener implements ApplicationRunner {
|
||||||
this.listenerContainer.addQueues(queue);
|
this.listenerContainer.addQueues(queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String registerDevice(Map<String, Object> deviceMap) {
|
||||||
|
String eventQueue = String.format(Queues.MODBUS_COLLECT_DATA, MapUtil.getStr(deviceMap, "id"));
|
||||||
|
String collectDataQueue = String.format(Queues.MODBUS_COLLECT_DATA, MapUtil.getStr(deviceMap, "identifier"));
|
||||||
|
this.amqpAdmin.declareQueue(QueueBuilder.durable(eventQueue).build());
|
||||||
|
this.amqpAdmin.declareQueue(QueueBuilder.durable(collectDataQueue).build());
|
||||||
|
return collectDataQueue;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue