Commit a1b239e8 authored by zhoushiguang's avatar zhoushiguang

Merge remote-tracking branch 'origin/master'

parents 12f6403b d31db0d5
...@@ -11,10 +11,7 @@ import org.springframework.util.CollectionUtils; ...@@ -11,10 +11,7 @@ import org.springframework.util.CollectionUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.time.LocalTime; import java.time.LocalTime;
import java.util.Comparator; import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
...@@ -51,7 +48,8 @@ public class CrossGreenWaveInfoCache implements InitializingBean { ...@@ -51,7 +48,8 @@ public class CrossGreenWaveInfoCache implements InitializingBean {
List<GreenwaveCrossPO> greenWaveCrossPOS = greenwaveCrossMapper.selectByGreenwaveId(greenWaveId); List<GreenwaveCrossPO> greenWaveCrossPOS = greenwaveCrossMapper.selectByGreenwaveId(greenWaveId);
if (!CollectionUtils.isEmpty(greenWaveCrossPOS)) { if (!CollectionUtils.isEmpty(greenWaveCrossPOS)) {
List<GreenwaveCrossPO> greenWaveCrossIds = greenWaveCrossPOS.stream() List<GreenwaveCrossPO> greenWaveCrossIds = greenWaveCrossPOS.stream()
.filter(po -> LocalTime.parse(po.getStartTime()).isBefore(now) && LocalTime.parse(po.getEndTime()).isAfter(now)) .filter(po -> LocalTime.parse(po.getStartTime()).isBefore(now)
&& LocalTime.parse(po.getEndTime()).isAfter(now) && Objects.equals(0, po.getWeek()))
.sorted(Comparator.comparingInt(GreenwaveCrossPO::getSort)) .sorted(Comparator.comparingInt(GreenwaveCrossPO::getSort))
.collect(Collectors.toList()); .collect(Collectors.toList());
greenWaveInfoMap.put(greenWaveId, greenWaveCrossIds); greenWaveInfoMap.put(greenWaveId, greenWaveCrossIds);
......
...@@ -48,35 +48,7 @@ public class CrossSchemePhaseLightsCache implements CommandLineRunner { ...@@ -48,35 +48,7 @@ public class CrossSchemePhaseLightsCache implements CommandLineRunner {
Integer schemeId = baseCrossSchemePO.getId(); Integer schemeId = baseCrossSchemePO.getId();
List<CrossPhasePO> crossPhasePOList = baseCrossPhaseMapper.selectByCrossIdAndSchemeId(crossId, schemeId); List<CrossPhasePO> crossPhasePOList = baseCrossPhaseMapper.selectByCrossIdAndSchemeId(crossId, schemeId);
if (!CollectionUtils.isEmpty(crossPhasePOList)) { if (!CollectionUtils.isEmpty(crossPhasePOList)) {
for (CrossPhasePO crossPhasePO : crossPhasePOList) { buildData(crossId, schemeNo, crossPhasePOList);
Integer phaseId = crossPhasePO.getId();
String phaseNo = crossPhasePO.getPhaseNo();
Integer redTime = crossPhasePO.getRedTime();
Integer phaseTime = crossPhasePO.getPhaseTime();
Integer greenTime = crossPhasePO.getGreenTime();
List<CrossPhaseLightsPO> crossPhaseLightsPOList = baseCrossPhaseLightsMapper.selectByPhaseId(phaseId);
if (!CollectionUtils.isEmpty(crossPhaseLightsPOList)) {
for (CrossPhaseLightsPO crossPhaseLightsPO : crossPhaseLightsPOList) {
CrossSchemePhaseDirTurnDTO crossSchemePhaseDirTurnDTO = new CrossSchemePhaseDirTurnDTO();
crossSchemePhaseDirTurnDTO.setCrossId(crossId);
crossSchemePhaseDirTurnDTO.setSchemeNo(schemeNo);
crossSchemePhaseDirTurnDTO.setPhaseNo(phaseNo);
Integer lightsId = crossPhaseLightsPO.getLightsId();
BaseCrossLightsPO baseCrossLightsPO = baseCrossLightsMapper.selectById(lightsId);
Integer dir = baseCrossLightsPO.getDir();
crossSchemePhaseDirTurnDTO.setLightsDir(dir);
Integer turn = baseCrossLightsPO.getType();
if (turn == 20) {
continue;
}
crossSchemePhaseDirTurnDTO.setPhaseTime(phaseTime);
crossSchemePhaseDirTurnDTO.setGreenTime(greenTime);
crossSchemePhaseDirTurnDTO.setRedTime(redTime);
crossSchemePhaseDirTurnDTO.setLightsTurn(turn);
crossSchemePhaseDirTurnDTOList.add(crossSchemePhaseDirTurnDTO);
}
}
}
} }
} }
} }
...@@ -87,6 +59,44 @@ public class CrossSchemePhaseLightsCache implements CommandLineRunner { ...@@ -87,6 +59,44 @@ public class CrossSchemePhaseLightsCache implements CommandLineRunner {
} }
} }
/**
* 构建路口方案相位方向转向实体
* @param crossId 路口编号
* @param schemeNo
* @param crossPhasePOList
*/
private void buildData(String crossId, String schemeNo, List<CrossPhasePO> crossPhasePOList) {
for (CrossPhasePO crossPhasePO : crossPhasePOList) {
Integer phaseId = crossPhasePO.getId();
String phaseNo = crossPhasePO.getPhaseNo();
Integer redTime = crossPhasePO.getRedTime();
Integer phaseTime = crossPhasePO.getPhaseTime();
Integer greenTime = crossPhasePO.getGreenTime();
List<CrossPhaseLightsPO> crossPhaseLightsPOList = baseCrossPhaseLightsMapper.selectByPhaseId(phaseId);
if (!CollectionUtils.isEmpty(crossPhaseLightsPOList)) {
for (CrossPhaseLightsPO crossPhaseLightsPO : crossPhaseLightsPOList) {
CrossSchemePhaseDirTurnDTO crossSchemePhaseDirTurnDTO = new CrossSchemePhaseDirTurnDTO();
crossSchemePhaseDirTurnDTO.setCrossId(crossId);
crossSchemePhaseDirTurnDTO.setSchemeNo(schemeNo);
crossSchemePhaseDirTurnDTO.setPhaseNo(phaseNo);
Integer lightsId = crossPhaseLightsPO.getLightsId();
BaseCrossLightsPO baseCrossLightsPO = baseCrossLightsMapper.selectById(lightsId);
Integer dir = baseCrossLightsPO.getDir();
crossSchemePhaseDirTurnDTO.setLightsDir(dir);
Integer turn = baseCrossLightsPO.getType();
if (turn == 20) {
continue;
}
crossSchemePhaseDirTurnDTO.setPhaseTime(phaseTime);
crossSchemePhaseDirTurnDTO.setGreenTime(greenTime);
crossSchemePhaseDirTurnDTO.setRedTime(redTime);
crossSchemePhaseDirTurnDTO.setLightsTurn(turn);
crossSchemePhaseDirTurnDTOList.add(crossSchemePhaseDirTurnDTO);
}
}
}
}
@Override @Override
public void run(String... args) throws Exception { public void run(String... args) throws Exception {
try { try {
......
...@@ -43,22 +43,31 @@ public class CrossTurnWeekMaxFlowCache implements CommandLineRunner { ...@@ -43,22 +43,31 @@ public class CrossTurnWeekMaxFlowCache implements CommandLineRunner {
queryWrapper.between(CrossTurnDataHistPO::getStartTime, lastWeek, lastDay); queryWrapper.between(CrossTurnDataHistPO::getStartTime, lastWeek, lastDay);
List<CrossTurnDataHistPO> crossTurnDataHistPOS = crossTurnDataHistMapper.selectList(queryWrapper); List<CrossTurnDataHistPO> crossTurnDataHistPOS = crossTurnDataHistMapper.selectList(queryWrapper);
if (!CollectionUtils.isEmpty(crossTurnDataHistPOS)) { if (!CollectionUtils.isEmpty(crossTurnDataHistPOS)) {
Map<Integer, List<CrossTurnDataHistPO>> dirDataPOsMap = crossTurnDataHistPOS.stream().collect(Collectors.groupingBy(CrossTurnDataHistPO::getInDir)); calculateData(crossId, crossTurnDataHistPOS);
for (Map.Entry<Integer, List<CrossTurnDataHistPO>> entry : dirDataPOsMap.entrySet()) { }
Integer dir = entry.getKey(); }
List<CrossTurnDataHistPO> dirHistPOS = entry.getValue(); }
if (!CollectionUtils.isEmpty(dirHistPOS)) {
Map<String, List<CrossTurnDataHistPO>> turnDataPOMap = dirHistPOS.stream().collect(Collectors.groupingBy(CrossTurnDataHistPO::getTurnType)); /**
for (Map.Entry<String, List<CrossTurnDataHistPO>> turnEntry : turnDataPOMap.entrySet()) { * 计算转向一周最大流量
String turn = turnEntry.getKey(); * @param crossId
List<CrossTurnDataHistPO> turnDataHistPOList = turnEntry.getValue(); * @param crossTurnDataHistPOS
if (!CollectionUtils.isEmpty(turnDataHistPOList)) { */
OptionalInt max = turnDataHistPOList.stream().map(CrossTurnDataHistPO::getFlow).mapToInt(Integer::intValue).max(); private static void calculateData(String crossId, List<CrossTurnDataHistPO> crossTurnDataHistPOS) {
if (max.isPresent()) { Map<Integer, List<CrossTurnDataHistPO>> dirDataPOsMap = crossTurnDataHistPOS.stream().collect(Collectors.groupingBy(CrossTurnDataHistPO::getInDir));
String key = crossId.concat(Constants.UNDERLINE).concat(String.valueOf(dir)).concat(Constants.UNDERLINE).concat(turn); for (Map.Entry<Integer, List<CrossTurnDataHistPO>> entry : dirDataPOsMap.entrySet()) {
crossTurnWeekMaxFlowMap.put(key, max.getAsInt()); Integer dir = entry.getKey();
} List<CrossTurnDataHistPO> dirHistPOS = entry.getValue();
} if (!CollectionUtils.isEmpty(dirHistPOS)) {
Map<String, List<CrossTurnDataHistPO>> turnDataPOMap = dirHistPOS.stream().collect(Collectors.groupingBy(CrossTurnDataHistPO::getTurnType));
for (Map.Entry<String, List<CrossTurnDataHistPO>> turnEntry : turnDataPOMap.entrySet()) {
String turn = turnEntry.getKey();
List<CrossTurnDataHistPO> turnDataHistPOList = turnEntry.getValue();
if (!CollectionUtils.isEmpty(turnDataHistPOList)) {
OptionalInt max = turnDataHistPOList.stream().map(CrossTurnDataHistPO::getFlow).mapToInt(Integer::intValue).max();
if (max.isPresent()) {
String key = crossId.concat(Constants.UNDERLINE).concat(String.valueOf(dir)).concat(Constants.UNDERLINE).concat(turn);
crossTurnWeekMaxFlowMap.put(key, max.getAsInt());
} }
} }
} }
......
...@@ -48,26 +48,7 @@ public class WeekDirFreeFlowSpeedCache implements CommandLineRunner { ...@@ -48,26 +48,7 @@ public class WeekDirFreeFlowSpeedCache implements CommandLineRunner {
queryWrapper.between(CrossDirDataHistPO::getStartTime, lastWeek, lastDay); queryWrapper.between(CrossDirDataHistPO::getStartTime, lastWeek, lastDay);
List<CrossDirDataHistPO> crossDirDataHistPOS = crossDirDataHistMapper.selectList(queryWrapper); List<CrossDirDataHistPO> crossDirDataHistPOS = crossDirDataHistMapper.selectList(queryWrapper);
if (!CollectionUtils.isEmpty(crossDirDataHistPOS)) { if (!CollectionUtils.isEmpty(crossDirDataHistPOS)) {
Map<Integer, List<CrossDirDataHistPO>> dirDataMap = crossDirDataHistPOS.stream().collect(Collectors.groupingBy(CrossDirDataHistPO::getDirType)); calculateData(crossDirFreeFlowSpeedDTOS, crossId, crossDirDataHistPOS);
if (!dirDataMap.isEmpty()) {
for (Map.Entry<Integer, List<CrossDirDataHistPO>> entry : dirDataMap.entrySet()) {
Integer dir = entry.getKey();
List<CrossDirDataHistPO> dirDataHistPOS = entry.getValue();
List<Double> weekSpeeds = dirDataHistPOS.stream().map(CrossDirDataHistPO::getSpeed).sorted().collect(Collectors.toList());
int startIndex = (int) (0.85 * weekSpeeds.size());
int endIndex = (int) (0.95 * weekSpeeds.size());
Double freeFlowSpeed = weekSpeeds.subList(startIndex, endIndex).stream().mapToDouble(Double::doubleValue).average().getAsDouble();
if (freeFlowSpeed == 0.0) {
freeFlowSpeed = 60.0;
}
crossDirFreeFlowSpeedMap.put(crossId.concat(Constants.UNDERLINE).concat(String.valueOf(dir)), freeFlowSpeed);
CrossDirFreeFlowSpeedDTO crossDirFreeFlowSpeedDTO = new CrossDirFreeFlowSpeedDTO();
crossDirFreeFlowSpeedDTO.setCrossId(crossId);
crossDirFreeFlowSpeedDTO.setDir(dir);
crossDirFreeFlowSpeedDTO.setFreeFlowSpeed(freeFlowSpeed);
crossDirFreeFlowSpeedDTOS.add(crossDirFreeFlowSpeedDTO);
}
}
} }
} }
Map<String, List<CrossDirFreeFlowSpeedDTO>> map = new HashMap<>(1); Map<String, List<CrossDirFreeFlowSpeedDTO>> map = new HashMap<>(1);
...@@ -75,6 +56,35 @@ public class WeekDirFreeFlowSpeedCache implements CommandLineRunner { ...@@ -75,6 +56,35 @@ public class WeekDirFreeFlowSpeedCache implements CommandLineRunner {
producerHandler.send2FirstKafka(Constants.CROSS_FREE_FLOW_SPEED_TOPIC, map); producerHandler.send2FirstKafka(Constants.CROSS_FREE_FLOW_SPEED_TOPIC, map);
} }
/**
* 计算前一周方向自由流速度85~95分位速度
* @param crossDirFreeFlowSpeedDTOS
* @param crossId
* @param crossDirDataHistPOS
*/
private static void calculateData(List<CrossDirFreeFlowSpeedDTO> crossDirFreeFlowSpeedDTOS, String crossId, List<CrossDirDataHistPO> crossDirDataHistPOS) {
Map<Integer, List<CrossDirDataHistPO>> dirDataMap = crossDirDataHistPOS.stream().collect(Collectors.groupingBy(CrossDirDataHistPO::getDirType));
if (!dirDataMap.isEmpty()) {
for (Map.Entry<Integer, List<CrossDirDataHistPO>> entry : dirDataMap.entrySet()) {
Integer dir = entry.getKey();
List<CrossDirDataHistPO> dirDataHistPOS = entry.getValue();
List<Double> weekSpeeds = dirDataHistPOS.stream().map(CrossDirDataHistPO::getSpeed).sorted().collect(Collectors.toList());
int startIndex = (int) (0.85 * weekSpeeds.size());
int endIndex = (int) (0.95 * weekSpeeds.size());
Double freeFlowSpeed = weekSpeeds.subList(startIndex, endIndex).stream().mapToDouble(Double::doubleValue).average().getAsDouble();
if (freeFlowSpeed == 0.0) {
freeFlowSpeed = 60.0;
}
crossDirFreeFlowSpeedMap.put(crossId.concat(Constants.UNDERLINE).concat(String.valueOf(dir)), freeFlowSpeed);
CrossDirFreeFlowSpeedDTO crossDirFreeFlowSpeedDTO = new CrossDirFreeFlowSpeedDTO();
crossDirFreeFlowSpeedDTO.setCrossId(crossId);
crossDirFreeFlowSpeedDTO.setDir(dir);
crossDirFreeFlowSpeedDTO.setFreeFlowSpeed(freeFlowSpeed);
crossDirFreeFlowSpeedDTOS.add(crossDirFreeFlowSpeedDTO);
}
}
}
@Override @Override
public void run(String... args) throws Exception { public void run(String... args) throws Exception {
try { try {
......
...@@ -43,29 +43,37 @@ public class WeekTurnFreeFlowSpeedCache implements CommandLineRunner { ...@@ -43,29 +43,37 @@ public class WeekTurnFreeFlowSpeedCache implements CommandLineRunner {
queryWrapper.between(CrossTurnDataHistPO::getStartTime, lastWeek, lastDay); queryWrapper.between(CrossTurnDataHistPO::getStartTime, lastWeek, lastDay);
List<CrossTurnDataHistPO> crossTurnDataHistPOS = crossTurnDataHistMapper.selectList(queryWrapper); List<CrossTurnDataHistPO> crossTurnDataHistPOS = crossTurnDataHistMapper.selectList(queryWrapper);
if (!CollectionUtils.isEmpty(crossTurnDataHistPOS)) { if (!CollectionUtils.isEmpty(crossTurnDataHistPOS)) {
Map<Integer, List<CrossTurnDataHistPO>> dirDataMap = crossTurnDataHistPOS.stream().collect(Collectors.groupingBy(CrossTurnDataHistPO::getInDir)); calculateData(crossTurnDataHistPOS);
for (Map.Entry<Integer, List<CrossTurnDataHistPO>> dirEntry : dirDataMap.entrySet()) { }
Integer dir = dirEntry.getKey(); }
List<CrossTurnDataHistPO> dirTurnDataHistPOS = dirEntry.getValue(); }
Map<String, List<CrossTurnDataHistPO>> turnDataMap = dirTurnDataHistPOS.stream().collect(Collectors.groupingBy(CrossTurnDataHistPO::getTurnType));
if (!turnDataMap.isEmpty()) { /**
for (Map.Entry<String, List<CrossTurnDataHistPO>> entry : turnDataMap.entrySet()) { * 计算前一周自由流速度85~95分位速度
String turn = entry.getKey(); * @param crossTurnDataHistPOS
List<CrossTurnDataHistPO> turnDataHistPOS = entry.getValue(); */
String currentCrossId = turnDataHistPOS.get(0).getCrossId(); private static void calculateData(List<CrossTurnDataHistPO> crossTurnDataHistPOS) {
List<Double> weekTurnSpeeds = turnDataHistPOS.stream().map(CrossTurnDataHistPO::getSpeed).sorted().collect(Collectors.toList()); Map<Integer, List<CrossTurnDataHistPO>> dirDataMap = crossTurnDataHistPOS.stream().collect(Collectors.groupingBy(CrossTurnDataHistPO::getInDir));
int startIndex = (int) (0.85 * weekTurnSpeeds.size()); for (Map.Entry<Integer, List<CrossTurnDataHistPO>> dirEntry : dirDataMap.entrySet()) {
int endIndex = (int) (0.95 * weekTurnSpeeds.size()); Integer dir = dirEntry.getKey();
if (startIndex == endIndex) { List<CrossTurnDataHistPO> dirTurnDataHistPOS = dirEntry.getValue();
startIndex = startIndex - 1; Map<String, List<CrossTurnDataHistPO>> turnDataMap = dirTurnDataHistPOS.stream().collect(Collectors.groupingBy(CrossTurnDataHistPO::getTurnType));
} if (!turnDataMap.isEmpty()) {
OptionalDouble average = weekTurnSpeeds.subList(startIndex, endIndex).stream().mapToDouble(Double::doubleValue).average(); for (Map.Entry<String, List<CrossTurnDataHistPO>> entry : turnDataMap.entrySet()) {
if (average.isPresent()) { String turn = entry.getKey();
double turnFreeFlowSpeed = average.getAsDouble(); List<CrossTurnDataHistPO> turnDataHistPOS = entry.getValue();
String key = currentCrossId.concat(Constants.UNDERLINE).concat(String.valueOf(dir)).concat(Constants.UNDERLINE).concat(turn); String currentCrossId = turnDataHistPOS.get(0).getCrossId();
crossTurnFreeFlowSpeedMap.put(key, turnFreeFlowSpeed); List<Double> weekTurnSpeeds = turnDataHistPOS.stream().map(CrossTurnDataHistPO::getSpeed).sorted().collect(Collectors.toList());
} int startIndex = (int) (0.85 * weekTurnSpeeds.size());
} int endIndex = (int) (0.95 * weekTurnSpeeds.size());
if (startIndex == endIndex) {
startIndex = startIndex - 1;
}
OptionalDouble average = weekTurnSpeeds.subList(startIndex, endIndex).stream().mapToDouble(Double::doubleValue).average();
if (average.isPresent()) {
double turnFreeFlowSpeed = average.getAsDouble();
String key = currentCrossId.concat(Constants.UNDERLINE).concat(String.valueOf(dir)).concat(Constants.UNDERLINE).concat(turn);
crossTurnFreeFlowSpeedMap.put(key, turnFreeFlowSpeed);
} }
} }
} }
......
...@@ -112,14 +112,10 @@ public class RedisConfig extends CachingConfigurerSupport { ...@@ -112,14 +112,10 @@ public class RedisConfig extends CachingConfigurerSupport {
return new FastJson2JsonRedisSerializer(Object.class); return new FastJson2JsonRedisSerializer(Object.class);
} }
@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.port}")
private Integer redisPort;
@Bean @Bean
public Jedis jedis() { public Jedis jedis() {
Jedis jedis = new Jedis(redisHost, redisPort); Jedis jedis = new Jedis(host, port);
jedis.auth("Wanji300552"); jedis.auth(password);
return jedis; return jedis;
} }
......
...@@ -32,4 +32,6 @@ public class Constant { ...@@ -32,4 +32,6 @@ public class Constant {
public static final String REAL_TIME_TURN_DATA_TOPIC= "cross_real_time_turn_data"; public static final String REAL_TIME_TURN_DATA_TOPIC= "cross_real_time_turn_data";
public static final String REAL_TIME_CROSS_DATA_KEY= "crossRealTimeData"; public static final String REAL_TIME_CROSS_DATA_KEY= "crossRealTimeData";
public static final String REAL_TIME_CROSS_DATA_TOPIC= "cross_real_time_data"; public static final String REAL_TIME_CROSS_DATA_TOPIC= "cross_real_time_data";
private Constant() {}
} }
...@@ -524,7 +524,7 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService { ...@@ -524,7 +524,7 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
crossDataIndexDTO.setLoadBalance(Double.valueOf(decimalFormat.format(loadBalance))); crossDataIndexDTO.setLoadBalance(Double.valueOf(decimalFormat.format(loadBalance)));
CrossRealTimeCache.crossDataIndexMap.put(crossId, crossDataIndexDTO); CrossRealTimeCache.crossDataIndexMap.put(crossId, crossDataIndexDTO);
} }
saveGreenWaveRealTime(insertCrossDirDataRealtimePOS, decimalFormat); saveGreenWaveRealTime(insertCrossDirDataRealtimePOS, decimalFormat, startDate);
return insertCrossDirDataRealtimePOS; return insertCrossDirDataRealtimePOS;
} }
...@@ -690,7 +690,7 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService { ...@@ -690,7 +690,7 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
} }
} }
private void saveGreenWaveRealTime(List<CrossDirDataRealtimePO> crossDirDataRealtimePOS, DecimalFormat decimalFormat) throws Exception { private void saveGreenWaveRealTime(List<CrossDirDataRealtimePO> crossDirDataRealtimePOS, DecimalFormat decimalFormat, Date date) throws Exception {
try { try {
Map<Integer, List<GreenwaveCrossPO>> greenWaveInfoMap = CrossGreenWaveInfoCache.greenWaveInfoMap; Map<Integer, List<GreenwaveCrossPO>> greenWaveInfoMap = CrossGreenWaveInfoCache.greenWaveInfoMap;
if (!greenWaveInfoMap.isEmpty()) { if (!greenWaveInfoMap.isEmpty()) {
...@@ -699,7 +699,6 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService { ...@@ -699,7 +699,6 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
for (Map.Entry<Integer, List<GreenwaveCrossPO>> entry : greenWaveInfoMap.entrySet()) { for (Map.Entry<Integer, List<GreenwaveCrossPO>> entry : greenWaveInfoMap.entrySet()) {
List<GreenwaveCrossPO> greenWaveCrossIds = entry.getValue(); List<GreenwaveCrossPO> greenWaveCrossIds = entry.getValue();
if (!CollectionUtils.isEmpty(greenWaveCrossIds)) { if (!CollectionUtils.isEmpty(greenWaveCrossIds)) {
int status = 1;
double trafficIndex = 1.0; double trafficIndex = 1.0;
Double speedSum = 0.0; Double speedSum = 0.0;
Double stopTimes = 0.0; Double stopTimes = 0.0;
...@@ -734,10 +733,7 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService { ...@@ -734,10 +733,7 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
for (CrossDirDataRealtimePO dirDataRealtimePO : crossDirDataRealtimePOS) { for (CrossDirDataRealtimePO dirDataRealtimePO : crossDirDataRealtimePOS) {
String crossIdDir = dirDataRealtimePO.getCrossId(); String crossIdDir = dirDataRealtimePO.getCrossId();
Integer dirType = dirDataRealtimePO.getDirType(); Integer dirType = dirDataRealtimePO.getDirType();
startTime = crossDirDataRealtimePOS.get(0).getStartTime();
if (StringUtils.equals(crossIdDir, crossId) && Objects.equals(inDir, dirType)) { if (StringUtils.equals(crossIdDir, crossId) && Objects.equals(inDir, dirType)) {
status = status > dirDataRealtimePO.getStatus() ? status : dirDataRealtimePO.getStatus();
trafficIndex = trafficIndex > dirDataRealtimePO.getTrafficIndex() ? trafficIndex : dirDataRealtimePO.getTrafficIndex(); trafficIndex = trafficIndex > dirDataRealtimePO.getTrafficIndex() ? trafficIndex : dirDataRealtimePO.getTrafficIndex();
speedSum += dirDataRealtimePO.getSpeed(); speedSum += dirDataRealtimePO.getSpeed();
stopTimes += dirDataRealtimePO.getStopTimes(); stopTimes += dirDataRealtimePO.getStopTimes();
...@@ -757,10 +753,10 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService { ...@@ -757,10 +753,10 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
noparkPassRateAvg = noparkPassRate / (greenwaveCount); noparkPassRateAvg = noparkPassRate / (greenwaveCount);
congRate = congCount / greenwaveCount; congRate = congCount / greenwaveCount;
} }
setTraffixIndex(decimalFormat, speedAvg, freeFlowSpeedAvg, greenwaveRealtimePO); // 计算交通指数,并设置status
setTraffixIndexAndStatus(decimalFormat, speedAvg, freeFlowSpeedAvg, greenwaveRealtimePO);
greenwaveRealtimePO.setGreenId(greenWaveId); greenwaveRealtimePO.setGreenId(greenWaveId);
greenwaveRealtimePO.setStartTime(startTime); greenwaveRealtimePO.setStartTime(date);
greenwaveRealtimePO.setStatus(status);
greenwaveRealtimePO.setType(1); greenwaveRealtimePO.setType(1);
greenwaveRealtimePO.setSpeed(speedAvg); greenwaveRealtimePO.setSpeed(speedAvg);
if (speedAvg != 0) { if (speedAvg != 0) {
...@@ -797,7 +793,7 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService { ...@@ -797,7 +793,7 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
} }
} }
private static void setTraffixIndex(DecimalFormat decimalFormat, double speedAvg, Double freeFlowSpeedAvg, GreenwaveRealtimePO greenwaveRealtimePO) { private static void setTraffixIndexAndStatus(DecimalFormat decimalFormat, double speedAvg, Double freeFlowSpeedAvg, GreenwaveRealtimePO greenwaveRealtimePO) {
Double greenwaveIndex = Double.valueOf(decimalFormat.format(freeFlowSpeedAvg / speedAvg)); Double greenwaveIndex = Double.valueOf(decimalFormat.format(freeFlowSpeedAvg / speedAvg));
if (greenwaveIndex >= 10) { if (greenwaveIndex >= 10) {
greenwaveRealtimePO.setTrafficIndex(10.0); greenwaveRealtimePO.setTrafficIndex(10.0);
...@@ -806,6 +802,18 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService { ...@@ -806,6 +802,18 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
} else { } else {
greenwaveRealtimePO.setTrafficIndex(1.0); greenwaveRealtimePO.setTrafficIndex(1.0);
} }
// 根据指数设置status
int status = 1;
if (greenwaveIndex <= 1.8) {
status = 1;
} else if (greenwaveIndex > 1.8 && greenwaveIndex <= 2.5) {
status = 2;
} else if (greenwaveIndex > 2.5 && greenwaveIndex <= 3.5) {
status = 3;
} else if (greenwaveIndex > 3.5) {
status = 4;
}
greenwaveRealtimePO.setStatus(status);
} }
private static Double getFreeFlowSpeed(Double freeFlowSpeed, String crossId, Map<String, Double> crossFreeFlowSpeedMap, Integer dirType) { private static Double getFreeFlowSpeed(Double freeFlowSpeed, String crossId, Map<String, Double> crossFreeFlowSpeedMap, Integer dirType) {
...@@ -860,8 +868,8 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService { ...@@ -860,8 +868,8 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
// 替换干线旅行速度 // 替换干线旅行速度
po.setSpeed(lineCongestion.getSpeed()); po.setSpeed(lineCongestion.getSpeed());
// 替换状态 // 替换状态
String congestionCode = lineCongestion.getCongestionCode(); String congestionType = lineCongestion.getCongestionType();
if (EventAbnormalEnum.NO_CONGEST.getType().equals(congestionCode)) { if (EventAbnormalEnum.NO_CONGEST.getType().equals(congestionType)) {
po.setStatus(1); po.setStatus(1);
} else { } else {
po.setStatus(3); po.setStatus(3);
......
...@@ -62,51 +62,56 @@ public class DataCenterTask { ...@@ -62,51 +62,56 @@ public class DataCenterTask {
int greenDelayTime = 0; // 每个路口绿波开始时间 int greenDelayTime = 0; // 每个路口绿波开始时间
List<GreenwaveCrossPO> greenwaveCrossPOList = entry.getValue(); List<GreenwaveCrossPO> greenwaveCrossPOList = entry.getValue();
if (!CollectionUtils.isEmpty(greenwaveCrossPOList)) { if (!CollectionUtils.isEmpty(greenwaveCrossPOList)) {
for (GreenwaveCrossPO greenwaveCrossPO : greenwaveCrossPOList) { greenwaveWidth = calculate(greenwaveWidth, lastCrossDistance, crossTimeOffset, greenDelayTime, greenwaveCrossPOList);
String crossId = greenwaveCrossPO.getCrossId(); }
Integer sectionId = greenwaveCrossPO.getSectionId(); log.info("当前绿波编号:{}, 绿波带宽:{}", greenId, greenwaveWidth);
Integer inDir = greenwaveCrossPO.getInDir(); }
Integer offset = greenwaveCrossPO.getOffset(); } catch (Exception e) {
crossTimeOffset += (int) (lastCrossDistance / (100 / 6)); // 自由流速度60 log.error("获取绿波带宽异常", e);
if (sectionId != null) { throw new Exception("获取绿波带宽异常", e);
CrossSectionPO crossSectionPO = baseCrossSectionMapper.selectById(sectionId); }
if (Objects.nonNull(crossSectionPO)) { }
Integer schemeId = crossSectionPO.getSchemeId();
BaseCrossSchemePO baseCrossSchemePO = baseCrossSchemeMapper.selectById(schemeId); private int calculate(int greenwaveWidth, Double lastCrossDistance, int crossTimeOffset, int greenDelayTime, List<GreenwaveCrossPO> greenwaveCrossPOList) {
if (Objects.nonNull(baseCrossSchemePO)) { for (GreenwaveCrossPO greenwaveCrossPO : greenwaveCrossPOList) {
String schemeNo = baseCrossSchemePO.getSchemeNo(); String crossId = greenwaveCrossPO.getCrossId();
List<CrossSchemePhaseDirTurnDTO> crossSchemePhaseDirTurnDTOList = CrossSchemePhaseLightsCache.crossSchemePhaseDirTurnDTOList; Integer sectionId = greenwaveCrossPO.getSectionId();
if (!CollectionUtils.isEmpty(crossSchemePhaseDirTurnDTOList)) { Integer inDir = greenwaveCrossPO.getInDir();
Integer offset = greenwaveCrossPO.getOffset();
crossTimeOffset += (int) (lastCrossDistance / (100 / 6)); // 自由流速度60
if (sectionId != null) {
CrossSectionPO crossSectionPO = baseCrossSectionMapper.selectById(sectionId);
if (Objects.nonNull(crossSectionPO)) {
Integer schemeId = crossSectionPO.getSchemeId();
BaseCrossSchemePO baseCrossSchemePO = baseCrossSchemeMapper.selectById(schemeId);
if (Objects.nonNull(baseCrossSchemePO)) {
String schemeNo = baseCrossSchemePO.getSchemeNo();
List<CrossSchemePhaseDirTurnDTO> crossSchemePhaseDirTurnDTOList = CrossSchemePhaseLightsCache.crossSchemePhaseDirTurnDTOList;
if (!CollectionUtils.isEmpty(crossSchemePhaseDirTurnDTOList)) {
// 过滤路口编号,方案号,直行或者圆饼灯, // 过滤路口编号,方案号,直行或者圆饼灯,
List<CrossSchemePhaseDirTurnDTO> currenSchemeList = crossSchemePhaseDirTurnDTOList.stream().filter(po -> StringUtils.equals(crossId, po.getCrossId()) && List<CrossSchemePhaseDirTurnDTO> currenSchemeList = crossSchemePhaseDirTurnDTOList.stream().filter(po -> StringUtils.equals(crossId, po.getCrossId()) &&
StringUtils.equals(po.getSchemeNo(), schemeNo) && StringUtils.equals(po.getSchemeNo(), schemeNo) &&
(Objects.equals(po.getLightsTurn(), 3) || Objects.equals(po.getLightsTurn(), 1))).collect(Collectors.toList()); (Objects.equals(po.getLightsTurn(), 3) || Objects.equals(po.getLightsTurn(), 1))).collect(Collectors.toList());
Integer lastPhaseTime = 0; // 上一个相位的时间 Integer lastPhaseTime = 0; // 上一个相位的时间
for (CrossSchemePhaseDirTurnDTO dto : currenSchemeList) { for (CrossSchemePhaseDirTurnDTO dto : currenSchemeList) {
// 当前路口方案绿波方向的绿灯时间 // 当前路口方案绿波方向的绿灯时间
lastPhaseTime += dto.getPhaseTime(); lastPhaseTime += dto.getPhaseTime();
if (Objects.equals(inDir, dto.getLightsDir())) { if (Objects.equals(inDir, dto.getLightsDir())) {
lastPhaseTime -= dto.getPhaseTime(); lastPhaseTime -= dto.getPhaseTime();
greenDelayTime = lastPhaseTime > greenDelayTime ? lastPhaseTime : greenDelayTime; greenDelayTime = lastPhaseTime > greenDelayTime ? lastPhaseTime : greenDelayTime;
Integer greenWaveStartIndex = offset + greenDelayTime + crossTimeOffset; Integer greenWaveStartIndex = offset + greenDelayTime + crossTimeOffset;
Integer greenWaveEndIndex = offset + dto.getPhaseTime(); Integer greenWaveEndIndex = offset + dto.getPhaseTime();
greenwaveWidth = Math.min(greenwaveWidth, greenWaveEndIndex - greenWaveStartIndex); greenwaveWidth = Math.min(greenwaveWidth, greenWaveEndIndex - greenWaveStartIndex);
}
}
}
} }
} }
} }
lastCrossDistance = greenwaveCrossPO.getNextCrossLen();
} }
} }
log.info("当前绿波编号:{}, 绿波带宽:{}", greenId, greenwaveWidth);
} }
} catch (Exception e) { lastCrossDistance = greenwaveCrossPO.getNextCrossLen();
log.error("获取绿波带宽异常", e);
throw new Exception("获取绿波带宽异常", e);
} }
return greenwaveWidth;
} }
} }
...@@ -12,7 +12,6 @@ import net.wanji.datacenter.cache.CrossSchemePhaseLightsCache; ...@@ -12,7 +12,6 @@ import net.wanji.datacenter.cache.CrossSchemePhaseLightsCache;
import net.wanji.datacenter.kafka.ConsumerHandler; import net.wanji.datacenter.kafka.ConsumerHandler;
import net.wanji.datacenter.pojo.dto.CrossSchemePhaseDirTurnDTO; import net.wanji.datacenter.pojo.dto.CrossSchemePhaseDirTurnDTO;
import net.wanji.datacenter.pojo.dto.CrossSnapshotDataDTO; import net.wanji.datacenter.pojo.dto.CrossSnapshotDataDTO;
import net.wanji.datacenter.pojo.dto.LanePeriodicDataDTO;
import net.wanji.datacenter.service.impl.LanePeriodicDataProcessServiceImpl; import net.wanji.datacenter.service.impl.LanePeriodicDataProcessServiceImpl;
import net.wanji.feign.service.UtcFeignClients; import net.wanji.feign.service.UtcFeignClients;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
...@@ -52,59 +51,67 @@ public class DataProcessTask { ...@@ -52,59 +51,67 @@ public class DataProcessTask {
return; return;
} }
List<LightsStatusVO> lightsStatusVOS = JacksonUtils.getInstance().convertValue(jsonViewObject.getContent(), new TypeReference<List<LightsStatusVO>>() { List<LightsStatusVO> lightsStatusVOS = JacksonUtils.getInstance().convertValue(jsonViewObject.getContent(), new TypeReference<List<LightsStatusVO>>() {});
}); calculateData(lightsStatusVOS);
for (LightsStatusVO lightsStatusVO : lightsStatusVOS) { }catch (Exception e) {
String crossId = lightsStatusVO.getCrossId(); // log.error("",e);
String schemeNo = lightsStatusVO.getSchemeId(); }
String currentPhaseId = lightsStatusVO.getPhaseId(); // 当前相位红灯说明都是红灯,全场红 }
long currenSignalTime = Long.parseLong(lightsStatusVO.getSchemeStartTime());
ConcurrentHashMap<String, Integer> crossIdPhaseIdClearGreenMap = CrossRealTimeCache.crossPhaseCarNumsMap; private void calculateData(List<LightsStatusVO> lightsStatusVOS) throws Exception {
getClearRateCache(crossId, currentPhaseId, crossIdPhaseIdClearGreenMap); for (LightsStatusVO lightsStatusVO : lightsStatusVOS) {
Map<String, Object> dirLampGroupMap = lightsStatusVO.getDirLampGroupMap(); String crossId = lightsStatusVO.getCrossId();
boolean isAllRed = true; String schemeNo = lightsStatusVO.getSchemeId();
if (!dirLampGroupMap.isEmpty()) { String currentPhaseId = lightsStatusVO.getPhaseId(); // 当前相位红灯说明都是红灯,全场红
for (Map.Entry<String, Object> entry : dirLampGroupMap.entrySet()) { long currenSignalTime = Long.parseLong(lightsStatusVO.getSchemeStartTime());
Map<String, String> turnLightColorMap = (Map<String, String>) entry.getValue(); ConcurrentHashMap<String, Integer> crossIdPhaseIdClearGreenMap = CrossRealTimeCache.crossPhaseCarNumsMap;
for (Map.Entry<String, String> colorEntry : turnLightColorMap.entrySet()) { getClearRateCache(crossId, currentPhaseId, crossIdPhaseIdClearGreenMap);
String lightColor = colorEntry.getValue(); Map<String, Object> dirLampGroupMap = lightsStatusVO.getDirLampGroupMap();
// 全场红,如果有一个绿灯就不计算 20行人跳过 // 是否是全场红
String key = colorEntry.getKey(); boolean isAllRed = getAllRed(dirLampGroupMap);
if (Objects.equals(lightColor, "green") && !StringUtils.equals(key, "20")) { // 如果全场红
isAllRed = false; if (isAllRed) {
} List<CrossSchemePhaseDirTurnDTO> crossSchemePhaseDirTurnDTOList = CrossSchemePhaseLightsCache.crossSchemePhaseDirTurnDTOList;
int redTime = 2;
if (!CollectionUtils.isEmpty(crossSchemePhaseDirTurnDTOList)) {
for (CrossSchemePhaseDirTurnDTO dto : crossSchemePhaseDirTurnDTOList) {
String crossIdCache = dto.getCrossId();
String schemeNoCache = dto.getSchemeNo();
String phaseNoCache = dto.getPhaseNo();
if (StringUtils.equals(crossIdCache, crossId) && StringUtils.equals(schemeNoCache, schemeNo)
&& StringUtils.equals(currentPhaseId, phaseNoCache)) {
redTime = dto.getRedTime();
} }
} }
// 剩余红灯时间
if (redTime >= 1) {
--redTime;
}
Integer carNums = getCarNums(crossId, currenSignalTime + redTime * 1000L);
startPhaseIdMap.putIfAbsent(crossId, currentPhaseId);
crossIdPhaseIdClearGreenMap.put(crossId.concat(currentPhaseId), carNums);
} }
LanePeriodicDataProcessServiceImpl.periodHaveAllRedMap.put(crossId, 1);
}
}
}
// 如果全场红 private static boolean getAllRed(Map<String, Object> dirLampGroupMap) {
if (isAllRed) { boolean isAllRed = true;
List<CrossSchemePhaseDirTurnDTO> crossSchemePhaseDirTurnDTOList = CrossSchemePhaseLightsCache.crossSchemePhaseDirTurnDTOList; if (!dirLampGroupMap.isEmpty()) {
int redTime = 2; for (Map.Entry<String, Object> entry : dirLampGroupMap.entrySet()) {
if (!CollectionUtils.isEmpty(crossSchemePhaseDirTurnDTOList)) { Map<String, String> turnLightColorMap = (Map<String, String>) entry.getValue();
for (CrossSchemePhaseDirTurnDTO dto : crossSchemePhaseDirTurnDTOList) { for (Map.Entry<String, String> colorEntry : turnLightColorMap.entrySet()) {
String crossIdCache = dto.getCrossId(); String lightColor = colorEntry.getValue();
String schemeNoCache = dto.getSchemeNo(); // 全场红,如果有一个绿灯就不计算 20行人跳过
String phaseNoCache = dto.getPhaseNo(); String key = colorEntry.getKey();
if (StringUtils.equals(crossIdCache, crossId) && StringUtils.equals(schemeNoCache, schemeNo) if (Objects.equals(lightColor, "green") && !StringUtils.equals(key, "20")) {
&& StringUtils.equals(currentPhaseId, phaseNoCache)) { isAllRed = false;
redTime = dto.getRedTime();
}
}
// 剩余红灯时间
if (redTime >= 1) {
--redTime;
}
Integer carNums = getCarNums(crossId, currenSignalTime + redTime * 1000L);
startPhaseIdMap.putIfAbsent(crossId, currentPhaseId);
crossIdPhaseIdClearGreenMap.put(crossId.concat(currentPhaseId), carNums);
} }
LanePeriodicDataProcessServiceImpl.periodHaveAllRedMap.put(crossId, 1);
} }
} }
}catch (Exception e) {
// log.error("",e);
} }
return isAllRed;
} }
private void getClearRateCache(String crossId, String currentPhaseId, Map<String, Integer> crossIdPhaseIdClearGreenMap) throws JsonProcessingException { private void getClearRateCache(String crossId, String currentPhaseId, Map<String, Integer> crossIdPhaseIdClearGreenMap) throws JsonProcessingException {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment