From 2f7ba359e4723fb3a279222e1ece4a912979ad03 Mon Sep 17 00:00:00 2001 From: wangshilong Date: Sun, 24 Nov 2024 12:07:47 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=87=E9=9B=86=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitattributes | 2 + pom.xml | 131 ++++++++++++++++ .../modbus_rtcp/ModbusRtcpApplication.java | 37 +++++ .../config/EquipmentIPProperties.java | 25 +++ .../controller/ModbusTCPController.java | 97 ++++++++++++ .../com/iot/modbus_rtcp/dto/CommandDto.java | 4 + .../dto/CommandTypeComparable.java | 41 +++++ .../iot/modbus_rtcp/dto/ModbusCommandDto.java | 32 ++++ .../iot/modbus_rtcp/jobs/AutoCollectJobs.java | 56 +++++++ .../iot/modbus_rtcp/netty/ChannelGroup.java | 40 +++++ .../com/iot/modbus_rtcp/netty/DataCache.java | 25 +++ .../modbus_rtcp/netty/IPGatewayRelation.java | 39 +++++ .../iot/modbus_rtcp/netty/ModbusDecoder.java | 95 ++++++++++++ .../iot/modbus_rtcp/netty/ModbusEncoder.java | 15 ++ .../iot/modbus_rtcp/netty/ModbusSender.java | 56 +++++++ .../iot/modbus_rtcp/netty/NettyServer.java | 82 ++++++++++ .../iot/modbus_rtcp/netty/SyncHandler.java | 83 ++++++++++ .../netty/SyncPriorityChannel.java | 145 ++++++++++++++++++ .../modbus_rtcp/netty/ThreadPoolConsumer.java | 21 +++ .../com/iot/modbus_rtcp/utils/CRCUtil.java | 38 +++++ .../com/iot/modbus_rtcp/utils/HexUtil.java | 29 ++++ .../com/iot/modbus_rtcp/utils/SpringUtil.java | 53 +++++++ .../java/com/iot/modbus_rtcp/vo/Response.java | 82 ++++++++++ .../com/iot/modbus_rtcp/vo/ResponseCode.java | 53 +++++++ src/main/resources/application.yml | 37 +++++ src/main/resources/logback-spring.xml | 63 ++++++++ .../ModbusRtcpApplicationTests.java | 13 ++ .../modbus_rtcp/NonBlockingSocketTest.java | 113 ++++++++++++++ .../java/com/iot/modbus_rtcp/SocketTest.java | 76 +++++++++ 29 files changed, 1583 insertions(+) create mode 100644 .gitattributes create mode 100644 pom.xml create mode 100644 src/main/java/com/iot/modbus_rtcp/ModbusRtcpApplication.java create mode 100644 src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java create mode 100644 src/main/java/com/iot/modbus_rtcp/controller/ModbusTCPController.java create mode 100644 src/main/java/com/iot/modbus_rtcp/dto/CommandDto.java create mode 100644 src/main/java/com/iot/modbus_rtcp/dto/CommandTypeComparable.java create mode 100644 src/main/java/com/iot/modbus_rtcp/dto/ModbusCommandDto.java create mode 100644 src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java create mode 100644 src/main/java/com/iot/modbus_rtcp/netty/ChannelGroup.java create mode 100644 src/main/java/com/iot/modbus_rtcp/netty/DataCache.java create mode 100644 src/main/java/com/iot/modbus_rtcp/netty/IPGatewayRelation.java create mode 100644 src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java create mode 100644 src/main/java/com/iot/modbus_rtcp/netty/ModbusEncoder.java create mode 100644 src/main/java/com/iot/modbus_rtcp/netty/ModbusSender.java create mode 100644 src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java create mode 100644 src/main/java/com/iot/modbus_rtcp/netty/SyncHandler.java create mode 100644 src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java create mode 100644 src/main/java/com/iot/modbus_rtcp/netty/ThreadPoolConsumer.java create mode 100644 src/main/java/com/iot/modbus_rtcp/utils/CRCUtil.java create mode 100644 src/main/java/com/iot/modbus_rtcp/utils/HexUtil.java create mode 100644 src/main/java/com/iot/modbus_rtcp/utils/SpringUtil.java create mode 100644 src/main/java/com/iot/modbus_rtcp/vo/Response.java create mode 100644 src/main/java/com/iot/modbus_rtcp/vo/ResponseCode.java create mode 100644 src/main/resources/application.yml create mode 100644 src/main/resources/logback-spring.xml create mode 100644 src/test/java/com/iot/modbus_rtcp/ModbusRtcpApplicationTests.java create mode 100644 src/test/java/com/iot/modbus_rtcp/NonBlockingSocketTest.java create mode 100644 src/test/java/com/iot/modbus_rtcp/SocketTest.java diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..3b41682 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,2 @@ +/mvnw text eol=lf +*.cmd text eol=crlf diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..bfcd6f7 --- /dev/null +++ b/pom.xml @@ -0,0 +1,131 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 3.3.5 + + + com.iot.modbus_rtcp + modbus-rtcp + 0.0.1-SNAPSHOT + ModBus-rtcp + modbus-rtcp + + + 21 + ${java.version} + ${java.version} + UTF-8 + 2.0.51 + 1.18.34 + 5.8.29 + 6.0.0 + + + + + + + jakarta.servlet + jakarta.servlet-api + ${jakarta.version} + provided + + + org.projectlombok + lombok + ${lombok.version} + compile + + + org.springframework.boot + spring-boot-dependencies + 3.3.5 + pom + import + + + + cn.hutool + hutool-all + ${hutool.version} + + + + + + + org.projectlombok + lombok + + + cn.hutool + hutool-all + + + org.apache.commons + commons-lang3 + + + org.springframework.boot + spring-boot-starter-jdbc + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-amqp + + + + + com.alibaba + druid-spring-boot-3-starter + 1.2.20 + + + mysql + mysql-connector-java + 8.0.28 + + + io.netty + netty-all + 4.1.43.Final + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.boot + spring-boot-devtools + true + + + + + + + org.springframework.boot + spring-boot-maven-plugin + 3.3.5 + + + + org.projectlombok + lombok + + + + + + + + diff --git a/src/main/java/com/iot/modbus_rtcp/ModbusRtcpApplication.java b/src/main/java/com/iot/modbus_rtcp/ModbusRtcpApplication.java new file mode 100644 index 0000000..6de04d7 --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/ModbusRtcpApplication.java @@ -0,0 +1,37 @@ +package com.iot.modbus_rtcp; + +import cn.hutool.json.JSONUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.BeansException; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.scheduling.annotation.EnableScheduling; + +@EnableScheduling +@SpringBootApplication(scanBasePackages = {"com.iot.modbus_rtcp"}) +public class ModbusRtcpApplication implements ApplicationContextAware { + + private static final Logger logger = LoggerFactory.getLogger("com.iot.modbus_rtcp.rabbit"); + + public static void main(String[] args) { + SpringApplication.run(ModbusRtcpApplication.class, args); + } + + // 为RabbitTemplate设置路由到队列失败时调用的方法 + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + //获取RabbitTemplate + RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); + //设置ReturnCallback + rabbitTemplate.setReturnsCallback((returnedMessage) -> { + logger.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", returnedMessage.getReplyCode(), + returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), + JSONUtil.toJsonStr(returnedMessage.getMessage())); + }); + } + +} diff --git a/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java b/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java new file mode 100644 index 0000000..c9c8dc8 --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/config/EquipmentIPProperties.java @@ -0,0 +1,25 @@ +package com.iot.modbus_rtcp.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +import java.util.HashSet; +import java.util.Map; + +@Configuration +@Data +@ConfigurationProperties(prefix = "server.netty") +public class EquipmentIPProperties { + + private Map identifiers; + + public HashSet keys() { + return new HashSet<>(this.identifiers.keySet()); + } + + public String get(String key) { + return this.identifiers.get(key); + } + +} diff --git a/src/main/java/com/iot/modbus_rtcp/controller/ModbusTCPController.java b/src/main/java/com/iot/modbus_rtcp/controller/ModbusTCPController.java new file mode 100644 index 0000000..99b3ab0 --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/controller/ModbusTCPController.java @@ -0,0 +1,97 @@ +package com.iot.modbus_rtcp.controller; + +import com.iot.modbus_rtcp.dto.CommandTypeComparable; +import com.iot.modbus_rtcp.dto.ModbusCommandDto; +import com.iot.modbus_rtcp.netty.NettyServer; +import com.iot.modbus_rtcp.utils.CRCUtil; +import com.iot.modbus_rtcp.utils.HexUtil; +import com.iot.modbus_rtcp.vo.Response; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +/** + * Modbus-TCP协议API + * 通信方式采用TCP的方式 + */ +@Slf4j +@RestController +@RequestMapping("modbus-tcp") +public class ModbusTCPController implements ApplicationRunner { + private NettyServer nettyServer; + + @PreDestroy + private void destroy() { + this.nettyServer.stop(); + } + + /** + * 采集命令API + * 采集命令API使用异步访问的方式,设备响应数据后会将其推送到Kafka + * + * @param modbusCommandBoList + * @return + */ + @PostMapping("/collect") + public Response collect(@RequestBody List modbusCommandBoList) { + log.debug("采集请求:{}", modbusCommandBoList); + + try { + modbusCommandBoList.stream().forEach(modbusCommandBo -> { + modbusCommandBo.setCommand(modbusCommandBo.getCommand() + CRCUtil.getCRC(HexUtil.HexStringToBytes(modbusCommandBo.getCommand()))); + + modbusCommandBo.setType(CommandTypeComparable.CommandType.COLLECTION); + modbusCommandBo.setTimestamp(System.nanoTime()); + }); + + this.nettyServer.sender().send(modbusCommandBoList); + } catch (Exception e) { + log.error("", e); + return Response.failed(e.getMessage()); + } + + return Response.succeed(); + } + + /** + * 控制命令API + * 控制命令API使用异步访问的方式,设备响应数据自动丢弃 + * + * @param modbusCommandBoList + * @return 设备响应返回 + */ + @PostMapping("/control") + public Response control(@RequestBody List modbusCommandBoList) { + log.debug("控制请求:{}", modbusCommandBoList); + + try { + modbusCommandBoList.stream().forEach(modbusCommandBo -> { + modbusCommandBo.setCommand(modbusCommandBo.getCommand() + CRCUtil.getCRC(HexUtil.HexStringToBytes(modbusCommandBo.getCommand()))); + + modbusCommandBo.setType(CommandTypeComparable.CommandType.CONTROL); + modbusCommandBo.setTimestamp(System.nanoTime()); + }); + + this.nettyServer.sender().send(modbusCommandBoList); + } catch (Exception e) { + log.error("", e); + return Response.failed(e.getMessage()); + } + + return Response.succeed(); + } + + @Override + public void run(ApplicationArguments args) throws Exception { + this.nettyServer = new NettyServer(502, 10); + this.nettyServer.start(); + } + +} diff --git a/src/main/java/com/iot/modbus_rtcp/dto/CommandDto.java b/src/main/java/com/iot/modbus_rtcp/dto/CommandDto.java new file mode 100644 index 0000000..a47256f --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/dto/CommandDto.java @@ -0,0 +1,4 @@ +package com.iot.modbus_rtcp.dto; + +public interface CommandDto { +} diff --git a/src/main/java/com/iot/modbus_rtcp/dto/CommandTypeComparable.java b/src/main/java/com/iot/modbus_rtcp/dto/CommandTypeComparable.java new file mode 100644 index 0000000..9b3c387 --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/dto/CommandTypeComparable.java @@ -0,0 +1,41 @@ +package com.iot.modbus_rtcp.dto; + +import lombok.Data; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +@Data +@SuperBuilder +@NoArgsConstructor +public class CommandTypeComparable implements Comparable { + private CommandType type; + + private long timestamp; + + @Override + public int compareTo(Object o) { + CommandTypeComparable commandTypeComparable = (CommandTypeComparable) o; + + if (this.getType() == commandTypeComparable.getType()) { + return this.timestamp > commandTypeComparable.timestamp ? 1 : -1; + } else if (this.getType() == CommandType.CONTROL) { + return -1; + } else if (this.getType() == CommandType.COLLECTION) { + return 1; + } else { + throw new RuntimeException("无法排序"); + } + } + + @Getter + public enum CommandType { + CONTROL("控制"), COLLECTION("采集"); + + private final String name; + + CommandType(String name) { + this.name = name; + } + } +} diff --git a/src/main/java/com/iot/modbus_rtcp/dto/ModbusCommandDto.java b/src/main/java/com/iot/modbus_rtcp/dto/ModbusCommandDto.java new file mode 100644 index 0000000..6dd8142 --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/dto/ModbusCommandDto.java @@ -0,0 +1,32 @@ +package com.iot.modbus_rtcp.dto; + +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + + +@Data +@SuperBuilder +@NoArgsConstructor +public class ModbusCommandDto extends CommandTypeComparable implements CommandDto { + /** + * 12位十六进制的命令 + */ + private String command; + + /** + * 采集长度 + */ + private int length; + + /** + * 自定义标识 + * 采集情况下会推送到MQ的内容格式为:自定义标识+设备反馈报文 + */ + private String key; + + /** + * 网关唯一标识符 + */ + private String identifier; +} diff --git a/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java b/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java new file mode 100644 index 0000000..aa8c0b8 --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/jobs/AutoCollectJobs.java @@ -0,0 +1,56 @@ +package com.iot.modbus_rtcp.jobs; + +import cn.hutool.core.map.MapUtil; +import com.iot.modbus_rtcp.controller.ModbusTCPController; +import com.iot.modbus_rtcp.dto.CommandTypeComparable; +import com.iot.modbus_rtcp.dto.ModbusCommandDto; +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * @author 王仕龙 + * 2024/11/23 18:33 + */ +@Component +@RequiredArgsConstructor +public class AutoCollectJobs { + + private static final String SQL = "SELECT ref_id as deviceId, command, message_length messageLength FROM commands " + + "WHERE type = 'COLLECTION' and ref_type = 'DEVICE' and id < 10099 order by id"; + + private final JdbcTemplate jdbcTemplate; + private final ModbusTCPController controller; + + @Scheduled(cron = "0/30 * * * * ? ") + public void autoCollect() { + int pageIndex = 0; + int pageSize = 1000; + long timestamp = System.currentTimeMillis(); + List> resultList = null; + while (Objects.isNull(resultList) || resultList.size() >= pageSize) { + resultList = this.jdbcTemplate.queryForList(SQL + " LIMIT " + (pageIndex++) + "," + pageSize); + if (ObjectUtils.isNotEmpty(resultList)) { + this.controller.collect(resultList.stream() + .filter(item -> StringUtils.isNotEmpty(MapUtil.getStr(item, "identifier"))) + .map(item -> ModbusCommandDto.builder() + .command(MapUtil.getStr(item, "command")) + .identifier("4B454E454E4731343030303030333538") + .length(MapUtil.getInt(item, "messageLength")) + .type(CommandTypeComparable.CommandType.COLLECTION) + .key("4B454E454E4731343030303030333538/" + MapUtil.getStr(item, "deviceId") + "/" + timestamp) + .build()) + .collect(Collectors.toList())); + } + } + + } +} diff --git a/src/main/java/com/iot/modbus_rtcp/netty/ChannelGroup.java b/src/main/java/com/iot/modbus_rtcp/netty/ChannelGroup.java new file mode 100644 index 0000000..b2ea8a4 --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/netty/ChannelGroup.java @@ -0,0 +1,40 @@ +package com.iot.modbus_rtcp.netty; + +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +public class ChannelGroup { + + private final ConcurrentHashMap mChannelMap = new ConcurrentHashMap<>(); + + public void put(String identity, SyncPriorityChannel channel) { + this.mChannelMap.put(identity, channel); + } + + public SyncPriorityChannel get(String identity) { + return this.mChannelMap.get(identity); + } + + public SyncPriorityChannel get(String ip, int port) { + return this.mChannelMap.get(getKey(ip, port)); + } + + public void remove(String ip, int port) { + this.mChannelMap.remove(getKey(ip, port)); + } + + public void remove(String identity) { + this.mChannelMap.remove(identity); + } + + public void see() { + log.info("当前连接有:{}", new ArrayList<>(this.mChannelMap.keySet())); + } + + public static String getKey(String ip, int port) { + return ip + ":" + port; + } +} diff --git a/src/main/java/com/iot/modbus_rtcp/netty/DataCache.java b/src/main/java/com/iot/modbus_rtcp/netty/DataCache.java new file mode 100644 index 0000000..6b697f1 --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/netty/DataCache.java @@ -0,0 +1,25 @@ +package com.iot.modbus_rtcp.netty; + +import java.util.concurrent.ConcurrentHashMap; + +public class DataCache { + private static ConcurrentHashMap mDataCacheMap = new ConcurrentHashMap<>(); + + public static void put(String identity, byte[] data) { + synchronized (DataCache.class) { + mDataCacheMap.put(identity, data); + } + } + + public static byte[] get(String identity) { + synchronized (DataCache.class) { + return mDataCacheMap.get(identity); + } + } + + public static byte[] remove(String identity) { + synchronized (DataCache.class) { + return mDataCacheMap.remove(identity); + } + } +} diff --git a/src/main/java/com/iot/modbus_rtcp/netty/IPGatewayRelation.java b/src/main/java/com/iot/modbus_rtcp/netty/IPGatewayRelation.java new file mode 100644 index 0000000..370b173 --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/netty/IPGatewayRelation.java @@ -0,0 +1,39 @@ +package com.iot.modbus_rtcp.netty; + +import cn.hutool.core.map.BiMap; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class IPGatewayRelation { + + // key :ip value :网关编码(网关标识) + private static Map mIPGatewayMap = new ConcurrentHashMap<>(); + // key :网关标识 value :ip端口 + private static BiMap addressMap = new BiMap<>(new ConcurrentHashMap<>()); + + public static String get(String ipAddressKey) { + return mIPGatewayMap.get(ipAddressKey); + } + + public static void put(String ipAddressKey, String gatewayCodeMapping) { + mIPGatewayMap.put(ipAddressKey, gatewayCodeMapping); + } + + public static void putIPAddressMap(String gatewayIdentifier, String ipAddressKey) { + addressMap.put(gatewayIdentifier, ipAddressKey); + } + + public static String getIPAddress(String gatewayIdentifier) { + return addressMap.get(gatewayIdentifier); + } + + public static String getGatewayIdentifier(String ipAddressKey) { + return addressMap.getInverse().get(ipAddressKey); + } + + public static String getGatewayIdentifier(String ip, int port) { + return getGatewayIdentifier(ChannelGroup.getKey(ip, port)); + } + +} diff --git a/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java b/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java new file mode 100644 index 0000000..81513a8 --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/netty/ModbusDecoder.java @@ -0,0 +1,95 @@ +package com.iot.modbus_rtcp.netty; + +import cn.hutool.core.util.ArrayUtil; +import com.iot.modbus_rtcp.config.EquipmentIPProperties; +import com.iot.modbus_rtcp.dto.ModbusCommandDto; +import com.iot.modbus_rtcp.utils.HexUtil; +import com.iot.modbus_rtcp.utils.SpringUtil; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.amqp.rabbit.core.RabbitAdmin; + +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; + +@Slf4j +public class ModbusDecoder extends ByteToMessageDecoder { + private final ChannelGroup channelGroup; + private final HashSet identityList; + private final EquipmentIPProperties equipmentIPProperties; + + public ModbusDecoder(ChannelGroup channelGroup) { + this.channelGroup = channelGroup; + this.equipmentIPProperties = SpringUtil.getBean(EquipmentIPProperties.class); + this.identityList = this.equipmentIPProperties.keys(); + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) { + String ip = ((InetSocketAddress) ctx.channel().remoteAddress()).getHostString(); + int port = ((InetSocketAddress) ctx.channel().remoteAddress()).getPort(); + + byte[] b = new byte[buffer.readableBytes()]; + buffer.readBytes(b); + + String hex = HexUtil.bytesToHexString(b); + log.info("解析到:{}", hex); + + if (this.identityList.contains(hex.toUpperCase())) { // 心跳 + String msgUpperCase = hex.toUpperCase(); + for (String identity : this.identityList) { + if (Objects.equals(identity, msgUpperCase)) { + log.info("注册{}({}:{}):{}", this.equipmentIPProperties.get(msgUpperCase), ip, port, msgUpperCase); + String currentAddress = ChannelGroup.getKey(ip, port); + SyncPriorityChannel channel = this.channelGroup.get(currentAddress); + channel.setIdentifier(msgUpperCase); + this.channelGroup.put(msgUpperCase, channel); + SpringUtil.getBean(RabbitAdmin.class).declareQueue(QueueBuilder.durable(channel.getCollectQueue()).build()); + + IPGatewayRelation.put(currentAddress, String.format("%s(%s)", this.equipmentIPProperties.get(msgUpperCase), msgUpperCase)); + String oldAddress = IPGatewayRelation.getIPAddress(msgUpperCase); + if (StringUtils.isNotBlank(currentAddress) && + StringUtils.isNotBlank(oldAddress) && + !currentAddress.equals(oldAddress)) { + //说明设备重连后IP+端口发生了改变 + String[] split = oldAddress.split(":"); + //删除老连接 + this.channelGroup.remove(split[0], Integer.parseInt(split[1])); + log.info("删除channelGroup通道:{}连接:{},新增该通道连接:{}", this.equipmentIPProperties.get(msgUpperCase), oldAddress, currentAddress); + } + IPGatewayRelation.putIPAddressMap(msgUpperCase, currentAddress); + return; + } + } + } + + SyncPriorityChannel channel = this.channelGroup.get(ip, port); + ModbusCommandDto message = channel.getCurrentMessage(); + + if (message == null) { + log.warn("非法传输:{}", HexUtil.bytesToHexString(b)); + return; + } + + String identity = ChannelGroup.getKey(ip, port); + byte[] bytesCache = DataCache.get(identity); + if (bytesCache == null) { + bytesCache = b; + } else { + bytesCache = ArrayUtil.addAll(bytesCache, b); + } + + if (bytesCache.length < (message.getLength() / 2)) { + DataCache.put(identity, bytesCache); + } else if (bytesCache.length == (message.getLength() / 2)) { + out.add(HexUtil.bytesToHexString(bytesCache)); + } + } + +} diff --git a/src/main/java/com/iot/modbus_rtcp/netty/ModbusEncoder.java b/src/main/java/com/iot/modbus_rtcp/netty/ModbusEncoder.java new file mode 100644 index 0000000..5da897e --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/netty/ModbusEncoder.java @@ -0,0 +1,15 @@ +package com.iot.modbus_rtcp.netty; + +import com.iot.modbus_rtcp.utils.HexUtil; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +public class ModbusEncoder extends MessageToByteEncoder { + + @Override + protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) { + byteBuf.writeBytes(HexUtil.HexStringToBytes(s)); + } +} + diff --git a/src/main/java/com/iot/modbus_rtcp/netty/ModbusSender.java b/src/main/java/com/iot/modbus_rtcp/netty/ModbusSender.java new file mode 100644 index 0000000..d4cf6c4 --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/netty/ModbusSender.java @@ -0,0 +1,56 @@ +package com.iot.modbus_rtcp.netty; + +import com.iot.modbus_rtcp.config.EquipmentIPProperties; +import com.iot.modbus_rtcp.dto.ModbusCommandDto; +import com.iot.modbus_rtcp.utils.SpringUtil; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * 报文分发器 + * 通信报文{@link ModbusCommandDto} 中,提取命令的内容获取PLC地址, + * 按地址将报文分发到对应的 {@link SyncPriorityChannel} + */ +@Slf4j +public class ModbusSender { + private final ChannelGroup channelGroup; + private final EquipmentIPProperties equipmentIPProperties; + + public ModbusSender(ChannelGroup channelGroup) { + this.channelGroup = channelGroup; + this.equipmentIPProperties = SpringUtil.getBean(EquipmentIPProperties.class); + } + + public void send(List modbusCommandBoList) { + Map> splitCommandMap = this.split(modbusCommandBoList); + splitCommandMap.forEach((identity, v) -> { + SyncPriorityChannel channel = this.channelGroup.get(identity); + if (channel == null) { + throw new RuntimeException("设备" + identity + "的连接未配置或该设备已断开与服务器的连接,请稍后再试或联系管理员"); + } + + log.info("{}通道({})提交{}条命令:{}", this.equipmentIPProperties.get(identity), identity, v.size(), v); + channel.addMessages(v); + }); + } + + private Map> split(List modbusCommandBoList) { + Map> map = new HashMap<>(); + modbusCommandBoList.stream().forEach(modbusCommandBo -> { + if (modbusCommandBo.getIdentifier() == null) { + return; + } + + List list = map.getOrDefault(modbusCommandBo.getIdentifier(), new ArrayList<>()); + list.add(modbusCommandBo); + + map.put(modbusCommandBo.getIdentifier(), list); + }); + + return map; + } +} diff --git a/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java b/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java new file mode 100644 index 0000000..6d8ab78 --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/netty/NettyServer.java @@ -0,0 +1,82 @@ +package com.iot.modbus_rtcp.netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class NettyServer extends ChannelInitializer { + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + private ChannelFuture future; + private int port, nThread; + + private ChannelGroup group; + private ModbusSender sender; + private ChannelHandler handler; + + /** + * 创建指定服务端口,指定线程数的服务端 + * + * @param port 服务端口 + * @param nThread 执行线程池线程数 + */ + public NettyServer(int port, int nThread) { + this.port = port; + this.nThread = nThread; + this.group = new ChannelGroup(); + this.sender = new ModbusSender(this.group); + this.handler = new SyncHandler(this.group); + } + + /** + * 启动服务 + */ + public void start() { + this.bossGroup = new NioEventLoopGroup(1); + this.workerGroup = new NioEventLoopGroup(this.nThread); + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(this.bossGroup, this.workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childHandler(this); + this.future = bootstrap.bind(this.port); + } + + /** + * 停止服务 + */ + public void stop() { + this.future.channel().closeFuture(); + this.workerGroup.shutdownGracefully(); + this.bossGroup.shutdownGracefully(); + } + + public ModbusSender sender() { + return this.sender; + } + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + + log.info("创建channel:{}", ch); + + String ip = ch.remoteAddress().getHostString(); + //过滤掉docker 网关请求 + if ("172.17.0.1".equals(ip)) return; + int port = ch.remoteAddress().getPort(); + this.group.put(ip + ":" + port, new SyncPriorityChannel(ch)); + + pipeline.addLast(new ModbusEncoder()); + pipeline.addLast("decoder", new ModbusDecoder(this.group)); +// pipeline.addLast(new ReadTimeoutHandler(10000, TimeUnit.MILLISECONDS)); + pipeline.addLast(this.handler); + } +} diff --git a/src/main/java/com/iot/modbus_rtcp/netty/SyncHandler.java b/src/main/java/com/iot/modbus_rtcp/netty/SyncHandler.java new file mode 100644 index 0000000..1328579 --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/netty/SyncHandler.java @@ -0,0 +1,83 @@ +package com.iot.modbus_rtcp.netty; + +import com.iot.modbus_rtcp.dto.CommandTypeComparable; +import com.iot.modbus_rtcp.dto.ModbusCommandDto; +import com.iot.modbus_rtcp.utils.SpringUtil; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +import java.net.InetSocketAddress; +import java.util.Objects; + +/** + * + */ +@Slf4j +@ChannelHandler.Sharable +public class SyncHandler extends ChannelInboundHandlerAdapter { + private final ChannelGroup channelGroup; + private final RabbitTemplate rabbitTemplate; + + public SyncHandler(ChannelGroup channelGroup) { + this.channelGroup = channelGroup; + this.rabbitTemplate = SpringUtil.getBean(RabbitTemplate.class); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); + int port = remoteAddress.getPort(); + String ip = remoteAddress.getHostString(); + + SyncPriorityChannel channel = this.channelGroup.get(ip, port); + ModbusCommandDto message = channel.getCurrentMessage(); + try { + channel.getChannelPromise().setSuccess(); + } catch (IllegalStateException e) { + log.info("数据返回慢了,已经超时!"); + return; + } catch (Exception e) { + log.info("丢失数据:{}", msg); + return; + } + + if (Objects.isNull(message)) { + log.error("未找到发送源: {}", msg); + return; + } + + // 控制返回丢弃 + if (CommandTypeComparable.CommandType.CONTROL.equals(message.getType())) { + return; + } + String json = message.getKey() + "/" + System.currentTimeMillis() + "/" + msg; + log.info("推数据到MQ({}): {}", channel.getCollectQueue(), json); + try { + this.rabbitTemplate.convertAndSend(channel.getCollectQueue(), json); + } catch (Exception e) { + log.error("推数据到MQ失败({})", channel.getCollectQueue(), e); + } + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + log.info("用户事件: {}-> {}", evt.getClass().getName(), evt); + super.userEventTriggered(ctx, evt); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + log.info("异常: ", cause); + InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); + String ipKey = ChannelGroup.getKey(remoteAddress.getHostString(), remoteAddress.getPort()); + this.channelGroup.remove(ipKey); + + String gatewayIdentifier = IPGatewayRelation.getGatewayIdentifier(ipKey); + this.channelGroup.remove(gatewayIdentifier); + + ctx.close(); + } +} diff --git a/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java b/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java new file mode 100644 index 0000000..40e4877 --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/netty/SyncPriorityChannel.java @@ -0,0 +1,145 @@ +package com.iot.modbus_rtcp.netty; + +import com.iot.modbus_rtcp.dto.CommandTypeComparable; +import com.iot.modbus_rtcp.dto.ModbusCommandDto; +import io.netty.channel.ChannelPromise; +import io.netty.channel.socket.SocketChannel; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * 同步控制命令优先Channel + * 该Channel使用组合方式结合了{@link SocketChannel} 具备Channel的功能 + * 发送之后必须得到客户端响应或者超时后,执行下一条命令发送即同步发送机制,同时保证控制命令优先执行。 + *

+ * 调用添加命令方法作为一次任务提交给线程池执行,如果在执行期间还有调用添加命令方法便会在同一任务中追加执行的命令。 + */ +@Slf4j +public class SyncPriorityChannel implements Runnable { + private static final int allowTimeout = 1; + private static final boolean isControl = true; + + private final SocketChannel channel; + + private int timeoutCount; + @Getter + private volatile String identifier; + @Getter + private volatile String collectQueue; + private volatile boolean running = false; + + private final Object object = new Object(); + private final PriorityBlockingQueue messageQueue = new PriorityBlockingQueue<>(); + private final AtomicReference channelPromiseReference = new AtomicReference<>(); + private final AtomicReference currentMessageReference = new AtomicReference<>(); + + public SyncPriorityChannel(SocketChannel channel) { + this.channel = channel; + } + + public void addMessages(List messages) { + this.messageQueue.addAll(messages); + + synchronized (this.object) { + if (this.running) { + return; + } else { + this.running = true; + } + } + + ThreadPoolConsumer.submit(this); + } + + @Override + public void run() { + this.sendNext(); + } + + public synchronized void sendNext() { + if (this.timeoutCount == 0) { + this.currentMessageReference.setRelease(this.messageQueue.poll()); + } + + // 消费完毕终止递归发送 + if (Objects.isNull(this.getCurrentMessage())) { + this.running = false; + return; + } + + InetSocketAddress remoteAddress = this.channel.remoteAddress(); + int port = remoteAddress.getPort(); + String ip = remoteAddress.getHostString(); + + + this.channelPromiseReference.setRelease(this.channel.newPromise()); + if (isControl || Objects.equals(CommandTypeComparable.CommandType.COLLECTION, this.getCurrentMessage().getType())) { + log.info("发送命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送", + this.getCurrentMessage().getCommand(), + this.getCurrentMessage().getLength(), + IPGatewayRelation.get(ChannelGroup.getKey(ip, port)), + ChannelGroup.getKey(ip, port), + this.messageQueue.size()); + + this.channel.writeAndFlush(this.getCurrentMessage().getCommand()); + } + + // 清空缓存数据 + DataCache.remove(ChannelGroup.getKey(ip, port)); + + boolean timeout; + try { + timeout = !this.getChannelPromise().await(3000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.error("channelPromise.await发生异常,", e); + return; + } + + if (timeout && this.timeoutCount == (allowTimeout - 1)) { + log.warn("#超时命令:{},长度:{},通道:{},IP:{},还有{}条命令待发送", + this.getCurrentMessage().getCommand(), + this.getCurrentMessage().getLength(), + IPGatewayRelation.get(ChannelGroup.getKey(ip, port)), + ChannelGroup.getKey(ip, port), + this.messageQueue.size()); + } + + if (timeout && this.timeoutCount < (allowTimeout - 1)) { + this.timeoutCount++; + } else { + this.timeoutCount = 0; + } + + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + this.sendNext(); + } + + public ChannelPromise getChannelPromise() { + return this.channelPromiseReference.getAcquire(); + } + + public ModbusCommandDto getCurrentMessage() { + return this.currentMessageReference.getAcquire(); + } + + public void setIdentifier(String identifier) { + this.identifier = identifier; + this.setCollectQueue(); + } + + public void setCollectQueue() { + this.collectQueue = "/modbus/device/" + this.identifier + "/collect"; + } +} diff --git a/src/main/java/com/iot/modbus_rtcp/netty/ThreadPoolConsumer.java b/src/main/java/com/iot/modbus_rtcp/netty/ThreadPoolConsumer.java new file mode 100644 index 0000000..71f922e --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/netty/ThreadPoolConsumer.java @@ -0,0 +1,21 @@ +package com.iot.modbus_rtcp.netty; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +@Slf4j +public class ThreadPoolConsumer { + private static final ThreadPoolTaskExecutor mThreadPoolExecutor; + + static { + mThreadPoolExecutor = new ThreadPoolTaskExecutor(); + mThreadPoolExecutor.setCorePoolSize(4); + mThreadPoolExecutor.setMaxPoolSize(8); + mThreadPoolExecutor.setKeepAliveSeconds(60); + mThreadPoolExecutor.initialize(); + } + + public static void submit(Runnable task) { + mThreadPoolExecutor.submit(task); + } +} diff --git a/src/main/java/com/iot/modbus_rtcp/utils/CRCUtil.java b/src/main/java/com/iot/modbus_rtcp/utils/CRCUtil.java new file mode 100644 index 0000000..edae9f5 --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/utils/CRCUtil.java @@ -0,0 +1,38 @@ +package com.iot.modbus_rtcp.utils; + +public class CRCUtil { + public static String getCRC(byte[] bytes, boolean cvs) { + int CRC = 0x0000ffff; + int POLYNOMIAL = 0x0000a001; + + int i, j; + for (i = 0; i < bytes.length; i++) { + CRC ^= ((int) bytes[i] & 0x000000ff); + for (j = 0; j < 8; j++) { + if ((CRC & 0x00000001) != 0) { + CRC >>= 1; + CRC ^= POLYNOMIAL; + } else { + CRC >>= 1; + } + } + } + + if (cvs) { + CRC = ((CRC & 0x0000FF00) >> 8) | ((CRC & 0x000000FF) << 8); + } + + return String.format("%04x", CRC).toUpperCase(); + } + + public static String getCRC(byte[] bytes) { + return getCRC(bytes, true); + } + + public static void main(String[] args) { + String[] array = new String[]{"0A0300000019", "0A0303930023", "0A0301A4002D", "0A0301D6002D", "0A0300320064", "0A0300960064", "0A0300FA0064", "0A03015E0064", "0A0200000050"}; + for (String str : array) { + System.out.println(str + CRCUtil.getCRC(HexUtil.HexStringToBytes(str))); + } + } +} diff --git a/src/main/java/com/iot/modbus_rtcp/utils/HexUtil.java b/src/main/java/com/iot/modbus_rtcp/utils/HexUtil.java new file mode 100644 index 0000000..3024be5 --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/utils/HexUtil.java @@ -0,0 +1,29 @@ +package com.iot.modbus_rtcp.utils; + +public class HexUtil { + + public static byte[] HexStringToBytes(String src) { + int l = src.length() / 2; + byte[] ret = new byte[l]; + for (int i = 0; i < l; i++) { + ret[i] = (byte) Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue(); + } + return ret; + } + + public static String bytesToHexString(byte[] src) { + String strHex = ""; + StringBuilder sb = new StringBuilder(""); + for (int n = 0; n < src.length; n++) { + strHex = Integer.toHexString(src[n] & 0xFF); + // 每个字节由两个字符表示,位数不够,高位补0 + sb.append((strHex.length() == 1) ? "0" + strHex : strHex); + } + return sb.toString().trim().toUpperCase(); + } + + public static void main(String[] args) { + System.out.println(HexStringToBytes("0D")); + System.out.println(new byte[]{(byte) Integer.parseInt("0D", 16)}); + } +} diff --git a/src/main/java/com/iot/modbus_rtcp/utils/SpringUtil.java b/src/main/java/com/iot/modbus_rtcp/utils/SpringUtil.java new file mode 100644 index 0000000..ae66e90 --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/utils/SpringUtil.java @@ -0,0 +1,53 @@ +package com.iot.modbus_rtcp.utils; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.DefaultListableBeanFactory; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class SpringUtil implements ApplicationContextAware { + private static ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + if (SpringUtil.applicationContext == null) { + SpringUtil.applicationContext = applicationContext; + } + + log.info("ApplicationContext配置成功,applicationContext=" + SpringUtil.applicationContext); + } + + public static ApplicationContext getApplicationContext() { + return applicationContext; + } + + public static Object getBean(String name) { + return getApplicationContext().getBean(name); + } + + public static T getBean(Class clazz) { + return getApplicationContext().getBean(clazz); + } + + public static T getBean(String name, Class clazz) { + return getApplicationContext().getBean(name, clazz); + } + + public static void registerBean(String beanName, BeanDefinitionBuilder beanDefinitionBuilder) { + if (StringUtils.isBlank(beanName) || beanDefinitionBuilder == null) { + return; + } + ConfigurableApplicationContext context = (ConfigurableApplicationContext) getApplicationContext(); + DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) context.getBeanFactory(); + beanFactory.registerBeanDefinition(beanName, beanDefinitionBuilder.getBeanDefinition()); + } + + +} diff --git a/src/main/java/com/iot/modbus_rtcp/vo/Response.java b/src/main/java/com/iot/modbus_rtcp/vo/Response.java new file mode 100644 index 0000000..48fb1de --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/vo/Response.java @@ -0,0 +1,82 @@ +package com.iot.modbus_rtcp.vo; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serial; +import java.io.Serializable; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class Response implements Serializable { + @Serial + private static final long serialVersionUID = 2289553801394006859L; + private int code = ResponseCode.SUCCESS.code(); + private String message = ResponseCode.SUCCESS.message(); + private T data; + + public static Response succeed() { + return Response.builder().code(ResponseCode.SUCCESS.code()).message(ResponseCode.SUCCESS.message()).build(); + } + + public static Response succeed(String message) { + return Response.builder().code(ResponseCode.SUCCESS.code()).message(message).build(); + } + + public static Response succeed(T t) { + return Response.builder().code(ResponseCode.SUCCESS.code()).message(ResponseCode.SUCCESS.message()).data(t) + .build(); + } + + public static Response succeed(String message, T t) { + return Response.builder().code(ResponseCode.SUCCESS.code()).message(message).data(t) + .build(); + } + + public static Response succeed(ResponseCode responseCode) { + return Response.builder().code(responseCode.code()).message(responseCode.message()).build(); + } + + public static Response succeed(ResponseCode responseCode, T t) { + return Response.builder().code(responseCode.code()).message(responseCode.message()).data(t) + .build(); + } + + public static Response failed() { + return Response.builder().code(ResponseCode.FAILURE.code()).message(ResponseCode.FAILURE.message()).build(); + } + + public static Response failed(String message) { + return Response.builder().code(ResponseCode.FAILURE.code()).message(message).build(); + } + + public static Response failed(int code, String message) { + return Response.builder().code(code).message(message).build(); + } + + public static Response failed(ResponseCode responseCode) { + return Response.builder().code(responseCode.code()).message(responseCode.message()).build(); + } + + public static Response failed(ResponseCode responseCode, T t) { + return Response.builder().code(responseCode.code()).message(responseCode.message()).data(t).build(); + } + + public static Response failed(T t) { + return Response.builder().code(ResponseCode.FAILURE.code()).message(ResponseCode.FAILURE.message()).data(t) + .build(); + } + + public static Response define(int code, T t) { + return Response.builder().code(code).message(ResponseCode.SUCCESS.message()).data(t).build(); + } + + public static Response define(int code, String message, T t) { + return Response.builder().code(code).message(message).data(t).build(); + } + +} diff --git a/src/main/java/com/iot/modbus_rtcp/vo/ResponseCode.java b/src/main/java/com/iot/modbus_rtcp/vo/ResponseCode.java new file mode 100644 index 0000000..f46f42b --- /dev/null +++ b/src/main/java/com/iot/modbus_rtcp/vo/ResponseCode.java @@ -0,0 +1,53 @@ +package com.iot.modbus_rtcp.vo; + +public enum ResponseCode { + SUCCESS(200, "操作成功"), + FAILURE(400, "操作失败"), + /** + * 参数错误代码 + */ + PARAM_IS_INVALID(1001, "参数无效"), + PARAM_IS_BLANK(1002, "参数为空"), + PARAM_TYPE_BIND_ERROR(1003, "参数类型错误"), + PARAM_NOT_COMPLETE(1004, "参数缺失"), + PARAM_ERROR(1004, "参数错误"), + /** + * 用户错误代码 + */ + USER_NOT_LOGIN_IN(2001, "手机号未登录"), + USER_LOGIN_ERROR(2002, "账号或密码输入有误"), + USER_ACCOUNT_FORBIDDEN(2003, "手机号被冻结"), + USER_NOT_EXIST(2004, "账号或手机号未注册"), + USER_HAS_EXISTED(2005, "账号或手机号已注册"), + USER_NOT_BIND(2006, "用户需要绑定手机号"), + USER_MORE(2007, "系统存在多个正常状态的账号或手机号,请联系管理员"), + + /** + * 权限验证错误 + */ + USER_AUTHORIZATION_ERROR(3001, "用户权限错误"), + USER_NO_LOGIN(3002, "尚未登录,请登录"), + USER_PREV_NO_ATTENDED(3003, "上轮您未投票,不可以继续投票了"), + USER_NO_ATTENDED(3004, "本轮您未投票"), + /** + * 其他系统错误 + */ + SYSTEM_ERROR(4001, "系统错误"), + VERIFY_CODE_ERROR(4002, "验证码错误!"); + + int code; + String message; + + ResponseCode(int code, String message) { + this.message = message; + this.code = code; + } + + public String message() { + return this.message; + } + + public int code() { + return this.code; + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..3bee7f2 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,37 @@ +spring: + application: + name: modbus-rtcp + datasource: + driver-class-name: com.mysql.cj.jdbc.Driver + type: com.alibaba.druid.pool.DruidDataSource + url: jdbc:mysql://127.0.0.1:3306/gas_well_watch?characterEncoding=UTF-8&useUnicode=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai + username: cq + password: cq + rabbitmq: + host: ${RABBIT_MQ_HOST:localhost} + port: ${RABBIT_MQ_PORT:5672} + username: ${RABBIT_MQ_USERNAME:ModbusAdmin} + password: ${RABBIT_MQ_PASSWORD:ModbusPassword} + virtual-host: / + publisher-confirm-type: correlated + #接收确认,默认关闭,建议false或不配置,在代码中根据实际情况进行ack销毁 + publisher-returns: true + template: + mandatory: true +# kafka: +# bootstrap-servers: 172.17.0.1:9092 +# template: +# default-topic: collector-modbus-rtcp-group + +snowflake: + worker: 0 + dataCenter: 1 + +server: + port: 9999 + netty: + identifiers: + 4B454E454E4731343030303030333538: KENENG1400000358 + 3030303030: 00000 + + diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml new file mode 100644 index 0000000..b948c95 --- /dev/null +++ b/src/main/resources/logback-spring.xml @@ -0,0 +1,63 @@ + + + + + + + + + + + + + + + + + ${LOG_PATTERN} + UTF-8 + + + + + ${LOG_HOME}/info/info.log + true + + ${LOG_HOME}/info/info.%d{yyyy-MM-dd}.%i.log + 30 + 2GB + 50MB + + + ${LOG_PATTERN} + UTF-8 + + + + + ${LOG_HOME}/error/error.log + true + true + + ERROR + + + ${LOG_HOME}/error/error.%d{yyyy-MM-dd}.%i.log + 30 + 2GB + 50MB + + + ${LOG_PATTERN} + UTF-8 + + + + + + + + + + diff --git a/src/test/java/com/iot/modbus_rtcp/ModbusRtcpApplicationTests.java b/src/test/java/com/iot/modbus_rtcp/ModbusRtcpApplicationTests.java new file mode 100644 index 0000000..b8932e7 --- /dev/null +++ b/src/test/java/com/iot/modbus_rtcp/ModbusRtcpApplicationTests.java @@ -0,0 +1,13 @@ +package com.iot.modbus_rtcp; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class ModbusRtcpApplicationTests { + + @Test + void contextLoads() { + } + +} diff --git a/src/test/java/com/iot/modbus_rtcp/NonBlockingSocketTest.java b/src/test/java/com/iot/modbus_rtcp/NonBlockingSocketTest.java new file mode 100644 index 0000000..6e7f5df --- /dev/null +++ b/src/test/java/com/iot/modbus_rtcp/NonBlockingSocketTest.java @@ -0,0 +1,113 @@ +package com.iot.modbus_rtcp; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.time.LocalDateTime; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +/** + * @author 王仕龙 + * 2024/11/20 16:01 + */ +public class NonBlockingSocketTest { + + private static final String HOST = "127.0.0.1"; + private static final Integer PORT = 1200; + + public static void main(String[] args) throws IOException { + SocketChannel socketChannel = SocketChannel.open(); + // 设置为非阻塞模式 + socketChannel.configureBlocking(false); + socketChannel.connect(new InetSocketAddress(HOST, PORT)); + + while (!socketChannel.finishConnect()) { + // 等待连接建立 + System.out.println("正在建立连接..."); + } + + long lastSentHeartBeatTime = System.currentTimeMillis(); + // 连接已建立,发送和接收数据 +// ByteBuffer heartBeatBuffer = ByteBuffer.wrap("Hello, Server!".getBytes()); + ByteBuffer heartBeatBuffer = ByteBuffer.wrap("KENENG1400000358".getBytes()); +// ByteBuffer heartBeatBuffer = ByteBuffer.wrap("01 04 2A 07 E8 00 0B 00 13 00 17 00 11 00 23 00 03 00 01 00 00 00 00 00 02 00 3B 03 E7 00 39 00 00 0D AC 0D AC 00 00 00 00 00 00 0A 14 4A 98".replaceAll(" ", "").getBytes()); + socketChannel.write(heartBeatBuffer); + + long nowTime = 0L; + ByteBuffer readBuffer = ByteBuffer.allocate(1024); + while (socketChannel.isConnected()) { + readBuffer.clear(); + socketChannel.read(readBuffer); + readBuffer.flip(); + // 是否有可用数据 + if (!readBuffer.hasRemaining()) { + nowTime = System.currentTimeMillis(); + if (nowTime - lastSentHeartBeatTime > 5000) { + lastSentHeartBeatTime = nowTime; + socketChannel.write(heartBeatBuffer); + } + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500)); + continue; + } + String line = byteBufferToHexString(readBuffer).trim(); + lastSentHeartBeatTime = System.currentTimeMillis(); + + System.out.println(LocalDateTime.now() + "<==:收到服务器端请求:" + line); + + LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); + + // 接收到请求 + switch (line.toUpperCase()) { + case "01040000001531C5" -> + // 发送响应字符串 + socketChannel.write(ByteBuffer.wrap("01 04 2A 07 E8 00 0B 00 13 00 17 00 11 00 23 00 03 00 01 00 00 00 00 00 02 00 3B 03 E7 00 39 00 00 0D AC 0D AC 00 00 00 00 00 00 0A 14 4A 98".replaceAll(" ", "").getBytes())); + // 接收到开井请求 + case "01050001FF00DDFA" -> + // 发送响应字符串 + socketChannel.write(ByteBuffer.wrap("01050001FF00DDFA".getBytes())); + + // 接收到关井请求 + case "0105000100009C0A" -> + // 发送响应字符串 + socketChannel.write(ByteBuffer.wrap("0105000100009C0A".getBytes())); + + // 接收到读取运行模式请求 + case "010300640001C5D5" -> + // 发送响应字符串 + socketChannel.write(ByteBuffer.wrap("01 03 02 00 03 F8 45".replaceAll(" ", "").getBytes())); + // 接收到退出请求 + case "exit" -> socketChannel.close(); + } + System.out.println(LocalDateTime.now() + "==>:已响应服务器端请求:" + line); + } + + } + + public static String bytesToHexString(byte[] src) { + String strHex = ""; + StringBuilder sb = new StringBuilder(50); + for (int n = 0; n < src.length; n++) { + strHex = Integer.toHexString(src[n] & 0xFF); + // 每个字节由两个字符表示,位数不够,高位补0 + sb.append((strHex.length() == 1) ? "0" + strHex : strHex); + } + return sb.toString().trim().toUpperCase(); + } + + public static String byteBufferToHexString(ByteBuffer buffer) { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + StringBuilder hexString = new StringBuilder(); + for (byte b : bytes) { + String hex = Integer.toHexString(0xff & b); + if (hex.length() == 1) { + hexString.append('0'); + } + hexString.append(hex); + } + return hexString.toString(); + } + +} diff --git a/src/test/java/com/iot/modbus_rtcp/SocketTest.java b/src/test/java/com/iot/modbus_rtcp/SocketTest.java new file mode 100644 index 0000000..5783d38 --- /dev/null +++ b/src/test/java/com/iot/modbus_rtcp/SocketTest.java @@ -0,0 +1,76 @@ +package com.iot.modbus_rtcp; + +import lombok.SneakyThrows; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.net.InetAddress; +import java.net.Socket; +import java.util.Objects; + +/** + * @author 王仕龙 + * 2024/11/20 15:13 + */ + +public class SocketTest { + + private static final String HOST = "127.0.0.1"; + private static final Integer PORT = 1200; + + @Test + @SneakyThrows + public void testModbus() { + try (Socket socket = new Socket(HOST, PORT, InetAddress.getByName(HOST), 10001); + PrintWriter writer = new PrintWriter(socket.getOutputStream(), true); + BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) { + socket.setKeepAlive(true); +// socket.setTcpNoDelay(true); + socket.setSoTimeout(1500); + String line = null; + while (true) { + if (socket.isClosed()) { + socket.close(); + break; + } + + try { + // 接收服务器响应 + line = reader.readLine(); + if (responseCommand(line, writer, socket)) break; + } catch (Exception e) { + } + } + } + } + + private static boolean responseCommand(String line, PrintWriter writer, Socket socket) throws IOException { + // 接收到请求 + if (Objects.equals(line, "01040000001531C5")) { + // 发送响应字符串 + writer.println("01 04 2A 07 E8 00 0B 00 13 00 17 00 11 00 23 00 03 00 01 00 00 00 00 00 02 00 3B 03 E7 00 39 00 00 0D AC 0D AC 00 00 00 00 00 00 0A 14 4A 98".replaceAll(" ", "")); + } + // 接收到开井请求 + else if (Objects.equals(line, "01050001FF00DDFA")) { + // 发送响应字符串 + writer.println("01050001FF00DDFA"); + } + // 接收到关井请求 + else if (Objects.equals(line, "0105000100009C0A")) { + // 发送响应字符串 + writer.println("0105000100009C0A"); + } + // 接收到读取运行模式请求 + else if (Objects.equals(line, "010300640001C5D5")) { + // 发送响应字符串 + writer.println("01 03 02 00 03 F8 45".replaceAll(" ", "")); + } else { + socket.close(); + return true; + } + return false; + } +}