Commit 326a54f0 authored by zhoushiguang's avatar zhoushiguang

异常bug修改

parent a507df38
......@@ -3,7 +3,6 @@ package net.wanji.opt;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
......@@ -12,12 +11,11 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
* @author
*/
@SpringBootApplication(scanBasePackages = {"net.wanji.opt", "net.wanji.databus", "net.wanji.common",
"net.wanji.opt.controller","net.wanji.opt.synthesis.controller"})
"net.wanji.opt.task.controller"})
@MapperScan(basePackages = {"net.wanji.opt.dao.mapper", "net.wanji.databus.dao.mapper"})
@EnableTransactionManagement
@EnableScheduling
@EnableAsync
@EnableKafka
public class SignalOptimizeApplication {
public static void main(String[] args) {
......
package net.wanji.opt.cache;
import lombok.extern.slf4j.Slf4j;
import net.wanji.databus.dao.mapper.BaseCrossInfoMapper;
import net.wanji.databus.po.BaseCrossInfoPO;
import net.wanji.databus.vo.CrossInfoVO;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* @author duanruiming
* @date 2023/03/14 10:42
*/
@Component
@Order(1)
@Slf4j
public class BaseCrossInfoCache implements CommandLineRunner {
@Resource
private BaseCrossInfoMapper baseCrossInfoMapper;
private static final List<BaseCrossInfoPO> crossInfoList = new ArrayList<>();
/**
* 获取所有路口信息
*
* @return
*/
public List<BaseCrossInfoPO> getCrossInfoCache() {
return crossInfoList;
}
/**
* 通过路口编号查询经纬度
*/
public static String getCoordinateByCrossId(String crossId) {
for (BaseCrossInfoPO baseCrossInfoPO : crossInfoList) {
if (StringUtils.equals(crossId, baseCrossInfoPO.getId())) {
return baseCrossInfoPO.getLocation();
}
}
return null;
}
/**
* 获取所有信控路口的路口编号
*
* @return
*/
public List<String> getIsSignalCrossIdList() {
return crossInfoList.stream().filter(crossInfoPO -> crossInfoPO.getIsSignal() == 1).map(BaseCrossInfoPO::getId).collect(Collectors.toList());
}
public String getCrossName(String crossId) {
for (BaseCrossInfoPO baseCrossInfoPO : crossInfoList) {
if (StringUtils.equals(crossId, baseCrossInfoPO.getId())) {
return baseCrossInfoPO.getName();
}
}
return "";
}
@Override
public void run(String... args) throws Exception {
//init();
}
public void init() {
List<BaseCrossInfoPO> baseCrossInfoPOS = baseCrossInfoMapper.selectAll(new CrossInfoVO());
if (!CollectionUtils.isEmpty(baseCrossInfoPOS)) {
List<BaseCrossInfoPO> collect = baseCrossInfoPOS.stream().filter(po -> Objects.equals(po.getIsSignal(), 1)).collect(Collectors.toList());
crossInfoList.addAll(collect);
}
}
}
//package net.wanji.opt.cache;
//
//import lombok.extern.slf4j.Slf4j;
//import net.wanji.databus.dao.mapper.BaseCrossInfoMapper;
//import net.wanji.databus.po.BaseCrossInfoPO;
//import net.wanji.databus.vo.CrossInfoVO;
//import org.apache.commons.lang3.StringUtils;
//import org.springframework.boot.CommandLineRunner;
//import org.springframework.core.annotation.Order;
//import org.springframework.stereotype.Component;
//import org.springframework.util.CollectionUtils;
//
//import javax.annotation.Resource;
//import java.util.ArrayList;
//import java.util.List;
//import java.util.Objects;
//import java.util.stream.Collectors;
//
///**
// * @author duanruiming
// * @date 2023/03/14 10:42
// */
//@Component
//@Order(1)
//@Slf4j
//public class BaseCrossInfoCache implements CommandLineRunner {
//
// @Resource
// private BaseCrossInfoMapper baseCrossInfoMapper;
//
// private static final List<BaseCrossInfoPO> crossInfoList = new ArrayList<>();
//
// /**
// * 获取所有路口信息
// *
// * @return
// */
// public List<BaseCrossInfoPO> getCrossInfoCache() {
// return crossInfoList;
// }
//
// /**
// * 通过路口编号查询经纬度
// */
// public static String getCoordinateByCrossId(String crossId) {
// for (BaseCrossInfoPO baseCrossInfoPO : crossInfoList) {
// if (StringUtils.equals(crossId, baseCrossInfoPO.getId())) {
// return baseCrossInfoPO.getLocation();
// }
// }
// return null;
// }
//
// /**
// * 获取所有信控路口的路口编号
// *
// * @return
// */
// public List<String> getIsSignalCrossIdList() {
// return crossInfoList.stream().filter(crossInfoPO -> crossInfoPO.getIsSignal() == 1).map(BaseCrossInfoPO::getId).collect(Collectors.toList());
// }
//
// public String getCrossName(String crossId) {
// for (BaseCrossInfoPO baseCrossInfoPO : crossInfoList) {
// if (StringUtils.equals(crossId, baseCrossInfoPO.getId())) {
// return baseCrossInfoPO.getName();
// }
// }
// return "";
// }
//
// @Override
// public void run(String... args) throws Exception {
// //init();
// }
//
// public void init() {
// List<BaseCrossInfoPO> baseCrossInfoPOS = baseCrossInfoMapper.selectAll(new CrossInfoVO());
// if (!CollectionUtils.isEmpty(baseCrossInfoPOS)) {
// List<BaseCrossInfoPO> collect = baseCrossInfoPOS.stream().filter(po -> Objects.equals(po.getIsSignal(), 1)).collect(Collectors.toList());
// crossInfoList.addAll(collect);
// }
// }
//}
package net.wanji.opt.cache;
import lombok.extern.slf4j.Slf4j;
import net.wanji.databus.dao.entity.CrossPhasePO;
import net.wanji.databus.dao.mapper.BaseCrossPhaseMapper;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author duanruiming
* @date 2023/03/06 15:03
*/
@Component
@Slf4j
public class BaseCrossPhaseInfoCache implements CommandLineRunner {
@Resource
BaseCrossPhaseMapper baseCrossPhaseMapper;
/**
* 路口相位缓存
* key:crossId value:List<CrossPhasePO>
*/
private static final Map<String, List<CrossPhasePO>> crossPhaseInfoMap = new HashMap<>();
/**
* 获取所有路口相位列表
* @return
*/
public static Map<String, List<CrossPhasePO>> getCrossPhaseInfoCache() {
return crossPhaseInfoMap;
}
/**
* 通过路口编号获取路口列表
* @param crossId
* @return
*/
public List<CrossPhasePO> getCrossPhaseCacheByCrossId(String crossId) {
if (!crossPhaseInfoMap.isEmpty()) {
return crossPhaseInfoMap.get(crossId);
}
return Collections.emptyList();
}
@Override
public void run(String... args) throws Exception {
// init();
}
public void init() {
List<CrossPhasePO> crossPhasePOS = baseCrossPhaseMapper.listCrossPhasePO(new CrossPhasePO());
if (!CollectionUtils.isEmpty(crossPhasePOS)) {
crossPhaseInfoMap.putAll(crossPhasePOS.stream().collect(Collectors.groupingBy(CrossPhasePO::getCrossId)));
}
}
}
//package net.wanji.opt.cache;
//
//import lombok.extern.slf4j.Slf4j;
//import net.wanji.databus.dao.entity.CrossPhasePO;
//import net.wanji.databus.dao.mapper.BaseCrossPhaseMapper;
//import org.springframework.boot.CommandLineRunner;
//import org.springframework.stereotype.Component;
//import org.springframework.util.CollectionUtils;
//
//import javax.annotation.Resource;
//import java.util.Collections;
//import java.util.HashMap;
//import java.util.List;
//import java.util.Map;
//import java.util.stream.Collectors;
//
///**
// * @author duanruiming
// * @date 2023/03/06 15:03
// */
//@Component
//@Slf4j
//public class BaseCrossPhaseInfoCache implements CommandLineRunner {
//
// @Resource
// BaseCrossPhaseMapper baseCrossPhaseMapper;
//
// /**
// * 路口相位缓存
// * key:crossId value:List<CrossPhasePO>
// */
// private static final Map<String, List<CrossPhasePO>> crossPhaseInfoMap = new HashMap<>();
//
// /**
// * 获取所有路口相位列表
// * @return
// */
// public static Map<String, List<CrossPhasePO>> getCrossPhaseInfoCache() {
// return crossPhaseInfoMap;
// }
//
// /**
// * 通过路口编号获取路口列表
// * @param crossId
// * @return
// */
// public List<CrossPhasePO> getCrossPhaseCacheByCrossId(String crossId) {
// if (!crossPhaseInfoMap.isEmpty()) {
// return crossPhaseInfoMap.get(crossId);
// }
// return Collections.emptyList();
// }
//
// @Override
// public void run(String... args) throws Exception {
// // init();
// }
//
// public void init() {
// List<CrossPhasePO> crossPhasePOS = baseCrossPhaseMapper.listCrossPhasePO(new CrossPhasePO());
// if (!CollectionUtils.isEmpty(crossPhasePOS)) {
// crossPhaseInfoMap.putAll(crossPhasePOS.stream().collect(Collectors.groupingBy(CrossPhasePO::getCrossId)));
// }
// }
//}
package net.wanji.opt.cache;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.framework.Constants;
import net.wanji.databus.dao.entity.BaseCrossSchemePO;
import net.wanji.databus.dao.mapper.BaseCrossSchemeMapper;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author duanruiming
* @date 2023/03/06 14:30
*/
@Component
@Slf4j
public class BaseCrossSchemeInfoCache implements CommandLineRunner {
@Resource
BaseCrossSchemeMapper baseCrossSchemeMapper;
/**
* key:crossId_id value:CrossSchemePO
*/
private static final Map<String, BaseCrossSchemePO> crossSchemeInfoMap = new HashMap<>();
/**
* 获取所有路口方案信息
* @return
*/
public static Map<String, BaseCrossSchemePO> getCrossSchemeInfosCache() {
return crossSchemeInfoMap;
}
/**
* 通过路口编号_方案id获取方案信息
* @param key: crossId_id
* @return
*/
public BaseCrossSchemePO getSchemePOCache(String key) {
if (!crossSchemeInfoMap.isEmpty()) {
return crossSchemeInfoMap.get(key);
}
return null;
}
@Override
public void run(String... args) throws Exception {
init();
}
private void init() {
try {
long start = System.currentTimeMillis();
List<BaseCrossSchemePO> baseCrossSchemePOS = baseCrossSchemeMapper.listCrossSchemeInfo(Constants.SystemParam.NULL, Constants.SystemParam.NULL, null);
if (!CollectionUtils.isEmpty(baseCrossSchemePOS)) {
baseCrossSchemePOS.forEach(crossSchemePO -> {
String crossId = crossSchemePO.getCrossId();
Integer schemeId = crossSchemePO.getId();
String key = String.join(Constants.SystemParam.SEPARATOR_UNDER_LINE, crossId, String.valueOf(schemeId));
crossSchemeInfoMap.put(key, crossSchemePO);
});
}
long end = System.currentTimeMillis();
log.info("加载路口方案信息到缓存耗时:{}ms,条数据,size={}", (end - start), crossSchemeInfoMap.size());
} catch (Exception e) {
log.info("加载路口方案信息到缓存失败", e);
}
}
}
//package net.wanji.opt.cache;
//
//import lombok.extern.slf4j.Slf4j;
//import net.wanji.common.framework.Constants;
//import net.wanji.databus.dao.entity.BaseCrossSchemePO;
//import net.wanji.databus.dao.mapper.BaseCrossSchemeMapper;
//import org.springframework.boot.CommandLineRunner;
//import org.springframework.stereotype.Component;
//import org.springframework.util.CollectionUtils;
//
//import javax.annotation.Resource;
//import java.util.HashMap;
//import java.util.List;
//import java.util.Map;
//
///**
// * @author duanruiming
// * @date 2023/03/06 14:30
// */
//@Component
//@Slf4j
//public class BaseCrossSchemeInfoCache implements CommandLineRunner {
//
// @Resource
// BaseCrossSchemeMapper baseCrossSchemeMapper;
//
// /**
// * key:crossId_id value:CrossSchemePO
// */
// private static final Map<String, BaseCrossSchemePO> crossSchemeInfoMap = new HashMap<>();
//
// /**
// * 获取所有路口方案信息
// * @return
// */
// public static Map<String, BaseCrossSchemePO> getCrossSchemeInfosCache() {
// return crossSchemeInfoMap;
// }
//
// /**
// * 通过路口编号_方案id获取方案信息
// * @param key: crossId_id
// * @return
// */
// public BaseCrossSchemePO getSchemePOCache(String key) {
// if (!crossSchemeInfoMap.isEmpty()) {
// return crossSchemeInfoMap.get(key);
// }
// return null;
// }
//
// @Override
// public void run(String... args) throws Exception {
// init();
// }
//
// private void init() {
//
// try {
// long start = System.currentTimeMillis();
// List<BaseCrossSchemePO> baseCrossSchemePOS = baseCrossSchemeMapper.listCrossSchemeInfo(Constants.SystemParam.NULL, Constants.SystemParam.NULL, null);
// if (!CollectionUtils.isEmpty(baseCrossSchemePOS)) {
// baseCrossSchemePOS.forEach(crossSchemePO -> {
// String crossId = crossSchemePO.getCrossId();
// Integer schemeId = crossSchemePO.getId();
// String key = String.join(Constants.SystemParam.SEPARATOR_UNDER_LINE, crossId, String.valueOf(schemeId));
// crossSchemeInfoMap.put(key, crossSchemePO);
// });
// }
//
// long end = System.currentTimeMillis();
// log.info("加载路口方案信息到缓存耗时:{}ms,条数据,size={}", (end - start), crossSchemeInfoMap.size());
// } catch (Exception e) {
// log.info("加载路口方案信息到缓存失败", e);
// }
// }
//}
package net.wanji.opt.cache;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.framework.Constants;
import net.wanji.common.utils.tool.StringUtils;
import net.wanji.databus.dao.entity.RidInfoEntity;
import net.wanji.databus.dao.mapper.RidInfoMapper;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* @author duanruiming
* @date 2023/03/07 13:44
*/
@Component
@Slf4j
public class CrossRidInfoCache implements CommandLineRunner {
@Resource
RidInfoMapper ridInfoMapper;
private static final List<RidInfoEntity> ridInfoList = new ArrayList<>();
/**
* 获取所有路口关联列表
* @return
*/
public List<RidInfoEntity> getRidInfoAllList() {
return ridInfoList;
}
/**
* 通过上下游路口编号获取关联路口列表
* @param startCrossId
* @param endCrossId
* @return
*/
public List<RidInfoEntity> getRidInfoListByCrossId(String startCrossId, String endCrossId) {
Stream<RidInfoEntity> stream = ridInfoList.stream();
if (StringUtils.isNotBlank(startCrossId)) {
stream = stream.filter(ridInfoEntity -> StringUtils.equalsIgnoreCase(startCrossId, ridInfoEntity.getStartCrossId()));
}
if (StringUtils.isNotBlank(endCrossId)) {
stream = stream.filter(ridInfoEntity -> StringUtils.equalsIgnoreCase(endCrossId, ridInfoEntity.getEndCrossId()));
}
return stream.collect(Collectors.toList());
}
/**
* 刷新缓存
*/
public void refresh() {
init();
}
@Override
public void run(String... args) throws Exception {
// init();
}
public void init() {
List<RidInfoEntity> ridInfoEntities = ridInfoMapper.selectOne(Constants.SystemParam.NULL, Constants.SystemParam.NULL);
if (!CollectionUtils.isEmpty(ridInfoEntities)) {
ridInfoList.addAll(ridInfoEntities);
}
}
}
//package net.wanji.opt.cache;
//
//import lombok.extern.slf4j.Slf4j;
//import net.wanji.common.framework.Constants;
//import net.wanji.common.utils.tool.StringUtils;
//import net.wanji.databus.dao.entity.RidInfoEntity;
//import net.wanji.databus.dao.mapper.RidInfoMapper;
//import org.springframework.boot.CommandLineRunner;
//import org.springframework.stereotype.Component;
//import org.springframework.util.CollectionUtils;
//
//import javax.annotation.Resource;
//import java.util.ArrayList;
//import java.util.List;
//import java.util.stream.Collectors;
//import java.util.stream.Stream;
//
///**
// * @author duanruiming
// * @date 2023/03/07 13:44
// */
//@Component
//@Slf4j
//public class CrossRidInfoCache implements CommandLineRunner {
//
// @Resource
// RidInfoMapper ridInfoMapper;
//
// private static final List<RidInfoEntity> ridInfoList = new ArrayList<>();
//
// /**
// * 获取所有路口关联列表
// * @return
// */
// public List<RidInfoEntity> getRidInfoAllList() {
// return ridInfoList;
// }
//
// /**
// * 通过上下游路口编号获取关联路口列表
// * @param startCrossId
// * @param endCrossId
// * @return
// */
// public List<RidInfoEntity> getRidInfoListByCrossId(String startCrossId, String endCrossId) {
// Stream<RidInfoEntity> stream = ridInfoList.stream();
// if (StringUtils.isNotBlank(startCrossId)) {
// stream = stream.filter(ridInfoEntity -> StringUtils.equalsIgnoreCase(startCrossId, ridInfoEntity.getStartCrossId()));
// }
// if (StringUtils.isNotBlank(endCrossId)) {
// stream = stream.filter(ridInfoEntity -> StringUtils.equalsIgnoreCase(endCrossId, ridInfoEntity.getEndCrossId()));
// }
// return stream.collect(Collectors.toList());
// }
//
// /**
// * 刷新缓存
// */
// public void refresh() {
// init();
// }
//
// @Override
// public void run(String... args) throws Exception {
// // init();
// }
//
// public void init() {
// List<RidInfoEntity> ridInfoEntities = ridInfoMapper.selectOne(Constants.SystemParam.NULL, Constants.SystemParam.NULL);
// if (!CollectionUtils.isEmpty(ridInfoEntities)) {
// ridInfoList.addAll(ridInfoEntities);
// }
// }
//}
package net.wanji.opt.cache;
import lombok.extern.slf4j.Slf4j;
import net.wanji.databus.dao.entity.GreenwaveInfoPO;
import net.wanji.databus.dao.mapper.GreenwaveInfoMapper;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author duanruiming
* @date 2024/11/29 11:19
*/
@Component
@Slf4j
public class GreenWaveInfoCache implements CommandLineRunner {
public static final Map<Integer, GreenwaveInfoPO> greenWaveMap = new HashMap<>();
@Resource
private GreenwaveInfoMapper greenwaveInfoMapper;
@Override
public void run(String... args) throws Exception {
try {
// List<GreenwaveInfoPO> greenWaveInfoPOS = greenwaveInfoMapper.selectAll();
// if (!CollectionUtils.isEmpty(greenWaveInfoPOS)) {
// for (GreenwaveInfoPO greenWaveInfoPO : greenWaveInfoPOS) {
// greenWaveMap.put(greenWaveInfoPO.getId(), greenWaveInfoPO);
// }
// }
} catch (Exception e) {
log.error("绿波信息初始化失败:", e);
throw new RuntimeException("绿波信息初始化失败!");
}
}
}
//package net.wanji.opt.cache;
//
//import lombok.extern.slf4j.Slf4j;
//import net.wanji.databus.dao.entity.GreenwaveInfoPO;
//import net.wanji.databus.dao.mapper.GreenwaveInfoMapper;
//import org.springframework.boot.CommandLineRunner;
//import org.springframework.stereotype.Component;
//import org.springframework.util.CollectionUtils;
//
//import javax.annotation.Resource;
//import java.util.HashMap;
//import java.util.List;
//import java.util.Map;
//
///**
// * @author duanruiming
// * @date 2024/11/29 11:19
// */
//@Component
//@Slf4j
//public class GreenWaveInfoCache implements CommandLineRunner {
//
// public static final Map<Integer, GreenwaveInfoPO> greenWaveMap = new HashMap<>();
//
// @Resource
// private GreenwaveInfoMapper greenwaveInfoMapper;
//
// @Override
// public void run(String... args) throws Exception {
// try {
//// List<GreenwaveInfoPO> greenWaveInfoPOS = greenwaveInfoMapper.selectAll();
//// if (!CollectionUtils.isEmpty(greenWaveInfoPOS)) {
//// for (GreenwaveInfoPO greenWaveInfoPO : greenWaveInfoPOS) {
//// greenWaveMap.put(greenWaveInfoPO.getId(), greenWaveInfoPO);
//// }
//// }
// } catch (Exception e) {
// log.error("绿波信息初始化失败:", e);
// throw new RuntimeException("绿波信息初始化失败!");
// }
// }
//}
package net.wanji.opt.common;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import net.wanji.opt.dto.LineCongestion;
import net.wanji.opt.dto.PhaseEmptyResult;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
/**
* 从 Kafka 中按时间范围消费数据
*
* @author Kent HAN
* @date 2024/4/16 8:45
*/
@Slf4j
public class KafkaConsumerUtil {
private final KafkaConsumer<String, String> consumer;
private final ObjectMapper objectMapper;
public KafkaConsumerUtil(String bootstrapServers, String groupId) {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(props);
this.objectMapper = new ObjectMapper();
}
public List<PhaseEmptyResult> consumeEmptyPhaseForTimeRange(
String topic, int partition, long startTime, long endTime) {
List<PhaseEmptyResult> results = new ArrayList<>();
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.assign(Collections.singletonList(topicPartition));
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
timestampToSearch.put(topicPartition, startTime);
OffsetAndTimestamp offsetAndTimestamp = consumer.offsetsForTimes(timestampToSearch).get(topicPartition);
if (offsetAndTimestamp == null) {
return results;
}
long startOffset = offsetAndTimestamp.offset();
// 开始消费
consumer.seek(topicPartition, startOffset);
int emptyPollCount = 0; // 记录空轮询次数的计数器
int maxEmptyPolls = 10; // 设置最大空轮询次数
try {
boolean keepConsuming = true;
while (keepConsuming) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) {
emptyPollCount++; // 如果没有记录,增加空轮询计数
if (emptyPollCount >= maxEmptyPolls) {
// 如果达到最大空轮询次数,退出循环
break;
}
} else {
emptyPollCount = 0; // 如果有记录,重置空轮询计数器
for (ConsumerRecord<String, String> record : records) {
long recordTime = record.timestamp();
if (recordTime >= startTime && recordTime <= endTime) {
PhaseEmptyResult phaseEmptyResult =
objectMapper.readValue(record.value(), PhaseEmptyResult.class);
results.add(phaseEmptyResult);
} else if (recordTime > endTime) {
keepConsuming = false;
break;
}
}
}
}
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
} finally {
consumer.close();
}
return results;
}
public List<LineCongestion> consumeLineMetricForTimeRange(
String topic, int partition, long startTime, long endTime) {
List<LineCongestion> results = new ArrayList<>();
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.assign(Collections.singletonList(topicPartition));
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
timestampToSearch.put(topicPartition, startTime);
OffsetAndTimestamp offsetAndTimestamp = consumer.offsetsForTimes(timestampToSearch).get(topicPartition);
if (offsetAndTimestamp == null) {
return results;
}
long startOffset = offsetAndTimestamp.offset();
// 开始消费
consumer.seek(topicPartition, startOffset);
int emptyPollCount = 0; // 记录空轮询次数的计数器
int maxEmptyPolls = 10; // 设置最大空轮询次数
try {
boolean keepConsuming = true;
while (keepConsuming) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) {
emptyPollCount++; // 如果没有记录,增加空轮询计数
if (emptyPollCount >= maxEmptyPolls) {
// 如果达到最大空轮询次数,退出循环
break;
}
} else {
emptyPollCount = 0; // 如果有记录,重置空轮询计数器
for (ConsumerRecord<String, String> record : records) {
long recordTime = record.timestamp();
if (recordTime >= startTime && recordTime <= endTime) {
LineCongestion lineCongestion = JSONObject.parseObject(record.value(), LineCongestion.class);
results.add(lineCongestion);
} else if (recordTime > endTime) {
keepConsuming = false;
break;
}
}
}
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
consumer.close();
}
log.info("从kafka中获取干线指标数据 topic:{},startTime:{},endTime:{}");
return results;
}
}
package net.wanji.opt.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* @author duanruiming
* @date 2024/11/18 21:48
* 多个kafka配置,不能使用Yml了,会有冲突,找不到groupId
*/
@Configuration
@EnableKafka
public class KafkaConsumersConfig {
//@Bean
//public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory1() {
// ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
// factory.setConsumerFactory(consumerFactory1());
// // 可以在此设置其他属性,如concurrency, pollTimeout等
// return factory;
//}
//
//@Bean
//public ConsumerFactory<Integer, String> consumerFactory1() {
// return new DefaultKafkaConsumerFactory<>(consumerConfigs1());
//}
//
//@Bean
//public Map<String, Object> consumerConfigs1() {
// Map<String, Object> props = new HashMap<>();
// // 设置消费者所需的所有必要属性
// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "37.12.182.31:9092");
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
// props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// // ... 其他必要属性
// return props;
//}
@Bean
public ConsumerFactory<Integer, String> consumerFactory2() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs2());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory2() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory2());
// 可以在此设置其他属性,如concurrency, pollTimeout等
return factory;
}
@Bean
public Map<String, Object> consumerConfigs2() {
Map<String, Object> props = new HashMap<>();
// 设置消费者所需的所有必要属性
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "37.12.182.31:9093");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// ... 其他必要属性
return props;
}
}
\ No newline at end of file
/*
* Copyright (C) 2018 Zhejiang xiaominfo Technology CO.,LTD.
* All rights reserved.
* Official Web Site: http://www.xiaominfo.com.
* Developer Web Site: http://open.xiaominfo.com.
*/
package net.wanji.opt.config;
import com.github.xiaoymin.knife4j.spring.annotations.EnableKnife4j;
import com.google.common.collect.Lists;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.ApiKey;
import springfox.documentation.service.AuthorizationScope;
import springfox.documentation.service.SecurityReference;
import springfox.documentation.service.SecurityScheme;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spi.service.contexts.SecurityContext;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
import java.util.List;
//import com.github.xiaoymin.knife4j.spring.extension.OpenApiExtensionResolver;
//import springfox.documentation.swagger2.annotations.EnableSwagger2WebMvc;
//import springfox.documentation.swagger2.annotations.EnableSwagger2;
//import springfox.documentation.swagger2.annotations.EnableSwagger2WebMvc;
//import springfox.documentation.swagger2.annotations.EnableSwagger2WebMvc;
/**
* @author <a href="mailto:xiaoymin@foxmail.com">xiaoymin@foxmail.com</a>
* 2020/11/07 9:26
* @since:knife4j-spring-boot-fast-demo 1.0
*/
@Configuration
//@EnableSwagger2WebMvc
@EnableSwagger2
@EnableKnife4j
public class Knife4jConfiguration {
/**
* 设置头,因为平台携带token进行验证,所以需要在Swagger添加头设置的功能
* @return
*/
private List<SecurityScheme> securitySchemes(){
return Lists.newArrayList(
new ApiKey("Authorization","token","header"));
}
private List<SecurityContext> securityContexts(){
return Lists.newArrayList(SecurityContext.builder()
.securityReferences(defaultAuth())
.forPaths(PathSelectors.regex("^(?!login).*$"))
.build()
);
}
private List<SecurityReference> defaultAuth() {
AuthorizationScope authorizationScope = new AuthorizationScope("global", "accessEverything");
AuthorizationScope[] authorizationScopes = new AuthorizationScope[1];
authorizationScopes[0] = authorizationScope;
return Lists.newArrayList(
new SecurityReference("Authorization",authorizationScopes)
);
}
@Bean(value = "defaultApi2")
public Docket defaultApi2() {
String groupName="信控调优平台接口";
Docket docket=new Docket(DocumentationType.SWAGGER_2)
//.host(swaggerDocUrl)
.apiInfo(apiInfo())
.groupName(groupName)
.select()
.apis(RequestHandlerSelectors.any())
.paths(PathSelectors.any())
.build()
.securitySchemes(securitySchemes())
.securityContexts(securityContexts())
;//.extensions(openApiExtensionResolver.buildSettingExtensions());
return docket;
}
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("信控调优平台")
.description("信控调优平台 RESTful APIs")
//.termsOfServiceUrl(swaggerDocUrl)
.contact("wj@qq.com")
.version("0.1")
.build();
}
}
package net.wanji.opt.config;
import com.google.common.base.Predicates;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class Swagger2 {
@Bean
public Docket createRestApi() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.select()
//为当前包路径
.apis(Predicates.or(RequestHandlerSelectors.basePackage("net.wanji.opt.controller"),
RequestHandlerSelectors.basePackage("net.wanji.opt.synthesis.controller")))
.paths(PathSelectors.any())
.build();
}
//构建 api文档的详细信息函数,注意这里的注解引用的是哪个
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
//页面标题
.title("信控调优平台")
//创建人
.contact(new Contact("test", "#", "signal-opt@wanji.net.cn"))
//版本号
.version("1.0.0")
//描述
.description("城市交通信号优化系统API")
.build();
}
}
\ No newline at end of file
......@@ -67,11 +67,6 @@ public class StrategyPriorityDailyInfo implements Serializable {
@TableField(exist = false)
private Integer type;
/**
* 开关 0-关闭 1-开启
*/
private Integer switchStatus;
}
package net.wanji.opt.service.impl;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.BeanListUtils;
import net.wanji.databus.dao.mapper.BaseCrossLightsMapper;
import net.wanji.databus.po.BaseCrossLightsPO;
import net.wanji.opt.dto.CrossLightsDTO;
import net.wanji.opt.dto.LaneInfoDTO;
import net.wanji.opt.service.CrossLightsService;
import net.wanji.opt.service.LaneInfoService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author hfx
* @date 2023/1/13 17:48
* @desc CrossLightsServiceImpl
*/
@Service
@Slf4j
public class CrossLightsServiceImpl implements CrossLightsService {
@Resource
BaseCrossLightsMapper baseCrossLightsMapper;
@Resource
LaneInfoService laneInfoService;
@Override
public List<CrossLightsDTO> listCrossLights() {
List<BaseCrossLightsPO> lightsPOList = baseCrossLightsMapper.listCrossLights();
return BeanListUtils.populateList(lightsPOList, new ArrayList<>(), CrossLightsDTO.class);
}
@Override
public List<CrossLightsDTO> listLightsDetails() {
// 获取灯组基础信息
List<CrossLightsDTO> lightsDTOList = listCrossLights();
if(lightsDTOList.isEmpty()) {
return lightsDTOList;
}
// 获取灯组车道信息
List<LaneInfoDTO> laneInfoDTOList = laneInfoService.listLightsLaneInfo(null);
// 处理灯组车道信息,封装成Map:key是灯组ID,value 是车道集合
Map<Integer, List<LaneInfoDTO>> laneInfoMap = laneInfoDTOList.stream().collect(Collectors.groupingBy(LaneInfoDTO::getLightsId));
for(CrossLightsDTO lightsDTO: lightsDTOList) {
lightsDTO.setLaneInfos(laneInfoMap.get(lightsDTO.getId()));
}
return lightsDTOList;
}
}
package net.wanji.opt.service.impl;
import net.wanji.databus.dao.entity.CrossPhaseLightsPO;
import net.wanji.databus.dao.entity.CrossPhasePO;
import net.wanji.databus.dao.mapper.BaseCrossPhaseLightsMapper;
import net.wanji.databus.dao.mapper.BaseCrossPhaseMapper;
import net.wanji.opt.dto.CrossLightsDTO;
import net.wanji.opt.dto.CrossPhaseDTO;
import net.wanji.opt.service.CrossLightsService;
import net.wanji.opt.service.CrossPhaseService;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author hfx
* @date 2023/1/13 10:10
* @desc 相位接口服务实现
*/
@Service
public class CrossPhaseServiceImpl implements CrossPhaseService {
@Resource
BaseCrossPhaseMapper baseCrossPhaseMapper;
@Resource
BaseCrossPhaseLightsMapper baseCrossPhaseLightsMapper;
@Resource
CrossLightsService crossLightsService;
@Override
public List<CrossPhaseDTO> listCrossPhase() {
// 获取相位信息
List<CrossPhasePO> crossPhasePOList = baseCrossPhaseMapper.listCrossPhasePO(new CrossPhasePO());
List<CrossPhaseDTO> crossPhaseDTOS = new ArrayList<>();
for (CrossPhasePO crossPhasePO : crossPhasePOList) {
CrossPhaseDTO crossPhaseDTO = new CrossPhaseDTO();
BeanUtils.copyProperties(crossPhasePO, crossPhaseDTO);
crossPhaseDTO.setSchemeId(crossPhasePO.getPlanId());
crossPhaseDTOS.add(crossPhaseDTO);
}
return crossPhaseDTOS;
}
@Override
public List<CrossPhaseDTO> listPhaseDetails() {
// 获取相位信息
List<CrossPhaseDTO> phaseDTOList = listCrossPhase();
if(phaseDTOList.isEmpty()) {
return phaseDTOList;
}
// 获取相位灯组信息
Map<Integer, List<CrossLightsDTO>> phaseLightsMap = listPhaseLights();
for(CrossPhaseDTO phase : phaseDTOList) {
phase.setLightsInfos(phaseLightsMap.get(phase.getId()));
}
return phaseDTOList;
}
/**
* 获取相位灯组信息
* @return
*/
private Map<Integer, List<CrossLightsDTO>> listPhaseLights() {
Map<Integer, List<CrossLightsDTO>> phaseLightsMap = new HashMap<>();
// 获取相位灯组信息
List<CrossPhaseLightsPO> phaseLightsPOList = baseCrossPhaseLightsMapper.listCrossPhaseLightsPO(new CrossPhaseLightsPO());
if(phaseLightsPOList.isEmpty()) {
return phaseLightsMap;
}
// 获取灯组详情信息
List<CrossLightsDTO> lightsDTOList = crossLightsService.listLightsDetails();
// 处理灯组数据,封装成map:key是相位ID,value灯组集合
Map<Integer, CrossLightsDTO> lightsMap = lightsDTOList.stream().collect(Collectors.toMap(e -> e.getId(), e -> e));
Integer phaseId;
for(CrossPhaseLightsPO po : phaseLightsPOList) {
phaseId = po.getPhaseId();
if(phaseLightsMap.containsKey(phaseId)) {
lightsDTOList = phaseLightsMap.get(phaseId);
} else {
lightsDTOList = new ArrayList<>();
phaseLightsMap.put(phaseId, lightsDTOList);
}
lightsDTOList.add(lightsMap.get(po.getLightsId()));
}
return phaseLightsMap;
}
}
package net.wanji.opt.service.impl;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.BeanListUtils;
import net.wanji.databus.dao.entity.BaseCrossSchedulesPlanPO;
import net.wanji.databus.dao.mapper.BaseCrossSchedulesMapper;
import net.wanji.opt.dto.CrossSchedulesDTO;
import net.wanji.opt.dto.CrossSectionDTO;
import net.wanji.opt.service.CrossSchedulesService;
import net.wanji.opt.service.CrossSectionService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author hfx
* @date 2023/1/17 15:58
* @desc CrossSchedulesServiceImpl
*/
@Service
@Slf4j
public class CrossSchedulesServiceImpl implements CrossSchedulesService {
@Resource
BaseCrossSchedulesMapper baseCrossSchedulesMapper;
@Resource
CrossSectionService crossSectionService;
@Override
public List<CrossSchedulesDTO> listCrossSchedulesDetails(String crossId, Integer status) {
List<CrossSchedulesDTO> schedulesDTOList = new ArrayList<>();
// 获取调度计划信息
List<BaseCrossSchedulesPlanPO> schedulesPlanPOList = baseCrossSchedulesMapper.listSchedulesPlan(crossId, status);
schedulesDTOList = BeanListUtils.populateList(schedulesPlanPOList, schedulesDTOList, CrossSchedulesDTO.class);
// 获取日计划信息
List<CrossSectionDTO> sectionPOList = crossSectionService.listCrossSectionInfo(crossId, null);
// 处理日计划数据,封装成map:key是计划ID,value时段集合
Map<Integer, List<CrossSectionDTO>> sectionMap = sectionPOList.stream().collect(Collectors.groupingBy(CrossSectionDTO::getPlanId));
for(CrossSchedulesDTO schedules : schedulesDTOList) {
schedules.setSectionInfos(sectionMap.get(schedules.getPlanId()));
}
return schedulesDTOList;
}
}
package net.wanji.opt.service.impl;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.BeanListUtils;
import net.wanji.databus.dao.entity.BaseCrossSchemePO;
import net.wanji.databus.dao.mapper.BaseCrossSchemeMapper;
import net.wanji.opt.dto.CrossPhaseDTO;
import net.wanji.opt.dto.CrossSchemeDTO;
import net.wanji.opt.service.CrossPhaseService;
import net.wanji.opt.service.CrossSchemeService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author hfx
* @date 2023/1/17 15:05
* @desc CrossSchemeServiceImpl
*/
@Service
@Slf4j
public class CrossSchemeServiceImpl implements CrossSchemeService {
@Resource
BaseCrossSchemeMapper baseCrossSchemeMapper;
@Resource
CrossPhaseService crossPhaseService;
@Override
public List<CrossSchemeDTO> listCrossSchemeInfo(String crossId) {
List<BaseCrossSchemePO> schemePOList = baseCrossSchemeMapper.listCrossSchemeInfo(crossId, "", null);
return BeanListUtils.populateList(schemePOList, new ArrayList<>(), CrossSchemeDTO.class);
}
@Override
public List<CrossSchemeDTO> listCrossSchemeDetails(String crossId) {
// 获取方案基础信息
List<CrossSchemeDTO> schemeDTOList = listCrossSchemeInfo(crossId);
// 获取相位详情信息
List<CrossPhaseDTO> phaseDTOList = crossPhaseService.listPhaseDetails();
// 处理相位数据,封装成map:key是方案ID,value相位信息
Map<Integer, List<CrossPhaseDTO>> phaseMap = phaseDTOList.stream().collect(Collectors.groupingBy(CrossPhaseDTO::getSchemeId));
for(CrossSchemeDTO scheme : schemeDTOList) {
// 获取方案对应的相位列表
scheme.setPhaseInfos(phaseMap.get(scheme.getId()));
}
return schemeDTOList;
}
}
package net.wanji.opt.service.impl;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.BeanListUtils;
import net.wanji.databus.dao.entity.CrossSectionPO;
import net.wanji.databus.dao.mapper.BaseCrossSectionMapper;
import net.wanji.opt.dto.CrossSectionDTO;
import net.wanji.opt.service.CrossSectionService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
/**
* @author hfx
* @date 2023/1/17 17:48
* @desc CrossSectionServiceImpl
*/
@Service
@Slf4j
public class CrossSectionServiceImpl implements CrossSectionService {
@Resource
BaseCrossSectionMapper baseCrossSectionMapper;
@Override
public List<CrossSectionDTO> listCrossSectionInfo(String crossId, String planId) {
// 获取时段信息
List<CrossSectionPO> sectionPOList = baseCrossSectionMapper.listCrossSectionPO(new CrossSectionPO());
return BeanListUtils.populateList(sectionPOList, new ArrayList<>(), CrossSectionDTO.class);
}
}
package net.wanji.opt.service.impl;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.enums.BaseEnum;
import net.wanji.common.enums.TurnConvertEnum;
import net.wanji.common.framework.dubbointerface.impl.BaseDubboInterfaceImpl;
import net.wanji.common.framework.exception.DubboProviderException;
import net.wanji.common.framework.mapper.BaseInterfaceMapper;
import net.wanji.common.utils.tool.DateUtil;
import net.wanji.databus.dao.mapper.BaseCrossInfoMapper;
import net.wanji.databus.po.TBaseCrossInfo;
import net.wanji.opt.bo.BottomMenuBO;
import net.wanji.opt.common.EsDateIndexUtil;
import net.wanji.opt.constant.ServiceLevelEnum;
import net.wanji.opt.dao.mapper.GreenwaveHistoryMapper;
import net.wanji.opt.dto.report.GreenHistIndexDTO;
import net.wanji.opt.entity.GreenwaveHist;
import net.wanji.opt.po.base.CrossLaneDataHistPoExtend;
import net.wanji.opt.service.GreenwaveHistProvider;
import org.apache.dubbo.config.annotation.DubboService;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
......@@ -59,8 +35,6 @@ public class GreenwaveHistProviderImpl extends BaseDubboInterfaceImpl<GreenwaveH
@Resource
private GreenwaveHistoryMapper greenwaveHistoryMapper;
@Resource
BaseCrossInfoMapper baseCrossInfoMapper;
@Value("${data.mock.flag}")
private boolean mockFlag;
......
package net.wanji.opt.service.impl;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.BeanListUtils;
import net.wanji.databus.dao.mapper.BaseCrossLaneLightsMapper;
import net.wanji.databus.dao.mapper.LaneInfoMapper;
import net.wanji.databus.po.CrossLaneLightsPO;
import net.wanji.databus.po.LaneInfoPO;
import net.wanji.opt.dto.LaneInfoDTO;
import net.wanji.opt.query.LaneQuery;
import net.wanji.opt.service.LaneInfoService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author hfx
* @date 2023/1/13 19:13
* @desc LaneInfoServiceImpl
*/
@Slf4j
@Service
public class LaneInfoServiceImpl implements LaneInfoService {
@Resource
LaneInfoMapper laneInfoMapper;
@Resource
BaseCrossLaneLightsMapper baseCrossLaneLightsMapper;
@Override
public List<LaneInfoDTO> listLaneInfo(LaneQuery query) {
// 获取车道基础信息
List<LaneInfoPO> laneInfoPOList = laneInfoMapper.listLaneInfo();
return BeanListUtils.populateList(laneInfoPOList, new ArrayList<>(), LaneInfoDTO.class);
}
@Override
public List<LaneInfoDTO> listLightsLaneInfo(LaneQuery query) {
// 获取车道信息
List<LaneInfoDTO> laneInfoDTOList = listLaneInfo(query);
if(laneInfoDTOList.isEmpty()) {
return laneInfoDTOList;
}
// 获取灯组车道信息
List<CrossLaneLightsPO> lightLanePOList = baseCrossLaneLightsMapper.listLightLane();
if(lightLanePOList.isEmpty()) {
return laneInfoDTOList;
}
// 处理灯组车道信息,封装成Map:key是车道ID,value 是灯组ID
Map<String, Integer> lightLaneMap = lightLanePOList.stream().collect(Collectors.toMap(po -> po.getLaneId(), po -> po.getLightsId()));
List<LaneInfoDTO> newLaneList = new ArrayList<>();
String laneId;
for(LaneInfoDTO lane : laneInfoDTOList) {
laneId = lane.getId();
if(!lightLaneMap.containsKey(laneId)) {
continue;
}
lane.setLightsId(lightLaneMap.get(laneId));
newLaneList.add(lane);
}
return newLaneList;
}
}
......@@ -1515,7 +1515,7 @@ public class AnalysisGreenCongestionPeriodServiceImpl implements AnalysisGreenCo
//计算该路口交通指数阈值,取一天的9点至16点30之间交通指数倒序的90%的数值为动态阈值
try {
List<laneDataHist> crossTrafficList = analysisGreenCongestionPeriodMapper.selectCrossTrafficThreshold(date.toString().replaceAll("-", ""), crossId);
if (crossTrafficList.get(9).getTrafficIndex() <= 1.5) {
if (!crossTrafficList.isEmpty() && crossTrafficList.get(9).getTrafficIndex() <= 1.5) {
trafficThreshold = 1.5;
} else {
trafficThreshold = crossTrafficList.get(9).getTrafficIndex();
......
package net.wanji.opt.servicev2.judgeanalysis.impl;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.DateUtil;
import net.wanji.databus.po.CrossBaseLaneInfoPO;
import net.wanji.databus.po.CrossDirDataHistPO;
......@@ -41,6 +42,7 @@ import java.util.stream.Collectors;
* @since 2025-03-20
*/
@Service
@Slf4j
public class AnalysisProblemCrossDirHourServiceImpl extends ServiceImpl<AnalysisProblemCrossDirHourMapper, AnalysisProblemCrossDirHour> implements AnalysisProblemCrossDirHourService {
@Resource
......@@ -216,6 +218,7 @@ import java.util.stream.Collectors;
public void insertCorssOverFlowData() {
try {
long st = System.currentTimeMillis();
//最终组装的数据
ArrayList<AnalysisCrossHourOverFlowTask> crossHourOverFlowList=new ArrayList<>();
//获取上一天的日期
......@@ -367,7 +370,11 @@ import java.util.stream.Collectors;
}
//System.out.println("crossHourOverFlowList"+crossHourOverFlowList);
analysisProblemCrossDirHourMapper.insertCrossOverFlow(crossHourOverFlowList);
if (!crossHourOverFlowList.isEmpty()) {
analysisProblemCrossDirHourMapper.insertCrossOverFlow(crossHourOverFlowList);
}
long et = System.currentTimeMillis();
log.info("计算溢出原因耗时:{}ms,size:{},分析日期:{}",et-st,crossHourOverFlowList.size(),formattedDate);
}catch (Exception e){
e.printStackTrace();
log.error("定时任务--路口溢出情况更新--任务执行错误" + e);
......
package net.wanji.opt.synthesis.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import net.wanji.common.framework.rest.JsonViewObject;
import net.wanji.opt.synthesis.pojo.vo.GreenBeltAreaIndexVO;
import net.wanji.opt.synthesis.service.impl.StrategyGreenBeltServiceImpl;
import net.wanji.opt.vo.GreenBeltChartVO;
import net.wanji.opt.vo.GreenBeltInfoVO;
import net.wanji.opt.vo.GreenBeltStopTimesQueueLengthVO;
import net.wanji.opt.vo.HotspotCrossVO;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import javax.ws.rs.core.MediaType;
import java.util.List;
/**
* @author duanruiming
* @date 2024/11/19 20:10
* @description 神思推送绿波时序图
*/
@Api(value = "StrategyGreenBeltController", description = "策略绿波带弹窗")
@RequestMapping("/strategyBelt")
@RestController
public class StrategyGreenBeltController {
@Resource
private StrategyGreenBeltServiceImpl strategyGreenBeltService;
@ApiOperation(value = "绿波时序图图形查询", notes = "绿波时序图图形查询", response = JsonViewObject.class,
produces = MediaType.APPLICATION_JSON, consumes = MediaType.APPLICATION_JSON)
@GetMapping(value = "/greenBeltChart")
@ApiResponses({
@ApiResponse(code = 200, message = "OK", response = GreenBeltInfoVO.class),
})
public JsonViewObject greenBeltChart(Integer greenId, String dateStr) throws Exception {
GreenBeltChartVO greenBeltChartVO = strategyGreenBeltService.selectChart(greenId, dateStr);
return JsonViewObject.newInstance().success(greenBeltChartVO);
}
@ApiOperation(value = "绿波时序图历史查询", notes = "绿波时序图历史查询", response = JsonViewObject.class,
produces = MediaType.APPLICATION_JSON, consumes = MediaType.APPLICATION_JSON)
@GetMapping(value = "/greenBeltChartHist")
@ApiResponses({
@ApiResponse(code = 200, message = "OK", response = GreenBeltInfoVO.class),
})
public JsonViewObject greenBeltChartHist(Integer greenId, String dateStr) throws Exception {
GreenBeltChartVO greenBeltChartVO = strategyGreenBeltService.selectChartHist(greenId, dateStr);
return JsonViewObject.newInstance().success(greenBeltChartVO);
}
@ApiOperation(value = "绿波时序图当天下发时间", notes = "绿波时序图当天下发时间", response = JsonViewObject.class,
produces = MediaType.APPLICATION_JSON, consumes = MediaType.APPLICATION_JSON)
@GetMapping(value = "/greenBeltChartControlTime")
@ApiResponses({
@ApiResponse(code = 200, message = "OK"),
})
public JsonViewObject greenBeltChartControlTime(Integer greenId) throws Exception {
List<String> dates = strategyGreenBeltService.greenBeltChartControlTime(greenId);
return JsonViewObject.newInstance().success(dates);
}
@ApiOperation(value = "绿波停车次数排队长度", notes = "绿波停车次数排队长度", response = JsonViewObject.class,
produces = MediaType.APPLICATION_JSON, consumes = MediaType.APPLICATION_JSON)
@GetMapping(value = "/greenBeltStopTimeQueueLength")
@ApiResponses({
@ApiResponse(code = 200, message = "OK", response = HotspotCrossVO.class),
})
public JsonViewObject greenBeltStopTimeQueueLength(Integer greenId) throws Exception {
List<GreenBeltStopTimesQueueLengthVO> results = strategyGreenBeltService.greenBeltStopTimeQueueLength(greenId);
return JsonViewObject.newInstance().success(results);
}
@ApiOperation(value = "绿波弹窗-绿波子区指标", notes = "绿波子区指标", response = JsonViewObject.class,
produces = MediaType.APPLICATION_JSON, consumes = MediaType.APPLICATION_JSON)
@GetMapping(value = "/greenBeltAreaIndex")
@ApiResponses({
@ApiResponse(code = 200, message = "OK", response = HotspotCrossVO.class),
})
public JsonViewObject greenBeltAreaIndex(Integer greenId) throws Exception {
GreenBeltAreaIndexVO result = strategyGreenBeltService.greenBeltAreaIndex(greenId);
return JsonViewObject.newInstance().success(result);
}
}
......@@ -37,7 +37,7 @@
<insert id="insertGreenCongestionPeriodData" parameterType="map">
insert into t_analysis_green_wave_congest_time_span (green_id,road_direction,week_day,congest_start_time,congest_end_time,congest_index,status,travel_time,speed,peak_type,year_week,week_start_time,week_end_time,insert_time)
SELECT a.green_id,a.road_direction,#{weekDay} as week_day, TO_TIMESTAMP(#{startTime},'YYYY-MM-DD HH24:MI:SS') as congest_start_time,TO_TIMESTAMP(DATE_ADD(#{endTime},INTERVAL '5 MINUTE'),'YYYY-MM-DD HH24:MI:SS') as congest_end_time,AVG(traffic_index),#{status} as status,CEIL(AVG(trval_time)),AVG(speed),
SELECT a.green_id,a.road_direction,#{weekDay} as week_day, TO_TIMESTAMP(#{startTime},'YYYY-MM-DD HH24:MI:SS') as congest_start_time,TO_TIMESTAMP(DATE_ADD(#{endTime},INTERVAL '5 MINUTE')::text,'YYYY-MM-DD HH24:MI:SS') as congest_end_time,AVG(traffic_index),#{status} as status,CEIL(AVG(trval_time)),AVG(speed),
#{peakType} as peak_type,#{weekNumber} as year_week,TO_TIMESTAMP(#{weekStartTime},'YYYY-MM-DD') as week_start_time,TO_TIMESTAMP(#{weekEndTime},'YYYY-MM-DD') as week_end_time,now()
FROM t_greenwave_hist a
where a.start_time between TO_TIMESTAMP(#{startTime},'YYYY-MM-DD HH24:MI:SS') and TO_TIMESTAMP(#{endTime},'YYYY-MM-DD HH24:MI:SS')
......
......@@ -237,8 +237,7 @@
max(extract(epoch from (a.end_time - a.start_time))) as serious_duration,
STRING_AGG(
CONCAT(
STRING_AGG(CONCAT(
TO_CHAR(a.start_time,'HH24:MI:SS'),
'~',
CONCAT(TO_CHAR(a.end_time,'HH24:MI:SS'))
......
......@@ -56,38 +56,45 @@
</select>
<select id="selectGreenWaveWeekDataByTime" resultType="net.wanji.opt.entity.report.GreenWaveWeekData">
SELECT green_id,road_direction,max(traffic_index) max_congest_index,avg(traffic_index) congest_index,
sum(status>=3 and status &lt;=4)*60*5 congest_duration,avg(speed) speed,avg(trval_time) travel_time
SUM ( (status >= 3 AND status &lt;= 4)::int )*60*5 congest_duration,avg(speed) speed,avg(trval_time) travel_time
from t_greenwave_hist
where to_timestamp(start_time,'YYYY-MM-DD') BETWEEN to_timestamp(#{startDate},'YYYY-MM-DD') AND to_timestamp(#{endDate},'YYYY-MM-DD')
AND to_timestamp( start_time,'HH24:MI:SS' ) BETWEEN to_timestamp(#{startTime},'HH24:MI:SS' ) AND to_timestamp(#{endTime} ,'HH24:MI:SS' )
where to_timestamp(to_char(start_time, 'YYYY-MM-DD'),'YYYY-MM-DD') BETWEEN to_timestamp(#{startDate},'YYYY-MM-DD') AND to_timestamp(#{endDate},'YYYY-MM-DD')
AND start_time::time BETWEEN #{startTime}::time AND #{endTime}::time
GROUP BY green_id,road_direction
</select>
<select id="selectGreenWaveWeekData" resultType="net.wanji.opt.entity.report.GreenWaveWeekData">
SELECT green_id,road_direction,max(traffic_index) max_congest_index,avg(traffic_index) congest_index,
sum(status>=3 and status&lt;=4)*60*5 congest_duration,avg(speed) speed,avg(trval_time) travel_time
from t_greenwave_hist where start_time BETWEEN to_timestamp(#{startDate},'YYYY-MM-DD HH24:MI:SS') AND to_timestamp(#{endDate} ,'YYYY-MM-DD HH24:MI:SS')
SUM ( (status >= 3 AND status &lt;= 4)::int )*60*5 congest_duration,avg(speed) speed,avg(trval_time) travel_time
from t_greenwave_hist
where start_time BETWEEN to_timestamp(#{startDate},'YYYY-MM-DD HH24:MI:SS') AND to_timestamp(#{endDate} ,'YYYY-MM-DD HH24:MI:SS')
GROUP BY green_id,road_direction
</select>
<select id="getFlow" resultType="java.lang.Integer">
SELECT COALESCE(flow, 0) from
(select cross_id from t_greenwave_cross where green_id = #{greenId} and is_key_route = 1 limit 1) cross_data
left join
(SELECT sum(flow) as flow,cross_id from t_cross_dir_data_hist
where start_time BETWEEN to_timestamp(#{startDate},'YYYY-MM-DD HH24:MI:SS') AND to_timestamp(#{endDate} ,'YYYY-MM-DD HH24:MI:SS')
and dir_type = #{dir} GROUP BY cross_id) cross_flow on cross_data.cross_id = cross_flow.cross_id
left join
(
SELECT sum(flow) as flow,cross_id
from t_cross_dir_data_hist
where start_time BETWEEN to_timestamp(#{startDate},'YYYY-MM-DD HH24:MI:SS') AND to_timestamp(#{endDate} ,'YYYY-MM-DD HH24:MI:SS')
and dir_type = #{dir} GROUP BY cross_id
) cross_flow on cross_data.cross_id = cross_flow.cross_id
</select>
<select id="getFlowByTime" resultType="java.lang.Integer">
SELECT COALESCE(flow, 0) from
SELECT COALESCE(flow, 0) from
(select cross_id from t_greenwave_cross where green_id = #{greenId} and is_key_route = 1 limit 1) cross_data
left join
(SELECT sum(flow) as flow,cross_id from t_cross_dir_data_hist where start_time BETWEEN #{startDate} AND #{endDate} AND TIME( start_time ) BETWEEN #{startTime} AND #{endTime} and dir_type = #{dir} GROUP BY cross_id) cross_flow on cross_data.cross_id = cross_flow.cross_id
left join
(SELECT sum(flow) as flow,cross_id from t_cross_dir_data_hist
where start_time BETWEEN to_timestamp(#{startDate},'YYYY-MM-DD HH24:MI:SS') AND to_timestamp(#{endDate} ,'YYYY-MM-DD HH24:MI:SS')
AND start_time::time BETWEEN #{startTime}::time AND #{endTime}::time and dir_type = #{dir}
GROUP BY cross_id) cross_flow on cross_data.cross_id = cross_flow.cross_id
</select>
<select id="selectAllByTime" resultType="net.wanji.databus.dao.entity.GreenwaveHistPO">
select * from t_greenwave_hist where start_time BETWEEN #{startDate} AND #{endDate}
<if test="startTime != null and startTime != '' and endTime != null and endTime != ''">
and to_timestamp( start_time,'HH24:MI:SS' ) BETWEEN to_timestamp(#{startTime},'HH24:MI:SS' ) AND to_timestamp(#{endTime} ,'HH24:MI:SS' )
and start_time::time BETWEEN #{startTime}::time AND #{endTime}::time
</if>
and green_id = #{greenId}
and status >= 3 and status &lt;= 4 and road_direction = #{roadDirection} order by start_time
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment