Commit c3f12a81 authored by duanruiming's avatar duanruiming

[add] 绿波城项目分支

parent 97c5d2b0
package net.wanji.datacenter.cache;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import net.wanji.databus.dao.mapper.CrossDirDataHistMapper;
import net.wanji.databus.po.CrossDirDataHistPO;
import net.wanji.datacenter.common.Constants;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.stream.Collectors;
/**
* @author duanruiming
* @date 2023/10/20 10:41
*/
@Component
@Slf4j
public class CrossDirWeekAvgFlowCache implements CommandLineRunner {
public static final Map<String, Double> crossDirWeekAvgFlowMap = new HashMap<>();
@Resource
private CrossDirDataHistMapper crossDirDataHistMapper;
private void getAvgFlow() {
LocalDate currentDate = LocalDate.now();
LocalDate lastDay = currentDate.minus(1, ChronoUnit.DAYS);
LocalDate lastWeek = currentDate.minus(1, ChronoUnit.WEEKS);
List<String> crossIds = CrossInfoCache.getSignalIds();
for (String crossId : crossIds) {
// 前一周
List<CrossDirDataHistPO> crossDirDataHistPOS = getDirDataHistPOS(lastDay, lastWeek, crossId);
calculateAvgFlow(crossDirDataHistPOS);
}
}
private static void calculateAvgFlow(List<CrossDirDataHistPO> crossDirDataHistPOS) {
if (!CollectionUtils.isEmpty(crossDirDataHistPOS)) {
Map<Integer, List<CrossDirDataHistPO>> dirDataMap = crossDirDataHistPOS.stream().collect(Collectors.groupingBy(CrossDirDataHistPO::getDirType));
if (!dirDataMap.isEmpty()) {
for (Map.Entry<Integer, List<CrossDirDataHistPO>> entry : dirDataMap.entrySet()) {
Integer dir = entry.getKey();
List<CrossDirDataHistPO> dirDataHistPOS = entry.getValue();
String currentCrossId = dirDataHistPOS.get(0).getCrossId();
OptionalDouble average = dirDataHistPOS.stream().map(CrossDirDataHistPO::getFlow).mapToInt(Integer::intValue).average();
double dirFlowAvg = 1.0;
if (average.isPresent()) {
dirFlowAvg = average.getAsDouble() == 0.0 ? 1.0 : average.getAsDouble();
}
crossDirWeekAvgFlowMap.put(currentCrossId.concat(Constants.UNDERLINE).concat(String.valueOf(dir)), dirFlowAvg);
}
}
}
}
/**
* 前一周的数据
*/
private List<CrossDirDataHistPO> getDirDataHistPOS(LocalDate lastDay, LocalDate lastWeek, String crossId) {
LambdaQueryWrapper<CrossDirDataHistPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(CrossDirDataHistPO::getCrossId, crossId);
queryWrapper.between(CrossDirDataHistPO::getStartTime, lastWeek, lastDay);
return crossDirDataHistMapper.selectList(queryWrapper);
}
@Override
public void run(String... args) throws Exception {
try {
getAvgFlow();
} catch (Exception e) {
log.error("7天内路口方向平均流量获取失败", e);
throw new Exception(e);
}
log.info("7天内路口方向平均流量:{}", crossDirWeekAvgFlowMap);
}
}
\ No newline at end of file
package net.wanji.datacenter.cache;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import net.wanji.databus.dao.mapper.CrossDirDataHistMapper;
import net.wanji.databus.po.CrossDirDataHistPO;
import net.wanji.datacenter.common.Constants;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author duanruiming
* @date 2023/10/18 15:50
*/
@Component
@Slf4j
public class CrossDirWeekMaxFlowCache implements CommandLineRunner {
public static final Map<String, Integer> crossDirWeekMaxFlowMap = new HashMap<>();
@Resource
private CrossDirDataHistMapper crossDirDataHistMapper;
private void getMaxFlow() throws Exception {
LocalDate currentDate = LocalDate.now();
LocalDate lastDay = currentDate.minus(1, ChronoUnit.DAYS);
LocalDate lastWeek = currentDate.minus(1, ChronoUnit.WEEKS);
List<String> crossIds = CrossInfoCache.getSignalIds();
for (String crossId : crossIds) {
// 前一周
LambdaQueryWrapper<CrossDirDataHistPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(CrossDirDataHistPO::getCrossId, crossId);
queryWrapper.between(CrossDirDataHistPO::getStartTime, lastWeek, lastDay);
List<CrossDirDataHistPO> crossDirDataHistPOS = crossDirDataHistMapper.selectList(queryWrapper);
if (!CollectionUtils.isEmpty(crossDirDataHistPOS)) {
Map<Integer, List<CrossDirDataHistPO>> dirDataMap = crossDirDataHistPOS.stream().collect(Collectors.groupingBy(CrossDirDataHistPO::getDirType));
if (!dirDataMap.isEmpty()) {
for (Map.Entry<Integer, List<CrossDirDataHistPO>> entry : dirDataMap.entrySet()) {
Integer dir = entry.getKey();
List<CrossDirDataHistPO> dirDataHistPOS = entry.getValue();
String currentCrossId = dirDataHistPOS.get(0).getCrossId();
int dirFlowMax = dirDataHistPOS.stream().map(CrossDirDataHistPO::getFlow).mapToInt(Integer::intValue).max().getAsInt();
crossDirWeekMaxFlowMap.put(currentCrossId.concat(Constants.UNDERLINE).concat(String.valueOf(dir)), dirFlowMax);
}
}
}
}
}
@Override
public void run(String... args) throws Exception {
try {
getMaxFlow();
} catch (Exception e) {
log.error("7天内路口方向最大流量获取失败", e);
throw new Exception(e);
}
log.info("7天内路口方向最大流量:{}", crossDirWeekMaxFlowMap);
}
}
\ No newline at end of file
...@@ -23,6 +23,8 @@ import java.util.stream.Collectors; ...@@ -23,6 +23,8 @@ import java.util.stream.Collectors;
public class CrossLaneInfoCache implements InitializingBean { public class CrossLaneInfoCache implements InitializingBean {
public static final Map<String, LaneInfoPO> laneInfoMap = new HashMap<>(); public static final Map<String, LaneInfoPO> laneInfoMap = new HashMap<>();
/** key crossId11, value LaneInfoPO */
public static final Map<String, LaneInfoPO> crossIdLaneId2Map = new HashMap<>();
@Resource @Resource
private LaneInfoMapper laneInfoMapper; private LaneInfoMapper laneInfoMapper;
...@@ -34,6 +36,13 @@ public class CrossLaneInfoCache implements InitializingBean { ...@@ -34,6 +36,13 @@ public class CrossLaneInfoCache implements InitializingBean {
if (!CollectionUtils.isEmpty(laneInfoPOS)) { if (!CollectionUtils.isEmpty(laneInfoPOS)) {
Map<String, LaneInfoPO> crossMap = laneInfoPOS.stream().collect(Collectors.toMap(LaneInfoPO::getId, Function.identity(), (key1, key2) -> key2)); Map<String, LaneInfoPO> crossMap = laneInfoPOS.stream().collect(Collectors.toMap(LaneInfoPO::getId, Function.identity(), (key1, key2) -> key2));
laneInfoMap.putAll(crossMap); laneInfoMap.putAll(crossMap);
for (LaneInfoPO laneInfoPO : laneInfoPOS) {
String currentCrossId = laneInfoPO.getCrossId();
String laneId = laneInfoPO.getId();
String laneId2 = laneId.substring(laneId.length() - 2);
String dir = String.valueOf(laneInfoPO.getDir());
crossIdLaneId2Map.put(currentCrossId.concat(dir).concat(laneId2), laneInfoPO);
}
} }
} }
} }
......
package net.wanji.datacenter.cache;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import net.wanji.databus.dao.mapper.CrossTurnDataHistMapper;
import net.wanji.databus.po.CrossTurnDataHistPO;
import net.wanji.datacenter.common.Constants;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.stream.Collectors;
/**
* @author duanruiming
* @date 2023/10/19 15:33
*/
@Component
@Slf4j
public class CrossTurnWeekMaxFlowCache implements CommandLineRunner {
public static final Map<String, Integer> crossTurnWeekMaxFlowMap = new HashMap<>();
@Resource
private CrossTurnDataHistMapper crossTurnDataHistMapper;
private void getTurnMaxFlow() throws Exception {
LocalDate currentDate = LocalDate.now();
LocalDate lastDay = currentDate.minus(1, ChronoUnit.DAYS);
LocalDate lastWeek = currentDate.minus(1, ChronoUnit.WEEKS);
List<String> crossIds = CrossInfoCache.getSignalIds();
for (String crossId : crossIds) {
// 前一周
LambdaQueryWrapper<CrossTurnDataHistPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(CrossTurnDataHistPO::getCrossId, crossId);
queryWrapper.between(CrossTurnDataHistPO::getStartTime, lastWeek, lastDay);
List<CrossTurnDataHistPO> crossTurnDataHistPOS = crossTurnDataHistMapper.selectList(queryWrapper);
if (!CollectionUtils.isEmpty(crossTurnDataHistPOS)) {
calculateData(crossId, crossTurnDataHistPOS);
}
}
}
/**
* 计算转向一周最大流量
* @param crossId
* @param crossTurnDataHistPOS
*/
private static void calculateData(String crossId, List<CrossTurnDataHistPO> crossTurnDataHistPOS) {
Map<Integer, List<CrossTurnDataHistPO>> dirDataPOsMap = crossTurnDataHistPOS.stream().collect(Collectors.groupingBy(CrossTurnDataHistPO::getInDir));
for (Map.Entry<Integer, List<CrossTurnDataHistPO>> entry : dirDataPOsMap.entrySet()) {
Integer dir = entry.getKey();
List<CrossTurnDataHistPO> dirHistPOS = entry.getValue();
if (!CollectionUtils.isEmpty(dirHistPOS)) {
Map<String, List<CrossTurnDataHistPO>> turnDataPOMap = dirHistPOS.stream().collect(Collectors.groupingBy(CrossTurnDataHistPO::getTurnType));
for (Map.Entry<String, List<CrossTurnDataHistPO>> turnEntry : turnDataPOMap.entrySet()) {
String turn = turnEntry.getKey();
List<CrossTurnDataHistPO> turnDataHistPOList = turnEntry.getValue();
if (!CollectionUtils.isEmpty(turnDataHistPOList)) {
OptionalInt max = turnDataHistPOList.stream().map(CrossTurnDataHistPO::getFlow).mapToInt(Integer::intValue).max();
if (max.isPresent()) {
String key = crossId.concat(Constants.UNDERLINE).concat(String.valueOf(dir)).concat(Constants.UNDERLINE).concat(turn);
crossTurnWeekMaxFlowMap.put(key, max.getAsInt());
}
}
}
}
}
}
@Override
public void run(String... args) throws Exception {
try {
getTurnMaxFlow();
} catch (Exception e) {
log.error("7天内路口方向转向最大流量获取失败", e);
throw new Exception(e);
}
log.info("7天内路口方向转向最大流量:{}", crossTurnWeekMaxFlowMap);
}
}
\ No newline at end of file
package net.wanji.datacenter.cache;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.framework.spring.ServiceBeanContext;
import net.wanji.datacenter.service.DataProcessService;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* @author duanruiming
* @date 2023/03/10 14:04
*/
@Component
@Slf4j
public class DataProcessServiceRegistrator implements CommandLineRunner {
private final Map<String, DataProcessService> dataProcessServiceRegistrar = new HashMap<>();
/**
* 获取注册器
* key:主题名/类名,value: bean
*
* @return
*/
public Map<String, DataProcessService> getDataProcessServiceRegistrar() {
return dataProcessServiceRegistrar;
}
@Override
public void run(String... args) throws Exception {
Map<String, DataProcessService> map = ServiceBeanContext.getInterfaceBeanMap(DataProcessService.class);
if (!map.isEmpty()) {
dataProcessServiceRegistrar.putAll(map);
} else {
log.error("初始化数据处理注册器失败");
}
}
}
package net.wanji.datacenter.cache;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import net.wanji.databus.dao.mapper.CrossDirDataHistMapper;
import net.wanji.databus.po.CrossDirDataHistPO;
import net.wanji.datacenter.common.Constants;
import net.wanji.datacenter.kafka.ProducerHandler;
import net.wanji.datacenter.pojo.dto.CrossDirFreeFlowSpeedDTO;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author duanruiming
* @date 2023/10/14 14:16
*/
@Component
@Slf4j
public class WeekDirFreeFlowSpeedCache implements CommandLineRunner {
public static final Map<String, Double> crossDirFreeFlowSpeedMap = new HashMap<>();
@Resource
private CrossDirDataHistMapper crossDirDataHistMapper;
@Resource
private ProducerHandler producerHandler;
private void getDirWeekFreeFlowSpeed() throws Exception {
LocalDate currentDate = LocalDate.now();
LocalDate lastDay = currentDate.minus(1, ChronoUnit.DAYS);
LocalDate lastWeek = currentDate.minus(1, ChronoUnit.WEEKS);
List<String> crossIds = CrossInfoCache.getSignalIds();
List<CrossDirFreeFlowSpeedDTO> crossDirFreeFlowSpeedDTOS = new ArrayList<>();
for (String crossId : crossIds) {
// 前一周
LambdaQueryWrapper<CrossDirDataHistPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(CrossDirDataHistPO::getCrossId, crossId);
queryWrapper.between(CrossDirDataHistPO::getStartTime, lastWeek, lastDay);
List<CrossDirDataHistPO> crossDirDataHistPOS = crossDirDataHistMapper.selectList(queryWrapper);
if (!CollectionUtils.isEmpty(crossDirDataHistPOS)) {
calculateData(crossDirFreeFlowSpeedDTOS, crossId, crossDirDataHistPOS);
}
}
Map<String, List<CrossDirFreeFlowSpeedDTO>> map = new HashMap<>(1);
map.put(Constants.CROSS_FREE_FLOW_SPEED_KEY, crossDirFreeFlowSpeedDTOS);
producerHandler.send2FirstKafka(Constants.CROSS_FREE_FLOW_SPEED_TOPIC, map);
}
/**
* 计算前一周方向自由流速度85~95分位速度
* @param crossDirFreeFlowSpeedDTOS
* @param crossId
* @param crossDirDataHistPOS
*/
private static void calculateData(List<CrossDirFreeFlowSpeedDTO> crossDirFreeFlowSpeedDTOS, String crossId, List<CrossDirDataHistPO> crossDirDataHistPOS) {
Map<Integer, List<CrossDirDataHistPO>> dirDataMap = crossDirDataHistPOS.stream().collect(Collectors.groupingBy(CrossDirDataHistPO::getDirType));
if (!dirDataMap.isEmpty()) {
for (Map.Entry<Integer, List<CrossDirDataHistPO>> entry : dirDataMap.entrySet()) {
Integer dir = entry.getKey();
List<CrossDirDataHistPO> dirDataHistPOS = entry.getValue();
List<Double> weekSpeeds = dirDataHistPOS.stream().map(CrossDirDataHistPO::getSpeed).sorted().collect(Collectors.toList());
int startIndex = (int) (0.85 * weekSpeeds.size());
int endIndex = (int) (0.95 * weekSpeeds.size());
Double freeFlowSpeed = weekSpeeds.subList(startIndex, endIndex).stream().mapToDouble(Double::doubleValue).average().getAsDouble();
if (freeFlowSpeed == 0.0) {
freeFlowSpeed = 60.0;
}
crossDirFreeFlowSpeedMap.put(crossId.concat(Constants.UNDERLINE).concat(String.valueOf(dir)), freeFlowSpeed);
CrossDirFreeFlowSpeedDTO crossDirFreeFlowSpeedDTO = new CrossDirFreeFlowSpeedDTO();
crossDirFreeFlowSpeedDTO.setCrossId(crossId);
crossDirFreeFlowSpeedDTO.setDir(dir);
crossDirFreeFlowSpeedDTO.setFreeFlowSpeed(freeFlowSpeed);
crossDirFreeFlowSpeedDTOS.add(crossDirFreeFlowSpeedDTO);
}
}
}
@Override
public void run(String... args) throws Exception {
try {
getDirWeekFreeFlowSpeed();
} catch (Exception e) {
log.error("路口方向自由流速度失败", e);
throw new Exception(e);
}
log.info("路口方向自由流速度:{}", crossDirFreeFlowSpeedMap);
}
}
package net.wanji.datacenter.cache;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import net.wanji.databus.dao.mapper.CrossTurnDataHistMapper;
import net.wanji.databus.po.CrossTurnDataHistPO;
import net.wanji.datacenter.common.Constants;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.stream.Collectors;
/**
* @author duanruiming
* @date 2023/10/14 14:16
*/
@Component
@Slf4j
public class WeekTurnFreeFlowSpeedCache implements CommandLineRunner {
public static final Map<String, Double> crossTurnFreeFlowSpeedMap = new HashMap<>();
@Resource
private CrossTurnDataHistMapper crossTurnDataHistMapper;
private void getDirWeekFreeFlowSpeed() throws Exception {
LocalDate currentDate = LocalDate.now();
LocalDate lastDay = currentDate.minus(1, ChronoUnit.DAYS);
LocalDate lastWeek = currentDate.minus(1, ChronoUnit.WEEKS);
List<String> crossIds = CrossInfoCache.getSignalIds();
for (String crossId : crossIds) {
// 前一周
LambdaQueryWrapper<CrossTurnDataHistPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(CrossTurnDataHistPO::getCrossId, crossId);
queryWrapper.between(CrossTurnDataHistPO::getStartTime, lastWeek, lastDay);
List<CrossTurnDataHistPO> crossTurnDataHistPOS = crossTurnDataHistMapper.selectList(queryWrapper);
if (!CollectionUtils.isEmpty(crossTurnDataHistPOS)) {
calculateData(crossTurnDataHistPOS);
}
}
}
/**
* 计算前一周自由流速度85~95分位速度
* @param crossTurnDataHistPOS
*/
private static void calculateData(List<CrossTurnDataHistPO> crossTurnDataHistPOS) {
Map<Integer, List<CrossTurnDataHistPO>> dirDataMap = crossTurnDataHistPOS.stream().collect(Collectors.groupingBy(CrossTurnDataHistPO::getInDir));
for (Map.Entry<Integer, List<CrossTurnDataHistPO>> dirEntry : dirDataMap.entrySet()) {
Integer dir = dirEntry.getKey();
List<CrossTurnDataHistPO> dirTurnDataHistPOS = dirEntry.getValue();
Map<String, List<CrossTurnDataHistPO>> turnDataMap = dirTurnDataHistPOS.stream().collect(Collectors.groupingBy(CrossTurnDataHistPO::getTurnType));
if (!turnDataMap.isEmpty()) {
for (Map.Entry<String, List<CrossTurnDataHistPO>> entry : turnDataMap.entrySet()) {
String turn = entry.getKey();
List<CrossTurnDataHistPO> turnDataHistPOS = entry.getValue();
String currentCrossId = turnDataHistPOS.get(0).getCrossId();
List<Double> weekTurnSpeeds = turnDataHistPOS.stream().map(CrossTurnDataHistPO::getSpeed).sorted().collect(Collectors.toList());
int startIndex = (int) (0.85 * weekTurnSpeeds.size());
int endIndex = (int) (0.95 * weekTurnSpeeds.size());
if (startIndex == endIndex) {
startIndex = startIndex - 1;
}
OptionalDouble average = weekTurnSpeeds.subList(startIndex, endIndex).stream().mapToDouble(Double::doubleValue).average();
if (average.isPresent()) {
double turnFreeFlowSpeed = average.getAsDouble();
String key = currentCrossId.concat(Constants.UNDERLINE).concat(String.valueOf(dir)).concat(Constants.UNDERLINE).concat(turn);
crossTurnFreeFlowSpeedMap.put(key, turnFreeFlowSpeed);
}
}
}
}
}
@Override
public void run(String... args) throws Exception {
try {
getDirWeekFreeFlowSpeed();
} catch (Exception e) {
log.error("路口转向自由流速度失败", e);
throw new Exception(e);
}
log.info("路口转向自由流速度:{}", crossTurnFreeFlowSpeedMap);
}
}
package net.wanji.datacenter.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* # 事件数据集文件保存配置
*/
@ConfigurationProperties(prefix = "eventsave")
@Component
@Data
public class EventSaveConfiguration {
String rootFolder;
String city;
String subType;
String nvrurl;
String positiveSample;
String negativeSample;
String trackJson;
String lightJson;
String evidence;
String image;
String video;
String vision;
String labelJson;
}
package net.wanji.datacenter.kafka; //package net.wanji.datacenter.kafka;
//
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; //import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.core.JsonProcessingException; //import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.JacksonUtils; //import net.wanji.common.utils.tool.JacksonUtils;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate; //import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult; //import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture; //import org.springframework.util.concurrent.ListenableFuture;
//
/** ///**
* @author duanruiming // * @author duanruiming
* @date 2023/08/02 16:58 // * @date 2023/08/02 16:58
*/ // */
@Component //@Component
@Slf4j //@Slf4j
@SuppressWarnings("all") //@SuppressWarnings("all")
public class ProducerHandler { //public class ProducerHandler {
@Autowired // @Autowired
private KafkaTemplate<String, String> kafkaTemplate1; // private KafkaTemplate<String, String> kafkaTemplate1;
@Autowired // @Autowired
private KafkaTemplate<String, String> kafkaTemplate2; // private KafkaTemplate<String, String> kafkaTemplate2;
//
public void send2FirstKafka(String topic, Object msg) { // public void send2FirstKafka(String topic, Object msg) {
Page<Object> objectPage = new Page<>(); // Page<Object> objectPage = new Page<>();
try { // try {
ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate1.send(topic, JacksonUtils.getInstance().writeValueAsString(msg)); // ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate1.send(topic, JacksonUtils.getInstance().writeValueAsString(msg));
future.addCallback( // future.addCallback(
success -> { // success -> {
}, // },
failure -> log.error("消息发送失败:", failure)); // failure -> log.error("消息发送失败:", failure));
} catch (JsonProcessingException e) { // } catch (JsonProcessingException e) {
log.error("{}主题发送kafka1消息失败:", e); // log.error("{}主题发送kafka1消息失败:", e);
return; // return;
} // }
} // }
//
public void send2SecondKafka(String topic, Object msg) { // public void send2SecondKafka(String topic, Object msg) {
Page<Object> objectPage = new Page<>(); // Page<Object> objectPage = new Page<>();
try { // try {
ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate2.send(topic, JacksonUtils.getInstance().writeValueAsString(msg)); // ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate2.send(topic, JacksonUtils.getInstance().writeValueAsString(msg));
future.addCallback( // future.addCallback(
success -> { // success -> {
}, // },
failure -> log.error("消息发送失败:", failure)); // failure -> log.error("消息发送失败:", failure));
} catch (JsonProcessingException e) { // } catch (JsonProcessingException e) {
log.error("{}主题发送kafka2消息失败:", e); // log.error("{}主题发送kafka2消息失败:", e);
return; // return;
} // }
} // }
} //}
package net.wanji.datacenter.pojo.dto;
import lombok.Data;
import java.util.List;
@Data
public class CrossEventDTO {
private Integer dir;
private String rid;
//该路段的结束路口
private String crossId;
private Double index;
private String indexName;
// 501畅通 502轻微拥堵 503中度拥堵 504重度拥堵
private String congestionCode;
private String timestamp;
private Long globalTimeStamp;
private Long startTime;
private Long endTime;
private Integer duration;
// 事件序列号
private String eventSerialNumber;
// 检测时间
private Long detectTime;
private List<CrossEventDTO> details;
private String msgType;
// 失衡方向
private Integer unbalanceDir;
// 路灯结束时排队长度
private Double greenEndQueueLength;
// 非协调方向饱和度
private Double noCoordSaturation;
// 协调方向行程时间
private Double transitTime;
// 协调方向自由流时间
private Double freeTime;
}
package net.wanji.datacenter.pojo.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import net.wanji.databus.po.CrossDataRealtimePO;
import java.util.List;
/**
* @author duanruiming
* @date 2024/11/15 20:17
*/
@Data
public class CrossPeriodDataDTO {
/**
* 车道事件集合
*/
@JsonProperty("eventList")
private List<CrossDataRealtimePO> eventList;
/**
* 车道数量
*/
private Integer laneNum;
/**
* 全域编号
*/
private String orgCode;
/**
* 数据生成时间: yyyy-MM-dd HH:mm:ss:SSS
*/
private String timeStamp;
}
...@@ -7,23 +7,17 @@ package net.wanji.datacenter.service; ...@@ -7,23 +7,17 @@ package net.wanji.datacenter.service;
public interface DataProcessService { public interface DataProcessService {
/** /**
* 数据中心发送数据, 通过远程调用调用不同的服务 * 车道信息处理存库
*
* @param convertData
*/
void send(String topic, Object convertData);
/**
* 数据中心转换数据
*
* @param originalData * @param originalData
* @return * @throws Exception
*/ */
Object convert(String originalData) throws Exception; void laneSave(String originalData) throws Exception;
/** /**
* 转换后保存数据 * 路口信息处理存库
* @param originalData
* @throws Exception
*/ */
void save(Object convertData) throws Exception; void crossSave(String originalData) throws Exception;
} }
package net.wanji.datacenter.service.impl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.datacenter.kafka.ProducerHandler;
import net.wanji.datacenter.pojo.dto.CrossSnapshotDataDTO;
import net.wanji.datacenter.service.DataProcessService;
import org.springframework.stereotype.Service;
/**
* @author duanruiming
* @date 2023/08/04 10:24
*/
@Slf4j
@Service(value = "crossSnapshotDataProcessService")
@RequiredArgsConstructor
public class CrossSnapshotDataProcessService implements DataProcessService {
private final ProducerHandler producerHandler;
@Override
public void send(String topic, Object convertData) {
producerHandler.send2FirstKafka(topic, convertData);
}
@Override
public Object convert(String originalData) throws Exception {
return JacksonUtils.getInstance().readValue(originalData, CrossSnapshotDataDTO.class);
}
@Override
public void save(Object convertData) throws Exception {
}
}
package net.wanji.datacenter.service.impl;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import net.wanji.databus.dao.mapper.CrossDataRealtimeMapper;
import net.wanji.databus.dao.mapper.CrossLaneDataRealTimeMapper;
import net.wanji.databus.po.CrossDataRealtimePO;
import net.wanji.databus.po.CrossLaneDataRealTimePO;
import net.wanji.databus.po.LaneInfoPO;
import net.wanji.datacenter.cache.CrossLaneInfoCache;
import net.wanji.datacenter.pojo.dto.CrossPeriodDataDTO;
import net.wanji.datacenter.pojo.dto.LanePeriodicDataDTO;
import net.wanji.datacenter.service.DataProcessService;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* @author duanruiming
* @date 2024/11/15 19:59
*/
@Service
@Slf4j
public class DataProcessServiceImpl implements DataProcessService {
@Resource
private CrossLaneDataRealTimeMapper crossLaneDataRealTimeMapper;
@Resource
private CrossDataRealtimeMapper crossDataRealtimeMapper;
/**
* 车道信息处理存库
*/
@Transactional(rollbackFor = Exception.class)
@Override
public void laneSave(String originalData) throws Exception {
try {
Map<String, LaneInfoPO> laneInfoMap = CrossLaneInfoCache.crossIdLaneId2Map;
LanePeriodicDataDTO LanePeriodicDataDTO = JSONObject.parseObject(originalData, LanePeriodicDataDTO.class);
String timeStamp = LanePeriodicDataDTO.getTimeStamp();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
List<LanePeriodicDataDTO.CrossLaneDataRealTimeDTO> eventList = LanePeriodicDataDTO.getEventList();
List<CrossLaneDataRealTimePO> insertList = getInsertLaneList(laneInfoMap, timeStamp, dateFormat, eventList);
//crossLaneDataRealTimeMapper.insertBatch(insertList);
} catch (Exception e) {
log.error("车道数据处理入库异常", e);
throw new Exception(e);
}
}
private static List<CrossLaneDataRealTimePO> getInsertLaneList(Map<String, LaneInfoPO> laneInfoMap, String timeStamp, SimpleDateFormat dateFormat, List<LanePeriodicDataDTO.CrossLaneDataRealTimeDTO> eventList) throws ParseException {
List<CrossLaneDataRealTimePO> insertList = new ArrayList<>();
if (!CollectionUtils.isEmpty(eventList)) {
for (LanePeriodicDataDTO.CrossLaneDataRealTimeDTO crossLaneDataRealTimeDTO : eventList) {
String crossId = crossLaneDataRealTimeDTO.getCrossId();
Integer dir = crossLaneDataRealTimeDTO.getDir();
Integer laneNum = crossLaneDataRealTimeDTO.getLane_num();
// 方向=神思dir + 1;车道号=神思laneNum + 10
String key = crossId.concat(String.valueOf(dir + 1)).concat(String.valueOf(laneNum + 10));
LaneInfoPO laneInfoPO = laneInfoMap.get(key);
if (Objects.isNull(laneInfoPO)) {
continue;
}
String laneId = laneInfoPO.getId();
crossLaneDataRealTimeDTO.setId(laneId);
CrossLaneDataRealTimePO crossLaneDataRealTime = new CrossLaneDataRealTimePO();
BeanUtils.copyProperties(crossLaneDataRealTimeDTO, crossLaneDataRealTime);
Date startDate = dateFormat.parse(timeStamp);
crossLaneDataRealTime.setStartTime(startDate);
Integer batchTime = Integer.valueOf(String.valueOf(startDate.getTime()).substring(0, 10));
crossLaneDataRealTime.setBatchTime(batchTime);
insertList.add(crossLaneDataRealTime);
}
}
return insertList;
}
@Transactional(rollbackFor = Exception.class)
@Override
public void crossSave(String originalData) throws Exception {
try {
CrossPeriodDataDTO crossPeriodDataDTO = JSONObject.parseObject(originalData, CrossPeriodDataDTO.class);
String timeStamp = crossPeriodDataDTO.getTimeStamp();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
List<CrossDataRealtimePO> eventList = crossPeriodDataDTO.getEventList();
//crossDataRealtimeMapper.insertBatch(eventList);
} catch (Exception e) {
log.error("路口实时数据存库处理异常:{}", e);
throw new RuntimeException(e);
}
}
}
package net.wanji.datacenter.service.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.enums.DateStyle;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.databus.po.LaneInfoPO;
import net.wanji.datacenter.cache.CrossLaneInfoCache;
import net.wanji.datacenter.common.Constants;
import net.wanji.datacenter.pojo.dto.CrossLaneSnapshotDataDTO;
import net.wanji.datacenter.pojo.dto.LaneSnapshotDataDTO;
import net.wanji.datacenter.service.DataProcessService;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
/**
* @author duanruiming
* @date 2023/10/25 17:27
*/
@Slf4j
@Service(value = "laneSnapshotDataProcessService")
public class LaneSnapshotDataProcessService implements DataProcessService {
@Resource
RestHighLevelClient client;
@Override
public void send(String topic, Object convertData) {
}
@Override
public List<CrossLaneSnapshotDataDTO> convert(String originalData) throws Exception {
List<CrossLaneSnapshotDataDTO> result = new ArrayList<>();
try {
ObjectMapper mapper = JacksonUtils.getInstance();
LaneSnapshotDataDTO laneSnapshotDataDTO = mapper.readValue(originalData, LaneSnapshotDataDTO.class);
if (Objects.nonNull(laneSnapshotDataDTO)) {
String timeStamp = laneSnapshotDataDTO.getTimeStamp();
Date date = new SimpleDateFormat(DateStyle.YYYY_MM_DD_HH_MM_SS.getValue()).parse(timeStamp);
Integer batchTime = Integer.valueOf(String.valueOf(date.getTime()).substring(0, 10));
List<LaneSnapshotDataDTO.EventList> eventList = laneSnapshotDataDTO.getEventList();
if (!CollectionUtils.isEmpty(eventList)) {
for (LaneSnapshotDataDTO.EventList list : eventList) {
CrossLaneSnapshotDataDTO crossLaneSnapshotDataDTO = new CrossLaneSnapshotDataDTO();
String laneId = list.getLaneId();
LaneInfoPO laneInfoPO = CrossLaneInfoCache.laneInfoMap.get(laneId);
if (Objects.nonNull(laneInfoPO)) {
crossLaneSnapshotDataDTO.setId(laneInfoPO.getCrossId().concat(timeStamp));
crossLaneSnapshotDataDTO.setCrossId(laneInfoPO.getCrossId());
crossLaneSnapshotDataDTO.setDir(laneInfoPO.getDir());
crossLaneSnapshotDataDTO.setTurn(laneInfoPO.getTurn());
crossLaneSnapshotDataDTO.setTimeStamp(batchTime);
crossLaneSnapshotDataDTO.setLaneId(laneInfoPO.getId());
crossLaneSnapshotDataDTO.setStaticQueueLength(list.getStaticQueueLength());
crossLaneSnapshotDataDTO.setTeamHeadDistance(list.getTeamHeadDistance());
crossLaneSnapshotDataDTO.setTeamTailDistance(list.getTeamTailDistance());
crossLaneSnapshotDataDTO.setDynamicQueueLength(list.getDynamicQueueLength());
crossLaneSnapshotDataDTO.setMeanSpaceHeadway(list.getMeanSpaceHeadway());
crossLaneSnapshotDataDTO.setStdSpaceHeadway(list.getStdSpaceHeadway());
crossLaneSnapshotDataDTO.setQueueNums(list.getQueueNums());
crossLaneSnapshotDataDTO.setCarNums(list.getCarNums());
crossLaneSnapshotDataDTO.setVehicleLengthRatio(list.getVehicleLengthRatio());
crossLaneSnapshotDataDTO.setVehicleNumsRatio(list.getVehicleNumsRatio());
crossLaneSnapshotDataDTO.setMeanV(list.getMeanV());
crossLaneSnapshotDataDTO.setHeadCarPosition(list.getHeadCarPosition());
crossLaneSnapshotDataDTO.setStartV(list.getStartV());
crossLaneSnapshotDataDTO.setTailCarPosition(list.getTailCarPosition());
crossLaneSnapshotDataDTO.setEndV(list.getEndV());
crossLaneSnapshotDataDTO.setOverflowNums(list.getOverflowNums());
crossLaneSnapshotDataDTO.setOverflow(list.isOverflow());
result.add(crossLaneSnapshotDataDTO);
}
}
}
}
} catch (Exception e) {
log.error("车道快照数据转换异常", e);
throw new Exception(e);
}
return result;
}
@Override
public void save(Object convertData) throws Exception {
try {
List<CrossLaneSnapshotDataDTO> list = (List<CrossLaneSnapshotDataDTO>) convertData;
for (CrossLaneSnapshotDataDTO crossLaneSnapshotDataDTO : list) {
String json = JacksonUtils.getInstance().writeValueAsString(crossLaneSnapshotDataDTO);
long timeStamp = crossLaneSnapshotDataDTO.getTimeStamp() * 1000L;
Date date = new Date(timeStamp);
String currentDateStr = new SimpleDateFormat("yyyyMMdd").format(date);
GetIndexRequest getIndexRequest = new GetIndexRequest(Constants.LANE_SNAPSHOT_DATA_ES_INDEX);
boolean exists = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
if (exists) {
IndexRequest indexRequest = new IndexRequest(Constants.LANE_SNAPSHOT_DATA_ES_INDEX);
indexRequest.source(json, XContentType.JSON);
IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
} else {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(Constants.LANE_SNAPSHOT_DATA_ES_INDEX);
createIndexRequest.source(json, XContentType.JSON);
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
}
}
} catch (Exception e) {
log.error("车道快照数据存储elastic异常", e);
throw new Exception(e);
}
}
}
package net.wanji.datacenter.task; //package net.wanji.datacenter.task;
//
class ProcessCloser implements AutoCloseable { //class ProcessCloser implements AutoCloseable {
Process process; // Process process;
//
public ProcessCloser(Process process) { // public ProcessCloser(Process process) {
this.process = process; // this.process = process;
} // }
//
@Override // @Override
public void close() { // public void close() {
process.destroy(); // process.destroy();
} // }
} //}
\ No newline at end of file \ No newline at end of file
package net.wanji.datacenter.task; //package net.wanji.datacenter.task;
//
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import net.wanji.datacenter.util.RedisUtils; //import net.wanji.datacenter.util.RedisUtils;
import org.springframework.data.redis.connection.DataType; //import org.springframework.data.redis.connection.DataType;
import org.springframework.scheduling.annotation.Scheduled; //import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import javax.annotation.Resource; //import javax.annotation.Resource;
import java.util.Set; //import java.util.Set;
//
/** ///**
* @author Kent HAN // * @author Kent HAN
* @date 2023/12/15 16:41 // * @date 2023/12/15 16:41
*/ // */
@Component //@Component
@Slf4j //@Slf4j
public class RedisTask { //public class RedisTask {
@Resource // @Resource
private RedisUtils redisUtils; // private RedisUtils redisUtils;
//
/** // /**
* 每分钟清理路口及干线事件缓存 // * 每分钟清理路口及干线事件缓存
* @author Kent HAN // * @author Kent HAN
* @date 2023/12/15 16:42 // * @date 2023/12/15 16:42
*/ // */
@Scheduled(fixedRate = 1000 * 60) // @Scheduled(fixedRate = 1000 * 60)
public void cleanCrossEventCache() throws Exception { // public void cleanCrossEventCache() throws Exception {
try { // try {
Set<String> keys = redisUtils.keys(); // Set<String> keys = redisUtils.keys();
for (String key : keys) { // for (String key : keys) {
if (redisUtils.keyType(key) == DataType.ZSET) { // if (redisUtils.keyType(key) == DataType.ZSET) {
redisUtils.removeExpiredElements(key); // redisUtils.removeExpiredElements(key);
} // }
} // }
} catch (Exception e) { // } catch (Exception e) {
log.error("清理路口事件缓存失败", e); // log.error("清理路口事件缓存失败", e);
throw new Exception(e); // throw new Exception(e);
} // }
} // }
//
} //}
server: server:
port: 33000 port: 43000
tomcat: tomcat:
uri-encoding: utf-8 uri-encoding: utf-8
servlet: servlet:
...@@ -9,7 +9,7 @@ spring: ...@@ -9,7 +9,7 @@ spring:
elasticsearch: elasticsearch:
username: elastic username: elastic
password: Wanji300552 password: Wanji300552
uris: http://10.102.1.182:9200 uris: http://37.12.182.31:9200
application: application:
name: wj-datacenter-service name: wj-datacenter-service
main: main:
...@@ -21,20 +21,20 @@ spring: ...@@ -21,20 +21,20 @@ spring:
datasource: datasource:
master: master:
type: com.alibaba.druid.pool.DruidDataSource type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://10.102.1.182:3306/t_signal_control_jinan?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true url: jdbc:mysql://37.12.182.29:3306/wjdit_ecosystem_db_v1.0.0?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true
# url: jdbc:mysql://10.102.1.112:53306/t_signal_control?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true # url: jdbc:mysql://10.102.1.112:53306/t_signal_control?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true
username: root username: root
password: Wanji300552 password: Wanji300552
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver
slave: slave:
type: com.alibaba.druid.pool.DruidDataSource type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://10.102.1.182:3306/t_signal_control_jinan?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true url: jdbc:mysql://37.12.182.29:3306/wjdit_ecosystem_db_v1.0.0?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true
username: root username: root
password: Wanji300552 password: Wanji300552
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver
holo: holo:
type: com.alibaba.druid.pool.DruidDataSource type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://10.102.1.182:3306/holo_roadnet?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true url: jdbc:mysql://37.12.182.29:3306/holo_roadnet?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true
username: root username: root
password: Wanji300552 password: Wanji300552
driverClassName: com.mysql.cj.jdbc.Driver driverClassName: com.mysql.cj.jdbc.Driver
...@@ -42,11 +42,11 @@ spring: ...@@ -42,11 +42,11 @@ spring:
#生产者配置 #生产者配置
producer: producer:
producer1: producer1:
bootstrap-servers: 10.102.1.182:9092 bootstrap-servers: 37.12.182.31:9092
key-deserializer: org.apache.kafka.common.serialization.StringSerializer key-deserializer: org.apache.kafka.common.serialization.StringSerializer
value-deserializer: org.apache.kafka.common.serialization.StringSerializer value-deserializer: org.apache.kafka.common.serialization.StringSerializer
producer2: producer2:
bootstrap-servers: 10.102.1.181:9092 bootstrap-servers: 37.12.182.31:9093
key-deserializer: org.apache.kafka.common.serialization.StringSerializer key-deserializer: org.apache.kafka.common.serialization.StringSerializer
value-deserializer: org.apache.kafka.common.serialization.StringSerializer value-deserializer: org.apache.kafka.common.serialization.StringSerializer
#消费者配置 #消费者配置
...@@ -59,12 +59,12 @@ spring: ...@@ -59,12 +59,12 @@ spring:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: group group-id: group
bootstrap-servers: 10.102.1.182:9092 bootstrap-servers: 37.12.182.31:9093
listener: listener:
concurrency: 3 concurrency: 3
ack-mode: manual_immediate ack-mode: manual_immediate
redis: redis:
host: 10.102.1.182 host: 37.12.182.29
port: 6379 port: 6379
password: Wanji300552 password: Wanji300552
jedis: jedis:
...@@ -108,7 +108,8 @@ threadPoolConfig: ...@@ -108,7 +108,8 @@ threadPoolConfig:
allowCoreTimeOut: false allowCoreTimeOut: false
kafka-consumer: kafka-consumer:
lanePeriodicDataTopic: JN050001LanePeriodicData lanePeriodicDataTopic: cross_period_lane_data
crossPeriodicDataTopic: cross_period_data
laneSnapshotDataTopic: JN050001LaneSnapshotData laneSnapshotDataTopic: JN050001LaneSnapshotData
crossSnapshotDataTopic: JN050001CrossSnapshotData crossSnapshotDataTopic: JN050001CrossSnapshotData
crossEventIndex: phase.emptiness.topic crossEventIndex: phase.emptiness.topic
...@@ -131,7 +132,7 @@ eventsave: ...@@ -131,7 +132,7 @@ eventsave:
kafka: kafka:
producer: producer:
bootstrap.servers: 10.102.1.182:9092 # kafka连接地址 bootstrap.servers: 37.12.182.31:9092 # kafka连接地址
acks: 0 acks: 0
retries: 0 retries: 0
batch.size: 102400 batch.size: 102400
......
...@@ -6,7 +6,7 @@ spring: ...@@ -6,7 +6,7 @@ spring:
cloud: cloud:
nacos: nacos:
config: config:
server-addr: 172.17.0.1:8848 server-addr: 37.12.182.29:8848
file-extension: yaml file-extension: yaml
group: wanji group: wanji
namespace: wanji namespace: wanji
......
spring: spring:
profiles: profiles:
active: dev active: greencity
\ No newline at end of file \ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment