Commit 0fbdb081 authored by duanruiming's avatar duanruiming

[update] 事件数据优化

parent be064f1b
package net.wanji.datacenter.common;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author duanruiming
* @date 2024/11/26 17:59
*/
@Getter
@AllArgsConstructor
public enum PhaseEmptyEnum {
EXT_NS(-100, "1,5", "2"),
EXT_EW(-101, "3,7", "2"),
EXT_NS_L(-102, "1,5", "1"),
EXT_EW_L(-103, "3,7", "1"),
EXT_N2S(-110, "5", "2"),
EXT_E2W(-112, "7", "2"),
EXT_S2N(-114, "1", "2"),
EXT_W2E(-116, "3", "2"),
EXT_N2S_L(-120, "5", "1"),
EXT_E2W_L(-122, "7", "1"),
EXT_S2N_L(-124, "1", "1"),
EXT_W2E_L(-126, "3", "1"),
//EXT_CUR(-150, "3", "1"),
AAAA(0,"", "");
private final Integer code;
/** 车流向 */
private final String dir;
private final String turn;
// EXT_CUR = -150 # 延长当前方向的绿灯
// SKIP_NS = -200 # 南北方向直行绿灯步进到下一个相位
// SKIP_EW = -201 # 东西方向直行绿灯步进到下一个相位
// SKIP_NS_L = -202 # 南北方向左转绿灯步进到下一个相位
// SKIP_EW_L = -203 # 东西方向左转绿灯步进到下一个相位
// SKIP_N2S = -210 # 北向南直行绿灯步进到下一个相位
// SKIP_E2W = -212 # 东向西直行绿灯步进到下一个相位
// SKIP_S2N = -214 # 南向北直行绿灯步进到下一个相位
// SKIP_W2E = -216 # 西向东直行绿灯步进到下一个相位
// SKIP_N2S_L = -220 # 北向南左转绿灯步进到下一个相位
// SKIP_E2W_L = -222 # 东向西左转绿灯步进到下一个相位
// SKIP_S2N_L = -224 # 南向北左转绿灯步进到下一个相位
// SKIP_W2E_L = -226 # 西向东左转绿灯步进到下一个相位
// SKIP_CUR = -250 # 当前绿灯步进到下一个相位
// NORMAL = 0 # 程序正常运行,不对信号机做调整
// VID_DELAY = 1 # 视频延迟
// VID_INVALID = 2 # 视频解析的结果中部分值缺失(比如没有行人的检测结果)
//NO_LIGHT_STATUS = 10 # 获取灯态信息失败
// NORMAL_NO_GREEN = 11 # 当前时刻,路口没有绿灯可通行的方向,所以不对信号机做调整
// PHASE_ERR = 12 # 获取或解析配时方案失败
// VID_LIGHT_NOT_MATCH = 20 # 视频检测的红绿灯灯态与从交通大脑获取的信号机灯态不一致
// CODE_ERR = 30 # 代码出现异常
// ABNORMAL_UNLOCK_SUCC = 40 # 检测到灯态异常后,自动解锁成功
// ABNORMAL_UNLOCK_FAIL = 41 # 检测到灯态异常后,自动解锁失败
// TAKE_LONG_BEFORE_EXT = 60 # 在即将给信号机下发延长指令前检测到代码运行时间太长
// TAKE_LONG_BEFORE_SKIP = 61 # 在即将给信号机下发步进指令前检测到代码运行时间太长
// MIN_GREEN = 70 # 最小绿
// ABNORMAL_COUNTDOWN = 9999 # 从灯态中获取到的绿灯倒计时异常(一般是由于信号机现在受临时方案控制,比如)
}
......@@ -30,6 +30,26 @@ public class CrossStrategyOptConsumerHandler implements KafkaListenerErrorHandle
@Resource
private CrossStrategyOptService crossStrategyOptService;
/**
* 相位空放优化指标周期
* @param record
* @param acknowledgment
* @throws Exception
*/
//@KafkaListener(topics = {"${kafka-consumer.phaseEmptyTopic}"}, groupId = "group")
public void receivePhaseEmptyData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
try {
String message = String.valueOf(record.value());
if (StringUtils.isNotBlank(message)) {
crossStrategyOptService.savePhaseEmpty(message);
}
} catch (Exception e) {
log.error("相位空放phaseEmpty数据处理异常;", e);
throw new Exception(e);
}
acknowledgment.acknowledge();
}
/**
* 失衡算法结果
*
......
package net.wanji.datacenter.kafka;
import lombok.extern.slf4j.Slf4j;
import net.wanji.datacenter.service.EventDataService;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author duanruiming
* @date 2024/11/26 15:33
*/
@Slf4j
@Component
public class EventDataConsumerHandler implements KafkaListenerErrorHandler {
@Resource
private EventDataService eventDataService;
@KafkaListener(topics = {"${kafka-consumer.eventDataTopic}"}, groupId = "group")
public void receiveEventData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
try {
String message = String.valueOf(record.value());
if (StringUtils.isNotBlank(message)) {
eventDataService.save(message);
}
} catch (Exception e) {
log.error("事件EventData数据处理异常;", e);
throw new Exception(e);
}
acknowledgment.acknowledge();
}
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
return new Object();
}
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?>
consumer) {
String errorMessage = String.format("监听主题:%s,消费者详情:%s,异常信息:%s,消息详情:%s",
consumer.listTopics(), consumer.groupMetadata(), exception, message);
log.error(errorMessage);
return KafkaListenerErrorHandler.super.handleError(message, exception, consumer);
}
}
......@@ -28,7 +28,7 @@ public class GreenBeltDataConsumerHandler implements KafkaListenerErrorHandler {
private GreenBeltInfoService greenBeltInfoService;
@KafkaListener(topics = {"${kafka-consumer.greenBeltInfoDataTopic}"}, groupId = "group-blet")
@KafkaListener(topics = {"${kafka-consumer.greenBeltInfoDataTopic}"}, groupId = "group-blet-new ")
public void receiveGreenBeltInfoData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
try {
String message = String.valueOf(record.value());
......
package net.wanji.datacenter.mapper;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import net.wanji.datacenter.pojo.po.EventInfoPO;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;
import java.util.List;
......@@ -11,7 +13,7 @@ import java.util.List;
* @date 2024/3/4 10:04
*/
@Repository
@DS("holo")
public interface EventInfoMapper {
List<EventInfoPO> selectHistEvent();
void insertOne(EventInfoPO eventInfoPO);
}
package net.wanji.datacenter.pojo.dto;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import java.util.Date;
/**
* @author duanruiming
* @date 2024/11/26 15:41
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class EventDataDTO {
private String eventId;
private String crossId;
private String eventType;
private String direction;
private Double eventLongitude;
private Double eventLatitude;
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date eventStartTime;
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date eventEndTime;
private String eventLevel;
}
package net.wanji.datacenter.pojo.dto;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
/**
* @author duanruiming
* @date 2024/11/26 16:58
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class PhaseEmptyDataDTO {
private String msg;
private Idx idx;
@Data
public static class Idx {
private String crossId;
/** 操作时间 */
private Integer curStamp;
private Integer rtnType;
private String v1;
private String v2;
private String v3;
private String v4;
private String v5;
private Integer videoStamp;
/** 信号机时间 */
private Integer signalMachineStamp;
private Integer createStamp;
private String green_flow;
private Integer countDown;
private String green_info;
private Integer planId;
private Integer controlMode;
private Integer crossRealStatus;
private String ctrId;
}
}
......@@ -89,4 +89,6 @@ public class EventInfoPO {
/** */
@ApiModelProperty(name = "",notes = "")
private String eventId ;
@ApiModelProperty(name = "分区字段",notes = "")
private Integer dt ;
}
......@@ -12,4 +12,6 @@ public interface CrossStrategyOptService {
* 溢出指标存储
*/
void save(ImbalanceAlgorithmDTO imbalanceAlgorithmDTO);
void savePhaseEmpty(String message) throws Exception;
}
package net.wanji.datacenter.service;
/**
* @author duanruiming
* @date 2024/11/26 15:43
*/
public interface EventDataService {
void save(String originData);
}
package net.wanji.datacenter.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.datacenter.constant.EventAbnormalEnum;
import net.wanji.datacenter.mapper.EventInfoMapper;
import net.wanji.datacenter.pojo.dto.EventDataDTO;
import net.wanji.datacenter.pojo.po.EventInfoPO;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @author duanruiming
* @date 2024/11/26 15:43
*/
@Service
@Slf4j
public class EventDataServiceImpl implements EventDataService {
@Resource
private EventInfoMapper eventInfoMapper;
@Transactional(rollbackFor = Exception.class)
@Override
public void save(String originData) {
try {
ObjectMapper mapper = JacksonUtils.getInstance();
EventDataDTO eventDataDTO = mapper.readValue(originData, EventDataDTO.class);
EventInfoPO eventInfoPO = new EventInfoPO();
String eventTypeName = eventDataDTO.getEventType();
if (EventAbnormalEnum.PHASE_EMPTY.getDesc().contains(eventTypeName)) {
eventInfoPO.setType(EventAbnormalEnum.PHASE_EMPTY.getType());
}
if (EventAbnormalEnum.CROSS_UNBALANCE.getDesc().contains(eventTypeName)) {
eventInfoPO.setType(EventAbnormalEnum.CROSS_UNBALANCE.getType());
}
if (EventAbnormalEnum.CROSS_OVERFLOW.getDesc().contains(eventTypeName)) {
eventInfoPO.setType(EventAbnormalEnum.CROSS_OVERFLOW.getType());
}
Date eventStartTime = eventDataDTO.getEventStartTime();
eventInfoPO.setStartTime(eventStartTime);
eventInfoPO.setEndTime(eventDataDTO.getEventEndTime());
eventInfoPO.setEventSerialNumber(eventDataDTO.getEventId());
eventInfoPO.setCrossId(eventDataDTO.getCrossId());
SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
String dateStr = format.format(eventStartTime);
eventInfoPO.setDt(Integer.valueOf(dateStr));
eventInfoPO.setLat(eventInfoPO.getLat());
eventInfoPO.setLng(eventInfoPO.getLng());
eventInfoPO.setCategory(eventDataDTO.getEventLevel());
eventInfoPO.setPlaceDesc(eventDataDTO.getEventType() + "方向" + eventDataDTO.getDirection() + 1);
eventInfoMapper.insertOne(eventInfoPO);
} catch (Exception e) {
log.error("事件保存异常:", e);
throw new RuntimeException(e);
}
}
}
package net.wanji.datacenter.service.impl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.datacenter.mapper.CrossStrategyResultMapper;
import net.wanji.datacenter.pojo.dto.ImbalanceAlgorithmDTO;
import net.wanji.datacenter.pojo.dto.PhaseEmptyDataDTO;
import net.wanji.datacenter.pojo.entity.CrossStrategyResultEntity;
import net.wanji.datacenter.service.CrossStrategyOptService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Date;
import java.util.Objects;
/**
* @author duanruiming
......@@ -21,6 +25,39 @@ public class CrossStrategyOptServiceImpl implements CrossStrategyOptService {
@Resource
private CrossStrategyResultMapper crossStrategyResultMapper;
@Transactional(rollbackFor = Exception.class)
@Override
public void savePhaseEmpty(String message) throws Exception {
try {
ObjectMapper mapper = JacksonUtils.getInstance();
PhaseEmptyDataDTO phaseEmptyDataDTO = mapper.readValue(message, PhaseEmptyDataDTO.class);
if (Objects.nonNull(phaseEmptyDataDTO)) {
PhaseEmptyDataDTO.Idx idx = phaseEmptyDataDTO.getIdx();
String crossId = idx.getCtrId();
Integer curStamp = idx.getCurStamp();
Integer countDown = idx.getCountDown();
Integer rtnType = idx.getRtnType();
Integer signalMachineStamp = idx.getSignalMachineStamp();
Integer createStamp = idx.getCreateStamp();
Date date = new Date(curStamp);
CrossStrategyResultEntity entity = new CrossStrategyResultEntity();
entity.setCrossId(crossId);
entity.setSignalMachineStamp(String.valueOf(signalMachineStamp));
// 绿灯空放
entity.setCurrentAlgo(1);
entity.setRequestTime(date);
entity.setIssueTime(date);
entity.setInsertTime(new Date(createStamp));
entity.setCountDown(countDown);
entity.setExtendTime(countDown);
crossStrategyResultMapper.insert(entity);
}
} catch (JsonProcessingException e) {
throw new Exception(e);
}
}
@Transactional(rollbackFor = Exception.class)
@Override
public void save(ImbalanceAlgorithmDTO dto) {
......
......@@ -2,10 +2,46 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="net.wanji.datacenter.mapper.EventInfoMapper">
<!-- 通用查询映射结果 -->
<resultMap id="EventInfoMap" type="net.wanji.datacenter.pojo.po.EventInfoPO">
<id column="oid" property="oid"/>
<result column="plate_no" property="plateNo"/>
<result column="object_type" property="objectType"/>
<result column="confidence" property="confidence"/>
<result column="detect_time" property="detectTime"/>
<result column="grade" property="grade"/>
<result column="place_desc" property="placeDesc"/>
<result column="lng" property="lng"/>
<result column="lat" property="lat"/>
<result column="category" property="category"/>
<result column="type" property="type"/>
<result column="start_time" property="startTime"/>
<result column="end_time" property="endTime"/>
<result column="duration" property="duration"/>
<result column="source" property="source"/>
<result column="ruksj" property="ruksj"/>
<result column="lane_id" property="laneId"/>
<result column="rid" property="rid"/>
<result column="in_dir" property="inDir"/>
<result column="segment_id" property="segmentId"/>
<result column="cross_id" property="crossId"/>
<result column="camera_channels" property="cameraChannels"/>
<result column="event_serial_number" property="eventSerialNumber"/>
<result column="global_id" property="globalId"/>
<result column="event_id" property="eventId"/>
<result column="event_label" property="eventLabel"/>
<result column="category_label" property="categoryLabel"/>
<result column="cross_name" property="crossName"/>
<result column="rid_name" property="ridName"/>
<result column="extend" property="extend"/>
<result column="video_urls" property="videoUrls"/>
<result column="dt" property="dt"/>
</resultMap>
<sql id="Base_Column_List">
oid,plate_no,object_type,confidence,detect_time,grade,place_desc,lng,lat,category,type,start_time,end_time,
duration,source,ruksj,lane_id,rid,segment_id,cross_id,camera_oid,event_serial_number,data_status,global_id,
station_id,event_id
station_id,event_id,dt
</sql>
<select id="selectHistEvent" resultType="net.wanji.datacenter.pojo.po.EventInfoPO">
......@@ -15,4 +51,57 @@
</select>
<!-- 新增表t_event_info信息 -->
<insert id="insertOne">
INSERT INTO t_event_info ( oid
, plate_no
, object_type
, confidence
, detect_time
, grade
, place_desc
, lng
, lat
, category
, type
, start_time
, end_time
, duration
, source
, ruksj
, lane_id
, rid
, segment_id
, cross_id
, dt
, event_serial_number)
VALUES ( #{oid}
, #{plateNo}
, #{objectType}
, #{confidence}
, #{detectTime}
, #{grade}
, #{placeDesc}
, #{lng}
, #{lat}
, #{category}
, #{type}
, #{startTime}
, #{endTime}
, #{duration}
, #{source}
, #{ruksj}
, #{laneId}
, #{rid}
, #{segmentId}
, #{crossId}
, #{dt}
, #{eventSerialNumber}
)
ON DUPLICATE KEY UPDATE
end_time = values(end_time)
</insert>
</mapper>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment