Commit 5d7cdb3a authored by zhoushiguang's avatar zhoushiguang

计算程序优化

parent a4e338c1
......@@ -119,9 +119,9 @@ public class AreaIndicatorMain {
//区域指数相关
AreaIndexAnalysisMainNew.init(env, "indexName",false).run(joinLaneStream);
//行人非机动车数量对机动车速度影响
//RidParticipantAnalysisMainNew.init(env,null,false).run(joinLaneStream);;
//区域参与者类别数量统计
LaneNonMotorAnalysisMainNew.init(env,null,false).run(carTrackModelStream);
//RidParticipantAnalysisMainNew.init(env,null,false).run(joinLaneStream);
//非机动车参与者类别数量统计
LaneNonMotorAnalysisMainNew.init(env,null,false).run(joinLaneStream);
//路段路况
RidTrafficStateMain.init(env, null,false).run(joinLaneStream);
......
......@@ -4,8 +4,10 @@ import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.sink.JsonStringFileSink;
import com.wanji.indicators.task.area.service.func.LaneCarProcessWindow;
import com.wanji.indicators.task.area.service.func.RidOutProcessWindow;
import com.wanji.indicators.task.area.service.key.ExactCategoryKeySelector;
import com.wanji.indicators.task.area.service.model.LaneCarCountAnalysisOutputModel;
import com.wanji.indicators.task.area.service.model.ParticipantAnalysisOutputModel;
import com.wanji.indicators.util.PropertiesHelper;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
......@@ -39,7 +41,7 @@ public class LaneNonMotorAnalysisMainNew {
private String path;
private int windowTime = 5;
private int windowTime = 15;
//本地调试设为true输出结果保存到文件
private boolean isDebug = false;
......@@ -60,20 +62,20 @@ public class LaneNonMotorAnalysisMainNew {
try {
// SingleOutputStreamOperator<CarTrackModel> nonMotorFilterStream =
// filterStream.
// filter(new FilterFunction<CarTrackModel>() {
// @Override
// public boolean filter(CarTrackModel value) throws Exception {
// return Objects.equals(value.getCategory(),2);
// }
// })
// .setParallelism(1)
// .name("车道非机动车数量计算-筛选非机动车轨迹");
SingleOutputStreamOperator<CarTrackModel> nonMotorFilterStream =
filterStream.
filter(new FilterFunction<CarTrackModel>() {
@Override
public boolean filter(CarTrackModel value) throws Exception {
return Objects.equals(value.getCategory(),2);
}
})
.setParallelism(1)
.name("车道非机动车数量计算-筛选非机动车轨迹");
SingleOutputStreamOperator<LaneCarCountAnalysisOutputModel> laneStream =
filterStream.assignTimestampsAndWatermarks(
nonMotorFilterStream.assignTimestampsAndWatermarks(
WatermarkStrategy.<CarTrackModel>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp()))
.keyBy(new ExactCategoryKeySelector())
......@@ -81,6 +83,17 @@ public class LaneNonMotorAnalysisMainNew {
.process(new LaneCarProcessWindow())
.name("车道非机动车数量计算-按车道分组统计");
// SingleOutputStreamOperator<ParticipantAnalysisOutputModel> areaStream =
// filterStream.assignTimestampsAndWatermarks(
// WatermarkStrategy.<CarTrackModel>forBoundedOutOfOrderness(Duration.ofSeconds(1))
// .withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp()))
// .keyBy(o->(o.getRoadnet().getRid()))
// .window(TumblingEventTimeWindows.of(Time.minutes(this.windowTime)))
// //.aggregate(new AreaTravelAggregateFunction(),new AreaTravelProcessWindow())
// .process(new RidOutProcessWindow())
// .name("行人非机动车对机动车速度影响计算-按RID分组计算");
//最后一条记录存储到redis
//areaStream.addSink( new RedisSink<ParticipantAnalysisOutputModel>(new RedisConfig().getRedisConfig(), new TravelRedisMapper(Constant.RID_REALTIME_INDICATOR+"_10M")));
......
......@@ -67,33 +67,33 @@ public class RidParticipantAnalysisMainNew implements Serializable {
try {
SingleOutputStreamOperator<CarTrackModel> nonmotorFilterStream =
filterStream.
filter(new FilterFunction<CarTrackModel>() {
@Override
public boolean filter(CarTrackModel value) throws Exception {
return !Objects.equals(value.getCategory(),1);
}
})
.setParallelism(1)
.name("筛选非机动车轨迹");
// SingleOutputStreamOperator<CarTrackModel> nonmotorFilterStream =
// filterStream.
// filter(new FilterFunction<CarTrackModel>() {
// @Override
// public boolean filter(CarTrackModel value) throws Exception {
// return !Objects.equals(value.getCategory(),1);
// }
// })
// .setParallelism(1)
// .name("筛选非机动车轨迹");
// SingleOutputStreamOperator<ParticipantAnalysisMidResultModel> groupByPlateStream =
// nonmotorFilterStream.assignTimestampsAndWatermarks(
// WatermarkStrategy.<CarTrackModel>forBoundedOutOfOrderness(Duration.ofSeconds(5))
// .withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp()))
// .keyBy(o->(o.getRoadnet().getRid()+"_"+o.getCategory()))
// .window(TumblingEventTimeWindows.of(Time.minutes(this.windowTime)))
// //.aggregate(new TravelAggregateFunction(),new TravelProcessWindow())
// .process(new RidCarProcessWindow())
// .name("行人非机动车对机动车速度影响计算-按RID_参与者类别分组");
SingleOutputStreamOperator<ParticipantAnalysisMidResultModel> groupByPlateStream =
nonmotorFilterStream.assignTimestampsAndWatermarks(
WatermarkStrategy.<CarTrackModel>forBoundedOutOfOrderness(Duration.ofSeconds(5))
SingleOutputStreamOperator<ParticipantAnalysisOutputModel> areaStream =
filterStream.assignTimestampsAndWatermarks(
WatermarkStrategy.<CarTrackModel>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp()))
.keyBy(o->(o.getRoadnet().getRid()+"_"+o.getCategory()))
.keyBy(o->(o.getRoadnet().getRid()))
.window(TumblingEventTimeWindows.of(Time.minutes(this.windowTime)))
//.aggregate(new TravelAggregateFunction(),new TravelProcessWindow())
.process(new RidCarProcessWindow())
.name("行人非机动车对机动车速度影响计算-按RID_参与者类别分组");
SingleOutputStreamOperator<ParticipantAnalysisOutputModel> areaStream =
groupByPlateStream.assignTimestampsAndWatermarks(
WatermarkStrategy.<ParticipantAnalysisMidResultModel>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp()))
.keyBy(o->(o.getRid()))
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
//.aggregate(new AreaTravelAggregateFunction(),new AreaTravelProcessWindow())
.process(new RidOutProcessWindow())
.name("行人非机动车对机动车速度影响计算-按RID分组计算");
......@@ -109,12 +109,6 @@ public class RidParticipantAnalysisMainNew implements Serializable {
}).name("行人非机动车对机动车速度影响计算-转换为json");
if (!isDebug) {
// 初始化kafka生产者
// FlinkKafkaProducer producer = new FlinkKafkaProducer<>(p.getProperty("rid.participant.analysis.data.topic"),
// new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
// instance.getProducerProperties(), FlinkKafkaProducer.Semantic.NONE);
// //存储到kafka
// toJsonStream.addSink(producer).name("行人非机动车对机动车速度影响计算-数据发送至kafka").setParallelism(1);
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(p.getProperty("bootstrap.servers"))
......
......@@ -26,7 +26,7 @@ import java.util.stream.StreamSupport;
* @date 2023/4/6
* @description
*/
public class LaneCarProcessWindow extends ProcessWindowFunction<CarTrackModel, LaneCarCountAnalysisOutputModel, Tuple5<String, String, String,Integer,Integer>, TimeWindow> {
public class LaneCarProcessWindow extends ProcessWindowFunction<CarTrackModel, LaneCarCountAnalysisOutputModel, Tuple5<String, String, String, Integer, Integer>, TimeWindow> {
private static final Logger log = LoggerFactory.getLogger(LaneCarProcessWindow.class);
......@@ -38,7 +38,7 @@ public class LaneCarProcessWindow extends ProcessWindowFunction<CarTrackModel, L
}
@Override
public void process(Tuple5<String, String, String,Integer,Integer> key, Context context, Iterable<CarTrackModel> elements, Collector<LaneCarCountAnalysisOutputModel> out) throws Exception {
public void process(Tuple5<String, String, String, Integer, Integer> key, Context context, Iterable<CarTrackModel> elements, Collector<LaneCarCountAnalysisOutputModel> out) throws Exception {
long windowStartTs = context.window().getStart();
long windowEndTs = context.window().getEnd();
......@@ -59,24 +59,13 @@ public class LaneCarProcessWindow extends ProcessWindowFunction<CarTrackModel, L
outModel.setEndTimestamp(windowEndTs);
outModel.setStartTime(DateUtil.toDateTime(windowStartTs, "yyyy-MM-dd HH:mm:ss"));
outModel.setEndTime(DateUtil.toDateTime(windowEndTs, "yyyy-MM-dd HH:mm:ss"));
outModel.setAvgSpeed(this.getFrameAvgSpeed(list));
outModel.setAvgSpeed(CommonUtil.getFrameAvgSpeed(list));
List<Integer> idList = list.stream().map(CarTrackModel::getId).distinct().collect(Collectors.toList());
outModel.setIdList(idList);
out.collect(outModel);
}
}
private double getFrameAvgSpeed(List<CarTrackModel> list) {
//当前帧机动车平均速度,所有速度大于5km/h的速度进行排序,去掉低速10%,然后进行平均
List<Double> speedList = list.stream()
.filter(o -> o.getSpeed() > 5)
.sorted(Comparator.comparing(CarTrackModel::getSpeed))
.map(CarTrackModel::getSpeed).distinct()
.collect(Collectors.toList());
int startIndex = new BigDecimal(speedList.size() * 0.1).intValue();
int endIndex = (int) ArithOfBigDecmial.round(speedList.size() * 0.9, 0);
List<Double> subList = speedList.subList(startIndex, endIndex);
double avgSpeed = subList.stream().collect(Collectors.averagingDouble(Double::doubleValue));
return ArithOfBigDecmial.round(avgSpeed, 0);
}
}
package com.wanji.indicators.task.area.service.func;
import com.wanji.indicators.constant.VehicleTypeEnum;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.task.area.service.model.ParticipantAnalysisOutputModel;
import com.wanji.indicators.task.area.service.model.ParticipantAnalysisMidResultModel;
import com.wanji.indicators.util.CommonUtil;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.PropertiesHelper;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
......@@ -12,9 +17,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
......@@ -24,30 +27,58 @@ import java.util.stream.StreamSupport;
* @description
*/
@Slf4j
public class RidOutProcessWindow extends ProcessWindowFunction<ParticipantAnalysisMidResultModel, ParticipantAnalysisOutputModel,String, TimeWindow> {
public class RidOutProcessWindow extends ProcessWindowFunction<CarTrackModel, ParticipantAnalysisOutputModel,String, TimeWindow> {
private Properties properties = new Properties();
public void open(Configuration parameters) throws Exception {
PropertiesHelper instance = PropertiesHelper.getInstance();
this.properties = instance.getProperties();
}
@Override
public void process(String key, Context context, Iterable<ParticipantAnalysisMidResultModel> elements, Collector<ParticipantAnalysisOutputModel> out) throws Exception {
public void process(String key, Context context, Iterable<CarTrackModel> elements, Collector<ParticipantAnalysisOutputModel> out) throws Exception {
long windowStartTs = context.window().getStart();
long windowEndTs = context.window().getEnd();
List<CarTrackModel> list = StreamSupport.stream(elements.spliterator(), false).collect(Collectors.toList());
Map<Integer,List<CarTrackModel>> group = list.stream().collect(Collectors.groupingBy(o->o.getCategory()));
List<ParticipantAnalysisMidResultModel> list = StreamSupport.stream(elements.spliterator(), false).collect(Collectors.toList());
Map<Integer,List<ParticipantAnalysisMidResultModel>> group = list.stream().collect(Collectors.groupingBy(o->o.getCategory()));
Integer[] categorys = new Integer[]{1,2,3};
Map<Integer,ParticipantAnalysisOutputModel> outMap = new HashMap<>();
for (Map.Entry<Integer,List<ParticipantAnalysisMidResultModel>> entry : group.entrySet()) {
for (Map.Entry<Integer,List<CarTrackModel>> entry : group.entrySet()) {
Integer category = entry.getKey();
List<ParticipantAnalysisMidResultModel> value = entry.getValue();
ParticipantAnalysisMidResultModel midModel = value.get(0) ;
List<CarTrackModel> value = entry.getValue();
long number = value.stream().map(CarTrackModel::getId).distinct().count();
ParticipantAnalysisOutputModel resultModel = new ParticipantAnalysisOutputModel();
BeanUtils.copyProperties(midModel,resultModel);
resultModel.setCrossId(value.get(0).getRoadnet().getCrossId());
resultModel.setRid(value.get(0).getRoadnet().getRid());
resultModel.setCategory(category);
resultModel.setNumber(number);
resultModel.setTimestamp(context.window().maxTimestamp());
resultModel.setStartTimestamp(windowStartTs);
resultModel.setEndTimestamp(windowEndTs);
resultModel.setStartTime(DateUtil.toDateTime(windowStartTs, "yyyy-MM-dd HH:mm:ss"));
resultModel.setEndTime(DateUtil.toDateTime(windowEndTs, "yyyy-MM-dd HH:mm:ss"));
resultModel.setNumber(midModel.getCount());
resultModel.setSpeed(midModel.getSpeed());
double avgSpeed = CommonUtil.getFrameAvgSpeed(value);
resultModel.setSpeed(avgSpeed);
outMap.put(category,resultModel);
Double freeSpeed = list.get(0).getRoadnet().getFreeSpeed();
if (Objects.isNull(freeSpeed)) {
freeSpeed = Double.parseDouble(this.properties.getProperty("rid.default.free.speed"));
}
double trafficIndex = 1;
if (avgSpeed != 0) {
trafficIndex = freeSpeed / avgSpeed;
}
resultModel.setTrafficIndex(trafficIndex);
outMap.put(category,resultModel);
}
ParticipantAnalysisOutputModel tmp = null;
for (Map.Entry<Integer,ParticipantAnalysisOutputModel> entry : outMap.entrySet()) {
......@@ -67,7 +98,5 @@ public class RidOutProcessWindow extends ProcessWindowFunction<ParticipantAnalys
out.collect(bean);
}
}
}
}
......@@ -4,6 +4,7 @@ import lombok.Data;
import lombok.ToString;
import java.io.Serializable;
import java.util.List;
/**
* @author fengyi
......@@ -31,4 +32,6 @@ public class LaneCarCountAnalysisOutputModel implements Serializable {
private String crossId;
private String rid;
private String laneId;
private List<Integer> idList;
}
......@@ -84,7 +84,7 @@ public class SplitRoutePathFunction extends KeyedProcessFunction<Tuple3<String,
distance = ArithOfBigDecmial.round(distance, 2);
agoTrack.setRouteLength(distance + agoDistance);
//agoTrack.setCreateTime(System.currentTimeMillis());
agoTrack.setCreateTime(System.currentTimeMillis());
agoTrack.getRoadNets().add(carTrackModel.getRoadnet());
agoTrack.setLicenseColor(carTrackModel.getLicenseColor());
agoTrack.setOriginalColor(carTrackModel.getOriginalColor());
......@@ -229,7 +229,7 @@ public class SplitRoutePathFunction extends KeyedProcessFunction<Tuple3<String,
initModel.setPreTimestamp(carTrackModel.getGlobalTimeStamp());
initModel.setPreDateTime(DateUtil.toDateTime(carTrackModel.getGlobalTimeStamp(), "yyyy-MM-dd HH:mm:ss.SSS"));
//initModel.setCreateTime(System.currentTimeMillis());
initModel.setCreateTime(System.currentTimeMillis());
Double[] xy = new Double[]{carTrackModel.getLongitude(),carTrackModel.getLatitude()};
initModel.getPointList().add(xy);
......@@ -249,7 +249,7 @@ public class SplitRoutePathFunction extends KeyedProcessFunction<Tuple3<String,
while (iterator.hasNext()) {
Map.Entry<Tuple3<String, Integer, String>, RoutePathCarTrackModel> entry = iterator.next();
RoutePathCarTrackModel model = entry.getValue();
long timestamp = model.getStartGlobalTimeStamp();
long timestamp = model.getCreateTime();
if (System.currentTimeMillis() - timestamp > 60 * 60 * 1000) {
//缓存超过
expireKeys.add(entry.getKey());
......
......@@ -131,6 +131,6 @@ public class ArithOfBigDecmial {
}
public static void main(String[] args) {
System.out.println(DateUtil.stringToDate("2024-01-31 07:00:00","yyyy-MM-dd HH:mm:ss").getTime());
System.out.println(DateUtil.stringToDate("2024-01-31 17:00:00","yyyy-MM-dd HH:mm:ss").getTime());
}
}
......@@ -8,6 +8,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
......@@ -16,6 +17,24 @@ public class CommonUtil {
private static final Logger log = LoggerFactory.getLogger(CommonUtil.class);
/**
* 所有速度大于5km/h的速度进行排序,去掉低速10%,然后进行平均
* @param list
* @return
*/
public static double getFrameAvgSpeed(List<CarTrackModel> list) {
List<Double> speedList = list.stream()
.filter(o -> o.getSpeed() > 5)
.sorted(Comparator.comparing(CarTrackModel::getSpeed))
.map(CarTrackModel::getSpeed).distinct()
.collect(Collectors.toList());
int startIndex = new BigDecimal(speedList.size() * 0.1).intValue();
int endIndex = (int) ArithOfBigDecmial.round(speedList.size() * 0.9, 0);
List<Double> subList = speedList.subList(startIndex, endIndex);
double avgSpeed = subList.stream().collect(Collectors.averagingDouble(Double::doubleValue));
return ArithOfBigDecmial.round(avgSpeed, 0);
}
/**
* 验证灯态是否为绿
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment