Commit a55a03be authored by duanruiming's avatar duanruiming

[add] 态势检测-故障报警-信号服务,新增系统服务状态监控

parent 4f471a35
...@@ -16,4 +16,9 @@ public class Constant { ...@@ -16,4 +16,9 @@ public class Constant {
* 分隔符:减号 * 分隔符:减号
**/ **/
public static final String SEPARATOR_MINUS = "-"; public static final String SEPARATOR_MINUS = "-";
public static final String WEBSOCKET_TOPIC_EVENT_WARN = "eventWarn";
public static final String WEBSOCKET_TOPIC_SYS_SERVICE_STATUS = "sysStatus";
} }
...@@ -6,12 +6,13 @@ import lombok.extern.slf4j.Slf4j; ...@@ -6,12 +6,13 @@ import lombok.extern.slf4j.Slf4j;
import net.wanji.common.framework.rest.JsonViewObject; import net.wanji.common.framework.rest.JsonViewObject;
import net.wanji.databus.po.CrossInfoPO; import net.wanji.databus.po.CrossInfoPO;
import net.wanji.feign.service.UtcFeignClients; import net.wanji.feign.service.UtcFeignClients;
import net.wanji.web.common.constant.Constant;
import net.wanji.web.common.util.StringUtils; import net.wanji.web.common.util.StringUtils;
import net.wanji.web.entity.TDeviceStatusInfo; import net.wanji.web.entity.TDeviceStatusInfo;
import net.wanji.web.mapper.TDeviceStatusMapper; import net.wanji.web.mapper.TDeviceStatusMapper;
import net.wanji.web.service.SituationDetectionService; import net.wanji.web.service.SituationDetectionService;
import net.wanji.web.vo.situationDetection.CommonEventAlarmOutVo; import net.wanji.web.vo.situationDetection.CommonEventAlarmOutVo;
import net.wanji.web.websocket.CommonEventWarnWebSocketServer; import net.wanji.web.websocket.WebSocketServer;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -21,11 +22,7 @@ import java.io.IOException; ...@@ -21,11 +22,7 @@ import java.io.IOException;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.ArrayList; import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* @author duanruiming * @author duanruiming
...@@ -47,11 +44,11 @@ public class CommonEventWarnTask { ...@@ -47,11 +44,11 @@ public class CommonEventWarnTask {
@Scheduled(fixedRate = 1 * 60 * 1000) @Scheduled(fixedRate = 1 * 60 * 1000)
public void pushEvenWarnData() { public void pushEvenWarnData() {
try { try {
ConcurrentHashMap<String, CommonEventWarnWebSocketServer> evenWarnWebSocketMap = CommonEventWarnWebSocketServer.getEvenWarnWebSocketMap(); Set<WebSocketServer> events = WebSocketServer.getWebSocketSet(Constant.WEBSOCKET_TOPIC_EVENT_WARN);
if (!evenWarnWebSocketMap.isEmpty()) { if (!CollectionUtils.isEmpty(events)) {
for (Map.Entry<String, CommonEventWarnWebSocketServer> entry : evenWarnWebSocketMap.entrySet()) { for (WebSocketServer entry : events) {
String key = entry.getKey(); String crossId = entry.getUserId();
getCommonEnventWarnData(key); getCommonEnventWarnData(crossId);
} }
} }
} catch (Exception e) { } catch (Exception e) {
...@@ -59,11 +56,11 @@ public class CommonEventWarnTask { ...@@ -59,11 +56,11 @@ public class CommonEventWarnTask {
} }
} }
private void getCommonEnventWarnData(String key) throws IOException { private void getCommonEnventWarnData(String crossId) throws IOException {
// 信号机故障 // 信号机故障
JsonViewObject signalWarnData = getSignalWarnData(); JsonViewObject signalWarnData = getSignalWarnData();
if (Objects.nonNull(signalWarnData)) { if (Objects.nonNull(signalWarnData)) {
CommonEventWarnWebSocketServer.sendInfo(JSONObject.toJSONString(signalWarnData), key); WebSocketServer.sendInfo(JSONObject.toJSONString(signalWarnData), Constant.WEBSOCKET_TOPIC_EVENT_WARN, crossId);
} }
// todo 数据中心获取路口告警 // todo 数据中心获取路口告警
JsonViewObject crossWarnData = getCrossWarnData(); JsonViewObject crossWarnData = getCrossWarnData();
......
package net.wanji.web.task;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.dto.SystemServiceStatusDTO;
import net.wanji.common.utils.tool.SocketUtil;
import net.wanji.web.common.constant.Constant;
import net.wanji.web.websocket.WebSocketServer;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @author duanruiming
* @date 2023/03/21 10:42
*/
@Component
@Slf4j
public class SystemServiceStatusTask {
public static final List<SystemServiceStatusDTO> sysList = new ArrayList<SystemServiceStatusDTO>();
static {
sysList.add(new SystemServiceStatusDTO("MYSQL", "10.102.1.182", 3306, 30000, 0, null));
sysList.add(new SystemServiceStatusDTO("UTC", "10.102.1.182", 32000, 30000, 0, null));
sysList.add(new SystemServiceStatusDTO("海康", "10.102.1.183", 80, 30000, 0, null));
}
@Scheduled(fixedRate = 1 * 60 * 1000)
public void telnetStatus() throws Exception {
for (SystemServiceStatusDTO dto : sysList) {
String ip = dto.getIp();
int port = dto.getPort();
int timeout = dto.getTimeout();
int lastStatus = dto.getStatus();
int currentStatus = SocketUtil.telnet(ip, port, timeout) == true ? 0 : 1;
if (currentStatus != lastStatus) {
dto.setStatus(currentStatus);
dto.setStartTime(new Date());
}
}
WebSocketServer.sendInfo2All(JSON.toJSONString(sysList), Constant.WEBSOCKET_TOPIC_SYS_SERVICE_STATUS);
}
}
...@@ -2,25 +2,28 @@ package net.wanji.web.websocket; ...@@ -2,25 +2,28 @@ package net.wanji.web.websocket;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.websocket.*; import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.PathParam; import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
* @author duanruiming * @author duanruiming
* @date 2023/01/06 8:58 * @date 2023/01/06 8:58
*/ */
@ServerEndpoint("/websocket/eventWarn/{userId}") @ServerEndpoint("/websocket/{topic}/{userId}")
@Component @Component
@Slf4j @Slf4j
public class CommonEventWarnWebSocketServer { public class WebSocketServer {
/** /**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/ */
...@@ -28,12 +31,15 @@ public class CommonEventWarnWebSocketServer { ...@@ -28,12 +31,15 @@ public class CommonEventWarnWebSocketServer {
/** /**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/ */
@Getter private static ConcurrentHashMap<String, Set<WebSocketServer>> webSocketMap = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, CommonEventWarnWebSocketServer> evenWarnWebSocketMap = new ConcurrentHashMap<>();
/** /**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据 * 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/ */
private Session session; private Session session;
/**
* 主题
*/
private String topic = "";
/** /**
* 接收userId * 接收userId
*/ */
...@@ -43,25 +49,31 @@ public class CommonEventWarnWebSocketServer { ...@@ -43,25 +49,31 @@ public class CommonEventWarnWebSocketServer {
* 连接建立成功调用的方法 * 连接建立成功调用的方法
*/ */
@OnOpen @OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) { public void onOpen(Session session, @PathParam("topic") String topic, @PathParam("userId") String userId) {
this.session = session; this.session = session;
this.topic = topic;
this.userId = userId; this.userId = userId;
//TODO evenWarnWebSocketMap中的key为userId 如果多个用户使用同一个userid登录 一个用户退出代表所有公用该接口的用户都退出了,需要将key改为sessionid int currentTopicSize = 0;
if (evenWarnWebSocketMap.containsKey(userId)) { if (webSocketMap.containsKey(topic)) {
evenWarnWebSocketMap.remove(userId); Set<WebSocketServer> set = webSocketMap.get(topic);
evenWarnWebSocketMap.put(userId, this); set.add(this);
webSocketMap.put(topic, set);
currentTopicSize = set.size();
//加入set中 //加入set中
} else { } else {
evenWarnWebSocketMap.put(userId, this); Set<WebSocketServer> set = new HashSet<>();
set.add(this);
webSocketMap.put(topic, set);
//加入set中 //加入set中
currentTopicSize = set.size();
addOnlineCount(); addOnlineCount();
//在线数加1 //在线数加1
} }
log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount()); log.info("当前主题:{}, socket连接:{}, 当前主题连接数量:{}, 所有socket连接数量:{}", topic, userId, currentTopicSize, getOnlineCount());
try { try {
sendMessage("连接成功"); sendMessage("连接成功");
} catch (IOException e) { } catch (IOException e) {
log.error("用户:" + userId + ",网络异常!!!!!!"); log.error("主题:{}, socket:{}, 网络异常!", topic, userId);
} }
} }
...@@ -70,24 +82,25 @@ public class CommonEventWarnWebSocketServer { ...@@ -70,24 +82,25 @@ public class CommonEventWarnWebSocketServer {
*/ */
@OnClose @OnClose
public void onClose() { public void onClose() {
if (evenWarnWebSocketMap.containsKey(userId)) { if (webSocketMap.containsKey(topic)) {
evenWarnWebSocketMap.remove(userId); webSocketMap.remove(topic);
//从set中删除 //从set中删除
subOnlineCount(); subOnlineCount();
} }
log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount()); log.info("{}主题连接关闭, 当前剩余连接数量:{}", topic, getOnlineCount());
} }
/** /**
* 收到客户端消息后调用的方法 * 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息 * @param message 客户端发送过来的消息
*/ */
@OnMessage @OnMessage
public void onMessage(String message, Session session) { public void onMessage(@PathParam("topic") String topic, @PathParam("userId") String userId, String message, Session session) {
log.info("用户消息:" + userId + ",报文:" + message); log.info("当前主题:{}, socket连接:{}, 消息:{}", topic, userId, message);
//可以群发消息 //可以群发消息
//消息保存到数据库、redis //消息保存到数据库、redis
if (StringUtils.isNotBlank(message)) { if (StringUtils.isNotBlank(topic) && StringUtils.isNotBlank(message)) {
try { try {
//解析发送的报文 //解析发送的报文
JSONObject jsonObject = JSON.parseObject(message); JSONObject jsonObject = JSON.parseObject(message);
...@@ -95,14 +108,18 @@ public class CommonEventWarnWebSocketServer { ...@@ -95,14 +108,18 @@ public class CommonEventWarnWebSocketServer {
jsonObject.put("fromUserId", this.userId); jsonObject.put("fromUserId", this.userId);
String toUserId = jsonObject.getString("toUserId"); String toUserId = jsonObject.getString("toUserId");
//传送给对应toUserId用户的websocket //传送给对应toUserId用户的websocket
if (StringUtils.isNotBlank(toUserId) && evenWarnWebSocketMap.containsKey(toUserId)) { if (webSocketMap.containsKey(topic)) {
evenWarnWebSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString()); Set<WebSocketServer> sets = webSocketMap.get(topic);
} else { for (WebSocketServer socket : sets) {
log.error("请求的userId:" + toUserId + "不在该服务器上"); String currentUserId = socket.getUserId();
//否则不在这个服务器上,发送到mysql或者redis if (StringUtils.equals(currentUserId, userId)) {
socket.sendMessage(message);
break;
}
}
} }
} catch (Exception e) { } catch (Exception e) {
log.error(e.getMessage()); log.error("主题:{}, socket:{}, 发送消息失败, 异常信息:{}", topic, userId, e);
} }
} }
} }
...@@ -113,8 +130,7 @@ public class CommonEventWarnWebSocketServer { ...@@ -113,8 +130,7 @@ public class CommonEventWarnWebSocketServer {
*/ */
@OnError @OnError
public void onError(Session session, Throwable error) { public void onError(Session session, Throwable error) {
log.error("用户错误:" + this.userId + ",原因:" + error.getMessage()); log.error("主题:{},socket:{},连接异常,异常信息:{}", topic, userId, error);
// error.printStackTrace();
} }
/** /**
...@@ -128,16 +144,44 @@ public class CommonEventWarnWebSocketServer { ...@@ -128,16 +144,44 @@ public class CommonEventWarnWebSocketServer {
/** /**
* 发送自定义消息 * 发送自定义消息
*/ */
public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException { public static void sendInfo(String message, String topic, String userId) throws IOException {
//log.info("发送消息到:" + userId + ",报文:" + message); if (StringUtils.isNotBlank(topic) && webSocketMap.containsKey(topic)) {
if (StringUtils.isNotBlank(userId) && evenWarnWebSocketMap.containsKey(userId)) {
try { try {
evenWarnWebSocketMap.get(userId).sendMessage(message); Set<WebSocketServer> socketSets = webSocketMap.get(topic);
for (WebSocketServer socketSet : socketSets) {
String currentUserId = socketSet.getUserId();
if (StringUtils.equals(currentUserId, userId)) {
socketSet.sendMessage(message);
break;
}
}
} catch (Exception e) { } catch (Exception e) {
log.error("用户" + userId + ",不在线!{}", e.getMessage()); log.error("主题:{}, socket:{}, 发送消息异常, 异常信息:{}", topic, userId, e);
} }
} else { } else {
log.debug("用户" + userId + ",不在线!"); log.error("当前主题不存在:{}", topic);
}
}
/**
* 发送消息给所有连接
*
* @param message
* @param topic
* @throws IOException
*/
public static void sendInfo2All(String message, String topic) throws IOException {
if (StringUtils.isNotBlank(topic) && webSocketMap.containsKey(topic)) {
try {
Set<WebSocketServer> socketSets = webSocketMap.get(topic);
for (WebSocketServer socketSet : socketSets) {
socketSet.sendMessage(message);
}
} catch (Exception e) {
log.error("主题:{}, 发送消息异常, 异常信息:{}", topic, e);
}
} else {
log.error("当前主题不存在:{}", topic);
} }
} }
...@@ -146,10 +190,25 @@ public class CommonEventWarnWebSocketServer { ...@@ -146,10 +190,25 @@ public class CommonEventWarnWebSocketServer {
} }
public static synchronized void addOnlineCount() { public static synchronized void addOnlineCount() {
CommonEventWarnWebSocketServer.onlineCount++; WebSocketServer.onlineCount++;
} }
public static synchronized void subOnlineCount() { public static synchronized void subOnlineCount() {
CommonEventWarnWebSocketServer.onlineCount--; WebSocketServer.onlineCount--;
}
public String getUserId() {
return userId;
}
public String getTopic() {
return topic;
}
public static Set<WebSocketServer> getWebSocketSet(String topic) {
if (!webSocketMap.isEmpty()) {
return webSocketMap.get(topic);
}
return Collections.EMPTY_SET;
} }
} }
package net.wanji.common.dto;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
/**
* @author duanruiming
* @date 2023/03/21 10:45
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ApiModel(value = "SystemServiceStatusDTO", description = "系统服务状态实体")
public class SystemServiceStatusDTO {
@ApiModelProperty(value = "系统服务名",notes = "")
private String sysName;
@ApiModelProperty(value = "地址",notes = "")
private String ip;
@ApiModelProperty(value = "端口",notes = "")
private int port;
@ApiModelProperty(value = "超时时间",notes = "")
private int timeout;
@ApiModelProperty(value = "状态:0正常 1异常",notes = "")
private int status;
@ApiModelProperty(value = "开始时间",notes = "")
private Date startTime;
}
package net.wanji.common.utils.tool;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
/**
* @author duanruiming
* @date 2023/03/21 10:32
*/
@Slf4j
public class SocketUtil {
/**
* 通过socket连接测试服务端口是否正常
*
* @param ip
* @param port
* @param timeout
* @return
* @throws Exception
*/
public static boolean telnet(String ip, int port, int timeout) throws Exception{
Socket socket = new Socket();
boolean status = false;
try {
socket.connect(new InetSocketAddress(ip, port), timeout);
status = socket.isConnected();
} catch (IOException e) {
log.error("当前IP:{}, port:{}, 连接失败,服务异常!", ip, port);
} finally {
try {
socket.close();
} catch (IOException e) {
log.error("当前IP:{}, port:{}, 关闭连接失败,服务异常!", ip, port);
}
}
return status;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment