Commit 40f2ac9d authored by zhoushiguang's avatar zhoushiguang

干线指标计算

parent a540319f
......@@ -2,9 +2,15 @@ package net.wanji.datacenter.pojo.dto;
import lombok.Data;
import java.util.Map;
@Data
public class LineCongestion {
private Integer greenWaveId;
private Integer greenWaveDir;
private Double index;
// 501畅通 502轻微拥堵 503中度拥堵 504重度拥堵
......@@ -25,4 +31,11 @@ public class LineCongestion {
// 检测时间
private Long detectTime;
//拥堵状态
private String congestionType;
private Double speed;
//干线路段旅行时间
private Map<String,Double> ridTravelTime;
}
\ No newline at end of file
......@@ -209,6 +209,7 @@ public class BaseLaneInfoServiceImpl extends ServiceImpl<BaseLaneInfoMapper, Bas
for (Map<String, Object> map : list) {
String crossId = Tools.getMapValue("cross_id", map);
String rid = Tools.getMapValue("rid", map);
String ridDir = Tools.getMapValue("rid_dir", map);
String fs = Tools.getMapValue("free_speed", map);
fs = fs == null ? defaultFreeSpeed.toString() : fs;
......@@ -223,6 +224,7 @@ public class BaseLaneInfoServiceImpl extends ServiceImpl<BaseLaneInfoMapper, Bas
dataMap.put("level", Double.parseDouble(level));
dataMap.put("ridLength", ridLength);
dataMap.put("freeSpeed", freeSpeed);
dataMap.put("ridDir",Double.valueOf(ridDir));
retmap.put(key, dataMap);
}
......
......@@ -213,6 +213,11 @@ public class LineCongestionIndexMain {
singleCarInfo.setId(end.getId());
singleCarInfo.setCrossId(end.getRoadnet().getCrossId());
singleCarInfo.setRid(end.getRoadnet().getRid());
singleCarInfo.setRidDir(end.getRoadnet().getRidDir8());
Map<String, Double> freeMap = BaseLaneCache.getInstance().getFreeSpeedMap().get(singleCarInfo.getRid());
if (singleCarInfo.getRidDir()==null) {
singleCarInfo.setRidDir(freeMap.get("ridDir").intValue());
}
singleCarInfo.setLaneId(end.getRoadnet().getLaneId());
singleCarInfo.setSegmentId(end.getRoadnet().getSegmentId());
singleCarInfo.setStartTime(start.getGlobalTimeStamp());
......@@ -235,7 +240,7 @@ public class LineCongestionIndexMain {
}
}
}
}).setParallelism(2).name("统计每辆车通过路口的通行时间");
}).setParallelism(1).name("统计每辆车通过路口的通行时间");
//对同一窗口的统计数据进行处理
SingleOutputStreamOperator<LineCongestion> trafficIndexStream = singleCarStatStream
......@@ -254,46 +259,66 @@ public class LineCongestionIndexMain {
AtomicDouble atomicDoubleIndex = new AtomicDouble();
AtomicDouble atomicDoubleTime = new AtomicDouble();
AtomicDouble atomicDoubleTransTime = new AtomicDouble();
AtomicDouble atomicDoubleFreeTime = new AtomicDouble();
AtomicDouble atomicDoubleLength = new AtomicDouble();
//干线子路段分组
Map<String, List<SingleCarInfo>> groupByRid = list.stream().collect(Collectors.groupingBy(o -> o.getRid()));
Set<String> ridList1 = new HashSet<>();
Set<String> ridList2 = groupByRid.keySet();
for (String gk : greenWaveMap.keySet()) {
Map<String, Object> map = greenWaveMap.get(gk);
String gdir = Tools.getMapValue("dir", map);
if (Objects.equals(dir, gdir)) {
String gdir = Tools.getMapValue("greenDir", map);
if (Objects.equals(dir, gdir) && !ridList2.contains(gk)) {
ridList1.add(gk);
}
}
Set<String> ridList2 = groupByRid.keySet();
ridList1.retainAll(ridList2);
//无指标数据rid
//无指标数据rid默认给一个自由流旅行时间
for (String rid : ridList1) {
atomicDoubleIndex.addAndGet(1);
Map<String, Double> freeMap = BaseLaneCache.getInstance().getFreeSpeedMap().get(rid);
Double transitTime = ArithOfBigDecmial.div(freeMap.get("ridLength"),freeMap.get("freeSpeed")/3.6,0);
List<SingleCarInfo> virtualList =new ArrayList<>();
SingleCarInfo info = new SingleCarInfo();
info.setTransitTime(transitTime.longValue()*1000);
info.setFreeTime(transitTime);
info.setRidLength(freeMap.get("ridLength"));
info.setRid(rid);
String crossId = rid.substring(11,rid.length()-1);
info.setCrossId(crossId);
info.setRidDir(freeMap.get("ridDir").intValue());
info.setVirtual(true);
virtualList.add(info);
groupByRid.put(rid,virtualList);
}
Map<String,Double> ridTravelTime = new HashMap<>();
groupByRid.forEach((k1, v1) -> {
double avgTransitTime = v1.stream().mapToDouble(SingleCarInfo::getTransitTime).average().orElse(0) / 1000;
double freeTime = v1.get(0).getFreeTime();
double ridIndex = avgTransitTime / freeTime;
atomicDoubleIndex.addAndGet(ridIndex);
//double ridIndex = avgTransitTime / freeTime;
atomicDoubleTransTime.addAndGet(avgTransitTime*v1.size());
atomicDoubleFreeTime.addAndGet(freeTime*v1.size());
atomicDoubleTime.addAndGet(avgTransitTime);
atomicDoubleLength.addAndGet(v1.get(0).getRidLength());
log.info("路段交通指数rid:{},index:{}", k1, ridIndex);
String crossDir = v1.get(0).getCrossId()+"_"+v1.get(0).getRidDir()+"_"+v1.get(0).getRid();
ridTravelTime.put(crossDir,avgTransitTime);
//log.info("路段交通指数rid:{},index:{}", k1, ridIndex);
});
double index = atomicDoubleIndex.get() / (groupByRid.size()+ridList1.size());
double index = atomicDoubleTransTime.get() / atomicDoubleFreeTime.get();
if (index < 1) {
index = 1;
}
LineCongestion lineCongestion = new LineCongestion();
lineCongestion.setRids(ridList2);
lineCongestion.setRidTravelTime(ridTravelTime);
lineCongestion.setIndex(index);
lineCongestion.setTransitTime(atomicDoubleTime.get());
String congestionType = null;
......
......@@ -3,6 +3,7 @@ package com.wanji.indicators.task.trajectory.pojo;
import lombok.Data;
import java.io.Serializable;
import java.util.Map;
import java.util.Set;
/**
......@@ -40,5 +41,6 @@ public class LineCongestion implements Serializable {
private Double speed;
private Set<String> rids;
//干线路段旅行时间
private Map<String,Double> ridTravelTime;
}
......@@ -18,6 +18,8 @@ public class SingleCarInfo {
private String rid;
private Integer ridDir;
private String crossId;
private String laneId;
......@@ -42,4 +44,6 @@ public class SingleCarInfo {
private Double freeTime;
//rid length
private Double ridLength;
private boolean isVirtual;
}
......@@ -181,7 +181,8 @@
a.id AS rid,
max(b.free_speed) free_speed,
a.level,
a.wkt
a.wkt,
a.in_dir rid_dir
FROM
t_base_rid_info a
LEFT JOIN t_cross_rid_freespeed b
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment