Commit b8fa4f79 authored by zhoushiguang's avatar zhoushiguang

路段路况计算优化

parent e8364935
...@@ -96,7 +96,7 @@ public class RidTrafficStateMain implements Serializable { ...@@ -96,7 +96,7 @@ public class RidTrafficStateMain implements Serializable {
//.keyBy(o->(o.getRoadnet().getSegmentId())) //.keyBy(o->(o.getRoadnet().getSegmentId()))
//.process(new RidMotorCarProcessWindow()) //.process(new RidMotorCarProcessWindow())
.keyBy(o->o.getRoadnet().getRid()) .keyBy(o->o.getRoadnet().getRid())
.window(SlidingEventTimeWindows.of(Time.minutes(2), Time.seconds(30))) .window(SlidingEventTimeWindows.of(Time.minutes(2), Time.seconds(1)))
.process(new RidProcessWindow()) .process(new RidProcessWindow())
.name("路段交通路况-按RID渠化段分组车辆"); .name("路段交通路况-按RID渠化段分组车辆");
......
...@@ -4,13 +4,24 @@ import com.wanji.indicators.cache.BaseLaneCache; ...@@ -4,13 +4,24 @@ import com.wanji.indicators.cache.BaseLaneCache;
import com.wanji.indicators.constant.CongestEnum; import com.wanji.indicators.constant.CongestEnum;
import com.wanji.indicators.constant.RoadClassMap; import com.wanji.indicators.constant.RoadClassMap;
import com.wanji.indicators.model.CarTrackModel; import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.RoutePathCarTrackModel;
import com.wanji.indicators.task.area.service.model.ParticipantAnalysisMidResultModel; import com.wanji.indicators.task.area.service.model.ParticipantAnalysisMidResultModel;
import com.wanji.indicators.task.citybrain.vo.CarPassVO;
import com.wanji.indicators.task.trafficstate.model.LaneTrafficStateResult; import com.wanji.indicators.task.trafficstate.model.LaneTrafficStateResult;
import com.wanji.indicators.util.ArithOfBigDecmial; import com.wanji.indicators.util.ArithOfBigDecmial;
import com.wanji.indicators.util.CommonUtil; import com.wanji.indicators.util.CommonUtil;
import com.wanji.indicators.util.DateUtil; import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.GeomsConvertUtil; import com.wanji.indicators.util.GeomsConvertUtil;
import com.wanji.indicators.util.PropertiesHelper; import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
...@@ -34,11 +45,27 @@ public class RidProcessWindow extends ProcessWindowFunction<CarTrackModel, LaneT ...@@ -34,11 +45,27 @@ public class RidProcessWindow extends ProcessWindowFunction<CarTrackModel, LaneT
private Properties properties = new Properties(); private Properties properties = new Properties();
private transient MapState<String, LaneTrafficStateResult> roadStateMapState;
@Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
PropertiesHelper instance = PropertiesHelper.getInstance(); PropertiesHelper instance = PropertiesHelper.getInstance();
this.properties = instance.getProperties(); this.properties = instance.getProperties();
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(6))//状态过期时间
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.cleanupFullSnapshot()
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)//不返回过期值
.build();
MapStateDescriptor mapStateDescriptor = new MapStateDescriptor<>("REALTIME_ROAD_STATUS",
String.class,
LaneTrafficStateResult.class
);
mapStateDescriptor.enableTimeToLive(ttlConfig);
roadStateMapState = getRuntimeContext().getMapState(mapStateDescriptor);
} }
@Override @Override
public void process(String key, Context context, Iterable<CarTrackModel> elements, Collector<LaneTrafficStateResult> out) throws Exception { public void process(String key, Context context, Iterable<CarTrackModel> elements, Collector<LaneTrafficStateResult> out) throws Exception {
long windowStartTs = context.window().getStart(); long windowStartTs = context.window().getStart();
...@@ -46,26 +73,39 @@ public class RidProcessWindow extends ProcessWindowFunction<CarTrackModel, LaneT ...@@ -46,26 +73,39 @@ public class RidProcessWindow extends ProcessWindowFunction<CarTrackModel, LaneT
List<CarTrackModel> list = StreamSupport.stream(elements.spliterator(), false).collect(Collectors.toList()); List<CarTrackModel> list = StreamSupport.stream(elements.spliterator(), false).collect(Collectors.toList());
//排除路口进口车道上的车辆 //进口车道
list = list.stream().filter(o -> !Objects.equals(2, o.getRoadnet().getLaneType())).collect(Collectors.toList()); List<CarTrackModel> inLaneList = list.stream().filter(o -> Objects.equals(2, o.getRoadnet().getLaneType())).collect(Collectors.toList());
//保留路段上的车辆
list = list.stream().filter(o -> !Objects.equals(2, o.getRoadnet().getLaneType()) && !Objects.equals(3, o.getRoadnet().getLaneType())).collect(Collectors.toList());
Double freeSpeed = null; Double freeSpeed = null;
int ridLevel = RoadClassMap.MAIN_DAO.getType(); String rid = null;
Integer ridDir = null;
double trafficIndex = 1; double trafficIndex = 1;
if (!list.isEmpty()) { if (!list.isEmpty()) {
freeSpeed = list.get(0).getRoadnet().getFreeSpeed(); if (!inLaneList.isEmpty()) {
double ridLength = list.get(0).getRoadnet().getRidLength(); freeSpeed = inLaneList.get(0).getRoadnet().getFreeSpeed();
rid = inLaneList.get(0).getRoadnet().getRid();
ridDir = inLaneList.get(0).getRoadnet().getRidDir8();
} else {
freeSpeed = list.get(0).getRoadnet().getFreeSpeed();
rid = list.get(0).getRoadnet().getRid();
ridDir = list.get(0).getRoadnet().getRidDir8();
}
if (freeSpeed == null) { if (freeSpeed == null) {
freeSpeed = Double.parseDouble(this.properties.getProperty("rid.default.free.speed")); freeSpeed = Double.parseDouble(this.properties.getProperty("rid.default.free.speed"));
} }
double avgSpeed = list.stream().mapToDouble(o -> o.getSpeed()).average().getAsDouble(); double avgSpeed = getAvgSpeed(list, freeSpeed);
long segmentCarSize = list.stream().mapToInt(o -> o.getId()).distinct().count(); long segmentCarSize = list.stream().mapToInt(o -> o.getId()).distinct().count();
if (avgSpeed > 0 && segmentCarSize>10) { if (avgSpeed > 0 && segmentCarSize > 10) {
trafficIndex = ArithOfBigDecmial.div(freeSpeed, avgSpeed); trafficIndex = ArithOfBigDecmial.div(freeSpeed, avgSpeed);
} }
String trafficState = "1"; String trafficState = "1";
Integer count = BaseLaneCache.getInstance().getRidMaxLaneNumberMap().get(key); Integer count = BaseLaneCache.getInstance().getRidMaxLaneNumberMap().get(key);
int ridLevel = RoadClassMap.MAIN_DAO.getType();
if (count != null && count >= 3) { if (count != null && count >= 3) {
ridLevel = RoadClassMap.MAIN_DAO.getType(); ridLevel = RoadClassMap.MAIN_DAO.getType();
} else { } else {
...@@ -98,9 +138,8 @@ public class RidProcessWindow extends ProcessWindowFunction<CarTrackModel, LaneT ...@@ -98,9 +138,8 @@ public class RidProcessWindow extends ProcessWindowFunction<CarTrackModel, LaneT
LaneTrafficStateResult outModel = new LaneTrafficStateResult(); LaneTrafficStateResult outModel = new LaneTrafficStateResult();
outModel.setId("Rid_Traffic_State"); outModel.setId("Rid_Traffic_State");
outModel.setRid(list.get(0).getRoadnet().getRid()); outModel.setRid(rid);
outModel.setRidDir(list.get(0).getRoadnet().getRidDir8()); outModel.setRidDir(ridDir);
outModel.setSegmentId(list.get(0).getRoadnet().getSegmentId());
outModel.setTrafficIndex(trafficIndex); outModel.setTrafficIndex(trafficIndex);
outModel.setTrafficState(Integer.valueOf(trafficState)); outModel.setTrafficState(Integer.valueOf(trafficState));
outModel.setStartTimestamp(windowStartTs); outModel.setStartTimestamp(windowStartTs);
...@@ -108,28 +147,66 @@ public class RidProcessWindow extends ProcessWindowFunction<CarTrackModel, LaneT ...@@ -108,28 +147,66 @@ public class RidProcessWindow extends ProcessWindowFunction<CarTrackModel, LaneT
outModel.setStartTime(DateUtil.toDateTime(windowStartTs, "yyyy-MM-dd HH:mm:ss")); outModel.setStartTime(DateUtil.toDateTime(windowStartTs, "yyyy-MM-dd HH:mm:ss"));
outModel.setEndTime(DateUtil.toDateTime(windowEndTs, "yyyy-MM-dd HH:mm:ss")); outModel.setEndTime(DateUtil.toDateTime(windowEndTs, "yyyy-MM-dd HH:mm:ss"));
outModel.setRidLength(list.get(0).getRoadnet().getRidLength()); outModel.setRidLength(list.get(0).getRoadnet().getRidLength());
outModel.setNowDateTime(DateUtil.toDateTime(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"));
LaneTrafficStateResult agoResult = roadStateMapState.get(outModel.getRid());
if (agoResult != null) {
long agoTimeStamp = agoResult.getCreateTime();
long duration = System.currentTimeMillis() - agoTimeStamp;
int agoState = agoResult.getTrafficState();
int nowState = outModel.getTrafficState();
if (agoState == nowState && duration >= 30 * 1000) {
//持续时间30秒以上输出
out.collect(outModel);
outModel.setCreateTime(System.currentTimeMillis());
log.info("路段渠化RID:{},方向:{},持续时间:{}ms,拥堵指数:{},拥堵状态:{},平均速度:{},车辆数:{},路段长度:{}m, 自由流速度:{},开始时间:{},截止时间:{}", key, outModel.getRidDir(), duration, trafficIndex, trafficState, avgSpeed, segmentCarSize, outModel.getRidLength(), freeSpeed, outModel.getStartTime(), outModel.getEndTime());
} else {
outModel.setCreateTime(agoTimeStamp);
}
} else {
//第一次缓存创建时间
outModel.setCreateTime(System.currentTimeMillis());
}
log.info("路段渠化RID:{},方向:{},拥堵指数:{},拥堵状态:{},平均速度:{},车辆数:{},路段长度:{}m, 自由流速度:{},开始时间:{},截止时间:{}", key, outModel.getRidDir(), trafficIndex, trafficState, avgSpeed, segmentCarSize, outModel.getRidLength(), freeSpeed, outModel.getStartTime(), outModel.getEndTime()); roadStateMapState.put(outModel.getRid(), outModel);
out.collect(outModel);
} }
} }
private int getFrameAvgSpeed(List<Double> list) { private double getAvgSpeed(List<CarTrackModel> list, double freeSpeed) {
//当前帧机动车平均速度,所有速度大于5km/h的速度进行排序,去掉低速10%,然后进行平均 Map<Integer, List<CarTrackModel>> groupByPlateNo = list.stream().filter(o -> Objects.nonNull(o.getId())).collect(Collectors.groupingBy(o -> o.getId()));
List<Double> speedList = list.stream()
.filter(speed -> speed > 0).distinct() double laneTotalSpeed = 0;
.sorted(Comparator.comparing(Double::doubleValue)) double laneTotalTime = 0;
.collect(Collectors.toList()); double laneCarSize = 0;
for (Map.Entry<Integer, List<CarTrackModel>> entry : groupByPlateNo.entrySet()) {
List<CarTrackModel> carList = entry.getValue();
double travelSpeed = 0;
double travelTime = 0;
if (carList.size() > 1) {
carList = carList.stream().sorted(Comparator.comparing(o -> o.getGlobalTimeStamp())).collect(Collectors.toList());
//米
double distance = CommonUtil.getTravelDistance(carList);
//秒
travelTime = (carList.get(carList.size() - 1).getGlobalTimeStamp() - carList.get(0).getGlobalTimeStamp()) / 1000;
if (travelTime > 0) {//单车平均行驶速度km/h
travelSpeed = (distance / 1000) / (travelTime / 60 / 60.0);
}
laneTotalSpeed += travelSpeed;
laneCarSize++;
int startIndex = new BigDecimal(speedList.size() * 0.1).intValue(); }
int endIndex = (int) ArithOfBigDecmial.round(speedList.size() * 0.9, 0); // trackIdList.add(entry.getKey());
List<Double> subList = speedList.subList(startIndex, endIndex); }
double avgSpeed = subList.stream().collect(Collectors.averagingDouble(Double::doubleValue)); if (laneCarSize > 0) {
return (int) ArithOfBigDecmial.round(avgSpeed, 0); return ArithOfBigDecmial.div(laneTotalSpeed, laneCarSize);
} else {
return freeSpeed;
}
} }
} }
package com.wanji.indicators.task.trafficstate.model; package com.wanji.indicators.task.trafficstate.model;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data; import lombok.Data;
import lombok.ToString; import lombok.ToString;
...@@ -37,4 +38,7 @@ public class LaneTrafficStateResult implements Serializable { ...@@ -37,4 +38,7 @@ public class LaneTrafficStateResult implements Serializable {
private Double trafficIndex; private Double trafficIndex;
private List<Integer> trackIdList; private List<Integer> trackIdList;
@JSONField(serialize = false)
private long createTime;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment