Commit 0561b9a8 authored by hanbing's avatar hanbing

[update] 运行评价-详细指标查询增加绿灯空放时长

parent 4da0ccfe
package net.wanji.opt.dto;
import lombok.Data;
/**
* 相位空放 Kafka 实体类
*/
@Data
public class PhaseEmptyResult {
private String crossId;
private String rid;
//进口车道方向
private String direction;
//绿灯转向
private String turn;
//转向空放时间/绿灯总时长
private Double index;
private Long globalTimeStamp;
private Long startTime;
private Long endTime;
private Integer duration;
// 检测时间
private Long detectTime;
}
...@@ -4,6 +4,8 @@ import cn.hutool.core.collection.CollectionUtil; ...@@ -4,6 +4,8 @@ import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.wanji.common.enums.CrossStatusEnum; import net.wanji.common.enums.CrossStatusEnum;
import net.wanji.common.enums.TurnConvertEnum; import net.wanji.common.enums.TurnConvertEnum;
...@@ -23,14 +25,22 @@ import net.wanji.databus.po.LaneInfoPO; ...@@ -23,14 +25,22 @@ import net.wanji.databus.po.LaneInfoPO;
import net.wanji.databus.vo.RunningEvaluateCrossListVO; import net.wanji.databus.vo.RunningEvaluateCrossListVO;
import net.wanji.opt.bo.CrossNameBO; import net.wanji.opt.bo.CrossNameBO;
import net.wanji.opt.bo.MetricsDetailBO; import net.wanji.opt.bo.MetricsDetailBO;
import net.wanji.opt.dto.PhaseEmptyResult;
import net.wanji.opt.service.RunningEvaluateService; import net.wanji.opt.service.RunningEvaluateService;
import net.wanji.opt.vo.*; import net.wanji.opt.vo.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.text.DecimalFormat; import java.text.DecimalFormat;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.ZoneId; import java.time.ZoneId;
...@@ -46,6 +56,13 @@ import java.util.stream.Stream; ...@@ -46,6 +56,13 @@ import java.util.stream.Stream;
@Slf4j @Slf4j
@Service @Service
public class RunningEvaluateServiceImpl implements RunningEvaluateService { public class RunningEvaluateServiceImpl implements RunningEvaluateService {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.empty-phase-topic}")
private String emptyPhaseTopic;
private final CrossDataRealtimeMapper crossDataRealtimeMapper; private final CrossDataRealtimeMapper crossDataRealtimeMapper;
private final CrossDataHistMapper crossDataHistMapper; private final CrossDataHistMapper crossDataHistMapper;
private final BaseCrossSectionMapper baseCrossSectionMapper; private final BaseCrossSectionMapper baseCrossSectionMapper;
...@@ -64,6 +81,8 @@ public class RunningEvaluateServiceImpl implements RunningEvaluateService { ...@@ -64,6 +81,8 @@ public class RunningEvaluateServiceImpl implements RunningEvaluateService {
SimpleDateFormat HOUR_SDF = new SimpleDateFormat("HH:mm"); SimpleDateFormat HOUR_SDF = new SimpleDateFormat("HH:mm");
SimpleDateFormat DAY_SDF = new SimpleDateFormat("yyyy-MM-dd"); SimpleDateFormat DAY_SDF = new SimpleDateFormat("yyyy-MM-dd");
ObjectMapper objectMapper = new ObjectMapper();
public RunningEvaluateServiceImpl(CrossDataRealtimeMapper crossDataRealtimeMapper, public RunningEvaluateServiceImpl(CrossDataRealtimeMapper crossDataRealtimeMapper,
CrossDataHistMapper crossDataHistMapper, CrossDataHistMapper crossDataHistMapper,
@Qualifier("baseCrossSectionMapper") BaseCrossSectionMapper baseCrossSectionMapper, @Qualifier("baseCrossSectionMapper") BaseCrossSectionMapper baseCrossSectionMapper,
...@@ -299,6 +318,8 @@ public class RunningEvaluateServiceImpl implements RunningEvaluateService { ...@@ -299,6 +318,8 @@ public class RunningEvaluateServiceImpl implements RunningEvaluateService {
} else { // 路口级别 } else { // 路口级别
metricHistDTOList = crossDataHistMapper.selectMetricHistDTO( metricHistDTOList = crossDataHistMapper.selectMetricHistDTO(
crossId, startStamp, endStamp); crossId, startStamp, endStamp);
// 相位空放指标
fillPhaseEmpty(metricHistDTOList, crossId);
} }
RunningEvaluateMetricsDetailVO res = new RunningEvaluateMetricsDetailVO(); RunningEvaluateMetricsDetailVO res = new RunningEvaluateMetricsDetailVO();
...@@ -321,6 +342,75 @@ public class RunningEvaluateServiceImpl implements RunningEvaluateService { ...@@ -321,6 +342,75 @@ public class RunningEvaluateServiceImpl implements RunningEvaluateService {
return res; return res;
} }
private void fillPhaseEmpty(List<MetricHistDTO> metricHistDTOList, String crossId) {
for (MetricHistDTO histDTO : metricHistDTOList) {
Integer batchTime = histDTO.getBatchTime();
// 查询 batchTime 5分钟内的相位空放记录
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", "phase-empty-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅 topic
String topic = emptyPhaseTopic;
List<TopicPartition> partitions = Arrays.asList(new TopicPartition(topic, 0)); // 只有一个分区
consumer.assign(partitions);
long batchTimeLong = batchTime;
batchTimeLong *= 1000;
long startTime = batchTimeLong - 5 * 60 * 1000; // batchTime 前5分钟
// 计算历史数据的起始和结束 offset
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
for (TopicPartition partition : partitions) {
timestampToSearch.put(partition, startTime);
}
Map<TopicPartition, OffsetAndTimestamp> startOffsets = consumer.offsetsForTimes(timestampToSearch);
if (startOffsets != null) {
startOffsets.forEach((partition, offsetAndTimestamp) -> {
if (offsetAndTimestamp != null) {
consumer.seek(partition, offsetAndTimestamp.offset());
}
});
}
List<PhaseEmptyResult> results = new ArrayList<>();
try {
boolean keepConsuming = true;
while (keepConsuming) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
long recordTime = record.timestamp();
if (recordTime >= startTime && recordTime <= batchTimeLong) {
PhaseEmptyResult phaseEmptyResult =
objectMapper.readValue(record.value(), PhaseEmptyResult.class);
results.add(phaseEmptyResult);
} else if (recordTime > batchTime) {
keepConsuming = false;
break;
}
}
}
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
} finally {
consumer.close();
}
// 过滤出本路口记录,时间加总
int sum = results.stream()
.filter(result -> crossId.equals(result.getCrossId()))
.mapToInt(PhaseEmptyResult::getDuration)
.sum();
histDTO.setEmptyPhase(sum);
}
}
private List<RunningEvaluateMetricsDetailVO.CrossMetrics> buildMetricsList( private List<RunningEvaluateMetricsDetailVO.CrossMetrics> buildMetricsList(
List<MetricHistDTO> metricHistDTOList, Integer minutes) { List<MetricHistDTO> metricHistDTOList, Integer minutes) {
List<RunningEvaluateMetricsDetailVO.CrossMetrics> res = new ArrayList<>(); List<RunningEvaluateMetricsDetailVO.CrossMetrics> res = new ArrayList<>();
...@@ -353,6 +443,7 @@ public class RunningEvaluateServiceImpl implements RunningEvaluateService { ...@@ -353,6 +443,7 @@ public class RunningEvaluateServiceImpl implements RunningEvaluateService {
double stopTimesSum = 0.0; double stopTimesSum = 0.0;
int delayTimeSum = 0; int delayTimeSum = 0;
double effusionRateSum = 0.0; double effusionRateSum = 0.0;
int emptyPhaseSum = 0;
for (MetricHistDTO metricHistDTO : dtoList) { for (MetricHistDTO metricHistDTO : dtoList) {
Integer flow = metricHistDTO.getFlow(); Integer flow = metricHistDTO.getFlow();
Double speed = metricHistDTO.getSpeed(); Double speed = metricHistDTO.getSpeed();
...@@ -361,6 +452,7 @@ public class RunningEvaluateServiceImpl implements RunningEvaluateService { ...@@ -361,6 +452,7 @@ public class RunningEvaluateServiceImpl implements RunningEvaluateService {
Double stopTimes = metricHistDTO.getStopTimes(); Double stopTimes = metricHistDTO.getStopTimes();
Integer delayTime = metricHistDTO.getDelayTime(); Integer delayTime = metricHistDTO.getDelayTime();
Double effusionRate = metricHistDTO.getEffusionRate(); Double effusionRate = metricHistDTO.getEffusionRate();
Integer emptyPhase = metricHistDTO.getEmptyPhase();
if (flow != null) flowSum += flow; if (flow != null) flowSum += flow;
if (speed != null) speedSum += speed; if (speed != null) speedSum += speed;
if (capacity != null) capacitySum += capacity; if (capacity != null) capacitySum += capacity;
...@@ -368,6 +460,7 @@ public class RunningEvaluateServiceImpl implements RunningEvaluateService { ...@@ -368,6 +460,7 @@ public class RunningEvaluateServiceImpl implements RunningEvaluateService {
if (stopTimes != null) stopTimesSum += stopTimes; if (stopTimes != null) stopTimesSum += stopTimes;
if (delayTime != null) delayTimeSum += delayTime; if (delayTime != null) delayTimeSum += delayTime;
if (effusionRate != null) effusionRateSum += effusionRate; if (effusionRate != null) effusionRateSum += effusionRate;
if (emptyPhase != null) emptyPhaseSum += emptyPhase;
} }
int size = dtoList.size(); int size = dtoList.size();
crossMetrics.setFlow(flowSum); crossMetrics.setFlow(flowSum);
...@@ -377,6 +470,7 @@ public class RunningEvaluateServiceImpl implements RunningEvaluateService { ...@@ -377,6 +470,7 @@ public class RunningEvaluateServiceImpl implements RunningEvaluateService {
crossMetrics.setStopTimes(stopTimesSum / size); crossMetrics.setStopTimes(stopTimesSum / size);
crossMetrics.setDelayTime(delayTimeSum / size); crossMetrics.setDelayTime(delayTimeSum / size);
crossMetrics.setEffusionRate(effusionRateSum / size); crossMetrics.setEffusionRate(effusionRateSum / size);
crossMetrics.setEmptyPhase(emptyPhaseSum);
} }
res.add(crossMetrics); res.add(crossMetrics);
} }
......
...@@ -100,5 +100,9 @@ public class RunningEvaluateMetricsDetailVO { ...@@ -100,5 +100,9 @@ public class RunningEvaluateMetricsDetailVO {
@ApiModelProperty(value = "溢流率") @ApiModelProperty(value = "溢流率")
private Double effusionRate ; private Double effusionRate ;
@ApiModelProperty(value = "绿灯空放时长")
private Integer emptyPhase ;
} }
} }
...@@ -2,7 +2,7 @@ spring: ...@@ -2,7 +2,7 @@ spring:
elasticsearch: elasticsearch:
username: elastic username: elastic
password: Wanji300552 password: Wanji300552
uris: http://37.12.182.31:9200 uris: http://10.102.1.182:9200
application: application:
name: opt name: opt
datasource: datasource:
...@@ -11,24 +11,24 @@ spring: ...@@ -11,24 +11,24 @@ spring:
datasource: datasource:
master: master:
type: com.alibaba.druid.pool.DruidDataSource type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://37.12.182.29:3306/t_signal_control_jinan?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true url: jdbc:mysql://10.102.1.182:3306/t_signal_control_jinan?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true
username: root username: root
password: Wanji300552 password: Wanji300552
driverClassName: com.mysql.cj.jdbc.Driver driverClassName: com.mysql.cj.jdbc.Driver
slave: slave:
type: com.alibaba.druid.pool.DruidDataSource type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://37.12.182.29:3306/t_signal_control_jinan?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true url: jdbc:mysql://10.102.1.182:3306/t_signal_control_jinan?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true
username: root username: root
password: Wanji300552 password: Wanji300552
driverClassName: com.mysql.cj.jdbc.Driver driverClassName: com.mysql.cj.jdbc.Driver
holo: holo:
type: com.alibaba.druid.pool.DruidDataSource type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://37.12.182.29:3306/holo_roadnet?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true url: jdbc:mysql://10.102.1.182:3306/holo_roadnet_jn?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true
username: root username: root
password: Wanji300552 password: Wanji300552
driverClassName: com.mysql.cj.jdbc.Driver driverClassName: com.mysql.cj.jdbc.Driver
redis: redis:
host: 37.12.182.29 host: 10.102.1.182
port: 6379 port: 6379
password: Wanji300552 password: Wanji300552
jedis: jedis:
...@@ -41,7 +41,8 @@ spring: ...@@ -41,7 +41,8 @@ spring:
database: 3 database: 3
kafka: kafka:
bootstrap-servers: 37.12.182.31:9092 bootstrap-servers: 10.102.1.182:9092
empty-phase-topic: phase.emptiness.analysis
#消费者配置 #消费者配置
consumer: consumer:
max-poll-records: 1 max-poll-records: 1
......
...@@ -45,5 +45,7 @@ public class MetricHistDTO { ...@@ -45,5 +45,7 @@ public class MetricHistDTO {
private Double greenLightEfficiency = 0.0; private Double greenLightEfficiency = 0.0;
@ApiModelProperty(value = "负载均衡度", notes = "") @ApiModelProperty(value = "负载均衡度", notes = "")
private Double loadBalance = 0.0; private Double loadBalance = 0.0;
@ApiModelProperty(value = "绿灯空放时长", notes = "")
private Integer emptyPhase = 0;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment