Commit c6314d4e authored by duanruiming's avatar duanruiming

[add] 路口告警推送kafka

parent 68e7a695
package net.wanji.utc.dto;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.Date;
/**
* @author duanruiming
* @date 2024/12/30 20:31
*/
@Data
public class LightStatusAlarmDTO {
@ApiModelProperty(value = "路口编号")
private String crossId;
@ApiModelProperty(value = "返回信息")
private String commandMsg;
@ApiModelProperty(value = "优化结果:1-成功 2-失败 3-自动任务解锁失败")
private Integer commandResult;
@ApiModelProperty(value = "自动任务解锁失败次数")
private Integer count;
@ApiModelProperty(value = "在线状态 0-离线 1-在线")
private Integer status;
@ApiModelProperty(value = "运行模式 `2=手动锁定当前相位`,`3=手动全红`,`4=手动黄闪`,`5=手动关灯`," +
"`6=全红闪控制`,`7=全黄控制`,`8=正常按计划运行`,`9=手动步进控制`,`10=自动步进控制`,`11=恢复时间表`," +
"`20=单点自适应控制`,`21=绿波协调`,`31=手动控制`,`255=临时控制方案`")
private String runMode;
@ApiModelProperty(value = "信号机时间")
private String machineTimestamp;
@ApiModelProperty(value = "请求操作时间")
private Date requestTime;
@ApiModelProperty(value = "请求操作时间")
private Date dbTime;
}
...@@ -3,11 +3,17 @@ package net.wanji.utc.task; ...@@ -3,11 +3,17 @@ package net.wanji.utc.task;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.BeanListUtils; import net.wanji.common.utils.tool.BeanListUtils;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.databus.dao.entity.SignalCommandLogPO; import net.wanji.databus.dao.entity.SignalCommandLogPO;
import net.wanji.databus.dao.entity.SignalCommandPO; import net.wanji.databus.dao.entity.SignalCommandPO;
import net.wanji.databus.dao.mapper.SignalCommandLogPOMapper; import net.wanji.databus.dao.mapper.SignalCommandLogPOMapper;
import net.wanji.databus.dao.mapper.SignalCommandPOMapper; import net.wanji.databus.dao.mapper.SignalCommandPOMapper;
import net.wanji.utc.common.typeenum.DateStyle;
import net.wanji.utc.dto.LightStatusAlarmDTO;
import net.wanji.utc.kafka.ProducerHandler;
import net.wanji.utc.util.DateUtils;
import net.wanji.utc.util.StringUtils; import net.wanji.utc.util.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -31,71 +37,110 @@ public class SignalCommandSyncTask { ...@@ -31,71 +37,110 @@ public class SignalCommandSyncTask {
private SignalCommandPOMapper signalCommandPOMapper; private SignalCommandPOMapper signalCommandPOMapper;
@Resource @Resource
SignalCommandLogPOMapper signalCommandLogPOMapper; SignalCommandLogPOMapper signalCommandLogPOMapper;
@Autowired
private ProducerHandler producerHandler;
@Resource(name = "commonThreadPoolExecutor") @Resource(name = "commonThreadPoolExecutor")
private ThreadPoolTaskExecutor commonThreadPoolExecutor; private ThreadPoolTaskExecutor commonThreadPoolExecutor;
@Scheduled(fixedRate = 5 * 1000, initialDelay = 30 * 1000) @Scheduled(fixedRate = 5 * 1000, initialDelay = 30 * 1000)
public void sync() { public void sync() {
ConcurrentHashMap<String, SignalCommandPO> crossLightsStatusMap = SignalStatusTask.crossLightsStatusMap; try {
LambdaQueryWrapper<SignalCommandPO> lambdaQueryWrapper = new LambdaQueryWrapper(); ConcurrentHashMap<String, SignalCommandPO> crossLightsStatusMap = SignalStatusTask.crossLightsStatusMap;
List<SignalCommandPO> signalCommandPOS = signalCommandPOMapper.selectList(lambdaQueryWrapper); LambdaQueryWrapper<SignalCommandPO> lambdaQueryWrapper = new LambdaQueryWrapper();
if (Objects.nonNull(crossLightsStatusMap) && !crossLightsStatusMap.isEmpty()) { List<SignalCommandPO> signalCommandPOS = signalCommandPOMapper.selectList(lambdaQueryWrapper);
List<SignalCommandPO> insertList = new ArrayList<>(); sendAlarmKafka(signalCommandPOS);
List<SignalCommandPO> updateList = new ArrayList<>(); if (Objects.nonNull(crossLightsStatusMap) && !crossLightsStatusMap.isEmpty()) {
for (Map.Entry<String, SignalCommandPO> entry : crossLightsStatusMap.entrySet()) { List<SignalCommandPO> insertList = new ArrayList<>();
try { List<SignalCommandPO> updateList = new ArrayList<>();
String crossId = entry.getKey(); for (Map.Entry<String, SignalCommandPO> entry : crossLightsStatusMap.entrySet()) {
SignalCommandPO statusRunModelCache = entry.getValue(); try {
Integer statusCache = statusRunModelCache.getStatus(); String crossId = entry.getKey();
String runModeCache = statusRunModelCache.getRunMode(); SignalCommandPO statusRunModelCache = entry.getValue();
if (!CollectionUtils.isEmpty(signalCommandPOS)) { Integer statusCache = statusRunModelCache.getStatus();
Map<String, SignalCommandPO> command4DBMap = signalCommandPOS.stream().collect(Collectors.toMap(SignalCommandPO::getCrossId, signalCommandPO -> signalCommandPO)); String runModeCache = statusRunModelCache.getRunMode();
if (command4DBMap.containsKey(crossId)) { String lampTime = statusRunModelCache.getLampTime();
SignalCommandPO updatePO = command4DBMap.get(crossId); if (!CollectionUtils.isEmpty(signalCommandPOS)) {
Integer status4DB = updatePO.getStatus(); Map<String, SignalCommandPO> command4DBMap = signalCommandPOS.stream().collect(Collectors.toMap(SignalCommandPO::getCrossId, signalCommandPO -> signalCommandPO));
String runMode4DB = updatePO.getRunMode(); if (command4DBMap.containsKey(crossId)) {
if (Objects.equals(status4DB, statusCache) && StringUtils.equals(runMode4DB, runModeCache)) { SignalCommandPO updatePO = command4DBMap.get(crossId);
// 路口编号 状态 控制模式相同,不更新 Integer status4DB = updatePO.getStatus();
continue; String runMode4DB = updatePO.getRunMode();
if (Objects.equals(status4DB, statusCache) && StringUtils.equals(runMode4DB, runModeCache)) {
// 路口编号 状态 控制模式相同,不更新
continue;
} else {
updatePO.setStatus(statusCache);
updatePO.setRunMode(runModeCache);
updatePO.setUpdateTime(new Date());
updatePO.setLampTime(lampTime);
updateList.add(updatePO);
}
} else { } else {
updatePO.setStatus(statusCache); SignalCommandPO insertPO = new SignalCommandPO();
updatePO.setRunMode(runModeCache); insertPO.setCrossId(crossId);
updatePO.setUpdateTime(new Date()); insertPO.setStatus(statusCache);
updateList.add(updatePO); insertPO.setRunMode(runModeCache);
insertPO.setCommandResult(1);
insertPO.setUpdateCount(0);
insertPO.setInsertTime(new Date());
insertPO.setLampTime(lampTime);
insertList.add(insertPO);
} }
} else { } else {
SignalCommandPO insertPO = new SignalCommandPO(); log.error("路口灯态缓存为空:{}", crossLightsStatusMap);
insertPO.setCrossId(crossId);
insertPO.setStatus(statusCache);
insertPO.setRunMode(runModeCache);
insertPO.setCommandResult(1);
insertPO.setUpdateCount(0);
insertPO.setInsertTime(new Date());
insertList.add(insertPO);
} }
} else { } catch (Exception e) {
log.error("路口灯态缓存为空:{}", crossLightsStatusMap); log.error("路口在线状态控制模式更新失败,路口编号信息:{}", entry, e);
} }
} catch (Exception e) {
log.error("路口在线状态控制模式更新失败,路口编号信息:{}", entry, e);
} }
if (!CollectionUtils.isEmpty(insertList)) {
signalCommandPOMapper.insertBatch(insertList);
List<SignalCommandLogPO> signalCommandLogPOS = new ArrayList<>(insertList.size());
BeanListUtils.populateList(insertList, signalCommandLogPOS, SignalCommandLogPO.class);
signalCommandLogPOMapper.insertBatch(signalCommandLogPOS);
}
if (!CollectionUtils.isEmpty(updateList)) {
signalCommandPOMapper.updateBatch(updateList);
List<SignalCommandLogPO> signalCommandLogPOS = new ArrayList<>(updateList.size());
BeanListUtils.populateList(updateList, signalCommandLogPOS, SignalCommandLogPO.class);
signalCommandLogPOMapper.updateBatch(signalCommandLogPOS);
log.info("路口状态控制模式变动,数据库更新成功:{}", updateList);
}
} else {
log.error("从路口灯态缓存获取控制模式在线离线状态为空:{}", crossLightsStatusMap);
} }
if (!CollectionUtils.isEmpty(insertList)) { } catch (Exception e) {
signalCommandPOMapper.insertBatch(insertList);
List<SignalCommandLogPO> signalCommandLogPOS = new ArrayList<>(insertList.size()); log.error("信号机状态同步异常:", e);
BeanListUtils.populateList(insertList, signalCommandLogPOS, SignalCommandLogPO.class); }
signalCommandLogPOMapper.insertBatch(signalCommandLogPOS); }
}
if (!CollectionUtils.isEmpty(updateList)) { private void sendAlarmKafka(List<SignalCommandPO> signalCommandPOS) throws Exception {
signalCommandPOMapper.updateBatch(updateList); List<LightStatusAlarmDTO> lightStatusAlarmDTOS = new ArrayList<>();
List<SignalCommandLogPO> signalCommandLogPOS = new ArrayList<>(updateList.size()); try {
BeanListUtils.populateList(updateList, signalCommandLogPOS, SignalCommandLogPO.class); if (!CollectionUtils.isEmpty(signalCommandPOS)) {
signalCommandLogPOMapper.updateBatch(signalCommandLogPOS); for (SignalCommandPO signalCommandPO : signalCommandPOS) {
log.info("路口状态控制模式变动,数据库更新成功:{}", updateList); LightStatusAlarmDTO dto = new LightStatusAlarmDTO();
dto.setCrossId(signalCommandPO.getCrossId());
dto.setCommandMsg(signalCommandPO.getResultMessage());
dto.setCommandResult(signalCommandPO.getCommandResult());
dto.setCount(signalCommandPO.getUpdateCount());
dto.setStatus(signalCommandPO.getStatus());
dto.setRunMode(signalCommandPO.getRunMode());
dto.setMachineTimestamp(signalCommandPO.getLampTime());
Date date = DateUtils.stringToDate(signalCommandPO.getQueryTime(), DateStyle.YYYY_MM_DD_HH_MM_SS.getValue());
dto.setRequestTime(date);
dto.setDbTime(signalCommandPO.getUpdateTime());
lightStatusAlarmDTOS.add(dto);
}
} }
} else { producerHandler.send("light_status_alarm", lightStatusAlarmDTOS);
log.error("从路口灯态缓存获取控制模式在线离线状态为空:{}", crossLightsStatusMap); } catch (Exception e) {
log.error("灯态告警推送kafka失败:", e);
throw new RuntimeException(e);
} }
} }
} }
...@@ -130,26 +130,26 @@ public class SignalStatusTask { ...@@ -130,26 +130,26 @@ public class SignalStatusTask {
// 添加在线离线逻辑 // 添加在线离线逻辑
// 0-离线 1-在线 // 0-离线 1-在线
SignalCommandLogPO signalCommandLogPO = new SignalCommandLogPO(); SignalCommandLogPO signalCommandLogPO = new SignalCommandLogPO();
int status = 1; int status = 0;
String runMode = "11"; String runMode = "11";
String machineTimestamp = "";
if (!CollectionUtils.isEmpty(lightsStatusVOS)) { if (!CollectionUtils.isEmpty(lightsStatusVOS)) {
for (LightsStatusVO lightsStatusVO : lightsStatusVOS) { for (LightsStatusVO lightsStatusVO : lightsStatusVOS) {
if (Objects.nonNull(lightsStatusVO)) { if (Objects.nonNull(lightsStatusVO)) {
String machineTimestamp = lightsStatusVO.getTimeStamp(); machineTimestamp = lightsStatusVO.getTimeStamp();
long offsetTime = start - Long.valueOf(machineTimestamp); long offsetTime = start - Long.valueOf(machineTimestamp);
if (offsetTime > 1000 * 10) { if (offsetTime < 1000 * 10) {
status = 0; status = 1;
} }
runMode = lightsStatusVO.getRunMode(); runMode = lightsStatusVO.getRunMode();
} }
} }
} else {
status = 0;
} }
signalCommandLogPO.setCrossId(crossId); signalCommandLogPO.setCrossId(crossId);
signalCommandLogPO.setRunMode(runMode); signalCommandLogPO.setRunMode(runMode);
signalCommandLogPO.setStatus(status); signalCommandLogPO.setStatus(status);
signalCommandLogPO.setUpdateTime(new Date()); signalCommandLogPO.setUpdateTime(new Date());
signalCommandLogPO.setLampTime(machineTimestamp);
crossLightsStatusMap.put(crossId, signalCommandLogPO); crossLightsStatusMap.put(crossId, signalCommandLogPO);
} }
......
...@@ -4,10 +4,11 @@ ...@@ -4,10 +4,11 @@
<!-- 批量新增数据 --> <!-- 批量新增数据 -->
<insert id="insertBatch"> <insert id="insertBatch">
insert into t_signal_command_log(cross_id, status, run_mode, command_result, insert_time) insert into t_signal_command_log(cross_id, status, run_mode, command_result, insert_time, lamp_time, update_count)
values values
<foreach collection="list" item="entity" separator=","> <foreach collection="list" item="entity" separator=",">
(#{entity.crossId}, #{entity.status}, #{entity.runMode}, #{entity.commandResult}, #{entity.insertTime}) (#{entity.crossId}, #{entity.status}, #{entity.runMode}, #{entity.commandResult}, #{entity.insertTime},
#{entity.lampTime}, #{entity.updateCount})
</foreach> </foreach>
</insert> </insert>
...@@ -20,6 +21,7 @@ ...@@ -20,6 +21,7 @@
<if test="item.runMode != null">run_mode = #{item.runMode},</if> <if test="item.runMode != null">run_mode = #{item.runMode},</if>
<if test="item.updateTime != null">update_time = #{item.updateTime},</if> <if test="item.updateTime != null">update_time = #{item.updateTime},</if>
<if test="item.status != null">status = #{item.status}</if> <if test="item.status != null">status = #{item.status}</if>
<if test="item.lampTime != null">lamp_time = #{item.lampTime}</if>
</set> </set>
WHERE cross_id = #{item.crossId} WHERE cross_id = #{item.crossId}
</foreach> </foreach>
......
...@@ -4,10 +4,11 @@ ...@@ -4,10 +4,11 @@
<!-- 批量新增数据 --> <!-- 批量新增数据 -->
<insert id="insertBatch"> <insert id="insertBatch">
insert into t_signal_command(cross_id, status, run_mode, command_result, insert_time) insert into t_signal_command(cross_id, status, run_mode, command_result, insert_time, lamp_time, update_count)
values values
<foreach collection="list" item="entity" separator=","> <foreach collection="list" item="entity" separator=",">
(#{entity.crossId}, #{entity.status}, #{entity.runMode}, #{entity.commandResult}, #{entity.insertTime}) (#{entity.crossId}, #{entity.status}, #{entity.runMode}, #{entity.commandResult}, #{entity.insertTime},
#{entity.lampTime}, #{entity.updateCount})
</foreach> </foreach>
</insert> </insert>
...@@ -20,6 +21,7 @@ ...@@ -20,6 +21,7 @@
<if test="item.runMode != null">run_mode = #{item.runMode},</if> <if test="item.runMode != null">run_mode = #{item.runMode},</if>
<if test="item.updateTime != null">update_time = #{item.updateTime},</if> <if test="item.updateTime != null">update_time = #{item.updateTime},</if>
<if test="item.status != null">status = #{item.status}</if> <if test="item.status != null">status = #{item.status}</if>
<if test="item.lampTime != null">lamp_time = #{item.lampTime}</if>
</set> </set>
WHERE cross_id = #{item.crossId} WHERE cross_id = #{item.crossId}
</foreach> </foreach>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment