Commit 922121a5 authored by duanruiming's avatar duanruiming

[update] 代码优化

parent 676a9330
......@@ -2,10 +2,14 @@ package net.wanji.com;
import net.wanji.com.netty.NettyServer;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.CommandLineRunner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
......@@ -17,14 +21,18 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
@EnableTransactionManagement
@EnableScheduling
@SuppressWarnings("all")
public class Application implements CommandLineRunner {
public class Application implements ApplicationRunner {
@Qualifier(value = "threadPoolExecutor")
@Autowired
static ThreadPoolTaskExecutor threadPoolExecutor;
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Override
public void run(String... args) throws Exception {
NettyServer.start(3000);
public void run(ApplicationArguments args) throws Exception {
NettyServer.start(5050, threadPoolExecutor);
}
}
......@@ -12,7 +12,9 @@ public class Constants {
public static final String COMMAND_HEAD = "7e";
public static final String COMMAND_TAIL = "7d";
public static final String COMMAND_HEARTBEAT = "01000000000000000170";
public static final String COMMAND_HEARTBEAT = "01000100000001000170";
public static final String COMMAND_SET_IP = "";
/**
* 消息体添加头、尾
......
package net.wanji.com.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import net.wanji.com.cache.netty.NettyMessageCache;
......@@ -19,8 +17,6 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.ByteOrder;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
......@@ -34,48 +30,30 @@ public class NettyServer {
private static final Map<String, ChannelId> IP_PORT_CHANNEL_ID_MAP = new HashMap<>();
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Resource(name = "threadPoolExecutor")
ThreadPoolTaskExecutor threadPoolExecutor;
/**
* 建立连接
*/
public static void start(int port) throws InterruptedException {
final EventLoopGroup parent = new NioEventLoopGroup();
final EventLoopGroup child = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(parent, child)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
public static void start(int port, ThreadPoolTaskExecutor threadPoolExecutor) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535))
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioSocketChannel socketChannel) {
socketChannel.pipeline()
.addFirst()
// .addLast(new MessageDecoder())
// .addLast(new MessageEnCoder())
// 格林威解码器
.addLast(new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, Integer.MAX_VALUE, 4, 2, 0, 0, true))
.addLast(new NettyServerHandler());
channelGroup.add(socketChannel);
String key = StringUtils.join(socketChannel.remoteAddress().getHostString(), ":".intern(), socketChannel.remoteAddress().getPort());
log.info("client {} is connect success", key);
IP_PORT_CHANNEL_ID_MAP.put(key, socketChannel.id());
protected void initChannel(NioDatagramChannel datagramChannel) {
datagramChannel.pipeline()
.addLast(new NettyServerHandler(threadPoolExecutor));
}
});
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
log.info("server is started in {}", channelFuture.channel().localAddress());
//收到停止信号
ChannelFuture channelFuture = bootstrap.bind(port).sync();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
//停掉所有的线程
channelGroup.close().syncUninterruptibly();
parent.shutdownGracefully();
child.shutdownGracefully();
group.shutdownGracefully();
channelFuture.channel().closeFuture().syncUninterruptibly();
log.warn("server is closed");
}));
log.info("udp application is running. binding port is {}", port);
}
public static void sendMessage(String ip, int port, String message) {
......@@ -111,12 +89,12 @@ public class NettyServer {
public void checkWaitTimeout() {
long now = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
NettyMessageCache.NETTY_MESSAGE_RESULT_MAP.forEach((k, v) -> {
threadPoolExecutor.execute(() -> {
// threadPoolExecutor.execute(() -> {
if (now - v.getStartTime() > v.getWaitMillisecond()) {
log.warn("wait {}ms {} timeout", v.getWaitMillisecond(), k);
v.getCountDownLatch().countDown();
}
});
});
// });
}
}
\ No newline at end of file
package net.wanji.com.netty.handler;
import cn.hutool.core.util.HexUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.wanji.com.cache.CrossInfoCache;
......@@ -14,16 +17,24 @@ import net.wanji.com.pojo.netty.MessageResultPojo;
import net.wanji.common.framework.spring.ServiceBeanContext;
import net.wanji.databus.po.CrossInfoPO;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.Objects;
@Slf4j
@NoArgsConstructor
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public class NettyServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
@Resource
private CrossInfoCache crossInfoCache;
private ThreadPoolTaskExecutor threadPoolExecutor;
public NettyServerHandler(ThreadPoolTaskExecutor threadPoolExecutor) {
this.threadPoolExecutor = threadPoolExecutor;
}
/**
* 建立连接后
*
......@@ -44,9 +55,19 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
ByteBuf content = packet.content();
InetSocketAddress sender = packet.sender();
int i = content.readableBytes();
byte[] b = new byte[i];
content.readBytes(b);
String s = HexUtil.encodeHexStr(b);
// String hexString = HexUtils.bytesToHexString(b);
String key = ctx.channel().remoteAddress().toString();
String data = msg.toString();
// String data = msg.toString();
String data = null;
log.debug("receive server data :{}", data);
String hexSign = AgreementCommon.getHexSign(data);
......
package net.wanji.com.netty.response.impl;
import cn.hutool.core.util.HexUtil;
import lombok.extern.slf4j.Slf4j;
import net.wanji.com.common.constants.Constants;
import net.wanji.com.common.enums.CommandResultSign;
import net.wanji.com.netty.NettyServer;
......@@ -13,6 +15,7 @@ import org.springframework.stereotype.Service;
* @date 2023/05/12 11:00
*/
@Service
@Slf4j
public class HeartBeatService implements CommandResponseService {
@Override
public String getCommandResponse(CommandPojo commandPojo) {
......@@ -23,4 +26,32 @@ public class HeartBeatService implements CommandResponseService {
return heartBeatRequest;
}
// @Scheduled(fixedRate = 1000)
public void heartbeat() {
// String body = String.format("%02x", Constants.COMMAND_HEARTBEAT.length() / 2).concat(Constants.COMMAND_HEARTBEAT);
// String crc16HexStr = CRC16Utils.getCRC16HexStr(body);
// String heartBeatRequest = Constants.buildMessageBody(body, crc16HexStr);
String str = "55ac00000000000017050f11060b0000000000000000000000000000000000";
String s = NettyServer.sendMessage("10.102.1.204", 5050, str, "80", 300);
log.error("信号返回字符串:{}", s);
}
public static void main(String[] args) {
String body = String.format("%04x", Constants.COMMAND_HEARTBEAT.length() / 2).concat(Constants.COMMAND_HEARTBEAT);
String crc16HexStr = CRC16Utils.getCRC16HexStr(body);
String heartBeatRequest = Constants.buildMessageBody(body, crc16HexStr);
System.err.println(heartBeatRequest);
// NettyServer.sendMessage(commandPojo.getSignalIp(), commandPojo.getPort(), heartBeatRequest, CommandResultSign.HEARTBEAT.getHexSign(), 300);
String str = "55AB00A7000000000000000002000000FFFFFFFF00000000FFFFFFFF01000000FFFFFFFF1010101004101010101010100410101000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000";
byte[] bytes = str.getBytes();
System.err.println(HexUtil.encodeHexStr(bytes));
for (byte aByte : bytes) {
System.err.println(HexUtil.toHex(aByte));
}
}
}
server:
port: 32002
port: 8080
servlet:
display-name: Wanji
context-path: /com
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment