Commit cc4dce64 authored by duanruiming's avatar duanruiming

信号机运行数据-万集统一标准接口服务

parent dd852437
......@@ -27,6 +27,10 @@
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
......
package net.wanji.utc.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @author duanruiming
* @date 2023/01/06 8:55
*/
@Configuration
public class WebSocketConfig {
/**
* 注入ServerEndpointExporter,
* 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
package net.wanji.utc.service.common;
package net.wanji.utc.service.rpcservice;
import net.wanji.utc.entity.JsonViewObject;
import net.wanji.utc.util.HttpRestUtil;
import java.util.Map;
import java.util.Objects;
/**
* @author duanruiming
......@@ -17,7 +19,11 @@ public interface rpcService {
* @param param
* @return
*/
default JsonViewObject rpcService(String url, Map<String, String> headers, String param) {
return JsonViewObject.newInstance();
default JsonViewObject rpcService(HttpRestUtil httpRestUtil, String url, Map<String, String> headers, String param) {
JsonViewObject jsonViewObject = httpRestUtil.doPost(url, headers, param, JsonViewObject.class);
if (Objects.isNull(jsonViewObject) || jsonViewObject.getCode() != 200) {
return jsonViewObject.fail("远程服务调用异常,url:".concat(url));
}
return jsonViewObject;
}
}
......@@ -12,5 +12,12 @@ import java.util.List;
public interface SignalStatusService {
List<SignalStatusLogPO> runningStatusAlarm();
/**
* todo 提供给其他平台,需要确认以厂商还是以路口形式推送数据
* @param crossId
* @return
*/
List<SignalStatusLogPO> runningStatusAlarm(String crossId);
List<LightsStatusVO> lightStatus();
}
package net.wanji.utc.service.runninginfo;
import net.wanji.utc.po.SignalStatusLogPO;
import net.wanji.utc.service.rpcservice.rpcService;
import net.wanji.utc.vo.LightsStatusVO;
import java.util.List;
/**
* @author duanruiming
* @date 2023/01/11 17:32
*/
public interface WanJiRunningStatusService extends rpcService {
/**
* 标准获取路口实时运行状态接口,包括告警数据
* @param crossIdList
* @return
*/
List<SignalStatusLogPO> runningStatus(List<String> crossIdList);
/**
* 标准获取路口实时灯态接口
* @param crossIdList
* @return
*/
List<LightsStatusVO> lightsStatus(List<String> crossIdList);
}
......@@ -15,6 +15,7 @@ import net.wanji.utc.po.SignalStatusLogPO;
import net.wanji.utc.service.runninginfo.HkLightsStatusService;
import net.wanji.utc.service.runninginfo.HkRunningStatusService;
import net.wanji.utc.service.runninginfo.SignalStatusService;
import net.wanji.utc.service.runninginfo.WanJiRunningStatusService;
import net.wanji.utc.util.ListUtil;
import net.wanji.utc.util.RedisUtil;
import net.wanji.utc.vo.LightsStatusVO;
......@@ -23,6 +24,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* @date 2022/11/21 9:11
......@@ -36,6 +38,7 @@ public class SignalStatusServiceImpl implements SignalStatusService {
private final HkLightsStatusService hkLightsStatusService;
private final RedisUtil redisUtil;
private final SignalStatusLogMapper signalStatusLogMapper;
private final WanJiRunningStatusService wanJiRunningStatusService;
@Override
@Transactional
......@@ -44,14 +47,18 @@ public class SignalStatusServiceImpl implements SignalStatusService {
BasicEnum.ManufacturerEnum[] manufacturerEnums = BasicEnum.ManufacturerEnum.values();
for (BasicEnum.ManufacturerEnum manufacturerEnum : manufacturerEnums) {
List<BaseCrossInfo> baseCrossInfoList = getBaseCrossInfoList(manufacturerEnum);
if (baseCrossInfoList == null) continue;
if (baseCrossInfoList == null) {
continue;
}
List<SignalStatusLogPO> signalStatusLogPOList = null;
if(manufacturerEnum.getCode().equals(BasicEnum.ManufacturerEnum.HK.getCode())) {
if (manufacturerEnum.getCode().equals(BasicEnum.ManufacturerEnum.HK.getCode())) {
// 海康
// 3.2.5信号机运行状态和告警信息
signalStatusLogPOList = hkRunningStatusService.getHkRunningStatus(baseCrossInfoList);
} else {
// todo 其他厂商
List<String> crossIdList = baseCrossInfoList.stream().map(BaseCrossInfo::getCrossId).collect(Collectors.toList());
signalStatusLogPOList = wanJiRunningStatusService.runningStatus(crossIdList);
}
// 更新Redis
if (null != signalStatusLogPOList) {
......@@ -59,7 +66,7 @@ public class SignalStatusServiceImpl implements SignalStatusService {
String field = manufacturerEnum.getCode() + Constants.SEPARATOR_UNDER_LINE +
signalStatusLogPO.getSignalId();
redisUtil.setHash(RedisKeyConst.KEY_PREFIX + RedisKeyConst.TELESEME_STATUS, field,
signalStatusLogPO.getStatus()+"");
signalStatusLogPO.getStatus() + "");
}
// 更新日志数据库
if (ListUtil.isNotEmpty(signalStatusLogPOList)) {
......@@ -71,6 +78,12 @@ public class SignalStatusServiceImpl implements SignalStatusService {
return resList;
}
@Override
public List<SignalStatusLogPO> runningStatusAlarm(String crossId) {
List<SignalStatusLogPO> signalStatusLogPOList = new ArrayList<>();
return signalStatusLogPOList;
}
@Override
@Transactional
public List<LightsStatusVO> lightStatus() {
......@@ -78,14 +91,18 @@ public class SignalStatusServiceImpl implements SignalStatusService {
BasicEnum.ManufacturerEnum[] manufacturerEnums = BasicEnum.ManufacturerEnum.values();
for (BasicEnum.ManufacturerEnum manufacturerEnum : manufacturerEnums) {
List<BaseCrossInfo> baseCrossInfoList = getBaseCrossInfoList(manufacturerEnum);
if (baseCrossInfoList == null) continue;
if (baseCrossInfoList == null) {
continue;
}
List<LightsStatusVO> lightsStatusVOList = null;
if(manufacturerEnum.getCode().equals(BasicEnum.ManufacturerEnum.HK.getCode())) {
if (manufacturerEnum.getCode().equals(BasicEnum.ManufacturerEnum.HK.getCode())) {
// 海康
// 3.2.4灯态
lightsStatusVOList = hkLightsStatusService.getHkLightsStatus(baseCrossInfoList);
} else {
// todo 其他厂商
List<String> crossIdList = baseCrossInfoList.stream().map(BaseCrossInfo::getCrossId).collect(Collectors.toList());
lightsStatusVOList = wanJiRunningStatusService.lightsStatus(crossIdList);
}
// 更新Redis
if (lightsStatusVOList != null) {
......
package net.wanji.utc.service.runninginfo.impl;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.wanji.utc.entity.JsonViewObject;
import net.wanji.utc.po.SignalStatusLogPO;
import net.wanji.utc.service.runninginfo.WanJiRunningStatusService;
import net.wanji.utc.util.HttpRestUtil;
import net.wanji.utc.vo.LightsStatusVO;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
/**
* @author duanruiming
* @date 2023/01/11 17:34
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class WanJiRunningStatusServiceImpl implements WanJiRunningStatusService {
private final HttpRestUtil httpRestUtil;
@Value("url")
private String rpcUri;
@Override
public List<SignalStatusLogPO> runningStatus(List<String> crossIdList) {
JsonViewObject jsonViewObject = rpcService(httpRestUtil, rpcUri, null, JSONObject.toJSONString(crossIdList));
List<SignalStatusLogPO> result = (List<SignalStatusLogPO>) jsonViewObject.getContent();
return result;
}
@Override
public List<LightsStatusVO> lightsStatus(List<String> crossIdList) {
JsonViewObject jsonViewObject = rpcService(httpRestUtil, rpcUri, null, JSONObject.toJSONString(crossIdList));
List<LightsStatusVO> result = (List<LightsStatusVO>) jsonViewObject.getContent();
return result;
}
@Override
public JsonViewObject rpcService(HttpRestUtil httpRestUtil, String url, Map<String, String> headers, String param) {
return WanJiRunningStatusService.super.rpcService(httpRestUtil, url, headers, param);
}
}
package net.wanji.utc.service.common;
package net.wanji.utc.service.staticinfo;
import net.wanji.utc.common.Result;
import net.wanji.utc.po.CrossSchedulesPO;
import net.wanji.utc.service.rpcservice.rpcService;
import net.wanji.utc.vo.PlanSectionVO;
import net.wanji.utc.vo.SchemePhaseLightsVO;
......
......@@ -8,7 +8,7 @@ import net.wanji.utc.dto.PlanSectionDTO;
import net.wanji.utc.dto.SchemePhaseLightsDTO;
import net.wanji.utc.mapper.*;
import net.wanji.utc.po.*;
import net.wanji.utc.service.common.WanjiCommonStaticInfoService;
import net.wanji.utc.service.staticinfo.WanjiCommonStaticInfoService;
import net.wanji.utc.service.staticinfo.*;
import net.wanji.utc.vo.PlanSectionVO;
import net.wanji.utc.vo.SchemePhaseLightsVO;
......
package net.wanji.utc.service.common.impl;
package net.wanji.utc.service.staticinfo.impl;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
......@@ -7,7 +7,7 @@ import net.wanji.utc.common.Result;
import net.wanji.utc.entity.JsonViewObject;
import net.wanji.utc.mapper.*;
import net.wanji.utc.po.*;
import net.wanji.utc.service.common.WanjiCommonStaticInfoService;
import net.wanji.utc.service.staticinfo.WanjiCommonStaticInfoService;
import net.wanji.utc.util.HttpRestUtil;
import net.wanji.utc.vo.PlanSectionVO;
import net.wanji.utc.vo.SchemePhaseLightsVO;
......@@ -44,7 +44,7 @@ public class WanjiCommonStaticInfoServiceImpl implements WanjiCommonStaticInfoSe
@Override
public Result<SchemePhaseLightsVO> schemePhaseLights(String crossId, String signalId) {
Map<String, String> commonParamMap = getCommonParamMap(crossId, signalId);
JsonViewObject jsonViewObject = rpcService(rpcUri, null, JSONObject.toJSONString(commonParamMap));
JsonViewObject jsonViewObject = rpcService(httpRestUtil, rpcUri, null, JSONObject.toJSONString(commonParamMap));
SchemePhaseLightsVO content = (SchemePhaseLightsVO) jsonViewObject.getContent();
if (Objects.nonNull(content)) {
List<CrossSchemePO> crossSchemeList = content.getCrossSchemeList();
......@@ -78,7 +78,7 @@ public class WanjiCommonStaticInfoServiceImpl implements WanjiCommonStaticInfoSe
paramMap.put("crossId", crossId);
paramMap.put("planNo", String.valueOf(planNo));
paramMap.put("signalId", signalId);
JsonViewObject jsonViewObject = rpcService(rpcUri, null, JSONObject.toJSONString(paramMap));
JsonViewObject jsonViewObject = rpcService(httpRestUtil, rpcUri, null, JSONObject.toJSONString(paramMap));
PlanSectionVO content = (PlanSectionVO) jsonViewObject.getContent();
if (Objects.nonNull(content)) {
if (Objects.nonNull(content)) {
......@@ -101,7 +101,7 @@ public class WanjiCommonStaticInfoServiceImpl implements WanjiCommonStaticInfoSe
@Override
public Result<List<CrossSchedulesPO>> schedules(String crossId, String signalId) {
Map<String, String> commonParamMap = getCommonParamMap(crossId, signalId);
JsonViewObject jsonViewObject = rpcService(rpcUri, null, JSONObject.toJSONString(commonParamMap));
JsonViewObject jsonViewObject = rpcService(httpRestUtil, rpcUri, null, JSONObject.toJSONString(commonParamMap));
List<CrossSchedulesPO> content = (List<CrossSchedulesPO>) jsonViewObject.getContent();
if (Objects.nonNull(content)) {
if (!CollectionUtils.isEmpty(content)) {
......@@ -122,13 +122,7 @@ public class WanjiCommonStaticInfoServiceImpl implements WanjiCommonStaticInfoSe
}
@Override
public JsonViewObject rpcService(String url, Map<String, String> headers, String param) {
JsonViewObject jsonViewObject = httpRestUtil.doPost(url, headers, param, JsonViewObject.class);
if (Objects.isNull(jsonViewObject) || jsonViewObject.getCode() != 200) {
log.error("远程服务调用异常,url:".concat(rpcUri));
return jsonViewObject.fail("远程服务调用异常,url:".concat(rpcUri));
}
return jsonViewObject;
public JsonViewObject rpcService(HttpRestUtil httpRestUtil, String url, Map<String, String> headers, String param) {
return WanjiCommonStaticInfoService.super.rpcService(httpRestUtil, url, headers, param);
}
}
package net.wanji.utc.task;
import net.wanji.utc.po.SignalStatusLogPO;
import net.wanji.utc.service.runninginfo.SignalStatusService;
import net.wanji.utc.vo.LightsStatusVO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 订阅信号运行状态、告警、灯态
*
......@@ -19,12 +23,12 @@ public class SignalStatusTask {
// 运行状态、告警,5分钟一次
@Scheduled(fixedRate = 5 * 60 * 1000)
public void runningStatusAlarm() {
signalStatusService.runningStatusAlarm();
List<SignalStatusLogPO> signalStatusLogPOS = signalStatusService.runningStatusAlarm();
}
// 灯态,5秒一次
@Scheduled(fixedRate = 5 * 1000)
public void lightStatus() {
signalStatusService.lightStatus();
List<LightsStatusVO> lightsStatusVOS = signalStatusService.lightStatus();
}
}
package net.wanji.utc.websocket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author duanruiming
* @date 2023/01/06 8:58
*/
@ServerEndpoint("/websocket/realTimeData/{crossId}")
@Component
@Slf4j
public class RealTimeDataWebSocket {
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
private static int onlineCount = 0;
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
@Getter
private static ConcurrentHashMap<String, RealTimeDataWebSocket> evenWarnWebSocketMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收crossId
*/
private String crossId = "";
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("crossId") String crossId) {
this.session = session;
this.crossId = crossId;
//TODO evenWarnWebSocketMap中的key为crossId 如果多个用户使用同一个crossId登录 一个用户退出代表所有公用该接口的用户都退出了,需要将key改为sessionid
if (evenWarnWebSocketMap.containsKey(crossId)) {
evenWarnWebSocketMap.remove(crossId);
evenWarnWebSocketMap.put(crossId, this);
//加入set中
} else {
evenWarnWebSocketMap.put(crossId, this);
//加入set中
addOnlineCount();
//在线数加1
}
log.info("用户连接:" + crossId + ",当前在线人数为:" + getOnlineCount());
try {
sendMessage("连接成功");
} catch (IOException e) {
log.error("用户:" + crossId + ",网络异常!!!!!!");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
if (evenWarnWebSocketMap.containsKey(crossId)) {
evenWarnWebSocketMap.remove(crossId);
//从set中删除
subOnlineCount();
}
log.info("用户退出:" + crossId + ",当前在线人数为:" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("用户消息:" + crossId + ",报文:" + message);
//可以群发消息
//消息保存到数据库、redis
if (StringUtils.isNotBlank(message)) {
try {
//解析发送的报文
JSONObject jsonObject = JSON.parseObject(message);
//追加发送人(防止串改)
jsonObject.put("fromcrossId", this.crossId);
String tocrossId = jsonObject.getString("tocrossId");
//传送给对应tocrossId用户的websocket
if (StringUtils.isNotBlank(tocrossId) && evenWarnWebSocketMap.containsKey(tocrossId)) {
evenWarnWebSocketMap.get(tocrossId).sendMessage(jsonObject.toJSONString());
} else {
log.error("请求的crossId:" + tocrossId + "不在该服务器上");
//否则不在这个服务器上,发送到mysql或者redis
}
} catch (Exception e) {
log.error(e.getMessage());
}
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误:" + this.crossId + ",原因:" + error.getMessage());
// error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 发送自定义消息
*/
public static void sendInfo(String message, @PathParam("crossId") String crossId) throws IOException {
//log.info("发送消息到:" + crossId + ",报文:" + message);
if (StringUtils.isNotBlank(crossId) && evenWarnWebSocketMap.containsKey(crossId)) {
try {
evenWarnWebSocketMap.get(crossId).sendMessage(message);
} catch (Exception e) {
log.error("用户" + crossId + ",不在线!{}", e.getMessage());
}
} else {
log.debug("用户" + crossId + ",不在线!");
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
RealTimeDataWebSocket.onlineCount++;
}
public static synchronized void subOnlineCount() {
RealTimeDataWebSocket.onlineCount--;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment