Commit d86905af authored by duanruiming's avatar duanruiming

[update] 方向周平均流量;转向自由流速度

parent 918a5279
package net.wanji.datacenter.cache;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import net.wanji.databus.dao.mapper.CrossDirDataHistMapper;
import net.wanji.databus.po.CrossDirDataHistPO;
import net.wanji.datacenter.common.Constants;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author duanruiming
* @date 2023/10/20 10:41
*/
@Component
@Slf4j
public class CrossDirWeekAvgFlowCache implements CommandLineRunner {
public static final Map<String, Double> crossDirWeekAvgFlowMap = new HashMap<>();
@Resource
private CrossDirDataHistMapper crossDirDataHistMapper;
private void getAvgFlow() throws Exception {
LocalDate currentDate = LocalDate.now();
LocalDate lastDay = currentDate.minus(1, ChronoUnit.DAYS);
LocalDate lastWeek = currentDate.minus(1, ChronoUnit.WEEKS);
List<String> crossIds = Arrays.asList("13NF80B5QN0", "13NGH0B5RC0", "13NI00B5RM0");
for (String crossId : crossIds) {
// 前一周
LambdaQueryWrapper<CrossDirDataHistPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(CrossDirDataHistPO::getCrossId, crossId);
queryWrapper.between(CrossDirDataHistPO::getStartTime, lastWeek, lastDay);
List<CrossDirDataHistPO> crossDirDataHistPOS = crossDirDataHistMapper.selectList(queryWrapper);
if (!CollectionUtils.isEmpty(crossDirDataHistPOS)) {
Map<Integer, List<CrossDirDataHistPO>> dirDataMap = crossDirDataHistPOS.stream().collect(Collectors.groupingBy(CrossDirDataHistPO::getDirType));
if (!dirDataMap.isEmpty()) {
for (Map.Entry<Integer, List<CrossDirDataHistPO>> entry : dirDataMap.entrySet()) {
Integer dir = entry.getKey();
List<CrossDirDataHistPO> dirDataHistPOS = entry.getValue();
String currentCrossId = dirDataHistPOS.get(0).getCrossId();
Double dirFlowAvg = dirDataHistPOS.stream().map(CrossDirDataHistPO::getFlow).mapToInt(Integer::intValue).average().getAsDouble();
crossDirWeekAvgFlowMap.put(currentCrossId.concat(Constants.underline).concat(String.valueOf(dir)), dirFlowAvg);
}
}
}
}
}
@Override
public void run(String... args) throws Exception {
try {
getAvgFlow();
} catch (Exception e) {
log.error("7天内路口方向平均流量获取失败", e);
throw new Exception(e);
}
log.info("7天内路口方向平均流量:{}", crossDirWeekAvgFlowMap);
}
}
\ No newline at end of file
...@@ -49,8 +49,9 @@ public class CrossDirWeekMaxFlowCache implements CommandLineRunner { ...@@ -49,8 +49,9 @@ public class CrossDirWeekMaxFlowCache implements CommandLineRunner {
for (Map.Entry<Integer, List<CrossDirDataHistPO>> entry : dirDataMap.entrySet()) { for (Map.Entry<Integer, List<CrossDirDataHistPO>> entry : dirDataMap.entrySet()) {
Integer dir = entry.getKey(); Integer dir = entry.getKey();
List<CrossDirDataHistPO> dirDataHistPOS = entry.getValue(); List<CrossDirDataHistPO> dirDataHistPOS = entry.getValue();
String currentCrossId = dirDataHistPOS.get(0).getCrossId();
int dirFlowMax = dirDataHistPOS.stream().map(CrossDirDataHistPO::getFlow).mapToInt(Integer::intValue).max().getAsInt(); int dirFlowMax = dirDataHistPOS.stream().map(CrossDirDataHistPO::getFlow).mapToInt(Integer::intValue).max().getAsInt();
crossDirWeekMaxFlowMap.put(crossId.concat(Constants.underline).concat(String.valueOf(dir)), dirFlowMax); crossDirWeekMaxFlowMap.put(currentCrossId.concat(Constants.underline).concat(String.valueOf(dir)), dirFlowMax);
} }
} }
} }
......
...@@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; ...@@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.wanji.databus.dao.mapper.CrossDirDataHistMapper; import net.wanji.databus.dao.mapper.CrossDirDataHistMapper;
import net.wanji.databus.po.CrossDirDataHistPO; import net.wanji.databus.po.CrossDirDataHistPO;
import net.wanji.datacenter.common.Constants;
import net.wanji.datacenter.kafka.ProducerHandler; import net.wanji.datacenter.kafka.ProducerHandler;
import net.wanji.datacenter.pojo.dto.CrossDirFreeFlowSpeedDTO; import net.wanji.datacenter.pojo.dto.CrossDirFreeFlowSpeedDTO;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
...@@ -22,16 +23,16 @@ import java.util.stream.Collectors; ...@@ -22,16 +23,16 @@ import java.util.stream.Collectors;
*/ */
@Component @Component
@Slf4j @Slf4j
public class WeekFreeFlowSpeedCache implements CommandLineRunner { public class WeekDirFreeFlowSpeedCache implements CommandLineRunner {
public static final Map<String, Double> crossFreeFlowSpeedMap = new HashMap<>(); public static final Map<String, Double> crossDirFreeFlowSpeedMap = new HashMap<>();
@Resource @Resource
private CrossDirDataHistMapper crossDirDataHistMapper; private CrossDirDataHistMapper crossDirDataHistMapper;
@Resource @Resource
private ProducerHandler producerHandler; private ProducerHandler producerHandler;
private void getWeekFreeFlowSpeed() throws Exception { private void getDirWeekFreeFlowSpeed() throws Exception {
LocalDate currentDate = LocalDate.now(); LocalDate currentDate = LocalDate.now();
LocalDate lastDay = currentDate.minus(1, ChronoUnit.DAYS); LocalDate lastDay = currentDate.minus(1, ChronoUnit.DAYS);
LocalDate lastWeek = currentDate.minus(1, ChronoUnit.WEEKS); LocalDate lastWeek = currentDate.minus(1, ChronoUnit.WEEKS);
...@@ -43,22 +44,18 @@ public class WeekFreeFlowSpeedCache implements CommandLineRunner { ...@@ -43,22 +44,18 @@ public class WeekFreeFlowSpeedCache implements CommandLineRunner {
queryWrapper.eq(CrossDirDataHistPO::getCrossId, crossId); queryWrapper.eq(CrossDirDataHistPO::getCrossId, crossId);
queryWrapper.between(CrossDirDataHistPO::getStartTime, lastWeek, lastDay); queryWrapper.between(CrossDirDataHistPO::getStartTime, lastWeek, lastDay);
List<CrossDirDataHistPO> crossDirDataHistPOS = crossDirDataHistMapper.selectList(queryWrapper); List<CrossDirDataHistPO> crossDirDataHistPOS = crossDirDataHistMapper.selectList(queryWrapper);
// 前一天 if (!CollectionUtils.isEmpty(crossDirDataHistPOS)) {
LambdaQueryWrapper<CrossDirDataHistPO> queryWrapper1 = new LambdaQueryWrapper<>(); Map<Integer, List<CrossDirDataHistPO>> dirDataMap = crossDirDataHistPOS.stream().collect(Collectors.groupingBy(CrossDirDataHistPO::getDirType));
queryWrapper1.eq(CrossDirDataHistPO::getCrossId, crossId);
queryWrapper1.between(CrossDirDataHistPO::getStartTime, lastDay, currentDate);
List<CrossDirDataHistPO> crossDirDataHistPOS111 = crossDirDataHistMapper.selectList(queryWrapper1);
if (!CollectionUtils.isEmpty(crossDirDataHistPOS111)) {
Map<Integer, List<CrossDirDataHistPO>> dirDataMap = crossDirDataHistPOS111.stream().collect(Collectors.groupingBy(CrossDirDataHistPO::getDirType));
if (!dirDataMap.isEmpty()) { if (!dirDataMap.isEmpty()) {
for (Map.Entry<Integer, List<CrossDirDataHistPO>> entry : dirDataMap.entrySet()) { for (Map.Entry<Integer, List<CrossDirDataHistPO>> entry : dirDataMap.entrySet()) {
Integer dir = entry.getKey(); Integer dir = entry.getKey();
List<CrossDirDataHistPO> dirDataHistPOS = entry.getValue(); List<CrossDirDataHistPO> dirDataHistPOS = entry.getValue();
String currentCrossId = dirDataHistPOS.get(0).getCrossId();
List<Double> weekSpeeds = dirDataHistPOS.stream().map(CrossDirDataHistPO::getSpeed).sorted().collect(Collectors.toList()); List<Double> weekSpeeds = dirDataHistPOS.stream().map(CrossDirDataHistPO::getSpeed).sorted().collect(Collectors.toList());
int startIndex = (int) (0.85 * weekSpeeds.size()); int startIndex = (int) (0.85 * weekSpeeds.size());
int endIndex = (int) (0.95 * weekSpeeds.size()); int endIndex = (int) (0.95 * weekSpeeds.size());
Double freeFlowSpeed = weekSpeeds.subList(startIndex, endIndex).stream().mapToDouble(Double::doubleValue).average().getAsDouble(); Double freeFlowSpeed = weekSpeeds.subList(startIndex, endIndex).stream().mapToDouble(Double::doubleValue).average().getAsDouble();
crossFreeFlowSpeedMap.put(crossId.concat("_").concat(String.valueOf(dir)), freeFlowSpeed); crossDirFreeFlowSpeedMap.put(crossId.concat(Constants.underline).concat(String.valueOf(dir)), freeFlowSpeed);
CrossDirFreeFlowSpeedDTO crossDirFreeFlowSpeedDTO = new CrossDirFreeFlowSpeedDTO(); CrossDirFreeFlowSpeedDTO crossDirFreeFlowSpeedDTO = new CrossDirFreeFlowSpeedDTO();
crossDirFreeFlowSpeedDTO.setCrossId(crossId); crossDirFreeFlowSpeedDTO.setCrossId(crossId);
crossDirFreeFlowSpeedDTO.setDir(dir); crossDirFreeFlowSpeedDTO.setDir(dir);
...@@ -76,12 +73,12 @@ public class WeekFreeFlowSpeedCache implements CommandLineRunner { ...@@ -76,12 +73,12 @@ public class WeekFreeFlowSpeedCache implements CommandLineRunner {
@Override @Override
public void run(String... args) throws Exception { public void run(String... args) throws Exception {
try { try {
getWeekFreeFlowSpeed(); getDirWeekFreeFlowSpeed();
} catch (Exception e) { } catch (Exception e) {
log.error("路口方向自由流速度失败", e); log.error("路口方向自由流速度失败", e);
throw new Exception(e); throw new Exception(e);
} }
log.info("路口方向自由流速度:{}", crossFreeFlowSpeedMap); log.info("路口方向自由流速度:{}", crossDirFreeFlowSpeedMap);
} }
} }
package net.wanji.datacenter.cache;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import net.wanji.databus.dao.mapper.CrossDirDataHistMapper;
import net.wanji.databus.dao.mapper.CrossTurnDataHistMapper;
import net.wanji.databus.po.CrossDirDataHistPO;
import net.wanji.databus.po.CrossTurnDataHistPO;
import net.wanji.datacenter.common.Constants;
import net.wanji.datacenter.kafka.ProducerHandler;
import net.wanji.datacenter.pojo.dto.CrossDirFreeFlowSpeedDTO;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author duanruiming
* @date 2023/10/14 14:16
*/
@Component
@Slf4j
public class WeekTurnFreeFlowSpeedCache implements CommandLineRunner {
public static final Map<String, Double> crossTurnFreeFlowSpeedMap = new HashMap<>();
@Resource
private CrossTurnDataHistMapper crossTurnDataHistMapper;
private void getDirWeekFreeFlowSpeed() throws Exception {
LocalDate currentDate = LocalDate.now();
LocalDate lastDay = currentDate.minus(1, ChronoUnit.DAYS);
LocalDate lastWeek = currentDate.minus(1, ChronoUnit.WEEKS);
List<String> crossIds = Arrays.asList("13NF80B5QN0", "13NGH0B5RC0", "13NI00B5RM0");
for (String crossId : crossIds) {
// 前一周
LambdaQueryWrapper<CrossTurnDataHistPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(CrossTurnDataHistPO::getCrossId, crossId);
queryWrapper.between(CrossTurnDataHistPO::getStartTime, lastWeek, lastDay);
List<CrossTurnDataHistPO> crossTurnDataHistPOS = crossTurnDataHistMapper.selectList(queryWrapper);
if (!CollectionUtils.isEmpty(crossTurnDataHistPOS)) {
Map<Integer, List<CrossTurnDataHistPO>> dirDataMap = crossTurnDataHistPOS.stream().collect(Collectors.groupingBy(CrossTurnDataHistPO::getInDir));
for (Map.Entry<Integer, List<CrossTurnDataHistPO>> dirEntry : dirDataMap.entrySet()) {
Integer dir = dirEntry.getKey();
List<CrossTurnDataHistPO> dirTurnDataHistPOS = dirEntry.getValue();
Map<String, List<CrossTurnDataHistPO>> turnDataMap = dirTurnDataHistPOS.stream().collect(Collectors.groupingBy(CrossTurnDataHistPO::getTurnType));
if (!turnDataMap.isEmpty()) {
for (Map.Entry<String, List<CrossTurnDataHistPO>> entry : turnDataMap.entrySet()) {
String turn = entry.getKey();
List<CrossTurnDataHistPO> turnDataHistPOS = entry.getValue();
String currentCrossId = turnDataHistPOS.get(0).getCrossId();
List<Double> weekTurnSpeeds = turnDataHistPOS.stream().map(CrossTurnDataHistPO::getSpeed).sorted().collect(Collectors.toList());
int startIndex = (int) (0.85 * weekTurnSpeeds.size());
int endIndex = (int) (0.95 * weekTurnSpeeds.size());
Double freeFlowSpeed = weekTurnSpeeds.subList(startIndex, endIndex).stream().mapToDouble(Double::doubleValue).average().getAsDouble();
String key = currentCrossId.concat(Constants.underline).concat(String.valueOf(dir)).concat(Constants.underline).concat(turn);
crossTurnFreeFlowSpeedMap.put(key, freeFlowSpeed);
}
}
}
}
}
}
@Override
public void run(String... args) throws Exception {
try {
getDirWeekFreeFlowSpeed();
} catch (Exception e) {
log.error("路口转向自由流速度失败", e);
throw new Exception(e);
}
log.info("路口转向自由流速度:{}", crossTurnFreeFlowSpeedMap);
}
}
...@@ -45,7 +45,7 @@ public class DataProcessTask { ...@@ -45,7 +45,7 @@ public class DataProcessTask {
* 需要注意搭接相位,目前搭接相位的两个相位当作为一个相位 * 需要注意搭接相位,目前搭接相位的两个相位当作为一个相位
* 延迟30s,否则Kafka中未消费但是运行定时任务获取消息 * 延迟30s,否则Kafka中未消费但是运行定时任务获取消息
*/ */
//@Scheduled(initialDelay = 30 * 1000, fixedRate = 1000) @Scheduled(initialDelay = 30 * 1000, fixedRate = 1000)
public void calculateGreenLightsClearRate() throws Exception { public void calculateGreenLightsClearRate() throws Exception {
List<CrossSchemePhaseDirTurnDTO> crossSchemePhaseDirTurnDTOList = CrossSchemePhaseLightsCache.crossSchemePhaseDirTurnDTOList; List<CrossSchemePhaseDirTurnDTO> crossSchemePhaseDirTurnDTOList = CrossSchemePhaseLightsCache.crossSchemePhaseDirTurnDTOList;
JsonViewObject jsonViewObject = utcFeignClients.lightStatus(); JsonViewObject jsonViewObject = utcFeignClients.lightStatus();
......
package net.wanji.datacenter.task; package net.wanji.datacenter.task;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.wanji.datacenter.cache.WeekFreeFlowSpeedCache; import net.wanji.datacenter.cache.WeekDirFreeFlowSpeedCache;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -17,10 +17,10 @@ import javax.annotation.Resource; ...@@ -17,10 +17,10 @@ import javax.annotation.Resource;
public class WeekFreeFlowSpeedTask { public class WeekFreeFlowSpeedTask {
@Resource @Resource
private WeekFreeFlowSpeedCache weekFreeFlowSpeedCache; private WeekDirFreeFlowSpeedCache weekDirFreeFlowSpeedCache;
@Scheduled(cron = "0 0 0 * * ?") @Scheduled(cron = "0 0 0 * * ?")
public void getWeekFreeFlowSpeed() throws Exception { public void getWeekFreeFlowSpeed() throws Exception {
weekFreeFlowSpeedCache.run(); weekDirFreeFlowSpeedCache.run();
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment