Commit 10ec2bf0 authored by duanruiming's avatar duanruiming

[update] 优化东土协议

parent 909bef1d
package net.wanji.com.netty; package net.wanji.com.netty;
import cn.hutool.core.util.HexUtil; import io.netty.bootstrap.ServerBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*; import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.wanji.com.cache.netty.NettyMessageCache; import net.wanji.com.cache.netty.NettyMessageCache;
import net.wanji.com.netty.handler.NettyServerHandler; import net.wanji.com.netty.handler.NettyServerHandler;
...@@ -17,7 +19,7 @@ import org.springframework.scheduling.annotation.Scheduled; ...@@ -17,7 +19,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.net.InetSocketAddress; import java.nio.ByteOrder;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.util.HashMap; import java.util.HashMap;
...@@ -30,52 +32,55 @@ import java.util.concurrent.TimeUnit; ...@@ -30,52 +32,55 @@ import java.util.concurrent.TimeUnit;
public class NettyClient { public class NettyClient {
private static final Map<String, ChannelId> IP_PORT_CHANNEL_ID_MAP = new HashMap<>(); private static final Map<String, ChannelId> IP_PORT_CHANNEL_ID_MAP = new HashMap<>();
private static ChannelFuture udpChannelFuture = null; private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/** /**
* 建立连接 * 建立连接
*/ */
public static void start(int localPort, int remotePort, ThreadPoolTaskExecutor threadPoolExecutor) throws InterruptedException { public static void start(int localPort, int remotePort, ThreadPoolTaskExecutor threadPoolExecutor) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup(); final EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap(); final EventLoopGroup child = new NioEventLoopGroup();
bootstrap.group(group) ServerBootstrap bootstrap = new ServerBootstrap();
.channel(NioDatagramChannel.class) bootstrap.group(group, child)
.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535)) .channel(NioServerSocketChannel.class)
// .localAddress(new InetSocketAddress(5051)) .childOption(ChannelOption.TCP_NODELAY, true)
// 绑定端口 .childOption(ChannelOption.SO_KEEPALIVE, true)
.remoteAddress(new InetSocketAddress(remotePort)) .handler(new ChannelInitializer<NioSocketChannel>() {
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override @Override
protected void initChannel(NioDatagramChannel datagramChannel) { protected void initChannel(NioSocketChannel socketChannel) {
datagramChannel.pipeline() socketChannel.pipeline()
.addLast(new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, Integer.MAX_VALUE, 4, 2, 0, 0, true))
.addLast(new NettyServerHandler(threadPoolExecutor)); .addLast(new NettyServerHandler(threadPoolExecutor));
channelGroup.add(socketChannel);
String key = StringUtils.join(socketChannel.remoteAddress().getHostString(), ":".intern(), socketChannel.remoteAddress().getPort());
log.info("client {} is connect success", key);
IP_PORT_CHANNEL_ID_MAP.put(key, socketChannel.id());
} }
}); });
ChannelFuture channelFuture = bootstrap.bind(localPort).sync(); ChannelFuture channelFuture = bootstrap.bind(localPort).sync();
log.info("server is started in {}", channelFuture.channel().localAddress());
//收到停止信号
Runtime.getRuntime().addShutdownHook(new Thread(() -> { Runtime.getRuntime().addShutdownHook(new Thread(() -> {
//停掉所有的线程
channelGroup.close().syncUninterruptibly();
group.shutdownGracefully(); group.shutdownGracefully();
child.shutdownGracefully();
channelFuture.channel().closeFuture().syncUninterruptibly(); channelFuture.channel().closeFuture().syncUninterruptibly();
log.warn("server is closed"); log.warn("server is closed");
})); }));
log.info("udp application is running. binding port is {}", remotePort);
NettyClient.udpChannelFuture = channelFuture;
} }
public static void sendMsg(String ip, Integer port, String msg) { public static void sendMessage(String ip, int port, String message) {
byte[] bytes = HexUtil.decodeHex(msg); Channel channel = channelGroup.find(IP_PORT_CHANNEL_ID_MAP.get(StringUtils.join(ip, ":", port)));
try { channel.writeAndFlush(message);
if (udpChannelFuture == null) { }
return;
} public static Object sendMessage(String ip, int port, String message, String command) {
udpChannelFuture.channel() sendMessage(ip, port, message, command, 500);
.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(bytes), new InetSocketAddress(ip, port))).sync(); return null;
log.info("send msg {} to {}/{}", msg, ip, port);
} catch (InterruptedException e) {
log.error("sendMsg is error", e);
}
} }
public static MessageResultPojo sendMsg(String ip, Integer port, String msg, String command, int timeout) { public static MessageResultPojo sendMessage(String ip, Integer port, String msg, String command, int timeout) {
try { try {
if (timeout > 0) { if (timeout > 0) {
CountDownLatch countDownLatch = new CountDownLatch(1); CountDownLatch countDownLatch = new CountDownLatch(1);
...@@ -83,7 +88,7 @@ public class NettyClient { ...@@ -83,7 +88,7 @@ public class NettyClient {
long now = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(); long now = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
MessageResultPojo messageResultPojo = new MessageResultPojo(command, countDownLatch, now, timeout, null); MessageResultPojo messageResultPojo = new MessageResultPojo(command, countDownLatch, now, timeout, null);
NettyMessageCache.NETTY_MESSAGE_RESULT_MAP.put(key, messageResultPojo); NettyMessageCache.NETTY_MESSAGE_RESULT_MAP.put(key, messageResultPojo);
sendMsg(ip, port, msg); sendMessage(ip, port, msg);
countDownLatch.await(300, TimeUnit.MILLISECONDS); countDownLatch.await(300, TimeUnit.MILLISECONDS);
return NettyMessageCache.NETTY_MESSAGE_RESULT_MAP.remove(key); return NettyMessageCache.NETTY_MESSAGE_RESULT_MAP.remove(key);
} }
......
package net.wanji.com.netty.handler; package net.wanji.com.netty.handler;
import cn.hutool.core.util.HexUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.DatagramPacket;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.wanji.com.cache.CrossInfoCache; import net.wanji.com.cache.CrossInfoCache;
import net.wanji.com.cache.netty.NettyMessageCache; import net.wanji.com.cache.netty.NettyMessageCache;
...@@ -19,11 +16,10 @@ import org.apache.commons.lang3.StringUtils; ...@@ -19,11 +16,10 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.Objects; import java.util.Objects;
@Slf4j @Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<DatagramPacket> { public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Resource @Resource
private CrossInfoCache crossInfoCache; private CrossInfoCache crossInfoCache;
...@@ -34,30 +30,31 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<DatagramPack ...@@ -34,30 +30,31 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<DatagramPack
} }
/** /**
* 读取消息 * 建立连接后
* *
* @param ctx * @param ctx
* @param packet
* @throws Exception * @throws Exception
*/ */
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("收到服务端包数据: {}", packet); log.info("{} 连接成功....", ctx.channel().remoteAddress());
ByteBuf content = packet.content(); ctx.fireChannelActive();
InetSocketAddress sender = packet.sender(); }
log.info("收到消息地址: {}, 端口: {}", sender.getAddress(), sender.getPort());
int i = content.readableBytes();
byte[] b = new byte[i];
content.readBytes(b);
String s = HexUtil.encodeHexStr(b);
log.info("收到返回消息:{}", s);
// String hexString = HexUtils.bytesToHexString(b);
/**
* 读取消息
*
* @param ctx
* @param message
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
String key = ctx.channel().remoteAddress().toString(); String key = ctx.channel().remoteAddress().toString();
// String data = msg.toString(); String data = message.toString();
String data = null; log.info("收到数据体: {}", data);
log.debug("receive server data :{}", data); log.info("收到消息地址: {}", key);
String hexSign = AgreementCommon.getHexSign(data); String hexSign = AgreementCommon.getHexSign(data);
if (StringUtils.isEmpty(hexSign)) { if (StringUtils.isEmpty(hexSign)) {
...@@ -65,7 +62,7 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<DatagramPack ...@@ -65,7 +62,7 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<DatagramPack
return; return;
} }
String className = CommandResultSign.getClassNameByHexSign(hexSign); String className = CommandResultSign.getClassNameByHexSign(hexSign);
CommandResponseService commandResponseService = (CommandResponseService) ServiceBeanContext.getBean(className); CommandResponseService commandResponseService = (CommandResponseService) ServiceBeanContext.getBean("heartBeatService");
CommandPojo commandPojo = getCommandPojo(key, data); CommandPojo commandPojo = getCommandPojo(key, data);
String hexResult = commandResponseService.getCommandResponse(commandPojo); String hexResult = commandResponseService.getCommandResponse(commandPojo);
if (Objects.nonNull(hexResult)) { if (Objects.nonNull(hexResult)) {
......
...@@ -24,7 +24,7 @@ public class HeartBeatService implements CommandResponseService { ...@@ -24,7 +24,7 @@ public class HeartBeatService implements CommandResponseService {
String body = String.format("%02x", Constants.COMMAND_HEARTBEAT.length() / 2).concat(Constants.COMMAND_HEARTBEAT); String body = String.format("%02x", Constants.COMMAND_HEARTBEAT.length() / 2).concat(Constants.COMMAND_HEARTBEAT);
String crc16HexStr = CRC16Utils.getCRC16HexStr(body); String crc16HexStr = CRC16Utils.getCRC16HexStr(body);
String heartBeatRequest = Constants.buildMessageBody(body, crc16HexStr); String heartBeatRequest = Constants.buildMessageBody(body, crc16HexStr);
NettyClient.sendMsg(commandPojo.getSignalIp(), commandPojo.getPort(), heartBeatRequest, CommandResultSign.HEARTBEAT.getHexSign(), 300); NettyClient.sendMessage(commandPojo.getSignalIp(), commandPojo.getPort(), heartBeatRequest, CommandResultSign.HEARTBEAT.getHexSign(), 300);
return heartBeatRequest; return heartBeatRequest;
} }
...@@ -34,9 +34,13 @@ public class HeartBeatService implements CommandResponseService { ...@@ -34,9 +34,13 @@ public class HeartBeatService implements CommandResponseService {
String crc16HexStr = CRC16Utils.getCRC16HexStr(body); String crc16HexStr = CRC16Utils.getCRC16HexStr(body);
String heartBeatRequest = Constants.buildMessageBody(body, crc16HexStr); String heartBeatRequest = Constants.buildMessageBody(body, crc16HexStr);
// String str = "55ac00000000000017050f11060b0000000000000000000000000000000000"; // String str = "55ac00000000000017050f11060b0000000000000000000000000000000000";
String str = "0x7e00120001010000000101011001000501030000005CB40x7d"; String str = "2a320d0a24340d0a494e464f0d0a2431310d0a7265706c69636174696f6e0d0a";
MessageResultPojo resultPojo = NettyClient.sendMsg("10.102.1.204", 5050, str, "80", 300); MessageResultPojo resultPojo = NettyClient.sendMessage("10.102.1.204", 5050, str, "80", 300);
log.info("信号返回字符串:{}", resultPojo); log.info("信号返回字符串:{}", resultPojo);
String str1 = "2a310d0a24340d0a50494e470d0a";
MessageResultPojo resultPojo1 = NettyClient.sendMessage("10.102.1.204", 5050, str1, "80", 300);
log.info("信号返回字符串1:{}", resultPojo1);
} }
public static void main(String[] args) { public static void main(String[] args) {
......
...@@ -12,7 +12,7 @@ import net.wanji.databus.po.CrossInfoPO; ...@@ -12,7 +12,7 @@ import net.wanji.databus.po.CrossInfoPO;
public class PhaseProtocolConversionService implements ProtocolConversion<Object, Object> { public class PhaseProtocolConversionService implements ProtocolConversion<Object, Object> {
@Override @Override
public Object protocolConvertEntity(CrossInfoPO crossInfo) { public Object protocolConvertEntity(CrossInfoPO crossInfo) {
MessageResultPojo resultPojo = NettyClient.sendMsg("ip", 80, "aadd", "1122", 300); MessageResultPojo resultPojo = NettyClient.sendMessage("ip", 80, "aadd", "1122", 300);
return null; return null;
} }
......
...@@ -16,7 +16,7 @@ import org.springframework.stereotype.Service; ...@@ -16,7 +16,7 @@ import org.springframework.stereotype.Service;
public class SignalDeviceProtocolConversionService implements ProtocolConversion { public class SignalDeviceProtocolConversionService implements ProtocolConversion {
@Override @Override
public Object protocolConvertEntity(CrossInfoPO crossInfo) { public Object protocolConvertEntity(CrossInfoPO crossInfo) {
MessageResultPojo resultPojo = NettyClient.sendMsg("10.102.1.204", 5050, "0x7e00120001010000000101011001000501030000005CB40x7d", "20", 300); MessageResultPojo resultPojo = NettyClient.sendMessage("10.102.1.204", 5050, "0x7e00120001010000000101011001000501030000005CB40x7d", "20", 300);
log.info("设备基本信息: {}", resultPojo); log.info("设备基本信息: {}", resultPojo);
return null; return null;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment