Commit 68e7a695 authored by duanruiming's avatar duanruiming

[add] 路口状态控制模式同步

parent ae02b901
package net.wanji.utc.task;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.BeanListUtils;
import net.wanji.databus.dao.entity.SignalCommandLogPO;
import net.wanji.databus.dao.entity.SignalCommandPO;
import net.wanji.databus.dao.mapper.SignalCommandLogPOMapper;
import net.wanji.databus.dao.mapper.SignalCommandPOMapper;
import net.wanji.utc.util.StringUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* @author duanruiming
* @date 2024/12/26 10:33
* @description 信号命令日志同步
*/
@Component
@Slf4j
public class SignalCommandSyncTask {
@Resource
private SignalCommandPOMapper signalCommandPOMapper;
@Resource
SignalCommandLogPOMapper signalCommandLogPOMapper;
@Resource(name = "commonThreadPoolExecutor")
private ThreadPoolTaskExecutor commonThreadPoolExecutor;
@Scheduled(fixedRate = 5 * 1000, initialDelay = 30 * 1000)
public void sync() {
ConcurrentHashMap<String, SignalCommandPO> crossLightsStatusMap = SignalStatusTask.crossLightsStatusMap;
LambdaQueryWrapper<SignalCommandPO> lambdaQueryWrapper = new LambdaQueryWrapper();
List<SignalCommandPO> signalCommandPOS = signalCommandPOMapper.selectList(lambdaQueryWrapper);
if (Objects.nonNull(crossLightsStatusMap) && !crossLightsStatusMap.isEmpty()) {
List<SignalCommandPO> insertList = new ArrayList<>();
List<SignalCommandPO> updateList = new ArrayList<>();
for (Map.Entry<String, SignalCommandPO> entry : crossLightsStatusMap.entrySet()) {
try {
String crossId = entry.getKey();
SignalCommandPO statusRunModelCache = entry.getValue();
Integer statusCache = statusRunModelCache.getStatus();
String runModeCache = statusRunModelCache.getRunMode();
if (!CollectionUtils.isEmpty(signalCommandPOS)) {
Map<String, SignalCommandPO> command4DBMap = signalCommandPOS.stream().collect(Collectors.toMap(SignalCommandPO::getCrossId, signalCommandPO -> signalCommandPO));
if (command4DBMap.containsKey(crossId)) {
SignalCommandPO updatePO = command4DBMap.get(crossId);
Integer status4DB = updatePO.getStatus();
String runMode4DB = updatePO.getRunMode();
if (Objects.equals(status4DB, statusCache) && StringUtils.equals(runMode4DB, runModeCache)) {
// 路口编号 状态 控制模式相同,不更新
continue;
} else {
updatePO.setStatus(statusCache);
updatePO.setRunMode(runModeCache);
updatePO.setUpdateTime(new Date());
updateList.add(updatePO);
}
} else {
SignalCommandPO insertPO = new SignalCommandPO();
insertPO.setCrossId(crossId);
insertPO.setStatus(statusCache);
insertPO.setRunMode(runModeCache);
insertPO.setCommandResult(1);
insertPO.setUpdateCount(0);
insertPO.setInsertTime(new Date());
insertList.add(insertPO);
}
} else {
log.error("路口灯态缓存为空:{}", crossLightsStatusMap);
}
} catch (Exception e) {
log.error("路口在线状态控制模式更新失败,路口编号信息:{}", entry, e);
}
}
if (!CollectionUtils.isEmpty(insertList)) {
signalCommandPOMapper.insertBatch(insertList);
List<SignalCommandLogPO> signalCommandLogPOS = new ArrayList<>(insertList.size());
BeanListUtils.populateList(insertList, signalCommandLogPOS, SignalCommandLogPO.class);
signalCommandLogPOMapper.insertBatch(signalCommandLogPOS);
}
if (!CollectionUtils.isEmpty(updateList)) {
signalCommandPOMapper.updateBatch(updateList);
List<SignalCommandLogPO> signalCommandLogPOS = new ArrayList<>(updateList.size());
BeanListUtils.populateList(updateList, signalCommandLogPOS, SignalCommandLogPO.class);
signalCommandLogPOMapper.updateBatch(signalCommandLogPOS);
log.info("路口状态控制模式变动,数据库更新成功:{}", updateList);
}
} else {
log.error("从路口灯态缓存获取控制模式在线离线状态为空:{}", crossLightsStatusMap);
}
}
}
...@@ -6,6 +6,8 @@ import lombok.extern.slf4j.Slf4j; ...@@ -6,6 +6,8 @@ import lombok.extern.slf4j.Slf4j;
import net.wanji.common.framework.rest.JsonViewObject; import net.wanji.common.framework.rest.JsonViewObject;
import net.wanji.common.utils.tool.JacksonUtils; import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.databus.dao.entity.CrossLightsStatusHistPO; import net.wanji.databus.dao.entity.CrossLightsStatusHistPO;
import net.wanji.databus.dao.entity.SignalCommandLogPO;
import net.wanji.databus.dao.entity.SignalCommandPO;
import net.wanji.databus.dao.mapper.CrossInfoMapper; import net.wanji.databus.dao.mapper.CrossInfoMapper;
import net.wanji.databus.dao.mapper.CrossLightsStatusHistMapper; import net.wanji.databus.dao.mapper.CrossLightsStatusHistMapper;
import net.wanji.databus.dao.mapper.ManufacturerInfoMapper; import net.wanji.databus.dao.mapper.ManufacturerInfoMapper;
...@@ -86,6 +88,8 @@ public class SignalStatusTask { ...@@ -86,6 +88,8 @@ public class SignalStatusTask {
public static final ConcurrentHashMap<String, List<LightsStatusVO2>> produceListMap = new ConcurrentHashMap<>(); public static final ConcurrentHashMap<String, List<LightsStatusVO2>> produceListMap = new ConcurrentHashMap<>();
public static final ConcurrentHashMap<String, SignalCommandPO> crossLightsStatusMap = new ConcurrentHashMap<>();
// 运行状态、告警,1分钟一次 // 运行状态、告警,1分钟一次
@Scheduled(fixedRate = 60 * 1000) @Scheduled(fixedRate = 60 * 1000)
public void runningStatusAlarm() throws Exception { public void runningStatusAlarm() throws Exception {
...@@ -107,6 +111,7 @@ public class SignalStatusTask { ...@@ -107,6 +111,7 @@ public class SignalStatusTask {
CrossInfoPO crossInfoPO = crossInfoMapper.selectByPrimaryKey(crossId); CrossInfoPO crossInfoPO = crossInfoMapper.selectByPrimaryKey(crossId);
if (Objects.nonNull(crossInfoPO)) { if (Objects.nonNull(crossInfoPO)) {
List<LightsStatusVO> lightsStatusVOS = getLightsStatusVOS(mapper, crossId, crossInfoPO); List<LightsStatusVO> lightsStatusVOS = getLightsStatusVOS(mapper, crossId, crossInfoPO);
setStatus(crossId, start, lightsStatusVOS);
saveAndSendLightsStatus(mapper, crossId, lightsStatusVOS, entry); saveAndSendLightsStatus(mapper, crossId, lightsStatusVOS, entry);
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
log.error("当前路口:{}, 获取灯态时间:{}ms", crossId, end-start); log.error("当前路口:{}, 获取灯态时间:{}ms", crossId, end-start);
...@@ -121,6 +126,33 @@ public class SignalStatusTask { ...@@ -121,6 +126,33 @@ public class SignalStatusTask {
} }
} }
private static void setStatus(String crossId, long start, List<LightsStatusVO> lightsStatusVOS) {
// 添加在线离线逻辑
// 0-离线 1-在线
SignalCommandLogPO signalCommandLogPO = new SignalCommandLogPO();
int status = 1;
String runMode = "11";
if (!CollectionUtils.isEmpty(lightsStatusVOS)) {
for (LightsStatusVO lightsStatusVO : lightsStatusVOS) {
if (Objects.nonNull(lightsStatusVO)) {
String machineTimestamp = lightsStatusVO.getTimeStamp();
long offsetTime = start - Long.valueOf(machineTimestamp);
if (offsetTime > 1000 * 10) {
status = 0;
}
runMode = lightsStatusVO.getRunMode();
}
}
} else {
status = 0;
}
signalCommandLogPO.setCrossId(crossId);
signalCommandLogPO.setRunMode(runMode);
signalCommandLogPO.setStatus(status);
signalCommandLogPO.setUpdateTime(new Date());
crossLightsStatusMap.put(crossId, signalCommandLogPO);
}
/** /**
* 保存灯态数据并发送 * 保存灯态数据并发送
* *
......
package net.wanji.databus.dao.entity; package net.wanji.databus.dao.entity;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data; import lombok.Data;
import java.sql.Date;
/** /**
* @ClassName CrossSchemeOptLog * @ClassName CrossSchemeOptLog
* @Description 命令操作日志 * @Description 命令操作日志
...@@ -14,42 +11,6 @@ import java.sql.Date; ...@@ -14,42 +11,6 @@ import java.sql.Date;
*/ */
@Data @Data
@TableName("t_signal_command_log") @TableName("t_signal_command_log")
public class SignalCommandLogPO { public class SignalCommandLogPO extends SignalCommandPO {
@ApiModelProperty(value = "路口编号")
private String crossId;
@ApiModelProperty(value = "请求信息")
private String dataInfo;
@ApiModelProperty(value = "操作类型:1-步进控制/恢复(公用);2-恢复时间表(公用);3-相位锁定/解锁(公用);4-临时方案下发(公用);5-下发命令失败,程序下发恢复指令;10-查询环图失败记录")
private Integer commandType;
@ApiModelProperty(value = "优化结果:1-成功 2-失败")
private Integer commandResult;
@ApiModelProperty(value = "返回信息")
private String resultMessage;
@ApiModelProperty(value = "运行模式 `2=手动锁定当前相位`,`3=手动全红`,`4=手动黄闪`,`5=手动关灯`,`6=全红闪控制`,`7=全黄控制`,`8=正常按计划运行`,`9=手动步进控制`,`10=自动步进控制`,`11=恢复时间表`,`255=临时控制方案`")
private String runMode;
@ApiModelProperty(value = "调用IP")
private String ip;
@ApiModelProperty(value = "请求时间")
private String queryTime;
@ApiModelProperty(value = "灯态时间")
private String lampTime;
@ApiModelProperty(value = "数据插入时间")
private Date insertTime;
@ApiModelProperty(value = "请求次数,每五秒请求一次,请求两分钟24次")
private Integer updateCount = 0;
@ApiModelProperty(value = "更新时间")
private Date update_time;
} }
...@@ -4,7 +4,7 @@ import com.baomidou.mybatisplus.annotation.TableName; ...@@ -4,7 +4,7 @@ import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;
import lombok.Data; import lombok.Data;
import java.sql.Date; import java.util.Date;
/** /**
* @ClassName CrossSchemeOptLog * @ClassName CrossSchemeOptLog
...@@ -50,6 +50,8 @@ public class SignalCommandPO { ...@@ -50,6 +50,8 @@ public class SignalCommandPO {
private Integer updateCount = 0; private Integer updateCount = 0;
@ApiModelProperty(value = "更新时间") @ApiModelProperty(value = "更新时间")
private Date update_time; private Date updateTime;
@ApiModelProperty(value = "在线状态 0-离线 1-在线")
private Integer status;
} }
...@@ -3,6 +3,8 @@ package net.wanji.databus.dao.mapper; ...@@ -3,6 +3,8 @@ package net.wanji.databus.dao.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import net.wanji.databus.dao.entity.SignalCommandLogPO; import net.wanji.databus.dao.entity.SignalCommandLogPO;
import java.util.List;
/** /**
* @ClassName SignalCommandLogPOMapper * @ClassName SignalCommandLogPOMapper
* @Description SignalCommandLogPOMapper * @Description SignalCommandLogPOMapper
...@@ -10,4 +12,8 @@ import net.wanji.databus.dao.entity.SignalCommandLogPO; ...@@ -10,4 +12,8 @@ import net.wanji.databus.dao.entity.SignalCommandLogPO;
* @Date 2024/12/23 15:23 * @Date 2024/12/23 15:23
*/ */
public interface SignalCommandLogPOMapper extends BaseMapper<SignalCommandLogPO> { public interface SignalCommandLogPOMapper extends BaseMapper<SignalCommandLogPO> {
void insertBatch(List<SignalCommandLogPO> list);
void updateBatch(List<SignalCommandLogPO> list);
} }
...@@ -3,6 +3,8 @@ package net.wanji.databus.dao.mapper; ...@@ -3,6 +3,8 @@ package net.wanji.databus.dao.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import net.wanji.databus.dao.entity.SignalCommandPO; import net.wanji.databus.dao.entity.SignalCommandPO;
import java.util.List;
/** /**
* @ClassName SignalCommandLogPOMapper * @ClassName SignalCommandLogPOMapper
* @Description SignalCommandLogPOMapper * @Description SignalCommandLogPOMapper
...@@ -10,4 +12,7 @@ import net.wanji.databus.dao.entity.SignalCommandPO; ...@@ -10,4 +12,7 @@ import net.wanji.databus.dao.entity.SignalCommandPO;
* @Date 2024/12/30 11:23 * @Date 2024/12/30 11:23
*/ */
public interface SignalCommandPOMapper extends BaseMapper<SignalCommandPO> { public interface SignalCommandPOMapper extends BaseMapper<SignalCommandPO> {
void insertBatch(List<SignalCommandPO> list);
void updateBatch(List<SignalCommandPO> list);
} }
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="net.wanji.databus.dao.mapper.SignalCommandLogPOMapper">
<!-- 批量新增数据 -->
<insert id="insertBatch">
insert into t_signal_command_log(cross_id, status, run_mode, command_result, insert_time)
values
<foreach collection="list" item="entity" separator=",">
(#{entity.crossId}, #{entity.status}, #{entity.runMode}, #{entity.commandResult}, #{entity.insertTime})
</foreach>
</insert>
<!-- 批量更新 -->
<update id="updateBatch" parameterType="java.util.List">
<foreach collection="list" item="item" index="index" open="" separator=";" close="">
UPDATE t_signal_command_log
<set>
<if test="item.commandResult != null">command_result = #{item.commandResult},</if>
<if test="item.runMode != null">run_mode = #{item.runMode},</if>
<if test="item.updateTime != null">update_time = #{item.updateTime},</if>
<if test="item.status != null">status = #{item.status}</if>
</set>
WHERE cross_id = #{item.crossId}
</foreach>
</update>
</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="net.wanji.databus.dao.mapper.SignalCommandPOMapper">
<!-- 批量新增数据 -->
<insert id="insertBatch">
insert into t_signal_command(cross_id, status, run_mode, command_result, insert_time)
values
<foreach collection="list" item="entity" separator=",">
(#{entity.crossId}, #{entity.status}, #{entity.runMode}, #{entity.commandResult}, #{entity.insertTime})
</foreach>
</insert>
<!-- 批量更新 -->
<update id="updateBatch" parameterType="java.util.List">
<foreach collection="list" item="item" index="index" open="" separator=";" close="">
UPDATE t_signal_command
<set>
<if test="item.commandResult != null">command_result = #{item.commandResult},</if>
<if test="item.runMode != null">run_mode = #{item.runMode},</if>
<if test="item.updateTime != null">update_time = #{item.updateTime},</if>
<if test="item.status != null">status = #{item.status}</if>
</set>
WHERE cross_id = #{item.crossId}
</foreach>
</update>
</mapper>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment