设备数据持久化事件推送队列动态生成

This commit is contained in:
wangshilong 2024-11-27 15:06:43 +08:00
parent a7f4dcd000
commit 5b7c3a10f9
2 changed files with 19 additions and 27 deletions

View File

@ -29,4 +29,5 @@ public class ComposeModbusMessageListener implements BatchMessageListener {
public void onMessageBatch(List<Message> messages) {
this.messageListenerList.forEach(messageListener -> messageListener.onMessageBatch(messages));
}
}

View File

@ -28,40 +28,30 @@ public class DynamicRabbitListener implements ApplicationRunner {
private final AmqpAdmin amqpAdmin;
private final JdbcTemplate jdbcTemplate;
private final ComposeModbusMessageListener composeListener;
private final SimpleMessageListenerContainer listenerContainer;
private final SimpleMessageListenerContainer modbusMessageListenerContainer;
public DynamicRabbitListener(ConnectionFactory connectionFactory, JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
this.amqpAdmin = new RabbitAdmin(connectionFactory);
this.listenerContainer = new SimpleMessageListenerContainer();
this.listenerContainer.setConnectionFactory(connectionFactory);
this.listenerContainer.setMessageListener(this.composeListener = new ComposeModbusMessageListener());
this.modbusMessageListenerContainer = new SimpleMessageListenerContainer();
this.modbusMessageListenerContainer.setConnectionFactory(connectionFactory);
this.modbusMessageListenerContainer.setMessageListener(this.composeListener = new ComposeModbusMessageListener());
// 启动监听容器
this.listenerContainer.start();
this.modbusMessageListenerContainer.start();
}
@Override
public void run(ApplicationArguments args) {
this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener());
this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener());
this.jdbcTemplate.queryForList(GATEWAY_SQL).forEach(item ->
this.addListenerQueue(this.registerDevice(item), null, null));
this.jdbcTemplate.queryForList(GATEWAY_SQL).forEach(this::registerDeviceAndListener);
}
public void registerDevice(Long deviceId) {
this.jdbcTemplate.queryForList(DEVICE_SQL + deviceId).forEach(this::registerDevice);
public void registerDeviceAndListener(Long deviceId) {
this.jdbcTemplate.queryForList(DEVICE_SQL + deviceId).forEach(this::registerDeviceAndListener);
}
public void addListenerQueue(String gatewaySn) {
this.addListenerQueue(String.format(Queues.MODBUS_COLLECT_DATA, gatewaySn), null, null);
}
public void addListenerQueue(String queueName, String exchangeName, String routingKey) {
Queue queue = QueueBuilder.durable(queueName).build();
// 声明队列
this.amqpAdmin.declareQueue(queue);
public void addListenerCollectQueue(Queue queue, String exchangeName, String routingKey) {
if (StringUtils.isNotBlank(exchangeName) && StringUtils.isNotBlank(routingKey)) {
// 声明直接类型的交换器
DirectExchange exchange = new DirectExchange(exchangeName);
@ -69,17 +59,18 @@ public class DynamicRabbitListener implements ApplicationRunner {
// 绑定队列和交换器
this.amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
}
// 设置监听的队列
this.listenerContainer.addQueues(queue);
this.modbusMessageListenerContainer.addQueues(queue);
}
private String registerDevice(Map<String, Object> deviceMap) {
String eventQueue = String.format(Queues.MODBUS_COLLECT_DATA, MapUtil.getStr(deviceMap, "id"));
String collectDataQueue = String.format(Queues.MODBUS_COLLECT_DATA, MapUtil.getStr(deviceMap, "identifier"));
this.amqpAdmin.declareQueue(QueueBuilder.durable(eventQueue).build());
this.amqpAdmin.declareQueue(QueueBuilder.durable(collectDataQueue).build());
return collectDataQueue;
private void registerDeviceAndListener(Map<String, Object> deviceMap) {
Queue eventQueue = QueueBuilder.durable(String.format(Queues.DEVICE_EVENTS,
MapUtil.getStr(deviceMap, "id"))).build();
Queue collectQueue = QueueBuilder.durable(String.format(Queues.MODBUS_COLLECT_DATA,
MapUtil.getStr(deviceMap, "identifier"))).build();
this.amqpAdmin.declareQueue(eventQueue);
this.amqpAdmin.declareQueue(collectQueue);
this.addListenerCollectQueue(collectQueue, null, null);
}
}