package com.iot.modbus_rtcp.netty;

import com.iot.modbus_rtcp.dto.CommandTypeComparable;
import com.iot.modbus_rtcp.dto.ModbusCommandDto;
import com.iot.modbus_rtcp.utils.ModbusFunctionCodeEnums;
import com.iot.modbus_rtcp.utils.SpringUtil;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

import java.net.InetSocketAddress;
import java.util.Objects;

/**
 * Inbound handler that pairs data read from a connection with the command
 * currently pending on the matching {@link SyncPriorityChannel}.
 *
 * <p>For every inbound message the handler completes the channel's pending
 * promise, then forwards the payload to the channel's collect queue through
 * {@link RabbitTemplate}, unless the pending command is a control command or
 * the response fails the function-code check.
 *
 * <p>The handler is annotated {@code @Sharable}; its only fields are the
 * {@link ChannelGroup} and {@link RabbitTemplate} references set in the
 * constructor.
 */
@Slf4j
@ChannelHandler.Sharable
public class SyncHandler extends ChannelInboundHandlerAdapter {

    private final ChannelGroup channelGroup;
    private final RabbitTemplate rabbitTemplate;

    /**
     * Creates the handler and looks up the {@link RabbitTemplate} bean through
     * {@link SpringUtil}.
     *
     * @param channelGroup registry used to resolve the channel for a remote ip/port
     */
    public SyncHandler(ChannelGroup channelGroup) {
        this.channelGroup = channelGroup;
        this.rabbitTemplate = SpringUtil.getBean(RabbitTemplate.class);
    }

    /**
     * Handles one inbound message: completes the pending promise of the channel
     * registered for the remote address, then publishes the data to MQ.
     *
     * <p>The message is consumed here; it is not passed further down the pipeline.
     *
     * @param ctx handler context of the connection the data arrived on
     * @param msg decoded inbound message; only its {@code toString()} form is used
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
        int port = remoteAddress.getPort();
        String ip = remoteAddress.getHostString();

        SyncPriorityChannel channel = this.channelGroup.get(ip, port);
        if (Objects.isNull(channel)) {
            // NOTE(review): assumes ChannelGroup#get returns null for an unknown
            // ip/port. Without this guard the dereference below throws a
            // NullPointerException, which ends up in exceptionCaught and closes
            // the connection.
            log.warn("No channel registered for {}:{}, dropping data: {}", ip, port, msg);
            return;
        }

        // Read the pending command BEFORE completing the promise, as the original did.
        ModbusCommandDto message = channel.getCurrentMessage();
        if (!completePendingPromise(channel, msg)) {
            return;
        }

        if (Objects.isNull(message)) {
            log.error("未找到发送源: {}", msg);
            return;
        }

        // Responses to control commands are discarded.
        if (CommandTypeComparable.CommandType.CONTROL.equals(message.getType())) {
            return;
        }

        // Characters [2, 4) of the textual frame are checked against the known codes.
        // Presumably this is the hex-encoded Modbus function code - confirm against the decoder.
        String functionCode = StringUtils.substring(msg.toString(), 2, 4);
        if (!ModbusFunctionCodeEnums.isHandlerHexCode(functionCode)) {
            log.warn("Gateway {} fails to send command {} to return data: {}",
                    channel.getIdentifier(), message.getCommand(), msg);
            // TODO a device alarm could be raised here
            return;
        }

        publish(channel, message, msg);
    }

    /**
     * Marks the channel's pending promise as successful.
     *
     * @return {@code true} when the promise was completed, {@code false} when
     *         completing it failed and the data must be dropped
     */
    private boolean completePendingPromise(SyncPriorityChannel channel, Object msg) {
        try {
            channel.getChannelPromise().setSuccess();
            return true;
        } catch (IllegalStateException e) {
            // The promise was already completed; the log text attributes this to a timeout.
            log.warn("数据返回慢了,已经超时!");
            return false;
        } catch (Exception e) {
            // Any other failure while completing the promise: the data is dropped.
            log.warn("丢失数据:{}", msg);
            return false;
        }
    }

    /**
     * Sends {@code key/timestampMillis/msg} to the channel's collect queue.
     * A failed send is logged and not rethrown.
     */
    private void publish(SyncPriorityChannel channel, ModbusCommandDto message, Object msg) {
        String json = message.getKey() + "/" + System.currentTimeMillis() + "/" + msg;
        log.debug("推数据到MQ({}): {}", channel.getCollectQueue(), json);
        try {
            this.rabbitTemplate.convertAndSend(channel.getCollectQueue(), json);
        } catch (Exception e) {
            log.error("推数据到MQ失败({})", channel.getCollectQueue(), e);
        }
    }

    /**
     * Logs the user event and passes it on to the superclass implementation.
     *
     * @param ctx handler context
     * @param evt the triggered event
     * @throws Exception if the superclass implementation throws
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("用户事件: {}-> {}", evt.getClass().getName(), evt);
        super.userEventTriggered(ctx, evt);
    }

    /**
     * Logs the failure, removes the connection's entries from the channel group
     * (by ip key and by gateway identifier) and closes the connection.
     *
     * <p>The close is performed in {@code finally}, so the connection is closed
     * even when the bookkeeping itself fails.
     *
     * @param ctx   handler context of the failing connection
     * @param cause the exception that was raised
     * @throws Exception if removing the registry entries throws
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.warn("异常: ", cause);
        try {
            InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
            // remoteAddress() is documented to return null for a channel that is not connected.
            if (Objects.nonNull(remoteAddress)) {
                String ipKey = ChannelGroup.getKey(remoteAddress.getHostString(), remoteAddress.getPort());
                this.channelGroup.remove(ipKey);

                String gatewayIdentifier = IPGatewayRelation.getGatewayIdentifier(ipKey);
                // No identifier mapped to this ip key: nothing further to remove.
                if (Objects.nonNull(gatewayIdentifier)) {
                    this.channelGroup.remove(gatewayIdentifier);
                }
            }
        } finally {
            ctx.close();
        }
    }
}