Commit 8f3c9f20 authored by duanruiming's avatar duanruiming

[add] 增加test文件

parent 61c864e5
utc.service.url=http://173.17.0.1:32000/utc
utc.dt.service.url=http://173.17.0.1:39002/utc-dt
utc.hisense.service.url=http://173.17.0.1:39003/utc-hisense
control.url=http://173.17.0.1:32001/web
ehualu.url=http://173.17.0.1:30015
\ No newline at end of file
utc.service.url=http://37.12.182.29:32000/utc
utc.dt.service.url=http://37.12.182.29:39002/utc-dt
utc.hisense.service.url=http://37.12.182.29:39003/utc-hisense
control.url=http://37.12.182.29:32001/web
ehualu.url=http://37.12.182.29:30015
\ No newline at end of file
......@@ -19,6 +19,7 @@ import net.wanji.utc.common.typeenum.BasicEnum;
import net.wanji.utc.service.control.ControlCommandService;
import net.wanji.utc.service.control.ControlCommandStrategyService;
import net.wanji.utc.util.StringUtils;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
......@@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/
@Component
@Slf4j
@Profile("docker")
@SuppressWarnings("all")
public class CommandFaildTask {
@Resource
......
package net.wanji.utc.task; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;import lombok.extern.slf4j.Slf4j;import net.wanji.common.utils.tool.BeanListUtils;import net.wanji.databus.dao.entity.SignalCommandLogPO;import net.wanji.databus.dao.entity.SignalCommandPO;import net.wanji.databus.dao.mapper.SignalCommandLogPOMapper;import net.wanji.databus.dao.mapper.SignalCommandPOMapper;import net.wanji.utc.common.typeenum.DateStyle;import net.wanji.utc.dto.LightStatusAlarmDTO;import net.wanji.utc.kafka.ProducerHandler;import net.wanji.utc.util.DateUtils;import net.wanji.utc.util.StringUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import org.springframework.transaction.annotation.Transactional;import org.springframework.util.CollectionUtils; import javax.annotation.Resource;import java.util.*;import java.util.concurrent.ConcurrentHashMap;import java.util.stream.Collectors; /** * @author duanruiming * @date 2024/12/26 10:33 * @description 信号命令日志同步 */@Component@Slf4jpublic class SignalCommandSyncTask { @Resource private SignalCommandPOMapper signalCommandPOMapper; @Resource SignalCommandLogPOMapper signalCommandLogPOMapper; @Autowired private ProducerHandler producerHandler; /** * 信号机状态控制模式缓存,更新数据库后查询 */ public static Map<String, SignalCommandPO> signalStatusRunModelMap = new HashMap<>(); @Scheduled(fixedRate = 5 * 1000, initialDelay = 30 * 1000) public void sync() { try { ConcurrentHashMap<String, SignalCommandPO> crossLightsStatusMap = SignalStatusTask.crossLightsStatusMap; LambdaQueryWrapper<SignalCommandPO> lambdaQueryWrapper = new LambdaQueryWrapper(); List<SignalCommandPO> signalCommandPOS = signalCommandPOMapper.selectList(lambdaQueryWrapper); long start = System.currentTimeMillis(); if (Objects.nonNull(crossLightsStatusMap) && !crossLightsStatusMap.isEmpty()) { List<SignalCommandPO> insertList = new ArrayList<>(); List<SignalCommandPO> updateList = new ArrayList<>(); for (Map.Entry<String, SignalCommandPO> entry : crossLightsStatusMap.entrySet()) { try { String crossId = entry.getKey(); SignalCommandPO statusRunModelCache = entry.getValue(); Integer statusCache = statusRunModelCache.getStatus(); String runModeCache = statusRunModelCache.getRunMode(); String lampTime = statusRunModelCache.getLampTime(); if (!CollectionUtils.isEmpty(signalCommandPOS)) { Map<String, SignalCommandPO> command4DBMap = signalCommandPOS.stream().collect(Collectors.toMap(SignalCommandPO::getCrossId, signalCommandPO -> signalCommandPO)); if (command4DBMap.containsKey(crossId)) { SignalCommandPO updatePO = command4DBMap.get(crossId); Integer status4DB = updatePO.getStatus(); String runMode4DB = updatePO.getRunMode(); if (Objects.equals(status4DB, statusCache) && StringUtils.equals(runMode4DB, runModeCache)) { // 路口编号 状态 控制模式相同,不更新 continue; } else { updatePO.setStatus(statusCache); updatePO.setRunMode(runModeCache); updatePO.setUpdateTime(new Date()); updatePO.setLampTime(lampTime); updateList.add(updatePO); } } else { // 表中没有当前路口插入数据 insertList.add(insertSignalCommandPO(crossId, statusCache, runModeCache, lampTime)); } } else { // 表中一条数据没有,插入路口 insertList.add(insertSignalCommandPO(crossId, statusCache, runModeCache, lampTime)); log.error("路口灯态缓存为空:{}", crossLightsStatusMap); } } catch (Exception e) { log.error("路口在线状态控制模式更新失败,路口编号信息:{}", entry, e); } } saveAndUpdate(insertList, updateList); LambdaQueryWrapper<SignalCommandPO> lambdaQueryWrapper2 = new LambdaQueryWrapper(); List<SignalCommandPO> listUpdated = signalCommandPOMapper.selectList(lambdaQueryWrapper2); if (!CollectionUtils.isEmpty(listUpdated)) { for (SignalCommandPO signalCommandPO : listUpdated) { String crossId = signalCommandPO.getCrossId(); signalStatusRunModelMap.put(crossId, signalCommandPO); } } sendAlarmKafka(listUpdated); } else { log.error("从路口灯态缓存获取控制模式在线离线状态为空:{}", crossLightsStatusMap); } long end = System.currentTimeMillis(); log.info("SignalCommandSyncTask 定时任务执行时间: {} ms",end - start); } catch (Exception e) { log.error("信号机状态同步异常:", e); } } @Transactional(rollbackFor = Exception.class) public void saveAndUpdate(List<SignalCommandPO> insertList, List<SignalCommandPO> updateList) { try { if (!CollectionUtils.isEmpty(insertList)) { signalCommandPOMapper.insertBatch(insertList); List<SignalCommandLogPO> signalCommandLogPOS = new ArrayList<>(insertList.size()); BeanListUtils.populateList(insertList, signalCommandLogPOS, SignalCommandLogPO.class); signalCommandLogPOMapper.insertBatch(signalCommandLogPOS); } if (!CollectionUtils.isEmpty(updateList)) { signalCommandPOMapper.updateBatch(updateList); List<SignalCommandLogPO> signalCommandLogPOS = new ArrayList<>(updateList.size()); BeanListUtils.populateList(updateList, signalCommandLogPOS, SignalCommandLogPO.class); signalCommandLogPOMapper.insertBatch(signalCommandLogPOS); log.info("路口状态控制模式变动,数据库更新成功:{}", updateList); } } catch (Exception e) { log.error("路口状态控制模式变动,数据库更新失败:", e); throw new RuntimeException(e); } } private static SignalCommandPO insertSignalCommandPO(String crossId, Integer statusCache, String runModeCache, String lampTime) { SignalCommandPO insertPO = new SignalCommandPO(); insertPO.setCrossId(crossId); insertPO.setStatus(statusCache); insertPO.setRunMode(runModeCache); insertPO.setInsertTime(new Date()); insertPO.setLampTime(lampTime); return insertPO; } private void sendAlarmKafka(List<SignalCommandPO> signalCommandPOS) throws Exception { List<LightStatusAlarmDTO> lightStatusAlarmDTOS = new ArrayList<>(); try { if (!CollectionUtils.isEmpty(signalCommandPOS)) { for (SignalCommandPO signalCommandPO : signalCommandPOS) { LightStatusAlarmDTO dto = new LightStatusAlarmDTO(); dto.setCrossId(signalCommandPO.getCrossId()); dto.setCommandMsg(signalCommandPO.getResultMessage()); dto.setCommandResult(signalCommandPO.getCommandResult()); dto.setCount(signalCommandPO.getUpdateCount()); dto.setStatus(signalCommandPO.getStatus()); dto.setRunMode(signalCommandPO.getRunMode()); dto.setMachineTimestamp(signalCommandPO.getLampTime()); Date date = DateUtils.stringToDate(signalCommandPO.getQueryTime(), DateStyle.YYYY_MM_DD_HH_MM_SS.getValue()); dto.setRequestTime(date); dto.setDbTime(signalCommandPO.getUpdateTime()); lightStatusAlarmDTOS.add(dto); } } producerHandler.send("light_status_alarm", lightStatusAlarmDTOS); } catch (Exception e) { log.error("灯态告警推送kafka失败:", e); throw new RuntimeException(e); } } }
\ No newline at end of file
package net.wanji.utc.task; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;import lombok.extern.slf4j.Slf4j;import net.wanji.common.utils.tool.BeanListUtils;import net.wanji.databus.dao.entity.SignalCommandLogPO;import net.wanji.databus.dao.entity.SignalCommandPO;import net.wanji.databus.dao.mapper.SignalCommandLogPOMapper;import net.wanji.databus.dao.mapper.SignalCommandPOMapper;import net.wanji.utc.common.typeenum.DateStyle;import net.wanji.utc.dto.LightStatusAlarmDTO;import net.wanji.utc.kafka.ProducerHandler;import net.wanji.utc.util.DateUtils;import net.wanji.utc.util.StringUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Profile;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import org.springframework.transaction.annotation.Transactional;import org.springframework.util.CollectionUtils; import javax.annotation.Resource;import java.util.*;import java.util.concurrent.ConcurrentHashMap;import java.util.stream.Collectors; /** * @author duanruiming * @date 2024/12/26 10:33 * @description 信号命令日志同步 */@Component@Slf4j@Profile("docker")public class SignalCommandSyncTask { @Resource private SignalCommandPOMapper signalCommandPOMapper; @Resource SignalCommandLogPOMapper signalCommandLogPOMapper; @Autowired private ProducerHandler producerHandler; /** * 信号机状态控制模式缓存,更新数据库后查询 */ public static Map<String, SignalCommandPO> signalStatusRunModelMap = new HashMap<>(); @Scheduled(fixedRate = 5 * 1000, initialDelay = 30 * 1000) public void sync() { try { ConcurrentHashMap<String, SignalCommandPO> crossLightsStatusMap = SignalStatusTask.crossLightsStatusMap; LambdaQueryWrapper<SignalCommandPO> lambdaQueryWrapper = new LambdaQueryWrapper(); List<SignalCommandPO> signalCommandPOS = signalCommandPOMapper.selectList(lambdaQueryWrapper); long start = System.currentTimeMillis(); if (Objects.nonNull(crossLightsStatusMap) && !crossLightsStatusMap.isEmpty()) { List<SignalCommandPO> insertList = new ArrayList<>(); List<SignalCommandPO> updateList = new ArrayList<>(); for (Map.Entry<String, SignalCommandPO> entry : crossLightsStatusMap.entrySet()) { try { String crossId = entry.getKey(); SignalCommandPO statusRunModelCache = entry.getValue(); Integer statusCache = statusRunModelCache.getStatus(); String runModeCache = statusRunModelCache.getRunMode(); String lampTime = statusRunModelCache.getLampTime(); if (!CollectionUtils.isEmpty(signalCommandPOS)) { Map<String, SignalCommandPO> command4DBMap = signalCommandPOS.stream().collect(Collectors.toMap(SignalCommandPO::getCrossId, signalCommandPO -> signalCommandPO)); if (command4DBMap.containsKey(crossId)) { SignalCommandPO updatePO = command4DBMap.get(crossId); Integer status4DB = updatePO.getStatus(); String runMode4DB = updatePO.getRunMode(); if (Objects.equals(status4DB, statusCache) && StringUtils.equals(runMode4DB, runModeCache)) { // 路口编号 状态 控制模式相同,不更新 continue; } else { updatePO.setStatus(statusCache); updatePO.setRunMode(runModeCache); updatePO.setUpdateTime(new Date()); updatePO.setLampTime(lampTime); updateList.add(updatePO); } } else { // 表中没有当前路口插入数据 insertList.add(insertSignalCommandPO(crossId, statusCache, runModeCache, lampTime)); } } else { // 表中一条数据没有,插入路口 insertList.add(insertSignalCommandPO(crossId, statusCache, runModeCache, lampTime)); log.error("路口灯态缓存为空:{}", crossLightsStatusMap); } } catch (Exception e) { log.error("路口在线状态控制模式更新失败,路口编号信息:{}", entry, e); } } saveAndUpdate(insertList, updateList); LambdaQueryWrapper<SignalCommandPO> lambdaQueryWrapper2 = new LambdaQueryWrapper(); List<SignalCommandPO> listUpdated = signalCommandPOMapper.selectList(lambdaQueryWrapper2); if (!CollectionUtils.isEmpty(listUpdated)) { for (SignalCommandPO signalCommandPO : listUpdated) { String crossId = signalCommandPO.getCrossId(); signalStatusRunModelMap.put(crossId, signalCommandPO); } } sendAlarmKafka(listUpdated); } else { log.error("从路口灯态缓存获取控制模式在线离线状态为空:{}", crossLightsStatusMap); } long end = System.currentTimeMillis(); log.info("SignalCommandSyncTask 定时任务执行时间: {} ms",end - start); } catch (Exception e) { log.error("信号机状态同步异常:", e); } } @Transactional(rollbackFor = Exception.class) public void saveAndUpdate(List<SignalCommandPO> insertList, List<SignalCommandPO> updateList) { try { if (!CollectionUtils.isEmpty(insertList)) { signalCommandPOMapper.insertBatch(insertList); List<SignalCommandLogPO> signalCommandLogPOS = new ArrayList<>(insertList.size()); BeanListUtils.populateList(insertList, signalCommandLogPOS, SignalCommandLogPO.class); signalCommandLogPOMapper.insertBatch(signalCommandLogPOS); } if (!CollectionUtils.isEmpty(updateList)) { signalCommandPOMapper.updateBatch(updateList); List<SignalCommandLogPO> signalCommandLogPOS = new ArrayList<>(updateList.size()); BeanListUtils.populateList(updateList, signalCommandLogPOS, SignalCommandLogPO.class); signalCommandLogPOMapper.insertBatch(signalCommandLogPOS); log.info("路口状态控制模式变动,数据库更新成功:{}", updateList); } } catch (Exception e) { log.error("路口状态控制模式变动,数据库更新失败:", e); throw new RuntimeException(e); } } private static SignalCommandPO insertSignalCommandPO(String crossId, Integer statusCache, String runModeCache, String lampTime) { SignalCommandPO insertPO = new SignalCommandPO(); insertPO.setCrossId(crossId); insertPO.setStatus(statusCache); insertPO.setRunMode(runModeCache); insertPO.setInsertTime(new Date()); insertPO.setLampTime(lampTime); return insertPO; } private void sendAlarmKafka(List<SignalCommandPO> signalCommandPOS) throws Exception { List<LightStatusAlarmDTO> lightStatusAlarmDTOS = new ArrayList<>(); try { if (!CollectionUtils.isEmpty(signalCommandPOS)) { for (SignalCommandPO signalCommandPO : signalCommandPOS) { LightStatusAlarmDTO dto = new LightStatusAlarmDTO(); dto.setCrossId(signalCommandPO.getCrossId()); dto.setCommandMsg(signalCommandPO.getResultMessage()); dto.setCommandResult(signalCommandPO.getCommandResult()); dto.setCount(signalCommandPO.getUpdateCount()); dto.setStatus(signalCommandPO.getStatus()); dto.setRunMode(signalCommandPO.getRunMode()); dto.setMachineTimestamp(signalCommandPO.getLampTime()); Date date = DateUtils.stringToDate(signalCommandPO.getQueryTime(), DateStyle.YYYY_MM_DD_HH_MM_SS.getValue()); dto.setRequestTime(date); dto.setDbTime(signalCommandPO.getUpdateTime()); lightStatusAlarmDTOS.add(dto); } } producerHandler.send("light_status_alarm", lightStatusAlarmDTOS); } catch (Exception e) { log.error("灯态告警推送kafka失败:", e); throw new RuntimeException(e); } } }
\ No newline at end of file
......
......@@ -40,6 +40,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
......@@ -58,8 +59,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
* @author Kent HAN
* @date 2022/11/21 9:01
*/
//@Component
@Component
@Slf4j
@Profile("docker")
@SuppressWarnings("all")
public class SignalStatusTask {
@Autowired
......
spring:
application:
# dubbo启动需要程序名称
name: utc
main:
allow-circular-references: true
cloud:
nacos:
config:
server-addr: 37.12.182.29:8848
file-extension: yaml
group: signal
namespace: signal
username: nacos
password: nacos
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment