Commit 165157b8 authored by zhoushiguang's avatar zhoushiguang

干线指数计算

parent 18b5ccdf
...@@ -54,6 +54,8 @@ public class BaseLaneCache { ...@@ -54,6 +54,8 @@ public class BaseLaneCache {
private static Map<String, List<BaseCrossTurnInfo>> crossTurnInfoMap = new ConcurrentHashMap<>(); private static Map<String, List<BaseCrossTurnInfo>> crossTurnInfoMap = new ConcurrentHashMap<>();
private static Map<String, Map<String, Double>> freeSpeedMap = new ConcurrentHashMap<>();
private static BaseLaneCache instance; private static BaseLaneCache instance;
static PropertiesHelper property = PropertiesHelper.getInstance(); static PropertiesHelper property = PropertiesHelper.getInstance();
...@@ -68,6 +70,8 @@ public class BaseLaneCache { ...@@ -68,6 +70,8 @@ public class BaseLaneCache {
loadTurnRelation(); loadTurnRelation();
loadFreeSpeed();
} }
return instance; return instance;
} }
...@@ -92,14 +96,14 @@ public class BaseLaneCache { ...@@ -92,14 +96,14 @@ public class BaseLaneCache {
segmentLaneNumberMap.put(entry.getKey(), entry.getValue().size()); segmentLaneNumberMap.put(entry.getKey(), entry.getValue().size());
} }
Map<String, List<BaseLaneInfo>> ridLaneList = collection.stream().collect(Collectors.groupingBy(o -> o.getRid()+"_"+o.getType())); Map<String, List<BaseLaneInfo>> ridLaneList = collection.stream().collect(Collectors.groupingBy(o -> o.getRid() + "_" + o.getType()));
for (Map.Entry<String, List<BaseLaneInfo>> entry : ridLaneList.entrySet()) { for (Map.Entry<String, List<BaseLaneInfo>> entry : ridLaneList.entrySet()) {
List<BaseLaneInfo> value = entry.getValue(); List<BaseLaneInfo> value = entry.getValue();
double totalLength = value.stream().mapToDouble(o->o.getLength()).summaryStatistics().getSum(); double totalLength = value.stream().mapToDouble(o -> o.getLength()).summaryStatistics().getSum();
//路段上车道总长度 //路段上车道总长度
ridLaneLengthMap.put(entry.getKey(), new Double[]{totalLength,(double)value.size()}); ridLaneLengthMap.put(entry.getKey(), new Double[]{totalLength, (double) value.size()});
} }
} }
...@@ -124,6 +128,19 @@ public class BaseLaneCache { ...@@ -124,6 +128,19 @@ public class BaseLaneCache {
} }
} }
private static void loadFreeSpeed() {
ApplicationContext beanConf = new ClassPathXmlApplicationContext("spring-container.xml");
BaseLaneInfoService baseLaneInfoService = beanConf.getBean(BaseLaneInfoServiceImpl.class);
String crossList = property.getProperties().getProperty("full.area.cross.list");
Double defaultFreeSpeed = Double.parseDouble(property.getProperties().getProperty("rid.default.free.speed"));
Map<String, Map<String, Double>> ridFreeSpeed = baseLaneInfoService.getRidFreeSpeed(crossList, defaultFreeSpeed);
freeSpeedMap.putAll(ridFreeSpeed);
}
public Map<String, BaseLaneInfo> getBaseLaneInfoMap() { public Map<String, BaseLaneInfo> getBaseLaneInfoMap() {
return baseLaneInfoMap; return baseLaneInfoMap;
} }
...@@ -147,4 +164,8 @@ public class BaseLaneCache { ...@@ -147,4 +164,8 @@ public class BaseLaneCache {
public static Map<String, Double[]> getRidLaneLengthMap() { public static Map<String, Double[]> getRidLaneLengthMap() {
return ridLaneLengthMap; return ridLaneLengthMap;
} }
public Map<String, Map<String, Double>> getFreeSpeedMap() {
return freeSpeedMap;
}
} }
...@@ -51,7 +51,7 @@ public class AreaFreeSpeedSource extends RichParallelSourceFunction<Map<String, ...@@ -51,7 +51,7 @@ public class AreaFreeSpeedSource extends RichParallelSourceFunction<Map<String,
Map<String, Map<String, Double>> ridFreeSpeed = baseLaneInfoService.getRidFreeSpeed(crossList, defaultFreeSpeed); Map<String, Map<String, Double>> ridFreeSpeed = baseLaneInfoService.getRidFreeSpeed(crossList, defaultFreeSpeed);
sourceContext.collect(ridFreeSpeed); sourceContext.collect(ridFreeSpeed);
TimeUnit.MINUTES.sleep(1); TimeUnit.SECONDS.sleep(5);
} }
} catch (Exception e) { } catch (Exception e) {
log.error("查询车道信息异常: " + e.getMessage(), e); log.error("查询车道信息异常: " + e.getMessage(), e);
......
...@@ -149,7 +149,7 @@ public class AllTaskMain { ...@@ -149,7 +149,7 @@ public class AllTaskMain {
/**********************************************************************************/ /**********************************************************************************/
SingleOutputStreamOperator<CarTrackModel> joinFreeSpeedStream = carTrackModelStream SingleOutputStreamOperator<CarTrackModel> joinFreeSpeedStream = carTrackModelStream
.connect(env.addSource(new AreaFreeSpeedSource()).broadcast()) //.connect(env.addSource(new AreaFreeSpeedSource()).broadcast())
.flatMap(new RidFreeSpeedCoFlatMap()) .flatMap(new RidFreeSpeedCoFlatMap())
.setParallelism(1) .setParallelism(1)
.name("全域轨迹帧数据-车道属性数据绑定"); .name("全域轨迹帧数据-车道属性数据绑定");
......
...@@ -115,7 +115,7 @@ public class AreaIndicatorMain { ...@@ -115,7 +115,7 @@ public class AreaIndicatorMain {
.name("轨迹帧数据解析-ToCarTrackModel"); .name("轨迹帧数据解析-ToCarTrackModel");
SingleOutputStreamOperator<CarTrackModel> joinLaneStream = carTrackModelStream SingleOutputStreamOperator<CarTrackModel> joinLaneStream = carTrackModelStream
.connect(env.addSource(new AreaFreeSpeedSource()).broadcast()) //.connect(env.addSource(new AreaFreeSpeedSource()).broadcast())
.flatMap(new RidFreeSpeedCoFlatMap()) .flatMap(new RidFreeSpeedCoFlatMap())
.setParallelism(1) .setParallelism(1)
.name("全域轨迹帧数据-车道属性数据绑定"); .name("全域轨迹帧数据-车道属性数据绑定");
......
...@@ -82,7 +82,7 @@ public class OfflineTestIndicatorTaskMain { ...@@ -82,7 +82,7 @@ public class OfflineTestIndicatorTaskMain {
try { try {
SingleOutputStreamOperator<CarTrackModel> joinLaneStream = carTrackModelStream SingleOutputStreamOperator<CarTrackModel> joinLaneStream = carTrackModelStream
.connect(env.addSource(new AreaFreeSpeedSource()).broadcast()) //.connect(env.addSource(new AreaFreeSpeedSource()).broadcast())
.flatMap(new RidFreeSpeedCoFlatMap()) .flatMap(new RidFreeSpeedCoFlatMap())
.setParallelism(1) .setParallelism(1)
.name("全域轨迹帧数据-车道属性数据绑定"); .name("全域轨迹帧数据-车道属性数据绑定");
......
package com.wanji.indicators.task.area.service.func; package com.wanji.indicators.task.area.service.func;
import com.wanji.indicators.cache.BaseLaneCache;
import com.wanji.indicators.constant.Constant; import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.constant.RoadClassMap; import com.wanji.indicators.constant.RoadClassMap;
import com.wanji.indicators.entity.BaseLaneInfo; import com.wanji.indicators.entity.BaseLaneInfo;
import com.wanji.indicators.model.CarTrackModel; import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.CrossFrameModel; import com.wanji.indicators.model.CrossFrameModel;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import java.util.*; import java.util.*;
public class RidFreeSpeedCoFlatMap implements CoFlatMapFunction<CarTrackModel, Map<String, Map<String,Double>>, CarTrackModel> { public class RidFreeSpeedCoFlatMap implements FlatMapFunction<CarTrackModel, CarTrackModel> {
private Map<String, Map<String,Double>> map = new HashMap(); private Map<String, Map<String,Double>> map = new HashMap();
@Override @Override
public void flatMap1(CarTrackModel carTrackModel, Collector<CarTrackModel> collector) throws Exception { public void flatMap(CarTrackModel carTrackModel, Collector<CarTrackModel> collector) throws Exception {
map = BaseLaneCache.getInstance().getFreeSpeedMap();
if (map != null && !map.isEmpty()) { if (map != null && !map.isEmpty()) {
String rid = carTrackModel.getRoadnet().getRid(); String key = carTrackModel.getRoadnet().getRid();
String crossId = carTrackModel.getRoadnet().getCrossId();
String key = rid;
if (!map.containsKey(key)) { if (!map.containsKey(key)) {
return; return;
} }
Double freeSpeed = map.get(key).get("freeSpeed"); Double freeSpeed = map.get(key).get("freeSpeed");
Double ridLevel = map.get(key).get("level")/*==null? RoadClassMap.CIYAO_DAO.getType():map.get(key).get("level")*/; Double ridLevel = map.get(key).get("level")/*==null? RoadClassMap.CIYAO_DAO.getType():map.get(key).get("level")*/;
Double ridLength = map.get(key).get("ridLength"); Double ridLength = map.get(key).get("ridLength");
Collection<Map<String,Double>> collection = map.values(); Collection<Map<String,Double>> collection = map.values();
double areaFreeSpeed = collection.stream().mapToDouble(o->o.get("freeSpeed")).summaryStatistics().getAverage(); double areaFreeSpeed = collection.stream().mapToDouble(o->o.get("freeSpeed")).summaryStatistics().getAverage();
carTrackModel.setAreaFreeSpeed(areaFreeSpeed); carTrackModel.setAreaFreeSpeed(areaFreeSpeed);
...@@ -38,8 +44,4 @@ public class RidFreeSpeedCoFlatMap implements CoFlatMapFunction<CarTrackModel, ...@@ -38,8 +44,4 @@ public class RidFreeSpeedCoFlatMap implements CoFlatMapFunction<CarTrackModel,
} }
@Override
public void flatMap2( Map<String, Map<String,Double>> hashMap, Collector<CarTrackModel> collector) throws Exception {
this.map = hashMap;
}
} }
...@@ -102,7 +102,7 @@ public class RidRoadStatusStartMain { ...@@ -102,7 +102,7 @@ public class RidRoadStatusStartMain {
.name("轨迹帧数据解析-ToCarTrackModel"); .name("轨迹帧数据解析-ToCarTrackModel");
SingleOutputStreamOperator<CarTrackModel> joinLaneStream = carTrackModelStream SingleOutputStreamOperator<CarTrackModel> joinLaneStream = carTrackModelStream
.connect(env.addSource(new AreaFreeSpeedSource()).broadcast()) //.connect(env.addSource(new AreaFreeSpeedSource()).broadcast())
.flatMap(new RidFreeSpeedCoFlatMap()) .flatMap(new RidFreeSpeedCoFlatMap())
.setParallelism(1) .setParallelism(1)
.name("全域轨迹帧数据-车道属性数据绑定"); .name("全域轨迹帧数据-车道属性数据绑定");
......
...@@ -102,31 +102,37 @@ public class RidProcessWindow extends ProcessWindowFunction<CarTrackModel, LaneT ...@@ -102,31 +102,37 @@ public class RidProcessWindow extends ProcessWindowFunction<CarTrackModel, LaneT
if (arr != null) { if (arr != null) {
laneCount = arr[1]; laneCount = arr[1];
totalLength = arr[0]; totalLength = arr[0];
//2分钟路段停满车辆数 单车道30%系数
carSizeThreshold = ArithOfBigDecmial.div(totalLength, 5, 0) / count / 3;
carSizeThreshold = ArithOfBigDecmial.div(totalLength, 7, 0) ;
} }
long segmentCarSize = list.stream().mapToInt(o -> o.getId()).distinct().count(); long segmentCarSize = list.stream().mapToInt(o -> o.getId()).distinct().count();
//过滤轨迹覆盖度较低情况 //过滤轨迹覆盖度较低情况
if (avgSpeed > 0 && segmentCarSize > count) { if (avgSpeed > 0 && segmentCarSize > 10) {
trafficIndex = ArithOfBigDecmial.div(freeSpeed, avgSpeed); trafficIndex = ArithOfBigDecmial.div(freeSpeed, avgSpeed);
double tti = 0;
double ttl = 0; double ttiFree = 0;
double ttlFree = 0;
Map<String, List<CarTrackModel>> groupBy = list.stream().collect(Collectors.groupingBy(o -> o.getRoadnet().getSegmentId())); Map<String, List<CarTrackModel>> groupBy = list.stream().collect(Collectors.groupingBy(o -> o.getRoadnet().getSegmentId()));
for (Map.Entry<String, List<CarTrackModel>> entry : groupBy.entrySet()) { Map<String, Integer> segmentList = BaseLaneCache.getInstance().getSegmentLaneNumberMap();
long segCarCount = entry.getValue().stream().map(o->o.getId()).distinct().count(); for (Map.Entry<String, Integer> entry : segmentList.entrySet()) {
List<CarTrackModel> carlist= new ArrayList<>() ;
double avgSpd = this.getAvgSpeed(entry.getValue(), freeSpeed); if (groupBy.containsKey(entry.getKey()))
carlist = groupBy.get(entry.getKey());
double length = entry.getValue().get(0).getRoadnet().getLaneLength(); //渠化段上所有车辆
long segCarCount = carlist.stream().map(o->o.getId()).distinct().count();
//渠化段车辆平均行驶速度
double avgSpd = this.getAvgSpeed(carlist, freeSpeed);
//渠化段长度
double length = carlist.isEmpty()?0: carlist.get(0).getRoadnet().getLaneLength();
//渠化段ttl计算
double travelTimeIndex = (length/1000 / avgSpd) * segCarCount; double travelTimeIndex = (length/1000 / avgSpd) * segCarCount;
ttl += travelTimeIndex; double freeTimeIndex = (length/1000/freeSpeed) * segCarCount;;
ttlFree += (length/1000/freeSpeed) * segCarCount;
}
trafficIndex = ttl / ttlFree;
tti += travelTimeIndex;
ttiFree += freeTimeIndex;
}
if (ttiFree>0) {
trafficIndex = tti / ttiFree;
}
} }
...@@ -142,25 +148,35 @@ public class RidProcessWindow extends ProcessWindowFunction<CarTrackModel, LaneT ...@@ -142,25 +148,35 @@ public class RidProcessWindow extends ProcessWindowFunction<CarTrackModel, LaneT
if (ridLevel == RoadClassMap.MAIN_DAO.getType()) { if (ridLevel == RoadClassMap.MAIN_DAO.getType()) {
if (trafficIndex <= 1.8) { if (trafficIndex <= 1.8) {
trafficState = CongestEnum.NO_CONGEST.getType(); trafficState = CongestEnum.NO_CONGEST.getType();
} else if (trafficIndex > 1.8 && trafficIndex <= 2.5) { } else if (trafficIndex <= 2.5) {
trafficState = CongestEnum.LIGHT_CONGEST.getType(); trafficState = CongestEnum.LIGHT_CONGEST.getType();
} else if (trafficIndex > 2.5 && trafficIndex <= 3.5) { } else if (trafficIndex <= 3.5) {
trafficState = CongestEnum.MODERATE_CONGEST.getType(); trafficState = CongestEnum.MODERATE_CONGEST.getType();
} else if (trafficIndex > 3.5) { } else {
trafficState = CongestEnum.HEAVY_CONGEST.getType(); trafficState = CongestEnum.HEAVY_CONGEST.getType();
} }
} else if (ridLevel == RoadClassMap.CIYAO_DAO.getType()) { } else if (ridLevel == RoadClassMap.CIYAO_DAO.getType()) {
//次要道路 //次要道路
if (trafficIndex <= 1.5) { if (trafficIndex <= 1.5) {
trafficState = CongestEnum.NO_CONGEST.getType(); trafficState = CongestEnum.NO_CONGEST.getType();
} else if (trafficIndex > 1.5 && trafficIndex <= 2) { } else if (trafficIndex <= 2) {
trafficState = CongestEnum.LIGHT_CONGEST.getType(); trafficState = CongestEnum.LIGHT_CONGEST.getType();
} else if (trafficIndex > 2 && trafficIndex <= 3) { } else if (trafficIndex <= 3) {
trafficState = CongestEnum.MODERATE_CONGEST.getType(); trafficState = CongestEnum.MODERATE_CONGEST.getType();
} else if (trafficIndex > 3) { } else {
trafficState = CongestEnum.HEAVY_CONGEST.getType(); trafficState = CongestEnum.HEAVY_CONGEST.getType();
} }
} }
//采用tti判定范围
if (trafficIndex <= 1.35) {
trafficState = CongestEnum.NO_CONGEST.getType();
} else if (trafficIndex < 1.55) {
trafficState = CongestEnum.LIGHT_CONGEST.getType();
} else if (trafficIndex < 1.85) {
trafficState = CongestEnum.MODERATE_CONGEST.getType();
} else {
trafficState = CongestEnum.HEAVY_CONGEST.getType();
}
LaneTrafficStateResult outModel = new LaneTrafficStateResult(); LaneTrafficStateResult outModel = new LaneTrafficStateResult();
outModel.setId("Rid_Traffic_State"); outModel.setId("Rid_Traffic_State");
......
package com.wanji.indicators.task.trajectory;
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.AtomicDouble;
import com.wanji.indicators.cache.BaseLaneCache;
import com.wanji.indicators.constant.CongestEnum;
import com.wanji.indicators.constant.EventAbnormalEnum;
import com.wanji.indicators.constant.EventLevelEnum;
import com.wanji.indicators.constant.VehicleTypeEnum;
import com.wanji.indicators.event.stream.func.CrossFrameFlatMap;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.source.AreaFreeSpeedSource;
import com.wanji.indicators.task.area.service.func.RidFreeSpeedCoFlatMap;
import com.wanji.indicators.task.track.service.func.CarTrackFlatMap;
import com.wanji.indicators.task.track.service.func.FrameFlatMap;
import com.wanji.indicators.task.trafficstate.func.BoundCarRoadNetInfoCoFlatMap;
import com.wanji.indicators.task.trajectory.func.CarDataValidatorFlatMap;
import com.wanji.indicators.task.trajectory.func.CoordCongestionDataFunction;
import com.wanji.indicators.task.trajectory.func.CrossRoadLightStatusCoFlatMap;
import com.wanji.indicators.task.trajectory.func.LineCongestionDuration;
import com.wanji.indicators.task.trajectory.func.LineCongestionFunction;
import com.wanji.indicators.task.trajectory.func.MergeCongestionFunction;
import com.wanji.indicators.task.trajectory.func.OverflowFunction;
import com.wanji.indicators.task.trajectory.func.PhaseEmptyTimeProcessFunction;
import com.wanji.indicators.task.trajectory.func.RichMapWithCrossLaneInfo;
import com.wanji.indicators.task.trajectory.func.UnbalanceFunction;
import com.wanji.indicators.task.trajectory.func.VehicleGapTimeProcessFunction;
import com.wanji.indicators.task.trajectory.helper.FileReadingHelper;
import com.wanji.indicators.task.trajectory.pojo.CarTrackInCrossRoadFlatMap;
import com.wanji.indicators.task.trajectory.pojo.CrossEventResult;
import com.wanji.indicators.task.trajectory.pojo.CrossExitInfo;
import com.wanji.indicators.task.trajectory.pojo.LineCongestion;
import com.wanji.indicators.task.trajectory.pojo.PhaseEmptyResult;
import com.wanji.indicators.task.trajectory.pojo.SingleCarInfo;
import com.wanji.indicators.util.ArithOfBigDecmial;
import com.wanji.indicators.util.PropertiesHelper;
import com.wanji.indicators.util.Tools;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/25 14:03
* @Description :
*/
@Slf4j
public class LineCongestionIndexMain {
public static final Map<String, Map<String, Object>> greenWaveMap = new ConcurrentHashMap<>();
public static final Properties properties = PropertiesHelper.getInstance().getProperties();
private static final PropertiesHelper instance = PropertiesHelper.getInstance();
static {
//绿波路段配置信息
String greenWaveInfo = properties.getProperty("green.wave.info");
if (StringUtils.isNotEmpty(greenWaveInfo)) {
String[] linesps = greenWaveInfo.split("@");
for (String line : linesps) {
String[] sps = line.split(",");
String greenId = sps[0];
String greenDir = sps[1];
Map<String, Object> greenMap = new HashMap<>();
greenMap.put("greenId", greenId);
greenMap.put("greenDir", greenDir);
String rids = sps[2];
String[] sps1 = StringUtils.split(rids, "_");
for (String rid : sps1) {
greenWaveMap.put(rid, greenMap);
}
}
}
}
public static void main(String[] args) {
try {
//获取配置文件中的kafka消费topic
String sourceTopic = properties.getProperty("consumer.topic");
String kafkaServerAddress = properties.getProperty("bootstrap.servers");
String lineSinkTopic = properties.getProperty("line.congestion.sink.topic");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
KafkaSource<String> source = KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties())
.setProperty("auto.offset.commit", "true")
.setProperty("auto.commit.interval.ms", "1000")
.setProperty("commit.offsets.on.checkpoint", "true")
.setBootstrapServers(kafkaServerAddress)
.setTopics(sourceTopic)
.setGroupId(properties.getProperty("consumer.group.id") + "-car-trace-test")
.setStartingOffsets(OffsetsInitializer.latest())
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-car-trajectory-data-source");
SingleOutputStreamOperator<FrameModel> frameModelStream =
stream
.flatMap(new FrameFlatMap())
.setParallelism(1)
.name("轨迹帧数据-JsonToObject");
SingleOutputStreamOperator<CarTrackModel> carTrackModelStream =
frameModelStream.
flatMap(new CarTrackFlatMap(false))
.filter(o -> VehicleTypeEnum.isMotorVehicles(o.getOriginalType().shortValue()))
.setParallelism(1)
.name("轨迹帧数据解析-ToCarTrackModel");
SingleOutputStreamOperator<CarTrackModel> joinFreeFlowStream = carTrackModelStream
//.connect(env.addSource(new AreaFreeSpeedSource()).broadcast())
.flatMap(new RidFreeSpeedCoFlatMap())
.setParallelism(1)
.name("车道属性数据绑定");
SingleOutputStreamOperator<CarTrackModel> joinLaneStream = joinFreeFlowStream
.flatMap(new BoundCarRoadNetInfoCoFlatMap())
.filter(o -> o.getRoadnet().getInCrossFlag() == 1 || (o.getRoadnet().getLaneType() != null && o.getRoadnet().getLaneType() == 1))
.setParallelism(1)
.name("车道扩展属性数据绑定");
//单车行程时间计算:根据车辆上报时间每隔1分钟统计10分钟的数据
SingleOutputStreamOperator<SingleCarInfo> singleCarStatStream = joinLaneStream
.assignTimestampsAndWatermarks(
WatermarkStrategy.<CarTrackModel>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp()))
.keyBy(o -> o.getRoadnet().getRid() + o.getId())
.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
.process(new ProcessWindowFunction<CarTrackModel, SingleCarInfo, String, TimeWindow>() {
@Override
public void process(String id, Context context, Iterable<CarTrackModel> iterable, Collector<SingleCarInfo> collector) throws Exception {
long windowStart = context.window().getStart();
long windowEnd = context.window().getEnd();
List<CarTrackModel> models = StreamSupport.stream(iterable.spliterator(), false)
.collect(Collectors.toList());
List<CarTrackModel> sorted = models.stream()
.sorted(Comparator.comparing(CarTrackModel::getGlobalTimeStamp))
.collect(Collectors.toList());
if (!CollectionUtils.isEmpty(sorted) && sorted.size() > 1) {
CarTrackModel start = sorted.get(0);
CarTrackModel end = sorted.get(sorted.size() - 1);
//单车旅行时间
if (start.getRoadnet().getLaneType() != null && start.getRoadnet().getLaneType() == 1 && end.getRoadnet().getInCrossFlag() == 1) {
SingleCarInfo singleCarInfo = new SingleCarInfo();
singleCarInfo.setId(end.getId());
singleCarInfo.setCrossId(end.getRoadnet().getCrossId());
singleCarInfo.setRid(end.getRoadnet().getRid());
singleCarInfo.setLaneId(end.getRoadnet().getLaneId());
singleCarInfo.setSegmentId(end.getRoadnet().getSegmentId());
singleCarInfo.setStartTime(start.getGlobalTimeStamp());
singleCarInfo.setEndTime(end.getGlobalTimeStamp());
//上游路口到达该路口进口车道的通行时间
singleCarInfo.setTransitTime(end.getGlobalTimeStamp() - start.getGlobalTimeStamp());
singleCarInfo.setWindowStartTime(windowStart);
singleCarInfo.setWindowEndTime(windowEnd);
singleCarInfo.setRidLength(end.getRoadnet().getRidLength());
//秒
Double freeTime = ArithOfBigDecmial.div(end.getRoadnet().getRidLength(), end.getRoadnet().getFreeSpeed() / 3.6);
singleCarInfo.setFreeTime(freeTime);
Map<String, Object> greenInfo = greenWaveMap.get(singleCarInfo.getRid());
if (greenInfo != null) {
singleCarInfo.setGreenWaveId(Integer.valueOf(greenInfo.get("greenId").toString()));
singleCarInfo.setGreenWaveDir(Integer.valueOf(greenInfo.get("greenDir").toString()));
collector.collect(singleCarInfo);
}
}
}
}
}).setParallelism(2).name("统计每辆车通过路口的通行时间");
//对同一窗口的统计数据进行处理
SingleOutputStreamOperator<LineCongestion> trafficIndexStream = singleCarStatStream
.keyBy(o -> o.getGreenWaveId() + "_" + o.getGreenWaveDir() + "_" + o.getStartTime())
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.process(new ProcessWindowFunction<SingleCarInfo, LineCongestion, String, TimeWindow>() {
@Override
public void process(String aLong, Context context, Iterable<SingleCarInfo> elements, Collector<LineCongestion> out) throws Exception {
//从列表中获取所有统计数据
List<SingleCarInfo> list = StreamSupport.stream(elements.spliterator(), false).collect(Collectors.toList());
String[] sps = aLong.split("_");
String waveId = sps[0];
String dir = sps[1];
String time = sps[2];
AtomicDouble atomicDoubleIndex = new AtomicDouble();
AtomicDouble atomicDoubleTime = new AtomicDouble();
AtomicDouble atomicDoubleLength = new AtomicDouble();
//干线子路段分组
Map<String, List<SingleCarInfo>> groupByRid = list.stream().collect(Collectors.groupingBy(o -> o.getRid()));
Set<String> ridList1 = new HashSet<>();
for (String gk : greenWaveMap.keySet()) {
Map<String, Object> map = greenWaveMap.get(gk);
String gdir = Tools.getMapValue("dir", map);
if (Objects.equals(dir, gdir)) {
ridList1.add(gk);
}
}
Set<String> ridList2 = groupByRid.keySet();
ridList1.retainAll(ridList2);
//无指标数据rid
for (String rid : ridList1) {
atomicDoubleIndex.addAndGet(1);
}
groupByRid.forEach((k1, v1) -> {
double avgTransitTime = v1.stream().mapToDouble(SingleCarInfo::getTransitTime).average().orElse(0) / 1000;
double freeTime = v1.get(0).getFreeTime();
double ridIndex = avgTransitTime / freeTime;
atomicDoubleIndex.addAndGet(ridIndex);
atomicDoubleTime.addAndGet(avgTransitTime);
atomicDoubleLength.addAndGet(v1.get(0).getRidLength());
log.info("路段交通指数rid:{},index:{}", k1, ridIndex);
});
double index = atomicDoubleIndex.get() / (groupByRid.size()+ridList1.size());
if (index < 1) {
index = 1;
}
LineCongestion lineCongestion = new LineCongestion();
lineCongestion.setRids(ridList2);
lineCongestion.setIndex(index);
lineCongestion.setTransitTime(atomicDoubleTime.get());
String congestionType = null;
if (index <= 1.5) {
congestionType = CongestEnum.NO_CONGEST.getCode();
} else if (index <= 2.0) {
congestionType = CongestEnum.LIGHT_CONGEST.getCode();
} else if (index <= 3.0) {
congestionType = CongestEnum.MODERATE_CONGEST.getCode();
} else {
congestionType = CongestEnum.HEAVY_CONGEST.getCode();
}
lineCongestion.setCongestionType(congestionType);
lineCongestion.setGreenWaveId(Integer.valueOf(sps[0]));
lineCongestion.setGreenWaveDir(Integer.valueOf(sps[1]));
lineCongestion.setSpeed(atomicDoubleLength.get() / lineCongestion.getTransitTime() * 3.6); //km/h
String value = JSONObject.toJSONString(lineCongestion);
log.info("干线交通指数:{}\r\n", value);
out.collect(lineCongestion);
}
})
.setParallelism(1).name("同窗口内每个路口的车辆通行时间");
SingleOutputStreamOperator<String> lineCongestionResult = trafficIndexStream
.flatMap(new FlatMapFunction<LineCongestion, String>() {
@Override
public void flatMap(LineCongestion result, Collector<String> collector) {
String value = JSONObject.toJSONString(result);
collector.collect(value);
}
})
.setParallelism(1).name("将干线指标转成json字符串");
lineCongestionResult.addSink(new FlinkKafkaProducer<String>(
kafkaServerAddress,
lineSinkTopic,
new SimpleStringSchema()
)).setParallelism(1).name("将干线指标发送至kafka");
env.execute("计算干线指标");
} catch (Exception e) {
e.printStackTrace();
}
}
}
...@@ -2,11 +2,18 @@ package com.wanji.indicators.task.trajectory.pojo; ...@@ -2,11 +2,18 @@ package com.wanji.indicators.task.trajectory.pojo;
import lombok.Data; import lombok.Data;
import java.io.Serializable;
import java.util.Set;
/** /**
* 干线拥堵事件 * 干线拥堵事件
*/ */
@Data @Data
public class LineCongestion { public class LineCongestion implements Serializable {
private Integer greenWaveId;
private Integer greenWaveDir;
private Double index; private Double index;
...@@ -28,4 +35,10 @@ public class LineCongestion { ...@@ -28,4 +35,10 @@ public class LineCongestion {
// 检测时间 // 检测时间
private Long detectTime; private Long detectTime;
//拥堵状态
private String congestionType;
private Double speed;
private Set<String> rids;
} }
...@@ -34,4 +34,12 @@ public class SingleCarInfo { ...@@ -34,4 +34,12 @@ public class SingleCarInfo {
private Long windowStartTime; private Long windowStartTime;
private Long windowEndTime; private Long windowEndTime;
//绿波ID
private Integer greenWaveId;
//绿波方向 0正向 1方向
private Integer greenWaveDir;
//rid自由流速度
private Double freeTime;
//rid length
private Double ridLength;
} }
...@@ -169,4 +169,6 @@ citybrain.minidata.sink.topic=MatchResultMiniDataCityBrain ...@@ -169,4 +169,6 @@ citybrain.minidata.sink.topic=MatchResultMiniDataCityBrain
citybrain.snapshot.sink.topic=JN050001LaneSnapshotDataCityBrain citybrain.snapshot.sink.topic=JN050001LaneSnapshotDataCityBrain
#\u81EA\u5B9A\u4E49\u884C\u4EBA\u8F66\u9053\u7F16\u53F7 #\u81EA\u5B9A\u4E49\u884C\u4EBA\u8F66\u9053\u7F16\u53F7
consume.pedestrian.baseLaneInfo=13NF80B5QN0-11,13NF80B5QN0-12,13NF80B5QN0-13,13NF80B5QN0-14,13NGH0B5RC0-21,13NGH0B5RC0-22,13NGH0B5RC0-23,13NGH0B5RC0-24,13NI00B5RM0-31,13NI00B5RM0-32 consume.pedestrian.baseLaneInfo=13NF80B5QN0-11,13NF80B5QN0-12,13NF80B5QN0-13,13NF80B5QN0-14,13NGH0B5RC0-21,13NGH0B5RC0-22,13NGH0B5RC0-23,13NGH0B5RC0-24,13NI00B5RM0-31,13NI00B5RM0-32
line.congestion.sink.topic="line_congestion_sink" line.congestion.sink.topic=line_congestion_sink
#\u7EFF\u6CE2ID\uFF0Cdir(0\u6B63\u5411,1\u53CD\u5411),\u5173\u8054RID1_\u5173\u8054RID2_...@
green.wave.info=1,0,13NI00B5RM013NGH0B5RC00_13NGH0B5RC013NF80B5QN00@1,1,13NF80B5QN013NGH0B5RC00_13NGH0B5RC013NI00B5RM00
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment