diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ComposeModbusMessageListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ComposeModbusMessageListener.java index c98fa9f..e204480 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ComposeModbusMessageListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ComposeModbusMessageListener.java @@ -29,4 +29,5 @@ public class ComposeModbusMessageListener implements BatchMessageListener { public void onMessageBatch(List messages) { this.messageListenerList.forEach(messageListener -> messageListener.onMessageBatch(messages)); } + } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java index 2935dfd..a69758f 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java @@ -28,40 +28,30 @@ public class DynamicRabbitListener implements ApplicationRunner { private final AmqpAdmin amqpAdmin; private final JdbcTemplate jdbcTemplate; private final ComposeModbusMessageListener composeListener; - private final SimpleMessageListenerContainer listenerContainer; + private final SimpleMessageListenerContainer modbusMessageListenerContainer; public DynamicRabbitListener(ConnectionFactory connectionFactory, JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; this.amqpAdmin = new RabbitAdmin(connectionFactory); - this.listenerContainer = new SimpleMessageListenerContainer(); - this.listenerContainer.setConnectionFactory(connectionFactory); - this.listenerContainer.setMessageListener(this.composeListener = new ComposeModbusMessageListener()); + this.modbusMessageListenerContainer = new SimpleMessageListenerContainer(); + this.modbusMessageListenerContainer.setConnectionFactory(connectionFactory); + this.modbusMessageListenerContainer.setMessageListener(this.composeListener = new ComposeModbusMessageListener()); // 启动监听容器 - this.listenerContainer.start(); + this.modbusMessageListenerContainer.start(); } @Override public void run(ApplicationArguments args) { this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener()); this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener()); - - this.jdbcTemplate.queryForList(GATEWAY_SQL).forEach(item -> - this.addListenerQueue(this.registerDevice(item), null, null)); + this.jdbcTemplate.queryForList(GATEWAY_SQL).forEach(this::registerDeviceAndListener); } - public void registerDevice(Long deviceId) { - this.jdbcTemplate.queryForList(DEVICE_SQL + deviceId).forEach(this::registerDevice); + public void registerDeviceAndListener(Long deviceId) { + this.jdbcTemplate.queryForList(DEVICE_SQL + deviceId).forEach(this::registerDeviceAndListener); } - public void addListenerQueue(String gatewaySn) { - this.addListenerQueue(String.format(Queues.MODBUS_COLLECT_DATA, gatewaySn), null, null); - } - - public void addListenerQueue(String queueName, String exchangeName, String routingKey) { - Queue queue = QueueBuilder.durable(queueName).build(); - // 声明队列 - this.amqpAdmin.declareQueue(queue); - + public void addListenerCollectQueue(Queue queue, String exchangeName, String routingKey) { if (StringUtils.isNotBlank(exchangeName) && StringUtils.isNotBlank(routingKey)) { // 声明直接类型的交换器 DirectExchange exchange = new DirectExchange(exchangeName); @@ -69,17 +59,18 @@ public class DynamicRabbitListener implements ApplicationRunner { // 绑定队列和交换器 this.amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey)); } - // 设置监听的队列 - this.listenerContainer.addQueues(queue); + this.modbusMessageListenerContainer.addQueues(queue); } - private String registerDevice(Map deviceMap) { - String eventQueue = String.format(Queues.MODBUS_COLLECT_DATA, MapUtil.getStr(deviceMap, "id")); - String collectDataQueue = String.format(Queues.MODBUS_COLLECT_DATA, MapUtil.getStr(deviceMap, "identifier")); - this.amqpAdmin.declareQueue(QueueBuilder.durable(eventQueue).build()); - this.amqpAdmin.declareQueue(QueueBuilder.durable(collectDataQueue).build()); - return collectDataQueue; + private void registerDeviceAndListener(Map deviceMap) { + Queue eventQueue = QueueBuilder.durable(String.format(Queues.DEVICE_EVENTS, + MapUtil.getStr(deviceMap, "id"))).build(); + Queue collectQueue = QueueBuilder.durable(String.format(Queues.MODBUS_COLLECT_DATA, + MapUtil.getStr(deviceMap, "identifier"))).build(); + this.amqpAdmin.declareQueue(eventQueue); + this.amqpAdmin.declareQueue(collectQueue); + this.addListenerCollectQueue(collectQueue, null, null); } } \ No newline at end of file