Commit 2b0b1e67 authored by zhoushiguang's avatar zhoushiguang

待转区停车疑似事故误检问题解决

parent 7712dc51
......@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.io.ParseException;
import com.wanji.indicators.model.event.WaitingAreaDo;
import com.wanji.indicators.util.GeomsConvertUtil;
import com.wanji.indicators.util.PropertiesHelper;
import lombok.extern.slf4j.Slf4j;
......@@ -12,7 +13,9 @@ import org.apache.commons.lang3.StringUtils;
import java.io.*;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
......@@ -27,7 +30,10 @@ import java.util.concurrent.ConcurrentHashMap;
public class PedestrianDataCache {
//安全岛 key:crossId_key value:围栏坐标
private static Map<String, String> safetyIslandMap = new ConcurrentHashMap<>();
//人行横道
private static Map<String, String> crossingWalkMap = new ConcurrentHashMap<>();
//待转区车道面
private static Map<String, List<WaitingAreaDo>> waitingTurnAreaMap = new ConcurrentHashMap<>();
private static PedestrianDataCache instance;
......@@ -47,8 +53,10 @@ public class PedestrianDataCache {
}
String f1 = dataDirectory + "gis-aqd.geojson";
String f2 = dataDirectory + "gis-rxhd.geojson";
String f3 = dataDirectory + "gis-dzq.geojson";
instance.loadData(f1, 1);
instance.loadData(f2, 2);
instance.loadData(f3, 3);
}
return instance;
}
......@@ -75,6 +83,7 @@ public class PedestrianDataCache {
JSONObject propJsonObj = jsonObject1.getJSONObject("properties");
String crossId = propJsonObj.getString("lkid");
String rid = propJsonObj.getString("ldid");
String laneId = propJsonObj.getString("cdid");
JSONObject geometryJsonObj = jsonObject1.getJSONObject("geometry");
JSONArray coordinatesArray = geometryJsonObj.getJSONArray("coordinates");
......@@ -97,6 +106,21 @@ public class PedestrianDataCache {
instance.safetyIslandMap.put(crossId + "_" + rid, stringBuilder.toString());
} else if (type == 2) {
instance.crossingWalkMap.put(rid, stringBuilder.toString());
} else if (type == 3) {
String waitTurnDir = propJsonObj.getString("cdfx");
List<WaitingAreaDo> list = waitingTurnAreaMap.get(rid);
if (list == null) {
list = new ArrayList<>();
}
crossId = rid.substring(11,22);
WaitingAreaDo model = new WaitingAreaDo();
model.setCrossId(crossId);
model.setLaneId(laneId);
model.setRid(rid);
model.setTurnDir(waitTurnDir);
model.setGeomWkt(stringBuilder.toString());
list.add(model);
instance.waitingTurnAreaMap.put(rid, list);
}
}
}
......@@ -158,4 +182,9 @@ public class PedestrianDataCache {
}
return null;
}
public Map<String, List<WaitingAreaDo>> getWaitingTurnAreaMap() {
return waitingTurnAreaMap;
}
}
package com.wanji.indicators.event.cross.accident.funcnew;
import com.wanji.indicators.cache.PedestrianDataCache;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.constant.TurnEnum;
import com.wanji.indicators.constant.VehicleTypeEnum;
......@@ -7,6 +8,7 @@ import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.ConfigModel;
import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.model.CrossRidTurnLampStatusModel;
import com.wanji.indicators.model.event.WaitingAreaDo;
import com.wanji.indicators.model.event.accident.AccidentFrameModel;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.GeomsConvertUtil;
......@@ -23,7 +25,6 @@ import java.util.Objects;
import java.util.stream.Collectors;
/**
*
* @auther fengyi
*/
@Slf4j
......@@ -42,6 +43,7 @@ public class AccidentFlatMap implements FlatMapFunction<CrossFrameModel, Acciden
if (configModel.getAbnormalParkIsOpen() == Constant.OPEN) {
for (CarTrackModel carTrack : trackList) {
String crossId = carTrack.getRoadnet().getCrossId();
String rid = carTrack.getRoadnet().getRid();
Integer ridDir = carTrack.getRoadnet().getRidDir8();
Integer laneFunction = carTrack.getRoadnet().getTurn();
String[] turns = TurnEnum.getLaneTurnValue(laneFunction);
......@@ -72,7 +74,21 @@ public class AccidentFlatMap implements FlatMapFunction<CrossFrameModel, Acciden
double stopLineDist = GeomsConvertUtil.getDistance(car.getLongitude(), car.getLatitude(), Double.parseDouble(sps[0]), Double.parseDouble(sps[1]));
car.setStopLineDist(stopLineDist);
list.add(car);
boolean isInWaitArea = false;
List<WaitingAreaDo> waitAreaList = PedestrianDataCache.getInstance().getWaitingTurnAreaMap().get(rid);
if (Objects.nonNull(waitAreaList)) {
for (WaitingAreaDo model : waitAreaList) {
String geom = model.getGeomWkt();
isInWaitArea = GeomsConvertUtil.withinGeo(carTrack.getLongitude() + "," + carTrack.getLatitude(), geom);
if (isInWaitArea) {
break;
}
}
}
//排除掉待转区车辆
if (!isInWaitArea) {
list.add(car);
}
}
}
......@@ -86,8 +102,8 @@ public class AccidentFlatMap implements FlatMapFunction<CrossFrameModel, Acciden
collector.collect(model);
}
}
}catch ( Exception e){
log.error("",e);
} catch (Exception e) {
log.error("", e);
}
}
......
......@@ -7,9 +7,7 @@ import com.wanji.indicators.model.event.congestion.CongestionFrameModel;
import com.wanji.indicators.model.event.congestion.CongestionModel;
import com.wanji.indicators.model.event.congestion.CongestionOverflowModel;
import com.wanji.indicators.source.ConsulAreaSource;
import com.wanji.indicators.source.CrossShapeSource;
import com.wanji.indicators.source.RidDirSource;
import com.wanji.indicators.source.WaitLaneSource;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
......@@ -83,14 +81,14 @@ public class CongestionEventMainNew implements Serializable {
.name("路口溢出-计算车辆在路口内的旅行时间");
//匹配待转区
SingleOutputStreamOperator<CongestionFrameModel> checkWaitLaneKeyedProcess = checkDataKeyedProcess
.connect(env.addSource(new WaitLaneSource()).broadcast())
.flatMap(new CongestionCheckWaitLaneCoFlatMap())
.setParallelism(1)
.name("路口溢出-匹配待转区");
// SingleOutputStreamOperator<CongestionFrameModel> checkWaitLaneKeyedProcess = checkDataKeyedProcess
// .connect(env.addSource(new WaitLaneSource()).broadcast())
// .flatMap(new CongestionCheckWaitLaneCoFlatMap())
// .setParallelism(1)
// .name("路口溢出-匹配待转区");
//匹配拥堵检测区域
SingleOutputStreamOperator<CongestionFrameModel> checkCongestionArea = checkWaitLaneKeyedProcess
SingleOutputStreamOperator<CongestionFrameModel> checkCongestionArea = checkDataKeyedProcess
.connect(env.addSource(new ConsulAreaSource()).broadcast())
.flatMap(new CongestionAreaCheckCoFlatMap())
.setParallelism(1)
......
......@@ -61,8 +61,8 @@ public class CrossTrafficEventMain {
// env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
// env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(properties.getProperty("check.point.uri")));
// env.setStateBackend(new FsStateBackend(properties.getProperty("check.point.uri")));
// env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
// env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
// env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
......
......@@ -8,6 +8,7 @@ import com.wanji.indicators.event.stream.func.RefBaseRidInfoCoFlatMap;
import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.source.LaneInfoSource;
import com.wanji.indicators.source.debug.DebugSourceGlobalFrame;
import com.wanji.indicators.source.debug.DebugSourceGlobalFrameDtFile;
import com.wanji.indicators.util.CommonUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
......@@ -31,17 +32,13 @@ public class OfflineEventTestMain {
env.setParallelism(1);
//加载配置项 并获取工作路径
String path = new CommonUtil().getWorkSpasePath();
String inPath = "d:/flink/workspace";
String outPath = "d:/flink/out/";
String osName = System.getProperties().getProperty("os.name");
if (StringUtils.isNotBlank(osName) && osName.contains("Windows")) {
//此处是 Windows 配置
} else {
path = "/opt/flink/workspace/";
}
String startTime = "2024-02-18 07:00:00";
String endTime = "2024-02-18 08:00:00";
//读取离线数据
DataStreamSource<String> streamSource = env.addSource(new DebugSourceGlobalFrame(path));
DataStreamSource<String> streamSource = env.addSource(new DebugSourceGlobalFrameDtFile(inPath,startTime,endTime));
SingleOutputStreamOperator<CrossFrameModel> crossDataStream =
streamSource
......
......@@ -7,9 +7,10 @@ import java.io.Serializable;
@Data
public class WaitingAreaDo implements Serializable {
private String interId;
private String crossId;
private String rid;
private String laneId;
private Integer waitingArea;
private String turnDir;
private String geomWkt;
}
package com.wanji.indicators.source;
import com.wanji.indicators.model.event.WaitingAreaDo;
import com.wanji.indicators.util.DateUtil;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @Author ke.han
* @Date 2020/4/16 19:03
**/
public class WaitLaneSource extends RichParallelSourceFunction<
HashMap<String, Integer>> {
public static void main(String[] args) {
System.out.println(DateUtil.toDateTime(1697244174106L,"yyyy-MM-dd HH:mm:ss"));
}
private volatile boolean running = true;
@Override
public void run(SourceContext<HashMap<String, Integer>> out) {
HashMap<String, Integer> outMap = new HashMap<>();
while (running) {
String sql = "SELECT * FROM cross_flink_settings";
List<WaitingAreaDo> ridLaneNumDos = null;//new JdbcUtil().excuteQueryToList(WaitingAreaDo.class, sql);
if (ridLaneNumDos != null && !ridLaneNumDos.isEmpty()) {
outMap.clear();
for (WaitingAreaDo data : ridLaneNumDos) {
String laneId = data.getLaneId();
Integer waitingArea = data.getWaitingArea();
outMap.put(laneId.trim(), waitingArea);
}
}
if (outMap.size() > 0) {
out.collect(outMap);
}
try {
TimeUnit.MINUTES.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// Thread.sleep(1000*60);
}
}
@Override
public void cancel() {
running = false;
}
}
config_name=config_cs.properties
\ No newline at end of file
config_name=config_mz.properties
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment