diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/BusinessMessageHandlerListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/BusinessMessageHandlerListener.java index 4f45b9d..2226c2b 100755 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/BusinessMessageHandlerListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/BusinessMessageHandlerListener.java @@ -48,58 +48,58 @@ public class BusinessMessageHandlerListener implements BatchMessageListener { public void onMessageBatch(List messages) { //读取全部提醒字段的配置 - List settingList = remindService.getAllRemindSetting(); + List settingList = this.remindService.getAllRemindSetting(); List recordList = new ArrayList<>(); //将settingList转换为map,key为deviceId,value为List - Map> deviceRemindSettingMap = new HashMap<>(); + Map> deviceRemindSettingMap = new HashMap<>(); for (RemindSetting setting : settingList) { - if(deviceRemindSettingMap.containsKey(setting.getDeviceId())){ + if (deviceRemindSettingMap.containsKey(setting.getDeviceId())) { deviceRemindSettingMap.get(setting.getDeviceId()).add(setting); - }else{ - deviceRemindSettingMap.put(setting.getDeviceId(),List.of(setting)); + } else { + deviceRemindSettingMap.put(setting.getDeviceId(), List.of(setting)); } } - Map deviceWellStatusMap = new HashMap<>(); + Map deviceWellStatusMap = new HashMap<>(); for (Message message : messages) { String messageString = new String(message.getBody()); - JSONObject messageObject= JSON.parseObject(messageString); - if(messageObject.containsKey("old")){ + JSONObject messageObject = JSON.parseObject(messageString); + if (messageObject.containsKey("old")) { JSONObject oldObject = messageObject.getJSONObject("old"); JSONObject newObject = messageObject.getJSONObject("new"); String deviceId = oldObject.getString("deviceId"); - if(StringUtils.isEmpty(deviceId)){ + if (StringUtils.isEmpty(deviceId)) { continue; } //根据设备ID获取设备的产品类型 - DeviceVO device = deviceService.getDevice(Long.valueOf(deviceId)); + DeviceVO device = this.deviceService.getDevice(Long.valueOf(deviceId)); //根据对应的产品类型,用对应的气井开关字段获取开关状态(wellStatus,solenoidValveStatus,firstSolenoidStatus) String wellStatusKey = getWellStatusKey(device); //比对新旧数据,看开关状态是否一致 - if(!oldObject.getString(wellStatusKey).equals(newObject.getString(wellStatusKey))){ - deviceWellStatusMap.put(Long.valueOf(deviceId),newObject.getInteger(wellStatusKey)); + if (oldObject.getString(wellStatusKey) != null && (!oldObject.getString(wellStatusKey).equals(newObject.getString(wellStatusKey)))) { + deviceWellStatusMap.put(Long.valueOf(deviceId), newObject.getInteger(wellStatusKey)); } //根据ID找到对应的提醒字段配置 - if(deviceRemindSettingMap.containsKey(Long.valueOf(deviceId))){ + if (deviceRemindSettingMap.containsKey(Long.valueOf(deviceId))) { List remindSettings = deviceRemindSettingMap.get(Long.valueOf(deviceId)); for (RemindSetting remindSetting : remindSettings) { //遍历该id下的所有字段的配置,检查新消息中字段的值是否符合提醒条件 //暂时只处理number类型的字段 - if("number".equals(remindSetting.getFieldType())){ + if ("number".equals(remindSetting.getFieldType())) { BigDecimal newValue = newObject.getBigDecimal(remindSetting.getField()); - if(newValue != null){ - if(!StringUtils.isEmpty(remindSetting.getMaxValue())){ - if(newValue.compareTo(new BigDecimal(remindSetting.getMaxValue())) > 0){ + if (newValue != null) { + if (!StringUtils.isEmpty(remindSetting.getMaxValue())) { + if (newValue.compareTo(new BigDecimal(remindSetting.getMaxValue())) > 0) { //如果大于最大值,则保存提醒记录 - addRemindRecordToList(remindSetting, deviceId, device, newValue, recordList); - }else if(!StringUtils.isEmpty(remindSetting.getMinValue())){ - if(newValue.compareTo(new BigDecimal(remindSetting.getMinValue())) < 0){ + this.addRemindRecordToList(remindSetting, deviceId, device, newValue, recordList); + } else if (!StringUtils.isEmpty(remindSetting.getMinValue())) { + if (newValue.compareTo(new BigDecimal(remindSetting.getMinValue())) < 0) { //如果小于最小值,则保存提醒记录 - addRemindRecordToList(remindSetting, deviceId, device, newValue, recordList); + this.addRemindRecordToList(remindSetting, deviceId, device, newValue, recordList); } } } @@ -111,40 +111,39 @@ public class BusinessMessageHandlerListener implements BatchMessageListener { } - } } //若不一致则记录设备操作日志 - if(!deviceWellStatusMap.isEmpty()){ + if (!deviceWellStatusMap.isEmpty()) { for (Map.Entry entry : deviceWellStatusMap.entrySet()) { - this.deviceOptLogService.saveGasWellOptLog(entry.getValue(), entry.getKey(),"device"); + this.deviceOptLogService.saveGasWellOptLog(entry.getValue(), entry.getKey(), "device"); } } //如果提醒数据不为空,批量保存提醒记录 - if(!recordList.isEmpty()){ + if (!recordList.isEmpty()) { this.remindService.saveBatch(recordList); } } private void addRemindRecordToList(RemindSetting remindSetting, String deviceId, DeviceVO device, BigDecimal newValue, List recordList) { RemindRecord remindRecord = new RemindRecord(); - remindRecord.setId(snowflakeConfig.snowflakeId()); + remindRecord.setId(this.snowflakeConfig.snowflakeId()); remindRecord.setDeviceId(Long.valueOf(deviceId)); remindRecord.setGasWell(device.getGasWell().getId()); - remindRecord.setContent("【"+ remindSetting.getFieldName()+"】的值达到"+ newValue +",请及时处理"); + remindRecord.setContent("【" + remindSetting.getFieldName() + "】的值达到" + newValue + ",请及时处理"); recordList.add(remindRecord); } private static String getWellStatusKey(DeviceVO device) { String wellStatusKey = ""; - if(PersistenceHandler.KNPCV1_MODBUS_TYPE.equals(device.getDeviceType().getCode())){ + if (PersistenceHandler.KNPCV1_MODBUS_TYPE.equals(device.getProduct().getCode())) { wellStatusKey = "wellStatus"; - } else if(PersistenceHandler.ETC_MODBUS_TYPE.equals(device.getDeviceType().getCode())){ + } else if (PersistenceHandler.ETC_MODBUS_TYPE.equals(device.getProduct().getCode())) { wellStatusKey = "solenoidValveStatus"; - } else if(PersistenceHandler.SCSS_MODBUS_TYPE.equals(device.getDeviceType().getCode())){ + } else if (PersistenceHandler.SCSS_MODBUS_TYPE.equals(device.getProduct().getCode())) { wellStatusKey = "firstSolenoidStatus"; } return wellStatusKey; diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java index c487a87..c134ab6 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java @@ -41,7 +41,7 @@ public class DynamicRabbitListener implements ApplicationRunner { this.businessMessageListenerContainer = new SimpleMessageListenerContainer(); this.businessMessageListenerContainer.setConnectionFactory(connectionFactory); this.businessMessageListenerContainer.setMessageListener(this.businessMessageListener); - this.businessMessageListenerContainer.start(); +// this.businessMessageListenerContainer.start(); } @Override diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java index 3b34c1f..b3d3250 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java @@ -96,36 +96,50 @@ public class ModbusMessagePersistListener implements BatchMessageListener { RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue); ModbusResponse modbusResponse = response.getModbusResponse(); - short[] values = null; + byte[] byteValues = null; + short[] shortValues = null; switch (modbusResponse.getFunctionCode()) { case FunctionCode.READ_COILS -> { - values = ((ReadCoilsResponse) modbusResponse).getShortData(); + byteValues = ((ReadCoilsResponse) modbusResponse).getData(); } case FunctionCode.READ_DISCRETE_INPUTS -> { - values = ((ReadDiscreteInputsResponse) modbusResponse).getShortData(); + byteValues = ((ReadDiscreteInputsResponse) modbusResponse).getData(); } case FunctionCode.READ_INPUT_REGISTERS -> { - values = ((ReadInputRegistersResponse) modbusResponse).getShortData(); + shortValues = ((ReadInputRegistersResponse) modbusResponse).getShortData(); } case FunctionCode.READ_HOLDING_REGISTERS -> { - values = ((ReadHoldingRegistersResponse) modbusResponse).getShortData(); + shortValues = ((ReadHoldingRegistersResponse) modbusResponse).getShortData(); } case FunctionCode.READ_EXCEPTION_STATUS -> { - values = new short[]{((ReadExceptionStatusResponse) modbusResponse).getExceptionStatus()}; + shortValues = new short[]{((ReadExceptionStatusResponse) modbusResponse).getExceptionStatus()}; } default -> { throw new RuntimeException("Funcetion code not supported: " + modbusResponse.getFunctionCode()); } } - for (short value : values) { - stepSize = index * 4; - messagePointMap.put(StringUtils.leftPad(String.valueOf(startAddress), 4, '0'), - ModbusMessage.MessagePoint.builder() - .parseValue(String.valueOf(value)) - .originalValue(StringUtils.substring(collectionMessage, 6 + stepSize, 10 + stepSize)) - .build()); - index++; - startAddress++; + if (Objects.nonNull(shortValues)) { + for (short value : shortValues) { + stepSize = index * 4; + messagePointMap.put(StringUtils.leftPad(String.valueOf(startAddress), 4, '0'), + ModbusMessage.MessagePoint.builder() + .parseValue(String.valueOf(value)) + .originalValue(StringUtils.substring(collectionMessage, 6 + stepSize, 10 + stepSize)) + .build()); + index++; + startAddress++; + } + } else if (Objects.nonNull(byteValues)) { + for (byte value : byteValues) { + stepSize = index * 2; + messagePointMap.put(StringUtils.leftPad(String.valueOf(startAddress), 4, '0'), + ModbusMessage.MessagePoint.builder() + .parseValue(String.valueOf(value)) + .originalValue(StringUtils.substring(collectionMessage, 6 + stepSize, 8 + stepSize)) + .build()); + index++; + startAddress++; + } } } catch (Exception e) { log.error("初始数据解析异常: {}", messageString, e);