使用分片队列

This commit is contained in:
wangshilong 2024-11-28 23:35:05 +08:00
parent 59ecfea717
commit b463ddf006
5 changed files with 82 additions and 99 deletions

View File

@ -89,7 +89,7 @@ public class Redis2DBPersistenceService {
}
}
tableName = DEFAULT_DATA_TABLE + deviceId;
queueName = String.format(Queues.DEVICE_EVENTS, deviceId);
queueName = Queues.DEVICE_EVENTS + (deviceId % 10);
persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode);
existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName));
if (ObjectUtils.isEmpty(existsTableList)
@ -133,7 +133,7 @@ public class Redis2DBPersistenceService {
}
return idGatewayMappingMap;
} catch (Exception e) {
log.error("Get online devices list fail", e);
log.warn("Get online devices list fail", e);
return null;
}
}

View File

@ -5,72 +5,54 @@ package com.isu.gaswellwatch.modbus.data.listener;
* 2024/11/23 0:32
*/
import cn.hutool.core.map.MapUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.*;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.stream.IntStream;
@Component
public class DynamicRabbitListener implements ApplicationRunner {
private static final String DEVICE_SQL = "SELECT d.id, d.gateway_sn AS identifier FROM device d where d.id = ";
private static final String GATEWAY_SQL = "SELECT d.id, d.gateway_sn AS identifier FROM device d ORDER BY d.id ";
private final AmqpAdmin amqpAdmin;
private final JdbcTemplate jdbcTemplate;
private final ComposeModbusMessageListener composeListener;
private final SimpleMessageListenerContainer modbusMessageListenerContainer;
public DynamicRabbitListener(ConnectionFactory connectionFactory, JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
public DynamicRabbitListener(ConnectionFactory connectionFactory) {
this.amqpAdmin = new RabbitAdmin(connectionFactory);
this.composeListener = new ComposeModbusMessageListener();
this.modbusMessageListenerContainer = new SimpleMessageListenerContainer();
this.modbusMessageListenerContainer.setConnectionFactory(connectionFactory);
this.modbusMessageListenerContainer.setMessageListener(this.composeListener = new ComposeModbusMessageListener());
this.modbusMessageListenerContainer.setMessageListener(this.composeListener);
// 启动监听容器
this.modbusMessageListenerContainer.start();
}
@Override
public void run(ApplicationArguments args) {
this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener());
// this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener());
this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener());
this.jdbcTemplate.queryForList(GATEWAY_SQL).forEach(this::registerDeviceAndListener);
IntStream.range(0, 10).forEach(index -> {
this.addEventListener(Queues.DEVICE_EVENTS + index);
this.addCollectDataListener(Queues.MODBUS_COLLECT_DATA + index);
});
}
public void registerDeviceAndListener(Long deviceId) {
this.jdbcTemplate.queryForList(DEVICE_SQL + deviceId).forEach(this::registerDeviceAndListener);
}
public void addListenerCollectQueue(Queue queue, String exchangeName, String routingKey) {
if (StringUtils.isNotBlank(exchangeName) && StringUtils.isNotBlank(routingKey)) {
// 声明直接类型的交换器
DirectExchange exchange = new DirectExchange(exchangeName);
this.amqpAdmin.declareExchange(exchange);
// 绑定队列和交换器
this.amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
}
// 设置监听的队列
private void addCollectDataListener(String queueName) {
Queue queue = QueueBuilder.durable(queueName).build();
this.amqpAdmin.declareQueue(queue);
this.modbusMessageListenerContainer.addQueues(queue);
}
private void registerDeviceAndListener(Map<String, Object> deviceMap) {
Queue eventQueue = QueueBuilder.durable(String.format(Queues.DEVICE_EVENTS,
MapUtil.getStr(deviceMap, "id"))).build();
Queue collectQueue = QueueBuilder.durable(String.format(Queues.MODBUS_COLLECT_DATA,
MapUtil.getStr(deviceMap, "identifier"))).build();
this.amqpAdmin.declareQueue(eventQueue);
this.amqpAdmin.declareQueue(collectQueue);
this.addListenerCollectQueue(collectQueue, null, null);
private void addEventListener(String queueName) {
this.amqpAdmin.declareQueue(QueueBuilder.durable(queueName).build());
}
}

View File

@ -5,7 +5,6 @@ package com.isu.gaswellwatch.modbus.data.listener;
* 2024/11/26 12:37
*/
public final class Queues {
public static final String DEVICE_EVENTS = "/device/%s/events";
public static final String MODBUS_COLLECT_DATA = "/modbus/device/%s/collect";
public static final String DEVICE_EVENTS = "/device/events/";
public static final String MODBUS_COLLECT_DATA = "/modbus/collect/";
}

View File

@ -8,12 +8,18 @@ import com.isu.gaswellwatch.config.SnowflakeConfig;
import com.isu.gaswellwatch.dao.DeviceDao;
import com.isu.gaswellwatch.dto.DeviceCreateDTO;
import com.isu.gaswellwatch.dto.DeviceEditDTO;
import com.isu.gaswellwatch.entity.*;
import com.isu.gaswellwatch.entity.Device;
import com.isu.gaswellwatch.entity.DeviceOptLog;
import com.isu.gaswellwatch.entity.Dictionary;
import com.isu.gaswellwatch.entity.GasWell;
import com.isu.gaswellwatch.exception.BusinessException;
import com.isu.gaswellwatch.modbus.data.PersistenceHandler;
import com.isu.gaswellwatch.modbus.data.Redis2DBPersistenceService;
import com.isu.gaswellwatch.modbus.data.listener.DynamicRabbitListener;
import com.isu.gaswellwatch.service.*;
import com.isu.gaswellwatch.service.DeviceOptLogService;
import com.isu.gaswellwatch.service.DeviceService;
import com.isu.gaswellwatch.service.DictionaryService;
import com.isu.gaswellwatch.service.GasWellService;
import com.isu.gaswellwatch.utils.ConverterUtil;
import com.isu.gaswellwatch.vo.DeviceHistoryVO;
import com.isu.gaswellwatch.vo.DeviceVO;
@ -52,50 +58,50 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceDao, Device> implements
private DynamicRabbitListener dynamicRabbitListener;
@Override
public Page<DeviceVO> page(Integer currentPage, Integer pageSize, String gasWellName, String gasStationName, Long deviceTypeId, Long blockId){
Page<DeviceVO> page = deviceDao.page(new Page<>(currentPage, pageSize),gasWellName,gasStationName,deviceTypeId,blockId);
public Page<DeviceVO> page(Integer currentPage, Integer pageSize, String gasWellName, String gasStationName, Long deviceTypeId, Long blockId) {
Page<DeviceVO> page = this.deviceDao.page(new Page<>(currentPage, pageSize), gasWellName, gasStationName, deviceTypeId, blockId);
List<DeviceVO> deviceVOList = page.getRecords();
// 从Redis获取设备运行数据
if(CollectionUtil.isNotEmpty(deviceVOList)) {
Map<String, Dictionary> runModeMap = dictionaryService.getValueMapByType("runMode");
Map<String, Dictionary> plugStatusMap = dictionaryService.getValueMapByType("plugStatus");
if (CollectionUtil.isNotEmpty(deviceVOList)) {
Map<String, Dictionary> runModeMap = this.dictionaryService.getValueMapByType("runMode");
Map<String, Dictionary> plugStatusMap = this.dictionaryService.getValueMapByType("plugStatus");
try {
for (DeviceVO deviceVO : deviceVOList) {
String deviceKey = PersistenceHandler.DEVICE_DATA_CACHE + deviceVO.getId();
Object casPressure = redisTemplate.opsForHash().get(deviceKey, "casPressure");
Object casPressure = this.redisTemplate.opsForHash().get(deviceKey, "casPressure");
deviceVO.setCasPressure(casPressure == null ? "" : casPressure.toString());
Object oilPressure = redisTemplate.opsForHash().get(deviceKey, "oilPressure");
Object oilPressure = this.redisTemplate.opsForHash().get(deviceKey, "oilPressure");
deviceVO.setOilPressure(oilPressure == null ? "" : oilPressure.toString());
Object prePressure = redisTemplate.opsForHash().get(deviceKey, "prePressure");
Object prePressure = this.redisTemplate.opsForHash().get(deviceKey, "prePressure");
deviceVO.setPrePressure(prePressure == null ? "" : prePressure.toString());
Object online = redisTemplate.opsForHash().get(deviceKey, "online");
Object online = this.redisTemplate.opsForHash().get(deviceKey, "online");
deviceVO.setOnline(online == null ? "" : online.toString());
Object gas_status = redisTemplate.opsForHash().get(deviceKey, "gas_status");
Object gas_status = this.redisTemplate.opsForHash().get(deviceKey, "gas_status");
deviceVO.setWellCtl(gas_status == null ? "" : gas_status.toString());
Object runMode = redisTemplate.opsForHash().get(deviceKey, "runMode");
if(runMode==null) {
Object runMode = this.redisTemplate.opsForHash().get(deviceKey, "runMode");
if (runMode == null) {
deviceVO.setRunMode("");
}else {
} else {
Dictionary runMode1 = runModeMap.get(runMode.toString());
deviceVO.setRunMode(runMode1==null?"":runMode1.getName());
deviceVO.setRunMode(runMode1 == null ? "" : runMode1.getName());
}
Object plugStatus = redisTemplate.opsForHash().get(deviceKey, "plugStatus");
if(plugStatus==null) {
Object plugStatus = this.redisTemplate.opsForHash().get(deviceKey, "plugStatus");
if (plugStatus == null) {
deviceVO.setPlugStatus("");
}else {
} else {
Dictionary plugStatus1 = plugStatusMap.get(plugStatus.toString());
deviceVO.setPlugStatus(plugStatus1==null?"":plugStatus1.getName());
deviceVO.setPlugStatus(plugStatus1 == null ? "" : plugStatus1.getName());
}
}
}catch (RedisConnectionFailureException e){
log.error("redis连接失败请检查redis连接");
} catch (RedisConnectionFailureException e) {
this.log.error("redis连接失败请检查redis连接");
}
}
@ -103,55 +109,52 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceDao, Device> implements
}
@Override
public void add(DeviceCreateDTO deviceCreateDTO) throws NumberFormatException{
public void add(DeviceCreateDTO deviceCreateDTO) throws NumberFormatException {
//查重
List<Device> list = list(new LambdaQueryWrapper<Device>().eq(Device::getId, deviceCreateDTO.getCode()));
if(CollectionUtil.isNotEmpty(list)) {
List<Device> list = this.list(new LambdaQueryWrapper<Device>().eq(Device::getId, deviceCreateDTO.getCode()));
if (CollectionUtil.isNotEmpty(list)) {
throw new BusinessException("已有相同设备编码,请重新输入");
}
GasWell gasWell = gasWellService.getOne(new LambdaQueryWrapper<GasWell>().eq(GasWell::getId, deviceCreateDTO.getGasWell()));
if(gasWell==null) {
GasWell gasWell = this.gasWellService.getOne(new LambdaQueryWrapper<GasWell>().eq(GasWell::getId, deviceCreateDTO.getGasWell()));
if (gasWell == null) {
throw new BusinessException("该气井不存在");
}
if(gasWell.getDeviceId()!=null) {
if (gasWell.getDeviceId() != null) {
throw new BusinessException("该气井已存在绑定设备,无法添加新设备");
}
Device device = ConverterUtil.convert(deviceCreateDTO, Device.class);
//code必须为整形用于在缓存中查询设备的上报数据
device.setId(Long.valueOf(deviceCreateDTO.getCode()));
save(device);
this.save(device);
//在气井中绑定设备
gasWellService.bindDevice(deviceCreateDTO.getGasWell(),device.getId());
//创建该设备在mq中的事件队列
dynamicRabbitListener.registerDeviceAndListener(device.getId());
this.gasWellService.bindDevice(deviceCreateDTO.getGasWell(), device.getId());
}
@Override
public void edit(DeviceEditDTO deviceEditDTO){
updateById(ConverterUtil.convert(deviceEditDTO, Device.class));
public void edit(DeviceEditDTO deviceEditDTO) {
this.updateById(ConverterUtil.convert(deviceEditDTO, Device.class));
}
@Override
public void delete(Long id){
Long gasWellId = getDevice(id).getGasWell().getId();
public void delete(Long id) {
Long gasWellId = this.getDevice(id).getGasWell().getId();
//删除设备
removeById(id);
this.removeById(id);
//解绑气井
gasWellService.unbindDevice(gasWellId);
this.gasWellService.unbindDevice(gasWellId);
}
@Override
public DeviceVO getDevice(Long id) {
return deviceDao.getDeviceById(id);
return this.deviceDao.getDeviceById(id);
}
@Override
public Map<String,String> getDeviceControlData(Long deviceId) {
return redisTemplate.opsForHash().entries(PersistenceHandler.DEVICE_DATA_CACHE + deviceId);
public Map<String, String> getDeviceControlData(Long deviceId) {
return this.redisTemplate.opsForHash().entries(PersistenceHandler.DEVICE_DATA_CACHE + deviceId);
}
@Override
@ -159,18 +162,18 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceDao, Device> implements
Date start = null;
Date end = null;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
if(!StringUtils.isEmpty(startTime) ) {
if (!StringUtils.isEmpty(startTime)) {
start = simpleDateFormat.parse(startTime);
}
if(!StringUtils.isEmpty(endTime)) {
if (!StringUtils.isEmpty(endTime)) {
end = simpleDateFormat.parse(endTime);
}
return deviceOptLogService.page(new Page<>(currentPage, pageSize),start,end,deviceId);
return this.deviceOptLogService.page(new Page<>(currentPage, pageSize), start, end, deviceId);
}
@Override
public List<DeviceVO> getDeviceVOByIds(List<Long> deviceIdList) {
return deviceDao.getDeviceVOByIds(deviceIdList);
return this.deviceDao.getDeviceVOByIds(deviceIdList);
}
@ -179,28 +182,27 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceDao, Device> implements
Date start = null;
Date end = null;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
if(!StringUtils.isEmpty(startTime) ) {
if (!StringUtils.isEmpty(startTime)) {
start = simpleDateFormat.parse(startTime);
}
if(!StringUtils.isEmpty(endTime)) {
if (!StringUtils.isEmpty(endTime)) {
end = simpleDateFormat.parse(endTime);
}
String tableName = Redis2DBPersistenceService.DEFAULT_DATA_TABLE +deviceId;
Page<DeviceHistoryVO> page = deviceDao.historyPage(new Page<>(currentPage, pageSize),start,end,deviceId,tableName);
String tableName = Redis2DBPersistenceService.DEFAULT_DATA_TABLE + deviceId;
Page<DeviceHistoryVO> page = this.deviceDao.historyPage(new Page<>(currentPage, pageSize), start, end, deviceId, tableName);
List<DeviceHistoryVO> deviceHistoryVO = page.getRecords();
if(CollectionUtil.isNotEmpty(deviceHistoryVO)) {
Map<String, Dictionary> runModeMap = dictionaryService.getValueMapByType("runMode");
Map<String, Dictionary> plugStatusMap = dictionaryService.getValueMapByType("plugStatus");
if (CollectionUtil.isNotEmpty(deviceHistoryVO)) {
Map<String, Dictionary> runModeMap = this.dictionaryService.getValueMapByType("runMode");
Map<String, Dictionary> plugStatusMap = this.dictionaryService.getValueMapByType("plugStatus");
for (DeviceHistoryVO deviceVO : deviceHistoryVO) {
deviceVO.setRunMode(StringUtils.isEmpty(deviceVO.getRunMode()) ?"":runModeMap.get(deviceVO.getRunMode()).getName());
deviceVO.setPlugStatus(StringUtils.isEmpty(deviceVO.getPlugStatus()) ?"":plugStatusMap.get(deviceVO.getPlugStatus()).getName());
deviceVO.setRunMode(StringUtils.isEmpty(deviceVO.getRunMode()) ? "" : runModeMap.get(deviceVO.getRunMode()).getName());
deviceVO.setPlugStatus(StringUtils.isEmpty(deviceVO.getPlugStatus()) ? "" : plugStatusMap.get(deviceVO.getPlugStatus()).getName());
}
}
return page;
}
}

View File

@ -29,11 +29,11 @@ spring:
# manual手动ack需要在业务代码结束后调用api发送ack。
# auto自动ack由spring监测listener代码是否出现异常没有异常则返回ack抛出异常则返回nack
# none关闭ackMQ假定消费者获取消息后会成功处理因此消息投递后立即被删除
acknowledge-mode: auto
acknowledge-mode: none
concurrency: 10
retry:
# 开启消费者失败重试
enabled: true
enabled: false
# 初始的失败等待时长为1秒
initial-interval: 1000ms
# 失败的等待时长倍数,下次等待时长 = multiplier * initial-interval
@ -41,7 +41,7 @@ spring:
# 最大重试次数
max-attempts: 3
# true无状态false有状态。如果业务中包含事务这里改为false
stateless: true
stateless: false
server: