字段修改、增加监听
This commit is contained in:
parent
c71d8ff8dc
commit
4bc2281229
|
@ -0,0 +1,53 @@
|
|||
package com.isu.gaswellwatch.modbus.data.listener;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.BatchMessageListener;
|
||||
import org.springframework.amqp.core.Message;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class BusinessMessageHandlerListener implements BatchMessageListener {
|
||||
@Override
|
||||
public void onMessage(Message message) {
|
||||
this.onMessageBatch(List.of(message));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessageBatch(List<Message> messages) {
|
||||
|
||||
//读取全部提醒字段的配置
|
||||
|
||||
for (Message message : messages) {
|
||||
String messageString = new String(message.getBody());
|
||||
log.info("收到业务消息:{}", messageString);
|
||||
JSONObject messageObject= JSON.parseObject(messageString);
|
||||
if(messageObject.containsKey("old")){
|
||||
JSONObject oldObject = messageObject.getJSONObject("old");
|
||||
JSONObject newObject = messageObject.getJSONObject("new");
|
||||
String deviceId = oldObject.getString("deviceId");
|
||||
//TODO
|
||||
//根据设备ID获取设备的产品类型
|
||||
|
||||
//根据对应的产品类型,用对应的气井开关字段获取开关状态(wellStatus,solenoidValveStatus,firstSolenoidStatus)
|
||||
|
||||
//比对新旧数据,看开关状态是否一致
|
||||
|
||||
//根据ID找到对应的提醒字段配置
|
||||
|
||||
//遍历该id下的所有字段的配置,检查新消息中字段的值是否符合提醒条件
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//若不一致则批量记录设备操作日志
|
||||
|
||||
//如果提醒数据不为空,批量保存提醒记录
|
||||
}
|
||||
}
|
|
@ -23,7 +23,9 @@ public class DynamicRabbitListener implements ApplicationRunner {
|
|||
|
||||
private final AmqpAdmin amqpAdmin;
|
||||
private final ComposeModbusMessageListener composeListener;
|
||||
private final ComposeModbusMessageListener businessMessageListener;
|
||||
private final SimpleMessageListenerContainer modbusMessageListenerContainer;
|
||||
private final SimpleMessageListenerContainer businessMessageListenerContainer;
|
||||
|
||||
public DynamicRabbitListener(ConnectionFactory connectionFactory) {
|
||||
this.amqpAdmin = new RabbitAdmin(connectionFactory);
|
||||
|
@ -33,11 +35,18 @@ public class DynamicRabbitListener implements ApplicationRunner {
|
|||
this.modbusMessageListenerContainer.setMessageListener(this.composeListener);
|
||||
// 启动监听容器
|
||||
this.modbusMessageListenerContainer.start();
|
||||
|
||||
//业务监听容器
|
||||
this.businessMessageListener = new ComposeModbusMessageListener();
|
||||
this.businessMessageListenerContainer = new SimpleMessageListenerContainer();
|
||||
this.businessMessageListenerContainer.setConnectionFactory(connectionFactory);
|
||||
this.businessMessageListenerContainer.setMessageListener(this.businessMessageListener);
|
||||
this.businessMessageListenerContainer.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) {
|
||||
// this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener());
|
||||
this.businessMessageListener.addBatchMessageListener(new BusinessMessageHandlerListener());
|
||||
this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener());
|
||||
IntStream.range(0, Queues.QUEUE_PARTITION).forEach(index -> {
|
||||
this.addEventListener(Queues.DEVICE_EVENTS + index);
|
||||
|
@ -52,7 +61,9 @@ public class DynamicRabbitListener implements ApplicationRunner {
|
|||
}
|
||||
|
||||
private void addEventListener(String queueName) {
|
||||
this.amqpAdmin.declareQueue(QueueBuilder.durable(queueName).build());
|
||||
Queue queue = QueueBuilder.durable(queueName).build();
|
||||
this.amqpAdmin.declareQueue(queue);
|
||||
this.businessMessageListenerContainer.addQueues(queue);
|
||||
}
|
||||
|
||||
}
|
|
@ -50,9 +50,12 @@ public class CommandServiceImpl implements CommandService {
|
|||
|
||||
//记录用户保存控制指令日志
|
||||
Integer flag = null;
|
||||
if (Command.KNPCV1_TURN_ON_THE_WELL.equals(command.getCode())) {
|
||||
if (Command.KNPCV1_TURN_ON_THE_WELL.equals(command.getCode())||Command.ETC_TURN_ON_THE_WELL.equals(command.getCode())||
|
||||
Command.SCSS_TURN_ON_THE_WELL.equals(command.getCode())) {
|
||||
flag = 1;
|
||||
} else if (Command.KNPCV1_TURN_OFF_THE_WELL.equals(command.getCode())) {
|
||||
} else if (Command.KNPCV1_TURN_OFF_THE_WELL.equals(command.getCode())||
|
||||
Command.ETC_TURN_OFF_THE_WELL.equals(command.getCode())||
|
||||
Command.SCSS_TURN_OFF_THE_WELL.equals(command.getCode())) {
|
||||
flag = 0;
|
||||
}
|
||||
;
|
||||
|
|
|
@ -204,6 +204,9 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceDao, Device> implements
|
|||
List<DeviceHistoryVO> deviceHistoryVO = page.getRecords();
|
||||
if (CollectionUtil.isNotEmpty(deviceHistoryVO)) {
|
||||
Map<String, Dictionary> runModeMap = this.dictionaryService.getValueMapByType("runMode");
|
||||
if(PersistenceHandler.ETC_MODBUS_TYPE.equalsIgnoreCase(device.getProduct().getCode())){
|
||||
runModeMap = this.dictionaryService.getValueMapByType("controlMode");
|
||||
}
|
||||
Map<String, Dictionary> plugStatusMap = this.dictionaryService.getValueMapByType("plugStatus");
|
||||
|
||||
for (DeviceHistoryVO deviceVO : deviceHistoryVO) {
|
||||
|
|
|
@ -70,10 +70,10 @@
|
|||
|
||||
|
||||
<select id="historyPage" resultType="com.isu.gaswellwatch.vo.DeviceHistoryVO">
|
||||
select t.collection_time,t.oil_pressure,t.cas_pressure,t.pre_pressure,t.status_end_time
|
||||
<if test="deviceProduct!=null and deviceProduct=='knpc'">,t.run_mode,t.temperature,t.humidity,t.well_status,t.plug_status</if>
|
||||
<if test="deviceProduct!=null and deviceProduct=='etc'"></if>
|
||||
<if test="deviceProduct!=null and deviceProduct=='scss'">t.ctl_model as runMode,t.solar_voltage</if>
|
||||
select t.collection_time,t.oil_pressure,t.cas_pressure
|
||||
<if test="deviceProduct!=null and deviceProduct=='knpc'">,t.pre_pressure,t.run_mode,t.status_end_time,t.temperature,t.humidity,t.well_status,t.plug_status</if>
|
||||
<if test="deviceProduct!=null and deviceProduct=='etc'">,t.current_status_remaining_time as statusEndTime,t.controller_current_status as runMode,t.solenoid_valve_status as wellStatus</if>
|
||||
<if test="deviceProduct!=null and deviceProduct=='scss'">,t.pre_pressure,t.ctl_model as runMode,t.remaining_time_action as statusEndTime,t.solar_voltage,t.first_solenoid_status as wellStatus</if>
|
||||
from ${tableName} t
|
||||
<where>
|
||||
t.device_id = #{deviceId}
|
||||
|
|
Loading…
Reference in New Issue