Commit 6834e82c authored by zhoushiguang's avatar zhoushiguang

交通事件计算调测修改

parent 282152b1
......@@ -81,7 +81,7 @@ public class Constant {
/**
* 事故-目标车辆
*/
public static final String TARGET_VEHICLE = "目标车辆";
public static final String TARGET_VEHICLE = "默000000";
/**
* 事故相关
......
......@@ -94,25 +94,25 @@ public class AccidentCheckTimeFlatMap implements FlatMapFunction<AccidentFrameMo
long stopDu = stop.getStopDuration();
//疑似事故车辆与停车车辆之间的停车时间差
long tx = Math.abs(stopTimeA - stopTimeS);
// boolean isOK = false;
// if (distance < 10.0 && tx < 15000L && stopDu > 7500L) {
// //时间差小于15秒 且 匹配车辆停车时间大于7.5秒的则同意匹配
// log.info("LOG_EVENT_ACCIDENT_MERGE >>> 10 > 车辆合并 停车k:" + stop.getPicLicense() + "-" + stop.getId() +
// " 跟踪k:" + accident.getPicLicense() + "-" + accident.getId() +
// " 停车时间差:" + tx + " 距离:" + distance + " 停车时间:" + stopDu);
// isOK = true;
// } else if (distance < 9.0 && tx < 30000L && stopDu > 20000L) {
// //时间差小于30秒 且 匹配车辆停车时间大于20秒的则同意匹配
// log.info("LOG_EVENT_ACCIDENT_MERGE >>> 9 > 车辆合并 停车k:" + stop.getPicLicense() + "-" + stop.getId() +
// " 跟踪k:" + accident.getPicLicense() + "-" + accident.getId() +
// " 停车时间差:" + tx + " 距离:" + distance + " 停车时间:" + stopDu);
// isOK = true;
// } else if (distance < 8.0 && tx < 180000L && stopDu > 30000L && stopDu < 190000L) {
// log.info("LOG_EVENT_ACCIDENT_MERGE >>> 8 > 车辆合并 停车k:" + stop.getPicLicense() + "-" + stop.getId() +
// " 跟踪k:" + accident.getPicLicense() + "-" + accident.getId() +
// " 停车时间差:" + tx + " 距离:" + distance + " 停车时间:" + stopDu);
// isOK = true;
// }
boolean isOK = false;
if (distance < 10.0 && tx < 15000L && stopDu > 7500L) {
//时间差小于15秒 且 匹配车辆停车时间大于7.5秒的则同意匹配
log.info("LOG_EVENT_ACCIDENT_MERGE >>> 10 > 车辆合并 停车k:" + stop.getPicLicense() + "-" + stop.getId() +
" 跟踪k:" + accident.getPicLicense() + "-" + accident.getId() +
" 停车时间差:" + tx + " 距离:" + distance + " 停车时间:" + stopDu);
isOK = true;
} else if (distance < 9.0 && tx < 30000L && stopDu > 20000L) {
//时间差小于30秒 且 匹配车辆停车时间大于20秒的则同意匹配
log.info("LOG_EVENT_ACCIDENT_MERGE >>> 9 > 车辆合并 停车k:" + stop.getPicLicense() + "-" + stop.getId() +
" 跟踪k:" + accident.getPicLicense() + "-" + accident.getId() +
" 停车时间差:" + tx + " 距离:" + distance + " 停车时间:" + stopDu);
isOK = true;
} else if (distance < 8.0 && tx < 180000L && stopDu > 30000L && stopDu < 190000L) {
log.info("LOG_EVENT_ACCIDENT_MERGE >>> 8 > 车辆合并 停车k:" + stop.getPicLicense() + "-" + stop.getId() +
" 跟踪k:" + accident.getPicLicense() + "-" + accident.getId() +
" 停车时间差:" + tx + " 距离:" + distance + " 停车时间:" + stopDu);
isOK = true;
}
if (stopDu >= configModel.getAbnormalParkStopBaseDuration()) {
if (set.add(trackIDA + "-" + trackIDS) && set.add(trackIDS + "-" + trackIDA)) {
//如果距离小于上一车辆
......
......@@ -37,39 +37,58 @@ public class AccidentDuplicateRemoveKeyedProcess extends KeyedProcessFunction<St
String accidentType = accidentFrameModel.getAccidentType();
List<AccidentFrameModel.CarTrack> trackList = accidentFrameModel.getTrackList();
boolean isOk = true;
AccidentFrameModel distinctModel = new AccidentFrameModel();
BeanUtils.copyProperties(accidentFrameModel,distinctModel);
// AccidentFrameModel distinctModel = new AccidentFrameModel();
// BeanUtils.copyProperties(accidentFrameModel,distinctModel);
List<AccidentFrameModel.CarTrack> distinctCarList = new ArrayList<>();
for (AccidentFrameModel.CarTrack carTrack : trackList) {
int trackID = carTrack.getId();
String crossId = carTrack.getRoadnet().getCrossId();
String key = crossId + Constant.MARK + trackID + Constant.MARK + accidentType;
AccidentFrameModel.CarTrack value = mapState.get(key);
if (value == null) {
mapState.put(key,carTrack);
} else {
if (mapState.contains(key)) {
continue;
//AccidentFrameModel.CarTrack value = mapState.get(key);
if (isOk) {
AccidentFrameModel.CarTrack value = mapState.get(key);
if (value == null) {
mapState.put(key,carTrack);
} else {
isOk = false;
}
//若在上报单车事故之前已经上报多车事故 则此处再次查询是否有包含该车的多车事故 有则不予上报 无则正常上报单车事故
if (!EventAbnormalEnum.ACCIDENT_ONE_YS.getType().equals(accidentType)) {
key = crossId + Constant.MARK + trackID + Constant.MARK + EventAbnormalEnum.ACCIDENT_ONE_YS.getType();
AccidentFrameModel.CarTrack valueCheck = mapState.get(key);
if (valueCheck != null) {
continue;
}
}
if (!EventAbnormalEnum.ACCIDENT_ONE_YS.getType().equals(accidentType)) {
key = crossId + Constant.MARK + trackID + Constant.MARK + EventAbnormalEnum.ACCIDENT_ONE_YS.getType();
AccidentFrameModel.CarTrack valueCheck = mapState.get(key);
if (valueCheck != null) {
isOk = false;
}
}
distinctCarList.add(carTrack);
// if (value == null) {
// mapState.put(key,carTrack);
// } else {
// if (mapState.contains(key)) {
// continue;
// }
// //若在上报单车事故之前已经上报多车事故 则此处再次查询是否有包含该车的多车事故 有则不予上报 无则正常上报单车事故
// if (!EventAbnormalEnum.ACCIDENT_ONE_YS.getType().equals(accidentType)) {
// key = crossId + Constant.MARK + trackID + Constant.MARK + EventAbnormalEnum.ACCIDENT_ONE_YS.getType();
// AccidentFrameModel.CarTrack valueCheck = mapState.get(key);
// if (valueCheck != null) {
// continue;
// }
// }
// }
// distinctCarList.add(carTrack);
}
if (isOk) {
// distinctModel.setTrackList(distinctCarList);
//输出结果数据
collector.collect(accidentFrameModel);
//日志记录
logInfo(accidentFrameModel);
//清除状态缓存
clearStateCache(accidentFrameModel);
}
distinctModel.setTrackList(distinctCarList);
//输出结果数据
collector.collect(distinctModel);
//日志记录
logInfo(distinctModel);
//清除状态缓存
clearStateCache(distinctModel);
}
/**
......
......@@ -91,7 +91,7 @@ public class AccidentEncapsulationFlatMap implements FlatMapFunction<AccidentFra
AccidentFrameModel.CarTrack carTrack1,
EventAbnormalEnum eventAbnormalEnum) {
AccidentShowModel m = new AccidentShowModel();
m.setEventType(eventAbnormalEnum.getDesc());
m.setEventType(eventAbnormalEnum.getType());
m.setTypeCode(eventAbnormalEnum.getType());
m.setEventClass(eventAbnormalEnum.getDesc());
m.setCrossId(carTrack0.getRoadnet().getCrossId());
......@@ -113,7 +113,7 @@ public class AccidentEncapsulationFlatMap implements FlatMapFunction<AccidentFra
m.setInfluenceDegree(2);
m.setX(carTrack0.getLongitude());
m.setY(carTrack0.getLatitude());
m.setCategory(Integer.valueOf(eventAbnormalEnum.getType()));
m.setCategory(Integer.valueOf(EventAbnormalEnum.ACCIDENT_YS.getType()));
m.setPictureTime1(carTrack0.getStopTime() - 10000);
m.setPictureTime2(carTrack0.getStopTime());
m.setPictureTime3(carTrack0.getStopTime() + 10000);
......@@ -133,6 +133,8 @@ public class AccidentEncapsulationFlatMap implements FlatMapFunction<AccidentFra
m.setAccidentDescribe(Constant.MULTI_CAR_ACCIDENT + m.getPlateNumber() + Constant.AND + m.getPlateNumberOther() + Constant.SPACE + runingDir);
//英文 - 事故描述
m.setAccidentDescribeEn(Constant.EN_MULTI_CAR_ACCIDENT + m.getPlateNumber() + Constant.EN_AND + m.getPlateNumberOther() + Constant.SPACE + Constant.mapAccident.get(runingDir));
m.setId();
return m;
}
......@@ -232,6 +234,7 @@ public class AccidentEncapsulationFlatMap implements FlatMapFunction<AccidentFra
private AccidentShowModel oneVehicleAccidentShow(AccidentFrameModel accidentFrameModel) {
AccidentFrameModel.CarTrack model = accidentFrameModel.getTrackList().get(0);
AccidentShowModel m = new AccidentShowModel();
m.setEventType(EventAbnormalEnum.ACCIDENT_ONE_YS.getType());
m.setTypeCode(EventAbnormalEnum.ACCIDENT_ONE_YS.getType());
m.setEventClass(EventAbnormalEnum.ACCIDENT_ONE_YS.getDesc());
......@@ -267,9 +270,10 @@ public class AccidentEncapsulationFlatMap implements FlatMapFunction<AccidentFra
m.setInfluenceDegree(2);
m.setX(model.getLongitude());
m.setY(model.getLatitude());
m.setCategory(Integer.valueOf(EventAbnormalEnum.ACCIDENT_ONE_YS.getType()));
m.setCategory(Integer.valueOf(EventAbnormalEnum.ACCIDENT_YS.getType()));
m.setCheckTime(model.getCheckTime());
m.setDuration(model.getStopDuration());
m.setId();
return m;
}
......
......@@ -30,6 +30,11 @@ public class AccidentFlatMap implements FlatMapFunction<CrossFrameModel, Acciden
ConfigModel configModel = frameModel.getConfigModel();
if (configModel.getAbnormalParkIsOpen() == Constant.OPEN) {
for (CarTrackModel carTrack : trackList) {
String crossId = carTrack.getRoadnet().getCrossId();
if (!"13NI00B5RM0".equals(crossId)) {
continue;
}
boolean isMotor = VehicleTypeEnum.isMotorVehicles(carTrack.getOriginalType().shortValue());
int inCrossingFlag = carTrack.getRoadnet().getInCrossFlag();
//路口内机动车
......@@ -56,14 +61,15 @@ public class AccidentFlatMap implements FlatMapFunction<CrossFrameModel, Acciden
double stopLineDist = GeomsConvertUtil.getDistance(car.getLongitude(), car.getLatitude(), Double.parseDouble(sps[0]), Double.parseDouble(sps[1]));
car.setStopLineDist(stopLineDist);
}
list.add(car);
//筛选行驶越过斑马线
if (car.getStopLineDist()>configModel.getAbnormalParkZebraCrossLength()) {
String dt = DateUtil.toDateTime(car.getGlobalTimeStamp(),DateUtil.YYYY_MM_DD_HH_MM_SS);
// if (carTrack.getGlobalTimeStamp()>1697265909793L && "鲁AD66123".equals(carTrack.getPicLicense())){
// System.out.println();
// }
list.add(car);
}
// if (car.getStopLineDist()> configModel.getAbnormalParkZebraCrossLength()) {
// String dt = DateUtil.toDateTime(car.getGlobalTimeStamp(),DateUtil.YYYY_MM_DD_HH_MM_SS);
//// if (carTrack.getGlobalTimeStamp()>1697265909793L && "鲁AD66123".equals(carTrack.getPicLicense())){
//// System.out.println();
//// }
// list.add(car);
// }
}
}
AccidentFrameModel model = new AccidentFrameModel();
......
......@@ -34,7 +34,7 @@ public class ConflictEncapsulationResultFlatMap implements FlatMapFunction<Event
model.setTrackIdNear(event.getTrackIdNear());
model.setEventClass(EventAbnormalEnum.CONFLICT_POINT.getDesc());
model.setEventType(EventAbnormalEnum.CONFLICT_POINT.getDesc());
model.setEventType(EventAbnormalEnum.CONFLICT_POINT.getType());
model.setTypeCode(EventAbnormalEnum.CONFLICT_POINT.getType());
model.setDesc("路口面冲突点: 车辆 " + event.getPlateNumber() + "行驶方向为 " + PtInPolyUtil.getDir(event.getDriveAngle()));
model.setDescEn("Intersection conflict point: the driving direction of vehicle "+ "行驶方向为 " + PtInPolyUtil.getDirEn(event.getDriveAngle()));
......
......@@ -51,8 +51,6 @@ public class ConflictLaneInfoCoFlatMap implements CoFlatMapFunction<CrossFrameMo
carTrack.getRoadnet().setTurn(baseLaneInfo.getTurn());
carTrack.getRoadnet().setRidDir8(ridDir);
carTrack.getRoadnet().setLaneAngle(laneAngle);
} else {
continue;
}
resList.add(carTrack);
}
......
package com.wanji.indicators.event.stream;
import com.wanji.indicators.event.cross.accident.AccidentEventMainNew;
import com.wanji.indicators.event.cross.conflictpoint.ConflictEventMain;
import com.wanji.indicators.event.cross.conflictpoint.func.ConflictLaneInfoCoFlatMap;
import com.wanji.indicators.event.stream.func.CrossFrameFlatMap;
import com.wanji.indicators.model.CrossFrameModel;
......@@ -62,7 +63,8 @@ public class OfflineEventTestMain {
//离线调试模式
boolean isDebug = true;
try {
AccidentEventMainNew.init(env,outPath,isDebug).run(joinLaneStream);
ConflictEventMain.init(env,outPath,isDebug).run(thinningDataStream);
//AccidentEventMainNew.init(env,outPath,isDebug).run(joinLaneStream);
env.execute("事故事件测试");
} catch (Exception e) {
log.error("事件测试 - 计算任务异常 : " + e);
......
package com.wanji.indicators.event.stream;
import com.wanji.indicators.event.cross.accident.AccidentEventMainNew;
import com.wanji.indicators.event.cross.conflictpoint.ConflictEventMain;
import com.wanji.indicators.event.cross.conflictpoint.func.ConflictLaneInfoCoFlatMap;
import com.wanji.indicators.event.stream.func.CrossFrameFlatMap;
......@@ -69,24 +70,25 @@ public class TrafficEventMain {
.setParallelism(1)
.name("全域轨迹帧数据-按路口分组输出");
SingleOutputStreamOperator<CrossFrameModel> joinLaneStream = thinningDataStream
.connect(env.addSource(new LaneInfoSource()).broadcast())
.flatMap(new ConflictLaneInfoCoFlatMap())
.setParallelism(1)
.name("全域轨迹帧数据-车道属性数据绑定");
//抽帧
SingleOutputStreamOperator<CrossFrameModel> thinningStream = joinLaneStream
SingleOutputStreamOperator<CrossFrameModel> thinningStream = thinningDataStream
.keyBy(CrossFrameModel::getCrossId)
.countWindow(3)
.countWindow(1)
.maxBy("globalTimeStamp")
.name("数据抽帧");
SingleOutputStreamOperator<CrossFrameModel> joinLaneStream = thinningStream
.connect(env.addSource(new LaneInfoSource()).broadcast())
.flatMap(new ConflictLaneInfoCoFlatMap())
.setParallelism(1)
.name("全域轨迹帧数据-车道属性数据绑定");
try {
//路口内冲突点计算
ConflictEventMain.init(env).run(thinningStream);
ConflictEventMain.init(env).run(joinLaneStream);
//事故计算
AccidentEventMainNew.init(env).run(joinLaneStream);
env.execute("全域安全专题计算");
} catch (Exception e) {
......
package com.wanji.indicators.model;
import com.wanji.indicators.constant.Constant;
import lombok.Data;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import java.io.Serializable;
......@@ -143,6 +146,7 @@ public class BaseEventResultModel implements Serializable {
public BaseEventResultModel() {
}
}
......@@ -45,7 +45,7 @@ public class ConfigModel implements Serializable {
/*判定正常行驶车速阈值*/
private double abnormalParkStopRemoveSpeed = 9;
/*判定停车的车速阈值*/
private double abnormalParkStopSpeed = 0.1;
private double abnormalParkStopSpeed = 2;
/*停车轨迹消散 移除状态时间阈值*/
private long abnormalParkStopDissipateRemoveTime = 1700;
/*提取绕行停车车辆轨迹-行驶距离过滤阈值*/
......
......@@ -3,6 +3,7 @@ package com.wanji.indicators.model.event.accident;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.BaseEventResultModel;
import lombok.Data;
import lombok.ToString;
import java.io.Serializable;
import java.util.List;
......@@ -11,6 +12,7 @@ import java.util.List;
* 事故专题展示类
**/
@Data
@ToString(callSuper = true)
public class AccidentShowModel extends BaseEventResultModel implements Serializable {
/**
......@@ -206,7 +208,7 @@ public class AccidentShowModel extends BaseEventResultModel implements Serializa
private long time;
public void setId() {
super.setId(crossId + Constant.MARK + trackId + Constant.MARK + typeCode + Constant.MARK + startTime);
this.setId(crossId + Constant.MARK + trackId + Constant.MARK + trackIds + Constant.MARK + typeCode + Constant.MARK + startTime);
}
}
#kafka settings kafka1:9092,kafka2:9092,kafka3:9092 192.168.11.102:9092
bootstrap.servers=10.102.1.182:9092
#bootstrap.servers=37.12.182.31:9092
#bootstrap.servers=10.102.1.182:9092
bootstrap.servers=37.12.182.31:9092
#consumer-settings
#consumer.topic=JNMatchResultMiniData_TEST
consumer.topic=JNMatchResultMiniData
period.topic=JN050001LanePeriodicData
consumer.group.id=FlinkConsumerGroup-V561
consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
......
......@@ -26,7 +26,7 @@ log4j.appender.logfile.encoding=UTF-8
#DailyRollingFileAppender每天产生一个日志文件
log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
#设置日志文件保存路径
log4j.appender.R.File=/data/soft/flink/logs/flink-traffic-indicators-compute.log
log4j.appender.R.File=/data/flink/logs/flink-indicators.log
#log4j.appender.R.File=/data/flink-traffic-indicators-compute.log
#日志输出格式
log4j.appender.R.layout.ConversionPattern=%-d{yyyy-MM-dd HH\:mm\:ss} [%c]-[%p] %m%n
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment