From 4bc2281229e8df707baf2aece703355eeb61f167 Mon Sep 17 00:00:00 2001 From: qinjie <463333974@qq.com> Date: Mon, 9 Dec 2024 21:01:54 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AD=97=E6=AE=B5=E4=BF=AE=E6=94=B9=E3=80=81?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=9B=91=E5=90=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../BusinessMessageHandlerListener.java | 53 +++++++++++++++++++ .../data/listener/DynamicRabbitListener.java | 15 +++++- .../modbus/impl/CommandServiceImpl.java | 7 ++- .../service/impl/DeviceServiceImpl.java | 3 ++ src/main/resources/mapper/DeviceDao.xml | 8 +-- 5 files changed, 78 insertions(+), 8 deletions(-) create mode 100755 src/main/java/com/isu/gaswellwatch/modbus/data/listener/BusinessMessageHandlerListener.java diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/BusinessMessageHandlerListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/BusinessMessageHandlerListener.java new file mode 100755 index 0000000..30c333a --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/BusinessMessageHandlerListener.java @@ -0,0 +1,53 @@ +package com.isu.gaswellwatch.modbus.data.listener; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.BatchMessageListener; +import org.springframework.amqp.core.Message; + +import java.util.List; + +@Slf4j +@RequiredArgsConstructor +public class BusinessMessageHandlerListener implements BatchMessageListener { + @Override + public void onMessage(Message message) { + this.onMessageBatch(List.of(message)); + } + + @Override + public void onMessageBatch(List messages) { + + //读取全部提醒字段的配置 + + for (Message message : messages) { + String messageString = new String(message.getBody()); + log.info("收到业务消息:{}", messageString); + JSONObject messageObject= JSON.parseObject(messageString); + if(messageObject.containsKey("old")){ + JSONObject oldObject = messageObject.getJSONObject("old"); + JSONObject newObject = messageObject.getJSONObject("new"); + String deviceId = oldObject.getString("deviceId"); + //TODO + //根据设备ID获取设备的产品类型 + + //根据对应的产品类型,用对应的气井开关字段获取开关状态(wellStatus,solenoidValveStatus,firstSolenoidStatus) + + //比对新旧数据,看开关状态是否一致 + + //根据ID找到对应的提醒字段配置 + + //遍历该id下的所有字段的配置,检查新消息中字段的值是否符合提醒条件 + + + } + + } + + //若不一致则批量记录设备操作日志 + + //如果提醒数据不为空,批量保存提醒记录 + } +} diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java index 0c3e970..c487a87 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java @@ -23,7 +23,9 @@ public class DynamicRabbitListener implements ApplicationRunner { private final AmqpAdmin amqpAdmin; private final ComposeModbusMessageListener composeListener; + private final ComposeModbusMessageListener businessMessageListener; private final SimpleMessageListenerContainer modbusMessageListenerContainer; + private final SimpleMessageListenerContainer businessMessageListenerContainer; public DynamicRabbitListener(ConnectionFactory connectionFactory) { this.amqpAdmin = new RabbitAdmin(connectionFactory); @@ -33,11 +35,18 @@ public class DynamicRabbitListener implements ApplicationRunner { this.modbusMessageListenerContainer.setMessageListener(this.composeListener); // 启动监听容器 this.modbusMessageListenerContainer.start(); + + //业务监听容器 + this.businessMessageListener = new ComposeModbusMessageListener(); + this.businessMessageListenerContainer = new SimpleMessageListenerContainer(); + this.businessMessageListenerContainer.setConnectionFactory(connectionFactory); + this.businessMessageListenerContainer.setMessageListener(this.businessMessageListener); + this.businessMessageListenerContainer.start(); } @Override public void run(ApplicationArguments args) { -// this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener()); + this.businessMessageListener.addBatchMessageListener(new BusinessMessageHandlerListener()); this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener()); IntStream.range(0, Queues.QUEUE_PARTITION).forEach(index -> { this.addEventListener(Queues.DEVICE_EVENTS + index); @@ -52,7 +61,9 @@ public class DynamicRabbitListener implements ApplicationRunner { } private void addEventListener(String queueName) { - this.amqpAdmin.declareQueue(QueueBuilder.durable(queueName).build()); + Queue queue = QueueBuilder.durable(queueName).build(); + this.amqpAdmin.declareQueue(queue); + this.businessMessageListenerContainer.addQueues(queue); } } \ No newline at end of file diff --git a/src/main/java/com/isu/gaswellwatch/modbus/impl/CommandServiceImpl.java b/src/main/java/com/isu/gaswellwatch/modbus/impl/CommandServiceImpl.java index 8956255..5d45b6c 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/impl/CommandServiceImpl.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/impl/CommandServiceImpl.java @@ -50,9 +50,12 @@ public class CommandServiceImpl implements CommandService { //记录用户保存控制指令日志 Integer flag = null; - if (Command.KNPCV1_TURN_ON_THE_WELL.equals(command.getCode())) { + if (Command.KNPCV1_TURN_ON_THE_WELL.equals(command.getCode())||Command.ETC_TURN_ON_THE_WELL.equals(command.getCode())|| + Command.SCSS_TURN_ON_THE_WELL.equals(command.getCode())) { flag = 1; - } else if (Command.KNPCV1_TURN_OFF_THE_WELL.equals(command.getCode())) { + } else if (Command.KNPCV1_TURN_OFF_THE_WELL.equals(command.getCode())|| + Command.ETC_TURN_OFF_THE_WELL.equals(command.getCode())|| + Command.SCSS_TURN_OFF_THE_WELL.equals(command.getCode())) { flag = 0; } ; diff --git a/src/main/java/com/isu/gaswellwatch/service/impl/DeviceServiceImpl.java b/src/main/java/com/isu/gaswellwatch/service/impl/DeviceServiceImpl.java index 890e681..8dcbeeb 100644 --- a/src/main/java/com/isu/gaswellwatch/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/isu/gaswellwatch/service/impl/DeviceServiceImpl.java @@ -204,6 +204,9 @@ public class DeviceServiceImpl extends ServiceImpl implements List deviceHistoryVO = page.getRecords(); if (CollectionUtil.isNotEmpty(deviceHistoryVO)) { Map runModeMap = this.dictionaryService.getValueMapByType("runMode"); + if(PersistenceHandler.ETC_MODBUS_TYPE.equalsIgnoreCase(device.getProduct().getCode())){ + runModeMap = this.dictionaryService.getValueMapByType("controlMode"); + } Map plugStatusMap = this.dictionaryService.getValueMapByType("plugStatus"); for (DeviceHistoryVO deviceVO : deviceHistoryVO) { diff --git a/src/main/resources/mapper/DeviceDao.xml b/src/main/resources/mapper/DeviceDao.xml index b88d201..62610e0 100644 --- a/src/main/resources/mapper/DeviceDao.xml +++ b/src/main/resources/mapper/DeviceDao.xml @@ -70,10 +70,10 @@