Commit adee8d66 authored by hanbing

[update] Running evaluation - optimize the time spent computing green-light empty-phase duration

parent 52872185
package net.wanji.opt.common;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import net.wanji.opt.dto.PhaseEmptyResult;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

/**
 * Consumes data from Kafka within a given time range.
 *
 * @author Kent HAN
 * @date 2024/4/16 8:45
 */
public class KafkaConsumerUtil {
    private final KafkaConsumer<String, String> consumer;
    private final ObjectMapper objectMapper;

    public KafkaConsumerUtil(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(props);
        this.objectMapper = new ObjectMapper();
    }

    public List<PhaseEmptyResult> consumeEmptyPhaseForTimeRange(
            String topic, int partition, long startTime, long endTime) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        consumer.assign(Collections.singletonList(topicPartition));

        // Find the first offset whose timestamp is >= startTime
        HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
        timestampToSearch.put(topicPartition, startTime);
        OffsetAndTimestamp offsetAndTimestamp = consumer.offsetsForTimes(timestampToSearch).get(topicPartition);
        if (offsetAndTimestamp == null) {
            // Guard: no messages at or after startTime, nothing to consume
            consumer.close();
            return Collections.emptyList();
        }

        // Start consuming from the resolved offset
        consumer.seek(topicPartition, offsetAndTimestamp.offset());
        int emptyPollCount = 0;  // counter of consecutive empty polls
        int maxEmptyPolls = 10;  // maximum number of consecutive empty polls before giving up
        List<PhaseEmptyResult> results = new ArrayList<>();
        try {
            boolean keepConsuming = true;
            while (keepConsuming) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) {
                    emptyPollCount++; // no records returned, bump the empty-poll counter
                    if (emptyPollCount >= maxEmptyPolls) {
                        // reached the maximum number of empty polls, stop consuming
                        break;
                    }
                } else {
                    emptyPollCount = 0; // records received, reset the empty-poll counter
                    for (ConsumerRecord<String, String> record : records) {
                        long recordTime = record.timestamp();
                        if (recordTime >= startTime && recordTime <= endTime) {
                            PhaseEmptyResult phaseEmptyResult =
                                    objectMapper.readValue(record.value(), PhaseEmptyResult.class);
                            results.add(phaseEmptyResult);
                        } else if (recordTime > endTime) {
                            keepConsuming = false;
                            break;
                        }
                    }
                }
            }
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        } finally {
            consumer.close();
        }
        return results;
    }
}
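For reference, a minimal usage sketch of the utility above. The broker address, consumer group, topic name, and partition below are illustrative placeholders, not values taken from this commit:

import net.wanji.opt.common.KafkaConsumerUtil;
import net.wanji.opt.dto.PhaseEmptyResult;

import java.util.List;

public class KafkaConsumerUtilExample {
    public static void main(String[] args) {
        // Placeholder time range: the last hour
        long endTime = System.currentTimeMillis();
        long startTime = endTime - 60 * 60 * 1000L;

        // Broker address, group id, topic and partition are assumptions for this sketch
        KafkaConsumerUtil util = new KafkaConsumerUtil("127.0.0.1:9092", "empty-phase-example-group");
        List<PhaseEmptyResult> results =
                util.consumeEmptyPhaseForTimeRange("example-empty-phase-topic", 0, startTime, endTime);

        // Each PhaseEmptyResult exposes getCrossId(), getDetectTime() and getDuration(),
        // which are the getters used by the service change below
        for (PhaseEmptyResult result : results) {
            System.out.println(result.getCrossId() + " -> " + result.getDuration());
        }
    }
}

Note that consumeEmptyPhaseForTimeRange closes its consumer before returning, so a new KafkaConsumerUtil instance is needed for each query.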
@@ -4,7 +4,6 @@ import cn.hutool.core.collection.CollectionUtil;
 import cn.hutool.core.date.DateTime;
 import cn.hutool.core.date.DateUtil;
 import cn.hutool.core.util.ObjectUtil;
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import net.wanji.common.enums.CrossStatusEnum;
@@ -25,14 +24,10 @@ import net.wanji.databus.po.LaneInfoPO;
 import net.wanji.databus.vo.RunningEvaluateCrossListVO;
 import net.wanji.opt.bo.CrossNameBO;
 import net.wanji.opt.bo.MetricsDetailBO;
+import net.wanji.opt.common.KafkaConsumerUtil;
 import net.wanji.opt.dto.PhaseEmptyResult;
 import net.wanji.opt.service.RunningEvaluateService;
 import net.wanji.opt.vo.*;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
-import org.apache.kafka.common.TopicPartition;
 import org.jetbrains.annotations.NotNull;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
@@ -40,7 +35,6 @@ import org.springframework.stereotype.Service;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
@@ -319,7 +313,12 @@ public class RunningEvaluateServiceImpl implements RunningEvaluateService {
             metricHistDTOList = crossDataHistMapper.selectMetricHistDTO(
                     crossId, startStamp, endStamp);
             // empty-phase (idle green) metric
-            fillPhaseEmpty(metricHistDTOList, crossId);
+            long startTime = startDate.getTime();
+            long endTime = endDate.getTime();
+            KafkaConsumerUtil kafkaConsumerUtil = new KafkaConsumerUtil(bootstrapServers, "empty-phase-consumer");
+            List<PhaseEmptyResult> phaseEmptyResults =
+                    kafkaConsumerUtil.consumeEmptyPhaseForTimeRange(emptyPhaseTopic, 0, startTime, endTime);
+            fillPhaseEmpty(metricHistDTOList, crossId, phaseEmptyResults);
         }
         RunningEvaluateMetricsDetailVO res = new RunningEvaluateMetricsDetailVO();
@@ -342,73 +341,27 @@ public class RunningEvaluateServiceImpl implements RunningEvaluateService {
         return res;
     }
-    private void fillPhaseEmpty(List<MetricHistDTO> metricHistDTOList, String crossId) {
-        for (MetricHistDTO histDTO : metricHistDTOList) {
-            Integer batchTime = histDTO.getBatchTime();
-            // query the empty-phase records within the 5 minutes before batchTime
-            Properties props = new Properties();
-            props.put("bootstrap.servers", bootstrapServers);
-            props.put("group.id", "phase-empty-group");
-            props.put("enable.auto.commit", "true");
-            props.put("auto.commit.interval.ms", "1000");
-            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-            // subscribe to the topic
-            String topic = emptyPhaseTopic;
-            List<TopicPartition> partitions = Arrays.asList(new TopicPartition(topic, 0)); // only one partition
-            consumer.assign(partitions);
-            long batchTimeLong = batchTime;
-            batchTimeLong *= 1000;
-            long startTime = batchTimeLong - 5 * 60 * 1000; // 5 minutes before batchTime
-            // work out the start and end offsets of the historical data
-            Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
-            for (TopicPartition partition : partitions) {
-                timestampToSearch.put(partition, startTime);
-            }
-            Map<TopicPartition, OffsetAndTimestamp> startOffsets = consumer.offsetsForTimes(timestampToSearch);
-            if (startOffsets != null) {
-                startOffsets.forEach((partition, offsetAndTimestamp) -> {
-                    if (offsetAndTimestamp != null) {
-                        consumer.seek(partition, offsetAndTimestamp.offset());
-                    }
-                });
-            }
-            List<PhaseEmptyResult> results = new ArrayList<>();
-            try {
-                boolean keepConsuming = true;
-                while (keepConsuming) {
-                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
-                    for (ConsumerRecord<String, String> record : records) {
-                        long recordTime = record.timestamp();
-                        if (recordTime >= startTime && recordTime <= batchTimeLong) {
-                            PhaseEmptyResult phaseEmptyResult =
-                                    objectMapper.readValue(record.value(), PhaseEmptyResult.class);
-                            results.add(phaseEmptyResult);
-                        } else if (recordTime > batchTime) {
-                            keepConsuming = false;
-                            break;
-                        }
-                    }
-                }
-            } catch (JsonProcessingException e) {
-                throw new RuntimeException(e);
-            } finally {
-                consumer.close();
-            }
-            // keep only this intersection's records and sum their durations
-            int sum = results.stream()
-                    .filter(result -> crossId.equals(result.getCrossId()))
-                    .mapToInt(PhaseEmptyResult::getDuration)
-                    .sum();
-            histDTO.setEmptyPhase(sum);
-        }
-    }
+    private void fillPhaseEmpty(List<MetricHistDTO> metricHistDTOList, String crossId,
+                                List<PhaseEmptyResult> phaseEmptyResults) {
+        List<PhaseEmptyResult> crossResults = phaseEmptyResults.stream()
+                .filter(result -> crossId.equals(result.getCrossId()))
+                .collect(Collectors.toList());
+        for (MetricHistDTO metricHistDTO : metricHistDTOList) {
+            // work out the end of the window
+            Date endTime = new Date(metricHistDTO.getStartTime().getTime() + 5 * 60 * 1000); // plus 5 minutes
+            // go through each PhaseEmptyResult and accumulate the matching durations
+            for (PhaseEmptyResult phaseEmptyResult : crossResults) {
+                if (phaseEmptyResult.getDetectTime() * 1000 >= metricHistDTO.getStartTime().getTime()
+                        && phaseEmptyResult.getDetectTime() * 1000 <= endTime.getTime()) {
+                    // accumulate the duration
+                    metricHistDTO.setEmptyPhase(metricHistDTO.getEmptyPhase() + phaseEmptyResult.getDuration());
+                }
+            }
+        }
+    }
     private List<RunningEvaluateMetricsDetailVO.CrossMetrics> buildMetricsList(
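A note on the window check in the new fillPhaseEmpty: MetricHistDTO.getStartTime() is a millisecond timestamp, while PhaseEmptyResult.getDetectTime() appears to be in seconds (it is multiplied by 1000 before the comparison). A minimal sketch with made-up values, illustrative only:

public class EmptyPhaseWindowCheckExample {
    public static void main(String[] args) {
        // Hypothetical values for one 5-minute bucket
        long windowStartMs = 1713225600000L;               // metricHistDTO.getStartTime().getTime()
        long windowEndMs = windowStartMs + 5 * 60 * 1000;  // end of the 5-minute bucket
        long detectTimeSec = 1713225730L;                  // phaseEmptyResult.getDetectTime(), in seconds
        boolean inWindow = detectTimeSec * 1000 >= windowStartMs
                && detectTimeSec * 1000 <= windowEndMs;
        System.out.println(inWindow); // true: this result's duration is added to the bucket's emptyPhase
    }
}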