Commit ede5bef5 authored by zhoushiguang's avatar zhoushiguang

Merge remote-tracking branch 'origin/master'

parents 6b2fdf0c 9b76d884
...@@ -6,14 +6,11 @@ import net.wanji.databus.dao.entity.GreenwaveInfoPO; ...@@ -6,14 +6,11 @@ import net.wanji.databus.dao.entity.GreenwaveInfoPO;
import net.wanji.databus.dao.mapper.GreenwaveCrossMapper; import net.wanji.databus.dao.mapper.GreenwaveCrossMapper;
import net.wanji.databus.dao.mapper.GreenwaveInfoMapper; import net.wanji.databus.dao.mapper.GreenwaveInfoMapper;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.time.LocalTime; import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
...@@ -28,7 +25,7 @@ import java.util.stream.Collectors; ...@@ -28,7 +25,7 @@ import java.util.stream.Collectors;
@Slf4j @Slf4j
public class CrossGreenWaveInfoCache implements InitializingBean { public class CrossGreenWaveInfoCache implements InitializingBean {
;public static final Map<Integer, List<GreenwaveCrossPO>> greenWaveInfoMap = new HashMap<>(); public static final Map<Integer, List<GreenwaveCrossPO>> greenWaveInfoMap = new HashMap<>();
@Resource @Resource
private GreenwaveInfoMapper greenwaveInfoMapper; private GreenwaveInfoMapper greenwaveInfoMapper;
......
...@@ -51,6 +51,8 @@ public class CrossSchemePhaseLightsCache implements CommandLineRunner { ...@@ -51,6 +51,8 @@ public class CrossSchemePhaseLightsCache implements CommandLineRunner {
Integer phaseId = crossPhasePO.getId(); Integer phaseId = crossPhasePO.getId();
String phaseNo = crossPhasePO.getPhaseNo(); String phaseNo = crossPhasePO.getPhaseNo();
Integer redTime = crossPhasePO.getRedTime(); Integer redTime = crossPhasePO.getRedTime();
Integer phaseTime = crossPhasePO.getPhaseTime();
Integer greenTime = crossPhasePO.getGreenTime();
List<CrossPhaseLightsPO> crossPhaseLightsPOList = baseCrossPhaseLightsMapper.selectByPhaseId(phaseId); List<CrossPhaseLightsPO> crossPhaseLightsPOList = baseCrossPhaseLightsMapper.selectByPhaseId(phaseId);
if (!CollectionUtils.isEmpty(crossPhaseLightsPOList)) { if (!CollectionUtils.isEmpty(crossPhaseLightsPOList)) {
for (CrossPhaseLightsPO crossPhaseLightsPO : crossPhaseLightsPOList) { for (CrossPhaseLightsPO crossPhaseLightsPO : crossPhaseLightsPOList) {
...@@ -66,6 +68,8 @@ public class CrossSchemePhaseLightsCache implements CommandLineRunner { ...@@ -66,6 +68,8 @@ public class CrossSchemePhaseLightsCache implements CommandLineRunner {
if (turn == 20) { if (turn == 20) {
continue; continue;
} }
crossSchemePhaseDirTurnDTO.setPhaseTime(phaseTime);
crossSchemePhaseDirTurnDTO.setGreenTime(greenTime);
crossSchemePhaseDirTurnDTO.setRedTime(redTime); crossSchemePhaseDirTurnDTO.setRedTime(redTime);
crossSchemePhaseDirTurnDTO.setLightsTurn(turn); crossSchemePhaseDirTurnDTO.setLightsTurn(turn);
crossSchemePhaseDirTurnDTOList.add(crossSchemePhaseDirTurnDTO); crossSchemePhaseDirTurnDTOList.add(crossSchemePhaseDirTurnDTO);
......
...@@ -13,5 +13,7 @@ public class CrossSchemePhaseDirTurnDTO { ...@@ -13,5 +13,7 @@ public class CrossSchemePhaseDirTurnDTO {
private String phaseNo; private String phaseNo;
private Integer lightsDir; private Integer lightsDir;
private Integer lightsTurn; private Integer lightsTurn;
private Integer phaseTime;
private Integer greenTime;
private Integer redTime; private Integer redTime;
} }
package net.wanji.datacenter.task;
import lombok.extern.slf4j.Slf4j;
import net.wanji.databus.dao.entity.BaseCrossSchemePO;
import net.wanji.databus.dao.entity.CrossSectionPO;
import net.wanji.databus.dao.entity.GreenwaveCrossPO;
import net.wanji.databus.dao.mapper.BaseCrossSchemeMapper;
import net.wanji.databus.dao.mapper.BaseCrossSectionMapper;
import net.wanji.datacenter.cache.CrossGreenWaveInfoCache;
import net.wanji.datacenter.cache.CrossSchemePhaseLightsCache;
import net.wanji.datacenter.cache.WeekDirFreeFlowSpeedCache;
import net.wanji.datacenter.pojo.dto.CrossSchemePhaseDirTurnDTO;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* @author duanruiming
* @description 每天凌晨更新自由流速度
* @date 2023/10/14 9:22
*/
@Component
@Slf4j
public class DataCenterTask {
@Resource
private WeekDirFreeFlowSpeedCache weekDirFreeFlowSpeedCache;
@Resource
private BaseCrossSectionMapper baseCrossSectionMapper;
@Resource
private BaseCrossSchemeMapper baseCrossSchemeMapper;
@Scheduled(cron = "0 0 0 * * ?")
public void getWeekFreeFlowSpeed() throws Exception {
weekDirFreeFlowSpeedCache.run();
}
@Scheduled(initialDelay = 1000 * 60, fixedRate = 1000 * 60 * 60)
public void getGreenWaveWidth() throws Exception {
try {
Map<Integer, List<GreenwaveCrossPO>> greenWaveInfoMap = CrossGreenWaveInfoCache.greenWaveInfoMap;
int greenwaveWidth = 100;
for (Map.Entry<Integer, List<GreenwaveCrossPO>> entry : greenWaveInfoMap.entrySet()) {
Integer greenId = entry.getKey();
Double lastCrossDistance = 0.0; // 当前路口与上一路口距离
int crossTimeOffset = 0; // 上一路口到当前路口设计速度下时间差
int greenDelayTime = 0; // 每个路口绿波开始时间
List<GreenwaveCrossPO> greenwaveCrossPOList = entry.getValue();
if (!CollectionUtils.isEmpty(greenwaveCrossPOList)) {
for (GreenwaveCrossPO greenwaveCrossPO : greenwaveCrossPOList) {
String crossId = greenwaveCrossPO.getCrossId();
Integer sectionId = greenwaveCrossPO.getSectionId();
Integer inDir = greenwaveCrossPO.getInDir();
Integer offset = greenwaveCrossPO.getOffset();
crossTimeOffset += (int) (lastCrossDistance / (100 / 6)); // 自由流速度60
if (sectionId != null) {
CrossSectionPO crossSectionPO = baseCrossSectionMapper.selectById(sectionId);
if (Objects.nonNull(crossSectionPO)) {
Integer schemeId = crossSectionPO.getSchemeId();
BaseCrossSchemePO baseCrossSchemePO = baseCrossSchemeMapper.selectById(schemeId);
if (Objects.nonNull(baseCrossSchemePO)) {
String schemeNo = baseCrossSchemePO.getSchemeNo();
List<CrossSchemePhaseDirTurnDTO> crossSchemePhaseDirTurnDTOList = CrossSchemePhaseLightsCache.crossSchemePhaseDirTurnDTOList;
if (!CollectionUtils.isEmpty(crossSchemePhaseDirTurnDTOList)) {
// 过滤路口编号,方案号,直行或者圆饼灯,
List<CrossSchemePhaseDirTurnDTO> currenSchemeList = crossSchemePhaseDirTurnDTOList.stream().filter(po -> StringUtils.equals(crossId, po.getCrossId()) &&
StringUtils.equals(po.getSchemeNo(), schemeNo) &&
(Objects.equals(po.getLightsTurn(), 3) || Objects.equals(po.getLightsTurn(), 1))).collect(Collectors.toList());
Integer lastPhaseTime = 0; // 上一个相位的时间
for (CrossSchemePhaseDirTurnDTO dto : currenSchemeList) {
// 当前路口方案绿波方向的绿灯时间
lastPhaseTime += dto.getPhaseTime();
if (Objects.equals(inDir, dto.getLightsDir())) {
lastPhaseTime -= dto.getPhaseTime();
greenDelayTime = lastPhaseTime > greenDelayTime ? lastPhaseTime : greenDelayTime;
Integer greenWaveStartIndex = offset + greenDelayTime + crossTimeOffset;
Integer greenWaveEndIndex = offset + dto.getPhaseTime();
greenwaveWidth = Math.min(greenwaveWidth, greenWaveEndIndex - greenWaveStartIndex);
}
}
}
}
}
}
lastCrossDistance = greenwaveCrossPO.getNextCrossLen();
}
}
log.info("当前绿波编号:{}, 绿波带宽:{}", greenId, greenwaveWidth);
}
} catch (Exception e) {
log.error("获取绿波带宽异常", e);
throw new Exception("获取绿波带宽异常", e);
}
}
}
package net.wanji.datacenter.task;
import lombok.extern.slf4j.Slf4j;
import net.wanji.datacenter.cache.WeekDirFreeFlowSpeedCache;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author duanruiming
* @description 每天凌晨更新自由流速度
* @date 2023/10/14 9:22
*/
@Component
@Slf4j
public class WeekFreeFlowSpeedTask {
@Resource
private WeekDirFreeFlowSpeedCache weekDirFreeFlowSpeedCache;
@Scheduled(cron = "0 0 0 * * ?")
public void getWeekFreeFlowSpeed() throws Exception {
weekDirFreeFlowSpeedCache.run();
}
}
...@@ -73,6 +73,16 @@ public class Constant { ...@@ -73,6 +73,16 @@ public class Constant {
*/ */
public static int CROSSING = 1; public static int CROSSING = 1;
/**
* 行人灯转向
*/
public static final String PEDESTRAIN_LIGHT_CODE = "20";
/**
* 右转灯转向
*/
public static final String RIGHT_LIGHT_CODE = "3";
/** /**
* 入侵结果 * 入侵结果
*/ */
......
...@@ -9,13 +9,9 @@ import com.wanji.indicators.model.FrameModel; ...@@ -9,13 +9,9 @@ import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.source.LaneInfoSource; import com.wanji.indicators.source.LaneInfoSource;
import com.wanji.indicators.task.lightstatus.BindCrossLaneInfoCoFlatMap; import com.wanji.indicators.task.lightstatus.BindCrossLaneInfoCoFlatMap;
import com.wanji.indicators.task.track.service.func.FrameFlatMap; import com.wanji.indicators.task.track.service.func.FrameFlatMap;
import com.wanji.indicators.task.trajectory.func.CarDataValidatorFlatMap; import com.wanji.indicators.task.trajectory.func.*;
import com.wanji.indicators.task.trajectory.func.CrossRoadLightStatusCoFlatMap;
import com.wanji.indicators.task.trajectory.func.PhaseEmptyTimeProcessFunction;
import com.wanji.indicators.task.trajectory.func.VehicleGapTimeProcessFunction;
import com.wanji.indicators.task.trajectory.helper.FileReadingHelper; import com.wanji.indicators.task.trajectory.helper.FileReadingHelper;
import com.wanji.indicators.task.trajectory.pojo.*; import com.wanji.indicators.task.trajectory.pojo.*;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.PropertiesHelper; import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FilterFunction;
...@@ -39,14 +35,12 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig; ...@@ -39,14 +35,12 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
...@@ -56,7 +50,6 @@ import java.sql.Timestamp; ...@@ -56,7 +50,6 @@ import java.sql.Timestamp;
import java.time.Duration; import java.time.Duration;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
...@@ -69,16 +62,32 @@ import java.util.stream.StreamSupport; ...@@ -69,16 +62,32 @@ import java.util.stream.StreamSupport;
public class CarTrajectoryIndexMain { public class CarTrajectoryIndexMain {
private static final Set<String> crossIdSet; private static final Set<String> crossIdSet;
public static final Map<String,String> virtualCrossRoadArea = new HashMap<>(); public static final Map<String, String> virtualCrossRoadArea = new HashMap<>();
private static final Map<String, Double> ridFreeTimeMap = new HashMap<>(); public static final Map<String, Double> ridFreeTimeMap = new HashMap<>();
public static final Map<String, List<CrossExitInfo>> crossExitMap = new ConcurrentHashMap<>(); public static final Map<String, List<CrossExitInfo>> crossExitMap = new ConcurrentHashMap<>();
private static final Properties properties = PropertiesHelper.getInstance().getProperties(); public static final Properties properties = PropertiesHelper.getInstance().getProperties();
private static final PropertiesHelper instance = PropertiesHelper.getInstance(); private static final PropertiesHelper instance = PropertiesHelper.getInstance();
// 路段ID和终点路口进口方向对应关系
public static final Map<String, Integer> ridEndIndirMap = new ConcurrentHashMap<>();
// 路段ID和起点路口出口方向对应关系
public static final Map<String, Integer> ridStartOutdirMap = new ConcurrentHashMap<>();
// Key:路口ID;Value:路口所有方向 格式"路口ID-方向"
public static final Map<String, List<String>> crossDirs = new ConcurrentHashMap<>();
// Key:路口ID-方向;Value:车道数量
public static final Map<String, Integer> dirLaneCounts = new ConcurrentHashMap<>();
// Key:路口ID-方向-转向;Value:行人灯通过时长
public static final Map<String, Integer> crossDirTurnPedGreen = new ConcurrentHashMap<>();
static { static {
String listOfCrossId = properties.getProperty("full.area.cross.list"); String listOfCrossId = properties.getProperty("full.area.cross.list");
crossIdSet = Arrays.stream(listOfCrossId.split(",")).collect(Collectors.toSet()); crossIdSet = Arrays.stream(listOfCrossId.split(",")).collect(Collectors.toSet());
...@@ -86,11 +95,11 @@ public class CarTrajectoryIndexMain { ...@@ -86,11 +95,11 @@ public class CarTrajectoryIndexMain {
static { static {
List<String> content = FileReadingHelper.getFileContent("data/cross_exit_info.txt"); List<String> content = FileReadingHelper.getFileContent("data/cross_exit_info.txt");
if(!CollectionUtils.isEmpty(content)){ if (!CollectionUtils.isEmpty(content)) {
//数组结构[路口id][路段id][出口区域经纬度坐标集] //数组结构[路口id][路段id][出口区域经纬度坐标集]
Map<String, List<String[]>> groupMapByCrossId = content.stream().map(s -> s.split(":")).collect(Collectors.groupingBy(array -> array[0])); Map<String, List<String[]>> groupMapByCrossId = content.stream().map(s -> s.split(":")).collect(Collectors.groupingBy(array -> array[0]));
crossIdSet.forEach(crossId -> { crossIdSet.forEach(crossId -> {
if(groupMapByCrossId.containsKey(crossId)){ if (groupMapByCrossId.containsKey(crossId)) {
List<String[]> list = groupMapByCrossId.get(crossId); List<String[]> list = groupMapByCrossId.get(crossId);
List<CrossExitInfo> exitList = list.stream().map(info -> { List<CrossExitInfo> exitList = list.stream().map(info -> {
CrossExitInfo exitInfo = new CrossExitInfo(); CrossExitInfo exitInfo = new CrossExitInfo();
...@@ -129,6 +138,62 @@ public class CarTrajectoryIndexMain { ...@@ -129,6 +138,62 @@ public class CarTrajectoryIndexMain {
ridFreeTimeMap.put("13NF80B5QN013NED0B5Q900", 5.0); ridFreeTimeMap.put("13NF80B5QN013NED0B5Q900", 5.0);
} }
static {
String str = properties.getProperty("rid.end.indir.list");
String[] pairs = str.split(",");
for (String pair : pairs) {
String[] keyValue = pair.split(":");
String key = keyValue[0];
Integer value = Integer.valueOf(keyValue[1]);
ridEndIndirMap.put(key, value);
}
}
static {
String str = properties.getProperty("rid.start.outdir.list");
String[] pairs = str.split(",");
for (String pair : pairs) {
String[] keyValue = pair.split(":");
String key = keyValue[0];
Integer value = Integer.valueOf(keyValue[1]);
ridStartOutdirMap.put(key, value);
}
}
static {
crossDirs.put("13NF80B5QN0", new ArrayList<>(Arrays.asList("13NF80B5QN0-1", "13NF80B5QN0-2", "13NF80B5QN0-4", "13NF80B5QN0-6")));
crossDirs.put("13NGH0B5RC0", new ArrayList<>(Arrays.asList("13NGH0B5RC0-1", "13NGH0B5RC0-3", "13NGH0B5RC0-5", "13NGH0B5RC0-6")));
crossDirs.put("13NI00B5RM0", new ArrayList<>(Arrays.asList("13NI00B5RM0-1", "13NI00B5RM0-3", "13NI00B5RM0-7")));
}
static {
dirLaneCounts.put("13NF80B5QN0-1", 1);
dirLaneCounts.put("13NF80B5QN0-2", 3);
dirLaneCounts.put("13NF80B5QN0-4", 1);
dirLaneCounts.put("13NF80B5QN0-6", 3);
dirLaneCounts.put("13NGH0B5RC0-1", 4);
dirLaneCounts.put("13NGH0B5RC0-3", 4);
dirLaneCounts.put("13NGH0B5RC0-5", 3);
dirLaneCounts.put("13NGH0B5RC0-6", 4);
dirLaneCounts.put("13NI00B5RM0-1", 3);
dirLaneCounts.put("13NI00B5RM0-3", 3);
dirLaneCounts.put("13NI00B5RM0-7", 4);
}
static {
String str = properties.getProperty("cross.dir.turn.ped.green");
String[] pairs = str.split(",");
for (String pair : pairs) {
String[] keyValue = pair.split(":");
String key = keyValue[0];
Integer value = Integer.valueOf(keyValue[1]);
crossDirTurnPedGreen.put(key, value);
}
}
public static void main(String[] args) { public static void main(String[] args) {
try { try {
//获取配置文件中的kafka消费topic //获取配置文件中的kafka消费topic
...@@ -136,7 +201,7 @@ public class CarTrajectoryIndexMain { ...@@ -136,7 +201,7 @@ public class CarTrajectoryIndexMain {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//配置检查点机制 //配置检查点机制
env.enableCheckpointing(60*1000); env.enableCheckpointing(60 * 1000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3); env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000); env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(properties.getProperty("check.point.uri"))); env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(properties.getProperty("check.point.uri")));
...@@ -150,7 +215,7 @@ public class CarTrajectoryIndexMain { ...@@ -150,7 +215,7 @@ public class CarTrajectoryIndexMain {
.setBootstrapServers(properties.getProperty("bootstrap.servers")) .setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setTopics(topic) .setTopics(topic)
.setGroupId(properties.getProperty("consumer.group.id") + "-car-trace") .setGroupId(properties.getProperty("consumer.group.id") + "-car-trace")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) .setStartingOffsets(OffsetsInitializer.latest())
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)) .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build(); .build();
...@@ -162,7 +227,7 @@ public class CarTrajectoryIndexMain { ...@@ -162,7 +227,7 @@ public class CarTrajectoryIndexMain {
SingleOutputStreamOperator<CarTrackModel> qualifiedStream = frameModelStream.flatMap(new CarDataValidatorFlatMap()) SingleOutputStreamOperator<CarTrackModel> qualifiedStream = frameModelStream.flatMap(new CarDataValidatorFlatMap())
.setParallelism(1) .setParallelism(1)
.name("校验轨迹帧数据-ToCarTrackModel"); .name("完善轨迹帧数据字段-ToCarTrackModel");
SingleOutputStreamOperator<CarTrackModel> filteredStream = qualifiedStream.filter(new FilterFunction<CarTrackModel>() { SingleOutputStreamOperator<CarTrackModel> filteredStream = qualifiedStream.filter(new FilterFunction<CarTrackModel>() {
@Override @Override
...@@ -215,9 +280,9 @@ public class CarTrajectoryIndexMain { ...@@ -215,9 +280,9 @@ public class CarTrajectoryIndexMain {
}).setParallelism(2).name("统计每辆车通过路口的通行时间"); }).setParallelism(2).name("统计每辆车通过路口的通行时间");
//对同一窗口的统计数据进行处理 //对同一窗口的统计数据进行处理
SingleOutputStreamOperator<List<RidIndexResultOfEastAndWest>> trafficIndexStream = singleCarStatStream SingleOutputStreamOperator<List<CongestionResult>> trafficIndexStream = singleCarStatStream
.keyBy(SingleCarInfo::getWindowEndTime) .keyBy(SingleCarInfo::getWindowEndTime)
.process(new KeyedProcessFunction<Long, SingleCarInfo, List<RidIndexResultOfEastAndWest>>() { .process(new KeyedProcessFunction<Long, SingleCarInfo, List<CongestionResult>>() {
private ListState<SingleCarInfo> listState; private ListState<SingleCarInfo> listState;
@Override @Override
...@@ -226,7 +291,7 @@ public class CarTrajectoryIndexMain { ...@@ -226,7 +291,7 @@ public class CarTrajectoryIndexMain {
} }
@Override @Override
public void processElement(SingleCarInfo singleCarInfo, Context context, Collector<List<RidIndexResultOfEastAndWest>> collector) throws Exception { public void processElement(SingleCarInfo singleCarInfo, Context context, Collector<List<CongestionResult>> collector) throws Exception {
//将数据保存到列表中 //将数据保存到列表中
listState.add(singleCarInfo); listState.add(singleCarInfo);
//注册windowEnd + 1ms的定时器 //注册windowEnd + 1ms的定时器
...@@ -234,19 +299,20 @@ public class CarTrajectoryIndexMain { ...@@ -234,19 +299,20 @@ public class CarTrajectoryIndexMain {
} }
@Override @Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<RidIndexResultOfEastAndWest>> out) throws Exception { public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<CongestionResult>> out) throws Exception {
//存储每个路口进口车道的拥堵统计结果 //存储每个路口进口车道的拥堵统计结果
List<RidIndexResultOfEastAndWest> results = new ArrayList<>(); List<CongestionResult> results = new ArrayList<>();
long windowEnd = timestamp - 1; long windowEnd = timestamp - 1;
//从列表中获取所有统计数据 //从列表中获取所有统计数据
List<SingleCarInfo> list = StreamSupport.stream(listState.get().spliterator(), false).collect(Collectors.toList()); List<SingleCarInfo> list = StreamSupport.stream(listState.get().spliterator(), false).collect(Collectors.toList());
//清空资源 //清空资源
listState.clear(); listState.clear();
Map<String, List<SingleCarInfo>> mapGroupByRid = list.stream().collect(Collectors.groupingBy(SingleCarInfo::getRid)); Map<String, List<SingleCarInfo>> mapGroupByRid = list.stream()
.collect(Collectors.groupingBy(SingleCarInfo::getRid));
mapGroupByRid.forEach((rid, value) -> { mapGroupByRid.forEach((rid, value) -> {
if(ridFreeTimeMap.containsKey(rid)){ if (ridFreeTimeMap.containsKey(rid)) {
RidIndexResultOfEastAndWest result = new RidIndexResultOfEastAndWest(); CongestionResult result = new CongestionResult();
result.setRid(rid); result.setRid(rid);
result.setCrossId(value.get(0).getCrossId()); result.setCrossId(value.get(0).getCrossId());
double avgIndex = value.stream().mapToLong(SingleCarInfo::getTransitTime).summaryStatistics().getAverage() / 1000; double avgIndex = value.stream().mapToLong(SingleCarInfo::getTransitTime).summaryStatistics().getAverage() / 1000;
...@@ -256,25 +322,22 @@ public class CarTrajectoryIndexMain { ...@@ -256,25 +322,22 @@ public class CarTrajectoryIndexMain {
String indexName; String indexName;
String congestionType; String congestionType;
int count = value.size();//通过当前路段的车辆数 int count = value.size();//通过当前路段的车辆数
if(count > 10 && index > 1.8 && index <= 2.5){ if (count > 10 && index > 1.5 && index <= 2.0) {
indexName = "轻度拥堵"; indexName = "轻度拥堵";
congestionType = CongestEnum.LIGHT_CONGEST.getType(); congestionType = CongestEnum.LIGHT_CONGEST.getType();
}else if(count > 10 && index > 2.5 && index <= 3.5){ } else if (count > 10 && index > 2.0 && index <= 3.0) {
indexName = "中度拥堵"; indexName = "中度拥堵";
congestionType = CongestEnum.MODERATE_CONGEST.getType(); congestionType = CongestEnum.MODERATE_CONGEST.getType();
}else if(count > 10 && index > 3.5){ } else if (count > 10 && index > 3.0) {
indexName = "重度拥堵"; indexName = "重度拥堵";
congestionType = CongestEnum.HEAVY_CONGEST.getType(); congestionType = CongestEnum.HEAVY_CONGEST.getType();
}else{ } else {
indexName = "畅通"; indexName = "畅通";
congestionType = CongestEnum.NO_CONGEST.getType(); congestionType = CongestEnum.NO_CONGEST.getType();
} }
result.setIndexName(indexName); result.setIndexName(indexName);
result.setCongestionType(congestionType); result.setCongestionType(congestionType);
result.setTimestamp(DateUtil.toDateTime(windowEnd, DateUtil.YYYY_MM_DD_HH_MM_SS)); result.setGlobalTimeStamp(new Date().getTime());
result.setGlobalTimeStamp(windowEnd);
result.setStartTime(windowEnd - 60000);
result.setEndTime(windowEnd);
results.add(result); results.add(result);
} }
...@@ -283,91 +346,36 @@ public class CarTrajectoryIndexMain { ...@@ -283,91 +346,36 @@ public class CarTrajectoryIndexMain {
} }
}).setParallelism(1).name("同窗口内每个路口的车辆通行时间"); }).setParallelism(1).name("同窗口内每个路口的车辆通行时间");
SingleOutputStreamOperator<String> toJsonStream = trafficIndexStream.flatMap(new FlatMapFunction<List<RidIndexResultOfEastAndWest>, String>() { SingleOutputStreamOperator<CongestionResult> splitCongestionStream = trafficIndexStream.flatMap(new FlatMapFunction<List<CongestionResult>, CongestionResult>() {
@Override
public void flatMap(List<RidIndexResultOfEastAndWest> results, Collector<String> collector) throws Exception {
//根据crossId进行重新分组
Map<String, List<RidIndexResultOfEastAndWest>> groupMapByCrossId = results.stream().collect(Collectors.groupingBy(RidIndexResultOfEastAndWest::getCrossId));
String value = JSONObject.toJSONString(groupMapByCrossId);
collector.collect(value);
}
}).setParallelism(1).name("东西方向拥堵指数统计结果根据路口进行分组-路段拥堵指数统计转成json字符串");
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(properties.getProperty("bootstrap.servers"),
properties.getProperty("rid.traffic.index.analysis.topic"), new SimpleStringSchema());
toJsonStream.addSink(kafkaProducer).setParallelism(1).name("将拥堵指数指标发送至kafka");
//写入mysql
SingleOutputStreamOperator<RidIndexResultOfEastAndWest> splitCongestionStream = trafficIndexStream.flatMap(new FlatMapFunction<List<RidIndexResultOfEastAndWest>, RidIndexResultOfEastAndWest>() {
@Override @Override
public void flatMap(List<RidIndexResultOfEastAndWest> results, Collector<RidIndexResultOfEastAndWest> collector) throws Exception { public void flatMap(List<CongestionResult> results, Collector<CongestionResult> collector) throws Exception {
for (RidIndexResultOfEastAndWest result : results) { for (CongestionResult result : results) {
collector.collect(result); collector.collect(result);
} }
} }
}).setParallelism(1).name("将组合后的数据重新拆分开"); }).setParallelism(1).name("将组合后的数据重新拆分开");
SingleOutputStreamOperator<RidIndexResultOfEastAndWest> trafficEvents = splitCongestionStream.filter(new FilterFunction<RidIndexResultOfEastAndWest>() { SingleOutputStreamOperator<CongestionResult> finalCoordCongestion = splitCongestionStream
.keyBy(CongestionResult::getRid)
.process(new CoordCongestionDataFunction())
.setParallelism(1)
.name("给拥堵场景增加开始时间和持续时长");
SingleOutputStreamOperator<String> toJsonStream = finalCoordCongestion.flatMap(new FlatMapFunction<CongestionResult, String>() {
@Override @Override
public boolean filter(RidIndexResultOfEastAndWest ridIndexResultOfEastAndWest) throws Exception { public void flatMap(CongestionResult result, Collector<String> collector) {
return !ridIndexResultOfEastAndWest.getCongestionType().equals(CongestEnum.NO_CONGEST.getType()); String value = JSONObject.toJSONString(result);
collector.collect(value);
} }
}).setParallelism(1).name("过滤掉非拥堵事件"); }).setParallelism(1).name("协调方向拥堵指数统计结果转json字符串");
JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()
.withMaxRetries(3)//异常重试次数
.withBatchSize(1)//批量大小设置为1,表示一条数据都会写入mysql
.withBatchIntervalMs(0)//批次的间隔为0,表示关闭定时写入
.build();
JdbcConnectionOptions jdbcBuild = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://37.12.182.29:3306/holo_roadnet?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
.withUsername("root")
.withPassword("Wanji300552")
.withDriverName("com.mysql.cj.jdbc.Driver")
.withConnectionCheckTimeoutSeconds(60)
.build();
trafficEvents.addSink(JdbcSink.sink( FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(properties.getProperty("bootstrap.servers"),
"insert into t_event_info (detect_time, type, rid, cross_id, event_serial_number, category, start_time, end_time, duration) values(?,?,?,?,?,?,?,?,?)", properties.getProperty("dir.congestion.index.topic"), new SimpleStringSchema());
new JdbcStatementBuilder<RidIndexResultOfEastAndWest>() {
@Override
public void accept(PreparedStatement preparedStatement, RidIndexResultOfEastAndWest carRecordInfo) throws SQLException {
preparedStatement.setTimestamp(1, new Timestamp(carRecordInfo.getGlobalTimeStamp()));
String type = "501";
switch (carRecordInfo.getIndexName()){
case "畅通":
type = "501";
break;
case "轻度拥堵":
type = "502";
break;
case "中度拥堵":
type = "503";
break;
case "重度拥堵":
type = "504";
break;
}
preparedStatement.setString(2, type);
preparedStatement.setString(3, carRecordInfo.getRid());
preparedStatement.setString(4, carRecordInfo.getCrossId());
String eventSerialNumber = UUID.randomUUID().toString().replace("-","");
preparedStatement.setString(5, eventSerialNumber);
preparedStatement.setString(6, "5");
preparedStatement.setTimestamp(7, new Timestamp(carRecordInfo.getStartTime()));
preparedStatement.setTimestamp(8, new Timestamp(carRecordInfo.getEndTime()));
preparedStatement.setInt(9, 1);
}
}, jdbcExecutionOptions, jdbcBuild))
.setParallelism(1)
.name("写入mysql");
toJsonStream.addSink(kafkaProducer).setParallelism(1).name("将协调方向拥堵数据发送至kafka");
//计算南北方向饱和度 // 计算非协调方向饱和度
String lightStatusTopic = properties.getProperty("light.status.topic"); String lightStatusTopic = properties.getProperty("light.status.topic");
KafkaSource<String> lightStatusSource = KafkaSource.<String>builder() KafkaSource<String> lightStatusSource = KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties()) .setProperties(instance.getConsumerProperties())
...@@ -377,7 +385,7 @@ public class CarTrajectoryIndexMain { ...@@ -377,7 +385,7 @@ public class CarTrajectoryIndexMain {
.setBootstrapServers(properties.getProperty("bootstrap.servers")) .setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setTopics(lightStatusTopic) .setTopics(lightStatusTopic)
.setGroupId(properties.getProperty("consumer.group.id") + "-cross-light-status") .setGroupId(properties.getProperty("consumer.group.id") + "-cross-light-status")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) .setStartingOffsets(OffsetsInitializer.latest())
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)) .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build(); .build();
...@@ -402,24 +410,82 @@ public class CarTrajectoryIndexMain { ...@@ -402,24 +410,82 @@ public class CarTrajectoryIndexMain {
.setParallelism(1) .setParallelism(1)
.name("全域轨迹帧数据-车道属性数据绑定"); .name("全域轨迹帧数据-车道属性数据绑定");
SingleOutputStreamOperator<RidIndexResultOfEastAndWest> northAndSouthTrafficIndex = joinLaneStream SingleOutputStreamOperator<CongestionResult> finalNoCoordCongestion = joinLaneStream
.keyBy(CrossFrameModel::getCrossId) .keyBy(CrossFrameModel::getCrossId)
.process(new VehicleGapTimeProcessFunction()) .process(new VehicleGapTimeProcessFunction())
.setParallelism(1) .setParallelism(1)
.name("绿灯时间内,南北方向饱和度计算"); .name("非协调方向拥堵数据计算");
SingleOutputStreamOperator<String> toJsonStreamOfNoCoord = finalNoCoordCongestion.flatMap(new FlatMapFunction<CongestionResult, String>() {
@Override
public void flatMap(CongestionResult result, Collector<String> collector) throws Exception {
collector.collect(JSONObject.toJSONString(result));
}
}).setParallelism(1).name("协调方向拥堵数据转成json字符串");
toJsonStreamOfNoCoord.addSink(kafkaProducer).setParallelism(1).name("将非协调方向拥堵数据发送至kafka");
SingleOutputStreamOperator<String> toJsonStreamOfNorthAndSouth = northAndSouthTrafficIndex.flatMap(new FlatMapFunction<RidIndexResultOfEastAndWest, String>() { // 融合路口级别拥堵事件
SingleOutputStreamOperator<CongestionResult> mergedCongestion = finalCoordCongestion
.union(finalNoCoordCongestion)
.keyBy(CongestionResult::getCrossId)
.process(new MergeCongestionFunction())
.setParallelism(1)
.name("融合路口级别拥堵事件");
SingleOutputStreamOperator<String> toJsonStreamOfCrossCongestion = mergedCongestion.flatMap(new FlatMapFunction<CongestionResult, String>() {
@Override @Override
public void flatMap(RidIndexResultOfEastAndWest result, Collector<String> collector) throws Exception { public void flatMap(CongestionResult result, Collector<String> collector) throws Exception {
collector.collect(JSONObject.toJSONString(result)); collector.collect(JSONObject.toJSONString(result));
} }
}).setParallelism(1).name("南北饱和度计算指标转成json字符串"); }).setParallelism(1).name("路口级别拥堵数据转成json字符串");
FlinkKafkaProducer<String> crossCongestionKafkaProducer = new FlinkKafkaProducer<>(properties.getProperty("bootstrap.servers"),
properties.getProperty("cross.congestion.index.topic"), new SimpleStringSchema());
FlinkKafkaProducer<String> kafkaProducer2 = new FlinkKafkaProducer<>(properties.getProperty("bootstrap.servers"), toJsonStreamOfCrossCongestion.addSink(crossCongestionKafkaProducer).setParallelism(1).name("将路口级别拥堵数据发送至kafka");
properties.getProperty("rid.traffic.index.north.south.topic"), new SimpleStringSchema());
toJsonStreamOfNorthAndSouth.addSink(kafkaProducer2).setParallelism(1).name("将南北饱和度指标发送至kafka"); // 路口级别拥堵事件写入mysql
SingleOutputStreamOperator<CongestionResult> onlyCongestion = mergedCongestion
.filter(new FilterFunction<CongestionResult>() {
@Override
public boolean filter(CongestionResult value) {
return "拥堵".equals(value.getIndexName());
}
})
.setParallelism(1)
.name("只保留拥堵事件");
JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()
.withMaxRetries(3)//异常重试次数
.withBatchSize(1)//批量大小设置为1,表示一条数据都会写入mysql
.withBatchIntervalMs(0)//批次的间隔为0,表示关闭定时写入
.build();
JdbcConnectionOptions jdbcBuild = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://37.12.182.29:3306/holo_roadnet?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
.withUsername("root")
.withPassword("Wanji300552")
.withDriverName("com.mysql.cj.jdbc.Driver")
.withConnectionCheckTimeoutSeconds(60)
.build();
onlyCongestion.addSink(JdbcSink.sink(
"insert into t_event_info (type, cross_id, category, start_time, duration, event_serial_number) values(?,?,?,?,?,?)",
new JdbcStatementBuilder<CongestionResult>() {
@Override
public void accept(PreparedStatement preparedStatement, CongestionResult carRecordInfo) throws SQLException {
preparedStatement.setString(1, carRecordInfo.getCongestionType());
preparedStatement.setString(2, carRecordInfo.getCrossId());
preparedStatement.setString(3, "5");
preparedStatement.setTimestamp(4, new Timestamp(carRecordInfo.getStartTime()));
preparedStatement.setInt(5, carRecordInfo.getDuration());
String eventSerialNumber = UUID.randomUUID().toString().replace("-","");
preparedStatement.setString(6, eventSerialNumber);
}
}, jdbcExecutionOptions, jdbcBuild))
.setParallelism(1)
.name("写入mysql");
//计算相位空放 //计算相位空放
SingleOutputStreamOperator<PhaseEmptyResult> phaseEmptyStream = joinLaneStream SingleOutputStreamOperator<PhaseEmptyResult> phaseEmptyStream = joinLaneStream
...@@ -446,7 +512,7 @@ public class CarTrajectoryIndexMain { ...@@ -446,7 +512,7 @@ public class CarTrajectoryIndexMain {
//相位空放结果写入mysql //相位空放结果写入mysql
phaseEmptyStream.addSink(JdbcSink.sink( phaseEmptyStream.addSink(JdbcSink.sink(
"insert into t_event_info (detect_time, type, rid, cross_id, event_serial_number, category, start_time, end_time, duration) values(?,?,?,?,?,?,?,?,?)", "insert into t_event_info (detect_time, type, rid, cross_id, event_serial_number, category) values(?,?,?,?,?,?)",
new JdbcStatementBuilder<PhaseEmptyResult>() { new JdbcStatementBuilder<PhaseEmptyResult>() {
@Override @Override
public void accept(PreparedStatement preparedStatement, PhaseEmptyResult phaseEmptyResult) throws SQLException { public void accept(PreparedStatement preparedStatement, PhaseEmptyResult phaseEmptyResult) throws SQLException {
...@@ -456,10 +522,7 @@ public class CarTrajectoryIndexMain { ...@@ -456,10 +522,7 @@ public class CarTrajectoryIndexMain {
preparedStatement.setString(4, phaseEmptyResult.getCrossId()); preparedStatement.setString(4, phaseEmptyResult.getCrossId());
String eventSerialNumber = UUID.randomUUID().toString().replace("-",""); String eventSerialNumber = UUID.randomUUID().toString().replace("-","");
preparedStatement.setString(5, eventSerialNumber); preparedStatement.setString(5, eventSerialNumber);
preparedStatement.setString(6, "1"); preparedStatement.setString(6, "7");
preparedStatement.setTimestamp(7, new Timestamp(phaseEmptyResult.getStartTime()));
preparedStatement.setTimestamp(8, new Timestamp(phaseEmptyResult.getEndTime()));
preparedStatement.setInt(9, phaseEmptyResult.getDuration());
} }
}, jdbcExecutionOptions, jdbcBuild)) }, jdbcExecutionOptions, jdbcBuild))
.setParallelism(1) .setParallelism(1)
...@@ -468,121 +531,25 @@ public class CarTrajectoryIndexMain { ...@@ -468,121 +531,25 @@ public class CarTrajectoryIndexMain {
//计算溢出 //计算溢出
SingleOutputStreamOperator<CarTrackModel> carTrackStreamInCrossRoad = frameModelStream.flatMap(new CarTrackInCrossRoadFlatMap()) SingleOutputStreamOperator<CarTrackModel> carTrackInCrossAndExit = frameModelStream
.flatMap(new CarTrackInCrossRoadFlatMap())
.setParallelism(1) .setParallelism(1)
.name("过滤出位于路口和指定路段区域带有车牌的车辆轨迹数据"); .name("过滤出位于路口和出口路段带有车牌的车辆轨迹数据");
//计算1分钟内的路口溢出指数和是否死锁 //计算1分钟内的路口溢出指数和是否死锁
double avgSpeedEvaluationIndex = Double.parseDouble(properties.getProperty("road.overflow.avg.speed")); SingleOutputStreamOperator<OverFlowAndDeadlockEvent> overFlowIndexStream = carTrackInCrossAndExit
String durationEvaluationIndex = properties.getProperty("road.overflow.duration");
int carNumber = Integer.parseInt(properties.getProperty("cross.road.deadlock.car.number"));
double crossAvgSpeedIndex = Double.parseDouble(properties.getProperty("cross.road.deadlock.avg.speed"));
SingleOutputStreamOperator<OverFlowIndexResult> overFlowIndexStream = carTrackStreamInCrossRoad
.assignTimestampsAndWatermarks( .assignTimestampsAndWatermarks(
WatermarkStrategy.<CarTrackModel>forBoundedOutOfOrderness(Duration.ofSeconds(1)) WatermarkStrategy.<CarTrackModel>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp())) .withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp()))
.keyBy(carTrackModel -> carTrackModel.getRoadnet().getCrossId()) .keyBy(carTrackModel -> carTrackModel.getRoadnet().getCrossId())
.window(TumblingEventTimeWindows.of(Time.minutes(1))) .window(TumblingEventTimeWindows.of(Time.minutes(1)))
.process(new ProcessWindowFunction<CarTrackModel, OverFlowIndexResult, String, TimeWindow>() { .process(new OverflowFunction())
@Override
public void process(String crossId, ProcessWindowFunction<CarTrackModel, OverFlowIndexResult, String, TimeWindow>.Context context, Iterable<CarTrackModel> iterable, Collector<OverFlowIndexResult> collector) throws Exception {
List<CarTrackModel> carInCrossRoadList = StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
if(!CollectionUtils.isEmpty(carInCrossRoadList)){
//过滤出位于指定区域内的数据
List<CarTrackModel> carTracesInSpecificArea = carInCrossRoadList.stream().filter(car -> car.getIsInSpecificArea() == 1).collect(Collectors.toList());
List<CarTrackModel> carTracesInCrossRoad = carInCrossRoadList.stream().filter(car -> car.getRoadnet().getInCrossFlag() == 1).collect(Collectors.toList());
//根据路口进行预取划分
Map<String, List<CarTrackModel>> groupMapInExitArea = carTracesInSpecificArea.stream().collect(Collectors.groupingBy(car -> car.getRoadnet().getRid()));
//对每个路口区域进行统计
if(!groupMapInExitArea.isEmpty()){
List<OverFlowDetail> details = new ArrayList<>();
groupMapInExitArea.forEach((rid, cars) -> {
//统计路口出口内的车辆数量
Map<String, List<CarTrackModel>> carMapGroupByPlate = cars.stream().collect(Collectors.groupingBy(CarTrackModel::getPicLicense));
if(carMapGroupByPlate.size() > 5){
//按照轨迹上报时间进行升序排列
List<CarTrackModel> sorted = cars.stream().sorted(Comparator.comparing(CarTrackModel::getGlobalTimeStamp)).collect(Collectors.toList());
//根据时间戳进行分组,找出同一时刻的轨迹数据
Map<Long, List<CarTrackModel>> groupMapByTimestamp = sorted.stream().collect(Collectors.groupingBy(CarTrackModel::getGlobalTimeStamp));
//存放时间戳和其对应的平均速度
Map<Long, Double> map = new HashMap<>();
groupMapByTimestamp.forEach((timestamp, traces) -> {
double avgSpeed = traces.stream().mapToDouble(CarTrackModel::getSpeed).summaryStatistics().getAverage();
map.put(timestamp, avgSpeed);
});
double ridAvgSpeed = map.values().stream().mapToDouble(number -> number).summaryStatistics().getAverage();
List<Map.Entry<Long, Double>> entryList = map.entrySet().stream().sorted(Map.Entry.comparingByKey()).collect(Collectors.toList());
int consecutiveSeconds = 0;//连续时间
long overflowTimestamp = entryList.get(0).getKey();//发生溢出的时间戳
for(int i = 0; i < entryList.size() - 1; i++){
Map.Entry<Long, Double> current = entryList.get(i);
Map.Entry<Long, Double> next = entryList.get(i + 1);
if(current.getValue() < avgSpeedEvaluationIndex && next.getValue() < avgSpeedEvaluationIndex){
consecutiveSeconds += Math.toIntExact((next.getKey() - current.getKey()));
overflowTimestamp = next.getKey();
}else{
consecutiveSeconds = 0;
}
}
if(consecutiveSeconds > Integer.parseInt(durationEvaluationIndex)){
//计算路口的溢出指数
double index = 30 / ridAvgSpeed;
if(index > 5){
//满足溢出判断的所有条件
OverFlowDetail detail = new OverFlowDetail();
detail.setRid(rid);
detail.setTimestamp(overflowTimestamp);
detail.setDatetime(DateUtil.toDateTime(overflowTimestamp, DateUtil.YYYY_MM_DD_HH_MM_SS));
detail.setIndex(index);
details.add(detail);
}
}
}
});
//存在路口发生溢出的情况
if(!details.isEmpty()){
OverFlowIndexResult result = new OverFlowIndexResult();
result.setCrossId(crossId);
result.setDetails(details);
result.setGlobalTimeStamp(context.window().getEnd());
result.setDateTime(DateUtil.toDateTime(result.getGlobalTimeStamp(), DateUtil.YYYY_MM_DD_HH_MM_SS));
result.setStartTime(context.window().getStart());
result.setEndTime(context.window().getEnd());
//判断该路口死锁
if(details.size() >= 2){
Map<String, List<CarTrackModel>> groupMapByPlate = carTracesInCrossRoad.stream().collect(Collectors.groupingBy(CarTrackModel::getPicLicense));
if(groupMapByPlate.size() > carNumber){
AtomicReference<Double> totalCarSpeed = new AtomicReference<>((double) 0);
groupMapByPlate.forEach((carPlate, traces) -> {
double carAvgSpeed = traces.stream().mapToDouble(CarTrackModel::getSpeed).summaryStatistics().getAverage();
totalCarSpeed.updateAndGet(v -> (v + carAvgSpeed));
});
double crossAvgSpeed = totalCarSpeed.get() / groupMapByPlate.size();
if(crossAvgSpeed < crossAvgSpeedIndex){
result.setDeadLock(true);
}
}
}
collector.collect(result);
}
}
}
}
})
.setParallelism(2) .setParallelism(2)
.name("统计每个路口的溢出指数"); .name("统计每个路口的溢出指数");
SingleOutputStreamOperator<String> jsonStreamOverFlowIndex = overFlowIndexStream.flatMap(new FlatMapFunction<OverFlowIndexResult, String>() { SingleOutputStreamOperator<String> jsonStreamOverFlowIndex = overFlowIndexStream.flatMap(new FlatMapFunction<OverFlowAndDeadlockEvent, String>() {
@Override @Override
public void flatMap(OverFlowIndexResult overFlowIndexResult, Collector<String> collector) throws Exception { public void flatMap(OverFlowAndDeadlockEvent overFlowIndexResult, Collector<String> collector) throws Exception {
collector.collect(JSONObject.toJSONString(overFlowIndexResult)); collector.collect(JSONObject.toJSONString(overFlowIndexResult));
} }
}); });
...@@ -595,46 +562,44 @@ public class CarTrajectoryIndexMain { ...@@ -595,46 +562,44 @@ public class CarTrajectoryIndexMain {
)).setParallelism(1).name("将路口溢出指标推送至kafka"); )).setParallelism(1).name("将路口溢出指标推送至kafka");
//将溢出和死锁事件写入mysql //将溢出和死锁事件写入mysql
SingleOutputStreamOperator<OverFlowIndexResult> deadLockStream = overFlowIndexStream.filter(new FilterFunction<OverFlowIndexResult>() { SingleOutputStreamOperator<OverFlowAndDeadlockEvent> deadLockStream = overFlowIndexStream.filter(new FilterFunction<OverFlowAndDeadlockEvent>() {
@Override @Override
public boolean filter(OverFlowIndexResult overFlowIndexResult) throws Exception { public boolean filter(OverFlowAndDeadlockEvent overFlowIndexResult) throws Exception {
return overFlowIndexResult.isDeadLock(); return overFlowIndexResult.isDeadlock();
} }
}).setParallelism(1).name("过滤出死锁事件"); }).setParallelism(1).name("过滤出死锁事件");
deadLockStream.addSink(JdbcSink.sink( deadLockStream.addSink(JdbcSink.sink(
"insert into t_event_info (detect_time, type, cross_id, event_serial_number, category, start_time, end_time, duration) values(?,?,?,?,?,?,?,?)", "insert into t_event_info (detect_time, type, cross_id, event_serial_number, category, start_time, duration) values(?,?,?,?,?,?,?)",
new JdbcStatementBuilder<OverFlowIndexResult>() { new JdbcStatementBuilder<OverFlowAndDeadlockEvent>() {
@Override @Override
public void accept(PreparedStatement preparedStatement, OverFlowIndexResult overFlowIndexResult) throws SQLException { public void accept(PreparedStatement preparedStatement, OverFlowAndDeadlockEvent overFlowAndDeadlockEvent) throws SQLException {
preparedStatement.setTimestamp(1, new Timestamp(overFlowIndexResult.getGlobalTimeStamp())); preparedStatement.setTimestamp(1, new Timestamp(overFlowAndDeadlockEvent.getGlobalTimeStamp()));
preparedStatement.setString(2, "704"); preparedStatement.setString(2, "704");
preparedStatement.setString(3, overFlowIndexResult.getCrossId()); preparedStatement.setString(3, overFlowAndDeadlockEvent.getCrossId());
String eventSerialNumber = UUID.randomUUID().toString().replace("-",""); String eventSerialNumber = UUID.randomUUID().toString().replace("-","");
preparedStatement.setString(4, eventSerialNumber); preparedStatement.setString(4, eventSerialNumber);
preparedStatement.setString(5, "1"); preparedStatement.setString(5, "7");
preparedStatement.setTimestamp(6, new Timestamp(overFlowIndexResult.getStartTime())); preparedStatement.setTimestamp(6, new Timestamp(overFlowAndDeadlockEvent.getDeadlockStartTime()));
preparedStatement.setTimestamp(7, new Timestamp(overFlowIndexResult.getEndTime())); preparedStatement.setInt(7, overFlowAndDeadlockEvent.getDeadlockDuration());
preparedStatement.setInt(8, 1);
} }
}, jdbcExecutionOptions, jdbcBuild)) }, jdbcExecutionOptions, jdbcBuild))
.setParallelism(1) .setParallelism(1)
.name("将死锁事件写入mysql"); .name("将死锁事件写入mysql");
overFlowIndexStream.addSink(JdbcSink.sink( overFlowIndexStream.addSink(JdbcSink.sink(
"insert into t_event_info (detect_time, type, cross_id, event_serial_number, category, start_time, end_time, duration) values(?,?,?,?,?,?,?,?)", "insert into t_event_info (detect_time, type, cross_id, event_serial_number, category, start_time, duration) values(?,?,?,?,?,?,?)",
new JdbcStatementBuilder<OverFlowIndexResult>() { new JdbcStatementBuilder<OverFlowAndDeadlockEvent>() {
@Override @Override
public void accept(PreparedStatement preparedStatement, OverFlowIndexResult overFlowIndexResult) throws SQLException { public void accept(PreparedStatement preparedStatement, OverFlowAndDeadlockEvent overFlowAndDeadlockEvent) throws SQLException {
preparedStatement.setTimestamp(1, new Timestamp(overFlowIndexResult.getGlobalTimeStamp())); preparedStatement.setTimestamp(1, new Timestamp(overFlowAndDeadlockEvent.getGlobalTimeStamp()));
preparedStatement.setString(2, "703"); preparedStatement.setString(2, "703");
preparedStatement.setString(3, overFlowIndexResult.getCrossId()); preparedStatement.setString(3, overFlowAndDeadlockEvent.getCrossId());
String eventSerialNumber = UUID.randomUUID().toString().replace("-",""); String eventSerialNumber = UUID.randomUUID().toString().replace("-","");
preparedStatement.setString(4, eventSerialNumber); preparedStatement.setString(4, eventSerialNumber);
preparedStatement.setString(5, "1"); preparedStatement.setString(5, "7");
preparedStatement.setTimestamp(6, new Timestamp(overFlowIndexResult.getStartTime())); preparedStatement.setTimestamp(6, new Timestamp(overFlowAndDeadlockEvent.getOverflowStartTime()));
preparedStatement.setTimestamp(7, new Timestamp(overFlowIndexResult.getEndTime())); preparedStatement.setInt(7, overFlowAndDeadlockEvent.getOverflowDuration());
preparedStatement.setInt(8, 1);
} }
}, jdbcExecutionOptions, jdbcBuild)) }, jdbcExecutionOptions, jdbcBuild))
.setParallelism(1) .setParallelism(1)
...@@ -642,36 +607,13 @@ public class CarTrajectoryIndexMain { ...@@ -642,36 +607,13 @@ public class CarTrajectoryIndexMain {
//计算失衡 //计算失衡
SingleOutputStreamOperator<UnbalanceResult> unbalanceIndexStream = splitCongestionStream SingleOutputStreamOperator<UnbalanceResult> unbalanceIndexStream = splitCongestionStream
.union(northAndSouthTrafficIndex) .union(finalNoCoordCongestion)
.assignTimestampsAndWatermarks( .assignTimestampsAndWatermarks(
WatermarkStrategy.<RidIndexResultOfEastAndWest>forBoundedOutOfOrderness(Duration.ofSeconds(1)) WatermarkStrategy.<CongestionResult>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp())) .withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp()))
.keyBy(RidIndexResultOfEastAndWest::getCrossId) .keyBy(CongestionResult::getCrossId)
.window(TumblingEventTimeWindows.of(Time.minutes(1))) .window(TumblingEventTimeWindows.of(Time.minutes(1)))
.apply(new WindowFunction<RidIndexResultOfEastAndWest, UnbalanceResult, String, TimeWindow>() { .process(new UnbalanceFunction()).setParallelism(2).name("失衡事件");
@Override
public void apply(String crossId, TimeWindow timeWindow, Iterable<RidIndexResultOfEastAndWest> iterable, Collector<UnbalanceResult> collector) throws Exception {
List<RidIndexResultOfEastAndWest> collect = StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
if (!CollectionUtils.isEmpty(collect) && collect.size() > 1) {
//获取最大拥堵指数
double max = collect.stream().mapToDouble(RidIndexResultOfEastAndWest::getIndex).max().getAsDouble();
double min = collect.stream().mapToDouble(RidIndexResultOfEastAndWest::getIndex).min().getAsDouble();
double index = max / min;
if (index >= 1.5) {
UnbalanceResult result = new UnbalanceResult();
result.setCrossId(crossId);
result.setIndex(index);
result.setGlobalTimeStamp(timeWindow.getEnd());
result.setTimestamp(DateUtil.toDateTime(result.getGlobalTimeStamp(), DateUtil.YYYY_MM_DD_HH_MM_SS));
result.setStartTime(timeWindow.getStart());
result.setEndTime(timeWindow.getEnd());
collector.collect(result);
}
}
}
}).setParallelism(2).name("统计路口失衡指数");
SingleOutputStreamOperator<String> jsonStreamOfUnbalance = unbalanceIndexStream.flatMap(new FlatMapFunction<UnbalanceResult, String>() { SingleOutputStreamOperator<String> jsonStreamOfUnbalance = unbalanceIndexStream.flatMap(new FlatMapFunction<UnbalanceResult, String>() {
@Override @Override
...@@ -690,7 +632,7 @@ public class CarTrajectoryIndexMain { ...@@ -690,7 +632,7 @@ public class CarTrajectoryIndexMain {
//将失衡结果写入mysql //将失衡结果写入mysql
unbalanceIndexStream.addSink(JdbcSink.sink( unbalanceIndexStream.addSink(JdbcSink.sink(
"insert into t_event_info (detect_time, type, cross_id, event_serial_number, category, start_time, end_time, duration) values(?,?,?,?,?,?,?,?)", "insert into t_event_info (detect_time, type, cross_id, event_serial_number, category, start_time, duration) values(?,?,?,?,?,?,?)",
new JdbcStatementBuilder<UnbalanceResult>() { new JdbcStatementBuilder<UnbalanceResult>() {
@Override @Override
public void accept(PreparedStatement preparedStatement, UnbalanceResult unbalanceResult) throws SQLException { public void accept(PreparedStatement preparedStatement, UnbalanceResult unbalanceResult) throws SQLException {
...@@ -699,10 +641,9 @@ public class CarTrajectoryIndexMain { ...@@ -699,10 +641,9 @@ public class CarTrajectoryIndexMain {
preparedStatement.setString(3, unbalanceResult.getCrossId()); preparedStatement.setString(3, unbalanceResult.getCrossId());
String eventSerialNumber = UUID.randomUUID().toString().replace("-",""); String eventSerialNumber = UUID.randomUUID().toString().replace("-","");
preparedStatement.setString(4, eventSerialNumber); preparedStatement.setString(4, eventSerialNumber);
preparedStatement.setString(5, "1"); preparedStatement.setString(5, "7");
preparedStatement.setTimestamp(6, new Timestamp(unbalanceResult.getStartTime())); preparedStatement.setTimestamp(6, new Timestamp(unbalanceResult.getStartTime()));
preparedStatement.setTimestamp(7, new Timestamp(unbalanceResult.getEndTime())); preparedStatement.setInt(7, unbalanceResult.getDuration());
preparedStatement.setInt(8, 1);
} }
}, jdbcExecutionOptions, jdbcBuild)) }, jdbcExecutionOptions, jdbcBuild))
.setParallelism(1) .setParallelism(1)
......
package com.wanji.indicators.task.trajectory; package com.wanji.indicators.task.trajectory;
import java.util.UUID; import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/** /**
* @author : jenny * @author : jenny
...@@ -10,7 +12,11 @@ import java.util.UUID; ...@@ -10,7 +12,11 @@ import java.util.UUID;
*/ */
public class Test2 { public class Test2 {
public static void main(String[] args) { public static void main(String[] args) {
String eventSerialNumber = UUID.randomUUID().toString().replace("-",""); List<String> list = new ArrayList<>();
System.out.println(eventSerialNumber); List<String> filteredList = list.stream()
.filter(element -> element.length() > 4)
.collect(Collectors.toList());
System.out.println(filteredList.size());
} }
} }
...@@ -88,6 +88,6 @@ public class CarDataValidatorFlatMap implements FlatMapFunction<FrameModel, CarT ...@@ -88,6 +88,6 @@ public class CarDataValidatorFlatMap implements FlatMapFunction<FrameModel, CarT
} }
public static boolean isQualifiedPlate(String plate){ public static boolean isQualifiedPlate(String plate){
return StringUtils.isNotEmpty(plate) && !Objects.equals("null", plate) && !Objects.equals("默A00000", plate); return StringUtils.isNotEmpty(plate) && !Objects.equals("null", plate) && !plate.contains("00000");
} }
} }
package com.wanji.indicators.task.trajectory.func;
import com.wanji.indicators.task.trajectory.CarTrajectoryIndexMain;
import com.wanji.indicators.task.trajectory.pojo.CongestionResult;
import com.wanji.indicators.task.trajectory.pojo.StartDuration;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
/**
* 协调方向拥堵数据
*
* @author Kent HAN
* @date 2023/11/14 10:02
*/
public class CoordCongestionDataFunction extends KeyedProcessFunction<String, CongestionResult, CongestionResult> {
// Key:方向
private MapState<Integer, StartDuration> dirState;
@Override
public void open(Configuration parameters) throws Exception {
dirState = getRuntimeContext()
.getMapState(new MapStateDescriptor<>("dir_map",
Integer.class, StartDuration.class));
}
@Override
public void processElement(CongestionResult value,
KeyedProcessFunction<String, CongestionResult, CongestionResult>.Context ctx,
Collector<CongestionResult> out) throws Exception {
String rid = ctx.getCurrentKey();
Integer dir = CarTrajectoryIndexMain.ridEndIndirMap.get(rid);
value.setDir(dir);
Double index = value.getIndex();
boolean isCongestion = index > 1.5;
Integer stateKey = dir;
if (isCongestion) {
try {
if (!dirState.contains(stateKey)) {
StartDuration startDuration = new StartDuration();
// 获取当前的时间戳
Instant now = Instant.now();
// 获取1分钟前的时间戳
Instant oneMinuteBefore = now.minus(1, ChronoUnit.MINUTES);
// 转换成毫秒
long startTimeMilli = oneMinuteBefore.toEpochMilli();
value.setStartTime(startTimeMilli);
startDuration.setStartTime(startTimeMilli);
long nowMilli = now.toEpochMilli();
long durationMilli = nowMilli - startTimeMilli;
int durationInt = (int) durationMilli / (1000 * 60);
value.setDuration(durationInt);
startDuration.setDuration(durationInt);
dirState.put(stateKey, startDuration);
} else {
StartDuration startDuration = dirState.get(stateKey);
Long startTimeMilli = startDuration.getStartTime();
value.setStartTime(startTimeMilli);
Instant now = Instant.now();
long nowMilli = now.toEpochMilli();
long durationMilli = nowMilli - startTimeMilli;
int durationInt = (int) durationMilli / (1000 * 60);
startDuration.setDuration(durationInt);
value.setDuration(durationInt);
dirState.put(stateKey, startDuration);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
// 删除状态
try {
Iterator<Integer> mapStateIterator = dirState.keys().iterator();
while (mapStateIterator.hasNext()){
Integer key = mapStateIterator.next();
if(key.equals(stateKey)){
mapStateIterator.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
out.collect(value);
}
}
package com.wanji.indicators.task.trajectory.func;
import com.wanji.indicators.constant.CongestEnum;
import com.wanji.indicators.task.trajectory.CarTrajectoryIndexMain;
import com.wanji.indicators.task.trajectory.pojo.CongestionResult;
import com.wanji.indicators.task.trajectory.pojo.StartDuration;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Instant;
import java.util.*;
public class MergeCongestionFunction extends ProcessFunction<CongestionResult, CongestionResult> {
// 保存 dirCongestion 的状态,Key:路口Id-方向,Value:指数
private MapState<String, Double> dirState;
// 保存开始时间和持续时长。Key:路口ID
private MapState<String, StartDuration> crossState;
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<String, Double> dirStateDescriptor = new MapStateDescriptor<>(
"dirState", String.class, Double.class);
MapStateDescriptor<String, StartDuration> crossStateDescriptor = new MapStateDescriptor<>(
"crossState", String.class, StartDuration.class);
dirState = getRuntimeContext().getMapState(dirStateDescriptor);
crossState = getRuntimeContext().getMapState(crossStateDescriptor);
}
@Override
public void processElement(CongestionResult value,
ProcessFunction<CongestionResult, CongestionResult>.Context ctx,
Collector<CongestionResult> out) throws Exception {
String crossId = value.getCrossId();
Integer dir = value.getDir();
String stateKey = crossId + "-" + dir;
Double index = value.getIndex();
dirState.put(stateKey, index);
Iterable<String> dirKeys = dirState.keys();
List<String> crossDirs = CarTrajectoryIndexMain.crossDirs.get(crossId);
if (dirKeys != null) {
// 路口各方向数据是否齐全
Set<String> allKeys = new HashSet<>();
dirKeys.forEach(allKeys::add);
boolean containsAll = allKeys.containsAll(crossDirs);
if (containsAll) {
Map<String, Double> dirIndexMap = new HashMap<>();
for (String crossDir : crossDirs) {
Double v = dirState.get(crossDir);
dirIndexMap.put(crossDir, v);
}
// 加总 延时指数*车道数量
double numeratorSum = 0.0;
int laneCountsSum = 0;
boolean isAllDirsCongestion = true; // 所有路口是否拥堵
for (Map.Entry<String, Double> entry : dirIndexMap.entrySet()) {
Double dirIndex = entry.getValue();
// 车道数量
String key = entry.getKey();
Integer laneCounts = CarTrajectoryIndexMain.dirLaneCounts.get(key);
double numerator = dirIndex * laneCounts;
numeratorSum += numerator;
laneCountsSum += laneCounts;
if (dirIndex <= 0.6) {
isAllDirsCongestion = false;
}
}
double crossIndex = numeratorSum / laneCountsSum;
CongestionResult congestionResult = new CongestionResult();
congestionResult.setCrossId(crossId);
congestionResult.setIndex(crossIndex);
boolean isCrossCongestion = false;
if (crossIndex >= 2 && isAllDirsCongestion) {
isCrossCongestion = true;
}
String crossStateKey = crossId;
if (isCrossCongestion) {
String indexName;
String congestionType;
if (crossIndex <= 2.0) {
indexName = "轻度拥堵";
congestionType = CongestEnum.LIGHT_CONGEST.getType();
} else if (crossIndex <= 3.0) {
indexName = "中度拥堵";
congestionType = CongestEnum.MODERATE_CONGEST.getType();
} else {
indexName = "重度拥堵";
congestionType = CongestEnum.HEAVY_CONGEST.getType();
}
congestionResult.setIndexName(indexName);
congestionResult.setCongestionType(congestionType);
// 保存状态
try {
if (!crossState.contains(crossStateKey)) {
StartDuration startDuration = new StartDuration();
// 获取当前的时间戳
Instant now = Instant.now();
long nowTimeMilli = now.toEpochMilli();
congestionResult.setStartTime(nowTimeMilli);
startDuration.setStartTime(nowTimeMilli);
congestionResult.setDuration(0);
startDuration.setDuration(0);
crossState.put(crossStateKey, startDuration);
} else {
StartDuration startDuration = crossState.get(crossStateKey);
Long startTimeMilli = startDuration.getStartTime();
congestionResult.setStartTime(startTimeMilli);
Instant now = Instant.now();
long nowMilli = now.toEpochMilli();
long durationMilli = nowMilli - startTimeMilli;
int durationInt = (int) durationMilli / (1000 * 60);
startDuration.setDuration(durationInt);
congestionResult.setDuration(durationInt);
crossState.put(crossStateKey, startDuration);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
// 删除状态
try {
Iterator<String> mapStateIterator = crossState.keys().iterator();
while (mapStateIterator.hasNext()){
String key = mapStateIterator.next();
if(key.equals(crossStateKey)){
mapStateIterator.remove();
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
out.collect(congestionResult);
}
}
}
}
\ No newline at end of file
package com.wanji.indicators.task.trajectory.func;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.task.trajectory.CarTrajectoryIndexMain;
import com.wanji.indicators.task.trajectory.pojo.OverFlowDetail;
import com.wanji.indicators.task.trajectory.pojo.OverFlowAndDeadlockEvent;
import com.wanji.indicators.task.trajectory.pojo.StartDuration;
import com.wanji.indicators.util.DateUtil;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.springframework.util.CollectionUtils;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
* @author Kent HAN
* @date 2023/11/16 16:08
*/
public class OverflowFunction extends ProcessWindowFunction<CarTrackModel, OverFlowAndDeadlockEvent, String, TimeWindow> {
double avgSpeedEvaluationIndex = Double.parseDouble(CarTrajectoryIndexMain.properties.getProperty("road.overflow.avg.speed"));
int durationEvaluationIndex = Integer.parseInt(CarTrajectoryIndexMain.properties.getProperty("road.overflow.duration"));
int carNumber = Integer.parseInt(CarTrajectoryIndexMain.properties.getProperty("cross.road.deadlock.car.number"));
double crossAvgSpeedIndex = Double.parseDouble(CarTrajectoryIndexMain.properties.getProperty("cross.road.deadlock.avg.speed"));
// 方向溢出状态。Key:方向
private MapState<Integer, StartDuration> dirOverflowState;
// 路口溢出状态。Key:路口ID
private MapState<String, StartDuration> crossOverflowState;
// 路口死锁状态。Key:路口ID
private MapState<String, StartDuration> crossDeadlockState;
@Override
public void open(Configuration parameters) throws Exception {
dirOverflowState = getRuntimeContext()
.getMapState(new MapStateDescriptor<>("dirOverflowState",
Integer.class, StartDuration.class));
crossOverflowState = getRuntimeContext()
.getMapState(new MapStateDescriptor<>("crossOverflowState",
String.class, StartDuration.class));
crossDeadlockState = getRuntimeContext()
.getMapState(new MapStateDescriptor<>("crossDeadlockState",
String.class, StartDuration.class));
}
@Override
public void process(String crossId, ProcessWindowFunction<CarTrackModel, OverFlowAndDeadlockEvent, String, TimeWindow>.Context context, Iterable<CarTrackModel> iterable, Collector<OverFlowAndDeadlockEvent> collector) {
List<CarTrackModel> carInCrossAndExitList = StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
if(!CollectionUtils.isEmpty(carInCrossAndExitList)){
List<CarTrackModel> carTracesInSpecificArea = carInCrossAndExitList.stream().filter(car -> car.getIsInSpecificArea() == 1).collect(Collectors.toList());
List<CarTrackModel> carTracesInCrossRoad = carInCrossAndExitList.stream().filter(car -> car.getRoadnet().getInCrossFlag() == 1).collect(Collectors.toList());
Map<String, List<CarTrackModel>> groupMapInExitArea = carTracesInSpecificArea.stream()
.collect(Collectors.groupingBy(car -> car.getRoadnet().getRid()));
// 对每个出口区域进行统计
if(!groupMapInExitArea.isEmpty()){
List<OverFlowDetail> details = new ArrayList<>();
groupMapInExitArea.forEach((rid, cars) -> {
// 统计路口出口内的车辆数量
Map<String, List<CarTrackModel>> carMapGroupByPlate = cars.stream()
.collect(Collectors.groupingBy(CarTrackModel::getPicLicense));
if(carMapGroupByPlate.size() > 5){
//按照轨迹上报时间进行升序排列
List<CarTrackModel> sorted = cars.stream()
.sorted(Comparator.comparing(CarTrackModel::getGlobalTimeStamp))
.collect(Collectors.toList());
//根据时间戳进行分组,找出同一时刻的轨迹数据
Map<Long, List<CarTrackModel>> groupMapByTimestamp = sorted.stream()
.collect(Collectors.groupingBy(CarTrackModel::getGlobalTimeStamp));
//存放时间戳和其对应的平均速度
Map<Long, Double> map = new HashMap<>();
groupMapByTimestamp.forEach((timestamp, traces) -> {
double avgSpeed = traces.stream().mapToDouble(CarTrackModel::getSpeed)
.summaryStatistics().getAverage();
map.put(timestamp, avgSpeed);
});
//出口车道的平均速度
double ridAvgSpeed = map.values().stream()
.mapToDouble(number -> number)
.summaryStatistics().getAverage();
List<Map.Entry<Long, Double>> entryList = map.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.collect(Collectors.toList());
int consecutiveSeconds = 0;//连续时间
long overflowTimestamp = entryList.get(0).getKey();//发生溢出的时间戳
for(int i = 0; i < entryList.size() - 1; i++){
Map.Entry<Long, Double> current = entryList.get(i);
Map.Entry<Long, Double> next = entryList.get(i + 1);
if(current.getValue() < avgSpeedEvaluationIndex && next.getValue() < avgSpeedEvaluationIndex){
consecutiveSeconds += Math.toIntExact((next.getKey() - current.getKey()));
overflowTimestamp = next.getKey();
}else{
consecutiveSeconds = 0;
}
}
double index = 30 / ridAvgSpeed;
Integer dir = CarTrajectoryIndexMain.ridStartOutdirMap.get(rid);
if(consecutiveSeconds > durationEvaluationIndex && index > 5){
// 方向发生溢出
OverFlowDetail detail = new OverFlowDetail();
detail.setRid(rid);
detail.setCrossId(crossId);
detail.setIndex(Double.parseDouble(String.format("%.2f", index)));
try {
if (!dirOverflowState.contains(dir)) {
StartDuration startDuration = new StartDuration();
// 获取当前的时间戳
Instant now = Instant.now();
// 获取1分钟前的时间戳
Instant oneMinuteBefore = now.minus(1, ChronoUnit.MINUTES);
// 转换成毫秒
long startTimeMilli = oneMinuteBefore.toEpochMilli();
detail.setStartTime(startTimeMilli);
startDuration.setStartTime(startTimeMilli);
long nowMilli = now.toEpochMilli();
long durationMilli = nowMilli - startTimeMilli;
int durationInt = (int) durationMilli / (1000 * 60);
detail.setDuration(durationInt);
startDuration.setDuration(durationInt);
dirOverflowState.put(dir, startDuration);
} else {
StartDuration startDuration = dirOverflowState.get(dir);
Long startTimeMilli = startDuration.getStartTime();
detail.setStartTime(startTimeMilli);
Instant now = Instant.now();
long nowMilli = now.toEpochMilli();
long durationMilli = nowMilli - startTimeMilli;
int durationInt = (int) durationMilli / (1000 * 60);
startDuration.setDuration(durationInt);
detail.setDuration(durationInt);
dirOverflowState.put(dir, startDuration);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
details.add(detail);
} else {
// 删除状态
try {
Iterator<Integer> mapStateIterator = dirOverflowState.keys().iterator();
while (mapStateIterator.hasNext()){
Integer key = mapStateIterator.next();
if(key.equals(dir)){
mapStateIterator.remove();
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
});
// 存在路口发生溢出的情况
if(!details.isEmpty()){
OverFlowAndDeadlockEvent result = new OverFlowAndDeadlockEvent();
result.setCrossId(crossId);
result.setDetails(details);
//溢出上报时间戳
result.setGlobalTimeStamp(context.window().getEnd());
result.setDateTime(DateUtil.toDateTime(result.getGlobalTimeStamp(), DateUtil.YYYY_MM_DD_HH_MM_SS));
double avgIndex = details.stream()
.mapToDouble(OverFlowDetail::getIndex)
.average().orElse(0.0);
result.setOverflowIndex(avgIndex);
// 保存状态
try {
if (!crossOverflowState.contains(crossId)) {
StartDuration startDuration = new StartDuration();
// 获取当前的时间戳
Instant now = Instant.now();
// 获取1分钟前的时间戳
Instant oneMinuteBefore = now.minus(1, ChronoUnit.MINUTES);
// 转换成毫秒
long startTimeMilli = oneMinuteBefore.toEpochMilli();
result.setOverflowStartTime(startTimeMilli);
startDuration.setStartTime(startTimeMilli);
long nowMilli = now.toEpochMilli();
long durationMilli = nowMilli - startTimeMilli;
int durationInt = (int) durationMilli / (1000 * 60);
result.setOverflowDuration(durationInt);
startDuration.setDuration(durationInt);
crossOverflowState.put(crossId, startDuration);
} else {
StartDuration startDuration = crossOverflowState.get(crossId);
Long startTimeMilli = startDuration.getStartTime();
result.setOverflowStartTime(startTimeMilli);
Instant now = Instant.now();
long nowMilli = now.toEpochMilli();
long durationMilli = nowMilli - startTimeMilli;
int durationInt = (int) durationMilli / (1000 * 60);
startDuration.setDuration(durationInt);
result.setOverflowDuration(durationInt);
crossOverflowState.put(crossId, startDuration);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
//判断该路口死锁
Map<String, List<CarTrackModel>> groupMapByPlate = carTracesInCrossRoad.stream()
.collect(Collectors.groupingBy(CarTrackModel::getPicLicense));
double crossAvgSpeed = carTracesInCrossRoad.stream()
.mapToDouble(CarTrackModel::getSpeed)
.summaryStatistics().getAverage();
if(details.size() >= 2 && groupMapByPlate.size() > carNumber && crossAvgSpeed < crossAvgSpeedIndex){
// 发生死锁
result.setDeadlock(true);
// 保存状态
try {
if (!crossDeadlockState.contains(crossId)) {
StartDuration startDuration = new StartDuration();
// 获取当前的时间戳
Instant now = Instant.now();
// 获取1分钟前的时间戳
Instant oneMinuteBefore = now.minus(1, ChronoUnit.MINUTES);
// 转换成毫秒
long startTimeMilli = oneMinuteBefore.toEpochMilli();
result.setDeadlockStartTime(startTimeMilli);
startDuration.setStartTime(startTimeMilli);
long nowMilli = now.toEpochMilli();
long durationMilli = nowMilli - startTimeMilli;
int durationInt = (int) durationMilli / (1000 * 60);
result.setDeadlockDuration(durationInt);
startDuration.setDuration(durationInt);
crossDeadlockState.put(crossId, startDuration);
} else {
StartDuration startDuration = crossDeadlockState.get(crossId);
Long startTimeMilli = startDuration.getStartTime();
result.setDeadlockStartTime(startTimeMilli);
Instant now = Instant.now();
long nowMilli = now.toEpochMilli();
long durationMilli = nowMilli - startTimeMilli;
int durationInt = (int) durationMilli / (1000 * 60);
startDuration.setDuration(durationInt);
result.setDeadlockDuration(durationInt);
crossDeadlockState.put(crossId, startDuration);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
// 删除状态
try {
Iterator<String> mapStateIterator = crossDeadlockState.keys().iterator();
while (mapStateIterator.hasNext()){
String key = mapStateIterator.next();
if(key.equals(crossId)){
mapStateIterator.remove();
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
collector.collect(result);
} else {
// 删除状态
try {
Iterator<String> mapStateIterator = crossOverflowState.keys().iterator();
while (mapStateIterator.hasNext()){
String key = mapStateIterator.next();
if(key.equals(crossId)){
mapStateIterator.remove();
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
}
}
...@@ -5,6 +5,7 @@ import com.wanji.indicators.constant.LightStatusEnum; ...@@ -5,6 +5,7 @@ import com.wanji.indicators.constant.LightStatusEnum;
import com.wanji.indicators.model.CarTrackModel; import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.CrossFrameModel; import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.model.CrossRidTurnLampStatusModel; import com.wanji.indicators.model.CrossRidTurnLampStatusModel;
import com.wanji.indicators.task.trajectory.CarTrajectoryIndexMain;
import com.wanji.indicators.task.trajectory.pojo.PhaseEmptyResult; import com.wanji.indicators.task.trajectory.pojo.PhaseEmptyResult;
import com.wanji.indicators.task.trajectory.pojo.PhaseEmptyTimeModel; import com.wanji.indicators.task.trajectory.pojo.PhaseEmptyTimeModel;
import com.wanji.indicators.util.GeomsConvertUtil; import com.wanji.indicators.util.GeomsConvertUtil;
...@@ -23,12 +24,13 @@ import java.util.stream.Collectors; ...@@ -23,12 +24,13 @@ import java.util.stream.Collectors;
* @create: 2023-11-08 5:06 下午 * @create: 2023-11-08 5:06 下午
*/ */
public class PhaseEmptyTimeProcessFunction extends KeyedProcessFunction<String, CrossFrameModel, PhaseEmptyResult> { public class PhaseEmptyTimeProcessFunction extends KeyedProcessFunction<String, CrossFrameModel, PhaseEmptyResult> {
// 3转向与15转向对应
private static final Map<String, Set<Integer>> lightTurnControlMap = new HashMap<>(); private static final Map<String, Set<Integer>> lightTurnControlMap = new HashMap<>();
//车道上离停止线最近的车辆缓存 //车道上离停止线最近的车辆缓存,key:crossId-dir-turn-lane
private MapState<String, CarTrackModel> mapState; private MapState<String, CarTrackModel> mapState;
//绿灯总时长 key:crossId-dir-turn //绿灯总时长 key:crossId-dir-turn
private MapState<String, Integer> greenTotalTimeState; private MapState<String, Integer> greenTotalTimeState;
//车道车头时距缓存 //车道车头时距缓存 key:crossId-dir-turn-lane
private MapState<String, PhaseEmptyTimeModel> resultState; private MapState<String, PhaseEmptyTimeModel> resultState;
static { static {
...@@ -64,7 +66,14 @@ public class PhaseEmptyTimeProcessFunction extends KeyedProcessFunction<String, ...@@ -64,7 +66,14 @@ public class PhaseEmptyTimeProcessFunction extends KeyedProcessFunction<String,
.filter(light -> Objects.nonNull(light) && !StringUtils.isEmpty(light.getLampState()) && light.getLampState().trim().equals(LightStatusEnum.GREEN.getType())) .filter(light -> Objects.nonNull(light) && !StringUtils.isEmpty(light.getLampState()) && light.getLampState().trim().equals(LightStatusEnum.GREEN.getType()))
.collect(Collectors.toList()); .collect(Collectors.toList());
Set<String> sets = greenLights.stream().map(greenLight -> greenLight.getCrossId() + "-" + greenLight.getDir() + "-" + greenLight.getTurn()) // 过滤掉行人灯和右转灯
List<CrossRidTurnLampStatusModel> greenLightsExcludePedAndRight = greenLights.stream()
.filter(light -> !light.getTurn().equals(Constant.PEDESTRAIN_LIGHT_CODE))
.filter(light -> !light.getTurn().equals(Constant.RIGHT_LIGHT_CODE))
.collect(Collectors.toList());
Set<String> sets = greenLightsExcludePedAndRight.stream()
.map(greenLight -> greenLight.getCrossId() + "-" + greenLight.getDir() + "-" + greenLight.getTurn())
.collect(Collectors.toSet()); .collect(Collectors.toSet());
//存储绿灯倒计时结束的放行绿灯和时长 //存储绿灯倒计时结束的放行绿灯和时长
...@@ -78,79 +87,77 @@ public class PhaseEmptyTimeProcessFunction extends KeyedProcessFunction<String, ...@@ -78,79 +87,77 @@ public class PhaseEmptyTimeProcessFunction extends KeyedProcessFunction<String,
} }
} }
for(CrossRidTurnLampStatusModel greenLight: greenLights){ for(CrossRidTurnLampStatusModel greenLight: greenLightsExcludePedAndRight){
//更新绿灯放行方向的时长和倒计时 //更新绿灯放行方向的时长和倒计时
String combinedKey = greenLight.getCrossId() + "-" + greenLight.getDir() + "-" + greenLight.getTurn(); String combinedKey = greenLight.getCrossId() + "-" + greenLight.getDir() + "-" + greenLight.getTurn();
if(!greenTotalTimeState.contains(combinedKey)){ if(!greenTotalTimeState.contains(combinedKey)){
greenTotalTimeState.put(combinedKey, greenLight.getCyclePhaseCountDown()); greenTotalTimeState.put(combinedKey, greenLight.getCyclePhaseCountDown());
} }
//排除行人灯 //绿灯放行方向进入路口的车辆
if(!greenLight.getTurn().equals("20")){ List<CarTrackModel> runList = trackList.stream()
//绿灯放行方向进入路口的车辆 .filter(car -> car.getRoadnet().getInCrossFlag() == Constant.CROSSING && greenLight.getDir().compareTo(car.getRoadnet().getRidDir8()) == 0)
List<CarTrackModel> runList = trackList.stream() .filter(car -> lightTurnControlMap.get(greenLight.getTurn()).contains(car.getRoadnet().getTurn()))
.filter(car -> car.getRoadnet().getInCrossFlag() == Constant.CROSSING && greenLight.getDir().compareTo(car.getRoadnet().getRidDir8()) == 0) .collect(Collectors.toList());
.filter(car -> lightTurnControlMap.get(greenLight.getTurn()).contains(car.getRoadnet().getTurn()))
.collect(Collectors.toList()); if(!runList.isEmpty()){
//按照车道进行分组
if(!runList.isEmpty()){ Map<String, List<CarTrackModel>> groupByLane = runList.stream()
//按照车道进行分组 .collect(Collectors.groupingBy(O -> O.getRoadnet().getLaneId()));
Map<String, List<CarTrackModel>> groupByLane = runList.stream().collect(Collectors.groupingBy(O -> O.getRoadnet().getLaneId()));
for (Map.Entry<String, List<CarTrackModel>> entry1 : groupByLane.entrySet()) { for (Map.Entry<String, List<CarTrackModel>> entry1 : groupByLane.entrySet()) {
List<CarTrackModel> laneCarList = entry1.getValue(); List<CarTrackModel> laneCarList = entry1.getValue();
//计算路口内车辆到停止线距离 //计算路口内车辆到停止线距离
laneCarList.forEach(o -> { laneCarList.forEach(o -> {
String laneEndPoint = o.getRoadnet().getLaneEndPoint(); String laneEndPoint = o.getRoadnet().getLaneEndPoint();
if (Objects.nonNull(laneEndPoint)) { if (Objects.nonNull(laneEndPoint)) {
String[] sps = laneEndPoint.split(","); String[] sps = laneEndPoint.split(",");
//距离停止线距离 //距离停止线距离
double stopLineDist = GeomsConvertUtil.getDistance(o.getLongitude(), o.getLatitude(), Double.parseDouble(sps[0]), Double.parseDouble(sps[1])); double stopLineDist = GeomsConvertUtil.getDistance(o.getLongitude(), o.getLatitude(), Double.parseDouble(sps[0]), Double.parseDouble(sps[1]));
o.setStopLineDist(stopLineDist); o.setStopLineDist(stopLineDist);
} }
}); });
//按距离排序 //按距离排序
laneCarList = laneCarList.stream().sorted(Comparator.comparing(CarTrackModel::getStopLineDist)).collect(Collectors.toList()); laneCarList = laneCarList.stream().sorted(Comparator.comparing(CarTrackModel::getStopLineDist)).collect(Collectors.toList());
//取靠近停止线的一辆车 //取靠近停止线的一辆车
CarTrackModel lastCar = laneCarList.get(0); CarTrackModel lastCar = laneCarList.get(0);
String laneId = lastCar.getRoadnet().getLaneId(); String laneId = lastCar.getRoadnet().getLaneId();
String key = combinedKey + "-" + laneId; String key = combinedKey + "-" + laneId;
CarTrackModel agoCar = mapState.get(key); CarTrackModel agoCar = mapState.get(key);
//绿灯区间内计算 //绿灯区间内计算
if(agoCar != null){ if(agoCar != null){
Integer agoTrackId = agoCar.getId(); Integer agoTrackId = agoCar.getId();
Integer trackId = lastCar.getId(); Integer trackId = lastCar.getId();
//前车已经驶出停止线 // 后车已进入路口
if(!Objects.equals(agoTrackId, trackId)){ if(!Objects.equals(agoTrackId, trackId)){
//计算车辆通过停止线的时间间隔 double gapThroughTime = (lastCar.getGlobalTimeStamp() - agoCar.getGlobalTimeStamp()) / 1000.0;
double gapThroughTime = (lastCar.getGlobalTimeStamp() - agoCar.getGlobalTimeStamp()) / 1000.0; // 累加车辆连续通过停止线时间
if(gapThroughTime < 8){ if(gapThroughTime < 8){
PhaseEmptyTimeModel agoTimeModel = resultState.get(key); PhaseEmptyTimeModel agoTimeModel = resultState.get(key);
if(agoTimeModel != null){ if(agoTimeModel != null){
gapThroughTime += agoTimeModel.getHeadTime(); gapThroughTime += agoTimeModel.getHeadTime();
}
PhaseEmptyTimeModel phaseEmptyTimeModel = new PhaseEmptyTimeModel();
phaseEmptyTimeModel.setCrossId(crossId);
phaseEmptyTimeModel.setRid(lastCar.getRoadnet().getRid());
phaseEmptyTimeModel.setLaneId(laneId);
phaseEmptyTimeModel.setDir(greenLight.getDir());
phaseEmptyTimeModel.setTurn(greenLight.getTurn());
phaseEmptyTimeModel.setHeadTime(gapThroughTime);
phaseEmptyTimeModel.setGreenTime(greenTotalTimeState.get(combinedKey));
resultState.put(key, phaseEmptyTimeModel);
} }
mapState.put(key, lastCar);
PhaseEmptyTimeModel phaseEmptyTimeModel = new PhaseEmptyTimeModel();
phaseEmptyTimeModel.setCrossId(crossId);
phaseEmptyTimeModel.setRid(lastCar.getRoadnet().getRid());
phaseEmptyTimeModel.setLaneId(laneId);
phaseEmptyTimeModel.setDir(greenLight.getDir());
phaseEmptyTimeModel.setTurn(greenLight.getTurn());
phaseEmptyTimeModel.setHeadTime(gapThroughTime);
phaseEmptyTimeModel.setGreenTime(greenTotalTimeState.get(combinedKey));
resultState.put(key, phaseEmptyTimeModel);
} }
}else{
mapState.put(key, lastCar); mapState.put(key, lastCar);
} }
}else{
mapState.put(key, lastCar);
} }
} }
} }
} }
//存在绿灯放行倒计时结束的情况 //存在绿灯放行倒计时结束的情况
if(!expiredGreenLightMap.isEmpty()){ if(!expiredGreenLightMap.isEmpty()){
...@@ -164,6 +171,7 @@ public class PhaseEmptyTimeProcessFunction extends KeyedProcessFunction<String, ...@@ -164,6 +171,7 @@ public class PhaseEmptyTimeProcessFunction extends KeyedProcessFunction<String,
Iterator<String> resultStateIterator = resultState.keys().iterator(); Iterator<String> resultStateIterator = resultState.keys().iterator();
while (resultStateIterator.hasNext()){ while (resultStateIterator.hasNext()){
String key = resultStateIterator.next(); String key = resultStateIterator.next();
// key:crossId-dir-turn-lane;combinedKey:crossId-dir-turn
if(key.startsWith(combinedKey)){ if(key.startsWith(combinedKey)){
stats.add(resultState.get(key)); stats.add(resultState.get(key));
resultStateIterator.remove(); resultStateIterator.remove();
...@@ -179,33 +187,43 @@ public class PhaseEmptyTimeProcessFunction extends KeyedProcessFunction<String, ...@@ -179,33 +187,43 @@ public class PhaseEmptyTimeProcessFunction extends KeyedProcessFunction<String,
} }
} }
//对于非行人绿灯判断是否相位空放 if(!stats.isEmpty()){
if(!splits[2].equals("20") && !stats.isEmpty()){ double average = stats.stream()
double average = stats.stream().mapToDouble(PhaseEmptyTimeModel::getHeadTime).summaryStatistics().getAverage(); .mapToDouble(PhaseEmptyTimeModel::getHeadTime)
.summaryStatistics()
.getAverage();
double index = (totalGreenLightTime - average) / totalGreenLightTime; double index = (totalGreenLightTime - average) / totalGreenLightTime;
if(index > 0.2){ if(index > 0.2){
//判断该相位内其他行人灯是否<=20s // 判断该相位内行人灯是否满足最小通过时长
List<CrossRidTurnLampStatusModel> pedestrainGreenLights = greenLights.stream() // combinedKey:路口ID-方向-转向
.filter(light -> light.getTurn().equals("20") && light.getCyclePhaseCountDown() <= 20) Integer pedGreenSeconds = CarTrajectoryIndexMain.crossDirTurnPedGreen.get(combinedKey);
.collect(Collectors.toList()); if (pedGreenSeconds != null) {
if(!pedestrainGreenLights.isEmpty()){ if (totalGreenLightTime > pedGreenSeconds) {
//满足相位空放的条件 // 有行人绿灯,且可压缩,满足相位空放条件
PhaseEmptyResult phaseEmptyResult = new PhaseEmptyResult(); collectResult(context, collector, crossId, totalGreenLightTime, splits, stats, index);
phaseEmptyResult.setCrossId(crossId); }
phaseEmptyResult.setRid(stats.get(0).getRid()); } else {
phaseEmptyResult.setDirection(splits[1]); // 无行人绿灯,满足相位空放条件
phaseEmptyResult.setTurn(splits[2]); collectResult(context, collector, crossId, totalGreenLightTime, splits, stats, index);
phaseEmptyResult.setIndex(index);
phaseEmptyResult.setGlobalTimeStamp(context.timestamp());
phaseEmptyResult.setEndTime(context.timestamp());
phaseEmptyResult.setStartTime(phaseEmptyResult.getEndTime() - totalGreenLightTime * 1000);
phaseEmptyResult.setDuration(totalGreenLightTime / 60);
collector.collect(phaseEmptyResult);
} }
} }
} }
} }
} }
} }
private void collectResult(KeyedProcessFunction<String, CrossFrameModel, PhaseEmptyResult>.Context context,
Collector<PhaseEmptyResult> collector, String crossId, Integer totalGreenLightTime,
String[] splits, List<PhaseEmptyTimeModel> stats, double index) {
PhaseEmptyResult phaseEmptyResult = new PhaseEmptyResult();
phaseEmptyResult.setCrossId(crossId);
phaseEmptyResult.setRid(stats.get(0).getRid());
phaseEmptyResult.setDirection(splits[1]);
phaseEmptyResult.setTurn(splits[2]);
phaseEmptyResult.setIndex(index);
phaseEmptyResult.setGlobalTimeStamp(context.timestamp());
collector.collect(phaseEmptyResult);
}
} }
package com.wanji.indicators.task.trajectory.func; package com.wanji.indicators.task.trajectory.func;
import com.wanji.indicators.model.CarTrackModel; import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.task.trajectory.pojo.OverFlowIndexResult; import com.wanji.indicators.task.trajectory.pojo.OverFlowAndDeadlockEvent;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
...@@ -9,9 +9,7 @@ import org.apache.flink.util.Collector; ...@@ -9,9 +9,7 @@ import org.apache.flink.util.Collector;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
/** /**
...@@ -20,7 +18,7 @@ import java.util.stream.StreamSupport; ...@@ -20,7 +18,7 @@ import java.util.stream.StreamSupport;
* @createTime : 2023/10/27 10:39 * @createTime : 2023/10/27 10:39
* @Description : * @Description :
*/ */
public class ProcessOverFlowFunction extends ProcessWindowFunction<CarTrackModel, OverFlowIndexResult, String, TimeWindow> { public class ProcessOverFlowFunction extends ProcessWindowFunction<CarTrackModel, OverFlowAndDeadlockEvent, String, TimeWindow> {
private Integer agvSpeed = 5; private Integer agvSpeed = 5;
private Integer duration = 3; private Integer duration = 3;
...@@ -42,7 +40,7 @@ public class ProcessOverFlowFunction extends ProcessWindowFunction<CarTrackModel ...@@ -42,7 +40,7 @@ public class ProcessOverFlowFunction extends ProcessWindowFunction<CarTrackModel
} }
@Override @Override
public void process(String s, ProcessWindowFunction<CarTrackModel, OverFlowIndexResult, String, TimeWindow>.Context context, Iterable<CarTrackModel> iterable, Collector<OverFlowIndexResult> collector) throws Exception { public void process(String s, ProcessWindowFunction<CarTrackModel, OverFlowAndDeadlockEvent, String, TimeWindow>.Context context, Iterable<CarTrackModel> iterable, Collector<OverFlowAndDeadlockEvent> collector) throws Exception {
List<CarTrackModel> carInCrossRoadList = StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList()); List<CarTrackModel> carInCrossRoadList = StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
} }
} }
package com.wanji.indicators.task.trajectory.func;
import com.wanji.indicators.constant.CongestEnum;
import com.wanji.indicators.task.trajectory.pojo.CongestionResult;
import com.wanji.indicators.task.trajectory.pojo.StartDuration;
import com.wanji.indicators.task.trajectory.pojo.UnbalanceResult;
import com.wanji.indicators.util.DateUtil;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.springframework.util.CollectionUtils;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
* @author Kent HAN
* @date 2023/11/16 17:45
*/
public class UnbalanceFunction extends ProcessWindowFunction<CongestionResult, UnbalanceResult, String, TimeWindow> {
// 开始时间和持续时长状态。Key:路口ID
private MapState<String, StartDuration> crossState;
@Override
public void open(Configuration parameters) {
crossState = getRuntimeContext()
.getMapState(new MapStateDescriptor<>(
"crossState", String.class, StartDuration.class));
}
@Override
public void process(String crossId, ProcessWindowFunction<CongestionResult, UnbalanceResult, String, TimeWindow>.Context context, Iterable<CongestionResult> iterable, Collector<UnbalanceResult> collector) throws Exception {
List<CongestionResult> collect = StreamSupport.stream(iterable.spliterator(), false)
.collect(Collectors.toList());
if (!CollectionUtils.isEmpty(collect) && collect.size() > 1) {
// 获取拥堵指数最大和最小的元素
Optional<CongestionResult> maxElement = collect.stream()
.max(Comparator.comparingDouble(CongestionResult::getIndex));
CongestionResult maxCongestionResult = maxElement.get();
Optional<CongestionResult> minElement = collect.stream()
.min(Comparator.comparingDouble(CongestionResult::getIndex));
CongestionResult minCongestionResult = minElement.get();
Double maxIndex = maxCongestionResult.getIndex();
Double minIndex = minCongestionResult.getIndex();
String maxName = maxCongestionResult.getIndexName();
String minName = minCongestionResult.getIndexName();
double index = maxIndex / minIndex;
String name = CongestEnum.NO_CONGEST.getName();
if (index >= 1.5 && !name.equals(maxName) && !name.equals(minName)) {
// 出现失衡
UnbalanceResult result = new UnbalanceResult();
result.setCrossId(crossId);
result.setIndex(index);
result.setGlobalTimeStamp(new Date().getTime());
result.setTimestamp(DateUtil.toDateTime(result.getGlobalTimeStamp(), DateUtil.YYYY_MM_DD_HH_MM_SS));
try {
if (!crossState.contains(crossId)) {
StartDuration startDuration = new StartDuration();
// 获取当前的时间戳
Instant now = Instant.now();
// 获取1分钟前的时间戳
Instant before = now.minus(1, ChronoUnit.MINUTES);
// 转换成毫秒
long beforeMilli = before.toEpochMilli();
result.setStartTime(beforeMilli);
startDuration.setStartTime(beforeMilli);
long nowMilli = now.toEpochMilli();
long durationMilli = nowMilli - beforeMilli;
int durationInt = (int) durationMilli / (1000 * 60);
result.setDuration(durationInt);
startDuration.setDuration(durationInt);
crossState.put(crossId, startDuration);
} else {
StartDuration startDuration = crossState.get(crossId);
Long startTimeMilli = startDuration.getStartTime();
result.setStartTime(startTimeMilli);
Instant now = Instant.now();
long nowMilli = now.toEpochMilli();
long durationMilli = nowMilli - startTimeMilli;
int durationInt = (int) durationMilli / (1000 * 60);
startDuration.setDuration(durationInt);
result.setDuration(durationInt);
crossState.put(crossId, startDuration);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
collector.collect(result);
} else {
// 删除状态
try {
Iterator<String> iter = crossState.keys().iterator();
while (iter.hasNext()){
String key = iter.next();
if(key.equals(crossId)){
iter.remove();
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
}
package com.wanji.indicators.task.trajectory.func; package com.wanji.indicators.task.trajectory.func;
import com.wanji.indicators.constant.CongestEnum;
import com.wanji.indicators.constant.Constant; import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.constant.LightStatusEnum; import com.wanji.indicators.constant.LightStatusEnum;
import com.wanji.indicators.constant.TurnEnum; import com.wanji.indicators.constant.TurnEnum;
...@@ -7,8 +8,8 @@ import com.wanji.indicators.model.CarTrackModel; ...@@ -7,8 +8,8 @@ import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.CrossFrameModel; import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.model.CrossRidTurnLampStatusModel; import com.wanji.indicators.model.CrossRidTurnLampStatusModel;
import com.wanji.indicators.task.lightstatus.model.VehicleHeadTimeModel; import com.wanji.indicators.task.lightstatus.model.VehicleHeadTimeModel;
import com.wanji.indicators.task.trajectory.pojo.RidIndexResultOfEastAndWest; import com.wanji.indicators.task.trajectory.pojo.CongestionResult;
import com.wanji.indicators.util.DateUtil; import com.wanji.indicators.task.trajectory.pojo.StartDuration;
import com.wanji.indicators.util.GeomsConvertUtil; import com.wanji.indicators.util.GeomsConvertUtil;
import com.wanji.indicators.util.PtInPolyUtil; import com.wanji.indicators.util.PtInPolyUtil;
import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapState;
...@@ -18,6 +19,8 @@ import org.apache.flink.streaming.api.functions.KeyedProcessFunction; ...@@ -18,6 +19,8 @@ import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
...@@ -28,7 +31,7 @@ import java.util.stream.StreamSupport; ...@@ -28,7 +31,7 @@ import java.util.stream.StreamSupport;
* @createTime : 2023/11/1 16:16 * @createTime : 2023/11/1 16:16
* @Description : * @Description :
*/ */
public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String, CrossFrameModel, RidIndexResultOfEastAndWest> { public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String, CrossFrameModel, CongestionResult> {
//车道上离停止线最近的车辆缓存 //车道上离停止线最近的车辆缓存
private MapState<String, CarTrackModel> mapState; private MapState<String, CarTrackModel> mapState;
//绿灯总时长 //绿灯总时长
...@@ -36,16 +39,19 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String, ...@@ -36,16 +39,19 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String,
//车道车头时距缓存 //车道车头时距缓存
private MapState<String, VehicleHeadTimeModel> resultState; private MapState<String, VehicleHeadTimeModel> resultState;
// 开始时间和持续时长状态。Key:方向
private MapState<Integer, StartDuration> dirState;
@Override @Override
public void open(Configuration parameters) { public void open(Configuration parameters) {
mapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("light_status_map", String.class, CarTrackModel.class)); mapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("light_status_map", String.class, CarTrackModel.class));
resultState = getRuntimeContext().getMapState(new MapStateDescriptor<>("light_result_map", String.class, VehicleHeadTimeModel.class)); resultState = getRuntimeContext().getMapState(new MapStateDescriptor<>("light_result_map", String.class, VehicleHeadTimeModel.class));
greenTotalTimeState = getRuntimeContext().getMapState(new MapStateDescriptor<>("light_total_time_map", String.class, Integer.class)); greenTotalTimeState = getRuntimeContext().getMapState(new MapStateDescriptor<>("light_total_time_map", String.class, Integer.class));
dirState = getRuntimeContext().getMapState(new MapStateDescriptor<>("dir_map", Integer.class, StartDuration.class));
} }
@Override @Override
public void processElement(CrossFrameModel value, Context ctx, Collector<RidIndexResultOfEastAndWest> out) throws Exception { public void processElement(CrossFrameModel value, Context ctx, Collector<CongestionResult> out) throws Exception {
String crossId = ctx.getCurrentKey(); String crossId = ctx.getCurrentKey();
List<CarTrackModel> trackList = value.getTrackList(); List<CarTrackModel> trackList = value.getTrackList();
//灯态数据 //灯态数据
...@@ -54,17 +60,19 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String, ...@@ -54,17 +60,19 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String,
Map<Integer, List<CrossRidTurnLampStatusModel>> groupLightStatus = Map<Integer, List<CrossRidTurnLampStatusModel>> groupLightStatus =
ridLightStatusList.stream() ridLightStatusList.stream()
.filter(o -> Objects.nonNull(o) && !StringUtils.isEmpty(o.getLampState()) && o.getLampState().trim().equals(LightStatusEnum.GREEN.getType())) .filter(o -> Objects.nonNull(o) && !StringUtils.isEmpty(o.getLampState()) && o.getLampState().trim().equals(LightStatusEnum.GREEN.getType()))
.filter(o -> o.getDir() == 1 || o.getDir() == 4 || o.getDir() == 5)//南北方向 .filter(o -> o.getDir() == 1 || o.getDir() == 4 || o.getDir() == 5)//非协调方向
.filter(o -> !o.getTurn().equals(Constant.PEDESTRAIN_LIGHT_CODE)) // 非行人灯
.filter(o -> !o.getTurn().equals(Constant.RIGHT_LIGHT_CODE)) // 非右转灯
.collect(Collectors.groupingBy(CrossRidTurnLampStatusModel::getDir)); .collect(Collectors.groupingBy(CrossRidTurnLampStatusModel::getDir));
//存储当前南北方向绿灯结束的路口和绿灯时长 //存储当前非协调方向绿灯结束的路口和绿灯时长
Map<Integer, Integer> expiredGreenLightMap = new HashMap<>(); Map<Integer, Integer> expiredGreenLightMap = new HashMap<>();
if(!greenTotalTimeState.isEmpty()){ if(!greenTotalTimeState.isEmpty()){
Iterable<String> keys = greenTotalTimeState.keys(); Iterable<String> keys = greenTotalTimeState.keys();
for(String key: keys){ for(String key: keys){
String[] split = key.trim().split("-"); String[] split = key.trim().split("-"); // "路口ID-方向"
int dir = Integer.parseInt(split[1]); int dir = Integer.parseInt(split[1]);
if(groupLightStatus.containsKey(dir)){ if(!groupLightStatus.containsKey(dir)){
expiredGreenLightMap.put(dir, greenTotalTimeState.get(key)); expiredGreenLightMap.put(dir, greenTotalTimeState.get(key));
} }
} }
...@@ -72,9 +80,9 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String, ...@@ -72,9 +80,9 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String,
for (Map.Entry<Integer, List<CrossRidTurnLampStatusModel>> entry : groupLightStatus.entrySet()) { for (Map.Entry<Integer, List<CrossRidTurnLampStatusModel>> entry : groupLightStatus.entrySet()) {
Integer dir = entry.getKey(); Integer dir = entry.getKey();
List<CrossRidTurnLampStatusModel> value1 = entry.getValue();//同个路段上绿灯放行的转向 List<CrossRidTurnLampStatusModel> value1 = entry.getValue(); // 非协调方向绿灯信息
//更新进口道绿灯时长和倒计时 // 更新绿灯相位周期总时长
String combinedKey = crossId + "-" + dir; String combinedKey = crossId + "-" + dir;
if(!greenTotalTimeState.contains(combinedKey)){ if(!greenTotalTimeState.contains(combinedKey)){
greenTotalTimeState.put(combinedKey, value1.get(0).getCyclePhaseCountDown()); greenTotalTimeState.put(combinedKey, value1.get(0).getCyclePhaseCountDown());
...@@ -91,7 +99,7 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String, ...@@ -91,7 +99,7 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String,
for (Map.Entry<String, List<CarTrackModel>> entry1 : groupByLane.entrySet()) { for (Map.Entry<String, List<CarTrackModel>> entry1 : groupByLane.entrySet()) {
List<CarTrackModel> laneCarList = entry1.getValue(); List<CarTrackModel> laneCarList = entry1.getValue();
//计算路口内车辆到停止线距离 // 计算路口内车辆到停止线距离
laneCarList.forEach(o -> { laneCarList.forEach(o -> {
String laneEndPoint = o.getRoadnet().getLaneEndPoint(); String laneEndPoint = o.getRoadnet().getLaneEndPoint();
if (Objects.nonNull(laneEndPoint)) { if (Objects.nonNull(laneEndPoint)) {
...@@ -140,7 +148,7 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String, ...@@ -140,7 +148,7 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String,
VehicleHeadTimeModel timeModel = new VehicleHeadTimeModel(); VehicleHeadTimeModel timeModel = new VehicleHeadTimeModel();
timeModel.setLaneId(laneId); timeModel.setLaneId(laneId);
timeModel.setCrossId(lastCar.getRoadnet().getCrossId()); timeModel.setCrossId(crossId);
timeModel.setRid(lastCar.getRoadnet().getRid()); timeModel.setRid(lastCar.getRoadnet().getRid());
timeModel.setDir(dir); timeModel.setDir(dir);
timeModel.setHeadTime(time); timeModel.setHeadTime(time);
...@@ -158,13 +166,13 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String, ...@@ -158,13 +166,13 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String,
} }
} }
//存在绿灯倒计时结束 // 存在绿灯倒计时结束
if(!expiredGreenLightMap.isEmpty()){ if(!expiredGreenLightMap.isEmpty()){
for(Map.Entry<Integer, Integer> expiredLight: expiredGreenLightMap.entrySet()){ for(Map.Entry<Integer, Integer> expiredLight: expiredGreenLightMap.entrySet()){
Integer dir = expiredLight.getKey(); Integer dir = expiredLight.getKey();
Integer totalGreenLightTime = expiredLight.getValue(); Integer totalGreenLightTime = expiredLight.getValue();
//筛选出当前进口车道的统计结果 // 筛选当前方向的统计结果
List<VehicleHeadTimeModel> stats = StreamSupport.stream(resultState.values().spliterator(), false) List<VehicleHeadTimeModel> stats = StreamSupport.stream(resultState.values().spliterator(), false)
.filter(stat -> stat.getDir().compareTo(dir) == 0) .filter(stat -> stat.getDir().compareTo(dir) == 0)
.collect(Collectors.toList()); .collect(Collectors.toList());
...@@ -188,31 +196,112 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String, ...@@ -188,31 +196,112 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String,
} }
if(!stats.isEmpty()){ if(!stats.isEmpty()){
double average = stats.stream().mapToDouble(VehicleHeadTimeModel::getHeadTime).summaryStatistics().getAverage(); double average = stats.stream()
.mapToDouble(VehicleHeadTimeModel::getHeadTime)
.summaryStatistics()
.getAverage();
double index = (totalGreenLightTime - average) / totalGreenLightTime; double index = (totalGreenLightTime - average) / totalGreenLightTime;
double convertedIndex = convertIndex(index); // 转换非协调方向拥堵指数
String indexName; String indexName;
if(index < 0.6){ String congestionType;
if(convertedIndex <= 1.5){
indexName = "畅通"; indexName = "畅通";
}else if(index < 0.8){ congestionType = CongestEnum.NO_CONGEST.getType();
indexName = "缓行"; }else if(convertedIndex <= 2.0){
}else { indexName = "轻度拥堵";
indexName = "拥堵"; congestionType = CongestEnum.LIGHT_CONGEST.getType();
}else if(convertedIndex < 3.0){
indexName = "中度拥堵";
congestionType = CongestEnum.MODERATE_CONGEST.getType();
}else{
indexName = "重度拥堵";
congestionType = CongestEnum.HEAVY_CONGEST.getType();
} }
RidIndexResultOfEastAndWest stat = new RidIndexResultOfEastAndWest(); CongestionResult stat = new CongestionResult();
stat.setCrossId(crossId); stat.setCrossId(crossId);
stat.setRid(stats.get(0).getRid()); stat.setRid(stats.get(0).getRid());
stat.setIndex(Double.parseDouble(String.format("%.2f", index))); stat.setIndex(Double.parseDouble(String.format("%.2f", convertedIndex)));
stat.setIndexName(indexName); stat.setIndexName(indexName);
stat.setGlobalTimeStamp(ctx.timestamp()); stat.setCongestionType(congestionType);
stat.setTimestamp(DateUtil.toDateTime(stat.getGlobalTimeStamp(), DateUtil.YYYY_MM_DD_HH_MM_SS)); stat.setGlobalTimeStamp(new Date().getTime());
stat.setEndTime(ctx.timestamp());
stat.setStartTime(stat.getEndTime() - totalGreenLightTime * 1000); stat.setDir(dir);
boolean isCongestion = convertedIndex > 1.5;
Integer stateKey = dir;
if (isCongestion) {
try {
if (!dirState.contains(stateKey)) {
StartDuration startDuration = new StartDuration();
// 获取当前的时间戳
Instant now = Instant.now();
// 获取绿灯前的时间戳
Instant before = now.minus(totalGreenLightTime, ChronoUnit.SECONDS);
// 转换成毫秒
long beforeMilli = before.toEpochMilli();
stat.setStartTime(beforeMilli);
startDuration.setStartTime(beforeMilli);
long nowMilli = now.toEpochMilli();
long durationMilli = nowMilli - beforeMilli;
int durationInt = (int) durationMilli / (1000 * 60);
stat.setDuration(durationInt);
startDuration.setDuration(durationInt);
dirState.put(stateKey, startDuration);
} else {
StartDuration startDuration = dirState.get(stateKey);
Long startTimeMilli = startDuration.getStartTime();
stat.setStartTime(startTimeMilli);
Instant now = Instant.now();
long nowMilli = now.toEpochMilli();
long durationMilli = nowMilli - startTimeMilli;
int durationInt = (int) durationMilli / (1000 * 60);
startDuration.setDuration(durationInt);
stat.setDuration(durationInt);
dirState.put(stateKey, startDuration);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
// 删除状态
try {
Iterator<Integer> dirStateKeysIter = dirState.keys().iterator();
while (dirStateKeysIter.hasNext()){
Integer key = dirStateKeysIter.next();
if(key.equals(stateKey)){
dirStateKeysIter.remove();
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
out.collect(stat); out.collect(stat);
} }
} }
} }
}
private double convertIndex(double saturation) {
if (saturation < 0.6) {
return 1.5;
} else if (saturation < 0.7) {
return linearInterpolation(saturation, 0.6, 0.7, 1.5, 2.0);
} else if (saturation < 0.8) {
return linearInterpolation(saturation, 0.7, 0.8, 2.0, 3.0);
} else {
return 3.0;
}
}
private double linearInterpolation(double x, double x0, double x1, double y0, double y1) {
return y0 + (x - x0) * ((y1 - y0) / (x1 - x0));
} }
} }
...@@ -14,7 +14,6 @@ import org.springframework.util.CollectionUtils; ...@@ -14,7 +14,6 @@ import org.springframework.util.CollectionUtils;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
...@@ -36,7 +35,8 @@ public class CarTrackInCrossRoadFlatMap implements FlatMapFunction<FrameModel, C ...@@ -36,7 +35,8 @@ public class CarTrackInCrossRoadFlatMap implements FlatMapFunction<FrameModel, C
//判断车辆轨迹数据在指定区域内 //判断车辆轨迹数据在指定区域内
Map<String, List<CrossExitInfo>> crossExitMap = CarTrajectoryIndexMain.crossExitMap; Map<String, List<CrossExitInfo>> crossExitMap = CarTrajectoryIndexMain.crossExitMap;
for(Map.Entry<String, List<CrossExitInfo>> entry: crossExitMap.entrySet()){ for(Map.Entry<String, List<CrossExitInfo>> entry: crossExitMap.entrySet()){
Map<String, String> ridAndAreaCoordinates = entry.getValue().stream().collect(Collectors.toMap(CrossExitInfo::getRid, CrossExitInfo::getCoordinates)); Map<String, String> ridAndAreaCoordinates = entry.getValue().stream()
.collect(Collectors.toMap(CrossExitInfo::getRid, CrossExitInfo::getCoordinates));
if(ridAndAreaCoordinates.containsKey(car.getRoadnet().getRid())){ if(ridAndAreaCoordinates.containsKey(car.getRoadnet().getRid())){
String coordinates = ridAndAreaCoordinates.get(car.getRoadnet().getRid()); String coordinates = ridAndAreaCoordinates.get(car.getRoadnet().getRid());
boolean inPolygon = GeomsConvertUtil.isInPolygon(car.getLongitude(), car.getLatitude(), coordinates); boolean inPolygon = GeomsConvertUtil.isInPolygon(car.getLongitude(), car.getLatitude(), coordinates);
......
...@@ -9,7 +9,10 @@ import lombok.Data; ...@@ -9,7 +9,10 @@ import lombok.Data;
* @Description : * @Description :
*/ */
@Data @Data
public class RidIndexResultOfEastAndWest { public class CongestionResult {
private Integer dir;
private String rid; private String rid;
//该路段的结束路口 //该路段的结束路口
private String crossId; private String crossId;
...@@ -27,4 +30,7 @@ public class RidIndexResultOfEastAndWest { ...@@ -27,4 +30,7 @@ public class RidIndexResultOfEastAndWest {
private Long startTime; private Long startTime;
private Long endTime; private Long endTime;
private Integer duration;
} }
package com.wanji.indicators.task.trajectory.pojo;
import lombok.Data;
import java.util.List;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/27 10:03
* @Description :
*/
@Data
public class OverFlowAndDeadlockEvent {
private String crossId;
private List<OverFlowDetail> details;
private boolean deadlock;
private Long overflowStartTime;
private Integer overflowDuration;
private Double overflowIndex;
private Long deadlockStartTime;
private Integer deadlockDuration;
private Long globalTimeStamp;
private String dateTime;
private Long endTime;
}
...@@ -12,6 +12,13 @@ import lombok.Data; ...@@ -12,6 +12,13 @@ import lombok.Data;
public class OverFlowDetail { public class OverFlowDetail {
//出现溢出的路口出口所在路段名称 //出现溢出的路口出口所在路段名称
private String rid; private String rid;
private String crossId;
private Long startTime;
private Integer duration;
//溢出指数 //溢出指数
private Double index; private Double index;
//溢出发生时间戳 //溢出发生时间戳
......
package com.wanji.indicators.task.trajectory.pojo;
import java.util.List;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/27 10:03
* @Description :
*/
public class OverFlowIndexResult {
//发生溢出的路口id
private String crossId;
private List<OverFlowDetail> details;
private boolean isDeadLock;
private Long globalTimeStamp;
private String dateTime;
private Long startTime;
private Long endTime;
public String getCrossId() {
return crossId;
}
public void setCrossId(String crossId) {
this.crossId = crossId;
}
public List<OverFlowDetail> getDetails() {
return details;
}
public void setDetails(List<OverFlowDetail> details) {
this.details = details;
}
public boolean isDeadLock() {
return isDeadLock;
}
public void setDeadLock(boolean deadLock) {
isDeadLock = deadLock;
}
public Long getGlobalTimeStamp() {
return globalTimeStamp;
}
public void setGlobalTimeStamp(Long globalTimeStamp) {
this.globalTimeStamp = globalTimeStamp;
}
public String getDateTime() {
return dateTime;
}
public void setDateTime(String dateTime) {
this.dateTime = dateTime;
}
public Long getStartTime() {
return startTime;
}
public void setStartTime(Long startTime) {
this.startTime = startTime;
}
public Long getEndTime() {
return endTime;
}
public void setEndTime(Long endTime) {
this.endTime = endTime;
}
}
...@@ -19,7 +19,7 @@ public class PhaseEmptyResult { ...@@ -19,7 +19,7 @@ public class PhaseEmptyResult {
private String turn; private String turn;
//转向空放时间/绿灯总时长 //转向空放时间/绿灯总时长
private double index; private Double index;
private Long globalTimeStamp; private Long globalTimeStamp;
...@@ -27,5 +27,5 @@ public class PhaseEmptyResult { ...@@ -27,5 +27,5 @@ public class PhaseEmptyResult {
private Long endTime; private Long endTime;
private int duration; private Integer duration;
} }
package com.wanji.indicators.task.trajectory.pojo;
import lombok.Data;
@Data
public class StartDuration {
// 开始时间(毫秒时间戳)
private Long startTime;
// 持续时长(分钟)
private Integer duration;
}
...@@ -16,6 +16,8 @@ public class UnbalanceResult { ...@@ -16,6 +16,8 @@ public class UnbalanceResult {
//失衡发生时间 //失衡发生时间
private String timestamp; private String timestamp;
private Integer duration;
private Long globalTimeStamp; private Long globalTimeStamp;
private Long startTime; private Long startTime;
......
...@@ -44,19 +44,24 @@ cross.event.data.topic=analysis.cross.event ...@@ -44,19 +44,24 @@ cross.event.data.topic=analysis.cross.event
plate.prefix= plate.prefix=
#路段默认自由流速度配置值 km/h #路段默认自由流速度配置值 km/h
rid.default.free.speed=80 rid.default.free.speed=80
#东西方向路段拥堵指数统计 # 方向级别拥堵数据
rid.traffic.index.analysis.topic=rid.traffic.index.analysis dir.congestion.index.topic=dir.congestion.index
#南北方向饱和度计算 # 路口级别拥堵数据
cross.congestion.index.topic=cross.congestion.index
#非协调方向饱和度计算
rid.traffic.index.north.south.topic=rid.traffic.index.north.south.analysis rid.traffic.index.north.south.topic=rid.traffic.index.north.south.analysis
#东西方向路段的rid和方向 # rid和终点路口进口方向
east.west.rid.direction.list=13NED0B5Q9013NF80B5QN00:6,13NGH0B5RC013NF80B5QN00:2,13NI00B5RM013NGH0B5RC00:3,13NF80B5QN013NGH0B5RC00:6,13NGH0B5RC013NI00B5RM00:7,13NID0B5RM013NI00B5RM00:3 rid.end.indir.list=13NED0B5Q9013NF80B5QN00:6,13NGH0B5RC013NF80B5QN00:2,13NI00B5RM013NGH0B5RC00:3,13NF80B5QN013NGH0B5RC00:6,13NGH0B5RC013NI00B5RM00:7,13NID0B5RM013NI00B5RM00:3,13NG40B5SK013NI00B5RM00:1,13NEH0B5RJ013NGH0B5RC00:1,13NEP0B5QJ013NGH0B5RC00:5,13NDG0B5RI013NF80B5QN00:1,13NDT0B5Q9013NF80B5QN00:4
#南北方向路段的rid和方向 # rid和起点路口出口方向
north.south.rid.direction.list=13NG40B5SK013NI00B5RM00:1,13NEH0B5RJ013NGH0B5RC00:1,13NEP0B5QJ013NGH0B5RC00:5,13NDG0B5RI013NF80B5QN00:1,13NDT0B5Q9013NF80B5QN00:4 rid.start.outdir.list=13NF80B5QN013NED0B5Q900:6,13NF80B5QN013NGH0B5RC00:2,13NGH0B5RC013NI00B5RM00:3,13NGH0B5RC013NF80B5QN00:6,13NI00B5RM013NGH0B5RC00:7,13NI00B5RM013NID0B5RM00:3,13NI00B5RM013NG40B5SK00:1,13NGH0B5RC013NEH0B5RJ00:1,13NGH0B5RC013NEP0B5QJ00:5,13NF80B5QN013NDG0B5RI00:1,13NF80B5QN013NDT0B5Q900:4
# 路口ID-方向-转向对应的行人灯通过时长
cross.dir.turn.ped.green=13NF80B5QN0-1-2:20,13NF80B5QN0-2-2:15,13NF80B5QN0-4-2:20,13NF80B5QN0-6-2:15,13NGH0B5RC0-1-2:30,13NGH0B5RC0-3-2:30,13NGH0B5RC0-5-2:30,13NGH0B5RC0-6-2:25,13NI00B5RM0-3-2:30,13NI00B5RM0-7-2:30,13NI00B5RM0-1-1:30
#路口溢出评价指标 #路口溢出评价指标
road.overflow.avg.speed=5.0 road.overflow.avg.speed=5.0
road.overflow.duration=3 road.overflow.duration=3000
road.overflow.deadlock.index.analysis.topic=crossroad.overflow.deadlock.index.analysis road.overflow.deadlock.index.analysis.topic=crossroad.overflow.deadlock.index.analysis
#路口失衡topic #路口失衡topic
...@@ -69,8 +74,10 @@ cross.road.deadlock.avg.speed=5.0 ...@@ -69,8 +74,10 @@ cross.road.deadlock.avg.speed=5.0
#相位灯的状态数据 #相位灯的状态数据
light.status.topic=cross_lights_status light.status.topic=cross_lights_status
#虚拟路口区域 # 虚拟路口区域
# 霞景路西,旅游路与回龙山路交叉口
virtual.crossroad.13NED0B5Q90=13NED0B5Q90:117.08503591467242,36.64125732273356;117.08495255127629,36.641426722875224;117.08499878952986,36.641454206982246;117.08508543702352,36.641286700399476 virtual.crossroad.13NED0B5Q90=13NED0B5Q90:117.08503591467242,36.64125732273356;117.08495255127629,36.641426722875224;117.08499878952986,36.641454206982246;117.08508543702352,36.641286700399476
# 福地街东,旅游路与隧道交叉口
virtual.crossroad.13NH20B5RH0=13NH20B5RH0:117.09669255282627,36.644871615002884;117.09669985552095,36.645055610398025;117.09675474612689,36.64505324101935;117.09674606214631,36.64486930675771 virtual.crossroad.13NH20B5RH0=13NH20B5RH0:117.09669255282627,36.644871615002884;117.09669985552095,36.645055610398025;117.09675474612689,36.64505324101935;117.09674606214631,36.64486930675771
#相位空放topic #相位空放topic
......
...@@ -5,9 +5,9 @@ ...@@ -5,9 +5,9 @@
## 2. 指标说明 ## 2. 指标说明
### 2.1 拥堵 ### 2.1 拥堵
拥堵指标计算分为东西方向和南北方向 拥堵指标计算分为协调方向方向和非协调方向
- 东西方向拥堵指数计算会针对每个路口的进口路段进行计算,统计每个路段的拥堵指数,并根据拥堵指数进行阈值判断后,获取该路段的交通状态 - 协调方向拥堵指数计算会针对每个路口的进口路段进行计算,统计每个路段的拥堵指数,并根据拥堵指数进行阈值判断后,获取该路段的交通状态
- 南北方向的拥堵指数计算是根据饱和度计算绿灯时间内,车辆空档时间。根据算法文档给出的计算公式进行相关计算后,判断其阈值范围,从而获取该路段的交通状态 - 非协调方向的拥堵指数计算:根据绿灯时间内车辆空档时间,计算饱和度。根据算法文档阈值判断拥堵等级。
### 2.2 失衡 ### 2.2 失衡
- 根据算法指标计算文档给出的计算公式,计算每个路口是否处于失衡状态 - 根据算法指标计算文档给出的计算公式,计算每个路口是否处于失衡状态
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment