diff --git a/pom.xml b/pom.xml index 57efe41..67decf2 100644 --- a/pom.xml +++ b/pom.xml @@ -19,10 +19,10 @@ 21 ${java.version} ${java.version} + 2.0.51 8.0.15 3.0.3 - 1.18.34 5.8.29 3.5.5 1.2.20 @@ -70,7 +70,7 @@ org.projectlombok lombok - ${lombok.version} + 1.18.34 compile @@ -114,11 +114,11 @@ pom import - - - - - + + + + + com.alibaba @@ -161,7 +161,19 @@ sa-token-spring-boot3-starter 1.38.0 - + + com.infiniteautomation + modbus4j + 3.1.0 + + + org.springframework.boot + spring-boot-starter-amqp + + + org.springframework.boot + spring-boot-starter-data-redis + org.springframework.boot spring-boot-devtools @@ -207,7 +219,6 @@ - @@ -231,7 +242,7 @@ org.projectlombok lombok - ${lombok.version} + 1.18.34 org.projectlombok @@ -264,4 +275,29 @@ + + + + false + + + true + + ias-snapshots + Infinite Automation Snapshot Repository + https://maven.mangoautomation.net/repository/ias-snapshot/ + + + + true + + + false + + ias-releases + Infinite Automation Release Repository + https://maven.mangoautomation.net/repository/ias-release/ + + + diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java new file mode 100644 index 0000000..691ce17 --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/PersistenceHandler.java @@ -0,0 +1,11 @@ +package com.isu.gaswellwatch.modbus.data; + +/** + * 数据持久化处理器 + * + * @author 王仕龙 + * 2024/11/23 11:53 + */ +public interface PersistenceHandler { + +} diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java new file mode 100644 index 0000000..b48d326 --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/DecodeHandler.java @@ -0,0 +1,20 @@ +package com.isu.gaswellwatch.modbus.data.decode; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; + +/** + * Modbus 数据解析器 + * + * @author 王仕龙 + * 2024/11/23 11:20 + */ +public interface DecodeHandler { + + String MODBUS_DEVICE_TYPE_FIELD_NAME = "modbusDeviceType"; + Logger logger = LoggerFactory.getLogger("com.isu.gaswellwatch.decode"); + + void decode(Message message) throws Exception; + +} diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/Knpcv1DecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/Knpcv1DecodeHandler.java new file mode 100644 index 0000000..d80d94c --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/Knpcv1DecodeHandler.java @@ -0,0 +1,52 @@ +package com.isu.gaswellwatch.modbus.data.decode.impl; + +import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler; +import com.serotonin.modbus4j.msg.ReadInputRegistersResponse; +import com.serotonin.modbus4j.serial.rtu.RtuMessageParser; +import com.serotonin.modbus4j.serial.rtu.RtuMessageResponse; +import com.serotonin.modbus4j.sero.util.queue.ByteQueue; +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.springframework.amqp.core.Message; +import org.springframework.data.redis.core.HashOperations; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +/** + * KNPCV1 柱塞器解析 + * + * @author 王仕龙 + * 2024/11/23 11:23 + */ +@RequiredArgsConstructor +@Component(Knpcv1DecodeHandler.MODBUS_DEVICE_TYPE) +public class Knpcv1DecodeHandler implements DecodeHandler { + + public static final String MODBUS_DEVICE_TYPE = "KNPCV1"; + private final RedisTemplate redisTemplate; + + @Override + public void decode(Message message) throws Exception { + + // /device/4B454E454E4731343030303030333538/collect + String queueName = message.getMessageProperties().getConsumerQueue(); + String messageString = new String(message.getBody()); + HashOperations hashOperations = this.redisTemplate.opsForHash(); + String[] queuePath = StringUtils.split(queueName, "/"); + String deviceId = queuePath[1]; + String command = queuePath[2]; + + logger.info("Decode {}, Queue {}, Message {}", MODBUS_DEVICE_TYPE, queueName, messageString); + + ByteQueue byteQueue = new ByteQueue(messageString.replaceAll(" ", "")); + RtuMessageParser masterParser = new RtuMessageParser(true); + RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue); + ReadInputRegistersResponse readInputRegistersResponse = (ReadInputRegistersResponse) response.getModbusResponse(); + int index = 0; + hashOperations.put(queueName, DecodeHandler.MODBUS_DEVICE_TYPE_FIELD_NAME, MODBUS_DEVICE_TYPE); + for (short value : readInputRegistersResponse.getShortData()) { + hashOperations.put(queueName, StringUtils.leftPad(String.valueOf(index++), 4, '0'), value); + } + } + +} diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ComposeModbusMessageListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ComposeModbusMessageListener.java new file mode 100644 index 0000000..834ca26 --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ComposeModbusMessageListener.java @@ -0,0 +1,32 @@ +package com.isu.gaswellwatch.modbus.data.decode.listener; + +import com.isu.gaswellwatch.utils.SpringUtil; +import org.springframework.amqp.core.BatchMessageListener; +import org.springframework.amqp.core.Message; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author 王仕龙 + * 2024/11/23 0:51 + */ +public class ComposeModbusMessageListener implements BatchMessageListener { + + private final List messageListenerList = new ArrayList<>(); + + public void addBatchMessageListener(BatchMessageListener listener) { + SpringUtil.getApplicationContext().getAutowireCapableBeanFactory().autowireBean(listener); + this.messageListenerList.add(listener); + } + + @Override + public void onMessage(Message message) { + this.messageListenerList.forEach(messageListener -> messageListener.onMessage(message)); + } + + @Override + public void onMessageBatch(List messages) { + this.messageListenerList.forEach(messageListener -> messageListener.onMessageBatch(messages)); + } +} diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/DynamicRabbitListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/DynamicRabbitListener.java new file mode 100644 index 0000000..99891a9 --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/DynamicRabbitListener.java @@ -0,0 +1,68 @@ +package com.isu.gaswellwatch.modbus.data.decode.listener; + +/** + * @author 王仕龙 + * 2024/11/23 0:32 + */ + +import org.apache.commons.lang3.StringUtils; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + + +@Component +public class DynamicRabbitListener implements ApplicationRunner { + + private final AmqpAdmin amqpAdmin; + private final ComposeModbusMessageListener composeListener; + private final SimpleMessageListenerContainer listenerContainer; + + public DynamicRabbitListener(ConnectionFactory connectionFactory) { + this.amqpAdmin = new RabbitAdmin(connectionFactory); + this.listenerContainer = new SimpleMessageListenerContainer(); + this.listenerContainer.setConnectionFactory(connectionFactory); + this.listenerContainer.setMessageListener(this.composeListener = new ComposeModbusMessageListener()); + // 启动监听容器 + this.listenerContainer.start(); + } + + @Override + public void run(ApplicationArguments args) throws Exception { + this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener()); + this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener()); + // TODO 根据设备自动绑定队列 + this.addListenerQueue("/modbus/device/4B454E454E4731343030303030333538/collect"); + } + +// @RabbitListener(queues = "/modbus/device/4B454E454E4731343030303030333538/collect") +// public void modbusMessage(Message message) { +// System.out.println("modbusMessage:" + new String(message.getBody())); +// } + + public void addListenerQueue(String queueName) { + this.addListenerQueue(queueName, null, null); + } + + public void addListenerQueue(String queueName, String exchangeName, String routingKey) { + Queue queue = QueueBuilder.durable(queueName).build(); + // 声明队列 + this.amqpAdmin.declareQueue(queue); + + if (StringUtils.isNotBlank(exchangeName) && StringUtils.isNotBlank(routingKey)) { + // 声明直接类型的交换器 + DirectExchange exchange = new DirectExchange(exchangeName); + this.amqpAdmin.declareExchange(exchange); + // 绑定队列和交换器 + this.amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey)); + } + + // 设置监听的队列 + this.listenerContainer.addQueues(queue); + } + +} \ No newline at end of file diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessageBackupListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessageBackupListener.java new file mode 100644 index 0000000..9cd9b93 --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessageBackupListener.java @@ -0,0 +1,47 @@ +package com.isu.gaswellwatch.modbus.data.decode.listener; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.amqp.core.BatchMessageListener; +import org.springframework.amqp.core.Message; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.List; + +/** + * @author 王仕龙 + * 2024/11/23 0:47 + */ +@Slf4j +public class ModbusMessageBackupListener implements BatchMessageListener { + + @Override + public void onMessage(Message message) { + this.onMessageBatch(List.of(message)); + } + + @Override + public void onMessageBatch(List messages) { + Path filePath; + String messageString, queueName, backupFileName; + for (Message message : messages) { + queueName = message.getMessageProperties().getConsumerQueue(); + backupFileName = StringUtils.replace(queueName, "/", "_"); + messageString = new String(message.getBody()); + filePath = Paths.get("D:\\modbus\\data\\" + backupFileName + ".data"); + try (BufferedWriter writer = Files.newBufferedWriter(filePath, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) { + writer.write(messageString); + writer.write("\n"); + } catch (IOException e) { + log.error("Backup message failed. QueueName {}, Message {}", queueName, messageString, e); + } + } + } + +} diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessagePersistListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessagePersistListener.java new file mode 100644 index 0000000..5f6529e --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/listener/ModbusMessagePersistListener.java @@ -0,0 +1,27 @@ +package com.isu.gaswellwatch.modbus.data.decode.listener; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.BatchMessageListener; +import org.springframework.amqp.core.Message; + +import java.util.List; + +/** + * @author 王仕龙 + * 2024/11/23 0:43 + */ +@Slf4j +public class ModbusMessagePersistListener implements BatchMessageListener { + + @Override + public void onMessage(Message message) { + this.onMessageBatch(List.of(message)); + } + + @Override + public void onMessageBatch(List messages) { + for (Message message : messages) { + + } + } +} diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Redis2DBPersistenceHandlerImpl.java b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Redis2DBPersistenceHandlerImpl.java new file mode 100644 index 0000000..ddb945a --- /dev/null +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/Redis2DBPersistenceHandlerImpl.java @@ -0,0 +1,24 @@ +package com.isu.gaswellwatch.modbus.data.impl; + +import com.isu.gaswellwatch.modbus.data.PersistenceHandler; +import lombok.RequiredArgsConstructor; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +/** + * @author 王仕龙 + * 2024/11/23 11:55 + */ +@Component +@RequiredArgsConstructor +public class Redis2DBPersistenceHandlerImpl implements PersistenceHandler { + + private final RedisTemplate redisTemplate; + + @Scheduled(cron = "0/10 * * * * ? ") + public void write() { + this.redisTemplate.getClientList(); + } + +} diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index c3073b3..08dab3a 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -7,6 +7,41 @@ spring: max-request-size: 200MB #指定为100MB -1无限制 profiles: active: @environment@ + redis: + host: ${REDIS_HOST:localhost} + port: ${REDIS_PORT:6379} + # Redis索引(默认为 0) + database: ${REDIS_DATABASE:0} + # Redis 服务器连接密码(默认为空) + password: ${REDIS_PASSWORD:} + timeout: 5000 + rabbitmq: + host: ${RABBIT_MQ_HOST:localhost} + port: ${RABBIT_MQ_PORT:5672} + username: ${RABBIT_MQ_USERNAME:ModbusAdmin} + password: ${RABBIT_MQ_PASSWORD:ModbusPassword} + virtual-host: / + # public confirms机制用于解决生产者与Rabbitmq服务器之间消息可靠传输,它在消息服务器持久化消息后通知消息生产者发送成功。 + # 发送确认 + publisher-confirms: true + listener: + simple: + # manual:手动ack,需要在业务代码结束后,调用api发送ack。 + # auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack + # none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除 + acknowledge-mode: auto + concurrency: 10 + retry: + # 开启消费者失败重试 + enabled: true + # 初始的失败等待时长为1秒 + initial-interval: 1000ms + # 失败的等待时长倍数,下次等待时长 = multiplier * initial-interval + multiplier: 1 + # 最大重试次数 + max-attempts: 3 + # true无状态;false有状态。如果业务中包含事务,这里改为false + stateless: true server: