Merge remote-tracking branch 'origin/develop' into develop

This commit is contained in:
qinjie 2024-11-23 18:49:43 +08:00
commit d9362860f3
10 changed files with 362 additions and 10 deletions

46
pom.xml
View File

@ -19,10 +19,10 @@
<java.version>21</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<fastjson.version>2.0.51</fastjson.version>
<mysql.version>8.0.15</mysql.version>
<mybatis.version>3.0.3</mybatis.version>
<lombok.version>1.18.34</lombok.version>
<hutool.version>5.8.29</hutool.version>
<biaomidou.version>3.5.5</biaomidou.version>
<druid.version>1.2.20</druid.version>
@ -70,7 +70,7 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<version>1.18.34</version>
<scope>compile</scope>
</dependency>
@ -161,7 +161,19 @@
<artifactId>sa-token-spring-boot3-starter</artifactId>
<version>1.38.0</version>
</dependency>
<dependency>
<groupId>com.infiniteautomation</groupId>
<artifactId>modbus4j</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
@ -207,7 +219,6 @@
</profile>
</profiles>
<build>
<plugins>
<plugin>
@ -231,7 +242,7 @@
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<version>1.18.34</version>
</path>
<path>
<groupId>org.projectlombok</groupId>
@ -264,4 +275,29 @@
</plugins>
</build>
<repositories>
<repository>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
<id>ias-snapshots</id>
<name>Infinite Automation Snapshot Repository</name>
<url>https://maven.mangoautomation.net/repository/ias-snapshot/</url>
</repository>
<repository>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
<id>ias-releases</id>
<name>Infinite Automation Release Repository</name>
<url>https://maven.mangoautomation.net/repository/ias-release/</url>
</repository>
</repositories>
</project>

View File

@ -0,0 +1,11 @@
package com.isu.gaswellwatch.modbus.data;
/**
* 数据持久化处理器
*
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/23 11:53
*/
public interface PersistenceHandler {
}

View File

@ -0,0 +1,20 @@
package com.isu.gaswellwatch.modbus.data.decode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
/**
* Modbus 数据解析器
*
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/23 11:20
*/
public interface DecodeHandler {
String MODBUS_DEVICE_TYPE_FIELD_NAME = "modbusDeviceType";
Logger logger = LoggerFactory.getLogger("com.isu.gaswellwatch.decode");
void decode(Message message) throws Exception;
}

View File

@ -0,0 +1,52 @@
package com.isu.gaswellwatch.modbus.data.decode.impl;
import com.isu.gaswellwatch.modbus.data.decode.DecodeHandler;
import com.serotonin.modbus4j.msg.ReadInputRegistersResponse;
import com.serotonin.modbus4j.serial.rtu.RtuMessageParser;
import com.serotonin.modbus4j.serial.rtu.RtuMessageResponse;
import com.serotonin.modbus4j.sero.util.queue.ByteQueue;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
/**
* KNPCV1 柱塞器解析
*
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/23 11:23
*/
@RequiredArgsConstructor
@Component(Knpcv1DecodeHandler.MODBUS_DEVICE_TYPE)
public class Knpcv1DecodeHandler implements DecodeHandler {
public static final String MODBUS_DEVICE_TYPE = "KNPCV1";
private final RedisTemplate redisTemplate;
@Override
public void decode(Message message) throws Exception {
// /device/4B454E454E4731343030303030333538/collect
String queueName = message.getMessageProperties().getConsumerQueue();
String messageString = new String(message.getBody());
HashOperations<String, Object, Object> hashOperations = this.redisTemplate.opsForHash();
String[] queuePath = StringUtils.split(queueName, "/");
String deviceId = queuePath[1];
String command = queuePath[2];
logger.info("Decode {}, Queue {}, Message {}", MODBUS_DEVICE_TYPE, queueName, messageString);
ByteQueue byteQueue = new ByteQueue(messageString.replaceAll(" ", ""));
RtuMessageParser masterParser = new RtuMessageParser(true);
RtuMessageResponse response = (RtuMessageResponse) masterParser.parseMessage(byteQueue);
ReadInputRegistersResponse readInputRegistersResponse = (ReadInputRegistersResponse) response.getModbusResponse();
int index = 0;
hashOperations.put(queueName, DecodeHandler.MODBUS_DEVICE_TYPE_FIELD_NAME, MODBUS_DEVICE_TYPE);
for (short value : readInputRegistersResponse.getShortData()) {
hashOperations.put(queueName, StringUtils.leftPad(String.valueOf(index++), 4, '0'), value);
}
}
}

View File

@ -0,0 +1,32 @@
package com.isu.gaswellwatch.modbus.data.decode.listener;
import com.isu.gaswellwatch.utils.SpringUtil;
import org.springframework.amqp.core.BatchMessageListener;
import org.springframework.amqp.core.Message;
import java.util.ArrayList;
import java.util.List;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/23 0:51
*/
public class ComposeModbusMessageListener implements BatchMessageListener {
private final List<BatchMessageListener> messageListenerList = new ArrayList<>();
public void addBatchMessageListener(BatchMessageListener listener) {
SpringUtil.getApplicationContext().getAutowireCapableBeanFactory().autowireBean(listener);
this.messageListenerList.add(listener);
}
@Override
public void onMessage(Message message) {
this.messageListenerList.forEach(messageListener -> messageListener.onMessage(message));
}
@Override
public void onMessageBatch(List<Message> messages) {
this.messageListenerList.forEach(messageListener -> messageListener.onMessageBatch(messages));
}
}

View File

@ -0,0 +1,68 @@
package com.isu.gaswellwatch.modbus.data.decode.listener;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/23 0:32
*/
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
@Component
public class DynamicRabbitListener implements ApplicationRunner {
private final AmqpAdmin amqpAdmin;
private final ComposeModbusMessageListener composeListener;
private final SimpleMessageListenerContainer listenerContainer;
public DynamicRabbitListener(ConnectionFactory connectionFactory) {
this.amqpAdmin = new RabbitAdmin(connectionFactory);
this.listenerContainer = new SimpleMessageListenerContainer();
this.listenerContainer.setConnectionFactory(connectionFactory);
this.listenerContainer.setMessageListener(this.composeListener = new ComposeModbusMessageListener());
// 启动监听容器
this.listenerContainer.start();
}
@Override
public void run(ApplicationArguments args) throws Exception {
this.composeListener.addBatchMessageListener(new ModbusMessageBackupListener());
this.composeListener.addBatchMessageListener(new ModbusMessagePersistListener());
// TODO 根据设备自动绑定队列
this.addListenerQueue("/modbus/device/4B454E454E4731343030303030333538/collect");
}
// @RabbitListener(queues = "/modbus/device/4B454E454E4731343030303030333538/collect")
// public void modbusMessage(Message message) {
// System.out.println("modbusMessage:" + new String(message.getBody()));
// }
public void addListenerQueue(String queueName) {
this.addListenerQueue(queueName, null, null);
}
public void addListenerQueue(String queueName, String exchangeName, String routingKey) {
Queue queue = QueueBuilder.durable(queueName).build();
// 声明队列
this.amqpAdmin.declareQueue(queue);
if (StringUtils.isNotBlank(exchangeName) && StringUtils.isNotBlank(routingKey)) {
// 声明直接类型的交换器
DirectExchange exchange = new DirectExchange(exchangeName);
this.amqpAdmin.declareExchange(exchange);
// 绑定队列和交换器
this.amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
}
// 设置监听的队列
this.listenerContainer.addQueues(queue);
}
}

View File

@ -0,0 +1,47 @@
package com.isu.gaswellwatch.modbus.data.decode.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.BatchMessageListener;
import org.springframework.amqp.core.Message;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/23 0:47
*/
@Slf4j
public class ModbusMessageBackupListener implements BatchMessageListener {
@Override
public void onMessage(Message message) {
this.onMessageBatch(List.of(message));
}
@Override
public void onMessageBatch(List<Message> messages) {
Path filePath;
String messageString, queueName, backupFileName;
for (Message message : messages) {
queueName = message.getMessageProperties().getConsumerQueue();
backupFileName = StringUtils.replace(queueName, "/", "_");
messageString = new String(message.getBody());
filePath = Paths.get("D:\\modbus\\data\\" + backupFileName + ".data");
try (BufferedWriter writer = Files.newBufferedWriter(filePath, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.write(messageString);
writer.write("\n");
} catch (IOException e) {
log.error("Backup message failed. QueueName {}, Message {}", queueName, messageString, e);
}
}
}
}

View File

@ -0,0 +1,27 @@
package com.isu.gaswellwatch.modbus.data.decode.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.BatchMessageListener;
import org.springframework.amqp.core.Message;
import java.util.List;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/23 0:43
*/
@Slf4j
public class ModbusMessagePersistListener implements BatchMessageListener {
@Override
public void onMessage(Message message) {
this.onMessageBatch(List.of(message));
}
@Override
public void onMessageBatch(List<Message> messages) {
for (Message message : messages) {
}
}
}

View File

@ -0,0 +1,24 @@
package com.isu.gaswellwatch.modbus.data.impl;
import com.isu.gaswellwatch.modbus.data.PersistenceHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/23 11:55
*/
@Component
@RequiredArgsConstructor
public class Redis2DBPersistenceHandlerImpl implements PersistenceHandler {
private final RedisTemplate redisTemplate;
@Scheduled(cron = "0/10 * * * * ? ")
public void write() {
this.redisTemplate.getClientList();
}
}

View File

@ -7,6 +7,41 @@ spring:
max-request-size: 200MB #指定为100MB -1无限制
profiles:
active: @environment@
redis:
host: ${REDIS_HOST:localhost}
port: ${REDIS_PORT:6379}
# Redis索引默认为 0
database: ${REDIS_DATABASE:0}
# Redis 服务器连接密码(默认为空)
password: ${REDIS_PASSWORD:}
timeout: 5000
rabbitmq:
host: ${RABBIT_MQ_HOST:localhost}
port: ${RABBIT_MQ_PORT:5672}
username: ${RABBIT_MQ_USERNAME:ModbusAdmin}
password: ${RABBIT_MQ_PASSWORD:ModbusPassword}
virtual-host: /
# public confirms机制用于解决生产者与Rabbitmq服务器之间消息可靠传输它在消息服务器持久化消息后通知消息生产者发送成功。
# 发送确认
publisher-confirms: true
listener:
simple:
# manual手动ack需要在业务代码结束后调用api发送ack。
# auto自动ack由spring监测listener代码是否出现异常没有异常则返回ack抛出异常则返回nack
# none关闭ackMQ假定消费者获取消息后会成功处理因此消息投递后立即被删除
acknowledge-mode: auto
concurrency: 10
retry:
# 开启消费者失败重试
enabled: true
# 初始的失败等待时长为1秒
initial-interval: 1000ms
# 失败的等待时长倍数,下次等待时长 = multiplier * initial-interval
multiplier: 1
# 最大重试次数
max-attempts: 3
# true无状态false有状态。如果业务中包含事务这里改为false
stateless: true
server: