Commit 021b9fbe authored by duanruiming's avatar duanruiming

通过websocket推送告警数据给前端页面作为弹窗

parent 3b712665
......@@ -28,6 +28,10 @@
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
......
package net.wanji.web.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @author duanruiming
* @date 2023/01/06 8:55
*/
@Configuration
public class WebSocketConfig {
/**
* 注入ServerEndpointExporter,
* 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
......@@ -3,26 +3,17 @@ package net.wanji.web.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import net.wanji.web.common.entity.JsonViewObject;
import net.wanji.web.common.util.StringUtils;
import net.wanji.web.entity.TBaseCrossInfo;
import net.wanji.web.service.SituationDetectionService;
import net.wanji.web.util.HttpRestUtil;
import net.wanji.web.vo.situationDetection.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import javax.ws.rs.core.MediaType;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static net.wanji.web.util.HttpRestUtil.buildHeader;
/**
......@@ -36,14 +27,6 @@ import static net.wanji.web.util.HttpRestUtil.buildHeader;
@RestController
public class SituationDetectionController extends BaseController {
@Autowired
private HttpRestUtil httpRestUtil;
@Value("${utcServiceUrl}")
private String utcServiceUrl;
@Value("${syncDeviceSatusUrn}")
private String syncDeviceSatusUrl;
@Autowired
SituationDetectionService situationDetectionService;
......@@ -172,44 +155,4 @@ public class SituationDetectionController extends BaseController {
return jsonViewObject.success(list);
}
@ApiOperation(value = "统一事件告警检测", notes = "统一事件告警检测", response = JsonViewObject.class,
produces = MediaType.APPLICATION_JSON)
@GetMapping(value = "commonEventWarnNotify", produces = MediaType.APPLICATION_JSON)
public JsonViewObject commonEventWarnNotify() {
// 路口报警
// 事件报警
// 信号机故障
Map<String, String> header = buildHeader();
JsonViewObject jsonViewObject = httpRestUtil.doPost(utcServiceUrl.concat(syncDeviceSatusUrl), header, "", JsonViewObject.class);
if (Objects.isNull(jsonViewObject) || jsonViewObject.getCode() != 200) {
return jsonViewObject.fail("远程服务调用异常".concat(utcServiceUrl.concat(syncDeviceSatusUrl)));
}
LocalDateTime now = LocalDateTime.now();
String formatNow = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(now);
List<Map<String, Object>> content = (List<Map<String, Object>>) jsonViewObject.getContent();
Map<String, Object> result = content.get(0);
String signalId = (String) result.get("signalId");
String crossId = (String) result.get("crossId");
Integer faultType = (Integer) result.get("faultType");
if (Objects.nonNull(faultType) && faultType != 0 && StringUtils.isNotEmpty(crossId)) {
CommonEventAlarmOutVo commonEventAlarmOutVo = new CommonEventAlarmOutVo();
commonEventAlarmOutVo.setEventType(3);
commonEventAlarmOutVo.setEventId(signalId);
commonEventAlarmOutVo.setEventName("信号机故障");// todo 信号机厂商
commonEventAlarmOutVo.setEventDesc("信号机故障");
commonEventAlarmOutVo.setStatus(String.valueOf(faultType));
commonEventAlarmOutVo.setStartTime(formatNow);
TBaseCrossInfo tBaseCrossInfo = situationDetectionService.selectCrossInfoById(crossId);
if (Objects.nonNull(tBaseCrossInfo)) {
commonEventAlarmOutVo.setCoordinate(tBaseCrossInfo.getLocation());
}
jsonViewObject.setContent(commonEventAlarmOutVo);
} else {
jsonViewObject.setContent(null);
jsonViewObject.setMessage("无告警数据");
}
return jsonViewObject;
}
}
......@@ -21,8 +21,10 @@ public class TDeviceStatusInfo {
private String name;
@ApiModelProperty(value = "设备类型:1信号;2卡口;3地磁;4视频;5微波;6激光;7电警;8 MEC")
private int type;
@ApiModelProperty(value = "设备状态:0离线;1在线;2故障")
@ApiModelProperty(value = "设备状态:0离线;1在线;")
private int status;
@ApiModelProperty(value = "信号机故障类型:0正常;1检测器故障;2时钟故障;3电源故障;4驱动模块故障;5信号灯故障;6箱门开启;7方案错误;8绿冲突;9红全熄;10行人红熄;",notes = "")
private int faultType ;
@ApiModelProperty(value = "创建时间")
private Date gmtCreate ;
}
package net.wanji.web.task;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import net.wanji.web.common.entity.JsonViewObject;
import net.wanji.web.common.util.StringUtils;
import net.wanji.web.entity.TBaseCrossInfo;
import net.wanji.web.entity.TDeviceStatusInfo;
import net.wanji.web.mapper.TDeviceStatusMapper;
import net.wanji.web.service.SituationDetectionService;
import net.wanji.web.util.HttpRestUtil;
import net.wanji.web.vo.situationDetection.CommonEventAlarmOutVo;
import net.wanji.web.websocket.CommonEventWarnWebSocketServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import static net.wanji.web.util.HttpRestUtil.buildHeader;
/**
* @author duanruiming
* @date 2023/01/06 10:45
*/
@Component
@Slf4j
public class CommonEventWarnTask {
@Autowired
private HttpRestUtil httpRestUtil;
@Value("${utcServiceUrl}")
private String utcServiceUrl;
@Value("${syncDeviceSatusUrn}")
private String syncDeviceSatusUrl;
@Autowired
private SituationDetectionService situationDetectionService;
@Autowired
private TDeviceStatusMapper tDeviceStatusMapper;
@Scheduled(fixedRate = 60 * 1000)
public void pushEvenWarnData() {
try {
ConcurrentHashMap<String, CommonEventWarnWebSocketServer> evenWarnWebSocketMap = CommonEventWarnWebSocketServer.getEvenWarnWebSocketMap();
if (!evenWarnWebSocketMap.isEmpty()) {
for (Map.Entry<String, CommonEventWarnWebSocketServer> entry : evenWarnWebSocketMap.entrySet()) {
String key = entry.getKey();
getCommonEnventWarnData(key);
}
}
} catch (Exception e) {
log.error("统一推送事件告警数据失败!", e);
}
}
private void getCommonEnventWarnData(String key) throws IOException {
// 信号机故障
JsonViewObject signalWarnData = getSignalWarnData();
if (Objects.nonNull(signalWarnData)) {
CommonEventWarnWebSocketServer.sendInfo(JSONObject.toJSONString(signalWarnData), key);
}
// 路口告警
JsonViewObject crossWarnData = getCrossWarnData();
// 事件告警
JsonViewObject crossEventWarnData = getCrossEventWarnData();
}
private JsonViewObject getCrossEventWarnData() {
return null;
}
private JsonViewObject getCrossWarnData() {
return null;
}
/**
* 信号机故障告警数据
*
* @return
*/
private JsonViewObject getSignalWarnData() {
List<CommonEventAlarmOutVo> results = new ArrayList<>();
Map<String, String> header = buildHeader();
JsonViewObject jsonViewObject = httpRestUtil.doPost(utcServiceUrl.concat(syncDeviceSatusUrl), header, "", JsonViewObject.class);
if (Objects.isNull(jsonViewObject) || jsonViewObject.getCode() != 200) {
return jsonViewObject.fail("远程服务调用异常".concat(utcServiceUrl.concat(syncDeviceSatusUrl)));
}
LocalDateTime now = LocalDateTime.now();
String formatNow = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(now);
List<Map<String, Object>> content = (List<Map<String, Object>>) jsonViewObject.getContent();
content.forEach(map -> {
Map<String, Object> result = map;
String signalId = (String) result.get("signalId");
String crossId = (String) result.get("crossId");
Integer currentFaultType = (Integer) result.get("faultType");
// 测试数据
if (Objects.equals(crossId, "13FNK0C6790")) {
currentFaultType = 1;
}
String coordinate = "";
int faultTypeDB = 0;
if (StringUtils.isNotEmpty(crossId)) {
TBaseCrossInfo tBaseCrossInfo = situationDetectionService.selectCrossInfoById(crossId);
if (Objects.nonNull(tBaseCrossInfo)) {
coordinate = tBaseCrossInfo.getLocation();
}
QueryWrapper<TDeviceStatusInfo> queryWrapper = new QueryWrapper<>();
TDeviceStatusInfo tDeviceStatusInfo = tDeviceStatusMapper.selectOne(queryWrapper.eq("code", crossId));
if (Objects.nonNull(tDeviceStatusInfo)) {
faultTypeDB = tDeviceStatusInfo.getFaultType();
}
}
if (Objects.nonNull(currentFaultType) && currentFaultType != faultTypeDB) {
CommonEventAlarmOutVo commonEventAlarmOutVo = new CommonEventAlarmOutVo();
commonEventAlarmOutVo.setEventType(3);
commonEventAlarmOutVo.setEventId(signalId);
commonEventAlarmOutVo.setEventName("信号机故障");// todo 信号机厂商
commonEventAlarmOutVo.setEventDesc("信号机故障");
commonEventAlarmOutVo.setStatus(String.valueOf(currentFaultType));
commonEventAlarmOutVo.setStartTime(formatNow);
commonEventAlarmOutVo.setCoordinate(coordinate);
results.add(commonEventAlarmOutVo);
}
});
// 无告警数据
if (CollectionUtils.isEmpty(results)) {
return null;
}
jsonViewObject.setTimestamp(now.toInstant(ZoneOffset.of("+8")).toEpochMilli());
jsonViewObject.setContent(results);
return jsonViewObject;
}
}
package net.wanji.web.task;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import net.wanji.web.common.entity.JsonViewObject;
import net.wanji.web.common.util.StringUtils;
import net.wanji.web.entity.TDeviceStatusInfo;
import net.wanji.web.mapper.TDeviceStatusMapper;
import net.wanji.web.util.HttpRestUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
......@@ -17,17 +16,14 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import static net.wanji.web.util.HttpRestUtil.buildHeader;
/**
* @author duanruiming
* @date 2023/01/03 10:43
*/
@Component
@Slf4j
public class SignalStatusTask {
private static final Logger LOGGER = LoggerFactory.getLogger(SignalStatusTask.class);
@Autowired
private HttpRestUtil httpRestUtil;
......@@ -39,29 +35,38 @@ public class SignalStatusTask {
@Autowired
private TDeviceStatusMapper tDeviceStatusMapper;
@Scheduled(fixedRate = 5 * 60 * 1000)
@Scheduled(fixedRate = 1 * 60 * 1000)
public void syncSignalStatus() {
try {
// 远程通过UTC服务获取信号机状态
Map<String, String> header = buildHeader();
JsonViewObject jsonViewObject = httpRestUtil.doPost(utcServiceUrl.concat(syncDeviceSatusUrl), header, "", JsonViewObject.class);
JsonViewObject jsonViewObject = httpRestUtil.doPost(utcServiceUrl.concat(syncDeviceSatusUrl), null, "", JsonViewObject.class);
if (Objects.isNull(jsonViewObject) || jsonViewObject.getCode() != 200) {
LOGGER.error("定时任务同步信号机设备状态远程utcService调用异常,url:".concat(utcServiceUrl.concat("/signalStatus/runningStatusAlarm")));
log.error("定时任务同步信号机设备状态远程utcService调用异常,url:".concat(utcServiceUrl.concat("/signalStatus/runningStatusAlarm")));
return;
}
List<Map<String, Object>> content = (List<Map<String, Object>>) jsonViewObject.getContent();
Map<String, Object> result = content.get(0);
String signalId = (String) result.get("signalId");
Integer currentSignalStatus = (Integer) result.get("status");
Integer currentSignalStatus = result.get("status") == null ? 0 : (Integer) result.get("status");
Integer currentFaultType = result.get("faultType") == null ? 0 : (Integer) result.get("faultType");
// 当前服务数据库状态
LambdaQueryWrapper<TDeviceStatusInfo> lambdaQueryWrapper = new LambdaQueryWrapper<>();
if (StringUtils.isNotEmpty(signalId)) {
lambdaQueryWrapper.eq(TDeviceStatusInfo::getCode, signalId);
TDeviceStatusInfo tDeviceStatusInfo = tDeviceStatusMapper.selectOne(lambdaQueryWrapper);
if (Objects.nonNull(tDeviceStatusInfo) && currentSignalStatus != tDeviceStatusInfo.getStatus()) {
if (isExucuteUpdate(currentSignalStatus, currentFaultType, tDeviceStatusInfo)) {
tDeviceStatusInfo.setStatus(currentSignalStatus);
tDeviceStatusInfo.setFaultType(currentFaultType);
tDeviceStatusMapper.updateById(tDeviceStatusInfo);
}
}
} catch (Exception e) {
log.error("定时任务同步信号机设备状态远程utcService调用异常", e);
}
}
private static boolean isExucuteUpdate(Integer currentSignalStatus, Integer currentFaultType, TDeviceStatusInfo tDeviceStatusInfo) {
return Objects.nonNull(tDeviceStatusInfo) && (currentSignalStatus != tDeviceStatusInfo.getStatus() || currentFaultType != tDeviceStatusInfo.getFaultType());
}
}
package net.wanji.web.websocket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.PathParam;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author duanruiming
* @date 2023/01/06 8:58
*/
@ServerEndpoint("/websocket/eventWarn/{userId}")
@Component
@Slf4j
public class CommonEventWarnWebSocketServer {
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
private static int onlineCount = 0;
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
@Getter
private static ConcurrentHashMap<String, CommonEventWarnWebSocketServer> evenWarnWebSocketMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收userId
*/
private String userId = "";
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
this.session = session;
this.userId = userId;
//TODO evenWarnWebSocketMap中的key为userId 如果多个用户使用同一个userid登录 一个用户退出代表所有公用该接口的用户都退出了,需要将key改为sessionid
if (evenWarnWebSocketMap.containsKey(userId)) {
evenWarnWebSocketMap.remove(userId);
evenWarnWebSocketMap.put(userId, this);
//加入set中
} else {
evenWarnWebSocketMap.put(userId, this);
//加入set中
addOnlineCount();
//在线数加1
}
log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());
try {
sendMessage("连接成功");
} catch (IOException e) {
log.error("用户:" + userId + ",网络异常!!!!!!");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
if (evenWarnWebSocketMap.containsKey(userId)) {
evenWarnWebSocketMap.remove(userId);
//从set中删除
subOnlineCount();
}
log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("用户消息:" + userId + ",报文:" + message);
//可以群发消息
//消息保存到数据库、redis
if (StringUtils.isNotBlank(message)) {
try {
//解析发送的报文
JSONObject jsonObject = JSON.parseObject(message);
//追加发送人(防止串改)
jsonObject.put("fromUserId", this.userId);
String toUserId = jsonObject.getString("toUserId");
//传送给对应toUserId用户的websocket
if (StringUtils.isNotBlank(toUserId) && evenWarnWebSocketMap.containsKey(toUserId)) {
evenWarnWebSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
} else {
log.error("请求的userId:" + toUserId + "不在该服务器上");
//否则不在这个服务器上,发送到mysql或者redis
}
} catch (Exception e) {
log.error(e.getMessage());
}
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
// error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 发送自定义消息
*/
public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException {
//log.info("发送消息到:" + userId + ",报文:" + message);
if (StringUtils.isNotBlank(userId) && evenWarnWebSocketMap.containsKey(userId)) {
try {
evenWarnWebSocketMap.get(userId).sendMessage(message);
} catch (Exception e) {
log.error("用户" + userId + ",不在线!{}", e.getMessage());
}
} else {
log.debug("用户" + userId + ",不在线!");
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
CommonEventWarnWebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
CommonEventWarnWebSocketServer.onlineCount--;
}
}
......@@ -409,7 +409,7 @@ public class HKControlCommandServiceImpl implements ControlCommandService {
param.put("laneInfos", laneList);
param.put("controlNo", 0);
// 控制时长,单位 s,0 则持续控制
param.put("duration", Objects.isNull(commandVO.getDuration()) ? 999 : commandVO.getDuration());
param.put("duration", Objects.isNull(commandVO.getDuration()) || commandVO.getDuration() == 0 ? 999 : commandVO.getDuration());
String strResult = ArtemisHttpUtil.doPostStringArtemis(artemisConfig,
PathUtil.getPathMapByApiCode("setSignalControl"),
JSON.toJSONString(param), null, null, "application/json", null);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment