Commit ed96871a authored by duanruiming's avatar duanruiming

[update] 优化海信netty框架

parent b7219371
package net.wanji.utc.hisense; package net.wanji.utc.hisense;
import net.wanji.utc.hisense.netty.TcpClient;
import org.mybatis.spring.annotation.MapperScan; import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
...@@ -33,5 +34,6 @@ public class HisenseApplication implements CommandLineRunner { ...@@ -33,5 +34,6 @@ public class HisenseApplication implements CommandLineRunner {
public void run(String... args) throws Exception { public void run(String... args) throws Exception {
//UdpClient.connection(localPort, remoteProt); //UdpClient.connection(localPort, remoteProt);
//TcpClient.connection(remoteIp, remoteProt); //TcpClient.connection(remoteIp, remoteProt);
TcpClient.connection("127.0.0.1", remoteProt);
} }
} }
package net.wanji.utc.hisense.netty; package net.wanji.utc.hisense.netty;
import cn.hutool.core.util.HexUtil;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.wanji.utc.hisense.cache.CrossInfoCache;
import net.wanji.utc.hisense.cache.netty.NettyMessageCache; import net.wanji.utc.hisense.cache.netty.NettyMessageCache;
import net.wanji.utc.hisense.common.enums.xml.MessageTypeEnum; import net.wanji.utc.hisense.common.enums.xml.MessageTypeEnum;
import net.wanji.utc.hisense.netty.codec.MessageDecoder; import net.wanji.utc.hisense.netty.codec.MessageDecoder;
import net.wanji.utc.hisense.netty.codec.MessageEnCoder; import net.wanji.utc.hisense.netty.codec.MessageEnCoder;
import net.wanji.utc.hisense.netty.handler.TcpClientHandler; import net.wanji.utc.hisense.netty.handler.TcpClientHandler;
import net.wanji.utc.hisense.pojo.netty.MessageResultPojo;
import net.wanji.utc.hisense.pojo.xml.pojo.messagecontent.HeartBeatRequest; import net.wanji.utc.hisense.pojo.xml.pojo.messagecontent.HeartBeatRequest;
import net.wanji.utc.hisense.util.XMLUtils; import net.wanji.utc.hisense.util.XMLUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.util.concurrent.CountDownLatch; import java.util.Objects;
import java.util.concurrent.TimeUnit;
/** /**
* @author duanruiming * @author duanruiming
...@@ -52,8 +44,6 @@ public class TcpClient { ...@@ -52,8 +44,6 @@ public class TcpClient {
.handler(new ChannelInitializer<Channel>() { // 设置通道初始化对象 .handler(new ChannelInitializer<Channel>() { // 设置通道初始化对象
@Override @Override
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
//ch.pipeline().addLast(new XmlMessageDecoder()); // 添加自定义的解码器
//ch.pipeline().addLast(new XmlMessageEncoder()); // 添加自定义的编码器
ch.pipeline().addLast(new MessageDecoder()); ch.pipeline().addLast(new MessageDecoder());
ch.pipeline().addLast(new MessageEnCoder()); ch.pipeline().addLast(new MessageEnCoder());
ch.pipeline().addLast(new TcpClientHandler()); // 添加自定义的处理器 ch.pipeline().addLast(new TcpClientHandler()); // 添加自定义的处理器
...@@ -61,47 +51,53 @@ public class TcpClient { ...@@ -61,47 +51,53 @@ public class TcpClient {
}); });
// 启动客户端,连接到服务器端,并同步,生成ChannelFuture对象 // 启动客户端,连接到服务器端,并同步,生成ChannelFuture对象
ChannelFuture f = b.connect(host, port).sync(); ChannelFuture f = b.connect(host, port).sync();
//channelFutureMap.put(f.channel().id(), f);
tcpChannelFuture = f; tcpChannelFuture = f;
// 对关闭通道进行监听 // 对关闭通道进行监听
f.channel().closeFuture().sync(); //f.channel().closeFuture().sync();
} finally { Runtime.getRuntime().addShutdownHook(new Thread(() -> {
group.shutdownGracefully(); // 关闭线程组 group.shutdownGracefully();
f.channel().closeFuture().syncUninterruptibly();
log.warn("tcp客户端关闭!");
}));
} catch (Exception e) {
log.error("tcp客户端连接服务端失败", e);
} }
} }
public static void sendMessage(String ip, Integer port, String msg) { //public static void sendMessage(String ip, Integer port, String msg) {
byte[] bytes = HexUtil.decodeHex(msg); // byte[] bytes = HexUtil.decodeHex(msg);
try { // try {
if (tcpChannelFuture == null) { // if (channelFutureMap.isEmpty()) {
return; // return;
} // }
tcpChannelFuture.channel() // tcpChannelFuture.channel()
.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(bytes), new InetSocketAddress(ip, port))).sync(); // .writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(bytes), new InetSocketAddress(ip, port))).sync();
log.debug("发送信号机:{}/{}命令消息:{}", ip, port, msg); // log.debug("发送信号机:{}/{}命令消息:{}", ip, port, msg);
} catch (InterruptedException e) { // } catch (InterruptedException e) {
log.error("sendMsg is error", e); // log.error("sendMsg is error", e);
} // }
} //}
public static MessageResultPojo sendMessage(String ip, Integer port, String msg, String command, int timeout) { //public static MessageResultPojo sendMessage(String ip, Integer port, String msg, String command, int timeout) {
try { // try {
if (timeout > 0) { // if (timeout > 0) {
CountDownLatch countDownLatch = new CountDownLatch(1); // CountDownLatch countDownLatch = new CountDownLatch(1);
String key = StringUtils.join("/", ip, ":", port, "/", command); // String key = StringUtils.join("/", ip, ":", port, "/", command);
long now = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(); // long now = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
MessageResultPojo messageResultPojo = new MessageResultPojo(command, countDownLatch, now, timeout, null); // MessageResultPojo messageResultPojo = new MessageResultPojo(command, countDownLatch, now, timeout, null);
NettyMessageCache.NETTY_RESULT_TIMEOUT_MAP.put(key, messageResultPojo); // NettyMessageCache.NETTY_RESULT_TIMEOUT_MAP.put(key, messageResultPojo);
NettyMessageCache.COMMAND_RESULT_SIGN_MAP.put(key, CrossInfoCache.getCrossInfoByIp(ip)); // NettyMessageCache.COMMAND_RESULT_SIGN_MAP.put(key, CrossInfoCache.getCrossInfoByIp(ip));
sendMessage(ip, port, msg); // sendMessage(ip, port, msg);
countDownLatch.await(timeout, TimeUnit.MILLISECONDS); // countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
return NettyMessageCache.NETTY_RESULT_TIMEOUT_MAP.remove(key); // return NettyMessageCache.NETTY_RESULT_TIMEOUT_MAP.remove(key);
} // }
} catch (InterruptedException e) { // } catch (InterruptedException e) {
log.error("sendMsg is error", e); // log.error("sendMsg is error", e);
} // }
return null; // return null;
} //}
@Scheduled(fixedRate = 300) @Scheduled(fixedRate = 300)
public void checkWaitTimeout() { public void checkWaitTimeout() {
...@@ -116,13 +112,15 @@ public class TcpClient { ...@@ -116,13 +112,15 @@ public class TcpClient {
}); });
} }
@Scheduled(fixedRate = 1000 * 20) @Scheduled(initialDelay = 1000 * 30, fixedRate = 1000 * 20)
public void hearBeat() throws Exception { public void hearBeat() throws Exception {
try { try {
HeartBeatRequest heartBeatRequest = new HeartBeatRequest(); HeartBeatRequest heartBeatRequest = new HeartBeatRequest();
heartBeatRequest.setMessageType(MessageTypeEnum.HEARTBEAT_STATUS.getType()); heartBeatRequest.setMessageType(MessageTypeEnum.HEARTBEAT_STATUS.getType());
log.error("心跳消息:{}", XMLUtils.convertToXml(heartBeatRequest)); log.error("心跳消息:{}", XMLUtils.convertToXml(heartBeatRequest));
if (Objects.nonNull(tcpChannelFuture)) {
tcpChannelFuture.channel().writeAndFlush(XMLUtils.strToHex(XMLUtils.convertToXml(heartBeatRequest))); tcpChannelFuture.channel().writeAndFlush(XMLUtils.strToHex(XMLUtils.convertToXml(heartBeatRequest)));
}
} catch (Exception e) { } catch (Exception e) {
log.error("每20秒心跳发送失败:", e); log.error("每20秒心跳发送失败:", e);
throw new Exception(e); throw new Exception(e);
......
...@@ -21,7 +21,6 @@ import org.springframework.stereotype.Component; ...@@ -21,7 +21,6 @@ import org.springframework.stereotype.Component;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
...@@ -39,6 +38,12 @@ public class TcpClientHandler extends SimpleChannelInboundHandler<DatagramPacket ...@@ -39,6 +38,12 @@ public class TcpClientHandler extends SimpleChannelInboundHandler<DatagramPacket
ctx.fireChannelActive(); ctx.fireChannelActive();
} }
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.error("收到消息channelRead:{}", msg);
super.channelRead(ctx, msg);
}
/** /**
* 读取消息 * 读取消息
* *
...@@ -48,6 +53,9 @@ public class TcpClientHandler extends SimpleChannelInboundHandler<DatagramPacket ...@@ -48,6 +53,9 @@ public class TcpClientHandler extends SimpleChannelInboundHandler<DatagramPacket
*/ */
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
log.error("收到消息channelRead0:{}", packet);
//String result1 = result;
//DatagramPacket packet = new DatagramPacket(null, null);
ByteBuf content = packet.content(); ByteBuf content = packet.content();
InetSocketAddress remote = packet.sender(); InetSocketAddress remote = packet.sender();
int i = content.readableBytes(); int i = content.readableBytes();
...@@ -102,9 +110,4 @@ public class TcpClientHandler extends SimpleChannelInboundHandler<DatagramPacket ...@@ -102,9 +110,4 @@ public class TcpClientHandler extends SimpleChannelInboundHandler<DatagramPacket
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
} }
public static void main(String[] args) {
Date date = new Date(1705024396764L);
System.err.println(date);
}
} }
\ No newline at end of file
...@@ -29,7 +29,7 @@ public class XMLUtils { ...@@ -29,7 +29,7 @@ public class XMLUtils {
// 创建输出流 // 创建输出流
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
try { try {
sw.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n\r"); sw.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\r\n");
// 利用jdk中自带的转换类实现 // 利用jdk中自带的转换类实现
JAXBContext context = JAXBContext.newInstance(obj.getClass()); JAXBContext context = JAXBContext.newInstance(obj.getClass());
Marshaller marshaller = context.createMarshaller(); Marshaller marshaller = context.createMarshaller();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment