Commit 9a03ecfb authored by zhoushiguang's avatar zhoushiguang

从kafka获取干线路段行程时间

parent 698a167f
package net.wanji.opt.common;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import net.wanji.opt.dto.LineCongestion;
import net.wanji.opt.dto.PhaseEmptyResult;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
......@@ -19,6 +22,7 @@ import java.util.*;
* @author Kent HAN
* @date 2024/4/16 8:45
*/
@Slf4j
public class KafkaConsumerUtil {
private final KafkaConsumer<String, String> consumer;
private final ObjectMapper objectMapper;
......@@ -87,4 +91,57 @@ public class KafkaConsumerUtil {
}
return results;
}
public List<LineCongestion> consumeLineMetricForTimeRange(
String topic, int partition, long startTime, long endTime) {
List<LineCongestion> results = new ArrayList<>();
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.assign(Collections.singletonList(topicPartition));
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
timestampToSearch.put(topicPartition, startTime);
OffsetAndTimestamp offsetAndTimestamp = consumer.offsetsForTimes(timestampToSearch).get(topicPartition);
if (offsetAndTimestamp == null) {
return results;
}
long startOffset = offsetAndTimestamp.offset();
// 开始消费
consumer.seek(topicPartition, startOffset);
int emptyPollCount = 0; // 记录空轮询次数的计数器
int maxEmptyPolls = 10; // 设置最大空轮询次数
try {
boolean keepConsuming = true;
while (keepConsuming) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) {
emptyPollCount++; // 如果没有记录,增加空轮询计数
if (emptyPollCount >= maxEmptyPolls) {
// 如果达到最大空轮询次数,退出循环
break;
}
} else {
emptyPollCount = 0; // 如果有记录,重置空轮询计数器
for (ConsumerRecord<String, String> record : records) {
long recordTime = record.timestamp();
if (recordTime >= startTime && recordTime <= endTime) {
LineCongestion lineCongestion = JSONObject.parseObject(record.value(), LineCongestion.class);
results.add(lineCongestion);
} else if (recordTime > endTime) {
keepConsuming = false;
break;
}
}
}
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
consumer.close();
}
log.info("从kafka中获取干线指标数据 topic:{},startTime:{},endTime:{}");
return results;
}
}
package net.wanji.opt.dto;
import lombok.Data;
import java.io.Serializable;
import java.util.Map;
/**
* 干线拥堵事件
*/
@Data
public class LineCongestion implements Serializable {
private Integer greenWaveId;
private Integer greenWaveDir;
private Double index;
// 501畅通 502轻微拥堵 503中度拥堵 504重度拥堵
private String congestionCode;
private Long startTime;
private Long endTime;
private Integer duration;
// 协调方向行程时间(总)
private Double transitTime;
// 协调方向自由流时间(总)
private Double freeTime;
// 检测时间
private Long detectTime;
//拥堵状态
private String congestionType;
private Double speed;
//干线路段旅行时间
private Map<String,Double> ridTravelTime;
}
......@@ -23,6 +23,7 @@ import net.wanji.databus.vo.GreenwaveListVO;
import net.wanji.opt.bo.*;
import net.wanji.opt.cache.BaseCrossInfoCache;
import net.wanji.opt.common.ExcelExportUtils;
import net.wanji.opt.common.KafkaConsumerUtil;
import net.wanji.opt.common.RedisUtils;
import net.wanji.opt.config.DirectionMappingsConfig;
import net.wanji.opt.dao.mapper.CrossSchemeOptLogMapper;
......@@ -35,7 +36,9 @@ import net.wanji.opt.dao.mapper.trend.AnalysisRidTurnIndicatorsMapper;
import net.wanji.opt.dao.mapper.trend.EventAlarmMapper;
import net.wanji.opt.dto.CrossEventDTO;
import net.wanji.opt.dto.CrossLaneSnapshotDataDTO;
import net.wanji.opt.dto.LineCongestion;
import net.wanji.opt.dto.LineSchemeDTO;
import net.wanji.opt.dto.PhaseEmptyResult;
import net.wanji.opt.dto.trend.AbnormalCrossListDTO;
import net.wanji.opt.dto.trend.GreenwaveListDTO;
import net.wanji.opt.po.base.CrossSchemeOptLogPO;
......@@ -50,6 +53,7 @@ import net.wanji.opt.vo.*;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
......@@ -113,6 +117,8 @@ public class TrendServiceImpl implements TrendService {
private final AnalysisRidTurnIndicatorsMapper ridTurnIndicatorsMapper;
private final HoloEventMapper holoEventMapper;
private final BaseCrossInfoCache crossInfoCache;
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
// 用于计算路口状态,key为方向,value为状态
private Map<Integer, Integer> preStatus = new HashMap<Integer, Integer>() {{
......@@ -965,7 +971,7 @@ public class TrendServiceImpl implements TrendService {
}
private Double calcSpeed(String greenwaveName, String crossId,
String startTime, String endTime, String startDate, String endDate) throws Exception{
String startTime, String endTime, String startDate, String endDate) throws Exception {
// 确定协调方向
String[] split = greenwaveName.split(" ");
String dirStr = split[1];
......@@ -1072,40 +1078,53 @@ public class TrendServiceImpl implements TrendService {
greenwaveCrossMetricsVO.setNoparkPassRate(calcNoparkPassRate(crossDirDataHistPOList, coordDirList));
greenwaveCrossMetricsVO.setUncoordinatePhaseQueue(calcUncoordinatePhaseQueue(crossDirDataHistPOList, coordDirList));
greenwaveCrossMetricsVO.setTrvalTime(calcTravelTime(greenwaveCrossPO,crossDirDataHistPOList,crossId, coordDirList));
greenwaveCrossMetricsVO.setTrvalTime(calcTravelTime(greenwaveCrossPO, crossId, coordDirList, timeStamp - 5 * 60 * 1000, timeStamp));
res.add(greenwaveCrossMetricsVO);
}
return res;
}
private Integer calcTravelTime(GreenwaveCrossPO greenwaveCrossPO, List<CrossDirDataHistPO> crossDirDataHistPOList, String crossId, List<Integer> coordDirList) throws Exception {
double average = crossDirDataHistPOList.stream()
.filter(po -> po.getInOutType() == 1 && coordDirList.contains(po.getDirType()))
.mapToDouble(o->greenwaveCrossPO.getNextCrossLen()/o.getSpeed()/3.6)
.average()
.orElse(0.0);
BigDecimal bigDecimal = new BigDecimal(average);
bigDecimal = bigDecimal.setScale(0,RoundingMode.HALF_UP);
return bigDecimal.intValue();
}
// private Integer calcTravelTime(String crossId, Integer inDir) throws Exception {
// try {
// // 从 Redis 中获取路段行程时间
// String redisKey = crossId + ":" + inDir;
// Object element = redisUtils.getMaxScoreElement(redisKey);
// ObjectMapper objectMapper = new ObjectMapper();
// if (element != null) {
// CrossEventDTO dto = objectMapper.readValue(element.toString(), CrossEventDTO.class);
// return dto.getTransitTime().intValue();
// }
// } catch (Exception e) {
// log.error("获取绿波旅行时间异常:", e);
// throw new RuntimeException(e);
// }
// return 0;
// }
private Integer calcTravelTime(GreenwaveCrossPO greenwaveCrossPO, String crossId, List<Integer> coordDirList, long startTime, long endTime) throws Exception {
// 从 Kafka 查询绿灯空放时长
KafkaConsumerUtil kafkaConsumerUtil = new KafkaConsumerUtil(bootstrapServers, "line-travel-time");
List<LineCongestion> results =
kafkaConsumerUtil.consumeLineMetricForTimeRange(
"line_congestion_sink", 0, startTime, endTime);
results = results.stream().filter(o -> o.getGreenWaveId().equals(greenwaveCrossPO.getGreenId())).collect(Collectors.toList());
List<Double> travelTimeList = new ArrayList<>();
for (LineCongestion dto : results) {
Map<String, Double> travelMap = dto.getRidTravelTime();
travelMap.forEach((k, v) -> {
for (Integer inDir : coordDirList) {
String keyPre = crossId + "_" + inDir;
if (k.contains(keyPre)) {
travelTimeList.add(v);
}
}
});
}
return (int) (travelTimeList.stream().mapToDouble(Double::doubleValue).average().orElse(0));
}
private Integer calcTravelTime(String crossId, Integer inDir) throws Exception {
try {
// 从 Redis 中获取路段行程时间
String redisKey = crossId + ":" + inDir;
Object element = redisUtils.getMaxScoreElement(redisKey);
ObjectMapper objectMapper = new ObjectMapper();
if (element != null) {
CrossEventDTO dto = objectMapper.readValue(element.toString(), CrossEventDTO.class);
return dto.getTransitTime().intValue();
}
} catch (Exception e) {
log.error("获取绿波旅行时间异常:", e);
throw new RuntimeException(e);
}
return 0;
}
private Integer calcUncoordinatePhaseQueue(
List<CrossDirDataHistPO> crossDirDataHistPOList, List<Integer> coordDirList) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment