Commit 38a27564 authored by duanruiming's avatar duanruiming

[add] 绿波城项目周期数据处理

parent c3f12a81
...@@ -8,6 +8,7 @@ import org.springframework.stereotype.Component; ...@@ -8,6 +8,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -23,8 +24,10 @@ import java.util.stream.Collectors; ...@@ -23,8 +24,10 @@ import java.util.stream.Collectors;
public class CrossLaneInfoCache implements InitializingBean { public class CrossLaneInfoCache implements InitializingBean {
public static final Map<String, LaneInfoPO> laneInfoMap = new HashMap<>(); public static final Map<String, LaneInfoPO> laneInfoMap = new HashMap<>();
/** key crossId11, value LaneInfoPO */ /** key crossId+dir */
public static final Map<String, LaneInfoPO> crossIdLaneId2Map = new HashMap<>(); public static final Map<String,LaneInfoPO> crossIdLaneId2Map = new HashMap<>();
/** key crossId, value dir */
public static final Map<String, List<Integer>> crossLaneDirMap = new HashMap<>();
@Resource @Resource
private LaneInfoMapper laneInfoMapper; private LaneInfoMapper laneInfoMapper;
...@@ -36,13 +39,21 @@ public class CrossLaneInfoCache implements InitializingBean { ...@@ -36,13 +39,21 @@ public class CrossLaneInfoCache implements InitializingBean {
if (!CollectionUtils.isEmpty(laneInfoPOS)) { if (!CollectionUtils.isEmpty(laneInfoPOS)) {
Map<String, LaneInfoPO> crossMap = laneInfoPOS.stream().collect(Collectors.toMap(LaneInfoPO::getId, Function.identity(), (key1, key2) -> key2)); Map<String, LaneInfoPO> crossMap = laneInfoPOS.stream().collect(Collectors.toMap(LaneInfoPO::getId, Function.identity(), (key1, key2) -> key2));
laneInfoMap.putAll(crossMap); laneInfoMap.putAll(crossMap);
Map<Integer, List<LaneInfoPO>> dirLaneList = laneInfoPOS.stream().collect(Collectors.groupingBy(LaneInfoPO::getDir));
for (Map.Entry<Integer, List<LaneInfoPO>> entry : dirLaneList.entrySet()) {
Integer dir = entry.getKey();
List<LaneInfoPO> value = entry.getValue();
for (LaneInfoPO laneInfoPO : value) {
String laneInfoPOId = laneInfoPO.getId();
String laneSortId = laneInfoPOId.substring(laneInfoPOId.length() - 2);
crossIdLaneId2Map.put(crossId.concat(String.valueOf(dir)).concat(laneSortId), laneInfoPO);
}
}
List<Integer> dirList = new ArrayList<>();
for (LaneInfoPO laneInfoPO : laneInfoPOS) { for (LaneInfoPO laneInfoPO : laneInfoPOS) {
String currentCrossId = laneInfoPO.getCrossId(); dirList.add(laneInfoPO.getDir());
String laneId = laneInfoPO.getId();
String laneId2 = laneId.substring(laneId.length() - 2);
String dir = String.valueOf(laneInfoPO.getDir());
crossIdLaneId2Map.put(currentCrossId.concat(dir).concat(laneId2), laneInfoPO);
} }
crossLaneDirMap.put(crossId, dirList);
} }
} }
} }
......
...@@ -33,33 +33,54 @@ public class ConsumerHandler implements KafkaListenerErrorHandler { ...@@ -33,33 +33,54 @@ public class ConsumerHandler implements KafkaListenerErrorHandler {
@KafkaListener(topics = {"${kafka-consumer.lanePeriodicDataTopic}"}, groupId = "group") @KafkaListener(topics = {"${kafka-consumer.lanePeriodicDataTopic}"}, groupId = "group")
public void receiveLanePeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception { public void receiveLanePeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
//String originalData = String.valueOf(record.value()); String originalData = String.valueOf(record.value());
String originalData = String.valueOf(laneStr);
try { try {
dataProcessService.laneSave(originalData); dataProcessService.laneSave(originalData);
} catch (Exception e) { } catch (Exception e) {
log.error("车道周期实时数据转换异常", e); log.error("车道周期实时数据转换异常", e);
throw new Exception(); throw new Exception();
} }
// 修改逻辑,将保存逻辑改为将方向,转向数据返回kafka acknowledgment.acknowledge();
//acknowledgment.acknowledge();
} }
@KafkaListener(topics = {"${kafka-consumer.crossPeriodicDataTopic}"}, groupId = "group") @KafkaListener(topics = {"${kafka-consumer.crossPeriodicDataTopic}"}, groupId = "group")
public void receiveCrossPeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception { public void receiveCrossPeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
//String originalData = String.valueOf(record.value()); String originalData = String.valueOf(record.value());
String originalData = String.valueOf(crossStr);
try { try {
dataProcessService.crossSave(originalData); dataProcessService.crossSave(originalData);
} catch (Exception e) { } catch (Exception e) {
log.error("车道周期实时数据转换异常", e); log.error("车道周期实时数据转换异常", e);
throw new Exception(); throw new Exception();
} }
// 修改逻辑,将保存逻辑改为将方向,转向数据返回kafka acknowledgment.acknowledge();
//acknowledgment.acknowledge(); }
@KafkaListener(topics = {"${kafka-consumer.dirPeriodicDataTopic}"}, groupId = "group")
public void receiveDirPeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
String originalData = String.valueOf(record.value());
try {
dataProcessService.dirSave(originalData);
} catch (Exception e) {
log.error("车道周期实时数据转换异常", e);
throw new Exception();
}
acknowledgment.acknowledge();
}
@KafkaListener(topics = {"${kafka-consumer.turnPeriodicDataTopic}"}, groupId = "group")
public void receiveTurnPeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
String originalData = String.valueOf(record.value());
try {
dataProcessService.turnSave(originalData);
} catch (Exception e) {
log.error("车道周期实时数据转换异常", e);
throw new Exception();
}
acknowledgment.acknowledge();
} }
@Override @Override
@NonNull @NonNull
public Object handleError(Message<?> message, ListenerExecutionFailedException e) { public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
...@@ -75,7 +96,4 @@ public class ConsumerHandler implements KafkaListenerErrorHandler { ...@@ -75,7 +96,4 @@ public class ConsumerHandler implements KafkaListenerErrorHandler {
return KafkaListenerErrorHandler.super.handleError(message, exception, consumer); return KafkaListenerErrorHandler.super.handleError(message, exception, consumer);
} }
public static final String laneStr = "{\"orgCode\":\"370102\",\"timeStamp\":\"2024-11-15 15:40:00.000\",\"eventList\":[{\"crossId\":\"13MNM0B5OE0\",\"dir\":0,\"lane_num\":1,\"entry_type\":0,\"laneId\":\"1\",\"trafficFlow\":-1,\"trafficFlowA\":-1,\"trafficFlowB\":-1,\"trafficFlowC\":-1,\"trolleyEquivalent\":-1,\"laneFlowRate\":-1.0,\"meanV\":-1.0,\"vehicleNumsRatioMean\":-1.0,\"vehicleLengthRatioMean\":-1.0,\"timeOccupancy\":-1.0,\"staticQueueLengthMax\":-1.0,\"staticQueueLengthMin\":-1.0,\"dynamicQueueLengthMax\":-1.0,\"dynamicQueueLengthMin\":-1.0,\"lightGreenStartQueueLength\":-1.0,\"lightGreenFinishQueueLength\":-1.0,\"greenLightEfficiency\":-1.0,\"laneSaturationFlowRate\":-1,\"laneCapacity\":-1,\"laneSaturation\":-1.0,\"laneNoStopRate\":-1.0,\"laneOneStopRate\":-1.0,\"laneTwoStopRate\":-1.0,\"laneThreeStopRate\":-1.0,\"meanDelay\":-1.0,\"meanStopsNumber\":-1.0,\"meanLen\":0,\"timeHeadwaySectionMean\":-1.0,\"trafficInformation\":-1,\"overflowRate\":-1.0,\"nonMotorFlow\":-1.0,\"v85\":-1.0},{\"crossId\":\"13MNM0B5OR0\",\"dir\":2,\"lane_num\":2,\"entry_type\":-1,\"laneId\":\"2\",\"trafficFlow\":-1,\"trafficFlowA\":-1,\"trafficFlowB\":-1,\"trafficFlowC\":-1,\"trolleyEquivalent\":-1,\"laneFlowRate\":-1.0,\"meanV\":-1.0,\"vehicleNumsRatioMean\":-1.0,\"vehicleLengthRatioMean\":-1.0,\"timeOccupancy\":-1.0,\"staticQueueLengthMax\":-1.0,\"staticQueueLengthMin\":-1.0,\"dynamicQueueLengthMax\":-1.0,\"dynamicQueueLengthMin\":-1.0,\"lightGreenStartQueueLength\":-1.0,\"lightGreenFinishQueueLength\":-1.0,\"greenLightEfficiency\":-1.0,\"laneSaturationFlowRate\":-1,\"laneCapacity\":-1,\"laneSaturation\":-1.0,\"laneNoStopRate\":-1.0,\"laneOneStopRate\":-1.0,\"laneTwoStopRate\":-1.0,\"laneThreeStopRate\":-1.0,\"meanDelay\":-1.0,\"meanStopsNumber\":-1.0,\"meanLen\":-1.0,\"timeHeadwaySectionMean\":-1.0,\"trafficInformation\":-1,\"overflowRate\":-1.0,\"nonMotorFlow\":-1.0,\"v85\":-1.0}]}";
public static final String crossStr = "{\"orgCode\":\"370112\",\"timeStamp\":\"2024-11-15 20:10:00.000\",\"eventList\":[{\"crossId\":\"13N7H0B63Q0\",\"status\":-1,\"type\":-1,\"trafficIndex\":-1.0,\"startTime\":\"2024-11-15 20:05:00\",\"duration\":-1,\"isUnbalance\":0,\"isSpillover\":-1,\"emptyPass\":-1,\"isCongestion\":-1,\"unbalanceIndex\":null,\"spilloverIndex\":-1.0,\"congestionIndex\":-1.0,\"unbalanceDirs\":\"-1\",\"spilloverDirs\":\"-1\",\"emptyDirTurn\":\"-1\",\"congestionDirs\":\"-1\",\"flow\":-1,\"flowRate\":-1.0,\"speed\":-1.0,\"queueLength\":-1.0,\"stopTimes\":-1.0,\"delayTime\":-1,\"sturation\":-1.0,\"serviceLevel\":\"Z\",\"trafficState\":-1,\"strategy\":-1,\"strategyDuration\":-1,\"optimizeCount\":-1,\"optimizeSeconds\":-1,\"batchTime\":1731672601},{\"crossId\":\"13N920B63K0\",\"status\":-1,\"type\":-1,\"trafficIndex\":-1.0,\"startTime\":\"2024-11-15 20:05:00\",\"duration\":-1,\"isUnbalance\":0,\"isSpillover\":-1,\"emptyPass\":-1,\"isCongestion\":-1,\"unbalanceIndex\":null,\"spilloverIndex\":-1.0,\"congestionIndex\":-1.0,\"unbalanceDirs\":\"-1\",\"spilloverDirs\":\"-1\",\"emptyDirTurn\":\"-1\",\"congestionDirs\":\"-1\",\"flow\":-1,\"flowRate\":-1.0,\"speed\":-1.0,\"queueLength\":-1.0,\"stopTimes\":-1.0,\"delayTime\":-1,\"sturation\":-1.0,\"serviceLevel\":\"Z\",\"trafficState\":-1,\"strategy\":-1,\"strategyDuration\":-1,\"optimizeCount\":-1,\"optimizeSeconds\":-1,\"batchTime\":1731672601}]}";
} }
package net.wanji.datacenter.pojo.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import net.wanji.databus.po.CrossDirDataRealtimePO;
import java.util.List;
/**
* @author duanruiming
* @date 2024/11/20 15:49
*/
@Data
public class CrossPeriodDirDTO {
@JsonProperty("eventList")
private List<CrossDirDataRealtimePO> eventList;
/**
* 车道数量
*/
private Integer laneNum;
/**
* 全域编号
*/
private String orgCode;
/**
* 数据生成时间: yyyy-MM-dd HH:mm:ss:SSS
*/
private String timeStamp;
}
...@@ -13,7 +13,7 @@ import java.util.List; ...@@ -13,7 +13,7 @@ import java.util.List;
* @date 2023/03/09 17:02 * @date 2023/03/09 17:02
*/ */
@Data @Data
public class LanePeriodicDataDTO { public class CrossPeriodLaneDTO {
/** /**
* 车道事件集合 * 车道事件集合
......
package net.wanji.datacenter.pojo.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import net.wanji.databus.po.CrossTurnDataRealtimePO;
import java.util.List;
/**
* @author duanruiming
* @date 2024/11/20 15:52
*/
@Data
public class CrossPeriodTurnDTO {
@JsonProperty("eventList")
private List<CrossTurnDataRealtimePO> eventList;
/**
* 车道数量
*/
private Integer laneNum;
/**
* 全域编号
*/
private String orgCode;
/**
* 数据生成时间: yyyy-MM-dd HH:mm:ss:SSS
*/
private String timeStamp;
}
...@@ -20,4 +20,22 @@ public interface DataProcessService { ...@@ -20,4 +20,22 @@ public interface DataProcessService {
*/ */
void crossSave(String originalData) throws Exception; void crossSave(String originalData) throws Exception;
/**
* 方向信息处理存库
* @param originalData
* @throws Exception
*/
void dirSave(String originalData) throws Exception;
/**
* 转向信息处理存库
* @param originalData
* @throws Exception
*/
void turnSave(String originalData) throws Exception;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment