Commit ea2b3832 authored by duanruiming's avatar duanruiming

[update] 交通大脑-统计数据优化转向数据逻辑优化

parent c1476ce2
......@@ -15,6 +15,7 @@ import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author duanruiming
......@@ -43,34 +44,17 @@ public class LaneStatisticsReceive2TurnDataMap implements MapFunction<String, St
String dir = inVo.getSectionNo();
List<LaneStatisticsDetail> dataList = inVo.getData_list();
if (!CollectionUtils.isEmpty(dataList)) {
for (LaneStatisticsDetail detail : dataList) {
String laneNo = detail.getLaneNo();
CrossTurnDataRealtimePO po = new CrossTurnDataRealtimePO();
po.setId(String.join(Constant.MIDDLE_LINE, crossId, dir, laneNo));
po.setCrossId(crossId);
po.setStartTime(dateTime);
po.setFlow(Integer.parseInt(detail.getWArrivalFlow()));
po.setSpeed(Double.parseDouble(detail.getSpeedAvg()));
po.setInSpeed(Double.parseDouble(detail.getSpeedAvg()));
po.setOutSpeed(Double.parseDouble(detail.getSpeedAvg()));
po.setQueueLength(Double.parseDouble(detail.getByQueueLen()));
po.setStopTimes(Double.parseDouble(detail.getByStoppingTimes()));
String wDelay = detail.getWDelay();
wDelay = StringUtils.isNotBlank(wDelay) ? wDelay : "0";
po.setDelayTime((int) Double.parseDouble(wDelay));
po.setVehheadDist(Double.parseDouble(detail.getGap()));
po.setVehheadTime(Double.parseDouble(detail.getHeadway()));
po.setQuality(0.6);
po.setNoStopRate(0.0);
po.setOneStopRate(0.0);
po.setTwoStopRate(0.0);
po.setThreeStopRate(0.0);
po.setSturation(0.7);
po.setBatchTime(Integer.parseInt(startTime.substring(0, 10)));
po.setEffusionRate(0.0);
po.setGreenLightEfficiency(0.0);
results.add(po);
}
List<LaneStatisticsDetail> leftList = dataList.stream().filter(po -> po.getDwLeftVehNum() > 0).collect(Collectors.toList());
List<LaneStatisticsDetail> straightList = dataList.stream().filter(po -> po.getDwStraightVehNum() > 0).collect(Collectors.toList());
List<LaneStatisticsDetail> rightList = dataList.stream().filter(po -> po.getDwRightVehNum() > 0).collect(Collectors.toList());
CrossTurnDataRealtimePO leftPO = getCrossTurnDataRealtimePO(leftList, dir, "l", dateTime, crossId);
CrossTurnDataRealtimePO straightPO = getCrossTurnDataRealtimePO(straightList, dir, "s", dateTime, crossId);
CrossTurnDataRealtimePO rightPO = getCrossTurnDataRealtimePO(rightList, dir, "r", dateTime, crossId);
results.add(leftPO);
results.add(straightPO);
results.add(rightPO);
}
}
}
......@@ -83,4 +67,93 @@ public class LaneStatisticsReceive2TurnDataMap implements MapFunction<String, St
throw new RuntimeException(e);
}
}
private CrossTurnDataRealtimePO getCrossTurnDataRealtimePO(List<LaneStatisticsDetail> dataList, String dir, String turn, Date startTime, String crossId) {
CrossTurnDataRealtimePO realtimePO = new CrossTurnDataRealtimePO();
if (!CollectionUtils.isEmpty(dataList)) {
// 不区分车型机动车总流量
int volumeSum = 0;
// 行人流量
int volumePerson = 0;
// 非机动车流量
int volumeCarNon = 0;
// 小车流量
int volumeCarSmall = 0;
// 中车流量
int volumeCarMiddle = 0;
// 大车流量
int volumeCarBig = 0;
// 平均速度(km/h)
double speedAvg = 0;
// 时间占有率(%)
double occupancy = 0;
// 平均车头时距(s)
double headway = 0;
// 平均车身间距(s)
double gap = 0;
// 85位速度(km/h)
double speed85 = 0;
// 车流量总和
int wArrivalFlow = 0;
// 平均空间占有率
double fSpaceOccupyRation = 0;
// 堵塞状态下最大排队长度
double byQueueLen = 0;
// 平均延误
int wDelay = 0;
// 平均停车次数
double byStoppingTimes = 0;
// 左转总流量
int dwLeftVehNum = 0;
// 右转总流量
int dwRightVehNum = 0;
// 直行总流量
int dwStraightVehNum = 0;
for (LaneStatisticsDetail laneStatisticsDetail : dataList) {
volumeSum += laneStatisticsDetail.getVolumeSum();
volumePerson += laneStatisticsDetail.getVolumePerson();
volumeCarNon += laneStatisticsDetail.getVolumeCarNon();
volumeCarSmall += laneStatisticsDetail.getVolumeCarSmall();
volumeCarMiddle += laneStatisticsDetail.getVolumeCarMiddle();
volumeCarBig += laneStatisticsDetail.getVolumeCarBig();
speedAvg += Double.parseDouble(laneStatisticsDetail.getSpeedAvg());
occupancy += Double.parseDouble(laneStatisticsDetail.getOccupancy());
headway += Double.parseDouble(laneStatisticsDetail.getHeadway());
gap += Double.parseDouble(laneStatisticsDetail.getGap());
speed85 += Double.parseDouble(laneStatisticsDetail.getSpeed85());
wArrivalFlow += Integer.parseInt(laneStatisticsDetail.getWArrivalFlow());
fSpaceOccupyRation += Double.parseDouble(laneStatisticsDetail.getFSpaceOccupyRation());
Double currrentQueue = Double.parseDouble(laneStatisticsDetail.getByQueueLen());
byQueueLen = byQueueLen >= currrentQueue ? byQueueLen :currrentQueue;
wDelay += Integer.parseInt(laneStatisticsDetail.getWDelay());
byStoppingTimes += Double.parseDouble(laneStatisticsDetail.getByStoppingTimes());
dwLeftVehNum += laneStatisticsDetail.getDwLeftVehNum();
dwRightVehNum += laneStatisticsDetail.getDwRightVehNum();
dwStraightVehNum += laneStatisticsDetail.getDwStraightVehNum();
}
String id = String.join("_", crossId, dir, turn);
realtimePO.setId(id);
realtimePO.setTurnType(turn);
realtimePO.setInDir(Integer.valueOf(dir));
realtimePO.setCrossId(crossId);
realtimePO.setStartTime(startTime);
realtimePO.setFlow(dwLeftVehNum + dwRightVehNum + dwStraightVehNum);
realtimePO.setStatus(1);
realtimePO.setTrafficIndex(1.0);
realtimePO.setSpeed(Math.round(speedAvg / dataList.size() * 100.0) / 100.0);
realtimePO.setQueueLength(byQueueLen);
realtimePO.setStopTimes(Math.round(byStoppingTimes / dataList.size() * 100.0) / 100.0);
realtimePO.setDelayTime( (int) Math.round(wDelay / dataList.size() * 100.0) / 100);
realtimePO.setSturation(1.0);
//realtimePO.setVehheadDist();
realtimePO.setVehheadTime(Math.round(headway / dataList.size() * 100.0) / 100.0);
realtimePO.setQuality(1.0);
//realtimePO.setNoStopRate();
realtimePO.setBatchTime((int) startTime.getTime());
realtimePO.setGmtCreate(new Date());
realtimePO.setGmtModified(new Date());
}
return realtimePO;
}
}
\ No newline at end of file
......@@ -48,7 +48,7 @@ public class LaneStatisticsReceiveMain {
"city-brain-spillover-receive-data-source");
dealLaneRealTimeData(statisticsStream, env);
// 车道数据中不能处理转向,暂不处理 数据不匹配
// 车道数据中不能处理转向,暂不处理 数据不匹配,统计数据没办法计算
dealTurnRealTimeData(statisticsStream);
dealDirRealTimeData(statisticsStream);
dealCrossRealTimeData(env, statisticsStream, spilloverStream);
......@@ -103,7 +103,7 @@ public class LaneStatisticsReceiveMain {
private static void dealTurnRealTimeData(DataStream<String> stream) {
SingleOutputStreamOperator<String> laneRealtimeStream = stream
.map(new LaneStatisticsReceive2LaneDataMap())
.map(new LaneStatisticsReceive2TurnDataMap())
.name("接收城市大脑统计数据-JsonToObject");
// 同周期数据一致,发送Kafka,准备入库
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment