Commit fecdea7c authored by duanruiming's avatar duanruiming

[update] 路口数据优化

parent cc150029
......@@ -42,8 +42,8 @@ public class PeriodDataConsumerHandler implements KafkaListenerErrorHandler {
}
@KafkaListener(topics = {"${kafka-consumer.crossPeriodicDataTopic}"}, groupId = "group")
public void receiveCrossPeriodicData(String message) throws Exception {
String originalData = String.valueOf(message);
public void receiveCrossPeriodicData(ConsumerRecord<Object, String> record, Acknowledgment acknowledgment) throws Exception {
String originalData = String.valueOf(record.value());
try {
dataProcessService.crossSave(originalData);
//acknowledgment.acknowledge();
......
......@@ -83,7 +83,8 @@ public class DataProcessServiceImpl implements DataProcessService {
for (Map.Entry<String, List<CrossPeriodLaneDTO.CrossLaneDataRealTimeDTO>> entry : crossLaneMap.entrySet()) {
String crossId = entry.getKey();
List<CrossPeriodLaneDTO.CrossLaneDataRealTimeDTO> eventListValue = entry.getValue();
Map<Integer, List<CrossPeriodLaneDTO.CrossLaneDataRealTimeDTO>> dirLaneMap = eventListValue.stream().collect(Collectors.groupingBy(CrossPeriodLaneDTO.CrossLaneDataRealTimeDTO::getDir));
List<CrossPeriodLaneDTO.CrossLaneDataRealTimeDTO> collect1 = eventListValue.stream().filter(item -> Objects.nonNull(item.getDir())).collect(Collectors.toList());
Map<Integer, List<CrossPeriodLaneDTO.CrossLaneDataRealTimeDTO>> dirLaneMap = collect1.stream().collect(Collectors.groupingBy(CrossPeriodLaneDTO.CrossLaneDataRealTimeDTO::getDir));
// 方向级别
for (Map.Entry<Integer, List<CrossPeriodLaneDTO.CrossLaneDataRealTimeDTO>> dirLaneEntry : dirLaneMap.entrySet()) {
Integer dir = dirLaneEntry.getKey();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment