From 5fd5dc316e0fafec54d970e499636e1d5f896ea1 Mon Sep 17 00:00:00 2001 From: wangshilong Date: Fri, 6 Dec 2024 13:55:09 +0800 Subject: [PATCH] =?UTF-8?q?=E9=98=9F=E5=88=97=E5=90=8D=E7=A7=B0=E6=96=B9?= =?UTF-8?q?=E6=B3=95=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gaswellwatch/modbus/data/Redis2DBPersistenceService.java | 2 +- .../modbus/data/listener/DynamicRabbitListener.java | 2 +- .../com/isu/gaswellwatch/modbus/data/listener/Queues.java | 5 +++++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java index 4c09afb..c17f42c 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java @@ -90,7 +90,7 @@ public class Redis2DBPersistenceService { } } tableName = DEFAULT_DATA_TABLE + deviceId; - queueName = Queues.DEVICE_EVENTS + (deviceId % 10); + queueName = Queues.getDeviceEventsQueue(deviceId); persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode); existsTableList = this.jdbcTemplate.queryForList(StringUtils.replace(EXISTS_TABLE_SQL, "$TableName$", tableName)); if (ObjectUtils.isEmpty(existsTableList) diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java index 1e8fd7a..0c3e970 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/DynamicRabbitListener.java @@ -39,7 +39,7 @@ public class DynamicRabbitListener implements ApplicationRunner { public void run(ApplicationArguments args) { // this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener()); this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener()); - IntStream.range(0, 10).forEach(index -> { + IntStream.range(0, Queues.QUEUE_PARTITION).forEach(index -> { this.addEventListener(Queues.DEVICE_EVENTS + index); this.addCollectDataListener(Queues.MODBUS_COLLECT_DATA + index); }); diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/Queues.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/Queues.java index b42ff5e..6635912 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/Queues.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/Queues.java @@ -5,6 +5,11 @@ package com.isu.gaswellwatch.modbus.data.listener; * 2024/11/26 12:37 */ public final class Queues { + public static final int QUEUE_PARTITION = 10; public static final String DEVICE_EVENTS = "/device/events/"; public static final String MODBUS_COLLECT_DATA = "/modbus/collect/"; + + public static String getDeviceEventsQueue(Long deviceId) { + return DEVICE_EVENTS + (deviceId % QUEUE_PARTITION); + } }