Commit 9a62f7df authored by duanruiming's avatar duanruiming

[add] 灯态控制模式缓存优化

parent ef35556f
......@@ -47,7 +47,6 @@ import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* @author duanruiming
......@@ -259,7 +258,7 @@ public class ControlCommandStrategyServiceImpl implements ControlCommandStrategy
} else {
jsonViewObject = wanJiControlCommandService.lockControl(commandVO);
}
}else {
} else {
jsonViewObject = viewObject;
}
} catch (Exception e) {
......@@ -315,20 +314,16 @@ public class ControlCommandStrategyServiceImpl implements ControlCommandStrategy
**/
private JsonViewObject checkControlStatus(String crossId) {
JsonViewObject jsonViewObject = null;
List<SignalCommandPO> signalCommandPOS = SignalCommandSyncTask.signalStatusRunModelList;
if (ObjectUtil.isNotEmpty(signalCommandPOS)) {
List<SignalCommandPO> collect = signalCommandPOS.stream().filter(x -> x.getCrossId().equals(crossId)).collect(Collectors.toList());
if (ObjectUtil.isNotEmpty(collect)) {
SignalCommandPO signalCommandPO = collect.get(0);
Integer status = signalCommandPO.getStatus();
String runMode = signalCommandPO.getRunMode();
if (status == 0) {
jsonViewObject = JsonViewObject.newInstance().fail("该路口处于离线状态,禁止执行指令");
jsonViewObject.setCode(HttpCode.NOT_IMPLEMENTED.value());
} else if ("31".equals(runMode)) {
jsonViewObject = JsonViewObject.newInstance().fail("该路口控制模式为手动控制,禁止执行指令");
jsonViewObject.setCode(HttpCode.NOT_EXTENDED.value());
}
SignalCommandPO signalCommandPO = SignalCommandSyncTask.signalStatusRunModelMap.get(crossId);
if (ObjectUtil.isNotEmpty(signalCommandPO)) {
Integer status = signalCommandPO.getStatus();
String runMode = signalCommandPO.getRunMode();
if (status == 0) {
jsonViewObject = JsonViewObject.newInstance().fail("该路口处于离线状态,禁止执行指令");
jsonViewObject.setCode(HttpCode.NOT_IMPLEMENTED.value());
} else if ("31".equals(runMode)) {
jsonViewObject = JsonViewObject.newInstance().fail("该路口控制模式为手动控制,禁止执行指令");
jsonViewObject.setCode(HttpCode.NOT_EXTENDED.value());
}
}
return jsonViewObject;
......@@ -561,7 +556,7 @@ public class ControlCommandStrategyServiceImpl implements ControlCommandStrategy
} else {
jsonViewObject = wanJiControlCommandService.tempSchemeDetail(tempSchemeSendVO);
}
}else {
} else {
jsonViewObject = viewObject;
}
} catch (Exception e) {
......
package net.wanji.utc.task; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;import lombok.extern.slf4j.Slf4j;import net.wanji.common.utils.tool.BeanListUtils;import net.wanji.databus.dao.entity.SignalCommandLogPO;import net.wanji.databus.dao.entity.SignalCommandPO;import net.wanji.databus.dao.mapper.SignalCommandLogPOMapper;import net.wanji.databus.dao.mapper.SignalCommandPOMapper;import net.wanji.utc.common.typeenum.DateStyle;import net.wanji.utc.dto.LightStatusAlarmDTO;import net.wanji.utc.kafka.ProducerHandler;import net.wanji.utc.util.DateUtils;import net.wanji.utc.util.StringUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import org.springframework.transaction.annotation.Transactional;import org.springframework.util.CollectionUtils; import javax.annotation.Resource;import java.util.*;import java.util.concurrent.ConcurrentHashMap;import java.util.stream.Collectors; /** * @author duanruiming * @date 2024/12/26 10:33 * @description 信号命令日志同步 */@Component@Slf4jpublic class SignalCommandSyncTask { @Resource private SignalCommandPOMapper signalCommandPOMapper; @Resource SignalCommandLogPOMapper signalCommandLogPOMapper; @Autowired private ProducerHandler producerHandler; /** * 信号机状态控制模式缓存,更新数据库后查询 */ public static final List<SignalCommandPO> signalStatusRunModelList = new ArrayList<>(); @Scheduled(fixedRate = 5 * 1000, initialDelay = 30 * 1000) public void sync() { try { ConcurrentHashMap<String, SignalCommandPO> crossLightsStatusMap = SignalStatusTask.crossLightsStatusMap; LambdaQueryWrapper<SignalCommandPO> lambdaQueryWrapper = new LambdaQueryWrapper(); List<SignalCommandPO> signalCommandPOS = signalCommandPOMapper.selectList(lambdaQueryWrapper); long start = System.currentTimeMillis(); if (Objects.nonNull(crossLightsStatusMap) && !crossLightsStatusMap.isEmpty()) { List<SignalCommandPO> insertList = new ArrayList<>(); List<SignalCommandPO> updateList = new ArrayList<>(); for (Map.Entry<String, SignalCommandPO> entry : crossLightsStatusMap.entrySet()) { try { String crossId = entry.getKey(); SignalCommandPO statusRunModelCache = entry.getValue(); Integer statusCache = statusRunModelCache.getStatus(); String runModeCache = statusRunModelCache.getRunMode(); String lampTime = statusRunModelCache.getLampTime(); if (!CollectionUtils.isEmpty(signalCommandPOS)) { Map<String, SignalCommandPO> command4DBMap = signalCommandPOS.stream().collect(Collectors.toMap(SignalCommandPO::getCrossId, signalCommandPO -> signalCommandPO)); if (command4DBMap.containsKey(crossId)) { SignalCommandPO updatePO = command4DBMap.get(crossId); Integer status4DB = updatePO.getStatus(); String runMode4DB = updatePO.getRunMode(); if (Objects.equals(status4DB, statusCache) && StringUtils.equals(runMode4DB, runModeCache)) { // 路口编号 状态 控制模式相同,不更新 continue; } else { updatePO.setStatus(statusCache); updatePO.setRunMode(runModeCache); updatePO.setUpdateTime(new Date()); updatePO.setLampTime(lampTime); updateList.add(updatePO); } } else { // 表中没有当前路口插入数据 insertList.add(insertSignalCommandPO(crossId, statusCache, runModeCache, lampTime)); } } else { // 表中一条数据没有,插入路口 insertList.add(insertSignalCommandPO(crossId, statusCache, runModeCache, lampTime)); log.error("路口灯态缓存为空:{}", crossLightsStatusMap); } } catch (Exception e) { log.error("路口在线状态控制模式更新失败,路口编号信息:{}", entry, e); } } saveAndUpdate(insertList, updateList); LambdaQueryWrapper<SignalCommandPO> lambdaQueryWrapper2 = new LambdaQueryWrapper(); List<SignalCommandPO> listUpdated = signalCommandPOMapper.selectList(lambdaQueryWrapper2); signalStatusRunModelList.clear(); signalStatusRunModelList.addAll(listUpdated); sendAlarmKafka(listUpdated); } else { log.error("从路口灯态缓存获取控制模式在线离线状态为空:{}", crossLightsStatusMap); } long end = System.currentTimeMillis(); log.info("SignalCommandSyncTask 定时任务执行时间: {} ms",end - start); } catch (Exception e) { log.error("信号机状态同步异常:", e); } } @Transactional(rollbackFor = Exception.class) public void saveAndUpdate(List<SignalCommandPO> insertList, List<SignalCommandPO> updateList) { try { if (!CollectionUtils.isEmpty(insertList)) { signalCommandPOMapper.insertBatch(insertList); List<SignalCommandLogPO> signalCommandLogPOS = new ArrayList<>(insertList.size()); BeanListUtils.populateList(insertList, signalCommandLogPOS, SignalCommandLogPO.class); signalCommandLogPOMapper.insertBatch(signalCommandLogPOS); } if (!CollectionUtils.isEmpty(updateList)) { signalCommandPOMapper.updateBatch(updateList); List<SignalCommandLogPO> signalCommandLogPOS = new ArrayList<>(updateList.size()); BeanListUtils.populateList(updateList, signalCommandLogPOS, SignalCommandLogPO.class); signalCommandLogPOMapper.insertBatch(signalCommandLogPOS); log.info("路口状态控制模式变动,数据库更新成功:{}", updateList); } } catch (Exception e) { log.error("路口状态控制模式变动,数据库更新失败:", e); throw new RuntimeException(e); } } private static SignalCommandPO insertSignalCommandPO(String crossId, Integer statusCache, String runModeCache, String lampTime) { SignalCommandPO insertPO = new SignalCommandPO(); insertPO.setCrossId(crossId); insertPO.setStatus(statusCache); insertPO.setRunMode(runModeCache); insertPO.setInsertTime(new Date()); insertPO.setLampTime(lampTime); return insertPO; } private void sendAlarmKafka(List<SignalCommandPO> signalCommandPOS) throws Exception { List<LightStatusAlarmDTO> lightStatusAlarmDTOS = new ArrayList<>(); try { if (!CollectionUtils.isEmpty(signalCommandPOS)) { for (SignalCommandPO signalCommandPO : signalCommandPOS) { LightStatusAlarmDTO dto = new LightStatusAlarmDTO(); dto.setCrossId(signalCommandPO.getCrossId()); dto.setCommandMsg(signalCommandPO.getResultMessage()); dto.setCommandResult(signalCommandPO.getCommandResult()); dto.setCount(signalCommandPO.getUpdateCount()); dto.setStatus(signalCommandPO.getStatus()); dto.setRunMode(signalCommandPO.getRunMode()); dto.setMachineTimestamp(signalCommandPO.getLampTime()); Date date = DateUtils.stringToDate(signalCommandPO.getQueryTime(), DateStyle.YYYY_MM_DD_HH_MM_SS.getValue()); dto.setRequestTime(date); dto.setDbTime(signalCommandPO.getUpdateTime()); lightStatusAlarmDTOS.add(dto); } } producerHandler.send("light_status_alarm", lightStatusAlarmDTOS); } catch (Exception e) { log.error("灯态告警推送kafka失败:", e); throw new RuntimeException(e); } } }
\ No newline at end of file
package net.wanji.utc.task; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;import lombok.extern.slf4j.Slf4j;import net.wanji.common.utils.tool.BeanListUtils;import net.wanji.databus.dao.entity.SignalCommandLogPO;import net.wanji.databus.dao.entity.SignalCommandPO;import net.wanji.databus.dao.mapper.SignalCommandLogPOMapper;import net.wanji.databus.dao.mapper.SignalCommandPOMapper;import net.wanji.utc.common.typeenum.DateStyle;import net.wanji.utc.dto.LightStatusAlarmDTO;import net.wanji.utc.kafka.ProducerHandler;import net.wanji.utc.util.DateUtils;import net.wanji.utc.util.StringUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import org.springframework.transaction.annotation.Transactional;import org.springframework.util.CollectionUtils; import javax.annotation.Resource;import java.util.*;import java.util.concurrent.ConcurrentHashMap;import java.util.stream.Collectors; /** * @author duanruiming * @date 2024/12/26 10:33 * @description 信号命令日志同步 */@Component@Slf4jpublic class SignalCommandSyncTask { @Resource private SignalCommandPOMapper signalCommandPOMapper; @Resource SignalCommandLogPOMapper signalCommandLogPOMapper; @Autowired private ProducerHandler producerHandler; /** * 信号机状态控制模式缓存,更新数据库后查询 */ public static Map<String, SignalCommandPO> signalStatusRunModelMap = new HashMap<>(); @Scheduled(fixedRate = 5 * 1000, initialDelay = 30 * 1000) public void sync() { try { ConcurrentHashMap<String, SignalCommandPO> crossLightsStatusMap = SignalStatusTask.crossLightsStatusMap; LambdaQueryWrapper<SignalCommandPO> lambdaQueryWrapper = new LambdaQueryWrapper(); List<SignalCommandPO> signalCommandPOS = signalCommandPOMapper.selectList(lambdaQueryWrapper); long start = System.currentTimeMillis(); if (Objects.nonNull(crossLightsStatusMap) && !crossLightsStatusMap.isEmpty()) { List<SignalCommandPO> insertList = new ArrayList<>(); List<SignalCommandPO> updateList = new ArrayList<>(); for (Map.Entry<String, SignalCommandPO> entry : crossLightsStatusMap.entrySet()) { try { String crossId = entry.getKey(); SignalCommandPO statusRunModelCache = entry.getValue(); Integer statusCache = statusRunModelCache.getStatus(); String runModeCache = statusRunModelCache.getRunMode(); String lampTime = statusRunModelCache.getLampTime(); if (!CollectionUtils.isEmpty(signalCommandPOS)) { Map<String, SignalCommandPO> command4DBMap = signalCommandPOS.stream().collect(Collectors.toMap(SignalCommandPO::getCrossId, signalCommandPO -> signalCommandPO)); if (command4DBMap.containsKey(crossId)) { SignalCommandPO updatePO = command4DBMap.get(crossId); Integer status4DB = updatePO.getStatus(); String runMode4DB = updatePO.getRunMode(); if (Objects.equals(status4DB, statusCache) && StringUtils.equals(runMode4DB, runModeCache)) { // 路口编号 状态 控制模式相同,不更新 continue; } else { updatePO.setStatus(statusCache); updatePO.setRunMode(runModeCache); updatePO.setUpdateTime(new Date()); updatePO.setLampTime(lampTime); updateList.add(updatePO); } } else { // 表中没有当前路口插入数据 insertList.add(insertSignalCommandPO(crossId, statusCache, runModeCache, lampTime)); } } else { // 表中一条数据没有,插入路口 insertList.add(insertSignalCommandPO(crossId, statusCache, runModeCache, lampTime)); log.error("路口灯态缓存为空:{}", crossLightsStatusMap); } } catch (Exception e) { log.error("路口在线状态控制模式更新失败,路口编号信息:{}", entry, e); } } saveAndUpdate(insertList, updateList); LambdaQueryWrapper<SignalCommandPO> lambdaQueryWrapper2 = new LambdaQueryWrapper(); List<SignalCommandPO> listUpdated = signalCommandPOMapper.selectList(lambdaQueryWrapper2); if (!CollectionUtils.isEmpty(listUpdated)) { for (SignalCommandPO signalCommandPO : listUpdated) { String crossId = signalCommandPO.getCrossId(); signalStatusRunModelMap.put(crossId, signalCommandPO); } } sendAlarmKafka(listUpdated); } else { log.error("从路口灯态缓存获取控制模式在线离线状态为空:{}", crossLightsStatusMap); } long end = System.currentTimeMillis(); log.info("SignalCommandSyncTask 定时任务执行时间: {} ms",end - start); } catch (Exception e) { log.error("信号机状态同步异常:", e); } } @Transactional(rollbackFor = Exception.class) public void saveAndUpdate(List<SignalCommandPO> insertList, List<SignalCommandPO> updateList) { try { if (!CollectionUtils.isEmpty(insertList)) { signalCommandPOMapper.insertBatch(insertList); List<SignalCommandLogPO> signalCommandLogPOS = new ArrayList<>(insertList.size()); BeanListUtils.populateList(insertList, signalCommandLogPOS, SignalCommandLogPO.class); signalCommandLogPOMapper.insertBatch(signalCommandLogPOS); } if (!CollectionUtils.isEmpty(updateList)) { signalCommandPOMapper.updateBatch(updateList); List<SignalCommandLogPO> signalCommandLogPOS = new ArrayList<>(updateList.size()); BeanListUtils.populateList(updateList, signalCommandLogPOS, SignalCommandLogPO.class); signalCommandLogPOMapper.insertBatch(signalCommandLogPOS); log.info("路口状态控制模式变动,数据库更新成功:{}", updateList); } } catch (Exception e) { log.error("路口状态控制模式变动,数据库更新失败:", e); throw new RuntimeException(e); } } private static SignalCommandPO insertSignalCommandPO(String crossId, Integer statusCache, String runModeCache, String lampTime) { SignalCommandPO insertPO = new SignalCommandPO(); insertPO.setCrossId(crossId); insertPO.setStatus(statusCache); insertPO.setRunMode(runModeCache); insertPO.setInsertTime(new Date()); insertPO.setLampTime(lampTime); return insertPO; } private void sendAlarmKafka(List<SignalCommandPO> signalCommandPOS) throws Exception { List<LightStatusAlarmDTO> lightStatusAlarmDTOS = new ArrayList<>(); try { if (!CollectionUtils.isEmpty(signalCommandPOS)) { for (SignalCommandPO signalCommandPO : signalCommandPOS) { LightStatusAlarmDTO dto = new LightStatusAlarmDTO(); dto.setCrossId(signalCommandPO.getCrossId()); dto.setCommandMsg(signalCommandPO.getResultMessage()); dto.setCommandResult(signalCommandPO.getCommandResult()); dto.setCount(signalCommandPO.getUpdateCount()); dto.setStatus(signalCommandPO.getStatus()); dto.setRunMode(signalCommandPO.getRunMode()); dto.setMachineTimestamp(signalCommandPO.getLampTime()); Date date = DateUtils.stringToDate(signalCommandPO.getQueryTime(), DateStyle.YYYY_MM_DD_HH_MM_SS.getValue()); dto.setRequestTime(date); dto.setDbTime(signalCommandPO.getUpdateTime()); lightStatusAlarmDTOS.add(dto); } } producerHandler.send("light_status_alarm", lightStatusAlarmDTOS); } catch (Exception e) { log.error("灯态告警推送kafka失败:", e); throw new RuntimeException(e); } } }
\ No newline at end of file
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment