Commit 35659b04 authored by duanruiming's avatar duanruiming

[update] 代码优化

parent 76ccb0b0
package net.wanji.com; package net.wanji.com;
import net.wanji.com.netty.NettyServer; import net.wanji.com.netty.NettyClient;
import org.mybatis.spring.annotation.MapperScan; import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
...@@ -33,6 +33,6 @@ public class Application implements ApplicationRunner { ...@@ -33,6 +33,6 @@ public class Application implements ApplicationRunner {
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
NettyServer.start(5050, threadPoolExecutor); NettyClient.start(8081, threadPoolExecutor);
} }
} }
package net.wanji.com.netty; package net.wanji.com.netty;
import cn.hutool.core.util.HexUtil;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*; import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -17,19 +20,21 @@ import org.springframework.scheduling.annotation.Scheduled; ...@@ -17,19 +20,21 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
@Component @Component
public class NettyServer { public class NettyClient {
private static final Map<String, ChannelId> IP_PORT_CHANNEL_ID_MAP = new HashMap<>(); private static final Map<String, ChannelId> IP_PORT_CHANNEL_ID_MAP = new HashMap<>();
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private static ChannelFuture udpChannelFuture = null;
/** /**
* 建立连接 * 建立连接
...@@ -45,9 +50,6 @@ public class NettyServer { ...@@ -45,9 +50,6 @@ public class NettyServer {
protected void initChannel(NioDatagramChannel datagramChannel) { protected void initChannel(NioDatagramChannel datagramChannel) {
datagramChannel.pipeline() datagramChannel.pipeline()
.addLast(new NettyServerHandler(threadPoolExecutor)); .addLast(new NettyServerHandler(threadPoolExecutor));
String key = StringUtils.join(datagramChannel.remoteAddress().getHostString(), ":".intern(), datagramChannel.remoteAddress().getPort());
log.info("client {} is connect success", key);
IP_PORT_CHANNEL_ID_MAP.put(key, datagramChannel.id());
} }
}); });
ChannelFuture channelFuture = bootstrap.bind(port).sync(); ChannelFuture channelFuture = bootstrap.bind(port).sync();
...@@ -57,38 +59,37 @@ public class NettyServer { ...@@ -57,38 +59,37 @@ public class NettyServer {
log.warn("server is closed"); log.warn("server is closed");
})); }));
log.info("udp application is running. binding port is {}", port); log.info("udp application is running. binding port is {}", port);
NettyClient.udpChannelFuture = channelFuture;
} }
public static void sendMessage(String ip, int port, String message) { public static void sendMsg(String ip, Integer port, String msg) {
String key = StringUtils.join(ip, ":", port); byte[] bytes = HexUtil.decodeHex(msg);
if (!IP_PORT_CHANNEL_ID_MAP.isEmpty()) { try {
ChannelId channelId = IP_PORT_CHANNEL_ID_MAP.get(key); if (udpChannelFuture == null) {
Channel channel = channelGroup.find(channelId); return;
channel.writeAndFlush(message); }
udpChannelFuture.channel()
.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(bytes), new InetSocketAddress(ip, port))).sync();
log.info("send msg {} to {}/{}", msg, ip, port);
} catch (InterruptedException e) {
log.error("sendMsg is error", e);
} }
} }
public static Object sendMessage(String ip, int port, String message, String command) { public static MessageResultPojo sendMsg(String ip, Integer port, String msg, String command, int timeout) {
sendMessage(ip, port, message, command, 500);
return null;
}
public static String sendMessage(String ip, int port, String message, String command, int waitMillisecond) {
CountDownLatch countDownLatch = new CountDownLatch(1);
String key = StringUtils.joinWith("#", ip, port, command);
long now = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
MessageResultPojo messageResultPojo = new MessageResultPojo(command, countDownLatch, now, waitMillisecond, null);
NettyMessageCache.NETTY_MESSAGE_RESULT_MAP.put(key, messageResultPojo);
try { try {
sendMessage(ip, port, message); if (timeout > 0) {
countDownLatch.await(); CountDownLatch countDownLatch = new CountDownLatch(1);
String key = StringUtils.joinWith("#", ip, port, command);
long now = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
MessageResultPojo messageResultPojo = new MessageResultPojo(command, countDownLatch, now, timeout, null);
NettyMessageCache.NETTY_MESSAGE_RESULT_MAP.put(key, messageResultPojo);
sendMsg(ip, port, msg);
countDownLatch.await(300, TimeUnit.MILLISECONDS);
return NettyMessageCache.NETTY_MESSAGE_RESULT_MAP.remove(key);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("error", e); log.error("sendMsg is error", e);
}
MessageResultPojo resultPojo = NettyMessageCache.NETTY_MESSAGE_RESULT_MAP.remove(key);
log.info("收到返回结果: {}", resultPojo);
if (Objects.nonNull(resultPojo)) {
return resultPojo.getHexMessageResult();
} }
return null; return null;
} }
......
...@@ -4,11 +4,11 @@ import cn.hutool.core.util.HexUtil; ...@@ -4,11 +4,11 @@ import cn.hutool.core.util.HexUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.wanji.com.common.constants.Constants; import net.wanji.com.common.constants.Constants;
import net.wanji.com.common.enums.CommandResultSign; import net.wanji.com.common.enums.CommandResultSign;
import net.wanji.com.netty.NettyServer; import net.wanji.com.netty.NettyClient;
import net.wanji.com.netty.pojo.CommandPojo; import net.wanji.com.netty.pojo.CommandPojo;
import net.wanji.com.netty.response.CommandResponseService; import net.wanji.com.netty.response.CommandResponseService;
import net.wanji.com.pojo.netty.MessageResultPojo;
import net.wanji.com.util.CRC16Utils; import net.wanji.com.util.CRC16Utils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
...@@ -23,18 +23,18 @@ public class HeartBeatService implements CommandResponseService { ...@@ -23,18 +23,18 @@ public class HeartBeatService implements CommandResponseService {
String body = String.format("%02x", Constants.COMMAND_HEARTBEAT.length() / 2).concat(Constants.COMMAND_HEARTBEAT); String body = String.format("%02x", Constants.COMMAND_HEARTBEAT.length() / 2).concat(Constants.COMMAND_HEARTBEAT);
String crc16HexStr = CRC16Utils.getCRC16HexStr(body); String crc16HexStr = CRC16Utils.getCRC16HexStr(body);
String heartBeatRequest = Constants.buildMessageBody(body, crc16HexStr); String heartBeatRequest = Constants.buildMessageBody(body, crc16HexStr);
NettyServer.sendMessage(commandPojo.getSignalIp(), commandPojo.getPort(), heartBeatRequest, CommandResultSign.HEARTBEAT.getHexSign(), 300); NettyClient.sendMsg(commandPojo.getSignalIp(), commandPojo.getPort(), heartBeatRequest, CommandResultSign.HEARTBEAT.getHexSign(), 300);
return heartBeatRequest; return heartBeatRequest;
} }
@Scheduled(fixedRate = 1000) // @Scheduled(fixedRate = 1000)
public void heartbeat() { public void heartbeat() {
String body = String.format("%02x", Constants.COMMAND_HEARTBEAT.length() / 2).concat(Constants.COMMAND_HEARTBEAT); String body = String.format("%02x", Constants.COMMAND_HEARTBEAT.length() / 2).concat(Constants.COMMAND_HEARTBEAT);
String crc16HexStr = CRC16Utils.getCRC16HexStr(body); String crc16HexStr = CRC16Utils.getCRC16HexStr(body);
String heartBeatRequest = Constants.buildMessageBody(body, crc16HexStr); String heartBeatRequest = Constants.buildMessageBody(body, crc16HexStr);
String str = "55ac00000000000017050f11060b0000000000000000000000000000000000"; String str = "55ac00000000000017050f11060b0000000000000000000000000000000000";
String s = NettyServer.sendMessage("10.102.1.204", 5050, str, "80", 300); MessageResultPojo resultPojo = NettyClient.sendMsg("10.102.1.204", 5050, str, "80", 300);
log.error("信号返回字符串:{}", s); log.error("信号返回字符串:{}", resultPojo);
} }
public static void main(String[] args) { public static void main(String[] args) {
......
package net.wanji.com.service.protocol.impl.base; package net.wanji.com.service.protocol.impl.base;
import net.wanji.com.netty.NettyServer; import net.wanji.com.netty.NettyClient;
import net.wanji.com.pojo.netty.MessageResultPojo;
import net.wanji.com.service.protocol.ProtocolConversion; import net.wanji.com.service.protocol.ProtocolConversion;
import net.wanji.databus.po.CrossInfoPO; import net.wanji.databus.po.CrossInfoPO;
...@@ -11,7 +12,7 @@ import net.wanji.databus.po.CrossInfoPO; ...@@ -11,7 +12,7 @@ import net.wanji.databus.po.CrossInfoPO;
public class PhaseProtocolConversionService implements ProtocolConversion<Object, Object> { public class PhaseProtocolConversionService implements ProtocolConversion<Object, Object> {
@Override @Override
public Object protocolConvertEntity(CrossInfoPO crossInfo) { public Object protocolConvertEntity(CrossInfoPO crossInfo) {
String hexResult = NettyServer.sendMessage("ip", 80, "aadd", "1122", 300); MessageResultPojo resultPojo = NettyClient.sendMsg("ip", 80, "aadd", "1122", 300);
return null; return null;
} }
......
...@@ -2,7 +2,7 @@ server: ...@@ -2,7 +2,7 @@ server:
port: 39002 port: 39002
servlet: servlet:
display-name: Wanji display-name: Wanji
context-path: /com context-path: /utc-dt
multipart: multipart:
max-file-size: 50MB max-file-size: 50MB
max-request-size: 50MB max-request-size: 50MB
......
spring: spring:
# dubbo启动需要程序名称 # dubbo启动需要程序名称
application: application:
name: com name: utc-dt
profiles: profiles:
active: dev active: dev
jackson: jackson:
...@@ -14,7 +14,7 @@ spring: ...@@ -14,7 +14,7 @@ spring:
pathmatch: pathmatch:
matching-strategy: ant_path_matcher matching-strategy: ant_path_matcher
service: service:
name: signal-communication-service name: signal-utc-dt-service
mybatis: mybatis:
type-aliases-package: net.wanji.*.model type-aliases-package: net.wanji.*.model
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment