Commit 545be8ca authored by hanbing's avatar hanbing

保存方向级别拥堵信息

parent 9b89cea1
......@@ -78,6 +78,11 @@ public class Constant {
*/
public static final String PEDESTRAIN_LIGHT_CODE = "20";
/**
* 右转灯转向
*/
public static final String RIGHT_LIGHT_CODE = "3";
/**
* 入侵结果
*/
......
......@@ -10,12 +10,13 @@ import com.wanji.indicators.source.LaneInfoSource;
import com.wanji.indicators.task.lightstatus.BindCrossLaneInfoCoFlatMap;
import com.wanji.indicators.task.track.service.func.FrameFlatMap;
import com.wanji.indicators.task.trajectory.func.CarDataValidatorFlatMap;
import com.wanji.indicators.task.trajectory.func.CoordCongestionDataFunction;
import com.wanji.indicators.task.trajectory.func.CrossRoadLightStatusCoFlatMap;
import com.wanji.indicators.task.trajectory.func.PhaseEmptyTimeProcessFunction;
import com.wanji.indicators.task.trajectory.func.VehicleGapTimeProcessFunction;
import com.wanji.indicators.task.trajectory.helper.FileReadingHelper;
import com.wanji.indicators.task.trajectory.pojo.*;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.task.trajectory.pojo.CrossExitInfo;
import com.wanji.indicators.task.trajectory.pojo.RidIndexResult;
import com.wanji.indicators.task.trajectory.pojo.SingleCarInfo;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
......@@ -26,8 +27,6 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
......@@ -39,9 +38,7 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
......@@ -50,13 +47,9 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.util.CollectionUtils;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
......@@ -71,7 +64,7 @@ public class CarTrajectoryIndexMain {
public static final Map<String,String> virtualCrossRoadArea = new HashMap<>();
private static final Map<String, Double> ridFreeTimeMap = new HashMap<>();
public static final Map<String, Double> ridFreeTimeMap = new HashMap<>();
public static final Map<String, List<CrossExitInfo>> crossExitMap = new ConcurrentHashMap<>();
......@@ -79,6 +72,10 @@ public class CarTrajectoryIndexMain {
private static final PropertiesHelper instance = PropertiesHelper.getInstance();
// 路段ID和方向对应关系
public static final Map<String, Integer> ridDirMap = new ConcurrentHashMap<>();
static {
String listOfCrossId = properties.getProperty("full.area.cross.list");
crossIdSet = Arrays.stream(listOfCrossId.split(",")).collect(Collectors.toSet());
......@@ -129,6 +126,18 @@ public class CarTrajectoryIndexMain {
ridFreeTimeMap.put("13NF80B5QN013NED0B5Q900", 5.0);
}
static {
String str = properties.getProperty("coord.rid.direction.list");
String[] pairs = str.split(",");
for (String pair : pairs) {
String[] keyValue = pair.split(":");
String key = keyValue[0];
Integer value = Integer.valueOf(keyValue[1]);
ridDirMap.put(key, value);
}
}
public static void main(String[] args) {
try {
//获取配置文件中的kafka消费topic
......@@ -162,7 +171,7 @@ public class CarTrajectoryIndexMain {
SingleOutputStreamOperator<CarTrackModel> qualifiedStream = frameModelStream.flatMap(new CarDataValidatorFlatMap())
.setParallelism(1)
.name("校验轨迹帧数据-ToCarTrackModel");
.name("完善轨迹帧数据字段-ToCarTrackModel");
SingleOutputStreamOperator<CarTrackModel> filteredStream = qualifiedStream.filter(new FilterFunction<CarTrackModel>() {
@Override
......@@ -215,9 +224,9 @@ public class CarTrajectoryIndexMain {
}).setParallelism(2).name("统计每辆车通过路口的通行时间");
//对同一窗口的统计数据进行处理
SingleOutputStreamOperator<List<RidIndexResultOfEastAndWest>> trafficIndexStream = singleCarStatStream
SingleOutputStreamOperator<List<RidIndexResult>> trafficIndexStream = singleCarStatStream
.keyBy(SingleCarInfo::getWindowEndTime)
.process(new KeyedProcessFunction<Long, SingleCarInfo, List<RidIndexResultOfEastAndWest>>() {
.process(new KeyedProcessFunction<Long, SingleCarInfo, List<RidIndexResult>>() {
private ListState<SingleCarInfo> listState;
@Override
......@@ -226,7 +235,7 @@ public class CarTrajectoryIndexMain {
}
@Override
public void processElement(SingleCarInfo singleCarInfo, Context context, Collector<List<RidIndexResultOfEastAndWest>> collector) throws Exception {
public void processElement(SingleCarInfo singleCarInfo, Context context, Collector<List<RidIndexResult>> collector) throws Exception {
//将数据保存到列表中
listState.add(singleCarInfo);
//注册windowEnd + 1ms的定时器
......@@ -234,19 +243,20 @@ public class CarTrajectoryIndexMain {
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<RidIndexResultOfEastAndWest>> out) throws Exception {
public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<RidIndexResult>> out) throws Exception {
//存储每个路口进口车道的拥堵统计结果
List<RidIndexResultOfEastAndWest> results = new ArrayList<>();
List<RidIndexResult> results = new ArrayList<>();
long windowEnd = timestamp - 1;
//从列表中获取所有统计数据
List<SingleCarInfo> list = StreamSupport.stream(listState.get().spliterator(), false).collect(Collectors.toList());
//清空资源
listState.clear();
Map<String, List<SingleCarInfo>> mapGroupByRid = list.stream().collect(Collectors.groupingBy(SingleCarInfo::getRid));
Map<String, List<SingleCarInfo>> mapGroupByRid = list.stream()
.collect(Collectors.groupingBy(SingleCarInfo::getRid));
mapGroupByRid.forEach((rid, value) -> {
if(ridFreeTimeMap.containsKey(rid)){
RidIndexResultOfEastAndWest result = new RidIndexResultOfEastAndWest();
RidIndexResult result = new RidIndexResult();
result.setRid(rid);
result.setCrossId(value.get(0).getCrossId());
double avgIndex = value.stream().mapToLong(SingleCarInfo::getTransitTime).summaryStatistics().getAverage() / 1000;
......@@ -256,13 +266,13 @@ public class CarTrajectoryIndexMain {
String indexName;
String congestionType;
int count = value.size();//通过当前路段的车辆数
if(count > 10 && index > 1.8 && index <= 2.5){
if(count > 10 && index > 1.5 && index <= 2.0){
indexName = "轻度拥堵";
congestionType = CongestEnum.LIGHT_CONGEST.getType();
}else if(count > 10 && index > 2.5 && index <= 3.5){
}else if(count > 10 && index > 2.0 && index <= 3.0){
indexName = "中度拥堵";
congestionType = CongestEnum.MODERATE_CONGEST.getType();
}else if(count > 10 && index > 3.5){
}else if(count > 10 && index > 3.0){
indexName = "重度拥堵";
congestionType = CongestEnum.HEAVY_CONGEST.getType();
}else{
......@@ -271,10 +281,6 @@ public class CarTrajectoryIndexMain {
}
result.setIndexName(indexName);
result.setCongestionType(congestionType);
result.setTimestamp(DateUtil.toDateTime(windowEnd, DateUtil.YYYY_MM_DD_HH_MM_SS));
result.setGlobalTimeStamp(windowEnd);
result.setStartTime(windowEnd - 60000);
result.setEndTime(windowEnd);
results.add(result);
}
......@@ -283,39 +289,36 @@ public class CarTrajectoryIndexMain {
}
}).setParallelism(1).name("同窗口内每个路口的车辆通行时间");
SingleOutputStreamOperator<String> toJsonStream = trafficIndexStream.flatMap(new FlatMapFunction<List<RidIndexResultOfEastAndWest>, String>() {
SingleOutputStreamOperator<RidIndexResult> splitCongestionStream = trafficIndexStream.flatMap(new FlatMapFunction<List<RidIndexResult>, RidIndexResult>() {
@Override
public void flatMap(List<RidIndexResult> results, Collector<RidIndexResult> collector) throws Exception {
for (RidIndexResult result : results) {
collector.collect(result);
}
}
}).setParallelism(1).name("将组合后的数据重新拆分开");
SingleOutputStreamOperator<RidIndexResult> finalCoordCongestion = splitCongestionStream
.keyBy(RidIndexResult::getRid)
.process(new CoordCongestionDataFunction())
.setParallelism(1)
.name("给拥堵场景增加开始时间和持续时长");
SingleOutputStreamOperator<String> toJsonStream = finalCoordCongestion.flatMap(new FlatMapFunction<RidIndexResult, String>() {
@Override
public void flatMap(List<RidIndexResultOfEastAndWest> results, Collector<String> collector) throws Exception {
//根据crossId进行重新分组
Map<String, List<RidIndexResultOfEastAndWest>> groupMapByCrossId = results.stream().collect(Collectors.groupingBy(RidIndexResultOfEastAndWest::getCrossId));
String value = JSONObject.toJSONString(groupMapByCrossId);
public void flatMap(RidIndexResult result, Collector<String> collector) {
String value = JSONObject.toJSONString(result);
collector.collect(value);
}
}).setParallelism(1).name("东西方向拥堵指数统计结果根据路口进行分组-路段拥堵指数统计转成json字符串");
}).setParallelism(1).name("协调方向拥堵指数统计结果转json字符串");
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(properties.getProperty("bootstrap.servers"),
properties.getProperty("rid.traffic.index.analysis.topic"), new SimpleStringSchema());
toJsonStream.addSink(kafkaProducer).setParallelism(1).name("将拥堵指数指标发送至kafka");
toJsonStream.addSink(kafkaProducer).setParallelism(1).name("将协调方向拥堵数据发送至kafka");
//写入mysql
SingleOutputStreamOperator<RidIndexResultOfEastAndWest> splitCongestionStream = trafficIndexStream.flatMap(new FlatMapFunction<List<RidIndexResultOfEastAndWest>, RidIndexResultOfEastAndWest>() {
@Override
public void flatMap(List<RidIndexResultOfEastAndWest> results, Collector<RidIndexResultOfEastAndWest> collector) throws Exception {
for (RidIndexResultOfEastAndWest result : results) {
collector.collect(result);
}
}
}).setParallelism(1).name("将组合后的数据重新拆分开");
SingleOutputStreamOperator<RidIndexResultOfEastAndWest> trafficEvents = splitCongestionStream.filter(new FilterFunction<RidIndexResultOfEastAndWest>() {
@Override
public boolean filter(RidIndexResultOfEastAndWest ridIndexResultOfEastAndWest) throws Exception {
return !ridIndexResultOfEastAndWest.getCongestionType().equals(CongestEnum.NO_CONGEST.getType());
}
}).setParallelism(1).name("过滤掉非拥堵事件");
JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()
.withMaxRetries(3)//异常重试次数
......@@ -331,43 +334,43 @@ public class CarTrajectoryIndexMain {
.withConnectionCheckTimeoutSeconds(60)
.build();
trafficEvents.addSink(JdbcSink.sink(
"insert into t_event_info (detect_time, type, rid, cross_id, event_serial_number, category, start_time, end_time, duration) values(?,?,?,?,?,?,?,?,?)",
new JdbcStatementBuilder<RidIndexResultOfEastAndWest>() {
@Override
public void accept(PreparedStatement preparedStatement, RidIndexResultOfEastAndWest carRecordInfo) throws SQLException {
preparedStatement.setTimestamp(1, new Timestamp(carRecordInfo.getGlobalTimeStamp()));
String type = "501";
switch (carRecordInfo.getIndexName()){
case "畅通":
type = "501";
break;
case "轻度拥堵":
type = "502";
break;
case "中度拥堵":
type = "503";
break;
case "重度拥堵":
type = "504";
break;
}
preparedStatement.setString(2, type);
preparedStatement.setString(3, carRecordInfo.getRid());
preparedStatement.setString(4, carRecordInfo.getCrossId());
String eventSerialNumber = UUID.randomUUID().toString().replace("-","");
preparedStatement.setString(5, eventSerialNumber);
preparedStatement.setString(6, "5");
preparedStatement.setTimestamp(7, new Timestamp(carRecordInfo.getStartTime()));
preparedStatement.setTimestamp(8, new Timestamp(carRecordInfo.getEndTime()));
preparedStatement.setInt(9, 1);
}
}, jdbcExecutionOptions, jdbcBuild))
.setParallelism(1)
.name("写入mysql");
//计算南北方向饱和度
// trafficEvents.addSink(JdbcSink.sink(
// "insert into t_event_info (detect_time, type, rid, cross_id, event_serial_number, category, start_time, end_time, duration) values(?,?,?,?,?,?,?,?,?)",
// new JdbcStatementBuilder<RidIndexResult>() {
// @Override
// public void accept(PreparedStatement preparedStatement, RidIndexResult carRecordInfo) throws SQLException {
// preparedStatement.setTimestamp(1, new Timestamp(carRecordInfo.getGlobalTimeStamp()));
// String type = "501";
// switch (carRecordInfo.getIndexName()){
// case "畅通":
// type = "501";
// break;
// case "轻度拥堵":
// type = "502";
// break;
// case "中度拥堵":
// type = "503";
// break;
// case "重度拥堵":
// type = "504";
// break;
// }
// preparedStatement.setString(2, type);
// preparedStatement.setString(3, carRecordInfo.getRid());
// preparedStatement.setString(4, carRecordInfo.getCrossId());
// String eventSerialNumber = UUID.randomUUID().toString().replace("-","");
// preparedStatement.setString(5, eventSerialNumber);
// preparedStatement.setString(6, "5");
// preparedStatement.setTimestamp(7, new Timestamp(carRecordInfo.getStartTime()));
// preparedStatement.setTimestamp(8, new Timestamp(carRecordInfo.getEndTime()));
// preparedStatement.setInt(9, 1);
// }
// }, jdbcExecutionOptions, jdbcBuild))
// .setParallelism(1)
// .name("写入mysql");
// 计算非协调方向饱和度
String lightStatusTopic = properties.getProperty("light.status.topic");
KafkaSource<String> lightStatusSource = KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties())
......@@ -402,308 +405,307 @@ public class CarTrajectoryIndexMain {
.setParallelism(1)
.name("全域轨迹帧数据-车道属性数据绑定");
SingleOutputStreamOperator<RidIndexResultOfEastAndWest> northAndSouthTrafficIndex = joinLaneStream
SingleOutputStreamOperator<RidIndexResult> finalNoCoordCongestion = joinLaneStream
.keyBy(CrossFrameModel::getCrossId)
.process(new VehicleGapTimeProcessFunction())
.setParallelism(1)
.name("绿灯时间内,南北方向饱和度计算");
.name("非协调方向拥堵数据计算");
SingleOutputStreamOperator<String> toJsonStreamOfNorthAndSouth = northAndSouthTrafficIndex.flatMap(new FlatMapFunction<RidIndexResultOfEastAndWest, String>() {
SingleOutputStreamOperator<String> toJsonStreamOfNoCoord = finalNoCoordCongestion.flatMap(new FlatMapFunction<RidIndexResult, String>() {
@Override
public void flatMap(RidIndexResultOfEastAndWest result, Collector<String> collector) throws Exception {
public void flatMap(RidIndexResult result, Collector<String> collector) throws Exception {
collector.collect(JSONObject.toJSONString(result));
}
}).setParallelism(1).name("南北饱和度计算指标转成json字符串");
FlinkKafkaProducer<String> kafkaProducer2 = new FlinkKafkaProducer<>(properties.getProperty("bootstrap.servers"),
properties.getProperty("rid.traffic.index.north.south.topic"), new SimpleStringSchema());
toJsonStreamOfNorthAndSouth.addSink(kafkaProducer2).setParallelism(1).name("将南北饱和度指标发送至kafka");
//计算相位空放
SingleOutputStreamOperator<PhaseEmptyResult> phaseEmptyStream = joinLaneStream
.keyBy(CrossFrameModel::getCrossId)
.process(new PhaseEmptyTimeProcessFunction())
.setParallelism(1)
.name("相位空放计算-每个路口绿灯放行方向");
SingleOutputStreamOperator<String> phaseEmptyJsonStream = phaseEmptyStream.flatMap(new FlatMapFunction<PhaseEmptyResult, String>() {
@Override
public void flatMap(PhaseEmptyResult phaseEmptyResult, Collector<String> collector) throws Exception {
String value = JSONObject.toJSONString(phaseEmptyResult);
collector.collect(value);
}
}).setParallelism(1).name("相位空放结果转成json字符串");
String phaseEmptyTopic = properties.getProperty("phase.emptiness.topic");
phaseEmptyJsonStream.addSink(new FlinkKafkaProducer<String>(
properties.getProperty("bootstrap.servers"),
phaseEmptyTopic,
new SimpleStringSchema()))
.setParallelism(1)
.name("将相位空放计算结果发送至kafka");
//相位空放结果写入mysql
phaseEmptyStream.addSink(JdbcSink.sink(
"insert into t_event_info (detect_time, type, rid, cross_id, event_serial_number, category, start_time, end_time, duration) values(?,?,?,?,?,?,?,?,?)",
new JdbcStatementBuilder<PhaseEmptyResult>() {
@Override
public void accept(PreparedStatement preparedStatement, PhaseEmptyResult phaseEmptyResult) throws SQLException {
preparedStatement.setTimestamp(1, new Timestamp(phaseEmptyResult.getGlobalTimeStamp()));
preparedStatement.setString(2, "701");
preparedStatement.setString(3, phaseEmptyResult.getRid());
preparedStatement.setString(4, phaseEmptyResult.getCrossId());
String eventSerialNumber = UUID.randomUUID().toString().replace("-","");
preparedStatement.setString(5, eventSerialNumber);
preparedStatement.setString(6, "1");
preparedStatement.setTimestamp(7, new Timestamp(phaseEmptyResult.getStartTime()));
preparedStatement.setTimestamp(8, new Timestamp(phaseEmptyResult.getEndTime()));
preparedStatement.setInt(9, phaseEmptyResult.getDuration());
}
}, jdbcExecutionOptions, jdbcBuild))
.setParallelism(1)
.name("将相位空放事件写入mysql");
//计算溢出
SingleOutputStreamOperator<CarTrackModel> carTrackStreamInCrossRoad = frameModelStream
.flatMap(new CarTrackInCrossRoadFlatMap())
.setParallelism(1)
.name("过滤出位于路口和指定路段区域带有车牌的车辆轨迹数据");
//计算1分钟内的路口溢出指数和是否死锁
double avgSpeedEvaluationIndex = Double.parseDouble(properties.getProperty("road.overflow.avg.speed"));
int durationEvaluationIndex = Integer.parseInt(properties.getProperty("road.overflow.duration"));
int carNumber = Integer.parseInt(properties.getProperty("cross.road.deadlock.car.number"));
double crossAvgSpeedIndex = Double.parseDouble(properties.getProperty("cross.road.deadlock.avg.speed"));
SingleOutputStreamOperator<OverFlowIndexResult> overFlowIndexStream = carTrackStreamInCrossRoad
.assignTimestampsAndWatermarks(
WatermarkStrategy.<CarTrackModel>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp()))
.keyBy(carTrackModel -> carTrackModel.getRoadnet().getCrossId())
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.process(new ProcessWindowFunction<CarTrackModel, OverFlowIndexResult, String, TimeWindow>() {
@Override
public void process(String crossId, ProcessWindowFunction<CarTrackModel, OverFlowIndexResult, String, TimeWindow>.Context context, Iterable<CarTrackModel> iterable, Collector<OverFlowIndexResult> collector) throws Exception {
List<CarTrackModel> carInCrossRoadList = StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
if(!CollectionUtils.isEmpty(carInCrossRoadList)){
//过滤出位于指定区域内的数据
List<CarTrackModel> carTracesInSpecificArea = carInCrossRoadList.stream().filter(car -> car.getIsInSpecificArea() == 1).collect(Collectors.toList());
List<CarTrackModel> carTracesInCrossRoad = carInCrossRoadList.stream().filter(car -> car.getRoadnet().getInCrossFlag() == 1).collect(Collectors.toList());
//根据路口进行区域划分
Map<String, List<CarTrackModel>> groupMapInExitArea = carTracesInSpecificArea.stream().collect(Collectors.groupingBy(car -> car.getRoadnet().getRid()));
//对每个路口区域进行统计
if(!groupMapInExitArea.isEmpty()){
List<OverFlowDetail> details = new ArrayList<>();
groupMapInExitArea.forEach((rid, cars) -> {
//统计路口出口内的车辆数量
Map<String, List<CarTrackModel>> carMapGroupByPlate = cars.stream().collect(Collectors.groupingBy(CarTrackModel::getPicLicense));
if(carMapGroupByPlate.size() > 5){
//按照轨迹上报时间进行升序排列
List<CarTrackModel> sorted = cars.stream().sorted(Comparator.comparing(CarTrackModel::getGlobalTimeStamp)).collect(Collectors.toList());
//根据时间戳进行分组,找出同一时刻的轨迹数据
Map<Long, List<CarTrackModel>> groupMapByTimestamp = sorted.stream().collect(Collectors.groupingBy(CarTrackModel::getGlobalTimeStamp));
//存放时间戳和其对应的平均速度
Map<Long, Double> map = new HashMap<>();
groupMapByTimestamp.forEach((timestamp, traces) -> {
double avgSpeed = traces.stream().mapToDouble(CarTrackModel::getSpeed).summaryStatistics().getAverage();
map.put(timestamp, avgSpeed);
});
//出口车道的平均速度
double ridAvgSpeed = map.values().stream().mapToDouble(number -> number).summaryStatistics().getAverage();
List<Map.Entry<Long, Double>> entryList = map.entrySet().stream().sorted(Map.Entry.comparingByKey()).collect(Collectors.toList());
int consecutiveSeconds = 0;//连续时间
long overflowTimestamp = entryList.get(0).getKey();//发生溢出的时间戳
for(int i = 0; i < entryList.size() - 1; i++){
Map.Entry<Long, Double> current = entryList.get(i);
Map.Entry<Long, Double> next = entryList.get(i + 1);
if(current.getValue() < avgSpeedEvaluationIndex && next.getValue() < avgSpeedEvaluationIndex){
consecutiveSeconds += Math.toIntExact((next.getKey() - current.getKey()));
overflowTimestamp = next.getKey();
}else{
consecutiveSeconds = 0;
}
}
if(consecutiveSeconds > durationEvaluationIndex){
//计算路口的溢出指数
double index = 30 / ridAvgSpeed;
if(index > 5){
//满足溢出判断的所有条件
OverFlowDetail detail = new OverFlowDetail();
detail.setRid(rid);
detail.setTimestamp(overflowTimestamp);
detail.setDatetime(DateUtil.toDateTime(overflowTimestamp, DateUtil.YYYY_MM_DD_HH_MM_SS));
detail.setIndex(Double.parseDouble(String.format("%.2f", index)));
details.add(detail);
}
}
}
});
//存在路口发生溢出的情况
if(!details.isEmpty()){
OverFlowIndexResult result = new OverFlowIndexResult();
result.setCrossId(crossId);
result.setDetails(details);
//溢出上报时间戳
result.setGlobalTimeStamp(context.window().getEnd());
result.setDateTime(DateUtil.toDateTime(result.getGlobalTimeStamp(), DateUtil.YYYY_MM_DD_HH_MM_SS));
result.setStartTime(context.window().getStart());
result.setEndTime(context.window().getEnd());
//判断该路口死锁
if(details.size() >= 2){
Map<String, List<CarTrackModel>> groupMapByPlate = carTracesInCrossRoad.stream().collect(Collectors.groupingBy(CarTrackModel::getPicLicense));
if(groupMapByPlate.size() > carNumber){
double crossAvgSpeed = carTracesInCrossRoad.stream().mapToDouble(CarTrackModel::getSpeed).summaryStatistics().getAverage();
if(crossAvgSpeed < crossAvgSpeedIndex){
result.setDeadLock(true);
}
}
}
collector.collect(result);
}
}
}
}
})
.setParallelism(2)
.name("统计每个路口的溢出指数");
SingleOutputStreamOperator<String> jsonStreamOverFlowIndex = overFlowIndexStream.flatMap(new FlatMapFunction<OverFlowIndexResult, String>() {
@Override
public void flatMap(OverFlowIndexResult overFlowIndexResult, Collector<String> collector) throws Exception {
collector.collect(JSONObject.toJSONString(overFlowIndexResult));
}
});
String topicOfOverFlow = properties.getProperty("road.overflow.deadlock.index.analysis.topic");
jsonStreamOverFlowIndex.addSink(new FlinkKafkaProducer<String>(
properties.getProperty("bootstrap.servers"),
topicOfOverFlow,
new SimpleStringSchema()
)).setParallelism(1).name("将路口溢出指标推送至kafka");
//将溢出和死锁事件写入mysql
SingleOutputStreamOperator<OverFlowIndexResult> deadLockStream = overFlowIndexStream.filter(new FilterFunction<OverFlowIndexResult>() {
@Override
public boolean filter(OverFlowIndexResult overFlowIndexResult) throws Exception {
return overFlowIndexResult.isDeadLock();
}
}).setParallelism(1).name("过滤出死锁事件");
deadLockStream.addSink(JdbcSink.sink(
"insert into t_event_info (detect_time, type, cross_id, event_serial_number, category, start_time, end_time, duration) values(?,?,?,?,?,?,?,?)",
new JdbcStatementBuilder<OverFlowIndexResult>() {
@Override
public void accept(PreparedStatement preparedStatement, OverFlowIndexResult overFlowIndexResult) throws SQLException {
preparedStatement.setTimestamp(1, new Timestamp(overFlowIndexResult.getGlobalTimeStamp()));
preparedStatement.setString(2, "704");
preparedStatement.setString(3, overFlowIndexResult.getCrossId());
String eventSerialNumber = UUID.randomUUID().toString().replace("-","");
preparedStatement.setString(4, eventSerialNumber);
preparedStatement.setString(5, "1");
preparedStatement.setTimestamp(6, new Timestamp(overFlowIndexResult.getStartTime()));
preparedStatement.setTimestamp(7, new Timestamp(overFlowIndexResult.getEndTime()));
preparedStatement.setInt(8, 1);
}
}, jdbcExecutionOptions, jdbcBuild))
.setParallelism(1)
.name("将死锁事件写入mysql");
overFlowIndexStream.addSink(JdbcSink.sink(
"insert into t_event_info (detect_time, type, cross_id, event_serial_number, category, start_time, end_time, duration) values(?,?,?,?,?,?,?,?)",
new JdbcStatementBuilder<OverFlowIndexResult>() {
@Override
public void accept(PreparedStatement preparedStatement, OverFlowIndexResult overFlowIndexResult) throws SQLException {
preparedStatement.setTimestamp(1, new Timestamp(overFlowIndexResult.getGlobalTimeStamp()));
preparedStatement.setString(2, "703");
preparedStatement.setString(3, overFlowIndexResult.getCrossId());
String eventSerialNumber = UUID.randomUUID().toString().replace("-","");
preparedStatement.setString(4, eventSerialNumber);
preparedStatement.setString(5, "1");
preparedStatement.setTimestamp(6, new Timestamp(overFlowIndexResult.getStartTime()));
preparedStatement.setTimestamp(7, new Timestamp(overFlowIndexResult.getEndTime()));
preparedStatement.setInt(8, 1);
}
}, jdbcExecutionOptions, jdbcBuild))
.setParallelism(1)
.name("将溢出事件写入mysql");
//计算失衡
SingleOutputStreamOperator<UnbalanceResult> unbalanceIndexStream = splitCongestionStream
.union(northAndSouthTrafficIndex)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<RidIndexResultOfEastAndWest>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp()))
.keyBy(RidIndexResultOfEastAndWest::getCrossId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.apply(new WindowFunction<RidIndexResultOfEastAndWest, UnbalanceResult, String, TimeWindow>() {
@Override
public void apply(String crossId, TimeWindow timeWindow, Iterable<RidIndexResultOfEastAndWest> iterable, Collector<UnbalanceResult> collector) throws Exception {
List<RidIndexResultOfEastAndWest> collect = StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
if (!CollectionUtils.isEmpty(collect) && collect.size() > 1) {
//获取最大拥堵指数
double max = collect.stream().mapToDouble(RidIndexResultOfEastAndWest::getIndex).max().getAsDouble();
double min = collect.stream().mapToDouble(RidIndexResultOfEastAndWest::getIndex).min().getAsDouble();
double index = max / min;
if (index >= 1.5) {
UnbalanceResult result = new UnbalanceResult();
result.setCrossId(crossId);
result.setIndex(index);
result.setGlobalTimeStamp(timeWindow.getEnd());
result.setTimestamp(DateUtil.toDateTime(result.getGlobalTimeStamp(), DateUtil.YYYY_MM_DD_HH_MM_SS));
result.setStartTime(timeWindow.getStart());
result.setEndTime(timeWindow.getEnd());
collector.collect(result);
}
}
}
}).setParallelism(2).name("统计路口失衡指数");
SingleOutputStreamOperator<String> jsonStreamOfUnbalance = unbalanceIndexStream.flatMap(new FlatMapFunction<UnbalanceResult, String>() {
@Override
public void flatMap(UnbalanceResult unbalanceResult, Collector<String> collector) throws Exception {
String value = JSONObject.toJSONString(unbalanceResult);
collector.collect(value);
}
}).setParallelism(1).name("将失衡指标数据转成json字符串");
String topicOfUnbalanceIndexCalculation = properties.getProperty("road.unbalance.index.analysis.topic");
jsonStreamOfUnbalance.addSink(new FlinkKafkaProducer<String>(
properties.getProperty("bootstrap.servers"),
topicOfUnbalanceIndexCalculation,
new SimpleStringSchema()
)).setParallelism(1).name("将失衡指数发送至kafka");
//将失衡结果写入mysql
unbalanceIndexStream.addSink(JdbcSink.sink(
"insert into t_event_info (detect_time, type, cross_id, event_serial_number, category, start_time, end_time, duration) values(?,?,?,?,?,?,?,?)",
new JdbcStatementBuilder<UnbalanceResult>() {
@Override
public void accept(PreparedStatement preparedStatement, UnbalanceResult unbalanceResult) throws SQLException {
preparedStatement.setTimestamp(1, new Timestamp(unbalanceResult.getGlobalTimeStamp()));
preparedStatement.setString(2, "702");
preparedStatement.setString(3, unbalanceResult.getCrossId());
String eventSerialNumber = UUID.randomUUID().toString().replace("-","");
preparedStatement.setString(4, eventSerialNumber);
preparedStatement.setString(5, "1");
preparedStatement.setTimestamp(6, new Timestamp(unbalanceResult.getStartTime()));
preparedStatement.setTimestamp(7, new Timestamp(unbalanceResult.getEndTime()));
preparedStatement.setInt(8, 1);
}
}, jdbcExecutionOptions, jdbcBuild))
.setParallelism(1)
.name("将失衡事件写入mysql");
}).setParallelism(1).name("协调方向拥堵数据转成json字符串");
toJsonStreamOfNoCoord.addSink(kafkaProducer).setParallelism(1).name("将非协调方向拥堵数据发送至kafka");
// 融合路口级别事件
// //计算相位空放
// SingleOutputStreamOperator<PhaseEmptyResult> phaseEmptyStream = joinLaneStream
// .keyBy(CrossFrameModel::getCrossId)
// .process(new PhaseEmptyTimeProcessFunction())
// .setParallelism(1)
// .name("相位空放计算-每个路口绿灯放行方向");
//
// SingleOutputStreamOperator<String> phaseEmptyJsonStream = phaseEmptyStream.flatMap(new FlatMapFunction<PhaseEmptyResult, String>() {
// @Override
// public void flatMap(PhaseEmptyResult phaseEmptyResult, Collector<String> collector) throws Exception {
// String value = JSONObject.toJSONString(phaseEmptyResult);
// collector.collect(value);
// }
// }).setParallelism(1).name("相位空放结果转成json字符串");
//
// String phaseEmptyTopic = properties.getProperty("phase.emptiness.topic");
// phaseEmptyJsonStream.addSink(new FlinkKafkaProducer<String>(
// properties.getProperty("bootstrap.servers"),
// phaseEmptyTopic,
// new SimpleStringSchema()))
// .setParallelism(1)
// .name("将相位空放计算结果发送至kafka");
//
// //相位空放结果写入mysql
// phaseEmptyStream.addSink(JdbcSink.sink(
// "insert into t_event_info (detect_time, type, rid, cross_id, event_serial_number, category, start_time, end_time, duration) values(?,?,?,?,?,?,?,?,?)",
// new JdbcStatementBuilder<PhaseEmptyResult>() {
// @Override
// public void accept(PreparedStatement preparedStatement, PhaseEmptyResult phaseEmptyResult) throws SQLException {
// preparedStatement.setTimestamp(1, new Timestamp(phaseEmptyResult.getGlobalTimeStamp()));
// preparedStatement.setString(2, "701");
// preparedStatement.setString(3, phaseEmptyResult.getRid());
// preparedStatement.setString(4, phaseEmptyResult.getCrossId());
// String eventSerialNumber = UUID.randomUUID().toString().replace("-","");
// preparedStatement.setString(5, eventSerialNumber);
// preparedStatement.setString(6, "1");
// preparedStatement.setTimestamp(7, new Timestamp(phaseEmptyResult.getStartTime()));
// preparedStatement.setTimestamp(8, new Timestamp(phaseEmptyResult.getEndTime()));
// preparedStatement.setInt(9, phaseEmptyResult.getDuration());
// }
// }, jdbcExecutionOptions, jdbcBuild))
// .setParallelism(1)
// .name("将相位空放事件写入mysql");
//
//
//
// //计算溢出
// SingleOutputStreamOperator<CarTrackModel> carTrackStreamInCrossRoad = frameModelStream
// .flatMap(new CarTrackInCrossRoadFlatMap())
// .setParallelism(1)
// .name("过滤出位于路口和指定路段区域带有车牌的车辆轨迹数据");
//
// //计算1分钟内的路口溢出指数和是否死锁
// double avgSpeedEvaluationIndex = Double.parseDouble(properties.getProperty("road.overflow.avg.speed"));
// int durationEvaluationIndex = Integer.parseInt(properties.getProperty("road.overflow.duration"));
// int carNumber = Integer.parseInt(properties.getProperty("cross.road.deadlock.car.number"));
// double crossAvgSpeedIndex = Double.parseDouble(properties.getProperty("cross.road.deadlock.avg.speed"));
//
// SingleOutputStreamOperator<OverFlowIndexResult> overFlowIndexStream = carTrackStreamInCrossRoad
// .assignTimestampsAndWatermarks(
// WatermarkStrategy.<CarTrackModel>forBoundedOutOfOrderness(Duration.ofSeconds(1))
// .withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp()))
// .keyBy(carTrackModel -> carTrackModel.getRoadnet().getCrossId())
// .window(TumblingEventTimeWindows.of(Time.minutes(1)))
// .process(new ProcessWindowFunction<CarTrackModel, OverFlowIndexResult, String, TimeWindow>() {
// @Override
// public void process(String crossId, ProcessWindowFunction<CarTrackModel, OverFlowIndexResult, String, TimeWindow>.Context context, Iterable<CarTrackModel> iterable, Collector<OverFlowIndexResult> collector) throws Exception {
// List<CarTrackModel> carInCrossRoadList = StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
// if(!CollectionUtils.isEmpty(carInCrossRoadList)){
// //过滤出位于指定区域内的数据
// List<CarTrackModel> carTracesInSpecificArea = carInCrossRoadList.stream().filter(car -> car.getIsInSpecificArea() == 1).collect(Collectors.toList());
// List<CarTrackModel> carTracesInCrossRoad = carInCrossRoadList.stream().filter(car -> car.getRoadnet().getInCrossFlag() == 1).collect(Collectors.toList());
// //根据路口进行区域划分
// Map<String, List<CarTrackModel>> groupMapInExitArea = carTracesInSpecificArea.stream().collect(Collectors.groupingBy(car -> car.getRoadnet().getRid()));
//
// //对每个路口区域进行统计
// if(!groupMapInExitArea.isEmpty()){
// List<OverFlowDetail> details = new ArrayList<>();
// groupMapInExitArea.forEach((rid, cars) -> {
// //统计路口出口内的车辆数量
// Map<String, List<CarTrackModel>> carMapGroupByPlate = cars.stream().collect(Collectors.groupingBy(CarTrackModel::getPicLicense));
// if(carMapGroupByPlate.size() > 5){
// //按照轨迹上报时间进行升序排列
// List<CarTrackModel> sorted = cars.stream().sorted(Comparator.comparing(CarTrackModel::getGlobalTimeStamp)).collect(Collectors.toList());
// //根据时间戳进行分组,找出同一时刻的轨迹数据
// Map<Long, List<CarTrackModel>> groupMapByTimestamp = sorted.stream().collect(Collectors.groupingBy(CarTrackModel::getGlobalTimeStamp));
// //存放时间戳和其对应的平均速度
// Map<Long, Double> map = new HashMap<>();
// groupMapByTimestamp.forEach((timestamp, traces) -> {
// double avgSpeed = traces.stream().mapToDouble(CarTrackModel::getSpeed).summaryStatistics().getAverage();
// map.put(timestamp, avgSpeed);
// });
//
// //出口车道的平均速度
// double ridAvgSpeed = map.values().stream().mapToDouble(number -> number).summaryStatistics().getAverage();
//
// List<Map.Entry<Long, Double>> entryList = map.entrySet().stream().sorted(Map.Entry.comparingByKey()).collect(Collectors.toList());
// int consecutiveSeconds = 0;//连续时间
// long overflowTimestamp = entryList.get(0).getKey();//发生溢出的时间戳
// for(int i = 0; i < entryList.size() - 1; i++){
// Map.Entry<Long, Double> current = entryList.get(i);
// Map.Entry<Long, Double> next = entryList.get(i + 1);
// if(current.getValue() < avgSpeedEvaluationIndex && next.getValue() < avgSpeedEvaluationIndex){
// consecutiveSeconds += Math.toIntExact((next.getKey() - current.getKey()));
// overflowTimestamp = next.getKey();
// }else{
// consecutiveSeconds = 0;
// }
// }
// if(consecutiveSeconds > durationEvaluationIndex){
// //计算路口的溢出指数
// double index = 30 / ridAvgSpeed;
// if(index > 5){
// //满足溢出判断的所有条件
// OverFlowDetail detail = new OverFlowDetail();
// detail.setRid(rid);
// detail.setTimestamp(overflowTimestamp);
// detail.setDatetime(DateUtil.toDateTime(overflowTimestamp, DateUtil.YYYY_MM_DD_HH_MM_SS));
// detail.setIndex(Double.parseDouble(String.format("%.2f", index)));
//
// details.add(detail);
// }
// }
// }
//
// });
// //存在路口发生溢出的情况
// if(!details.isEmpty()){
// OverFlowIndexResult result = new OverFlowIndexResult();
// result.setCrossId(crossId);
// result.setDetails(details);
// //溢出上报时间戳
// result.setGlobalTimeStamp(context.window().getEnd());
// result.setDateTime(DateUtil.toDateTime(result.getGlobalTimeStamp(), DateUtil.YYYY_MM_DD_HH_MM_SS));
// result.setStartTime(context.window().getStart());
// result.setEndTime(context.window().getEnd());
//
// //判断该路口死锁
// if(details.size() >= 2){
// Map<String, List<CarTrackModel>> groupMapByPlate = carTracesInCrossRoad.stream().collect(Collectors.groupingBy(CarTrackModel::getPicLicense));
// if(groupMapByPlate.size() > carNumber){
// double crossAvgSpeed = carTracesInCrossRoad.stream().mapToDouble(CarTrackModel::getSpeed).summaryStatistics().getAverage();
// if(crossAvgSpeed < crossAvgSpeedIndex){
// result.setDeadLock(true);
// }
// }
// }
// collector.collect(result);
// }
// }
// }
// }
// })
// .setParallelism(2)
// .name("统计每个路口的溢出指数");
//
// SingleOutputStreamOperator<String> jsonStreamOverFlowIndex = overFlowIndexStream.flatMap(new FlatMapFunction<OverFlowIndexResult, String>() {
// @Override
// public void flatMap(OverFlowIndexResult overFlowIndexResult, Collector<String> collector) throws Exception {
// collector.collect(JSONObject.toJSONString(overFlowIndexResult));
// }
// });
//
// String topicOfOverFlow = properties.getProperty("road.overflow.deadlock.index.analysis.topic");
// jsonStreamOverFlowIndex.addSink(new FlinkKafkaProducer<String>(
// properties.getProperty("bootstrap.servers"),
// topicOfOverFlow,
// new SimpleStringSchema()
// )).setParallelism(1).name("将路口溢出指标推送至kafka");
//
// //将溢出和死锁事件写入mysql
// SingleOutputStreamOperator<OverFlowIndexResult> deadLockStream = overFlowIndexStream.filter(new FilterFunction<OverFlowIndexResult>() {
// @Override
// public boolean filter(OverFlowIndexResult overFlowIndexResult) throws Exception {
// return overFlowIndexResult.isDeadLock();
// }
// }).setParallelism(1).name("过滤出死锁事件");
//
// deadLockStream.addSink(JdbcSink.sink(
// "insert into t_event_info (detect_time, type, cross_id, event_serial_number, category, start_time, end_time, duration) values(?,?,?,?,?,?,?,?)",
// new JdbcStatementBuilder<OverFlowIndexResult>() {
// @Override
// public void accept(PreparedStatement preparedStatement, OverFlowIndexResult overFlowIndexResult) throws SQLException {
// preparedStatement.setTimestamp(1, new Timestamp(overFlowIndexResult.getGlobalTimeStamp()));
// preparedStatement.setString(2, "704");
// preparedStatement.setString(3, overFlowIndexResult.getCrossId());
// String eventSerialNumber = UUID.randomUUID().toString().replace("-","");
// preparedStatement.setString(4, eventSerialNumber);
// preparedStatement.setString(5, "1");
// preparedStatement.setTimestamp(6, new Timestamp(overFlowIndexResult.getStartTime()));
// preparedStatement.setTimestamp(7, new Timestamp(overFlowIndexResult.getEndTime()));
// preparedStatement.setInt(8, 1);
// }
// }, jdbcExecutionOptions, jdbcBuild))
// .setParallelism(1)
// .name("将死锁事件写入mysql");
//
// overFlowIndexStream.addSink(JdbcSink.sink(
// "insert into t_event_info (detect_time, type, cross_id, event_serial_number, category, start_time, end_time, duration) values(?,?,?,?,?,?,?,?)",
// new JdbcStatementBuilder<OverFlowIndexResult>() {
// @Override
// public void accept(PreparedStatement preparedStatement, OverFlowIndexResult overFlowIndexResult) throws SQLException {
// preparedStatement.setTimestamp(1, new Timestamp(overFlowIndexResult.getGlobalTimeStamp()));
// preparedStatement.setString(2, "703");
// preparedStatement.setString(3, overFlowIndexResult.getCrossId());
// String eventSerialNumber = UUID.randomUUID().toString().replace("-","");
// preparedStatement.setString(4, eventSerialNumber);
// preparedStatement.setString(5, "1");
// preparedStatement.setTimestamp(6, new Timestamp(overFlowIndexResult.getStartTime()));
// preparedStatement.setTimestamp(7, new Timestamp(overFlowIndexResult.getEndTime()));
// preparedStatement.setInt(8, 1);
// }
// }, jdbcExecutionOptions, jdbcBuild))
// .setParallelism(1)
// .name("将溢出事件写入mysql");
//
// //计算失衡
// SingleOutputStreamOperator<UnbalanceResult> unbalanceIndexStream = splitCongestionStream
// .union(finalNoCoordCongestion)
// .assignTimestampsAndWatermarks(
// WatermarkStrategy.<RidIndexResult>forBoundedOutOfOrderness(Duration.ofSeconds(1))
// .withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp()))
// .keyBy(RidIndexResult::getCrossId)
// .window(TumblingEventTimeWindows.of(Time.minutes(1)))
// .apply(new WindowFunction<RidIndexResult, UnbalanceResult, String, TimeWindow>() {
// @Override
// public void apply(String crossId, TimeWindow timeWindow, Iterable<RidIndexResult> iterable, Collector<UnbalanceResult> collector) throws Exception {
// List<RidIndexResult> collect = StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
// if (!CollectionUtils.isEmpty(collect) && collect.size() > 1) {
// //获取最大拥堵指数
// double max = collect.stream().mapToDouble(RidIndexResult::getIndex).max().getAsDouble();
// double min = collect.stream().mapToDouble(RidIndexResult::getIndex).min().getAsDouble();
//
// double index = max / min;
// if (index >= 1.5) {
// UnbalanceResult result = new UnbalanceResult();
// result.setCrossId(crossId);
// result.setIndex(index);
// result.setGlobalTimeStamp(timeWindow.getEnd());
// result.setTimestamp(DateUtil.toDateTime(result.getGlobalTimeStamp(), DateUtil.YYYY_MM_DD_HH_MM_SS));
// result.setStartTime(timeWindow.getStart());
// result.setEndTime(timeWindow.getEnd());
//
// collector.collect(result);
// }
// }
// }
// }).setParallelism(2).name("统计路口失衡指数");
//
// SingleOutputStreamOperator<String> jsonStreamOfUnbalance = unbalanceIndexStream.flatMap(new FlatMapFunction<UnbalanceResult, String>() {
// @Override
// public void flatMap(UnbalanceResult unbalanceResult, Collector<String> collector) throws Exception {
// String value = JSONObject.toJSONString(unbalanceResult);
// collector.collect(value);
// }
// }).setParallelism(1).name("将失衡指标数据转成json字符串");
//
// String topicOfUnbalanceIndexCalculation = properties.getProperty("road.unbalance.index.analysis.topic");
// jsonStreamOfUnbalance.addSink(new FlinkKafkaProducer<String>(
// properties.getProperty("bootstrap.servers"),
// topicOfUnbalanceIndexCalculation,
// new SimpleStringSchema()
// )).setParallelism(1).name("将失衡指数发送至kafka");
//
// //将失衡结果写入mysql
// unbalanceIndexStream.addSink(JdbcSink.sink(
// "insert into t_event_info (detect_time, type, cross_id, event_serial_number, category, start_time, end_time, duration) values(?,?,?,?,?,?,?,?)",
// new JdbcStatementBuilder<UnbalanceResult>() {
// @Override
// public void accept(PreparedStatement preparedStatement, UnbalanceResult unbalanceResult) throws SQLException {
// preparedStatement.setTimestamp(1, new Timestamp(unbalanceResult.getGlobalTimeStamp()));
// preparedStatement.setString(2, "702");
// preparedStatement.setString(3, unbalanceResult.getCrossId());
// String eventSerialNumber = UUID.randomUUID().toString().replace("-","");
// preparedStatement.setString(4, eventSerialNumber);
// preparedStatement.setString(5, "1");
// preparedStatement.setTimestamp(6, new Timestamp(unbalanceResult.getStartTime()));
// preparedStatement.setTimestamp(7, new Timestamp(unbalanceResult.getEndTime()));
// preparedStatement.setInt(8, 1);
// }
// }, jdbcExecutionOptions, jdbcBuild))
// .setParallelism(1)
// .name("将失衡事件写入mysql");
env.execute("计算拥堵-溢出-失衡-死锁-相位空放");
......
......@@ -88,6 +88,6 @@ public class CarDataValidatorFlatMap implements FlatMapFunction<FrameModel, CarT
}
public static boolean isQualifiedPlate(String plate){
return StringUtils.isNotEmpty(plate) && !Objects.equals("null", plate) && !Objects.equals("默A00000", plate);
return StringUtils.isNotEmpty(plate) && !Objects.equals("null", plate) && !plate.contains("00000");
}
}
package com.wanji.indicators.task.trajectory.func;
import com.wanji.indicators.task.trajectory.CarTrajectoryIndexMain;
import com.wanji.indicators.task.trajectory.pojo.RidIndexResult;
import com.wanji.indicators.task.trajectory.pojo.StartDuration;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
/**
* 协调方向拥堵数据
*
* @author Kent HAN
* @date 2023/11/14 10:02
*/
public class CoordCongestionDataFunction extends KeyedProcessFunction<String, RidIndexResult, RidIndexResult> {
// Key:方向
private MapState<Integer, StartDuration> dirState;
@Override
public void open(Configuration parameters) throws Exception {
dirState = getRuntimeContext()
.getMapState(new MapStateDescriptor<>("dir_map",
Integer.class, StartDuration.class));
}
@Override
public void processElement(RidIndexResult value,
KeyedProcessFunction<String, RidIndexResult, RidIndexResult>.Context ctx,
Collector<RidIndexResult> out) throws Exception {
String rid = ctx.getCurrentKey();
Integer dir = CarTrajectoryIndexMain.ridDirMap.get(rid);
value.setDir(dir);
Double index = value.getIndex();
boolean isCongestion = index > 1.5;
Integer stateKey = dir;
if (isCongestion) {
try {
if (!dirState.contains(stateKey)) {
StartDuration startDuration = new StartDuration();
// 获取当前的时间戳
Instant now = Instant.now();
// 获取1分钟前的时间戳
Instant oneMinuteBefore = now.minus(1, ChronoUnit.MINUTES);
// 转换成毫秒
long timestampOneMinuteBefore = oneMinuteBefore.toEpochMilli();
value.setStartTime(timestampOneMinuteBefore);
startDuration.setStartTime(timestampOneMinuteBefore);
value.setDuration(1);
startDuration.setDuration(1);
dirState.put(stateKey, startDuration);
} else {
StartDuration startDuration = dirState.get(stateKey);
value.setStartTime(startDuration.getStartTime());
Integer duration = startDuration.getDuration();
duration += 1;
startDuration.setDuration(duration);
value.setDuration(duration);
dirState.put(stateKey, startDuration);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
// 删除状态
try {
Iterator<Integer> mapStateIterator = dirState.keys().iterator();
while (mapStateIterator.hasNext()){
Integer key = mapStateIterator.next();
if(key.equals(stateKey)){
mapStateIterator.remove();
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
out.collect(value);
}
}
package com.wanji.indicators.task.trajectory.func;
import com.wanji.indicators.constant.CongestEnum;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.constant.LightStatusEnum;
import com.wanji.indicators.constant.TurnEnum;
......@@ -7,8 +8,8 @@ import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.model.CrossRidTurnLampStatusModel;
import com.wanji.indicators.task.lightstatus.model.VehicleHeadTimeModel;
import com.wanji.indicators.task.trajectory.pojo.RidIndexResultOfEastAndWest;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.task.trajectory.pojo.RidIndexResult;
import com.wanji.indicators.task.trajectory.pojo.StartDuration;
import com.wanji.indicators.util.GeomsConvertUtil;
import com.wanji.indicators.util.PtInPolyUtil;
import org.apache.flink.api.common.state.MapState;
......@@ -18,6 +19,8 @@ import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.springframework.util.StringUtils;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
......@@ -28,7 +31,7 @@ import java.util.stream.StreamSupport;
* @createTime : 2023/11/1 16:16
* @Description :
*/
public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String, CrossFrameModel, RidIndexResultOfEastAndWest> {
public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String, CrossFrameModel, RidIndexResult> {
//车道上离停止线最近的车辆缓存
private MapState<String, CarTrackModel> mapState;
//绿灯总时长
......@@ -36,6 +39,9 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String,
//车道车头时距缓存
private MapState<String, VehicleHeadTimeModel> resultState;
// 开始时间和持续时长状态。Key:方向
private MapState<Integer, StartDuration> dirState;
@Override
public void open(Configuration parameters) {
mapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("light_status_map", String.class, CarTrackModel.class));
......@@ -45,7 +51,7 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String,
}
@Override
public void processElement(CrossFrameModel value, Context ctx, Collector<RidIndexResultOfEastAndWest> out) throws Exception {
public void processElement(CrossFrameModel value, Context ctx, Collector<RidIndexResult> out) throws Exception {
String crossId = ctx.getCurrentKey();
List<CarTrackModel> trackList = value.getTrackList();
//灯态数据
......@@ -54,16 +60,17 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String,
Map<Integer, List<CrossRidTurnLampStatusModel>> groupLightStatus =
ridLightStatusList.stream()
.filter(o -> Objects.nonNull(o) && !StringUtils.isEmpty(o.getLampState()) && o.getLampState().trim().equals(LightStatusEnum.GREEN.getType()))
.filter(o -> o.getDir() == 1 || o.getDir() == 4 || o.getDir() == 5)//南北方向
.filter(o -> !o.getTurn().equals(Constant.PEDESTRAIN_LIGHT_CODE))//非行人灯
.filter(o -> o.getDir() == 1 || o.getDir() == 4 || o.getDir() == 5)//非协调方向
.filter(o -> !o.getTurn().equals(Constant.PEDESTRAIN_LIGHT_CODE)) // 非行人灯
.filter(o -> !o.getTurn().equals(Constant.RIGHT_LIGHT_CODE)) // 非右转灯
.collect(Collectors.groupingBy(CrossRidTurnLampStatusModel::getDir));
//存储当前南北方向绿灯结束的路口和绿灯时长
//存储当前非协调方向绿灯结束的路口和绿灯时长
Map<Integer, Integer> expiredGreenLightMap = new HashMap<>();
if(!greenTotalTimeState.isEmpty()){
Iterable<String> keys = greenTotalTimeState.keys();
for(String key: keys){
String[] split = key.trim().split("-");
String[] split = key.trim().split("-"); // "路口ID-方向"
int dir = Integer.parseInt(split[1]);
if(!groupLightStatus.containsKey(dir)){
expiredGreenLightMap.put(dir, greenTotalTimeState.get(key));
......@@ -73,9 +80,9 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String,
for (Map.Entry<Integer, List<CrossRidTurnLampStatusModel>> entry : groupLightStatus.entrySet()) {
Integer dir = entry.getKey();
List<CrossRidTurnLampStatusModel> value1 = entry.getValue();//同个路段上绿灯放行的转向
List<CrossRidTurnLampStatusModel> value1 = entry.getValue(); // 非协调方向绿灯信息
//更新进口道绿灯时长和倒计时
// 更新绿灯相位周期总时长
String combinedKey = crossId + "-" + dir;
if(!greenTotalTimeState.contains(combinedKey)){
greenTotalTimeState.put(combinedKey, value1.get(0).getCyclePhaseCountDown());
......@@ -92,7 +99,7 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String,
for (Map.Entry<String, List<CarTrackModel>> entry1 : groupByLane.entrySet()) {
List<CarTrackModel> laneCarList = entry1.getValue();
//计算路口内车辆到停止线距离
// 计算路口内车辆到停止线距离
laneCarList.forEach(o -> {
String laneEndPoint = o.getRoadnet().getLaneEndPoint();
if (Objects.nonNull(laneEndPoint)) {
......@@ -159,13 +166,13 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String,
}
}
//存在绿灯倒计时结束
// 存在绿灯倒计时结束
if(!expiredGreenLightMap.isEmpty()){
for(Map.Entry<Integer, Integer> expiredLight: expiredGreenLightMap.entrySet()){
Integer dir = expiredLight.getKey();
Integer totalGreenLightTime = expiredLight.getValue();
//筛选出当前进口车道的统计结果
// 筛选当前方向的统计结果
List<VehicleHeadTimeModel> stats = StreamSupport.stream(resultState.values().spliterator(), false)
.filter(stat -> stat.getDir().compareTo(dir) == 0)
.collect(Collectors.toList());
......@@ -189,31 +196,102 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String,
}
if(!stats.isEmpty()){
double average = stats.stream().mapToDouble(VehicleHeadTimeModel::getHeadTime).summaryStatistics().getAverage();
double average = stats.stream().mapToDouble(VehicleHeadTimeModel::getHeadTime)
.summaryStatistics()
.getAverage();
double index = (totalGreenLightTime - average) / totalGreenLightTime;
double convertedIndex = convertIndex(index); // 转换非协调方向拥堵指数
String indexName;
if(index < 0.6){
String congestionType;
if(convertedIndex > 1.5 && convertedIndex <= 2.0){
indexName = "轻度拥堵";
congestionType = CongestEnum.LIGHT_CONGEST.getType();
}else if(convertedIndex > 2.0 && convertedIndex <= 3.0){
indexName = "中度拥堵";
congestionType = CongestEnum.MODERATE_CONGEST.getType();
}else if(convertedIndex > 3.0){
indexName = "重度拥堵";
congestionType = CongestEnum.HEAVY_CONGEST.getType();
}else{
indexName = "畅通";
}else if(index < 0.8){
indexName = "缓行";
}else {
indexName = "拥堵";
congestionType = CongestEnum.NO_CONGEST.getType();
}
RidIndexResultOfEastAndWest stat = new RidIndexResultOfEastAndWest();
RidIndexResult stat = new RidIndexResult();
stat.setCrossId(crossId);
stat.setRid(stats.get(0).getRid());
stat.setIndex(Double.parseDouble(String.format("%.2f", index)));
stat.setIndexName(indexName);
stat.setGlobalTimeStamp(ctx.timestamp());
stat.setTimestamp(DateUtil.toDateTime(stat.getGlobalTimeStamp(), DateUtil.YYYY_MM_DD_HH_MM_SS));
stat.setEndTime(ctx.timestamp());
stat.setStartTime(stat.getEndTime() - totalGreenLightTime * 1000);
stat.setCongestionType(congestionType);
stat.setDir(dir);
boolean isCongestion = convertedIndex > 1.5;
Integer stateKey = dir;
if (isCongestion) {
try {
if (!dirState.contains(stateKey)) {
StartDuration startDuration = new StartDuration();
// 获取当前的时间戳
Instant now = Instant.now();
// 获取1分钟前的时间戳
Instant oneMinuteBefore = now.minus(1, ChronoUnit.MINUTES);
// 转换成毫秒
long timestampOneMinuteBefore = oneMinuteBefore.toEpochMilli();
stat.setStartTime(timestampOneMinuteBefore);
startDuration.setStartTime(timestampOneMinuteBefore);
stat.setDuration(totalGreenLightTime / 60);
startDuration.setDuration(1);
dirState.put(stateKey, startDuration);
} else {
StartDuration startDuration = dirState.get(stateKey);
stat.setStartTime(startDuration.getStartTime());
Integer duration = startDuration.getDuration();
duration += totalGreenLightTime / 60;
startDuration.setDuration(duration);
stat.setDuration(duration);
dirState.put(stateKey, startDuration);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
// 删除状态
try {
Iterator<Integer> dirStateKeysIter = dirState.keys().iterator();
while (dirStateKeysIter.hasNext()){
Integer key = dirStateKeysIter.next();
if(key.equals(stateKey)){
dirStateKeysIter.remove();
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
out.collect(stat);
}
}
}
}
private double convertIndex(double saturation) {
if (saturation < 0.6) {
return 1.5;
} else if (saturation < 0.7) {
return linearInterpolation(saturation, 0.6, 0.7, 1.5, 2.0);
} else if (saturation < 0.8) {
return linearInterpolation(saturation, 0.7, 0.8, 2.0, 3.0);
} else {
return 3.0;
}
}
private double linearInterpolation(double x, double x0, double x1, double y0, double y1) {
return y0 + (x - x0) * ((y1 - y0) / (x1 - x0));
}
}
......@@ -9,7 +9,10 @@ import lombok.Data;
* @Description :
*/
@Data
public class RidIndexResultOfEastAndWest {
public class RidIndexResult {
private Integer dir;
private String rid;
//该路段的结束路口
private String crossId;
......@@ -27,4 +30,7 @@ public class RidIndexResultOfEastAndWest {
private Long startTime;
private Long endTime;
private Integer duration;
}
package com.wanji.indicators.task.trajectory.pojo;
import lombok.Data;
@Data
public class StartDuration {
// 开始时间(毫秒时间戳)
private Long startTime;
// 持续时长(分钟)
private Integer duration;
}
......@@ -44,15 +44,15 @@ cross.event.data.topic=analysis.cross.event
plate.prefix=
#路段默认自由流速度配置值 km/h
rid.default.free.speed=80
#东西方向路段拥堵指数统计
#协调方向路段拥堵指数统计
rid.traffic.index.analysis.topic=rid.traffic.index.analysis
#南北方向饱和度计算
#非协调方向饱和度计算
rid.traffic.index.north.south.topic=rid.traffic.index.north.south.analysis
#东西方向路段的rid和方向
east.west.rid.direction.list=13NED0B5Q9013NF80B5QN00:6,13NGH0B5RC013NF80B5QN00:2,13NI00B5RM013NGH0B5RC00:3,13NF80B5QN013NGH0B5RC00:6,13NGH0B5RC013NI00B5RM00:7,13NID0B5RM013NI00B5RM00:3
#南北方向路段的rid和方向
north.south.rid.direction.list=13NG40B5SK013NI00B5RM00:1,13NEH0B5RJ013NGH0B5RC00:1,13NEP0B5QJ013NGH0B5RC00:5,13NDG0B5RI013NF80B5QN00:1,13NDT0B5Q9013NF80B5QN00:4
# 协调方向路段的rid和方向
coord.rid.direction.list=13NED0B5Q9013NF80B5QN00:6,13NGH0B5RC013NF80B5QN00:2,13NI00B5RM013NGH0B5RC00:3,13NF80B5QN013NGH0B5RC00:6,13NGH0B5RC013NI00B5RM00:7,13NID0B5RM013NI00B5RM00:3
# 非协调方向路段的rid和方向
no.coord.rid.direction.list=13NG40B5SK013NI00B5RM00:1,13NEH0B5RJ013NGH0B5RC00:1,13NEP0B5QJ013NGH0B5RC00:5,13NDG0B5RI013NF80B5QN00:1,13NDT0B5Q9013NF80B5QN00:4
#路口溢出评价指标
road.overflow.avg.speed=5.0
......@@ -69,8 +69,10 @@ cross.road.deadlock.avg.speed=5.0
#相位灯的状态数据
light.status.topic=cross_lights_status
#虚拟路口区域
# 虚拟路口区域
# 霞景路西,旅游路与回龙山路交叉口
virtual.crossroad.13NED0B5Q90=13NED0B5Q90:117.08503591467242,36.64125732273356;117.08495255127629,36.641426722875224;117.08499878952986,36.641454206982246;117.08508543702352,36.641286700399476
# 福地街东,旅游路与隧道交叉口
virtual.crossroad.13NH20B5RH0=13NH20B5RH0:117.09669255282627,36.644871615002884;117.09669985552095,36.645055610398025;117.09675474612689,36.64505324101935;117.09674606214631,36.64486930675771
#相位空放topic
......
......@@ -5,9 +5,9 @@
## 2. 指标说明
### 2.1 拥堵
拥堵指标计算分为东西方向和南北方向
- 东西方向拥堵指数计算会针对每个路口的进口路段进行计算,统计每个路段的拥堵指数,并根据拥堵指数进行阈值判断后,获取该路段的交通状态
- 南北方向的拥堵指数计算是根据饱和度计算绿灯时间内,车辆空档时间。根据算法文档给出的计算公式进行相关计算后,判断其阈值范围,从而获取该路段的交通状态
拥堵指标计算分为协调方向方向和非协调方向
- 协调方向拥堵指数计算会针对每个路口的进口路段进行计算,统计每个路段的拥堵指数,并根据拥堵指数进行阈值判断后,获取该路段的交通状态
- 非协调方向的拥堵指数计算:根据绿灯时间内车辆空档时间,计算饱和度。根据算法文档阈值判断拥堵等级。
### 2.2 失衡
- 根据算法指标计算文档给出的计算公式,计算每个路口是否处于失衡状态
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment