解析数据落库

This commit is contained in:
wangshilong 2024-11-25 14:49:18 +08:00
parent 8b0b219fbb
commit 539907bd0c
7 changed files with 82 additions and 30 deletions

View File

@ -65,6 +65,11 @@
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<version>1.70</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>

View File

@ -10,12 +10,10 @@ import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Set;
/**
* Modbus-TCP协议API
@ -25,7 +23,7 @@ import java.util.List;
@RestController
@RequestMapping("modbus-tcp")
public class ModbusTCPController implements ApplicationRunner {
private NettyServer nettyServer;
public static NettyServer nettyServer;
@PreDestroy
private void destroy() {
@ -88,6 +86,16 @@ public class ModbusTCPController implements ApplicationRunner {
return Response.succeed();
}
/**
* 在綫設備列表
*
* @return
*/
@GetMapping("/online")
public Response<Set<String>> online() {
return Response.succeed(this.nettyServer.getGroup().onlineGateway());
}
@Override
public void run(ApplicationArguments args) throws Exception {
this.nettyServer = new NettyServer(502, 10);

View File

@ -14,6 +14,11 @@ import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
@ -24,11 +29,25 @@ import java.util.stream.Collectors;
@RequiredArgsConstructor
public class AutoCollectJobs {
private static final String SQL = "SELECT ref_id as deviceId, command, message_length messageLength FROM commands " +
private static final String SQL = "SELECT id as commandId, ref_id as deviceId, command, " +
"message_length messageLength, '4B454E454E4731343030303030333538' as identifier FROM commands " +
"WHERE type = 'COLLECTION' and ref_type = 'DEVICE' and id < 10099 order by id";
private final JdbcTemplate jdbcTemplate;
private final ModbusTCPController controller;
private final ThreadPoolExecutor collectThreadPool = new ThreadPoolExecutor(Math.min(10, Runtime.getRuntime().availableProcessors())
, Math.min(100, Runtime.getRuntime().availableProcessors()), 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5000), new ThreadFactory() {
AtomicInteger index = new AtomicInteger(1);
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "Modbus-auto-collect-thread-" + index.getAndIncrement());
thread.setDaemon(true);
thread.setPriority(Thread.MIN_PRIORITY);
return thread;
}
}, new ThreadPoolExecutor.CallerRunsPolicy());
@Scheduled(cron = "0/30 * * * * ? ")
public void autoCollect() {
@ -38,19 +57,30 @@ public class AutoCollectJobs {
List<Map<String, Object>> resultList = null;
while (Objects.isNull(resultList) || resultList.size() >= pageSize) {
resultList = this.jdbcTemplate.queryForList(SQL + " LIMIT " + (pageIndex++) + "," + pageSize);
if (ObjectUtils.isNotEmpty(resultList)) {
this.controller.collect(resultList.stream()
.filter(item -> StringUtils.isNotEmpty(MapUtil.getStr(item, "identifier")))
.map(item -> ModbusCommandDto.builder()
if (ObjectUtils.isEmpty(resultList)) {
continue;
}
List<ModbusCommandDto> collectCommondList = resultList.stream()
.filter(item -> Objects.nonNull(ModbusTCPController.nettyServer.getGroup()
.get((MapUtil.getStr(item, "identifier")))))
.map(item -> {
String identifier = MapUtil.getStr(item, "identifier");
return ModbusCommandDto.builder()
.command(MapUtil.getStr(item, "command"))
.identifier("4B454E454E4731343030303030333538")
.identifier(identifier)
.length(MapUtil.getInt(item, "messageLength"))
.type(CommandTypeComparable.CommandType.COLLECTION)
.key("4B454E454E4731343030303030333538/" + MapUtil.getStr(item, "deviceId") + "/" + timestamp)
.build())
.collect(Collectors.toList()));
.key(StringUtils.joinWith("/", identifier,
MapUtil.getStr(item, "deviceId"),
MapUtil.getStr(item, "commandId"),
timestamp))
.build();
})
.collect(Collectors.toList());
if (ObjectUtils.isEmpty(collectCommondList)) {
continue;
}
this.collectThreadPool.execute(() -> this.controller.collect(collectCommondList));
}
}
}
}

View File

@ -1,9 +1,12 @@
package com.iot.modbus_rtcp.netty;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Slf4j
public class ChannelGroup {
@ -30,6 +33,13 @@ public class ChannelGroup {
this.mChannelMap.remove(identity);
}
public Set<String> onlineGateway() {
return this.mChannelMap.keySet()
.stream()
.filter(identifier -> !StringUtils.contains(identifier, ":"))
.collect(Collectors.toSet());
}
public void see() {
log.info("当前连接有:{}", new ArrayList<>(this.mChannelMap.keySet()));
}

View File

@ -3,7 +3,6 @@ package com.iot.modbus_rtcp.netty;
import cn.hutool.core.util.ArrayUtil;
import com.iot.modbus_rtcp.config.EquipmentIPProperties;
import com.iot.modbus_rtcp.dto.ModbusCommandDto;
import com.iot.modbus_rtcp.utils.CRCUtil;
import com.iot.modbus_rtcp.utils.HexUtil;
import com.iot.modbus_rtcp.utils.SpringUtil;
import io.netty.buffer.ByteBuf;
@ -11,7 +10,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.bouncycastle.pqc.math.linearalgebra.ByteUtils;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
@ -90,21 +88,8 @@ public class ModbusDecoder extends ByteToMessageDecoder {
if (bytesCache.length < (message.getLength() / 2)) {
DataCache.put(identity, bytesCache);
} else if (bytesCache.length == (message.getLength() / 2)) {
// if (!verifyCRC(bytesCache)) {
// return;
// }
out.add(HexUtil.bytesToHexString(bytesCache));
}
}
private boolean verifyCRC(byte[] bytes) {
byte[] crc = ByteUtils.subArray(bytes, bytes.length - 2, bytes.length);
byte[] data = ByteUtils.subArray(bytes, 0, bytes.length - 2);
String generate = CRCUtil.getCRC(data);
String original = HexUtil.bytesToHexString(crc);
return original.equalsIgnoreCase(generate);
}
}

View File

@ -7,6 +7,7 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ -16,6 +17,7 @@ public class NettyServer extends ChannelInitializer<SocketChannel> {
private ChannelFuture future;
private int port, nThread;
@Getter
private ChannelGroup group;
private ModbusSender sender;
private ChannelHandler handler;

View File

@ -1,5 +1,7 @@
package com.iot.modbus_rtcp.utils;
import org.bouncycastle.pqc.math.linearalgebra.ByteUtils;
public class CRCUtil {
public static String getCRC(byte[] bytes, boolean cvs) {
int CRC = 0x0000ffff;
@ -29,6 +31,16 @@ public class CRCUtil {
return getCRC(bytes, true);
}
public static boolean verifyCRC(byte[] bytes) {
byte[] crc = ByteUtils.subArray(bytes, bytes.length - 2, bytes.length);
byte[] data = ByteUtils.subArray(bytes, 0, bytes.length - 2);
String generate = CRCUtil.getCRC(data);
String original = HexUtil.bytesToHexString(crc);
return original.equalsIgnoreCase(generate);
}
public static void main(String[] args) {
String[] array = new String[]{"0A0300000019", "0A0303930023", "0A0301A4002D", "0A0301D6002D", "0A0300320064", "0A0300960064", "0A0300FA0064", "0A03015E0064", "0A0200000050"};
for (String str : array) {