Commit f1c25dc0 authored by zhoushiguang's avatar zhoushiguang

Merge remote-tracking branch 'origin/master'

parents 142c9b32 bc212002
......@@ -23,9 +23,11 @@ import net.wanji.utc.cache.CrossPhaseDirTurnCache;
import net.wanji.utc.cache.CrossSchemePhaseTimeCountCache;
import net.wanji.utc.common.typeenum.BasicEnum;
import net.wanji.utc.common.typeenum.DateStyle;
import net.wanji.utc.common.typeenum.HttpCode;
import net.wanji.utc.po.hk.request.DelBaseConfigPO;
import net.wanji.utc.service.control.ControlCommandService;
import net.wanji.utc.service.control.ControlCommandStrategyService;
import net.wanji.utc.task.SignalCommandSyncTask;
import net.wanji.utc.task.SignalStatusTask;
import net.wanji.utc.util.StringUtils;
import org.springframework.beans.BeanUtils;
......@@ -40,8 +42,12 @@ import javax.servlet.http.HttpServletRequest;
import java.lang.reflect.Field;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* @author duanruiming
......@@ -244,58 +250,99 @@ public class ControlCommandStrategyServiceImpl implements ControlCommandStrategy
String now = LocalDateTimeUtil.formatNow(LocalDateTimeUtil.TIMEFORMATTER);
JsonViewObject jsonViewObject;
try {
String manufacturerIdCode = crossInfoCache.getManufacturerCodeByCrossId(commandVO.getCrossCode());
if (StringUtils.equals(BasicEnum.ManufacturerEnum.HK.getCode(), manufacturerIdCode)) {
jsonViewObject = hkControlCommandService.lockControl(commandVO);
} else {
jsonViewObject = wanJiControlCommandService.lockControl(commandVO);
//步进时,需判断信号机是否在线和控制模式
JsonViewObject viewObject = checkControlStatus(commandVO.getCrossCode());
if (ObjectUtil.isNull(viewObject)) {
String manufacturerIdCode = crossInfoCache.getManufacturerCodeByCrossId(commandVO.getCrossCode());
if (StringUtils.equals(BasicEnum.ManufacturerEnum.HK.getCode(), manufacturerIdCode)) {
jsonViewObject = hkControlCommandService.lockControl(commandVO);
} else {
jsonViewObject = wanJiControlCommandService.lockControl(commandVO);
}
}else {
jsonViewObject = viewObject;
}
} catch (Exception e) {
jsonViewObject = JsonViewObject.newInstance().fail(e.getMessage());
}
JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(commandVO));
jsonObject.put("crossId",commandVO.getCrossCode());
jsonObject.put("crossId", commandVO.getCrossCode());
//插入命令操作日志
insertCommandLog(now,jsonViewObject, jsonObject,3);
insertCommandLog(now, jsonViewObject, jsonObject, 3);
return jsonViewObject;
}
@Override
public JsonViewObject stepControlStrategy(String crossId, Integer command, Integer stepNum) throws Exception {
JsonViewObject jsonViewObject;
JsonViewObject jsonViewObject = null;
try {
String now = LocalDateTimeUtil.formatNow(LocalDateTimeUtil.TIMEFORMATTER);
try {
String manufacturerIdCode = crossInfoCache.getManufacturerCodeByCrossId(crossId);
if (StringUtils.equals(BasicEnum.ManufacturerEnum.HK.getCode(), manufacturerIdCode)) {
jsonViewObject = hkControlCommandService.stepControl(crossId, command, stepNum);
//步进时,需判断信号机是否在线和控制模式
JsonViewObject viewObject = checkControlStatus(crossId);
if (ObjectUtil.isNull(viewObject)) {
String manufacturerIdCode = crossInfoCache.getManufacturerCodeByCrossId(crossId);
if (StringUtils.equals(BasicEnum.ManufacturerEnum.HK.getCode(), manufacturerIdCode)) {
jsonViewObject = hkControlCommandService.stepControl(crossId, command, stepNum);
} else {
jsonViewObject = wanJiControlCommandService.stepControl(crossId, command, stepNum);
}
} else {
jsonViewObject = wanJiControlCommandService.stepControl(crossId, command, stepNum);
jsonViewObject = viewObject;
}
} catch (Exception e) {
jsonViewObject = JsonViewObject.newInstance().fail(e.getMessage());
}
JSONObject jsonObject = new JSONObject();
jsonObject.put("crossId",crossId);
jsonObject.put("command",command);
jsonObject.put("stepNum",stepNum);
jsonObject.put("crossId", crossId);
jsonObject.put("command", command);
jsonObject.put("stepNum", stepNum);
//插入命令操作日志
insertCommandLog(now,jsonViewObject, jsonObject,1);
insertCommandLog(now, jsonViewObject, jsonObject, 1);
} catch (Exception e) {
log.error("stepControlStrategy步进控制远程服务调用异常:", e);
jsonViewObject = JsonViewObject.newInstance().fail("步进控制远程服务调用异常");
jsonViewObject = JsonViewObject.newInstance().fail("步进控制远程服务调用异常");
}
return jsonViewObject;
}
/**
* @return net.wanji.common.framework.rest.JsonViewObject
* @Description 监测信号机状态
* @Param [crossId] 路口编号
**/
private JsonViewObject checkControlStatus(String crossId) {
JsonViewObject jsonViewObject = null;
List<SignalCommandPO> signalCommandPOS = SignalCommandSyncTask.signalStatusRunModelList;
if (ObjectUtil.isNotEmpty(signalCommandPOS)) {
List<SignalCommandPO> collect = signalCommandPOS.stream().filter(x -> x.getCrossId().equals(crossId)).collect(Collectors.toList());
if (ObjectUtil.isNotEmpty(collect)) {
SignalCommandPO signalCommandPO = collect.get(0);
Integer status = signalCommandPO.getStatus();
String runMode = signalCommandPO.getRunMode();
if (status == 0) {
jsonViewObject = JsonViewObject.newInstance().fail("该路口处于离线状态,禁止执行指令");
jsonViewObject.setCode(HttpCode.NOT_IMPLEMENTED.value());
} else if ("31".equals(runMode)) {
jsonViewObject = JsonViewObject.newInstance().fail("该路口控制模式为手动控制,禁止执行指令");
jsonViewObject.setCode(HttpCode.NOT_IMPLEMENTED.value());
}
}
}
return jsonViewObject;
}
/**
* @Description 插入命令日志
* @Param nowTime 请求时间戳
* @Param jsonViewObject 操作结果
* @Param jsonObject 参数
* @return void
**/
public void insertCommandLog(String queryTime,JsonViewObject jsonViewObject, JSONObject jsonObject,int commandType) throws Exception{
* @return void
* @Description 插入命令日志
* @Param nowTime 请求时间戳
* @Param jsonViewObject 操作结果
* @Param jsonObject 参数
**/
public void insertCommandLog(String queryTime, JsonViewObject jsonViewObject, JSONObject jsonObject, int commandType) throws Exception {
try {
SignalCommandLogPO signalCommandLogPO = new SignalCommandLogPO();
signalCommandLogPO.setCrossId(jsonObject.getString("crossId"));
......@@ -306,11 +353,11 @@ public class ControlCommandStrategyServiceImpl implements ControlCommandStrategy
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
String ip = IpAddressUtil.getIpAddress(request);
signalCommandLogPO.setIp(ip);
log.info("insertCommandLog服务器请求返回 信息jsonViewObject:{}",jsonViewObject);
log.info("insertCommandLog服务器请求返回 信息jsonViewObject:{}", jsonViewObject);
int commandResult = 0;
if (code == 200){
if (code == 200) {
commandResult = 1;
}else {
} else {
commandResult = 2;
}
signalCommandLogPO.setCommandResult(commandResult);
......@@ -319,7 +366,7 @@ public class ControlCommandStrategyServiceImpl implements ControlCommandStrategy
ConcurrentHashMap<String, List<LightsStatusVO2>> produceListMap = SignalStatusTask.produceListMap;
if (!produceListMap.isEmpty()) {
List<LightsStatusVO2> list = produceListMap.get(jsonObject.get("crossId"));
if (ObjectUtil.isNotEmpty(list)){
if (ObjectUtil.isNotEmpty(list)) {
LightsStatusVO2 lightsStatusVO2 = list.get(0);
String runMode = lightsStatusVO2.getRunMode();
String lampTime = lightsStatusVO2.getTimeStamp();
......@@ -329,24 +376,27 @@ public class ControlCommandStrategyServiceImpl implements ControlCommandStrategy
}
int insert = signalCommandLogPOMapper.insert(signalCommandLogPO);
if (insert > 0) {
log.info("定时任务 插入t_signal_command_log成功,crossId = {},commandType = {},time = {},insert = {}", jsonObject.getString("crossId"), commandType, LocalDateTimeUtil.formatNow(LocalDateTimeUtil.TIMEFORMATTER),insert);
}else {
log.info("定时任务 插入t_signal_command_log失败,crossId = {},commandType = {},time = {},insert = {}", jsonObject.getString("crossId"), commandType, LocalDateTimeUtil.formatNow(LocalDateTimeUtil.TIMEFORMATTER),insert);
log.info("定时任务 插入t_signal_command_log成功,crossId = {},commandType = {},time = {},insert = {}", jsonObject.getString("crossId"), commandType, LocalDateTimeUtil.formatNow(LocalDateTimeUtil.TIMEFORMATTER), insert);
} else {
log.info("定时任务 插入t_signal_command_log失败,crossId = {},commandType = {},time = {},insert = {}", jsonObject.getString("crossId"), commandType, LocalDateTimeUtil.formatNow(LocalDateTimeUtil.TIMEFORMATTER), insert);
}
//排除查询环图接口
if (commandType != 10){
if (commandType != 10) {
LambdaQueryWrapper<SignalCommandPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(SignalCommandPO::getCrossId,jsonObject.getString("crossId"));
lambdaQueryWrapper.eq(SignalCommandPO::getCrossId, jsonObject.getString("crossId"));
List<SignalCommandPO> signalCommandPOS = signalCommandPOMapper.selectList(lambdaQueryWrapper);
if (ObjectUtil.isEmpty(signalCommandPOS)){
if (ObjectUtil.isEmpty(signalCommandPOS)) {
//插入
SignalCommandPO signalCommandPO = new SignalCommandPO();
BeanUtil.copyProperties(signalCommandLogPO,signalCommandPO);
BeanUtil.copyProperties(signalCommandLogPO, signalCommandPO);
if (code == HttpCode.NOT_IMPLEMENTED.value()) {
signalCommandPO.setCommandResult(1);
}
int insert1 = signalCommandPOMapper.insert(signalCommandPO);
if (insert1 > 0) {
log.info("定时任务 插入t_signal_command成功,crossId = {},commandType = {},time = {},insert = {}", jsonObject.getString("crossId"), commandType, LocalDateTimeUtil.formatNow(LocalDateTimeUtil.TIMEFORMATTER),insert1);
}else {
log.info("定时任务 插入t_signal_command失败,crossId = {},commandType = {},time = {},insert = {}", jsonObject.getString("crossId"), commandType, LocalDateTimeUtil.formatNow(LocalDateTimeUtil.TIMEFORMATTER),insert1);
log.info("定时任务 插入t_signal_command成功,crossId = {},commandType = {},time = {},insert = {}", jsonObject.getString("crossId"), commandType, LocalDateTimeUtil.formatNow(LocalDateTimeUtil.TIMEFORMATTER), insert1);
} else {
log.info("定时任务 插入t_signal_command失败,crossId = {},commandType = {},time = {},insert = {}", jsonObject.getString("crossId"), commandType, LocalDateTimeUtil.formatNow(LocalDateTimeUtil.TIMEFORMATTER), insert1);
}
} else {
//更新
......@@ -358,16 +408,19 @@ public class ControlCommandStrategyServiceImpl implements ControlCommandStrategy
signalCommandPO.setIp(ip);
signalCommandPO.setQueryTime(queryTime);
signalCommandPO.setUpdateCount(0);
if (code == HttpCode.NOT_IMPLEMENTED.value()) {
signalCommandPO.setCommandResult(1);
}
int update = signalCommandPOMapper.update(signalCommandPO, lambdaQueryWrapper);
if (update > 0) {
log.info("定时任务 更新t_signal_command成功,crossId = {},commandType = {},time = {},update = {}", jsonObject.getString("crossId"), commandType, LocalDateTimeUtil.formatNow(LocalDateTimeUtil.TIMEFORMATTER),update);
}else {
log.info("定时任务 更新t_signal_command失败,crossId = {},commandType = {},time = {},update = {}", jsonObject.getString("crossId"), commandType, LocalDateTimeUtil.formatNow(LocalDateTimeUtil.TIMEFORMATTER),update);
log.info("定时任务 更新t_signal_command成功,crossId = {},commandType = {},time = {},update = {}", jsonObject.getString("crossId"), commandType, LocalDateTimeUtil.formatNow(LocalDateTimeUtil.TIMEFORMATTER), update);
} else {
log.info("定时任务 更新t_signal_command失败,crossId = {},commandType = {},time = {},update = {}", jsonObject.getString("crossId"), commandType, LocalDateTimeUtil.formatNow(LocalDateTimeUtil.TIMEFORMATTER), update);
}
}
}
LambdaQueryWrapper<SignalCommandPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(SignalCommandPO::getCrossId,jsonObject.getString("crossId"));
lambdaQueryWrapper.eq(SignalCommandPO::getCrossId, jsonObject.getString("crossId"));
List<SignalCommandPO> signalCommandPOS = signalCommandPOMapper.selectList(lambdaQueryWrapper);
log.error("更新t_signal_command成功,数据库中查询结果集:{}", signalCommandPOS);
} catch (Exception e) {
......@@ -396,19 +449,25 @@ public class ControlCommandStrategyServiceImpl implements ControlCommandStrategy
return jsonViewObject.fail("路口编号不能为空");
}
try {
String manufacturerIdCode = crossInfoCache.getManufacturerCodeByCrossId(crossId);
if (StringUtils.equals(BasicEnum.ManufacturerEnum.HK.getCode(), manufacturerIdCode)) {
jsonViewObject = hkControlCommandService.recoverSchedule(crossId);
//步进时,需判断信号机是否在线和控制模式
JsonViewObject viewObject = checkControlStatus(crossId);
if (ObjectUtil.isNull(viewObject)) {
String manufacturerIdCode = crossInfoCache.getManufacturerCodeByCrossId(crossId);
if (StringUtils.equals(BasicEnum.ManufacturerEnum.HK.getCode(), manufacturerIdCode)) {
jsonViewObject = hkControlCommandService.recoverSchedule(crossId);
} else {
jsonViewObject = wanJiControlCommandService.recoverSchedule(crossId);
}
} else {
jsonViewObject = wanJiControlCommandService.recoverSchedule(crossId);
jsonViewObject = viewObject;
}
} catch (Exception e) {
jsonViewObject = JsonViewObject.newInstance().fail(e.getMessage());
}
JSONObject jsonObject = new JSONObject();
jsonObject.put("crossId",crossId);
jsonObject.put("crossId", crossId);
//插入命令操作日志
insertCommandLog(now,jsonViewObject, jsonObject,2);
insertCommandLog(now, jsonViewObject, jsonObject, 2);
return jsonViewObject;
}
......@@ -469,7 +528,7 @@ public class ControlCommandStrategyServiceImpl implements ControlCommandStrategy
public JsonViewObject tempScheme(SchemeOptSendVO schemeOptSendVO) throws Exception {
JsonViewObject jsonViewObject = wanJiControlCommandService.tempScheme(schemeOptSendVO);
return jsonViewObject;
}
}
public static String getCrossCode(Object obj) throws NoSuchFieldException, IllegalAccessException {
// 获取对象的Class对象
......@@ -494,17 +553,23 @@ public class ControlCommandStrategyServiceImpl implements ControlCommandStrategy
String now = LocalDateTimeUtil.formatNow(LocalDateTimeUtil.TIMEFORMATTER);
JsonViewObject jsonViewObject = JsonViewObject.newInstance();
try {
String manufacturerCode = crossInfoCache.getManufacturerCodeByCrossId(tempSchemeSendVO.getCrossId());
if (StringUtils.equals(BasicEnum.ManufacturerEnum.HK.getCode(), manufacturerCode)) {
} else {
jsonViewObject = wanJiControlCommandService.tempSchemeDetail(tempSchemeSendVO);
//步进时,需判断信号机是否在线和控制模式
JsonViewObject viewObject = checkControlStatus(tempSchemeSendVO.getCrossId());
if (ObjectUtil.isNull(viewObject)) {
String manufacturerCode = crossInfoCache.getManufacturerCodeByCrossId(tempSchemeSendVO.getCrossId());
if (StringUtils.equals(BasicEnum.ManufacturerEnum.HK.getCode(), manufacturerCode)) {
} else {
jsonViewObject = wanJiControlCommandService.tempSchemeDetail(tempSchemeSendVO);
}
}else {
jsonViewObject = viewObject;
}
} catch (Exception e) {
jsonViewObject = JsonViewObject.newInstance().fail(e.getMessage());
}
JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(tempSchemeSendVO));
//插入命令操作日志
insertCommandLog(now,jsonViewObject, jsonObject,4);
insertCommandLog(now, jsonViewObject, jsonObject, 4);
return jsonViewObject;
}
......
package net.wanji.utc.task;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.BeanListUtils;
import net.wanji.databus.dao.entity.SignalCommandLogPO;
import net.wanji.databus.dao.entity.SignalCommandPO;
import net.wanji.databus.dao.mapper.SignalCommandLogPOMapper;
import net.wanji.databus.dao.mapper.SignalCommandPOMapper;
import net.wanji.utc.common.typeenum.DateStyle;
import net.wanji.utc.dto.LightStatusAlarmDTO;
import net.wanji.utc.kafka.ProducerHandler;
import net.wanji.utc.util.DateUtils;
import net.wanji.utc.util.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* @author duanruiming
* @date 2024/12/26 10:33
* @description 信号命令日志同步
*/
@Component
@Slf4j
public class SignalCommandSyncTask {
@Resource
private SignalCommandPOMapper signalCommandPOMapper;
@Resource
SignalCommandLogPOMapper signalCommandLogPOMapper;
@Autowired
private ProducerHandler producerHandler;
@Resource(name = "commonThreadPoolExecutor")
private ThreadPoolTaskExecutor commonThreadPoolExecutor;
@Scheduled(fixedRate = 5 * 1000, initialDelay = 30 * 1000)
public void sync() {
try {
ConcurrentHashMap<String, SignalCommandPO> crossLightsStatusMap = SignalStatusTask.crossLightsStatusMap;
LambdaQueryWrapper<SignalCommandPO> lambdaQueryWrapper = new LambdaQueryWrapper();
List<SignalCommandPO> signalCommandPOS = signalCommandPOMapper.selectList(lambdaQueryWrapper);
if (Objects.nonNull(crossLightsStatusMap) && !crossLightsStatusMap.isEmpty()) {
List<SignalCommandPO> insertList = new ArrayList<>();
List<SignalCommandPO> updateList = new ArrayList<>();
for (Map.Entry<String, SignalCommandPO> entry : crossLightsStatusMap.entrySet()) {
try {
String crossId = entry.getKey();
SignalCommandPO statusRunModelCache = entry.getValue();
Integer statusCache = statusRunModelCache.getStatus();
String runModeCache = statusRunModelCache.getRunMode();
String lampTime = statusRunModelCache.getLampTime();
if (!CollectionUtils.isEmpty(signalCommandPOS)) {
Map<String, SignalCommandPO> command4DBMap = signalCommandPOS.stream().collect(Collectors.toMap(SignalCommandPO::getCrossId, signalCommandPO -> signalCommandPO));
if (command4DBMap.containsKey(crossId)) {
SignalCommandPO updatePO = command4DBMap.get(crossId);
Integer status4DB = updatePO.getStatus();
String runMode4DB = updatePO.getRunMode();
if (Objects.equals(status4DB, statusCache) && StringUtils.equals(runMode4DB, runModeCache)) {
// 路口编号 状态 控制模式相同,不更新
continue;
} else {
updatePO.setStatus(statusCache);
updatePO.setRunMode(runModeCache);
updatePO.setUpdateTime(new Date());
updatePO.setLampTime(lampTime);
updateList.add(updatePO);
}
} else {
// 表中没有当前路口插入数据
insertList.add(insertSignalCommandPO(crossId, statusCache, runModeCache, lampTime));
}
} else {
// 表中一条数据没有,插入路口
insertList.add(insertSignalCommandPO(crossId, statusCache, runModeCache, lampTime));
log.error("路口灯态缓存为空:{}", crossLightsStatusMap);
}
} catch (Exception e) {
log.error("路口在线状态控制模式更新失败,路口编号信息:{}", entry, e);
}
}
saveAndUpdate(insertList, updateList);
List<SignalCommandPO> sendList = new ArrayList<>();
sendList.addAll(insertList);
sendList.addAll(updateList);
if (!CollectionUtils.isEmpty(sendList)) {
sendAlarmKafka(sendList);
}
} else {
log.error("从路口灯态缓存获取控制模式在线离线状态为空:{}", crossLightsStatusMap);
}
} catch (Exception e) {
log.error("信号机状态同步异常:", e);
}
}
@Transactional(rollbackFor = Exception.class)
public void saveAndUpdate(List<SignalCommandPO> insertList, List<SignalCommandPO> updateList) {
try {
if (!CollectionUtils.isEmpty(insertList)) {
signalCommandPOMapper.insertBatch(insertList);
List<SignalCommandLogPO> signalCommandLogPOS = new ArrayList<>(insertList.size());
BeanListUtils.populateList(insertList, signalCommandLogPOS, SignalCommandLogPO.class);
signalCommandLogPOMapper.insertBatch(signalCommandLogPOS);
}
if (!CollectionUtils.isEmpty(updateList)) {
signalCommandPOMapper.updateBatch(updateList);
List<SignalCommandLogPO> signalCommandLogPOS = new ArrayList<>(updateList.size());
BeanListUtils.populateList(updateList, signalCommandLogPOS, SignalCommandLogPO.class);
signalCommandLogPOMapper.insertBatch(signalCommandLogPOS);
log.info("路口状态控制模式变动,数据库更新成功:{}", updateList);
}
} catch (Exception e) {
log.error("路口状态控制模式变动,数据库更新失败:", e);
throw new RuntimeException(e);
}
}
private static SignalCommandPO insertSignalCommandPO(String crossId, Integer statusCache, String runModeCache, String lampTime) {
SignalCommandPO insertPO = new SignalCommandPO();
insertPO.setCrossId(crossId);
insertPO.setStatus(statusCache);
insertPO.setRunMode(runModeCache);
insertPO.setInsertTime(new Date());
insertPO.setLampTime(lampTime);
return insertPO;
}
private void sendAlarmKafka(List<SignalCommandPO> signalCommandPOS) throws Exception {
List<LightStatusAlarmDTO> lightStatusAlarmDTOS = new ArrayList<>();
try {
if (!CollectionUtils.isEmpty(signalCommandPOS)) {
for (SignalCommandPO signalCommandPO : signalCommandPOS) {
LightStatusAlarmDTO dto = new LightStatusAlarmDTO();
dto.setCrossId(signalCommandPO.getCrossId());
dto.setCommandMsg(signalCommandPO.getResultMessage());
dto.setCommandResult(signalCommandPO.getCommandResult());
dto.setCount(signalCommandPO.getUpdateCount());
dto.setStatus(signalCommandPO.getStatus());
dto.setRunMode(signalCommandPO.getRunMode());
dto.setMachineTimestamp(signalCommandPO.getLampTime());
Date date = DateUtils.stringToDate(signalCommandPO.getQueryTime(), DateStyle.YYYY_MM_DD_HH_MM_SS.getValue());
dto.setRequestTime(date);
dto.setDbTime(signalCommandPO.getUpdateTime());
lightStatusAlarmDTOS.add(dto);
}
}
producerHandler.send("light_status_alarm", lightStatusAlarmDTOS);
} catch (Exception e) {
log.error("灯态告警推送kafka失败:", e);
throw new RuntimeException(e);
}
}
}
package net.wanji.utc.task; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;import lombok.extern.slf4j.Slf4j;import net.wanji.common.utils.tool.BeanListUtils;import net.wanji.databus.dao.entity.SignalCommandLogPO;import net.wanji.databus.dao.entity.SignalCommandPO;import net.wanji.databus.dao.mapper.SignalCommandLogPOMapper;import net.wanji.databus.dao.mapper.SignalCommandPOMapper;import net.wanji.utc.common.typeenum.DateStyle;import net.wanji.utc.dto.LightStatusAlarmDTO;import net.wanji.utc.kafka.ProducerHandler;import net.wanji.utc.util.DateUtils;import net.wanji.utc.util.StringUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import org.springframework.transaction.annotation.Transactional;import org.springframework.util.CollectionUtils; import javax.annotation.Resource;import java.util.*;import java.util.concurrent.ConcurrentHashMap;import java.util.stream.Collectors; /** * @author duanruiming * @date 2024/12/26 10:33 * @description 信号命令日志同步 */@Component@Slf4jpublic class SignalCommandSyncTask { @Resource private SignalCommandPOMapper signalCommandPOMapper; @Resource SignalCommandLogPOMapper signalCommandLogPOMapper; @Autowired private ProducerHandler producerHandler; /** * 信号机状态控制模式缓存,更新数据库后查询 */ public static final List<SignalCommandPO> signalStatusRunModelList = new ArrayList<>(); @Scheduled(fixedRate = 5 * 1000, initialDelay = 30 * 1000) public void sync() { try { ConcurrentHashMap<String, SignalCommandPO> crossLightsStatusMap = SignalStatusTask.crossLightsStatusMap; LambdaQueryWrapper<SignalCommandPO> lambdaQueryWrapper = new LambdaQueryWrapper(); List<SignalCommandPO> signalCommandPOS = signalCommandPOMapper.selectList(lambdaQueryWrapper); long start = System.currentTimeMillis(); if (Objects.nonNull(crossLightsStatusMap) && !crossLightsStatusMap.isEmpty()) { List<SignalCommandPO> insertList = new ArrayList<>(); List<SignalCommandPO> updateList = new ArrayList<>(); for (Map.Entry<String, SignalCommandPO> entry : crossLightsStatusMap.entrySet()) { try { String crossId = entry.getKey(); SignalCommandPO statusRunModelCache = entry.getValue(); Integer statusCache = statusRunModelCache.getStatus(); String runModeCache = statusRunModelCache.getRunMode(); String lampTime = statusRunModelCache.getLampTime(); if (!CollectionUtils.isEmpty(signalCommandPOS)) { Map<String, SignalCommandPO> command4DBMap = signalCommandPOS.stream().collect(Collectors.toMap(SignalCommandPO::getCrossId, signalCommandPO -> signalCommandPO)); if (command4DBMap.containsKey(crossId)) { SignalCommandPO updatePO = command4DBMap.get(crossId); Integer status4DB = updatePO.getStatus(); String runMode4DB = updatePO.getRunMode(); if (Objects.equals(status4DB, statusCache) && StringUtils.equals(runMode4DB, runModeCache)) { // 路口编号 状态 控制模式相同,不更新 continue; } else { updatePO.setStatus(statusCache); updatePO.setRunMode(runModeCache); updatePO.setUpdateTime(new Date()); updatePO.setLampTime(lampTime); updateList.add(updatePO); } } else { // 表中没有当前路口插入数据 insertList.add(insertSignalCommandPO(crossId, statusCache, runModeCache, lampTime)); } } else { // 表中一条数据没有,插入路口 insertList.add(insertSignalCommandPO(crossId, statusCache, runModeCache, lampTime)); log.error("路口灯态缓存为空:{}", crossLightsStatusMap); } } catch (Exception e) { log.error("路口在线状态控制模式更新失败,路口编号信息:{}", entry, e); } } saveAndUpdate(insertList, updateList); LambdaQueryWrapper<SignalCommandPO> lambdaQueryWrapper2 = new LambdaQueryWrapper(); List<SignalCommandPO> listUpdated = signalCommandPOMapper.selectList(lambdaQueryWrapper2); signalStatusRunModelList.clear(); signalStatusRunModelList.addAll(listUpdated); sendAlarmKafka(listUpdated); } else { log.error("从路口灯态缓存获取控制模式在线离线状态为空:{}", crossLightsStatusMap); } long end = System.currentTimeMillis(); log.info("SignalCommandSyncTask 定时任务执行时间: {} ms",end - start); } catch (Exception e) { log.error("信号机状态同步异常:", e); } } @Transactional(rollbackFor = Exception.class) public void saveAndUpdate(List<SignalCommandPO> insertList, List<SignalCommandPO> updateList) { try { if (!CollectionUtils.isEmpty(insertList)) { signalCommandPOMapper.insertBatch(insertList); List<SignalCommandLogPO> signalCommandLogPOS = new ArrayList<>(insertList.size()); BeanListUtils.populateList(insertList, signalCommandLogPOS, SignalCommandLogPO.class); signalCommandLogPOMapper.insertBatch(signalCommandLogPOS); } if (!CollectionUtils.isEmpty(updateList)) { signalCommandPOMapper.updateBatch(updateList); List<SignalCommandLogPO> signalCommandLogPOS = new ArrayList<>(updateList.size()); BeanListUtils.populateList(updateList, signalCommandLogPOS, SignalCommandLogPO.class); signalCommandLogPOMapper.insertBatch(signalCommandLogPOS); log.info("路口状态控制模式变动,数据库更新成功:{}", updateList); } } catch (Exception e) { log.error("路口状态控制模式变动,数据库更新失败:", e); throw new RuntimeException(e); } } private static SignalCommandPO insertSignalCommandPO(String crossId, Integer statusCache, String runModeCache, String lampTime) { SignalCommandPO insertPO = new SignalCommandPO(); insertPO.setCrossId(crossId); insertPO.setStatus(statusCache); insertPO.setRunMode(runModeCache); insertPO.setInsertTime(new Date()); insertPO.setLampTime(lampTime); return insertPO; } private void sendAlarmKafka(List<SignalCommandPO> signalCommandPOS) throws Exception { List<LightStatusAlarmDTO> lightStatusAlarmDTOS = new ArrayList<>(); try { if (!CollectionUtils.isEmpty(signalCommandPOS)) { for (SignalCommandPO signalCommandPO : signalCommandPOS) { LightStatusAlarmDTO dto = new LightStatusAlarmDTO(); dto.setCrossId(signalCommandPO.getCrossId()); dto.setCommandMsg(signalCommandPO.getResultMessage()); dto.setCommandResult(signalCommandPO.getCommandResult()); dto.setCount(signalCommandPO.getUpdateCount()); dto.setStatus(signalCommandPO.getStatus()); dto.setRunMode(signalCommandPO.getRunMode()); dto.setMachineTimestamp(signalCommandPO.getLampTime()); Date date = DateUtils.stringToDate(signalCommandPO.getQueryTime(), DateStyle.YYYY_MM_DD_HH_MM_SS.getValue()); dto.setRequestTime(date); dto.setDbTime(signalCommandPO.getUpdateTime()); lightStatusAlarmDTOS.add(dto); } } producerHandler.send("light_status_alarm", lightStatusAlarmDTOS); } catch (Exception e) { log.error("灯态告警推送kafka失败:", e); throw new RuntimeException(e); } } }
\ No newline at end of file
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment