Accept Merge Request #2: (use_db_gateway -> develop)

Merge Request: M

Created By: @王仕龙
Accepted By: @王仕龙
URL: https://g-dupe6279.coding.net/p/chuanqing/d/ModBus-rtcp/git/merge/2?initial=true
This commit is contained in:
王仕龙 2024-12-01 14:47:59 +08:00 committed by Coding
commit 3e5fec9236
7 changed files with 119 additions and 62 deletions

View File

@ -1,25 +1,43 @@
package com.iot.modbus_rtcp.config; package com.iot.modbus_rtcp.config;
import lombok.Data; import cn.hutool.core.map.MapUtil;
import org.springframework.boot.context.properties.ConfigurationProperties; import com.iot.modbus_rtcp.utils.HexUtil;
import org.springframework.context.annotation.Configuration; import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Configuration @Service
@Data @RequiredArgsConstructor
@ConfigurationProperties(prefix = "server.netty")
public class EquipmentIPProperties { public class EquipmentIPProperties {
private Map<String, String> identifiers; private final JdbcTemplate jdbcTemplate;
private final Map<String, String> identifiers = new ConcurrentHashMap<>();
public HashSet<String> keys() {
return new HashSet<>(this.identifiers.keySet());
}
public String get(String key) { public String get(String key) {
return this.identifiers.get(key); return this.identifiers.get(key);
} }
public String put(String gatewaySn) {
String gatewayHeartbeat = HexUtil.bytesToHexString(gatewaySn.getBytes()).toUpperCase();
this.identifiers.put(gatewayHeartbeat, gatewaySn);
return gatewayHeartbeat;
}
public boolean contains(String gatewayHeartbeat) {
if (this.identifiers.containsKey(gatewayHeartbeat)) {
return true;
}
gatewayHeartbeat = gatewayHeartbeat.toUpperCase();
String gatewaySn = new String(HexUtil.hexStringToBytes(gatewayHeartbeat));
Map<String, Object> countMap = this.jdbcTemplate.queryForMap("select count(*) as ctn from device where gateway_sn = '" + gatewaySn + "'");
if (MapUtil.getInt(countMap, "ctn") > 0) {
this.identifiers.put(gatewayHeartbeat, gatewaySn);
return true;
}
return false;
}
} }

View File

@ -1,6 +1,7 @@
package com.iot.modbus_rtcp.jobs; package com.iot.modbus_rtcp.jobs;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import com.iot.modbus_rtcp.config.EquipmentIPProperties;
import com.iot.modbus_rtcp.controller.ModbusTCPController; import com.iot.modbus_rtcp.controller.ModbusTCPController;
import com.iot.modbus_rtcp.dto.CommandTypeComparable; import com.iot.modbus_rtcp.dto.CommandTypeComparable;
import com.iot.modbus_rtcp.dto.ModbusCommandDto; import com.iot.modbus_rtcp.dto.ModbusCommandDto;
@ -64,6 +65,7 @@ public class AutoCollectJobs {
private final JdbcTemplate jdbcTemplate; private final JdbcTemplate jdbcTemplate;
private final ModbusTCPController controller; private final ModbusTCPController controller;
private final EquipmentIPProperties equipmentIPProperties;
private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(Math.min(10, Runtime.getRuntime().availableProcessors()) private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(Math.min(10, Runtime.getRuntime().availableProcessors())
, Math.min(100, Runtime.getRuntime().availableProcessors()), 60, TimeUnit.SECONDS, , Math.min(100, Runtime.getRuntime().availableProcessors()), 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5000), new ThreadFactory() { new ArrayBlockingQueue<>(5000), new ThreadFactory() {
@ -91,11 +93,14 @@ public class AutoCollectJobs {
} }
resultList.stream() resultList.stream()
.filter(item -> { .filter(item -> {
String identifier = MapUtil.getStr(item, "identifier"); String gatewaySn = MapUtil.getStr(item, "identifier");
if (Objects.nonNull(ModbusTCPController.nettyServer.getGroup().get(identifier))) { String gatewayIdentifier = this.equipmentIPProperties.put(gatewaySn);
if (Objects.nonNull(ModbusTCPController.nettyServer.getGroup()
.get(gatewayIdentifier))) {
item.put("identifier", gatewayIdentifier);
return true; return true;
} }
log.warn("Gateway {} is disconnected and does not collect data", identifier); log.warn("Gateway {} is disconnected and does not collect data", gatewaySn);
return false; return false;
}) })
.collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "deviceId"))) .collect(Collectors.groupingBy(item -> MapUtil.getStr(item, "deviceId")))

View File

@ -14,20 +14,16 @@ import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitAdmin;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Objects;
@Slf4j @Slf4j
public class ModbusDecoder extends ByteToMessageDecoder { public class ModbusDecoder extends ByteToMessageDecoder {
private final ChannelGroup channelGroup; private final ChannelGroup channelGroup;
private final HashSet<String> identityList;
private final EquipmentIPProperties equipmentIPProperties; private final EquipmentIPProperties equipmentIPProperties;
public ModbusDecoder(ChannelGroup channelGroup) { public ModbusDecoder(ChannelGroup channelGroup) {
this.channelGroup = channelGroup; this.channelGroup = channelGroup;
this.equipmentIPProperties = SpringUtil.getBean(EquipmentIPProperties.class); this.equipmentIPProperties = SpringUtil.getBean(EquipmentIPProperties.class);
this.identityList = this.equipmentIPProperties.keys();
} }
@Override @Override
@ -41,10 +37,8 @@ public class ModbusDecoder extends ByteToMessageDecoder {
String hex = HexUtil.bytesToHexString(b); String hex = HexUtil.bytesToHexString(b);
log.info("解析到:{}", hex); log.info("解析到:{}", hex);
if (this.identityList.contains(hex.toUpperCase())) { // 心跳 if (this.equipmentIPProperties.contains(hex.toUpperCase())) { // 心跳
String msgUpperCase = hex.toUpperCase(); String msgUpperCase = hex.toUpperCase();
for (String identity : this.identityList) {
if (Objects.equals(identity, msgUpperCase)) {
log.info("注册{}({}:{}):{}", this.equipmentIPProperties.get(msgUpperCase), ip, port, msgUpperCase); log.info("注册{}({}:{}):{}", this.equipmentIPProperties.get(msgUpperCase), ip, port, msgUpperCase);
String currentAddress = ChannelGroup.getKey(ip, port); String currentAddress = ChannelGroup.getKey(ip, port);
SyncPriorityChannel channel = this.channelGroup.get(currentAddress); SyncPriorityChannel channel = this.channelGroup.get(currentAddress);
@ -66,8 +60,6 @@ public class ModbusDecoder extends ByteToMessageDecoder {
IPGatewayRelation.putIPAddressMap(msgUpperCase, currentAddress); IPGatewayRelation.putIPAddressMap(msgUpperCase, currentAddress);
return; return;
} }
}
}
SyncPriorityChannel channel = this.channelGroup.get(ip, port); SyncPriorityChannel channel = this.channelGroup.get(ip, port);
ModbusCommandDto message = channel.getCurrentMessage(); ModbusCommandDto message = channel.getCurrentMessage();

View File

@ -2,6 +2,7 @@ package com.iot.modbus_rtcp.netty;
import com.iot.modbus_rtcp.dto.CommandTypeComparable; import com.iot.modbus_rtcp.dto.CommandTypeComparable;
import com.iot.modbus_rtcp.dto.ModbusCommandDto; import com.iot.modbus_rtcp.dto.ModbusCommandDto;
import com.iot.modbus_rtcp.utils.HexUtil;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import lombok.Getter; import lombok.Getter;
@ -140,6 +141,7 @@ public class SyncPriorityChannel implements Runnable {
} }
public void setCollectQueue() { public void setCollectQueue() {
this.collectQueue = "/modbus/device/" + this.identifier + "/collect"; // this.collectQueue = "/modbus/device/" + this.identifier + "/collect";
this.collectQueue = "/modbus/collect/" + HexUtil.hashPartition(this.identifier, 10);
} }
} }

View File

@ -22,6 +22,10 @@ public class HexUtil {
return sb.toString().trim().toUpperCase(); return sb.toString().trim().toUpperCase();
} }
public static int hashPartition(String key, int partition) {
return Math.abs(key.hashCode() % partition);
}
public static void main(String[] args) { public static void main(String[] args) {
System.out.println(hexStringToBytes("0D")); System.out.println(hexStringToBytes("0D"));
System.out.println(new byte[]{(byte) Integer.parseInt("0D", 16)}); System.out.println(new byte[]{(byte) Integer.parseInt("0D", 16)});

View File

@ -0,0 +1,40 @@
package com.iot.modbus_rtcp;
import com.iot.modbus_rtcp.utils.HexUtil;
import org.junit.jupiter.api.Test;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/28 18:52
*/
public class GatewayTest {
@Test
public void testConverterHeartbeat() {
String s1 = "4B454E454E4731343030303030333538";
String s2 = "KENENG1400000358";
System.out.println(new String(HexUtil.hexStringToBytes(s1)));
System.out.println(HexUtil.bytesToHexString(s2.getBytes()));
String hex = "333538";
int decimal = 0;
int power = 0;
for (int i = hex.length() - 1; i >= 0; i--) {
char digit = hex.charAt(i);
if (digit >= '0' && digit <= '9') {
decimal += (digit - '0') * Math.pow(16, power);
} else if (digit >= 'A' && digit <= 'F') {
decimal += (digit - 'A' + 10) * Math.pow(16, power);
} else if (digit >= 'a' && digit <= 'f') {
decimal += (digit - 'a' + 10) * Math.pow(16, power);
} else {
throw new IllegalArgumentException("Invalid hex digit: " + digit);
}
power++;
}
System.out.println(decimal);
}
}

View File

@ -1,5 +1,7 @@
package com.iot.modbus_rtcp; package com.iot.modbus_rtcp;
import com.iot.modbus_rtcp.utils.HexUtil;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -15,7 +17,7 @@ import java.util.concurrent.locks.LockSupport;
public class NonBlockingSocketTest { public class NonBlockingSocketTest {
private static final String HOST = "127.0.0.1"; private static final String HOST = "127.0.0.1";
private static final Integer PORT = 1200; private static final Integer PORT = 502;
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open(); SocketChannel socketChannel = SocketChannel.open();
@ -48,7 +50,7 @@ public class NonBlockingSocketTest {
lastSentHeartBeatTime = nowTime; lastSentHeartBeatTime = nowTime;
socketChannel.write(heartBeatBuffer); socketChannel.write(heartBeatBuffer);
} }
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500)); LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
continue; continue;
} }
String line = byteBufferToHexString(readBuffer).trim(); String line = byteBufferToHexString(readBuffer).trim();
@ -56,27 +58,21 @@ public class NonBlockingSocketTest {
System.out.println(LocalDateTime.now() + "<==:收到服务器端请求:" + line); System.out.println(LocalDateTime.now() + "<==:收到服务器端请求:" + line);
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
// 接收到请求 // 接收到请求
switch (line.toUpperCase()) { switch (line.toUpperCase().substring(0, line.length() - 4)) {
case "01040000001531C5" -> case "010200140004" ->
// 发送响应字符串 socketChannel.write(ByteBuffer.wrap(HexUtil.hexStringToBytes("01042C07D000010003000C002B0029000300010000001A0018003303CD002300080DAC0DAC0000000000000959091DF066")));
socketChannel.write(ByteBuffer.wrap("01 04 2A 07 E8 00 0B 00 13 00 17 00 11 00 23 00 03 00 01 00 00 00 00 00 02 00 3B 03 E7 00 39 00 00 0D AC 0D AC 00 00 00 00 00 00 0A 14 4A 98".replaceAll(" ", "").getBytes())); case "0103006A002A" ->
socketChannel.write(ByteBuffer.wrap(HexUtil.hexStringToBytes("0103A400010000000000000000000000000000000000000001000100000000000A0005001E000B001E00000011002D00000012000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001001E00000017001E00000000000100230000000003E800640000000A0000000000000000000003E7003B003B00000001000103E7003B003B0000000100019014")));
case "0104000A001A" ->
socketChannel.write(ByteBuffer.wrap(HexUtil.hexStringToBytes("01043400020016002B0034000100070001000F001E0016000A00000000000000000000000000000000000003080000153D000000001365B5E3")));
// 接收到开井请求 // 接收到开井请求
case "01050001FF00DDFA" -> case "01050000FF00" ->
// 发送响应字符串 socketChannel.write(ByteBuffer.wrap(HexUtil.hexStringToBytes("01050000FF00DDFA")));
socketChannel.write(ByteBuffer.wrap("01050001FF00DDFA".getBytes()));
// 接收到关井请求 // 接收到关井请求
case "0105000100009C0A" -> case "01050001FF00" ->
// 发送响应字符串 socketChannel.write(ByteBuffer.wrap(HexUtil.hexStringToBytes("01050001FF00DDFA")));
socketChannel.write(ByteBuffer.wrap("0105000100009C0A".getBytes()));
// 接收到读取运行模式请求
case "010300640001C5D5" ->
// 发送响应字符串
socketChannel.write(ByteBuffer.wrap("01 03 02 00 03 F8 45".replaceAll(" ", "").getBytes()));
// 接收到退出请求 // 接收到退出请求
case "exit" -> socketChannel.close(); case "exit" -> socketChannel.close();
} }