diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java index 8b7826d..5a04522 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java @@ -89,7 +89,7 @@ public class Redis2DBPersistenceService { } } tableName = DEFAULT_DATA_TABLE + deviceId; - queueName = String.format(Queues.DEVICE_EVENTS, deviceId); + queueName = Queues.DEVICE_EVENTS + (deviceId % 10); persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode); existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName)); if (ObjectUtils.isEmpty(existsTableList) @@ -133,7 +133,7 @@ public class Redis2DBPersistenceService { } return idGatewayMappingMap; } catch (Exception e) { - log.error("Get online devices list fail", e); + log.warn("Get online devices list fail", e); return null; } } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java index a69758f..1e8fd7a 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java @@ -5,72 +5,54 @@ package com.isu.gaswellwatch.modbus.data.listener; * 2024/11/23 0:32 */ -import cn.hutool.core.map.MapUtil; -import org.apache.commons.lang3.StringUtils; -import org.springframework.amqp.core.*; +import org.springframework.amqp.core.AmqpAdmin; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; -import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; -import java.util.Map; +import java.util.stream.IntStream; @Component public class DynamicRabbitListener implements ApplicationRunner { - private static final String DEVICE_SQL = "SELECT d.id, d.gateway_sn AS identifier FROM device d where d.id = "; - private static final String GATEWAY_SQL = "SELECT d.id, d.gateway_sn AS identifier FROM device d ORDER BY d.id "; - private final AmqpAdmin amqpAdmin; - private final JdbcTemplate jdbcTemplate; private final ComposeModbusMessageListener composeListener; private final SimpleMessageListenerContainer modbusMessageListenerContainer; - public DynamicRabbitListener(ConnectionFactory connectionFactory, JdbcTemplate jdbcTemplate) { - this.jdbcTemplate = jdbcTemplate; + public DynamicRabbitListener(ConnectionFactory connectionFactory) { this.amqpAdmin = new RabbitAdmin(connectionFactory); + this.composeListener = new ComposeModbusMessageListener(); this.modbusMessageListenerContainer = new SimpleMessageListenerContainer(); this.modbusMessageListenerContainer.setConnectionFactory(connectionFactory); - this.modbusMessageListenerContainer.setMessageListener(this.composeListener = new ComposeModbusMessageListener()); + this.modbusMessageListenerContainer.setMessageListener(this.composeListener); // 启动监听容器 this.modbusMessageListenerContainer.start(); } @Override public void run(ApplicationArguments args) { - this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener()); +// this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener()); this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener()); - this.jdbcTemplate.queryForList(GATEWAY_SQL).forEach(this::registerDeviceAndListener); + IntStream.range(0, 10).forEach(index -> { + this.addEventListener(Queues.DEVICE_EVENTS + index); + this.addCollectDataListener(Queues.MODBUS_COLLECT_DATA + index); + }); } - public void registerDeviceAndListener(Long deviceId) { - this.jdbcTemplate.queryForList(DEVICE_SQL + deviceId).forEach(this::registerDeviceAndListener); - } - - public void addListenerCollectQueue(Queue queue, String exchangeName, String routingKey) { - if (StringUtils.isNotBlank(exchangeName) && StringUtils.isNotBlank(routingKey)) { - // 声明直接类型的交换器 - DirectExchange exchange = new DirectExchange(exchangeName); - this.amqpAdmin.declareExchange(exchange); - // 绑定队列和交换器 - this.amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey)); - } - // 设置监听的队列 + private void addCollectDataListener(String queueName) { + Queue queue = QueueBuilder.durable(queueName).build(); + this.amqpAdmin.declareQueue(queue); this.modbusMessageListenerContainer.addQueues(queue); } - private void registerDeviceAndListener(Map deviceMap) { - Queue eventQueue = QueueBuilder.durable(String.format(Queues.DEVICE_EVENTS, - MapUtil.getStr(deviceMap, "id"))).build(); - Queue collectQueue = QueueBuilder.durable(String.format(Queues.MODBUS_COLLECT_DATA, - MapUtil.getStr(deviceMap, "identifier"))).build(); - this.amqpAdmin.declareQueue(eventQueue); - this.amqpAdmin.declareQueue(collectQueue); - this.addListenerCollectQueue(collectQueue, null, null); + private void addEventListener(String queueName) { + this.amqpAdmin.declareQueue(QueueBuilder.durable(queueName).build()); } } \ No newline at end of file diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/Queues.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/Queues.java index 323dbca..b42ff5e 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/Queues.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/Queues.java @@ -5,7 +5,6 @@ package com.isu.gaswellwatch.modbus.data.listener; * 2024/11/26 12:37 */ public final class Queues { - - public static final String DEVICE_EVENTS = "/device/%s/events"; - public static final String MODBUS_COLLECT_DATA = "/modbus/device/%s/collect"; + public static final String DEVICE_EVENTS = "/device/events/"; + public static final String MODBUS_COLLECT_DATA = "/modbus/collect/"; } diff --git a/src/main/java/com/isu/gaswellwatch/service/impl/DeviceServiceImpl.java b/src/main/java/com/isu/gaswellwatch/service/impl/DeviceServiceImpl.java index 399492c..bbbf0f7 100644 --- a/src/main/java/com/isu/gaswellwatch/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/isu/gaswellwatch/service/impl/DeviceServiceImpl.java @@ -8,12 +8,18 @@ import com.isu.gaswellwatch.config.SnowflakeConfig; import com.isu.gaswellwatch.dao.DeviceDao; import com.isu.gaswellwatch.dto.DeviceCreateDTO; import com.isu.gaswellwatch.dto.DeviceEditDTO; -import com.isu.gaswellwatch.entity.*; +import com.isu.gaswellwatch.entity.Device; +import com.isu.gaswellwatch.entity.DeviceOptLog; +import com.isu.gaswellwatch.entity.Dictionary; +import com.isu.gaswellwatch.entity.GasWell; import com.isu.gaswellwatch.exception.BusinessException; import com.isu.gaswellwatch.modbus.data.PersistenceHandler; import com.isu.gaswellwatch.modbus.data.Redis2DBPersistenceService; import com.isu.gaswellwatch.modbus.data.listener.DynamicRabbitListener; -import com.isu.gaswellwatch.service.*; +import com.isu.gaswellwatch.service.DeviceOptLogService; +import com.isu.gaswellwatch.service.DeviceService; +import com.isu.gaswellwatch.service.DictionaryService; +import com.isu.gaswellwatch.service.GasWellService; import com.isu.gaswellwatch.utils.ConverterUtil; import com.isu.gaswellwatch.vo.DeviceHistoryVO; import com.isu.gaswellwatch.vo.DeviceVO; @@ -52,50 +58,50 @@ public class DeviceServiceImpl extends ServiceImpl implements private DynamicRabbitListener dynamicRabbitListener; @Override - public Page page(Integer currentPage, Integer pageSize, String gasWellName, String gasStationName, Long deviceTypeId, Long blockId){ - Page page = deviceDao.page(new Page<>(currentPage, pageSize),gasWellName,gasStationName,deviceTypeId,blockId); + public Page page(Integer currentPage, Integer pageSize, String gasWellName, String gasStationName, Long deviceTypeId, Long blockId) { + Page page = this.deviceDao.page(new Page<>(currentPage, pageSize), gasWellName, gasStationName, deviceTypeId, blockId); List deviceVOList = page.getRecords(); // 从Redis获取设备运行数据 - if(CollectionUtil.isNotEmpty(deviceVOList)) { - Map runModeMap = dictionaryService.getValueMapByType("runMode"); - Map plugStatusMap = dictionaryService.getValueMapByType("plugStatus"); + if (CollectionUtil.isNotEmpty(deviceVOList)) { + Map runModeMap = this.dictionaryService.getValueMapByType("runMode"); + Map plugStatusMap = this.dictionaryService.getValueMapByType("plugStatus"); try { for (DeviceVO deviceVO : deviceVOList) { String deviceKey = PersistenceHandler.DEVICE_DATA_CACHE + deviceVO.getId(); - Object casPressure = redisTemplate.opsForHash().get(deviceKey, "casPressure"); + Object casPressure = this.redisTemplate.opsForHash().get(deviceKey, "casPressure"); deviceVO.setCasPressure(casPressure == null ? "" : casPressure.toString()); - Object oilPressure = redisTemplate.opsForHash().get(deviceKey, "oilPressure"); + Object oilPressure = this.redisTemplate.opsForHash().get(deviceKey, "oilPressure"); deviceVO.setOilPressure(oilPressure == null ? "" : oilPressure.toString()); - Object prePressure = redisTemplate.opsForHash().get(deviceKey, "prePressure"); + Object prePressure = this.redisTemplate.opsForHash().get(deviceKey, "prePressure"); deviceVO.setPrePressure(prePressure == null ? "" : prePressure.toString()); - Object online = redisTemplate.opsForHash().get(deviceKey, "online"); + Object online = this.redisTemplate.opsForHash().get(deviceKey, "online"); deviceVO.setOnline(online == null ? "" : online.toString()); - Object gas_status = redisTemplate.opsForHash().get(deviceKey, "gas_status"); + Object gas_status = this.redisTemplate.opsForHash().get(deviceKey, "gas_status"); deviceVO.setWellCtl(gas_status == null ? "" : gas_status.toString()); - Object runMode = redisTemplate.opsForHash().get(deviceKey, "runMode"); - if(runMode==null) { + Object runMode = this.redisTemplate.opsForHash().get(deviceKey, "runMode"); + if (runMode == null) { deviceVO.setRunMode(""); - }else { + } else { Dictionary runMode1 = runModeMap.get(runMode.toString()); - deviceVO.setRunMode(runMode1==null?"":runMode1.getName()); + deviceVO.setRunMode(runMode1 == null ? "" : runMode1.getName()); } - Object plugStatus = redisTemplate.opsForHash().get(deviceKey, "plugStatus"); - if(plugStatus==null) { + Object plugStatus = this.redisTemplate.opsForHash().get(deviceKey, "plugStatus"); + if (plugStatus == null) { deviceVO.setPlugStatus(""); - }else { + } else { Dictionary plugStatus1 = plugStatusMap.get(plugStatus.toString()); - deviceVO.setPlugStatus(plugStatus1==null?"":plugStatus1.getName()); + deviceVO.setPlugStatus(plugStatus1 == null ? "" : plugStatus1.getName()); } } - }catch (RedisConnectionFailureException e){ - log.error("redis连接失败,请检查redis连接"); + } catch (RedisConnectionFailureException e) { + this.log.error("redis连接失败,请检查redis连接"); } } @@ -103,55 +109,52 @@ public class DeviceServiceImpl extends ServiceImpl implements } @Override - public void add(DeviceCreateDTO deviceCreateDTO) throws NumberFormatException{ + public void add(DeviceCreateDTO deviceCreateDTO) throws NumberFormatException { //查重 - List list = list(new LambdaQueryWrapper().eq(Device::getId, deviceCreateDTO.getCode())); - if(CollectionUtil.isNotEmpty(list)) { + List list = this.list(new LambdaQueryWrapper().eq(Device::getId, deviceCreateDTO.getCode())); + if (CollectionUtil.isNotEmpty(list)) { throw new BusinessException("已有相同设备编码,请重新输入"); } - GasWell gasWell = gasWellService.getOne(new LambdaQueryWrapper().eq(GasWell::getId, deviceCreateDTO.getGasWell())); - if(gasWell==null) { + GasWell gasWell = this.gasWellService.getOne(new LambdaQueryWrapper().eq(GasWell::getId, deviceCreateDTO.getGasWell())); + if (gasWell == null) { throw new BusinessException("该气井不存在"); } - if(gasWell.getDeviceId()!=null) { + if (gasWell.getDeviceId() != null) { throw new BusinessException("该气井已存在绑定设备,无法添加新设备"); } Device device = ConverterUtil.convert(deviceCreateDTO, Device.class); //code必须为整形,用于在缓存中查询设备的上报数据 device.setId(Long.valueOf(deviceCreateDTO.getCode())); - save(device); + this.save(device); //在气井中绑定设备 - gasWellService.bindDevice(deviceCreateDTO.getGasWell(),device.getId()); - - //创建该设备在mq中的事件队列 - dynamicRabbitListener.registerDeviceAndListener(device.getId()); + this.gasWellService.bindDevice(deviceCreateDTO.getGasWell(), device.getId()); } @Override - public void edit(DeviceEditDTO deviceEditDTO){ - updateById(ConverterUtil.convert(deviceEditDTO, Device.class)); + public void edit(DeviceEditDTO deviceEditDTO) { + this.updateById(ConverterUtil.convert(deviceEditDTO, Device.class)); } @Override - public void delete(Long id){ - Long gasWellId = getDevice(id).getGasWell().getId(); + public void delete(Long id) { + Long gasWellId = this.getDevice(id).getGasWell().getId(); //删除设备 - removeById(id); + this.removeById(id); //解绑气井 - gasWellService.unbindDevice(gasWellId); + this.gasWellService.unbindDevice(gasWellId); } @Override public DeviceVO getDevice(Long id) { - return deviceDao.getDeviceById(id); + return this.deviceDao.getDeviceById(id); } @Override - public Map getDeviceControlData(Long deviceId) { - return redisTemplate.opsForHash().entries(PersistenceHandler.DEVICE_DATA_CACHE + deviceId); + public Map getDeviceControlData(Long deviceId) { + return this.redisTemplate.opsForHash().entries(PersistenceHandler.DEVICE_DATA_CACHE + deviceId); } @Override @@ -159,18 +162,18 @@ public class DeviceServiceImpl extends ServiceImpl implements Date start = null; Date end = null; SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - if(!StringUtils.isEmpty(startTime) ) { + if (!StringUtils.isEmpty(startTime)) { start = simpleDateFormat.parse(startTime); } - if(!StringUtils.isEmpty(endTime)) { + if (!StringUtils.isEmpty(endTime)) { end = simpleDateFormat.parse(endTime); } - return deviceOptLogService.page(new Page<>(currentPage, pageSize),start,end,deviceId); + return this.deviceOptLogService.page(new Page<>(currentPage, pageSize), start, end, deviceId); } @Override public List getDeviceVOByIds(List deviceIdList) { - return deviceDao.getDeviceVOByIds(deviceIdList); + return this.deviceDao.getDeviceVOByIds(deviceIdList); } @@ -179,28 +182,27 @@ public class DeviceServiceImpl extends ServiceImpl implements Date start = null; Date end = null; SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - if(!StringUtils.isEmpty(startTime) ) { + if (!StringUtils.isEmpty(startTime)) { start = simpleDateFormat.parse(startTime); } - if(!StringUtils.isEmpty(endTime)) { + if (!StringUtils.isEmpty(endTime)) { end = simpleDateFormat.parse(endTime); } - String tableName = Redis2DBPersistenceService.DEFAULT_DATA_TABLE +deviceId; - Page page = deviceDao.historyPage(new Page<>(currentPage, pageSize),start,end,deviceId,tableName); + String tableName = Redis2DBPersistenceService.DEFAULT_DATA_TABLE + deviceId; + Page page = this.deviceDao.historyPage(new Page<>(currentPage, pageSize), start, end, deviceId, tableName); List deviceHistoryVO = page.getRecords(); - if(CollectionUtil.isNotEmpty(deviceHistoryVO)) { - Map runModeMap = dictionaryService.getValueMapByType("runMode"); - Map plugStatusMap = dictionaryService.getValueMapByType("plugStatus"); + if (CollectionUtil.isNotEmpty(deviceHistoryVO)) { + Map runModeMap = this.dictionaryService.getValueMapByType("runMode"); + Map plugStatusMap = this.dictionaryService.getValueMapByType("plugStatus"); for (DeviceHistoryVO deviceVO : deviceHistoryVO) { - deviceVO.setRunMode(StringUtils.isEmpty(deviceVO.getRunMode()) ?"":runModeMap.get(deviceVO.getRunMode()).getName()); - deviceVO.setPlugStatus(StringUtils.isEmpty(deviceVO.getPlugStatus()) ?"":plugStatusMap.get(deviceVO.getPlugStatus()).getName()); + deviceVO.setRunMode(StringUtils.isEmpty(deviceVO.getRunMode()) ? "" : runModeMap.get(deviceVO.getRunMode()).getName()); + deviceVO.setPlugStatus(StringUtils.isEmpty(deviceVO.getPlugStatus()) ? "" : plugStatusMap.get(deviceVO.getPlugStatus()).getName()); } } return page; } - } diff --git a/src/main/java/com/isu/gaswellwatch/vo/command/etc/PlungerTimer.java b/src/main/java/com/isu/gaswellwatch/vo/command/etc/PlungerTimer.java index 2c73516..c1b4a19 100644 --- a/src/main/java/com/isu/gaswellwatch/vo/command/etc/PlungerTimer.java +++ b/src/main/java/com/isu/gaswellwatch/vo/command/etc/PlungerTimer.java @@ -26,7 +26,7 @@ public class PlungerTimer extends Command implements Timing { private static final long serialVersionUID = 888739283997908251L; /** - * 柱塞上升时间。[0-999]:[0-59]:[0-59] + * 柱塞上升时间。000:00:00 – 499:59:59 */ @NotBlank(message = "柱塞上升时间不能为空") private String plungerRiseTime; @@ -53,7 +53,7 @@ public class PlungerTimer extends Command implements Timing { } public void setPlungerRiseTime(String plungerRiseTime) { - this.plungerRiseTime = this.timingValidate(plungerRiseTime, "plungerRiseTime"); + this.plungerRiseTime = this.timeValidate(plungerRiseTime, "plungerRiseTime", 499); } } diff --git a/src/main/java/com/isu/gaswellwatch/vo/command/etc/SensorDelay.java b/src/main/java/com/isu/gaswellwatch/vo/command/etc/SensorDelay.java index 619d210..b68ff43 100644 --- a/src/main/java/com/isu/gaswellwatch/vo/command/etc/SensorDelay.java +++ b/src/main/java/com/isu/gaswellwatch/vo/command/etc/SensorDelay.java @@ -25,17 +25,17 @@ public class SensorDelay extends Command implements Timing { @Serial private static final long serialVersionUID = -126267138442242492L; /** - * 到达传感器延时时间。[0-999]:[0-59]:[0-59] + * 到达传感器延时时间。000:00:00 – 499:59:57 */ @NotBlank(message = "到达传感器延时时间不能为空") private String arrivalSensorDelayTime; /** - * 关井时间。[0-999]:[0-59]:[0-59] + * 关井时间。000:00:00 – 500:00:00 */ @NotBlank(message = "关井时间不能为空") private String wellShutInTime; /** - * 续流时间。[0-999]:[0-59]:[0-59] + * 续流时间。000:00:00 – 500:00:00 */ @NotBlank(message = "续流时间不能为空") private String afterFlowTime; @@ -66,15 +66,15 @@ public class SensorDelay extends Command implements Timing { } public void setAfterFlowTime(String afterFlowTime) { - this.afterFlowTime = this.timingValidate(afterFlowTime, "afterFlowTime"); + this.afterFlowTime = this.timeValidate(afterFlowTime, "afterFlowTime", 500); } public void setWellShutInTime(String wellShutInTime) { - this.wellShutInTime = this.timingValidate(wellShutInTime, "wellShutInTime"); + this.wellShutInTime = this.timeValidate(wellShutInTime, "wellShutInTime", 500); } public void setArrivalSensorDelayTime(String arrivalSensorDelayTime) { - this.arrivalSensorDelayTime = this.timingValidate(arrivalSensorDelayTime, "arrivalSensorDelayTime"); + this.arrivalSensorDelayTime = this.timeValidate(arrivalSensorDelayTime, "arrivalSensorDelayTime", 499); } } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 5a79c3a..6c125d7 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -29,11 +29,11 @@ spring: # manual:手动ack,需要在业务代码结束后,调用api发送ack。 # auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack # none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除 - acknowledge-mode: auto + acknowledge-mode: none concurrency: 10 retry: # 开启消费者失败重试 - enabled: true + enabled: false # 初始的失败等待时长为1秒 initial-interval: 1000ms # 失败的等待时长倍数,下次等待时长 = multiplier * initial-interval @@ -41,7 +41,7 @@ spring: # 最大重试次数 max-attempts: 3 # true无状态;false有状态。如果业务中包含事务,这里改为false - stateless: true + stateless: false server: