Commit 020a1107 authored by zhoushiguang's avatar zhoushiguang

北航合作项目-上地西路流量提供

parent 67593414
......@@ -32,13 +32,13 @@ public class OfflineEventTestMain {
env.setParallelism(1);
//加载配置项 并获取工作路径
String inPath = "d:/flink/workspace";
String inPath = "D:\\WanJi-Work\\07 项目\\02 济南\\数据";
String outPath = "d:/flink/out/";
String startTime = "2024-02-25_08:30:00";
String endTime = "2024-02-25_09:00:00";
startTime = "2023-10-19_07:10:00";
endTime = "2023-10-19_08:00:00";
startTime = "2024-03-11_07:40:27";
endTime = "2024-03-11_07:50:27";
//读取离线数据
DataStreamSource<String> streamSource = env.addSource(new DebugSourceGlobalFrameDtFile(inPath,startTime,endTime));
......@@ -71,7 +71,7 @@ public class OfflineEventTestMain {
boolean isDebug = true;
try {
// ConflictEventMain.init(env,outPath,isDebug).run(joinLaneStream);
AccidentEventMainNew.init(env,outPath,false).run(joinLaneStream,null);
AccidentEventMainNew.init(env,outPath,isDebug).run(joinLaneStream,null);
env.execute("事故事件测试");
} catch (Exception e) {
log.error("事件测试 - 计算任务异常 : " + e);
......
package com.wanji.indicators.model;
import com.vividsolutions.jts.geom.Geometry;
import com.wanji.indicators.entity.BaseCrossTurnInfo;
import lombok.Data;
......@@ -60,6 +61,7 @@ public class CarTrackModel implements Serializable {
private String rid;
/*车道ID*/
private String laneId;
private Integer laneNo;
/*渠化ID*/
private String segmentId;
/*是否在路口范围内*/
......@@ -87,6 +89,8 @@ public class CarTrackModel implements Serializable {
private double ridLength;
/*绑定的转向关系*/
private List<BaseCrossTurnInfo> turnRelationList;
private double laneLength;
private Geometry geometry;
}
/***********扩展属性*********************************/
......
......@@ -38,7 +38,8 @@ public class OfflineTestIndicatorTaskMain {
public static void main(String[] args) {
//初始化环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
PedestrianDataCache.getInstance();
env.setParallelism(1);
......@@ -53,8 +54,8 @@ public class OfflineTestIndicatorTaskMain {
ParameterTool parameter = ParameterTool.fromArgs(args);
inPath = parameter.get("inPath","D:\\WanJi-Work\\07 项目\\02 济南\\数据");
outPath = parameter.get("outPath","d:/flink/out/");
startTime = parameter.get("startTime","2024-01-02_08:00:00");
endTime = parameter.get("endTime","2024-01-02_11:00:00");
startTime = parameter.get("startTime","2024-04-18_08:00:00");
endTime = parameter.get("endTime","2024-04-18_08:10:00");
// inPath = "d:/flink/workspace";
//读取离线数据
......@@ -80,15 +81,15 @@ public class OfflineTestIndicatorTaskMain {
try {
SingleOutputStreamOperator<CarTrackModel> joinLaneStream = carTrackModelStream
.connect(env.addSource(new AreaFreeSpeedSource()).broadcast())
.flatMap(new RidFreeSpeedCoFlatMap())
.setParallelism(1)
.name("全域轨迹帧数据-车道属性数据绑定");
// SingleOutputStreamOperator<CarTrackModel> joinLaneStream = carTrackModelStream
// .connect(env.addSource(new AreaFreeSpeedSource()).broadcast())
// .flatMap(new RidFreeSpeedCoFlatMap())
// .setParallelism(1)
// .name("全域轨迹帧数据-车道属性数据绑定");
//路段路况
RidTrafficStateMain.init(env, outPath,false).run(joinLaneStream);
// RidTrafficStateMain.init(env, outPath,false).run(joinLaneStream);
// CrossTurnIndicatorMainNew.init(env,outPath,false).run(crossDataStream);
CrossTurnIndicatorMainNew.init(env,outPath,true).run(crossDataStream);
// LaneNonMotorAnalysisMainNew.init(env,outPath,true).run(carTrackModelStream);
//批量轨迹封装
// TrackStoreMainNew.init(env,outPath,true).run(carTrackModelStream);
......
......@@ -6,6 +6,9 @@ package com.wanji.indicators.task.beihang;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.Point;
import com.vividsolutions.jts.index.strtree.STRtree;
import com.wanji.indicators.constant.TurnEnum;
import com.wanji.indicators.entity.BaseCrossTurnInfo;
import com.wanji.indicators.entity.BaseLaneInfo;
......@@ -13,8 +16,10 @@ import com.wanji.indicators.mapper.BaseCrossTurnInfoMapper;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.service.BaseLaneInfoService;
import com.wanji.indicators.service.impl.BaseLaneInfoServiceImpl;
import com.wanji.indicators.util.GeomsConvertUtil;
import com.wanji.indicators.util.PropertiesHelper;
import com.wanji.indicators.util.PtInPolyUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
......@@ -35,18 +40,20 @@ import java.util.stream.Collectors;
@Component
@Slf4j
public class BaseGisLaneCache {
private static BaseGisLaneCache instance;
static PropertiesHelper property = PropertiesHelper.getInstance();
static STRtree rtree = new STRtree();
private static Map<String, CarTrackModel.RoadNet> roadnetMap = new ConcurrentHashMap<>();
private BaseGisLaneCache() {
}
public static synchronized BaseGisLaneCache getInstance() {
public static BaseGisLaneCache getInstance() {
if (instance == null) {
instance = new BaseGisLaneCache();
......@@ -67,7 +74,7 @@ public class BaseGisLaneCache {
private Map<String, String> loadLaneData(String file) {
Map<String, String> retMap = new HashMap<>();
log.info("加载车道数据信息,路径:{},数据:{}", file);
try (InputStream inputStream = PropertiesHelper.class.getResourceAsStream(file);
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, Charset.defaultCharset()));
) {
......@@ -89,6 +96,7 @@ public class BaseGisLaneCache {
Integer ridDir = propJsonObj.getIntValue("dir");
String rid = crossId + "_" + ridDir;
String laneId = propJsonObj.getString("id");
int laneNo = propJsonObj.getIntValue("laneNo");
String turn = propJsonObj.getString("turntype");
if (StringUtils.isEmpty(turn)){
turn = "2";
......@@ -113,8 +121,6 @@ public class BaseGisLaneCache {
}
}
}
try {
JSONObject centerLineJson = propJsonObj.getJSONObject("leftBoundaryLine");
JSONArray centerLineArray = centerLineJson.getJSONArray("geometry");
......@@ -138,6 +144,8 @@ public class BaseGisLaneCache {
String centerLineWkt = stringBuilder1.toString();
String polygonWkt = stringBuilder.toString();
Geometry polygon = GeomsConvertUtil.genGeometry(polygonWkt);
String laneStartPoint = startPoint;
Double fx = Double.parseDouble(laneStartPoint.split(",")[0]);
Double fy = Double.parseDouble(laneStartPoint.split(",")[1]);
......@@ -150,6 +158,7 @@ public class BaseGisLaneCache {
CarTrackModel.RoadNet roadNet = new CarTrackModel.RoadNet();
roadNet.setLaneId(laneId);
roadNet.setLaneNo(laneNo);
roadNet.setCrossId(crossId);
roadNet.setRid(rid);
roadNet.setRidDir8(ridDir);
......@@ -161,7 +170,9 @@ public class BaseGisLaneCache {
if (roadNet.getLaneType() == 2) {
roadNet.setInCrossFlag(1);
}
roadnetMap.put(polygonWkt, roadNet);
roadNet.setGeometry(polygon);
rtree.insert(polygon.getEnvelopeInternal(), roadNet);
} catch (Exception e) {
e.printStackTrace();
......@@ -175,6 +186,23 @@ public class BaseGisLaneCache {
}
public CarTrackModel.RoadNet queryRoadNet(String pxy) {
Point point = GeomsConvertUtil.createPoint(pxy) ;
List<CarTrackModel.RoadNet> results = rtree.query(point.getEnvelopeInternal());
for (CarTrackModel.RoadNet roadNet : results) {
Geometry geometry = roadNet.getGeometry();
if (geometry != null) {
boolean isWithin = point.within(geometry);
if (isWithin)
return roadNet;
}
}
return null;
}
public Map<String, CarTrackModel.RoadNet> getRoadnetMap() {
return roadnetMap;
......
......@@ -2,6 +2,7 @@ package com.wanji.indicators.task.beihang;
import com.wanji.indicators.cache.PedestrianDataCache;
import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.task.laneIndicators.CrossLaneIndicatorMainNew;
import com.wanji.indicators.task.turnIndicators.CrossTurnIndicatorMainNew;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
......@@ -20,10 +21,6 @@ public class OfflineBeiHangTaskMain {
//初始化环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
PedestrianDataCache.getInstance();
env.setParallelism(1);
//加载配置项 并获取工作路径
String inPath = "d:/flink/workspace";
String outPath = "d:/flink/out/";
......@@ -34,14 +31,13 @@ public class OfflineBeiHangTaskMain {
ParameterTool parameter = ParameterTool.fromArgs(args);
inPath = parameter.get("inPath","D:\\WanJi-Work\\07 项目\\02 济南\\数据");
outPath = parameter.get("outPath","d:/flink/out/");
startTime = parameter.get("startTime","2024-04-18_08:00:00");
endTime = parameter.get("endTime","2024-04-18_08:10:00");
// inPath = "d:/flink/workspace";
int parallelism = parameter.getInt("parallelism",1);
env.setParallelism(parallelism);
//读取离线数据
DataStreamSource<CrossFrameModel> streamSource = env.addSource(new BeiHangTrackFrameDateFile());
DataStreamSource<CrossFrameModel> streamSource = env.addSource(new BeiHangTrackFrameDateFile(inPath));
try {
CrossTurnIndicatorMainNew.init(env,outPath,true).run(streamSource);
CrossLaneIndicatorMainNew.init(env,outPath,true).run(streamSource);
env.execute("离线调试测试");
} catch (Exception e) {
log.error("事件测试 - 计算任务异常 : " + e);
......
package com.wanji.indicators.task.laneIndicators.func;
import com.wanji.indicators.constant.TimeGranularityEnum;
import com.wanji.indicators.entity.BaseCrossTurnInfo;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.task.laneIndicators.model.CrossLaneResultModel;
import com.wanji.indicators.task.laneIndicators.model.LaneResultModel;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.GeomsConvertUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
* @author fengyi
* @date 2023/4/6
* @description
*/
public class LaneDataProcessWindow extends ProcessWindowFunction<CrossFrameModel, CrossLaneResultModel, String, TimeWindow> {
private static final Logger log = LoggerFactory.getLogger(LaneDataProcessWindow.class);
@Override
public void open(Configuration parameters) {
//mapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("mapStateAccident_Area_Indicators", String.class, CarTrackModel.class));
}
@Override
public void process(String groupKey, Context context, Iterable<CrossFrameModel> elements, Collector<CrossLaneResultModel> out) throws Exception {
List<CrossFrameModel> list = StreamSupport.stream(elements.spliterator(), false).collect(Collectors.toList());
List<CarTrackModel> trackList = new ArrayList<>();
for (CrossFrameModel crossFrameModel : list) {
trackList.addAll(crossFrameModel.getTrackList());
}
Map<String, List<CarTrackModel>> groupById = trackList.stream().collect(Collectors.groupingBy(o -> o.getRoadnet().getLaneId()));
//=====================================================================================//
long windowStartTime = context.window().getStart();
long windowEndTime = context.window().getEnd();
String startTime = DateUtil.toDateTime(windowStartTime, DateUtil.YYYY_MM_DD_HH_MM_SS);
String endTime = DateUtil.toDateTime(windowEndTime, DateUtil.YYYY_MM_DD_HH_MM_SS);
Map<String, LaneResultModel> turnResultModelMap = new HashMap<>();
for (Map.Entry<String, List<CarTrackModel>> entry : groupById.entrySet()) {
List<CarTrackModel> value = entry.getValue();
/***************************计算过停止线车辆数************************************************************/
long count = 0;//value.stream().map(o->o.getId()).distinct().count();
Map<Integer, List<CarTrackModel>> groupByTrackId = value.stream().collect(Collectors.groupingBy(o -> o.getId()));
for (Map.Entry<Integer, List<CarTrackModel>> entry1 : groupByTrackId.entrySet()) {
List<CarTrackModel> value1 = entry1.getValue();
value1 = value1.stream().sorted(Comparator.comparing(o -> o.getGlobalTimeStamp())).collect(Collectors.toList());
double laneLength = value1.get(0).getRoadnet().getLaneLength();
for (CarTrackModel carTrackModel : value1) {
String laneStartPoint = carTrackModel.getRoadnet().getLaneStartPoint();
String carPoint = carTrackModel.getLongitude() + "," + carTrackModel.getLatitude();
double distance = GeomsConvertUtil.getDistance(laneStartPoint, carPoint);
if (distance >= (laneLength+1)) {
//过停止线开始计数
count++;
break;
}
}
}
/******************************************************************************************************/
CarTrackModel carTrackModel = value.get(0);
LaneResultModel resultModel = turnResultModelMap.get(carTrackModel.getRoadnet().getLaneId());
if (resultModel == null) {
resultModel = new LaneResultModel();
}
resultModel.setLaneId(carTrackModel.getRoadnet().getLaneId());
resultModel.setLaneNo(carTrackModel.getRoadnet().getLaneNo() + "");
resultModel.setInRid(carTrackModel.getRoadnet().getRid());
resultModel.setInRidDir(carTrackModel.getRoadnet().getRidDir8());
resultModel.setTurnDirNo(carTrackModel.getRoadnet().getTurn());
resultModel.setFlow((int) count);
turnResultModelMap.put(carTrackModel.getRoadnet().getLaneId(), resultModel);
}
//输出流量
List<LaneResultModel> outList = new ArrayList<>(turnResultModelMap.values());
if (!list.isEmpty()) {
CrossLaneResultModel crossTurnResultModel = new CrossLaneResultModel();
crossTurnResultModel.setStartTime(startTime);
crossTurnResultModel.setEndTime(endTime);
crossTurnResultModel.setTimestamp(context.window().getStart());
crossTurnResultModel.setCrossId(groupKey);
crossTurnResultModel.setLaneDataList(outList);
crossTurnResultModel.setGranularity(TimeGranularityEnum.FIVE_MINUTE.getType());
out.collect(crossTurnResultModel);
}
}
}
package com.wanji.indicators.task.laneIndicators.model;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import java.io.Serializable;
import java.util.List;
@Data
@EqualsAndHashCode(callSuper = false)
@ToString
public class CrossLaneResultModel implements Serializable {
private String crossId;
//周期开始时间
private String startTime;
//周期截止时间
private String endTime;
private Long timestamp;
private List<LaneResultModel> laneDataList;
private String granularity;
}
package com.wanji.indicators.task.laneIndicators.model;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@Data
@EqualsAndHashCode(callSuper = false)
@ToString
public class LaneResultModel implements Serializable {
private String inRid;
private Integer inRidDir;
private String laneId;
private String laneNo;
private int turnDirNo;
private int flow;
private double speed;
}
......@@ -10,7 +10,9 @@ import com.wanji.indicators.sink.JsonStringFileSink;
import com.wanji.indicators.task.turnIndicators.func.TurnDataPeriodProcessWindow;
import com.wanji.indicators.task.turnIndicators.func.TurnDataProcessWindow;
import com.wanji.indicators.task.turnIndicators.model.CrossTurnResultModel;
import com.wanji.indicators.task.turnIndicators.model.TurnResultModel;
import com.wanji.indicators.task.turnIndicators.sink.TurnIndicationSinkFunction;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.PropertiesHelper;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
......@@ -80,7 +82,6 @@ public class CrossTurnIndicatorMainNew {
SingleOutputStreamOperator<CrossFrameModel> joinLaneStream = carTrackModelStream
.flatMap(new RefBaseRidInfoCoFlatMap())
.setParallelism(1)
.name("全域轨迹帧数据-车道属性数据绑定");
SingleOutputStreamOperator<CrossFrameModel> filterStream =
......@@ -107,7 +108,7 @@ public class CrossTurnIndicatorMainNew {
//log.info("路口ID:{}, 数据帧时间:{},路口内机动车数量:{}",value.getCrossId(), DateUtil.toDateTime(value.getGlobalTimeStamp(),"yyyy-MM-dd HH:mm:ss.SSS"),trackFilterList.size());
}
}).setParallelism(1).name("筛选在路口内的机动车");
}).name("筛选在路口内的机动车");
SingleOutputStreamOperator<CrossTurnResultModel> processStream = filterStream
......@@ -117,7 +118,7 @@ public class CrossTurnIndicatorMainNew {
.keyBy(o -> o.getCrossId())
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new TurnDataProcessWindow())
.setParallelism(1).name("转向指标计算");
.name("转向指标计算");
SingleOutputStreamOperator<String> toJsonStream = processStream.flatMap(new FlatMapFunction<CrossTurnResultModel, String>() {
......@@ -125,7 +126,7 @@ public class CrossTurnIndicatorMainNew {
public void flatMap(CrossTurnResultModel value, Collector<String> out) throws Exception {
out.collect(JSONObject.toJSONString(value));
}
}).setParallelism(1).name("转向指标转JSON");
}).name("转向指标转JSON");
if (!isDebug) {
KafkaSink<String> sink = KafkaSink.<String>builder()
......@@ -136,9 +137,10 @@ public class CrossTurnIndicatorMainNew {
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
.build()
;
toJsonStream.sinkTo(sink).name("转向指标-数据发送至kafka").setParallelism(1);
toJsonStream.sinkTo(sink).name("转向指标-数据发送至kafka");
// processStream.addSink(new TurnIndicationSinkFunction()).name("转向指标-5分钟周期流量插入更新到mysql表");
......@@ -176,8 +178,22 @@ public class CrossTurnIndicatorMainNew {
// process60mStream.addSink(new TurnIndicationSinkFunction()).name("转向指标-60分钟周期流量插入更新到mysql表");
} else {
String path = this.path + "analysis.turn.indicators.json";
toJsonStream.addSink(new JsonStringFileSink(path)).setParallelism(1).name("转向指标-结果输出至文件");
SingleOutputStreamOperator<String> toCsvStream = processStream.flatMap(new FlatMapFunction<CrossTurnResultModel, String>() {
@Override
public void flatMap(CrossTurnResultModel value, Collector<String> out) throws Exception {
List<TurnResultModel> list = value.getTurnDataList();
for (TurnResultModel model : list) {
String line = value.getCrossId()+",,"+model.getInRid()+","+model.getInRidDir()+","+model.getOutRid()+","+
model.getTurnDirNo()+","+model.getFlow()+",5m,"+value.getStartTime()+","+value.getEndTime()+","+
DateUtil.dateFormat(value.getStartTime(),"yyyyMMdd")+","+DateUtil.getNowTime("yyyy-MM-dd HH:mm:ss");
out.collect(line);
}
}
}).name("转向指标转JSON");
String path = this.path + "analysis.turn.indicators.csv";
//toJsonStream.addSink(new JsonStringFileSink(path)).setParallelism(1).name("转向指标-结果输出至文件");
toCsvStream.addSink(new JsonStringFileSink(path)).setParallelism(1).name("转向指标-结果输出至文件");
}
} catch (Exception e) {
log.error("机动车指标计算任务异常 : " + e);
......
......@@ -50,6 +50,7 @@ public class CommonUtil {
Double ey = Double.parseDouble(laneEndPoint.split(",")[1]);
double laneAngle = PtInPolyUtil.getAngle(fx, fy, ex, ey);
double laneLength = GeomsConvertUtil.getDistance(laneStartPoint,laneEndPoint);
//2进口道 3出口道
Integer type = baseLaneInfo.getType();
carTrack.getRoadnet().setLaneType(type);
......@@ -59,6 +60,7 @@ public class CommonUtil {
carTrack.getRoadnet().setRidDir8(ridDir);
carTrack.getRoadnet().setLaneAngle(laneAngle);
carTrack.getRoadnet().setCrossId(baseLaneInfo.getCrossId());
carTrack.getRoadnet().setLaneLength(laneLength);
//出口道对应的哪些进口转向可以进入
if (type.equals(3)) {
List<BaseCrossTurnInfo> turnRelationList = turnRelationMap.get(carTrack.getRoadnet().getRid());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment