Merge remote-tracking branch 'origin/develop' into develop

This commit is contained in:
qinjie 2024-12-01 15:26:45 +08:00
commit cd81ccf116
7 changed files with 90 additions and 107 deletions

View File

@ -89,7 +89,7 @@ public class Redis2DBPersistenceService {
} }
} }
tableName = DEFAULT_DATA_TABLE + deviceId; tableName = DEFAULT_DATA_TABLE + deviceId;
queueName = String.format(Queues.DEVICE_EVENTS, deviceId); queueName = Queues.DEVICE_EVENTS + (deviceId % 10);
persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode); persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode);
existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName)); existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName));
if (ObjectUtils.isEmpty(existsTableList) if (ObjectUtils.isEmpty(existsTableList)
@ -133,7 +133,7 @@ public class Redis2DBPersistenceService {
} }
return idGatewayMappingMap; return idGatewayMappingMap;
} catch (Exception e) { } catch (Exception e) {
log.error("Get online devices list fail", e); log.warn("Get online devices list fail", e);
return null; return null;
} }
} }

View File

@ -5,72 +5,54 @@ package com.isu.gaswellwatch.modbus.data.listener;
* 2024/11/23 0:32 * 2024/11/23 0:32
*/ */
import cn.hutool.core.map.MapUtil; import org.springframework.amqp.core.AmqpAdmin;
import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.*; import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Map; import java.util.stream.IntStream;
@Component @Component
public class DynamicRabbitListener implements ApplicationRunner { public class DynamicRabbitListener implements ApplicationRunner {
private static final String DEVICE_SQL = "SELECT d.id, d.gateway_sn AS identifier FROM device d where d.id = ";
private static final String GATEWAY_SQL = "SELECT d.id, d.gateway_sn AS identifier FROM device d ORDER BY d.id ";
private final AmqpAdmin amqpAdmin; private final AmqpAdmin amqpAdmin;
private final JdbcTemplate jdbcTemplate;
private final ComposeModbusMessageListener composeListener; private final ComposeModbusMessageListener composeListener;
private final SimpleMessageListenerContainer modbusMessageListenerContainer; private final SimpleMessageListenerContainer modbusMessageListenerContainer;
public DynamicRabbitListener(ConnectionFactory connectionFactory, JdbcTemplate jdbcTemplate) { public DynamicRabbitListener(ConnectionFactory connectionFactory) {
this.jdbcTemplate = jdbcTemplate;
this.amqpAdmin = new RabbitAdmin(connectionFactory); this.amqpAdmin = new RabbitAdmin(connectionFactory);
this.composeListener = new ComposeModbusMessageListener();
this.modbusMessageListenerContainer = new SimpleMessageListenerContainer(); this.modbusMessageListenerContainer = new SimpleMessageListenerContainer();
this.modbusMessageListenerContainer.setConnectionFactory(connectionFactory); this.modbusMessageListenerContainer.setConnectionFactory(connectionFactory);
this.modbusMessageListenerContainer.setMessageListener(this.composeListener = new ComposeModbusMessageListener()); this.modbusMessageListenerContainer.setMessageListener(this.composeListener);
// 启动监听容器 // 启动监听容器
this.modbusMessageListenerContainer.start(); this.modbusMessageListenerContainer.start();
} }
@Override @Override
public void run(ApplicationArguments args) { public void run(ApplicationArguments args) {
this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener()); // this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener());
this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener()); this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener());
this.jdbcTemplate.queryForList(GATEWAY_SQL).forEach(this::registerDeviceAndListener); IntStream.range(0, 10).forEach(index -> {
this.addEventListener(Queues.DEVICE_EVENTS + index);
this.addCollectDataListener(Queues.MODBUS_COLLECT_DATA + index);
});
} }
public void registerDeviceAndListener(Long deviceId) { private void addCollectDataListener(String queueName) {
this.jdbcTemplate.queryForList(DEVICE_SQL + deviceId).forEach(this::registerDeviceAndListener); Queue queue = QueueBuilder.durable(queueName).build();
} this.amqpAdmin.declareQueue(queue);
public void addListenerCollectQueue(Queue queue, String exchangeName, String routingKey) {
if (StringUtils.isNotBlank(exchangeName) && StringUtils.isNotBlank(routingKey)) {
// 声明直接类型的交换器
DirectExchange exchange = new DirectExchange(exchangeName);
this.amqpAdmin.declareExchange(exchange);
// 绑定队列和交换器
this.amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
}
// 设置监听的队列
this.modbusMessageListenerContainer.addQueues(queue); this.modbusMessageListenerContainer.addQueues(queue);
} }
private void registerDeviceAndListener(Map<String, Object> deviceMap) { private void addEventListener(String queueName) {
Queue eventQueue = QueueBuilder.durable(String.format(Queues.DEVICE_EVENTS, this.amqpAdmin.declareQueue(QueueBuilder.durable(queueName).build());
MapUtil.getStr(deviceMap, "id"))).build();
Queue collectQueue = QueueBuilder.durable(String.format(Queues.MODBUS_COLLECT_DATA,
MapUtil.getStr(deviceMap, "identifier"))).build();
this.amqpAdmin.declareQueue(eventQueue);
this.amqpAdmin.declareQueue(collectQueue);
this.addListenerCollectQueue(collectQueue, null, null);
} }
} }

View File

@ -5,7 +5,6 @@ package com.isu.gaswellwatch.modbus.data.listener;
* 2024/11/26 12:37 * 2024/11/26 12:37
*/ */
public final class Queues { public final class Queues {
public static final String DEVICE_EVENTS = "/device/events/";
public static final String DEVICE_EVENTS = "/device/%s/events"; public static final String MODBUS_COLLECT_DATA = "/modbus/collect/";
public static final String MODBUS_COLLECT_DATA = "/modbus/device/%s/collect";
} }

View File

@ -8,12 +8,18 @@ import com.isu.gaswellwatch.config.SnowflakeConfig;
import com.isu.gaswellwatch.dao.DeviceDao; import com.isu.gaswellwatch.dao.DeviceDao;
import com.isu.gaswellwatch.dto.DeviceCreateDTO; import com.isu.gaswellwatch.dto.DeviceCreateDTO;
import com.isu.gaswellwatch.dto.DeviceEditDTO; import com.isu.gaswellwatch.dto.DeviceEditDTO;
import com.isu.gaswellwatch.entity.*; import com.isu.gaswellwatch.entity.Device;
import com.isu.gaswellwatch.entity.DeviceOptLog;
import com.isu.gaswellwatch.entity.Dictionary;
import com.isu.gaswellwatch.entity.GasWell;
import com.isu.gaswellwatch.exception.BusinessException; import com.isu.gaswellwatch.exception.BusinessException;
import com.isu.gaswellwatch.modbus.data.PersistenceHandler; import com.isu.gaswellwatch.modbus.data.PersistenceHandler;
import com.isu.gaswellwatch.modbus.data.Redis2DBPersistenceService; import com.isu.gaswellwatch.modbus.data.Redis2DBPersistenceService;
import com.isu.gaswellwatch.modbus.data.listener.DynamicRabbitListener; import com.isu.gaswellwatch.modbus.data.listener.DynamicRabbitListener;
import com.isu.gaswellwatch.service.*; import com.isu.gaswellwatch.service.DeviceOptLogService;
import com.isu.gaswellwatch.service.DeviceService;
import com.isu.gaswellwatch.service.DictionaryService;
import com.isu.gaswellwatch.service.GasWellService;
import com.isu.gaswellwatch.utils.ConverterUtil; import com.isu.gaswellwatch.utils.ConverterUtil;
import com.isu.gaswellwatch.vo.DeviceHistoryVO; import com.isu.gaswellwatch.vo.DeviceHistoryVO;
import com.isu.gaswellwatch.vo.DeviceVO; import com.isu.gaswellwatch.vo.DeviceVO;
@ -53,39 +59,39 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceDao, Device> implements
@Override @Override
public Page<DeviceVO> page(Integer currentPage, Integer pageSize, String gasWellName, String gasStationName, Long deviceTypeId, Long blockId) { public Page<DeviceVO> page(Integer currentPage, Integer pageSize, String gasWellName, String gasStationName, Long deviceTypeId, Long blockId) {
Page<DeviceVO> page = deviceDao.page(new Page<>(currentPage, pageSize),gasWellName,gasStationName,deviceTypeId,blockId); Page<DeviceVO> page = this.deviceDao.page(new Page<>(currentPage, pageSize), gasWellName, gasStationName, deviceTypeId, blockId);
List<DeviceVO> deviceVOList = page.getRecords(); List<DeviceVO> deviceVOList = page.getRecords();
// 从Redis获取设备运行数据 // 从Redis获取设备运行数据
if (CollectionUtil.isNotEmpty(deviceVOList)) { if (CollectionUtil.isNotEmpty(deviceVOList)) {
Map<String, Dictionary> runModeMap = dictionaryService.getValueMapByType("runMode"); Map<String, Dictionary> runModeMap = this.dictionaryService.getValueMapByType("runMode");
Map<String, Dictionary> plugStatusMap = dictionaryService.getValueMapByType("plugStatus"); Map<String, Dictionary> plugStatusMap = this.dictionaryService.getValueMapByType("plugStatus");
try { try {
for (DeviceVO deviceVO : deviceVOList) { for (DeviceVO deviceVO : deviceVOList) {
String deviceKey = PersistenceHandler.DEVICE_DATA_CACHE + deviceVO.getId(); String deviceKey = PersistenceHandler.DEVICE_DATA_CACHE + deviceVO.getId();
Object casPressure = redisTemplate.opsForHash().get(deviceKey, "casPressure"); Object casPressure = this.redisTemplate.opsForHash().get(deviceKey, "casPressure");
deviceVO.setCasPressure(casPressure == null ? "" : casPressure.toString()); deviceVO.setCasPressure(casPressure == null ? "" : casPressure.toString());
Object oilPressure = redisTemplate.opsForHash().get(deviceKey, "oilPressure"); Object oilPressure = this.redisTemplate.opsForHash().get(deviceKey, "oilPressure");
deviceVO.setOilPressure(oilPressure == null ? "" : oilPressure.toString()); deviceVO.setOilPressure(oilPressure == null ? "" : oilPressure.toString());
Object prePressure = redisTemplate.opsForHash().get(deviceKey, "prePressure"); Object prePressure = this.redisTemplate.opsForHash().get(deviceKey, "prePressure");
deviceVO.setPrePressure(prePressure == null ? "" : prePressure.toString()); deviceVO.setPrePressure(prePressure == null ? "" : prePressure.toString());
Object online = redisTemplate.opsForHash().get(deviceKey, "online"); Object online = this.redisTemplate.opsForHash().get(deviceKey, "online");
deviceVO.setOnline(online == null ? "" : online.toString()); deviceVO.setOnline(online == null ? "" : online.toString());
Object gas_status = redisTemplate.opsForHash().get(deviceKey, "gas_status"); Object gas_status = this.redisTemplate.opsForHash().get(deviceKey, "gas_status");
deviceVO.setWellCtl(gas_status == null ? "" : gas_status.toString()); deviceVO.setWellCtl(gas_status == null ? "" : gas_status.toString());
Object runMode = redisTemplate.opsForHash().get(deviceKey, "runMode"); Object runMode = this.redisTemplate.opsForHash().get(deviceKey, "runMode");
if (runMode == null) { if (runMode == null) {
deviceVO.setRunMode(""); deviceVO.setRunMode("");
} else { } else {
Dictionary runMode1 = runModeMap.get(runMode.toString()); Dictionary runMode1 = runModeMap.get(runMode.toString());
deviceVO.setRunMode(runMode1 == null ? "" : runMode1.getName()); deviceVO.setRunMode(runMode1 == null ? "" : runMode1.getName());
} }
Object plugStatus = redisTemplate.opsForHash().get(deviceKey, "plugStatus"); Object plugStatus = this.redisTemplate.opsForHash().get(deviceKey, "plugStatus");
if (plugStatus == null) { if (plugStatus == null) {
deviceVO.setPlugStatus(""); deviceVO.setPlugStatus("");
} else { } else {
@ -95,7 +101,7 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceDao, Device> implements
} }
} catch (RedisConnectionFailureException e) { } catch (RedisConnectionFailureException e) {
log.error("redis连接失败请检查redis连接"); this.log.error("redis连接失败请检查redis连接");
} }
} }
@ -105,12 +111,12 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceDao, Device> implements
@Override @Override
public void add(DeviceCreateDTO deviceCreateDTO) throws NumberFormatException { public void add(DeviceCreateDTO deviceCreateDTO) throws NumberFormatException {
//查重 //查重
List<Device> list = list(new LambdaQueryWrapper<Device>().eq(Device::getId, deviceCreateDTO.getCode())); List<Device> list = this.list(new LambdaQueryWrapper<Device>().eq(Device::getId, deviceCreateDTO.getCode()));
if (CollectionUtil.isNotEmpty(list)) { if (CollectionUtil.isNotEmpty(list)) {
throw new BusinessException("已有相同设备编码,请重新输入"); throw new BusinessException("已有相同设备编码,请重新输入");
} }
GasWell gasWell = gasWellService.getOne(new LambdaQueryWrapper<GasWell>().eq(GasWell::getId, deviceCreateDTO.getGasWell())); GasWell gasWell = this.gasWellService.getOne(new LambdaQueryWrapper<GasWell>().eq(GasWell::getId, deviceCreateDTO.getGasWell()));
if (gasWell == null) { if (gasWell == null) {
throw new BusinessException("该气井不存在"); throw new BusinessException("该气井不存在");
} }
@ -121,37 +127,34 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceDao, Device> implements
Device device = ConverterUtil.convert(deviceCreateDTO, Device.class); Device device = ConverterUtil.convert(deviceCreateDTO, Device.class);
//code必须为整形用于在缓存中查询设备的上报数据 //code必须为整形用于在缓存中查询设备的上报数据
device.setId(Long.valueOf(deviceCreateDTO.getCode())); device.setId(Long.valueOf(deviceCreateDTO.getCode()));
save(device); this.save(device);
//在气井中绑定设备 //在气井中绑定设备
gasWellService.bindDevice(deviceCreateDTO.getGasWell(),device.getId()); this.gasWellService.bindDevice(deviceCreateDTO.getGasWell(), device.getId());
//创建该设备在mq中的事件队列
dynamicRabbitListener.registerDeviceAndListener(device.getId());
} }
@Override @Override
public void edit(DeviceEditDTO deviceEditDTO) { public void edit(DeviceEditDTO deviceEditDTO) {
updateById(ConverterUtil.convert(deviceEditDTO, Device.class)); this.updateById(ConverterUtil.convert(deviceEditDTO, Device.class));
} }
@Override @Override
public void delete(Long id) { public void delete(Long id) {
Long gasWellId = getDevice(id).getGasWell().getId(); Long gasWellId = this.getDevice(id).getGasWell().getId();
//删除设备 //删除设备
removeById(id); this.removeById(id);
//解绑气井 //解绑气井
gasWellService.unbindDevice(gasWellId); this.gasWellService.unbindDevice(gasWellId);
} }
@Override @Override
public DeviceVO getDevice(Long id) { public DeviceVO getDevice(Long id) {
return deviceDao.getDeviceById(id); return this.deviceDao.getDeviceById(id);
} }
@Override @Override
public Map<String, String> getDeviceControlData(Long deviceId) { public Map<String, String> getDeviceControlData(Long deviceId) {
return redisTemplate.opsForHash().entries(PersistenceHandler.DEVICE_DATA_CACHE + deviceId); return this.redisTemplate.opsForHash().entries(PersistenceHandler.DEVICE_DATA_CACHE + deviceId);
} }
@Override @Override
@ -165,12 +168,12 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceDao, Device> implements
if (!StringUtils.isEmpty(endTime)) { if (!StringUtils.isEmpty(endTime)) {
end = simpleDateFormat.parse(endTime); end = simpleDateFormat.parse(endTime);
} }
return deviceOptLogService.page(new Page<>(currentPage, pageSize),start,end,deviceId); return this.deviceOptLogService.page(new Page<>(currentPage, pageSize), start, end, deviceId);
} }
@Override @Override
public List<DeviceVO> getDeviceVOByIds(List<Long> deviceIdList) { public List<DeviceVO> getDeviceVOByIds(List<Long> deviceIdList) {
return deviceDao.getDeviceVOByIds(deviceIdList); return this.deviceDao.getDeviceVOByIds(deviceIdList);
} }
@ -186,11 +189,11 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceDao, Device> implements
end = simpleDateFormat.parse(endTime); end = simpleDateFormat.parse(endTime);
} }
String tableName = Redis2DBPersistenceService.DEFAULT_DATA_TABLE + deviceId; String tableName = Redis2DBPersistenceService.DEFAULT_DATA_TABLE + deviceId;
Page<DeviceHistoryVO> page = deviceDao.historyPage(new Page<>(currentPage, pageSize),start,end,deviceId,tableName); Page<DeviceHistoryVO> page = this.deviceDao.historyPage(new Page<>(currentPage, pageSize), start, end, deviceId, tableName);
List<DeviceHistoryVO> deviceHistoryVO = page.getRecords(); List<DeviceHistoryVO> deviceHistoryVO = page.getRecords();
if (CollectionUtil.isNotEmpty(deviceHistoryVO)) { if (CollectionUtil.isNotEmpty(deviceHistoryVO)) {
Map<String, Dictionary> runModeMap = dictionaryService.getValueMapByType("runMode"); Map<String, Dictionary> runModeMap = this.dictionaryService.getValueMapByType("runMode");
Map<String, Dictionary> plugStatusMap = dictionaryService.getValueMapByType("plugStatus"); Map<String, Dictionary> plugStatusMap = this.dictionaryService.getValueMapByType("plugStatus");
for (DeviceHistoryVO deviceVO : deviceHistoryVO) { for (DeviceHistoryVO deviceVO : deviceHistoryVO) {
deviceVO.setRunMode(StringUtils.isEmpty(deviceVO.getRunMode()) ? "" : runModeMap.get(deviceVO.getRunMode()).getName()); deviceVO.setRunMode(StringUtils.isEmpty(deviceVO.getRunMode()) ? "" : runModeMap.get(deviceVO.getRunMode()).getName());
@ -201,6 +204,5 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceDao, Device> implements
} }
} }

View File

@ -26,7 +26,7 @@ public class PlungerTimer extends Command implements Timing {
private static final long serialVersionUID = 888739283997908251L; private static final long serialVersionUID = 888739283997908251L;
/** /**
* 柱塞上升时间[0-999]:[0-59]:[0-59] * 柱塞上升时间000:00:00 499:59:59
*/ */
@NotBlank(message = "柱塞上升时间不能为空") @NotBlank(message = "柱塞上升时间不能为空")
private String plungerRiseTime; private String plungerRiseTime;
@ -53,7 +53,7 @@ public class PlungerTimer extends Command implements Timing {
} }
public void setPlungerRiseTime(String plungerRiseTime) { public void setPlungerRiseTime(String plungerRiseTime) {
this.plungerRiseTime = this.timingValidate(plungerRiseTime, "plungerRiseTime"); this.plungerRiseTime = this.timeValidate(plungerRiseTime, "plungerRiseTime", 499);
} }
} }

View File

@ -25,17 +25,17 @@ public class SensorDelay extends Command implements Timing {
@Serial @Serial
private static final long serialVersionUID = -126267138442242492L; private static final long serialVersionUID = -126267138442242492L;
/** /**
* 到达传感器延时时间[0-999]:[0-59]:[0-59] * 到达传感器延时时间000:00:00 499:59:57
*/ */
@NotBlank(message = "到达传感器延时时间不能为空") @NotBlank(message = "到达传感器延时时间不能为空")
private String arrivalSensorDelayTime; private String arrivalSensorDelayTime;
/** /**
* 关井时间[0-999]:[0-59]:[0-59] * 关井时间000:00:00 500:00:00
*/ */
@NotBlank(message = "关井时间不能为空") @NotBlank(message = "关井时间不能为空")
private String wellShutInTime; private String wellShutInTime;
/** /**
* 续流时间[0-999]:[0-59]:[0-59] * 续流时间000:00:00 500:00:00
*/ */
@NotBlank(message = "续流时间不能为空") @NotBlank(message = "续流时间不能为空")
private String afterFlowTime; private String afterFlowTime;
@ -66,15 +66,15 @@ public class SensorDelay extends Command implements Timing {
} }
public void setAfterFlowTime(String afterFlowTime) { public void setAfterFlowTime(String afterFlowTime) {
this.afterFlowTime = this.timingValidate(afterFlowTime, "afterFlowTime"); this.afterFlowTime = this.timeValidate(afterFlowTime, "afterFlowTime", 500);
} }
public void setWellShutInTime(String wellShutInTime) { public void setWellShutInTime(String wellShutInTime) {
this.wellShutInTime = this.timingValidate(wellShutInTime, "wellShutInTime"); this.wellShutInTime = this.timeValidate(wellShutInTime, "wellShutInTime", 500);
} }
public void setArrivalSensorDelayTime(String arrivalSensorDelayTime) { public void setArrivalSensorDelayTime(String arrivalSensorDelayTime) {
this.arrivalSensorDelayTime = this.timingValidate(arrivalSensorDelayTime, "arrivalSensorDelayTime"); this.arrivalSensorDelayTime = this.timeValidate(arrivalSensorDelayTime, "arrivalSensorDelayTime", 499);
} }
} }

View File

@ -29,11 +29,11 @@ spring:
# manual手动ack需要在业务代码结束后调用api发送ack。 # manual手动ack需要在业务代码结束后调用api发送ack。
# auto自动ack由spring监测listener代码是否出现异常没有异常则返回ack抛出异常则返回nack # auto自动ack由spring监测listener代码是否出现异常没有异常则返回ack抛出异常则返回nack
# none关闭ackMQ假定消费者获取消息后会成功处理因此消息投递后立即被删除 # none关闭ackMQ假定消费者获取消息后会成功处理因此消息投递后立即被删除
acknowledge-mode: auto acknowledge-mode: none
concurrency: 10 concurrency: 10
retry: retry:
# 开启消费者失败重试 # 开启消费者失败重试
enabled: true enabled: false
# 初始的失败等待时长为1秒 # 初始的失败等待时长为1秒
initial-interval: 1000ms initial-interval: 1000ms
# 失败的等待时长倍数,下次等待时长 = multiplier * initial-interval # 失败的等待时长倍数,下次等待时长 = multiplier * initial-interval
@ -41,7 +41,7 @@ spring:
# 最大重试次数 # 最大重试次数
max-attempts: 3 max-attempts: 3
# true无状态false有状态。如果业务中包含事务这里改为false # true无状态false有状态。如果业务中包含事务这里改为false
stateless: true stateless: false
server: server: