采集代码

This commit is contained in:
wangshilong 2024-11-24 12:07:47 +08:00
parent 84b436a824
commit 2f7ba359e4
29 changed files with 1583 additions and 0 deletions

2
.gitattributes vendored Normal file
View File

@ -0,0 +1,2 @@
/mvnw text eol=lf
*.cmd text eol=crlf

131
pom.xml Normal file
View File

@ -0,0 +1,131 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.5</version>
<relativePath/>
</parent>
<groupId>com.iot.modbus_rtcp</groupId>
<artifactId>modbus-rtcp</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>ModBus-rtcp</name>
<description>modbus-rtcp</description>
<properties>
<java.version>21</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<fastjson.version>2.0.51</fastjson.version>
<lombok.version>1.18.34</lombok.version>
<hutool.version>5.8.29</hutool.version>
<jakarta.version>6.0.0</jakarta.version>
</properties>
<dependencyManagement>
<dependencies>
<!-- basic -->
<dependency>
<groupId>jakarta.servlet</groupId>
<artifactId>jakarta.servlet-api</artifactId>
<version>${jakarta.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>3.3.5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- tools -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- tools -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-3-starter</artifactId>
<version>1.2.20</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.43.Final</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>3.3.5</version>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,37 @@
package com.iot.modbus_rtcp;
import cn.hutool.json.JSONUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.EnableScheduling;
@EnableScheduling
@SpringBootApplication(scanBasePackages = {"com.iot.modbus_rtcp"})
public class ModbusRtcpApplication implements ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger("com.iot.modbus_rtcp.rabbit");
public static void main(String[] args) {
SpringApplication.run(ModbusRtcpApplication.class, args);
}
// 为RabbitTemplate设置路由到队列失败时调用的方法
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
//设置ReturnCallback
rabbitTemplate.setReturnsCallback((returnedMessage) -> {
logger.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", returnedMessage.getReplyCode(),
returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey(),
JSONUtil.toJsonStr(returnedMessage.getMessage()));
});
}
}

View File

@ -0,0 +1,25 @@
package com.iot.modbus_rtcp.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.HashSet;
import java.util.Map;
@Configuration
@Data
@ConfigurationProperties(prefix = "server.netty")
public class EquipmentIPProperties {
private Map<String, String> identifiers;
public HashSet<String> keys() {
return new HashSet<>(this.identifiers.keySet());
}
public String get(String key) {
return this.identifiers.get(key);
}
}

View File

@ -0,0 +1,97 @@
package com.iot.modbus_rtcp.controller;
import com.iot.modbus_rtcp.dto.CommandTypeComparable;
import com.iot.modbus_rtcp.dto.ModbusCommandDto;
import com.iot.modbus_rtcp.netty.NettyServer;
import com.iot.modbus_rtcp.utils.CRCUtil;
import com.iot.modbus_rtcp.utils.HexUtil;
import com.iot.modbus_rtcp.vo.Response;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* Modbus-TCP协议API
* 通信方式采用TCP的方式
*/
@Slf4j
@RestController
@RequestMapping("modbus-tcp")
public class ModbusTCPController implements ApplicationRunner {
private NettyServer nettyServer;
@PreDestroy
private void destroy() {
this.nettyServer.stop();
}
/**
* 采集命令API
* 采集命令API使用异步访问的方式设备响应数据后会将其推送到Kafka
*
* @param modbusCommandBoList
* @return
*/
@PostMapping("/collect")
public Response<String> collect(@RequestBody List<ModbusCommandDto> modbusCommandBoList) {
log.debug("采集请求:{}", modbusCommandBoList);
try {
modbusCommandBoList.stream().forEach(modbusCommandBo -> {
modbusCommandBo.setCommand(modbusCommandBo.getCommand() + CRCUtil.getCRC(HexUtil.HexStringToBytes(modbusCommandBo.getCommand())));
modbusCommandBo.setType(CommandTypeComparable.CommandType.COLLECTION);
modbusCommandBo.setTimestamp(System.nanoTime());
});
this.nettyServer.sender().send(modbusCommandBoList);
} catch (Exception e) {
log.error("", e);
return Response.failed(e.getMessage());
}
return Response.succeed();
}
/**
* 控制命令API
* 控制命令API使用异步访问的方式设备响应数据自动丢弃
*
* @param modbusCommandBoList
* @return 设备响应返回
*/
@PostMapping("/control")
public Response<String> control(@RequestBody List<ModbusCommandDto> modbusCommandBoList) {
log.debug("控制请求:{}", modbusCommandBoList);
try {
modbusCommandBoList.stream().forEach(modbusCommandBo -> {
modbusCommandBo.setCommand(modbusCommandBo.getCommand() + CRCUtil.getCRC(HexUtil.HexStringToBytes(modbusCommandBo.getCommand())));
modbusCommandBo.setType(CommandTypeComparable.CommandType.CONTROL);
modbusCommandBo.setTimestamp(System.nanoTime());
});
this.nettyServer.sender().send(modbusCommandBoList);
} catch (Exception e) {
log.error("", e);
return Response.failed(e.getMessage());
}
return Response.succeed();
}
@Override
public void run(ApplicationArguments args) throws Exception {
this.nettyServer = new NettyServer(502, 10);
this.nettyServer.start();
}
}

View File

@ -0,0 +1,4 @@
package com.iot.modbus_rtcp.dto;
public interface CommandDto {
}

View File

@ -0,0 +1,41 @@
package com.iot.modbus_rtcp.dto;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@Data
@SuperBuilder
@NoArgsConstructor
public class CommandTypeComparable implements Comparable {
private CommandType type;
private long timestamp;
@Override
public int compareTo(Object o) {
CommandTypeComparable commandTypeComparable = (CommandTypeComparable) o;
if (this.getType() == commandTypeComparable.getType()) {
return this.timestamp > commandTypeComparable.timestamp ? 1 : -1;
} else if (this.getType() == CommandType.CONTROL) {
return -1;
} else if (this.getType() == CommandType.COLLECTION) {
return 1;
} else {
throw new RuntimeException("无法排序");
}
}
@Getter
public enum CommandType {
CONTROL("控制"), COLLECTION("采集");
private final String name;
CommandType(String name) {
this.name = name;
}
}
}

View File

@ -0,0 +1,32 @@
package com.iot.modbus_rtcp.dto;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@Data
@SuperBuilder
@NoArgsConstructor
public class ModbusCommandDto extends CommandTypeComparable implements CommandDto {
/**
* 12位十六进制的命令
*/
private String command;
/**
* 采集长度
*/
private int length;
/**
* 自定义标识
* 采集情况下会推送到MQ的内容格式为自定义标识+设备反馈报文
*/
private String key;
/**
* 网关唯一标识符
*/
private String identifier;
}

View File

@ -0,0 +1,56 @@
package com.iot.modbus_rtcp.jobs;
import cn.hutool.core.map.MapUtil;
import com.iot.modbus_rtcp.controller.ModbusTCPController;
import com.iot.modbus_rtcp.dto.CommandTypeComparable;
import com.iot.modbus_rtcp.dto.ModbusCommandDto;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/23 18:33
*/
@Component
@RequiredArgsConstructor
public class AutoCollectJobs {
private static final String SQL = "SELECT ref_id as deviceId, command, message_length messageLength FROM commands " +
"WHERE type = 'COLLECTION' and ref_type = 'DEVICE' and id < 10099 order by id";
private final JdbcTemplate jdbcTemplate;
private final ModbusTCPController controller;
@Scheduled(cron = "0/30 * * * * ? ")
public void autoCollect() {
int pageIndex = 0;
int pageSize = 1000;
long timestamp = System.currentTimeMillis();
List<Map<String, Object>> resultList = null;
while (Objects.isNull(resultList) || resultList.size() >= pageSize) {
resultList = this.jdbcTemplate.queryForList(SQL + " LIMIT " + (pageIndex++) + "," + pageSize);
if (ObjectUtils.isNotEmpty(resultList)) {
this.controller.collect(resultList.stream()
.filter(item -> StringUtils.isNotEmpty(MapUtil.getStr(item, "identifier")))
.map(item -> ModbusCommandDto.builder()
.command(MapUtil.getStr(item, "command"))
.identifier("4B454E454E4731343030303030333538")
.length(MapUtil.getInt(item, "messageLength"))
.type(CommandTypeComparable.CommandType.COLLECTION)
.key("4B454E454E4731343030303030333538/" + MapUtil.getStr(item, "deviceId") + "/" + timestamp)
.build())
.collect(Collectors.toList()));
}
}
}
}

View File

@ -0,0 +1,40 @@
package com.iot.modbus_rtcp.netty;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class ChannelGroup {
private final ConcurrentHashMap<String, SyncPriorityChannel> mChannelMap = new ConcurrentHashMap<>();
public void put(String identity, SyncPriorityChannel channel) {
this.mChannelMap.put(identity, channel);
}
public SyncPriorityChannel get(String identity) {
return this.mChannelMap.get(identity);
}
public SyncPriorityChannel get(String ip, int port) {
return this.mChannelMap.get(getKey(ip, port));
}
public void remove(String ip, int port) {
this.mChannelMap.remove(getKey(ip, port));
}
public void remove(String identity) {
this.mChannelMap.remove(identity);
}
public void see() {
log.info("当前连接有:{}", new ArrayList<>(this.mChannelMap.keySet()));
}
public static String getKey(String ip, int port) {
return ip + ":" + port;
}
}

View File

@ -0,0 +1,25 @@
package com.iot.modbus_rtcp.netty;
import java.util.concurrent.ConcurrentHashMap;
public class DataCache {
private static ConcurrentHashMap<String, byte[]> mDataCacheMap = new ConcurrentHashMap<>();
public static void put(String identity, byte[] data) {
synchronized (DataCache.class) {
mDataCacheMap.put(identity, data);
}
}
public static byte[] get(String identity) {
synchronized (DataCache.class) {
return mDataCacheMap.get(identity);
}
}
public static byte[] remove(String identity) {
synchronized (DataCache.class) {
return mDataCacheMap.remove(identity);
}
}
}

View File

@ -0,0 +1,39 @@
package com.iot.modbus_rtcp.netty;
import cn.hutool.core.map.BiMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class IPGatewayRelation {
// key ip value 网关编码(网关标识)
private static Map<String, String> mIPGatewayMap = new ConcurrentHashMap<>();
// key 网关标识 value ip端口
private static BiMap<String, String> addressMap = new BiMap<>(new ConcurrentHashMap<>());
public static String get(String ipAddressKey) {
return mIPGatewayMap.get(ipAddressKey);
}
public static void put(String ipAddressKey, String gatewayCodeMapping) {
mIPGatewayMap.put(ipAddressKey, gatewayCodeMapping);
}
public static void putIPAddressMap(String gatewayIdentifier, String ipAddressKey) {
addressMap.put(gatewayIdentifier, ipAddressKey);
}
public static String getIPAddress(String gatewayIdentifier) {
return addressMap.get(gatewayIdentifier);
}
public static String getGatewayIdentifier(String ipAddressKey) {
return addressMap.getInverse().get(ipAddressKey);
}
public static String getGatewayIdentifier(String ip, int port) {
return getGatewayIdentifier(ChannelGroup.getKey(ip, port));
}
}

View File

@ -0,0 +1,95 @@
package com.iot.modbus_rtcp.netty;
import cn.hutool.core.util.ArrayUtil;
import com.iot.modbus_rtcp.config.EquipmentIPProperties;
import com.iot.modbus_rtcp.dto.ModbusCommandDto;
import com.iot.modbus_rtcp.utils.HexUtil;
import com.iot.modbus_rtcp.utils.SpringUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
@Slf4j
public class ModbusDecoder extends ByteToMessageDecoder {
private final ChannelGroup channelGroup;
private final HashSet<String> identityList;
private final EquipmentIPProperties equipmentIPProperties;
public ModbusDecoder(ChannelGroup channelGroup) {
this.channelGroup = channelGroup;
this.equipmentIPProperties = SpringUtil.getBean(EquipmentIPProperties.class);
this.identityList = this.equipmentIPProperties.keys();
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) {
String ip = ((InetSocketAddress) ctx.channel().remoteAddress()).getHostString();
int port = ((InetSocketAddress) ctx.channel().remoteAddress()).getPort();
byte[] b = new byte[buffer.readableBytes()];
buffer.readBytes(b);
String hex = HexUtil.bytesToHexString(b);
log.info("解析到:{}", hex);
if (this.identityList.contains(hex.toUpperCase())) { // 心跳
String msgUpperCase = hex.toUpperCase();
for (String identity : this.identityList) {
if (Objects.equals(identity, msgUpperCase)) {
log.info("注册{}({}:{}):{}", this.equipmentIPProperties.get(msgUpperCase), ip, port, msgUpperCase);
String currentAddress = ChannelGroup.getKey(ip, port);
SyncPriorityChannel channel = this.channelGroup.get(currentAddress);
channel.setIdentifier(msgUpperCase);
this.channelGroup.put(msgUpperCase, channel);
SpringUtil.getBean(RabbitAdmin.class).declareQueue(QueueBuilder.durable(channel.getCollectQueue()).build());
IPGatewayRelation.put(currentAddress, String.format("%s(%s)", this.equipmentIPProperties.get(msgUpperCase), msgUpperCase));
String oldAddress = IPGatewayRelation.getIPAddress(msgUpperCase);
if (StringUtils.isNotBlank(currentAddress) &&
StringUtils.isNotBlank(oldAddress) &&
!currentAddress.equals(oldAddress)) {
//说明设备重连后IP+端口发生了改变
String[] split = oldAddress.split(":");
//删除老连接
this.channelGroup.remove(split[0], Integer.parseInt(split[1]));
log.info("删除channelGroup通道:{}连接:{},新增该通道连接:{}", this.equipmentIPProperties.get(msgUpperCase), oldAddress, currentAddress);
}
IPGatewayRelation.putIPAddressMap(msgUpperCase, currentAddress);
return;
}
}
}
SyncPriorityChannel channel = this.channelGroup.get(ip, port);
ModbusCommandDto message = channel.getCurrentMessage();
if (message == null) {
log.warn("非法传输:{}", HexUtil.bytesToHexString(b));
return;
}
String identity = ChannelGroup.getKey(ip, port);
byte[] bytesCache = DataCache.get(identity);
if (bytesCache == null) {
bytesCache = b;
} else {
bytesCache = ArrayUtil.addAll(bytesCache, b);
}
if (bytesCache.length < (message.getLength() / 2)) {
DataCache.put(identity, bytesCache);
} else if (bytesCache.length == (message.getLength() / 2)) {
out.add(HexUtil.bytesToHexString(bytesCache));
}
}
}

View File

@ -0,0 +1,15 @@
package com.iot.modbus_rtcp.netty;
import com.iot.modbus_rtcp.utils.HexUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class ModbusEncoder extends MessageToByteEncoder<String> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) {
byteBuf.writeBytes(HexUtil.HexStringToBytes(s));
}
}

View File

@ -0,0 +1,56 @@
package com.iot.modbus_rtcp.netty;
import com.iot.modbus_rtcp.config.EquipmentIPProperties;
import com.iot.modbus_rtcp.dto.ModbusCommandDto;
import com.iot.modbus_rtcp.utils.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 报文分发器
* 通信报文{@link ModbusCommandDto} 提取命令的内容获取PLC地址
* 按地址将报文分发到对应的 {@link SyncPriorityChannel}
*/
@Slf4j
public class ModbusSender {
private final ChannelGroup channelGroup;
private final EquipmentIPProperties equipmentIPProperties;
public ModbusSender(ChannelGroup channelGroup) {
this.channelGroup = channelGroup;
this.equipmentIPProperties = SpringUtil.getBean(EquipmentIPProperties.class);
}
public void send(List<ModbusCommandDto> modbusCommandBoList) {
Map<String, List<ModbusCommandDto>> splitCommandMap = this.split(modbusCommandBoList);
splitCommandMap.forEach((identity, v) -> {
SyncPriorityChannel channel = this.channelGroup.get(identity);
if (channel == null) {
throw new RuntimeException("设备" + identity + "的连接未配置或该设备已断开与服务器的连接,请稍后再试或联系管理员");
}
log.info("{}通道({})提交{}条命令:{}", this.equipmentIPProperties.get(identity), identity, v.size(), v);
channel.addMessages(v);
});
}
private Map<String, List<ModbusCommandDto>> split(List<ModbusCommandDto> modbusCommandBoList) {
Map<String, List<ModbusCommandDto>> map = new HashMap<>();
modbusCommandBoList.stream().forEach(modbusCommandBo -> {
if (modbusCommandBo.getIdentifier() == null) {
return;
}
List<ModbusCommandDto> list = map.getOrDefault(modbusCommandBo.getIdentifier(), new ArrayList<>());
list.add(modbusCommandBo);
map.put(modbusCommandBo.getIdentifier(), list);
});
return map;
}
}

View File

@ -0,0 +1,82 @@
package com.iot.modbus_rtcp.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyServer extends ChannelInitializer<SocketChannel> {
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ChannelFuture future;
private int port, nThread;
private ChannelGroup group;
private ModbusSender sender;
private ChannelHandler handler;
/**
* 创建指定服务端口指定线程数的服务端
*
* @param port 服务端口
* @param nThread 执行线程池线程数
*/
public NettyServer(int port, int nThread) {
this.port = port;
this.nThread = nThread;
this.group = new ChannelGroup();
this.sender = new ModbusSender(this.group);
this.handler = new SyncHandler(this.group);
}
/**
* 启动服务
*/
public void start() {
this.bossGroup = new NioEventLoopGroup(1);
this.workerGroup = new NioEventLoopGroup(this.nThread);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(this.bossGroup, this.workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(this);
this.future = bootstrap.bind(this.port);
}
/**
* 停止服务
*/
public void stop() {
this.future.channel().closeFuture();
this.workerGroup.shutdownGracefully();
this.bossGroup.shutdownGracefully();
}
public ModbusSender sender() {
return this.sender;
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
log.info("创建channel:{}", ch);
String ip = ch.remoteAddress().getHostString();
//过滤掉docker 网关请求
if ("172.17.0.1".equals(ip)) return;
int port = ch.remoteAddress().getPort();
this.group.put(ip + ":" + port, new SyncPriorityChannel(ch));
pipeline.addLast(new ModbusEncoder());
pipeline.addLast("decoder", new ModbusDecoder(this.group));
// pipeline.addLast(new ReadTimeoutHandler(10000, TimeUnit.MILLISECONDS));
pipeline.addLast(this.handler);
}
}

View File

@ -0,0 +1,83 @@
package com.iot.modbus_rtcp.netty;
import com.iot.modbus_rtcp.dto.CommandTypeComparable;
import com.iot.modbus_rtcp.dto.ModbusCommandDto;
import com.iot.modbus_rtcp.utils.SpringUtil;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import java.net.InetSocketAddress;
import java.util.Objects;
/**
*
*/
@Slf4j
@ChannelHandler.Sharable
public class SyncHandler extends ChannelInboundHandlerAdapter {
private final ChannelGroup channelGroup;
private final RabbitTemplate rabbitTemplate;
public SyncHandler(ChannelGroup channelGroup) {
this.channelGroup = channelGroup;
this.rabbitTemplate = SpringUtil.getBean(RabbitTemplate.class);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
int port = remoteAddress.getPort();
String ip = remoteAddress.getHostString();
SyncPriorityChannel channel = this.channelGroup.get(ip, port);
ModbusCommandDto message = channel.getCurrentMessage();
try {
channel.getChannelPromise().setSuccess();
} catch (IllegalStateException e) {
log.info("数据返回慢了,已经超时!");
return;
} catch (Exception e) {
log.info("丢失数据:{}", msg);
return;
}
if (Objects.isNull(message)) {
log.error("未找到发送源: {}", msg);
return;
}
// 控制返回丢弃
if (CommandTypeComparable.CommandType.CONTROL.equals(message.getType())) {
return;
}
String json = message.getKey() + "/" + System.currentTimeMillis() + "/" + msg;
log.info("推数据到MQ({}): {}", channel.getCollectQueue(), json);
try {
this.rabbitTemplate.convertAndSend(channel.getCollectQueue(), json);
} catch (Exception e) {
log.error("推数据到MQ失败({})", channel.getCollectQueue(), e);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
log.info("用户事件: {}-> {}", evt.getClass().getName(), evt);
super.userEventTriggered(ctx, evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("异常: ", cause);
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
String ipKey = ChannelGroup.getKey(remoteAddress.getHostString(), remoteAddress.getPort());
this.channelGroup.remove(ipKey);
String gatewayIdentifier = IPGatewayRelation.getGatewayIdentifier(ipKey);
this.channelGroup.remove(gatewayIdentifier);
ctx.close();
}
}

View File

@ -0,0 +1,145 @@
package com.iot.modbus_rtcp.netty;
import com.iot.modbus_rtcp.dto.CommandTypeComparable;
import com.iot.modbus_rtcp.dto.ModbusCommandDto;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.SocketChannel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* 同步控制命令优先Channel
* 该Channel使用组合方式结合了{@link SocketChannel} 具备Channel的功能
* 发送之后必须得到客户端响应或者超时后执行下一条命令发送即同步发送机制同时保证控制命令优先执行
* <p>
* 调用添加命令方法作为一次任务提交给线程池执行如果在执行期间还有调用添加命令方法便会在同一任务中追加执行的命令
*/
@Slf4j
public class SyncPriorityChannel implements Runnable {
private static final int allowTimeout = 1;
private static final boolean isControl = true;
private final SocketChannel channel;
private int timeoutCount;
@Getter
private volatile String identifier;
@Getter
private volatile String collectQueue;
private volatile boolean running = false;
private final Object object = new Object();
private final PriorityBlockingQueue<ModbusCommandDto> messageQueue = new PriorityBlockingQueue<>();
private final AtomicReference<ChannelPromise> channelPromiseReference = new AtomicReference<>();
private final AtomicReference<ModbusCommandDto> currentMessageReference = new AtomicReference<>();
public SyncPriorityChannel(SocketChannel channel) {
this.channel = channel;
}
public void addMessages(List<ModbusCommandDto> messages) {
this.messageQueue.addAll(messages);
synchronized (this.object) {
if (this.running) {
return;
} else {
this.running = true;
}
}
ThreadPoolConsumer.submit(this);
}
@Override
public void run() {
this.sendNext();
}
public synchronized void sendNext() {
if (this.timeoutCount == 0) {
this.currentMessageReference.setRelease(this.messageQueue.poll());
}
// 消费完毕终止递归发送
if (Objects.isNull(this.getCurrentMessage())) {
this.running = false;
return;
}
InetSocketAddress remoteAddress = this.channel.remoteAddress();
int port = remoteAddress.getPort();
String ip = remoteAddress.getHostString();
this.channelPromiseReference.setRelease(this.channel.newPromise());
if (isControl || Objects.equals(CommandTypeComparable.CommandType.COLLECTION, this.getCurrentMessage().getType())) {
log.info("发送命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送",
this.getCurrentMessage().getCommand(),
this.getCurrentMessage().getLength(),
IPGatewayRelation.get(ChannelGroup.getKey(ip, port)),
ChannelGroup.getKey(ip, port),
this.messageQueue.size());
this.channel.writeAndFlush(this.getCurrentMessage().getCommand());
}
// 清空缓存数据
DataCache.remove(ChannelGroup.getKey(ip, port));
boolean timeout;
try {
timeout = !this.getChannelPromise().await(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("channelPromise.await发生异常,", e);
return;
}
if (timeout && this.timeoutCount == (allowTimeout - 1)) {
log.warn("#超时命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送",
this.getCurrentMessage().getCommand(),
this.getCurrentMessage().getLength(),
IPGatewayRelation.get(ChannelGroup.getKey(ip, port)),
ChannelGroup.getKey(ip, port),
this.messageQueue.size());
}
if (timeout && this.timeoutCount < (allowTimeout - 1)) {
this.timeoutCount++;
} else {
this.timeoutCount = 0;
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.sendNext();
}
public ChannelPromise getChannelPromise() {
return this.channelPromiseReference.getAcquire();
}
public ModbusCommandDto getCurrentMessage() {
return this.currentMessageReference.getAcquire();
}
public void setIdentifier(String identifier) {
this.identifier = identifier;
this.setCollectQueue();
}
public void setCollectQueue() {
this.collectQueue = "/modbus/device/" + this.identifier + "/collect";
}
}

View File

@ -0,0 +1,21 @@
package com.iot.modbus_rtcp.netty;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Slf4j
public class ThreadPoolConsumer {
private static final ThreadPoolTaskExecutor mThreadPoolExecutor;
static {
mThreadPoolExecutor = new ThreadPoolTaskExecutor();
mThreadPoolExecutor.setCorePoolSize(4);
mThreadPoolExecutor.setMaxPoolSize(8);
mThreadPoolExecutor.setKeepAliveSeconds(60);
mThreadPoolExecutor.initialize();
}
public static void submit(Runnable task) {
mThreadPoolExecutor.submit(task);
}
}

View File

@ -0,0 +1,38 @@
package com.iot.modbus_rtcp.utils;
public class CRCUtil {
public static String getCRC(byte[] bytes, boolean cvs) {
int CRC = 0x0000ffff;
int POLYNOMIAL = 0x0000a001;
int i, j;
for (i = 0; i < bytes.length; i++) {
CRC ^= ((int) bytes[i] & 0x000000ff);
for (j = 0; j < 8; j++) {
if ((CRC & 0x00000001) != 0) {
CRC >>= 1;
CRC ^= POLYNOMIAL;
} else {
CRC >>= 1;
}
}
}
if (cvs) {
CRC = ((CRC & 0x0000FF00) >> 8) | ((CRC & 0x000000FF) << 8);
}
return String.format("%04x", CRC).toUpperCase();
}
public static String getCRC(byte[] bytes) {
return getCRC(bytes, true);
}
public static void main(String[] args) {
String[] array = new String[]{"0A0300000019", "0A0303930023", "0A0301A4002D", "0A0301D6002D", "0A0300320064", "0A0300960064", "0A0300FA0064", "0A03015E0064", "0A0200000050"};
for (String str : array) {
System.out.println(str + CRCUtil.getCRC(HexUtil.HexStringToBytes(str)));
}
}
}

View File

@ -0,0 +1,29 @@
package com.iot.modbus_rtcp.utils;
public class HexUtil {
public static byte[] HexStringToBytes(String src) {
int l = src.length() / 2;
byte[] ret = new byte[l];
for (int i = 0; i < l; i++) {
ret[i] = (byte) Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue();
}
return ret;
}
public static String bytesToHexString(byte[] src) {
String strHex = "";
StringBuilder sb = new StringBuilder("");
for (int n = 0; n < src.length; n++) {
strHex = Integer.toHexString(src[n] & 0xFF);
// 每个字节由两个字符表示位数不够高位补0
sb.append((strHex.length() == 1) ? "0" + strHex : strHex);
}
return sb.toString().trim().toUpperCase();
}
public static void main(String[] args) {
System.out.println(HexStringToBytes("0D"));
System.out.println(new byte[]{(byte) Integer.parseInt("0D", 16)});
}
}

View File

@ -0,0 +1,53 @@
package com.iot.modbus_rtcp.utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class SpringUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (SpringUtil.applicationContext == null) {
SpringUtil.applicationContext = applicationContext;
}
log.info("ApplicationContext配置成功,applicationContext=" + SpringUtil.applicationContext);
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
public static Object getBean(String name) {
return getApplicationContext().getBean(name);
}
public static <T> T getBean(Class<T> clazz) {
return getApplicationContext().getBean(clazz);
}
public static <T> T getBean(String name, Class<T> clazz) {
return getApplicationContext().getBean(name, clazz);
}
public static void registerBean(String beanName, BeanDefinitionBuilder beanDefinitionBuilder) {
if (StringUtils.isBlank(beanName) || beanDefinitionBuilder == null) {
return;
}
ConfigurableApplicationContext context = (ConfigurableApplicationContext) getApplicationContext();
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) context.getBeanFactory();
beanFactory.registerBeanDefinition(beanName, beanDefinitionBuilder.getBeanDefinition());
}
}

View File

@ -0,0 +1,82 @@
package com.iot.modbus_rtcp.vo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serial;
import java.io.Serializable;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Response<T> implements Serializable {
@Serial
private static final long serialVersionUID = 2289553801394006859L;
private int code = ResponseCode.SUCCESS.code();
private String message = ResponseCode.SUCCESS.message();
private T data;
public static <T> Response<T> succeed() {
return Response.<T>builder().code(ResponseCode.SUCCESS.code()).message(ResponseCode.SUCCESS.message()).build();
}
public static <T> Response<T> succeed(String message) {
return Response.<T>builder().code(ResponseCode.SUCCESS.code()).message(message).build();
}
public static <T> Response<T> succeed(T t) {
return Response.<T>builder().code(ResponseCode.SUCCESS.code()).message(ResponseCode.SUCCESS.message()).data(t)
.build();
}
public static <T> Response<T> succeed(String message, T t) {
return Response.<T>builder().code(ResponseCode.SUCCESS.code()).message(message).data(t)
.build();
}
public static <T> Response<T> succeed(ResponseCode responseCode) {
return Response.<T>builder().code(responseCode.code()).message(responseCode.message()).build();
}
public static <T> Response<T> succeed(ResponseCode responseCode, T t) {
return Response.<T>builder().code(responseCode.code()).message(responseCode.message()).data(t)
.build();
}
public static <T> Response<T> failed() {
return Response.<T>builder().code(ResponseCode.FAILURE.code()).message(ResponseCode.FAILURE.message()).build();
}
public static <T> Response<T> failed(String message) {
return Response.<T>builder().code(ResponseCode.FAILURE.code()).message(message).build();
}
public static <T> Response<T> failed(int code, String message) {
return Response.<T>builder().code(code).message(message).build();
}
public static <T> Response<T> failed(ResponseCode responseCode) {
return Response.<T>builder().code(responseCode.code()).message(responseCode.message()).build();
}
public static <T> Response<T> failed(ResponseCode responseCode, T t) {
return Response.<T>builder().code(responseCode.code()).message(responseCode.message()).data(t).build();
}
public static <T> Response<T> failed(T t) {
return Response.<T>builder().code(ResponseCode.FAILURE.code()).message(ResponseCode.FAILURE.message()).data(t)
.build();
}
public static <T> Response<T> define(int code, T t) {
return Response.<T>builder().code(code).message(ResponseCode.SUCCESS.message()).data(t).build();
}
public static <T> Response<T> define(int code, String message, T t) {
return Response.<T>builder().code(code).message(message).data(t).build();
}
}

View File

@ -0,0 +1,53 @@
package com.iot.modbus_rtcp.vo;
public enum ResponseCode {
SUCCESS(200, "操作成功"),
FAILURE(400, "操作失败"),
/**
* 参数错误代码
*/
PARAM_IS_INVALID(1001, "参数无效"),
PARAM_IS_BLANK(1002, "参数为空"),
PARAM_TYPE_BIND_ERROR(1003, "参数类型错误"),
PARAM_NOT_COMPLETE(1004, "参数缺失"),
PARAM_ERROR(1004, "参数错误"),
/**
* 用户错误代码
*/
USER_NOT_LOGIN_IN(2001, "手机号未登录"),
USER_LOGIN_ERROR(2002, "账号或密码输入有误"),
USER_ACCOUNT_FORBIDDEN(2003, "手机号被冻结"),
USER_NOT_EXIST(2004, "账号或手机号未注册"),
USER_HAS_EXISTED(2005, "账号或手机号已注册"),
USER_NOT_BIND(2006, "用户需要绑定手机号"),
USER_MORE(2007, "系统存在多个正常状态的账号或手机号,请联系管理员"),
/**
* 权限验证错误
*/
USER_AUTHORIZATION_ERROR(3001, "用户权限错误"),
USER_NO_LOGIN(3002, "尚未登录,请登录"),
USER_PREV_NO_ATTENDED(3003, "上轮您未投票,不可以继续投票了"),
USER_NO_ATTENDED(3004, "本轮您未投票"),
/**
* 其他系统错误
*/
SYSTEM_ERROR(4001, "系统错误"),
VERIFY_CODE_ERROR(4002, "验证码错误!");
int code;
String message;
ResponseCode(int code, String message) {
this.message = message;
this.code = code;
}
public String message() {
return this.message;
}
public int code() {
return this.code;
}
}

View File

@ -0,0 +1,37 @@
spring:
application:
name: modbus-rtcp
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://127.0.0.1:3306/gas_well_watch?characterEncoding=UTF-8&useUnicode=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
username: cq
password: cq
rabbitmq:
host: ${RABBIT_MQ_HOST:localhost}
port: ${RABBIT_MQ_PORT:5672}
username: ${RABBIT_MQ_USERNAME:ModbusAdmin}
password: ${RABBIT_MQ_PASSWORD:ModbusPassword}
virtual-host: /
publisher-confirm-type: correlated
#接收确认,默认关闭建议false或不配置在代码中根据实际情况进行ack销毁
publisher-returns: true
template:
mandatory: true
# kafka:
# bootstrap-servers: 172.17.0.1:9092
# template:
# default-topic: collector-modbus-rtcp-group
snowflake:
worker: 0
dataCenter: 1
server:
port: 9999
netty:
identifiers:
4B454E454E4731343030303030333538: KENENG1400000358
3030303030: 00000

View File

@ -0,0 +1,63 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="true">
<springProperty name="SERVICE_NAME" scope="context" source="spring.application.name" defaultValue="rtcp"/>
<!-- %40():如果字符没有37个字符长度,则左侧用空格补齐 -->
<!-- %-40():如果字符没有40个字符长度,则右侧用空格补齐 -->
<!-- %15.15():如果记录的线程字符长度小于15(第一个)则用空格在左侧补齐,如果字符长度大于15(第二个),则从开头开始截断多余的字符 -->
<!-- %-40.40():如果记录的logger字符长度小于40(第一个)则用空格在右侧补齐,如果字符长度大于40(第二个),则从开头开始截断多余的字符 -->
<!-- %msg日志打印详情 -->
<!-- %n:换行符 -->
<!-- %highlight():转换说明符以粗体红色显示其级别为ERROR的事件红色为WARNBLUE为INFO以及其他级别的默认颜色。 -->
<property name="LOG_PATTERN"
value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) --- [%15.15(%thread)] [%X{traceId}] %cyan(%-40.40(%logger{40})) : %msg%n"/>
<property name="LOG_HOME" value="/Users/snow/log/venta/mtcp"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!--<immediateFlush>true</immediateFlush>-->
<encoder>
<pattern>${LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="LOCAL_INFO_LOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>${LOG_HOME}/info/info.log</File>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/info/info.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<maxHistory>30</maxHistory>
<totalSizeCap>2GB</totalSizeCap>
<maxFileSize>50MB</maxFileSize>
</rollingPolicy>
<encoder>
<pattern>${LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="LOCAL_ERROR_LOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>${LOG_HOME}/error/error.log</File>
<append>true</append>
<immediateFlush>true</immediateFlush>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>ERROR</level>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/error/error.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<maxHistory>30</maxHistory>
<totalSizeCap>2GB</totalSizeCap>
<maxFileSize>50MB</maxFileSize>
</rollingPolicy>
<encoder>
<pattern>${LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="LOCAL_INFO_LOG"/>
<appender-ref ref="LOCAL_ERROR_LOG"/>
</root>
</configuration>

View File

@ -0,0 +1,13 @@
package com.iot.modbus_rtcp;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class ModbusRtcpApplicationTests {
@Test
void contextLoads() {
}
}

View File

@ -0,0 +1,113 @@
package com.iot.modbus_rtcp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/20 16:01
*/
public class NonBlockingSocketTest {
private static final String HOST = "127.0.0.1";
private static final Integer PORT = 1200;
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
// 设置为非阻塞模式
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress(HOST, PORT));
while (!socketChannel.finishConnect()) {
// 等待连接建立
System.out.println("正在建立连接...");
}
long lastSentHeartBeatTime = System.currentTimeMillis();
// 连接已建立发送和接收数据
// ByteBuffer heartBeatBuffer = ByteBuffer.wrap("Hello, Server!".getBytes());
ByteBuffer heartBeatBuffer = ByteBuffer.wrap("KENENG1400000358".getBytes());
// ByteBuffer heartBeatBuffer = ByteBuffer.wrap("01 04 2A 07 E8 00 0B 00 13 00 17 00 11 00 23 00 03 00 01 00 00 00 00 00 02 00 3B 03 E7 00 39 00 00 0D AC 0D AC 00 00 00 00 00 00 0A 14 4A 98".replaceAll(" ", "").getBytes());
socketChannel.write(heartBeatBuffer);
long nowTime = 0L;
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
while (socketChannel.isConnected()) {
readBuffer.clear();
socketChannel.read(readBuffer);
readBuffer.flip();
// 是否有可用数据
if (!readBuffer.hasRemaining()) {
nowTime = System.currentTimeMillis();
if (nowTime - lastSentHeartBeatTime > 5000) {
lastSentHeartBeatTime = nowTime;
socketChannel.write(heartBeatBuffer);
}
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
continue;
}
String line = byteBufferToHexString(readBuffer).trim();
lastSentHeartBeatTime = System.currentTimeMillis();
System.out.println(LocalDateTime.now() + "<==:收到服务器端请求:" + line);
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
// 接收到请求
switch (line.toUpperCase()) {
case "01040000001531C5" ->
// 发送响应字符串
socketChannel.write(ByteBuffer.wrap("01 04 2A 07 E8 00 0B 00 13 00 17 00 11 00 23 00 03 00 01 00 00 00 00 00 02 00 3B 03 E7 00 39 00 00 0D AC 0D AC 00 00 00 00 00 00 0A 14 4A 98".replaceAll(" ", "").getBytes()));
// 接收到开井请求
case "01050001FF00DDFA" ->
// 发送响应字符串
socketChannel.write(ByteBuffer.wrap("01050001FF00DDFA".getBytes()));
// 接收到关井请求
case "0105000100009C0A" ->
// 发送响应字符串
socketChannel.write(ByteBuffer.wrap("0105000100009C0A".getBytes()));
// 接收到读取运行模式请求
case "010300640001C5D5" ->
// 发送响应字符串
socketChannel.write(ByteBuffer.wrap("01 03 02 00 03 F8 45".replaceAll(" ", "").getBytes()));
// 接收到退出请求
case "exit" -> socketChannel.close();
}
System.out.println(LocalDateTime.now() + "==>:已响应服务器端请求:" + line);
}
}
public static String bytesToHexString(byte[] src) {
String strHex = "";
StringBuilder sb = new StringBuilder(50);
for (int n = 0; n < src.length; n++) {
strHex = Integer.toHexString(src[n] & 0xFF);
// 每个字节由两个字符表示位数不够高位补0
sb.append((strHex.length() == 1) ? "0" + strHex : strHex);
}
return sb.toString().trim().toUpperCase();
}
public static String byteBufferToHexString(ByteBuffer buffer) {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
StringBuilder hexString = new StringBuilder();
for (byte b : bytes) {
String hex = Integer.toHexString(0xff & b);
if (hex.length() == 1) {
hexString.append('0');
}
hexString.append(hex);
}
return hexString.toString();
}
}

View File

@ -0,0 +1,76 @@
package com.iot.modbus_rtcp;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Objects;
/**
* @author <a href="mailto:shilong.wang@alpha-ess.com">王仕龙</a>
* 2024/11/20 15:13
*/
public class SocketTest {
private static final String HOST = "127.0.0.1";
private static final Integer PORT = 1200;
@Test
@SneakyThrows
public void testModbus() {
try (Socket socket = new Socket(HOST, PORT, InetAddress.getByName(HOST), 10001);
PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
socket.setKeepAlive(true);
// socket.setTcpNoDelay(true);
socket.setSoTimeout(1500);
String line = null;
while (true) {
if (socket.isClosed()) {
socket.close();
break;
}
try {
// 接收服务器响应
line = reader.readLine();
if (responseCommand(line, writer, socket)) break;
} catch (Exception e) {
}
}
}
}
private static boolean responseCommand(String line, PrintWriter writer, Socket socket) throws IOException {
// 接收到请求
if (Objects.equals(line, "01040000001531C5")) {
// 发送响应字符串
writer.println("01 04 2A 07 E8 00 0B 00 13 00 17 00 11 00 23 00 03 00 01 00 00 00 00 00 02 00 3B 03 E7 00 39 00 00 0D AC 0D AC 00 00 00 00 00 00 0A 14 4A 98".replaceAll(" ", ""));
}
// 接收到开井请求
else if (Objects.equals(line, "01050001FF00DDFA")) {
// 发送响应字符串
writer.println("01050001FF00DDFA");
}
// 接收到关井请求
else if (Objects.equals(line, "0105000100009C0A")) {
// 发送响应字符串
writer.println("0105000100009C0A");
}
// 接收到读取运行模式请求
else if (Objects.equals(line, "010300640001C5D5")) {
// 发送响应字符串
writer.println("01 03 02 00 03 F8 45".replaceAll(" ", ""));
} else {
socket.close();
return true;
}
return false;
}
}