使用数据库网关配置

This commit is contained in:
wangshilong 2024-11-28 20:27:37 +08:00
parent 3723bd3a78
commit fdc37c8ad1
5 changed files with 79 additions and 46 deletions

View File

@ -1,25 +1,43 @@
package com.iot.modbus_rtcp.config; package com.iot.modbus_rtcp.config;
import lombok.Data; import cn.hutool.core.map.MapUtil;
import org.springframework.boot.context.properties.ConfigurationProperties; import com.iot.modbus_rtcp.utils.HexUtil;
import org.springframework.context.annotation.Configuration; import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Configuration @Service
@Data @RequiredArgsConstructor
@ConfigurationProperties(prefix = "server.netty")
public class EquipmentIPProperties { public class EquipmentIPProperties {
private Map<String, String> identifiers; private final JdbcTemplate jdbcTemplate;
private final Map<String, String> identifiers = new ConcurrentHashMap<>();
public HashSet<String> keys() {
return new HashSet<>(this.identifiers.keySet());
}
public String get(String key) { public String get(String key) {
return this.identifiers.get(key); return this.identifiers.get(key);
} }
public String put(String gatewaySn) {
String gatewayHeartbeat = HexUtil.bytesToHexString(gatewaySn.getBytes()).toUpperCase();
this.identifiers.put(gatewayHeartbeat, gatewaySn);
return gatewayHeartbeat;
}
public boolean contains(String gatewayHeartbeat) {
if (this.identifiers.containsKey(gatewayHeartbeat)) {
return true;
}
gatewayHeartbeat = gatewayHeartbeat.toUpperCase();
String gatewaySn = new String(HexUtil.hexStringToBytes(gatewayHeartbeat));
Map<String, Object> countMap = this.jdbcTemplate.queryForMap("select count(*) as ctn from device where gateway_sn = '" + gatewaySn + "'");
if (MapUtil.getInt(countMap, "ctn") > 0) {
this.identifiers.put(gatewayHeartbeat, gatewaySn);
return true;
}
return false;
}
} }

View File

@ -1,6 +1,7 @@
package com.iot.modbus_rtcp.jobs; package com.iot.modbus_rtcp.jobs;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import com.iot.modbus_rtcp.config.EquipmentIPProperties;
import com.iot.modbus_rtcp.controller.ModbusTCPController; import com.iot.modbus_rtcp.controller.ModbusTCPController;
import com.iot.modbus_rtcp.dto.CommandTypeComparable; import com.iot.modbus_rtcp.dto.CommandTypeComparable;
import com.iot.modbus_rtcp.dto.ModbusCommandDto; import com.iot.modbus_rtcp.dto.ModbusCommandDto;
@ -64,6 +65,7 @@ public class AutoCollectJobs {
private final JdbcTemplate jdbcTemplate; private final JdbcTemplate jdbcTemplate;
private final ModbusTCPController controller; private final ModbusTCPController controller;
private final EquipmentIPProperties equipmentIPProperties;
private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(Math.min(10, Runtime.getRuntime().availableProcessors()) private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(Math.min(10, Runtime.getRuntime().availableProcessors())
, Math.min(100, Runtime.getRuntime().availableProcessors()), 60, TimeUnit.SECONDS, , Math.min(100, Runtime.getRuntime().availableProcessors()), 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5000), new ThreadFactory() { new ArrayBlockingQueue<>(5000), new ThreadFactory() {
@ -91,11 +93,14 @@ public class AutoCollectJobs {
} }
resultList.stream() resultList.stream()
.filter(item -> { .filter(item -> {
String identifier = MapUtil.getStr(item, "identifier"); String gatewaySn = MapUtil.getStr(item, "identifier");
if (Objects.nonNull(ModbusTCPController.nettyServer.getGroup().get(identifier))) { String gatewayIdentifier = this.equipmentIPProperties.put(gatewaySn);
if (Objects.nonNull(ModbusTCPController.nettyServer.getGroup()
.get(gatewayIdentifier))) {
item.put("identifier", gatewayIdentifier);
return true; return true;
} }
log.warn("Gateway {} is disconnected and does not collect data", identifier); log.warn("Gateway {} is disconnected and does not collect data", gatewaySn);
return false; return false;
}) })
.collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "deviceId"))) .collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "deviceId")))

View File

@ -14,20 +14,16 @@ import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitAdmin;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Objects;
@Slf4j @Slf4j
public class ModbusDecoder extends ByteToMessageDecoder { public class ModbusDecoder extends ByteToMessageDecoder {
private final ChannelGroup channelGroup; private final ChannelGroup channelGroup;
private final HashSet<String> identityList;
private final EquipmentIPProperties equipmentIPProperties; private final EquipmentIPProperties equipmentIPProperties;
public ModbusDecoder(ChannelGroup channelGroup) { public ModbusDecoder(ChannelGroup channelGroup) {
this.channelGroup = channelGroup; this.channelGroup = channelGroup;
this.equipmentIPProperties = SpringUtil.getBean(EquipmentIPProperties.class); this.equipmentIPProperties = SpringUtil.getBean(EquipmentIPProperties.class);
this.identityList = this.equipmentIPProperties.keys();
} }
@Override @Override
@ -41,32 +37,28 @@ public class ModbusDecoder extends ByteToMessageDecoder {
String hex = HexUtil.bytesToHexString(b); String hex = HexUtil.bytesToHexString(b);
log.info("解析到:{}", hex); log.info("解析到:{}", hex);
if (this.identityList.contains(hex.toUpperCase())) { // 心跳 if (this.equipmentIPProperties.contains(hex.toUpperCase())) { // 心跳
String msgUpperCase = hex.toUpperCase(); String msgUpperCase = hex.toUpperCase();
for (String identity : this.identityList) { log.info("注册{}({}:{}):{}", this.equipmentIPProperties.get(msgUpperCase), ip, port, msgUpperCase);
if (Objects.equals(identity, msgUpperCase)) { String currentAddress = ChannelGroup.getKey(ip, port);
log.info("注册{}({}:{}):{}", this.equipmentIPProperties.get(msgUpperCase), ip, port, msgUpperCase); SyncPriorityChannel channel = this.channelGroup.get(currentAddress);
String currentAddress = ChannelGroup.getKey(ip, port); channel.setIdentifier(msgUpperCase);
SyncPriorityChannel channel = this.channelGroup.get(currentAddress); this.channelGroup.put(msgUpperCase, channel);
channel.setIdentifier(msgUpperCase); SpringUtil.getBean(RabbitAdmin.class).declareQueue(QueueBuilder.durable(channel.getCollectQueue()).build());
this.channelGroup.put(msgUpperCase, channel);
SpringUtil.getBean(RabbitAdmin.class).declareQueue(QueueBuilder.durable(channel.getCollectQueue()).build());
IPGatewayRelation.put(currentAddress, String.format("%s(%s)", this.equipmentIPProperties.get(msgUpperCase), msgUpperCase)); IPGatewayRelation.put(currentAddress, String.format("%s(%s)", this.equipmentIPProperties.get(msgUpperCase), msgUpperCase));
String oldAddress = IPGatewayRelation.getIPAddress(msgUpperCase); String oldAddress = IPGatewayRelation.getIPAddress(msgUpperCase);
if (StringUtils.isNotBlank(currentAddress) && if (StringUtils.isNotBlank(currentAddress) &&
StringUtils.isNotBlank(oldAddress) && StringUtils.isNotBlank(oldAddress) &&
!currentAddress.equals(oldAddress)) { !currentAddress.equals(oldAddress)) {
//说明设备重连后IP+端口发生了改变 //说明设备重连后IP+端口发生了改变
String[] split = oldAddress.split(":"); String[] split = oldAddress.split(":");
//删除老连接 //删除老连接
this.channelGroup.remove(split[0], Integer.parseInt(split[1])); this.channelGroup.remove(split[0], Integer.parseInt(split[1]));
log.info("删除channelGroup通道:{}连接:{},新增该通道连接:{}", this.equipmentIPProperties.get(msgUpperCase), oldAddress, currentAddress); log.info("删除channelGroup通道:{}连接:{},新增该通道连接:{}", this.equipmentIPProperties.get(msgUpperCase), oldAddress, currentAddress);
}
IPGatewayRelation.putIPAddressMap(msgUpperCase, currentAddress);
return;
}
} }
IPGatewayRelation.putIPAddressMap(msgUpperCase, currentAddress);
return;
} }
SyncPriorityChannel channel = this.channelGroup.get(ip, port); SyncPriorityChannel channel = this.channelGroup.get(ip, port);

View File

@ -0,0 +1,20 @@
package com.iot.modbus_rtcp;
import com.iot.modbus_rtcp.utils.HexUtil;
import org.junit.jupiter.api.Test;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/28 18:52
*/
public class GatewayTest {
@Test
public void testConverterHeartbeat() {
String s1 = "4B454E454E4731343030303030333538";
String s2 = "KENENG1400000358";
System.out.println(new String(HexUtil.hexStringToBytes(s1)));
System.out.println(HexUtil.bytesToHexString(s2.getBytes()));
}
}

View File

@ -15,7 +15,7 @@ import java.util.concurrent.locks.LockSupport;
public class NonBlockingSocketTest { public class NonBlockingSocketTest {
private static final String HOST = "127.0.0.1"; private static final String HOST = "127.0.0.1";
private static final Integer PORT = 1200; private static final Integer PORT = 502;
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open(); SocketChannel socketChannel = SocketChannel.open();
@ -56,11 +56,9 @@ public class NonBlockingSocketTest {
System.out.println(LocalDateTime.now() + "<==:收到服务器端请求:" + line); System.out.println(LocalDateTime.now() + "<==:收到服务器端请求:" + line);
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
// 接收到请求 // 接收到请求
switch (line.toUpperCase()) { switch (line.toUpperCase()) {
case "01040000001531C5" -> case "01040000001671C4" ->
// 发送响应字符串 // 发送响应字符串
socketChannel.write(ByteBuffer.wrap("01 04 2A 07 E8 00 0B 00 13 00 17 00 11 00 23 00 03 00 01 00 00 00 00 00 02 00 3B 03 E7 00 39 00 00 0D AC 0D AC 00 00 00 00 00 00 0A 14 4A 98".replaceAll(" ", "").getBytes())); socketChannel.write(ByteBuffer.wrap("01 04 2A 07 E8 00 0B 00 13 00 17 00 11 00 23 00 03 00 01 00 00 00 00 00 02 00 3B 03 E7 00 39 00 00 0D AC 0D AC 00 00 00 00 00 00 0A 14 4A 98".replaceAll(" ", "").getBytes()));
// 接收到开井请求 // 接收到开井请求