{
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+ private ChannelFuture future;
+ private int port, nThread;
+
+ private ChannelGroup group;
+ private ModbusSender sender;
+ private ChannelHandler handler;
+
+ /**
+ * 创建指定服务端口,指定线程数的服务端
+ *
+ * @param port 服务端口
+ * @param nThread 执行线程池线程数
+ */
+ public NettyServer(int port, int nThread) {
+ this.port = port;
+ this.nThread = nThread;
+ this.group = new ChannelGroup();
+ this.sender = new ModbusSender(this.group);
+ this.handler = new SyncHandler(this.group);
+ }
+
+ /**
+ * 启动服务
+ */
+ public void start() {
+ this.bossGroup = new NioEventLoopGroup(1);
+ this.workerGroup = new NioEventLoopGroup(this.nThread);
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ bootstrap.group(this.bossGroup, this.workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .childHandler(this);
+ this.future = bootstrap.bind(this.port);
+ }
+
+ /**
+ * 停止服务
+ */
+ public void stop() {
+ this.future.channel().closeFuture();
+ this.workerGroup.shutdownGracefully();
+ this.bossGroup.shutdownGracefully();
+ }
+
+ public ModbusSender sender() {
+ return this.sender;
+ }
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+
+ log.info("创建channel:{}", ch);
+
+ String ip = ch.remoteAddress().getHostString();
+ //过滤掉docker 网关请求
+ if ("172.17.0.1".equals(ip)) return;
+ int port = ch.remoteAddress().getPort();
+ this.group.put(ip + ":" + port, new SyncPriorityChannel(ch));
+
+ pipeline.addLast(new ModbusEncoder());
+ pipeline.addLast("decoder", new ModbusDecoder(this.group));
+// pipeline.addLast(new ReadTimeoutHandler(10000, TimeUnit.MILLISECONDS));
+ pipeline.addLast(this.handler);
+ }
+}
diff --git a/src/main/java/com/iot/modbus_rtcp/netty/SyncHandler.java b/src/main/java/com/iot/modbus_rtcp/netty/SyncHandler.java
new file mode 100644
index 0000000..1328579
--- /dev/null
+++ b/src/main/java/com/iot/modbus_rtcp/netty/SyncHandler.java
@@ -0,0 +1,83 @@
+package com.iot.modbus_rtcp.netty;
+
+import com.iot.modbus_rtcp.dto.CommandTypeComparable;
+import com.iot.modbus_rtcp.dto.ModbusCommandDto;
+import com.iot.modbus_rtcp.utils.SpringUtil;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+
+import java.net.InetSocketAddress;
+import java.util.Objects;
+
+/**
+ *
+ */
+@Slf4j
+@ChannelHandler.Sharable
+public class SyncHandler extends ChannelInboundHandlerAdapter {
+ private final ChannelGroup channelGroup;
+ private final RabbitTemplate rabbitTemplate;
+
+ public SyncHandler(ChannelGroup channelGroup) {
+ this.channelGroup = channelGroup;
+ this.rabbitTemplate = SpringUtil.getBean(RabbitTemplate.class);
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
+ int port = remoteAddress.getPort();
+ String ip = remoteAddress.getHostString();
+
+ SyncPriorityChannel channel = this.channelGroup.get(ip, port);
+ ModbusCommandDto message = channel.getCurrentMessage();
+ try {
+ channel.getChannelPromise().setSuccess();
+ } catch (IllegalStateException e) {
+ log.info("数据返回慢了,已经超时!");
+ return;
+ } catch (Exception e) {
+ log.info("丢失数据:{}", msg);
+ return;
+ }
+
+ if (Objects.isNull(message)) {
+ log.error("未找到发送源: {}", msg);
+ return;
+ }
+
+ // 控制返回丢弃
+ if (CommandTypeComparable.CommandType.CONTROL.equals(message.getType())) {
+ return;
+ }
+ String json = message.getKey() + "/" + System.currentTimeMillis() + "/" + msg;
+ log.info("推数据到MQ({}): {}", channel.getCollectQueue(), json);
+ try {
+ this.rabbitTemplate.convertAndSend(channel.getCollectQueue(), json);
+ } catch (Exception e) {
+ log.error("推数据到MQ失败({})", channel.getCollectQueue(), e);
+ }
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ log.info("用户事件: {}-> {}", evt.getClass().getName(), evt);
+ super.userEventTriggered(ctx, evt);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ log.info("异常: ", cause);
+ InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
+ String ipKey = ChannelGroup.getKey(remoteAddress.getHostString(), remoteAddress.getPort());
+ this.channelGroup.remove(ipKey);
+
+ String gatewayIdentifier = IPGatewayRelation.getGatewayIdentifier(ipKey);
+ this.channelGroup.remove(gatewayIdentifier);
+
+ ctx.close();
+ }
+}
diff --git a/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java b/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java
new file mode 100644
index 0000000..40e4877
--- /dev/null
+++ b/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java
@@ -0,0 +1,145 @@
+package com.iot.modbus_rtcp.netty;
+
+import com.iot.modbus_rtcp.dto.CommandTypeComparable;
+import com.iot.modbus_rtcp.dto.ModbusCommandDto;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.socket.SocketChannel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * 同步控制命令优先Channel
+ * 该Channel使用组合方式结合了{@link SocketChannel} 具备Channel的功能
+ * 发送之后必须得到客户端响应或者超时后,执行下一条命令发送即同步发送机制,同时保证控制命令优先执行。
+ *
+ * 调用添加命令方法作为一次任务提交给线程池执行,如果在执行期间还有调用添加命令方法便会在同一任务中追加执行的命令。
+ */
+@Slf4j
+public class SyncPriorityChannel implements Runnable {
+ private static final int allowTimeout = 1;
+ private static final boolean isControl = true;
+
+ private final SocketChannel channel;
+
+ private int timeoutCount;
+ @Getter
+ private volatile String identifier;
+ @Getter
+ private volatile String collectQueue;
+ private volatile boolean running = false;
+
+ private final Object object = new Object();
+ private final PriorityBlockingQueue messageQueue = new PriorityBlockingQueue<>();
+ private final AtomicReference channelPromiseReference = new AtomicReference<>();
+ private final AtomicReference currentMessageReference = new AtomicReference<>();
+
+ public SyncPriorityChannel(SocketChannel channel) {
+ this.channel = channel;
+ }
+
+ public void addMessages(List messages) {
+ this.messageQueue.addAll(messages);
+
+ synchronized (this.object) {
+ if (this.running) {
+ return;
+ } else {
+ this.running = true;
+ }
+ }
+
+ ThreadPoolConsumer.submit(this);
+ }
+
+ @Override
+ public void run() {
+ this.sendNext();
+ }
+
+ public synchronized void sendNext() {
+ if (this.timeoutCount == 0) {
+ this.currentMessageReference.setRelease(this.messageQueue.poll());
+ }
+
+ // 消费完毕终止递归发送
+ if (Objects.isNull(this.getCurrentMessage())) {
+ this.running = false;
+ return;
+ }
+
+ InetSocketAddress remoteAddress = this.channel.remoteAddress();
+ int port = remoteAddress.getPort();
+ String ip = remoteAddress.getHostString();
+
+
+ this.channelPromiseReference.setRelease(this.channel.newPromise());
+ if (isControl || Objects.equals(CommandTypeComparable.CommandType.COLLECTION, this.getCurrentMessage().getType())) {
+ log.info("发送命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送",
+ this.getCurrentMessage().getCommand(),
+ this.getCurrentMessage().getLength(),
+ IPGatewayRelation.get(ChannelGroup.getKey(ip, port)),
+ ChannelGroup.getKey(ip, port),
+ this.messageQueue.size());
+
+ this.channel.writeAndFlush(this.getCurrentMessage().getCommand());
+ }
+
+ // 清空缓存数据
+ DataCache.remove(ChannelGroup.getKey(ip, port));
+
+ boolean timeout;
+ try {
+ timeout = !this.getChannelPromise().await(3000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ log.error("channelPromise.await发生异常,", e);
+ return;
+ }
+
+ if (timeout && this.timeoutCount == (allowTimeout - 1)) {
+ log.warn("#超时命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送",
+ this.getCurrentMessage().getCommand(),
+ this.getCurrentMessage().getLength(),
+ IPGatewayRelation.get(ChannelGroup.getKey(ip, port)),
+ ChannelGroup.getKey(ip, port),
+ this.messageQueue.size());
+ }
+
+ if (timeout && this.timeoutCount < (allowTimeout - 1)) {
+ this.timeoutCount++;
+ } else {
+ this.timeoutCount = 0;
+ }
+
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ this.sendNext();
+ }
+
+ public ChannelPromise getChannelPromise() {
+ return this.channelPromiseReference.getAcquire();
+ }
+
+ public ModbusCommandDto getCurrentMessage() {
+ return this.currentMessageReference.getAcquire();
+ }
+
+ public void setIdentifier(String identifier) {
+ this.identifier = identifier;
+ this.setCollectQueue();
+ }
+
+ public void setCollectQueue() {
+ this.collectQueue = "/modbus/device/" + this.identifier + "/collect";
+ }
+}
diff --git a/src/main/java/com/iot/modbus_rtcp/netty/ThreadPoolConsumer.java b/src/main/java/com/iot/modbus_rtcp/netty/ThreadPoolConsumer.java
new file mode 100644
index 0000000..71f922e
--- /dev/null
+++ b/src/main/java/com/iot/modbus_rtcp/netty/ThreadPoolConsumer.java
@@ -0,0 +1,21 @@
+package com.iot.modbus_rtcp.netty;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+@Slf4j
+public class ThreadPoolConsumer {
+ private static final ThreadPoolTaskExecutor mThreadPoolExecutor;
+
+ static {
+ mThreadPoolExecutor = new ThreadPoolTaskExecutor();
+ mThreadPoolExecutor.setCorePoolSize(4);
+ mThreadPoolExecutor.setMaxPoolSize(8);
+ mThreadPoolExecutor.setKeepAliveSeconds(60);
+ mThreadPoolExecutor.initialize();
+ }
+
+ public static void submit(Runnable task) {
+ mThreadPoolExecutor.submit(task);
+ }
+}
diff --git a/src/main/java/com/iot/modbus_rtcp/utils/CRCUtil.java b/src/main/java/com/iot/modbus_rtcp/utils/CRCUtil.java
new file mode 100644
index 0000000..edae9f5
--- /dev/null
+++ b/src/main/java/com/iot/modbus_rtcp/utils/CRCUtil.java
@@ -0,0 +1,38 @@
+package com.iot.modbus_rtcp.utils;
+
+public class CRCUtil {
+ public static String getCRC(byte[] bytes, boolean cvs) {
+ int CRC = 0x0000ffff;
+ int POLYNOMIAL = 0x0000a001;
+
+ int i, j;
+ for (i = 0; i < bytes.length; i++) {
+ CRC ^= ((int) bytes[i] & 0x000000ff);
+ for (j = 0; j < 8; j++) {
+ if ((CRC & 0x00000001) != 0) {
+ CRC >>= 1;
+ CRC ^= POLYNOMIAL;
+ } else {
+ CRC >>= 1;
+ }
+ }
+ }
+
+ if (cvs) {
+ CRC = ((CRC & 0x0000FF00) >> 8) | ((CRC & 0x000000FF) << 8);
+ }
+
+ return String.format("%04x", CRC).toUpperCase();
+ }
+
+ public static String getCRC(byte[] bytes) {
+ return getCRC(bytes, true);
+ }
+
+ public static void main(String[] args) {
+ String[] array = new String[]{"0A0300000019", "0A0303930023", "0A0301A4002D", "0A0301D6002D", "0A0300320064", "0A0300960064", "0A0300FA0064", "0A03015E0064", "0A0200000050"};
+ for (String str : array) {
+ System.out.println(str + CRCUtil.getCRC(HexUtil.HexStringToBytes(str)));
+ }
+ }
+}
diff --git a/src/main/java/com/iot/modbus_rtcp/utils/HexUtil.java b/src/main/java/com/iot/modbus_rtcp/utils/HexUtil.java
new file mode 100644
index 0000000..3024be5
--- /dev/null
+++ b/src/main/java/com/iot/modbus_rtcp/utils/HexUtil.java
@@ -0,0 +1,29 @@
+package com.iot.modbus_rtcp.utils;
+
+public class HexUtil {
+
+ public static byte[] HexStringToBytes(String src) {
+ int l = src.length() / 2;
+ byte[] ret = new byte[l];
+ for (int i = 0; i < l; i++) {
+ ret[i] = (byte) Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue();
+ }
+ return ret;
+ }
+
+ public static String bytesToHexString(byte[] src) {
+ String strHex = "";
+ StringBuilder sb = new StringBuilder("");
+ for (int n = 0; n < src.length; n++) {
+ strHex = Integer.toHexString(src[n] & 0xFF);
+ // 每个字节由两个字符表示,位数不够,高位补0
+ sb.append((strHex.length() == 1) ? "0" + strHex : strHex);
+ }
+ return sb.toString().trim().toUpperCase();
+ }
+
+ public static void main(String[] args) {
+ System.out.println(HexStringToBytes("0D"));
+ System.out.println(new byte[]{(byte) Integer.parseInt("0D", 16)});
+ }
+}
diff --git a/src/main/java/com/iot/modbus_rtcp/utils/SpringUtil.java b/src/main/java/com/iot/modbus_rtcp/utils/SpringUtil.java
new file mode 100644
index 0000000..ae66e90
--- /dev/null
+++ b/src/main/java/com/iot/modbus_rtcp/utils/SpringUtil.java
@@ -0,0 +1,53 @@
+package com.iot.modbus_rtcp.utils;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.support.BeanDefinitionBuilder;
+import org.springframework.beans.factory.support.DefaultListableBeanFactory;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class SpringUtil implements ApplicationContextAware {
+ private static ApplicationContext applicationContext;
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ if (SpringUtil.applicationContext == null) {
+ SpringUtil.applicationContext = applicationContext;
+ }
+
+ log.info("ApplicationContext配置成功,applicationContext=" + SpringUtil.applicationContext);
+ }
+
+ public static ApplicationContext getApplicationContext() {
+ return applicationContext;
+ }
+
+ public static Object getBean(String name) {
+ return getApplicationContext().getBean(name);
+ }
+
+ public static T getBean(Class clazz) {
+ return getApplicationContext().getBean(clazz);
+ }
+
+ public static T getBean(String name, Class clazz) {
+ return getApplicationContext().getBean(name, clazz);
+ }
+
+ public static void registerBean(String beanName, BeanDefinitionBuilder beanDefinitionBuilder) {
+ if (StringUtils.isBlank(beanName) || beanDefinitionBuilder == null) {
+ return;
+ }
+ ConfigurableApplicationContext context = (ConfigurableApplicationContext) getApplicationContext();
+ DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) context.getBeanFactory();
+ beanFactory.registerBeanDefinition(beanName, beanDefinitionBuilder.getBeanDefinition());
+ }
+
+
+}
diff --git a/src/main/java/com/iot/modbus_rtcp/vo/Response.java b/src/main/java/com/iot/modbus_rtcp/vo/Response.java
new file mode 100644
index 0000000..48fb1de
--- /dev/null
+++ b/src/main/java/com/iot/modbus_rtcp/vo/Response.java
@@ -0,0 +1,82 @@
+package com.iot.modbus_rtcp.vo;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serial;
+import java.io.Serializable;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class Response implements Serializable {
+ @Serial
+ private static final long serialVersionUID = 2289553801394006859L;
+ private int code = ResponseCode.SUCCESS.code();
+ private String message = ResponseCode.SUCCESS.message();
+ private T data;
+
+ public static Response succeed() {
+ return Response.builder().code(ResponseCode.SUCCESS.code()).message(ResponseCode.SUCCESS.message()).build();
+ }
+
+ public static Response succeed(String message) {
+ return Response.builder().code(ResponseCode.SUCCESS.code()).message(message).build();
+ }
+
+ public static Response succeed(T t) {
+ return Response.builder().code(ResponseCode.SUCCESS.code()).message(ResponseCode.SUCCESS.message()).data(t)
+ .build();
+ }
+
+ public static Response succeed(String message, T t) {
+ return Response.builder().code(ResponseCode.SUCCESS.code()).message(message).data(t)
+ .build();
+ }
+
+ public static Response succeed(ResponseCode responseCode) {
+ return Response.builder().code(responseCode.code()).message(responseCode.message()).build();
+ }
+
+ public static Response succeed(ResponseCode responseCode, T t) {
+ return Response.builder().code(responseCode.code()).message(responseCode.message()).data(t)
+ .build();
+ }
+
+ public static Response failed() {
+ return Response.builder().code(ResponseCode.FAILURE.code()).message(ResponseCode.FAILURE.message()).build();
+ }
+
+ public static Response failed(String message) {
+ return Response.builder().code(ResponseCode.FAILURE.code()).message(message).build();
+ }
+
+ public static Response failed(int code, String message) {
+ return Response.builder().code(code).message(message).build();
+ }
+
+ public static Response failed(ResponseCode responseCode) {
+ return Response.builder().code(responseCode.code()).message(responseCode.message()).build();
+ }
+
+ public static Response failed(ResponseCode responseCode, T t) {
+ return Response.builder().code(responseCode.code()).message(responseCode.message()).data(t).build();
+ }
+
+ public static Response failed(T t) {
+ return Response.builder().code(ResponseCode.FAILURE.code()).message(ResponseCode.FAILURE.message()).data(t)
+ .build();
+ }
+
+ public static Response define(int code, T t) {
+ return Response.builder().code(code).message(ResponseCode.SUCCESS.message()).data(t).build();
+ }
+
+ public static Response define(int code, String message, T t) {
+ return Response.builder().code(code).message(message).data(t).build();
+ }
+
+}
diff --git a/src/main/java/com/iot/modbus_rtcp/vo/ResponseCode.java b/src/main/java/com/iot/modbus_rtcp/vo/ResponseCode.java
new file mode 100644
index 0000000..f46f42b
--- /dev/null
+++ b/src/main/java/com/iot/modbus_rtcp/vo/ResponseCode.java
@@ -0,0 +1,53 @@
+package com.iot.modbus_rtcp.vo;
+
+public enum ResponseCode {
+ SUCCESS(200, "操作成功"),
+ FAILURE(400, "操作失败"),
+ /**
+ * 参数错误代码
+ */
+ PARAM_IS_INVALID(1001, "参数无效"),
+ PARAM_IS_BLANK(1002, "参数为空"),
+ PARAM_TYPE_BIND_ERROR(1003, "参数类型错误"),
+ PARAM_NOT_COMPLETE(1004, "参数缺失"),
+ PARAM_ERROR(1004, "参数错误"),
+ /**
+ * 用户错误代码
+ */
+ USER_NOT_LOGIN_IN(2001, "手机号未登录"),
+ USER_LOGIN_ERROR(2002, "账号或密码输入有误"),
+ USER_ACCOUNT_FORBIDDEN(2003, "手机号被冻结"),
+ USER_NOT_EXIST(2004, "账号或手机号未注册"),
+ USER_HAS_EXISTED(2005, "账号或手机号已注册"),
+ USER_NOT_BIND(2006, "用户需要绑定手机号"),
+ USER_MORE(2007, "系统存在多个正常状态的账号或手机号,请联系管理员"),
+
+ /**
+ * 权限验证错误
+ */
+ USER_AUTHORIZATION_ERROR(3001, "用户权限错误"),
+ USER_NO_LOGIN(3002, "尚未登录,请登录"),
+ USER_PREV_NO_ATTENDED(3003, "上轮您未投票,不可以继续投票了"),
+ USER_NO_ATTENDED(3004, "本轮您未投票"),
+ /**
+ * 其他系统错误
+ */
+ SYSTEM_ERROR(4001, "系统错误"),
+ VERIFY_CODE_ERROR(4002, "验证码错误!");
+
+ int code;
+ String message;
+
+ ResponseCode(int code, String message) {
+ this.message = message;
+ this.code = code;
+ }
+
+ public String message() {
+ return this.message;
+ }
+
+ public int code() {
+ return this.code;
+ }
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644
index 0000000..3bee7f2
--- /dev/null
+++ b/src/main/resources/application.yml
@@ -0,0 +1,37 @@
+spring:
+ application:
+ name: modbus-rtcp
+ datasource:
+ driver-class-name: com.mysql.cj.jdbc.Driver
+ type: com.alibaba.druid.pool.DruidDataSource
+ url: jdbc:mysql://127.0.0.1:3306/gas_well_watch?characterEncoding=UTF-8&useUnicode=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
+ username: cq
+ password: cq
+ rabbitmq:
+ host: ${RABBIT_MQ_HOST:localhost}
+ port: ${RABBIT_MQ_PORT:5672}
+ username: ${RABBIT_MQ_USERNAME:ModbusAdmin}
+ password: ${RABBIT_MQ_PASSWORD:ModbusPassword}
+ virtual-host: /
+ publisher-confirm-type: correlated
+ #接收确认,默认关闭,建议false或不配置,在代码中根据实际情况进行ack销毁
+ publisher-returns: true
+ template:
+ mandatory: true
+# kafka:
+# bootstrap-servers: 172.17.0.1:9092
+# template:
+# default-topic: collector-modbus-rtcp-group
+
+snowflake:
+ worker: 0
+ dataCenter: 1
+
+server:
+ port: 9999
+ netty:
+ identifiers:
+ 4B454E454E4731343030303030333538: KENENG1400000358
+ 3030303030: 00000
+
+
diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml
new file mode 100644
index 0000000..b948c95
--- /dev/null
+++ b/src/main/resources/logback-spring.xml
@@ -0,0 +1,63 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ${LOG_PATTERN}
+ UTF-8
+
+
+
+
+ ${LOG_HOME}/info/info.log
+ true
+
+ ${LOG_HOME}/info/info.%d{yyyy-MM-dd}.%i.log
+ 30
+ 2GB
+ 50MB
+
+
+ ${LOG_PATTERN}
+ UTF-8
+
+
+
+
+ ${LOG_HOME}/error/error.log
+ true
+ true
+
+ ERROR
+
+
+ ${LOG_HOME}/error/error.%d{yyyy-MM-dd}.%i.log
+ 30
+ 2GB
+ 50MB
+
+
+ ${LOG_PATTERN}
+ UTF-8
+
+
+
+
+
+
+
+
+
+
diff --git a/src/test/java/com/iot/modbus_rtcp/ModbusRtcpApplicationTests.java b/src/test/java/com/iot/modbus_rtcp/ModbusRtcpApplicationTests.java
new file mode 100644
index 0000000..b8932e7
--- /dev/null
+++ b/src/test/java/com/iot/modbus_rtcp/ModbusRtcpApplicationTests.java
@@ -0,0 +1,13 @@
+package com.iot.modbus_rtcp;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class ModbusRtcpApplicationTests {
+
+ @Test
+ void contextLoads() {
+ }
+
+}
diff --git a/src/test/java/com/iot/modbus_rtcp/NonBlockingSocketTest.java b/src/test/java/com/iot/modbus_rtcp/NonBlockingSocketTest.java
new file mode 100644
index 0000000..6e7f5df
--- /dev/null
+++ b/src/test/java/com/iot/modbus_rtcp/NonBlockingSocketTest.java
@@ -0,0 +1,113 @@
+package com.iot.modbus_rtcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.time.LocalDateTime;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * @author 王仕龙
+ * 2024/11/20 16:01
+ */
+public class NonBlockingSocketTest {
+
+ private static final String HOST = "127.0.0.1";
+ private static final Integer PORT = 1200;
+
+ public static void main(String[] args) throws IOException {
+ SocketChannel socketChannel = SocketChannel.open();
+ // 设置为非阻塞模式
+ socketChannel.configureBlocking(false);
+ socketChannel.connect(new InetSocketAddress(HOST, PORT));
+
+ while (!socketChannel.finishConnect()) {
+ // 等待连接建立
+ System.out.println("正在建立连接...");
+ }
+
+ long lastSentHeartBeatTime = System.currentTimeMillis();
+ // 连接已建立,发送和接收数据
+// ByteBuffer heartBeatBuffer = ByteBuffer.wrap("Hello, Server!".getBytes());
+ ByteBuffer heartBeatBuffer = ByteBuffer.wrap("KENENG1400000358".getBytes());
+// ByteBuffer heartBeatBuffer = ByteBuffer.wrap("01 04 2A 07 E8 00 0B 00 13 00 17 00 11 00 23 00 03 00 01 00 00 00 00 00 02 00 3B 03 E7 00 39 00 00 0D AC 0D AC 00 00 00 00 00 00 0A 14 4A 98".replaceAll(" ", "").getBytes());
+ socketChannel.write(heartBeatBuffer);
+
+ long nowTime = 0L;
+ ByteBuffer readBuffer = ByteBuffer.allocate(1024);
+ while (socketChannel.isConnected()) {
+ readBuffer.clear();
+ socketChannel.read(readBuffer);
+ readBuffer.flip();
+ // 是否有可用数据
+ if (!readBuffer.hasRemaining()) {
+ nowTime = System.currentTimeMillis();
+ if (nowTime - lastSentHeartBeatTime > 5000) {
+ lastSentHeartBeatTime = nowTime;
+ socketChannel.write(heartBeatBuffer);
+ }
+ LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
+ continue;
+ }
+ String line = byteBufferToHexString(readBuffer).trim();
+ lastSentHeartBeatTime = System.currentTimeMillis();
+
+ System.out.println(LocalDateTime.now() + "<==:收到服务器端请求:" + line);
+
+ LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
+
+ // 接收到请求
+ switch (line.toUpperCase()) {
+ case "01040000001531C5" ->
+ // 发送响应字符串
+ socketChannel.write(ByteBuffer.wrap("01 04 2A 07 E8 00 0B 00 13 00 17 00 11 00 23 00 03 00 01 00 00 00 00 00 02 00 3B 03 E7 00 39 00 00 0D AC 0D AC 00 00 00 00 00 00 0A 14 4A 98".replaceAll(" ", "").getBytes()));
+ // 接收到开井请求
+ case "01050001FF00DDFA" ->
+ // 发送响应字符串
+ socketChannel.write(ByteBuffer.wrap("01050001FF00DDFA".getBytes()));
+
+ // 接收到关井请求
+ case "0105000100009C0A" ->
+ // 发送响应字符串
+ socketChannel.write(ByteBuffer.wrap("0105000100009C0A".getBytes()));
+
+ // 接收到读取运行模式请求
+ case "010300640001C5D5" ->
+ // 发送响应字符串
+ socketChannel.write(ByteBuffer.wrap("01 03 02 00 03 F8 45".replaceAll(" ", "").getBytes()));
+ // 接收到退出请求
+ case "exit" -> socketChannel.close();
+ }
+ System.out.println(LocalDateTime.now() + "==>:已响应服务器端请求:" + line);
+ }
+
+ }
+
+ public static String bytesToHexString(byte[] src) {
+ String strHex = "";
+ StringBuilder sb = new StringBuilder(50);
+ for (int n = 0; n < src.length; n++) {
+ strHex = Integer.toHexString(src[n] & 0xFF);
+ // 每个字节由两个字符表示,位数不够,高位补0
+ sb.append((strHex.length() == 1) ? "0" + strHex : strHex);
+ }
+ return sb.toString().trim().toUpperCase();
+ }
+
+ public static String byteBufferToHexString(ByteBuffer buffer) {
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.get(bytes);
+ StringBuilder hexString = new StringBuilder();
+ for (byte b : bytes) {
+ String hex = Integer.toHexString(0xff & b);
+ if (hex.length() == 1) {
+ hexString.append('0');
+ }
+ hexString.append(hex);
+ }
+ return hexString.toString();
+ }
+
+}
diff --git a/src/test/java/com/iot/modbus_rtcp/SocketTest.java b/src/test/java/com/iot/modbus_rtcp/SocketTest.java
new file mode 100644
index 0000000..5783d38
--- /dev/null
+++ b/src/test/java/com/iot/modbus_rtcp/SocketTest.java
@@ -0,0 +1,76 @@
+package com.iot.modbus_rtcp;
+
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.Objects;
+
+/**
+ * @author 王仕龙
+ * 2024/11/20 15:13
+ */
+
+public class SocketTest {
+
+ private static final String HOST = "127.0.0.1";
+ private static final Integer PORT = 1200;
+
+ @Test
+ @SneakyThrows
+ public void testModbus() {
+ try (Socket socket = new Socket(HOST, PORT, InetAddress.getByName(HOST), 10001);
+ PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
+ socket.setKeepAlive(true);
+// socket.setTcpNoDelay(true);
+ socket.setSoTimeout(1500);
+ String line = null;
+ while (true) {
+ if (socket.isClosed()) {
+ socket.close();
+ break;
+ }
+
+ try {
+ // 接收服务器响应
+ line = reader.readLine();
+ if (responseCommand(line, writer, socket)) break;
+ } catch (Exception e) {
+ }
+ }
+ }
+ }
+
+ private static boolean responseCommand(String line, PrintWriter writer, Socket socket) throws IOException {
+ // 接收到请求
+ if (Objects.equals(line, "01040000001531C5")) {
+ // 发送响应字符串
+ writer.println("01 04 2A 07 E8 00 0B 00 13 00 17 00 11 00 23 00 03 00 01 00 00 00 00 00 02 00 3B 03 E7 00 39 00 00 0D AC 0D AC 00 00 00 00 00 00 0A 14 4A 98".replaceAll(" ", ""));
+ }
+ // 接收到开井请求
+ else if (Objects.equals(line, "01050001FF00DDFA")) {
+ // 发送响应字符串
+ writer.println("01050001FF00DDFA");
+ }
+ // 接收到关井请求
+ else if (Objects.equals(line, "0105000100009C0A")) {
+ // 发送响应字符串
+ writer.println("0105000100009C0A");
+ }
+ // 接收到读取运行模式请求
+ else if (Objects.equals(line, "010300640001C5D5")) {
+ // 发送响应字符串
+ writer.println("01 03 02 00 03 F8 45".replaceAll(" ", ""));
+ } else {
+ socket.close();
+ return true;
+ }
+ return false;
+ }
+}