Commit 05d4c549 authored by duanruiming's avatar duanruiming

[add] 实时数据处理

parent 2a58f6fb
......@@ -10,9 +10,11 @@ import net.wanji.datacenter.pojo.vo.GreenBeltOptInfoVO;
import net.wanji.datacenter.service.GreenBeltInfoService;
import net.wanji.datacenter.util.RedisUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
......@@ -32,8 +34,9 @@ public class GreenBeltDataConsumerHandler implements KafkaListenerErrorHandler {
private RedisUtils redisUtils;
@KafkaListener(topics = {"green_belt_info_data"}, groupId = "group2", containerFactory = "kafkaListenerContainerFactory2")
public void receiveGreenBeltInfoData(String message) throws Exception {
@KafkaListener(topics = {"${kafka-consumer.greenBeltInfoDataTopic}"}, groupId = "group1")
public void receiveGreenBeltInfoData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
String message = String.valueOf(record.value());
if (StringUtils.isNotBlank(message)) {
GreenBeltOptInfoVO greenBeltOptInfoVO = greenBeltInfoService.convertData(message);
greenBeltInfoService.save(greenBeltOptInfoVO);
......@@ -44,6 +47,7 @@ public class GreenBeltDataConsumerHandler implements KafkaListenerErrorHandler {
String chartKey = Constants.GREEN_ID_OPT_CHART_KEY.concat(greenBeltOptInfoVO.getGreenId());
redisUtils.set(chartKey, JacksonUtils.getInstance().writeValueAsString(greenBeltChartVO));
}
acknowledgment.acknowledge();
}
......
......@@ -31,37 +31,37 @@ public class PeriodDataConsumerHandler implements KafkaListenerErrorHandler {
public static final Map<String, List<CrossSnapshotDataDTO>> crossSnapshotDataMap = new ConcurrentHashMap<>();
@KafkaListener(topics = {"${kafka-consumer.lanePeriodicDataTopic}"}, groupId = "group")
@KafkaListener(topics = {"${kafka-consumer.lanePeriodicDataTopic}"}, groupId = "group1")
public void receiveLanePeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
String originalData = String.valueOf(record.value());
try {
dataProcessService.laneSave(originalData);
} catch (Exception e) {
log.error("车道周期实时数据转换异常", e);
log.error("车道Lane周期实时数据转换异常", e);
throw new Exception();
}
acknowledgment.acknowledge();
}
@KafkaListener(topics = {"${kafka-consumer.crossPeriodicDataTopic}"}, groupId = "group")
@KafkaListener(topics = {"${kafka-consumer.crossPeriodicDataTopic}"}, groupId = "group1")
public void receiveCrossPeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
String originalData = String.valueOf(record.value());
try {
dataProcessService.crossSave(originalData);
} catch (Exception e) {
log.error("车道周期实时数据转换异常", e);
log.error("路口Cross实时数据转换异常", e);
throw new Exception();
}
acknowledgment.acknowledge();
}
@KafkaListener(topics = {"${kafka-consumer.dirPeriodicDataTopic}"}, groupId = "group")
@KafkaListener(topics = {"${kafka-consumer.dirPeriodicDataTopic}"}, groupId = "group1")
public void receiveDirPeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
String originalData = String.valueOf(record.value());
try {
dataProcessService.dirSave(originalData);
} catch (Exception e) {
log.error("车道周期实时数据转换异常", e);
log.error("方向Dir周期实时数据转换异常", e);
throw new Exception();
}
acknowledgment.acknowledge();
......@@ -73,7 +73,7 @@ public class PeriodDataConsumerHandler implements KafkaListenerErrorHandler {
try {
dataProcessService.turnSave(originalData);
} catch (Exception e) {
log.error("车道周期实时数据转换异常", e);
log.error("转向Turn周期实时数据转换异常", e);
throw new Exception();
}
acknowledgment.acknowledge();
......
package net.wanji.datacenter.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import net.wanji.datacenter.pojo.entity.CrossStrategyResultEntity;
/**
* @author duanruiming
* @date 2024/11/23 16:25
*/
public interface CrossStrategyResultMapper extends BaseMapper<CrossStrategyResultEntity> {
}
package net.wanji.datacenter.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import net.wanji.datacenter.pojo.StrategyGreenOptHistEntity;
import net.wanji.datacenter.pojo.entity.StrategyGreenOptHistEntity;
/**
* @author duanruiming
......
package net.wanji.datacenter.pojo.dto;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* @author duanruiming
* @date 2024/11/23 14:49
* @description 神思失衡算法结果
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@SuppressWarnings("all")
public class ImbalanceAlgorithmDTO {
// 万集路口编号
private String platformId;
private String crossId;
private String crossName;
// 当前策略名称 0:无策略 1:绿灯空放 2:失衡 3:溢出
private int currentAlgo;
// 策略请求时间(格式:yyyy-MM-dd HH:mm:ss)
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date requestTime;
// 策略下发时间(格式:yyyy-MM-dd HH:mm:ss)
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date issueTime;
// 发是否成功(200成功,其他失败)
private int responseCode;
// 配时信息
private List<Integer> timingPlan;
// 入库时间
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date insertTime;
private int rtnType;
// 延长时间(当操作为2时有值)
private int extendTime;
// 操作名称(0,不操作;1,切换相位;2延长相位)
private int blockRegion;
// 溢出类型(0:未溢出 1:即将溢出 2:路口溢出 3:溢停)
private int blockType;
// 当前相位剩余时长
private int countDown;
// 实时数据时间戳
private String videoStamp;
// 灯态数据时间戳
private String signalMachineStamp;
// 当前相位控制车流方向
private String controlDir;
private Integer code;
private String msg;
// 方案具体信息
private SchemeData data;
@Data
public static class SchemeData {
@JsonProperty("cycle")
private int cycle;
@JsonProperty("green_time")
private List<Integer> greenTime;
@JsonProperty("cross_id")
private String crossId;
@JsonProperty("control_id")
private Integer controlId;
@JsonProperty("provider")
private String provider;
@JsonProperty("plan_no")
private String planNo;
@JsonProperty("offset")
private Integer offset;
@JsonProperty("ring_cnt")
private Integer ringCnt;
@JsonProperty("ring_set")
private Map<String, Map<String, List<Integer>>> ringSet;
@JsonProperty("seq_id")
private List<Integer> seqId;
@JsonProperty("red_time")
private List<Integer> redTime;
@JsonProperty("yellow_time")
private List<Integer> yellowTime;
@JsonProperty("green_flash")
private List<Integer> green_flash;
@JsonProperty("person_flash")
private List<Integer> person_flash;
@JsonProperty("min_green")
private List<Integer> min_green;
@JsonProperty("max_green")
private List<Integer> max_green;
@JsonProperty("direction")
private List<Integer[][]> direction;
}
}
package net.wanji.datacenter.pojo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.util.Date;
/**
* @author duanruiming
* @date 2024/11/23 16:17
*/
@Data
@TableName("t_strategy_cross_result")
public class CrossStrategyResultEntity {
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
@TableField("cross_id")
private String crossId;
@TableField("cross_name")
private String crossName;
@TableField("current_algo")
private Integer currentAlgo;
@TableField("request_time")
private Date requestTime;
@TableField("issue_time")
private Date issueTime;
@TableField("response_code")
private Integer responseCode;
@TableField("timing_plan")
private String timingPlan;
@TableField("insert_time")
private Date insertTime;
@TableField("rtn_type")
private Integer rtnType;
@TableField("extend_time")
private Integer extendTime;
@TableField("block_region")
private Integer blockRegion;
@TableField("block_type")
private Integer blockType;
@TableField("count_down")
private Integer countDown;
@TableField("video_stamp")
private String videoStamp;
@TableField("signal_machine_stamp")
private String signalMachineStamp;
@TableField("control_dir")
private String controlDir;
@TableField("data")
private String data;
}
package net.wanji.datacenter.pojo;
package net.wanji.datacenter.pojo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
......
package net.wanji.datacenter.service;
import net.wanji.datacenter.pojo.dto.ImbalanceAlgorithmDTO;
/**
* @author duanruiming
* @date 2024/11/23 16:12
*/
public interface CrossStrategyOptService {
/**
* 溢出指标存储
*/
void save(ImbalanceAlgorithmDTO imbalanceAlgorithmDTO);
}
package net.wanji.datacenter.service.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.datacenter.mapper.CrossStrategyResultMapper;
import net.wanji.datacenter.pojo.dto.ImbalanceAlgorithmDTO;
import net.wanji.datacenter.pojo.entity.CrossStrategyResultEntity;
import net.wanji.datacenter.service.CrossStrategyOptService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* @author duanruiming
* @date 2024/11/23 16:14
*/
@Service
public class CrossStrategyOptServiceImpl implements CrossStrategyOptService {
@Resource
private CrossStrategyResultMapper crossStrategyResultMapper;
@Override
public void save(ImbalanceAlgorithmDTO dto) {
try {
ObjectMapper mapper = JacksonUtils.getInstance();
CrossStrategyResultEntity crossStrategyResultEntity = new CrossStrategyResultEntity();
crossStrategyResultEntity.setCrossId(dto.getPlatformId());
crossStrategyResultEntity.setCrossName(dto.getCrossName());
crossStrategyResultEntity.setCurrentAlgo(dto.getCurrentAlgo());
crossStrategyResultEntity.setRequestTime(dto.getRequestTime());
crossStrategyResultEntity.setIssueTime(dto.getIssueTime());
crossStrategyResultEntity.setResponseCode(dto.getResponseCode());
crossStrategyResultEntity.setTimingPlan(mapper.writeValueAsString(dto.getTimingPlan()));
crossStrategyResultEntity.setInsertTime(dto.getInsertTime());
crossStrategyResultEntity.setRtnType(dto.getRtnType());
crossStrategyResultEntity.setExtendTime(dto.getExtendTime());
crossStrategyResultEntity.setBlockRegion(dto.getBlockRegion());
crossStrategyResultEntity.setBlockType(dto.getBlockType());
crossStrategyResultEntity.setCountDown(dto.getCountDown());
crossStrategyResultEntity.setVideoStamp(dto.getVideoStamp());
crossStrategyResultEntity.setSignalMachineStamp(dto.getSignalMachineStamp());
crossStrategyResultEntity.setControlDir(dto.getControlDir());
crossStrategyResultEntity.setData(mapper.writeValueAsString(dto.getData()));
crossStrategyResultMapper.insert(crossStrategyResultEntity);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
......@@ -123,6 +123,7 @@ public class DataProcessServiceImpl implements DataProcessService {
String timeStamp = crossPeriodDataDTO.getTimeStamp();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
List<CrossDataRealtimePO> eventList = crossPeriodDataDTO.getEventList();
CrossDataRealtimePO crossDataRealtimePO = new CrossDataRealtimePO();
if (!CollectionUtils.isEmpty(eventList)) {
List<CrossDataRealtimePO> crossDataRealtimePOS = eventList;
crossDataRealtimeMapper.deleteBatch(crossDataRealtimePOS.stream().map(CrossDataRealtimePO::getCrossId).collect(Collectors.toList()));
......
......@@ -5,8 +5,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.datacenter.common.Constants;
import net.wanji.datacenter.mapper.StrategyGreenOptHistMapper;
import net.wanji.datacenter.pojo.StrategyGreenOptHistEntity;
import net.wanji.datacenter.pojo.dto.GreenBeltKafkaDTO;
import net.wanji.datacenter.pojo.entity.StrategyGreenOptHistEntity;
import net.wanji.datacenter.pojo.vo.GreenBeltChartVO;
import net.wanji.datacenter.pojo.vo.GreenBeltOptInfoVO;
import net.wanji.datacenter.service.GreenBeltInfoService;
......
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="net.wanji.datacenter.mapper.CrossStrategyResultMapper">
</mapper>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment