Commit afbcf6c6 authored by zhoushiguang's avatar zhoushiguang

冲突点计算任务

parent c0576896
package com.wanji.indicators.constant;
import java.util.Objects;
public enum CongestEnum {
//******************************************************//
NO_CONGEST("1","畅通"),
LIGHT_CONGEST("2","轻微拥堵"),
MODERATE_CONGEST("3","中度拥堵"),
HEAVY_CONGEST("4","重度拥堵")
;
//******************************************************//
private String type;
private String name;
CongestEnum(String type, String name) {
this.type = type;
this.name = name;
}
public static String getName(String type){
for(CongestEnum typeEnum: CongestEnum.values()){
if(Objects.equals(typeEnum.type,type)){
return typeEnum.name;
}
}
return null;
}
public static String getType(CongestEnum type) {
for(CongestEnum typeEnum: CongestEnum.values()) {
if(Objects.equals(typeEnum,type)){
return typeEnum.type;
}
}
return null;
}
}
package com.wanji.indicators.constant;
import java.util.HashMap;
import java.util.Map;
public class Constant {
//机动车类型集合,逗号分割
public static final String MOTOR_TYPES= "1,2,3,7,8,10,11,12,13,15";
......
package com.wanji.indicators.constant;
import java.util.Objects;
public enum DirEnum {
//******************************************************//
NO_CONGEST("1","北"),
LIGHT_CONGEST("3","东"),
MODERATE_CONGEST("5","南"),
HEAVY_CONGEST("4","西")
;
//******************************************************//
private String type;
private String name;
DirEnum(String type, String name) {
this.type = type;
this.name = name;
}
public static String getName(String type){
for(DirEnum typeEnum: DirEnum.values()){
if(Objects.equals(typeEnum.type,type)){
return typeEnum.name;
}
}
return null;
}
public static String getType(DirEnum type) {
for(DirEnum typeEnum: DirEnum.values()) {
if(Objects.equals(typeEnum,type)){
return typeEnum.type;
}
}
return null;
}
}
package com.wanji.indicators.constant;
import java.util.Arrays;
import java.util.List;
/**
* 事件等级
*/
public enum EventGradeEnum {
DEDUCT_POINTS(1,"扣分"),
TICKET(2,"罚款"),
WARN(3,"警告");
int level;
String desc;
EventGradeEnum(int b,String desc) {
this.level = b;
this.desc = desc;
}
public int getLevel() {
return level;
}
public String getDesc() {
return desc;
}
public static int getLevelByEventType(EventAbnormalEnum eventType){
if(eventType == null){
return 0;
}
return getLevelByEventType(eventType.getType());
}
// 扣分
public static final List<EventAbnormalEnum> DEDUCT_POINTS_Events = Arrays.asList(new EventAbnormalEnum[]{
EventAbnormalEnum.VEHICLERUNREDLIGHT, // 机动车闯红灯
EventAbnormalEnum.MOTO_NO_YIEDLD_PEDESTRIAN, // 机动车未礼让行人
EventAbnormalEnum.RETROGRADE, // 机动车 逆行
EventAbnormalEnum.MOTO_PARK_ON_LINE // 机动车停车越线
});
// 罚款
public static final List<EventAbnormalEnum> TICKET_Events = Arrays.asList(new EventAbnormalEnum[]{
EventAbnormalEnum.MOTO_RUN_WRONG_DIRECTION, // 机动车不按车道线行驶
EventAbnormalEnum.MOTO_RUN_IN_OTHER_LINE, // 机动车 占用专用车道
EventAbnormalEnum.OCCUPYBUSLANE, // 机动车 占用公交车道
EventAbnormalEnum.RESTRICTION_LICENSE_PLATE // 机动车 限号出行
});
// 警告
public static final List<EventAbnormalEnum> WARN_Events = Arrays.asList(new EventAbnormalEnum[]{
EventAbnormalEnum.NONMOTORETROGRADE, // 非机动车逆行
EventAbnormalEnum.NONMOTO_RUN_REDLIGHT, // 非机动车闯红灯
EventAbnormalEnum.NONMOTO_RUN_IN_OTHER_LINE, // 非机动车不按车道线行驶
EventAbnormalEnum.NONMOTO_PARK_ON_LINE, // 非机动车越线停车
EventAbnormalEnum.NONMOTO_CARRY_PEOPLE // 非机动车载人
});
public static int getLevelByEventType(String eventType){
// 扣分
if(DEDUCT_POINTS_Events.stream().anyMatch(event -> event.getType().equals(eventType))){
return DEDUCT_POINTS.getLevel();
}
// 罚款
if(TICKET_Events.stream().anyMatch(event -> event.getType().equals(eventType))){
return TICKET.getLevel();
}
// 警告
if(WARN_Events.stream().anyMatch(event -> event.getType().equals(eventType))){
return WARN.getLevel();
}
// 其他返回 0
return 0;
}
}
package com.wanji.indicators.constant;
/**
* 事件分级:~~~~3-行人、2-非机动车、1-机动车
*/
public enum EventLevelEnum {
VEHICLE("1","机动车事件"),
NON_VEHICLE("2","非机动事件"),
PEDESTRIANENT("3","行人事件"),
ACCIDENT("4","疑似事故"),
CONGESTION("5","拥堵"),
// DANGER("6","危险驾驶")
DANGER("6","隐患事件"),
ILLEGAL("7","违法事件")
;
String level;
String desc;
EventLevelEnum(String b, String desc) {
this.level = b;
this.desc = desc;
}
public String getLevel() {
return level;
}
public String getDesc() {
return desc;
}
public static EventLevelEnum getByLevel(String type){
for (EventLevelEnum abnormalEnum : EventLevelEnum.values()) {
if(abnormalEnum.getLevel().equals(type)){
return abnormalEnum;
}
}
return null;
}
public static String getLevelByEventType(String eventType){
// 机动车事件
if(EventAbnormalEnum.isVehicleEvent(eventType)){
return VEHICLE.getLevel();
}
// 非机动车 事件
if(EventAbnormalEnum.isNonVehicleEvent(eventType)){
return NON_VEHICLE.getLevel();
}
// 行人 事件
if(EventAbnormalEnum.isPedestrianentEvent(eventType)){
return PEDESTRIANENT.getLevel();
}
// 事故 事件
if(EventAbnormalEnum.isAccidentEvent(eventType)){
return ACCIDENT.getLevel();
}
// 拥堵 事件
if(EventAbnormalEnum.isCongestionEvent(eventType)){
return CONGESTION.getLevel();
}
// 其他返回 0
return null;
}
}
package com.wanji.indicators.constant;
import java.util.Objects;
public enum FieldLabelEnum {
//******************************************************//
AVG_PASS_RATE("avg_pass_rate","平均通过率"),
AVG_PASS_TIME("avg_pass_time","平均通过时长"),
AVG_WAIT_TIME("avg_wait_time","平均等待时长"),
AVG_WAIT_PEOPLE_NUMBER("avg_wait_people_number","平均等待人数"),
RUN_RED_NUMBER("run_red_number","行人闯红灯数量"),
WLRXR_NON_MOTOR_NUMBER("wlrxr_non_motor_number","未礼让行人的非机动车数量"),
WLRXR_MOTOR_NUMBER("wlrxr_motor_number","未礼让行人的机动车数量"),
ACCELERATE_PASS_RATE("accelerate_pass_rate","行人加速通过率"),
NOT_CLEAR_RATE("not_clear_rate","未清空率"),
NOT_CLEAR_NUMBER("not_clear_number","平均未清空人数"),
RUN_UNSMOOTH_NUMBER("run_unsmooth_number","运行不流畅人数"),
SLOW_SPEED_NUMBER("slow_speed_number","路口内减速车辆数"),
STRAIGHT_NUMBER("straight_number","直行汇车车辆数"),
RIGHT_NUMBER("right_number","右转汇车车辆数")
;
//******************************************************//
private String type;
private String name;
FieldLabelEnum(String type, String name) {
this.type = type;
this.name = name;
}
public static String getName(String type){
for(FieldLabelEnum typeEnum: FieldLabelEnum.values()){
if(Objects.equals(typeEnum.type,type)){
return typeEnum.name;
}
}
return null;
}
public static String getType(FieldLabelEnum type) {
for(FieldLabelEnum typeEnum: FieldLabelEnum.values()) {
if(Objects.equals(typeEnum,type)){
return typeEnum.type;
}
}
return null;
}
}
package com.wanji.indicators.constant;
import java.util.Objects;
public enum HistoryTrackTypeEnum {
EFFICIENT_PEDESTRIAN_HIS_TRACK("EFF_PEDESTRIAN","效率-行人历史轨迹"),
EFFICIENT_NON_MOTOR_HIS_TRACK("EFF_NON_MOTOR","效率-非机动车历史轨迹"),
EFFICIENT_MOTOR_HIS_TRACK("EFF_MOTOR","效率-机动车历史轨迹"),
PLAY_START("START","启动播放"),
PLAY_STOP("STOP","停止播放"),
PLAY_CONTINUE_START("CONTINUE_START","继续播放")
;
private String type;
private String name;
HistoryTrackTypeEnum(String type, String name) {
this.type = type;
this.name = name;
}
public static String getName(String type) {
for(HistoryTrackTypeEnum typeEnum: HistoryTrackTypeEnum.values()){
if(Objects.equals(typeEnum.type,type)){
return typeEnum.name;
}
}
return null;
}
public static String getType(HistoryTrackTypeEnum type) {
for(HistoryTrackTypeEnum typeEnum: HistoryTrackTypeEnum.values()) {
if(Objects.equals(typeEnum,type)){
return typeEnum.type;
}
}
return null;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
package com.wanji.indicators.constant;
import java.util.Objects;
public enum KafkaDataEnum {
KAFAK_REALTIME_CAR_DATA("1","实时车辆数据"),
KAFAK_CROSS_FLOW_DATA("2","路口流量数据"),
KAFAK_ABNORMAL_EVENT_DATA("3","交通异常事件数据"),
KAFAK_SIGNAL_LIGHT_STATE_DATA("4","信号灯态数据"),
KAFAK_INSTANT_DATA("5","瞬时数据"),
BIG_SCREEN_WXJS_DATA("BIG_SCREEN_WXJS_DATA","大屏危险驾驶数据"),
BIG_SCREEN_SHIGU_DATA("BIG_SCREEN_SHIGU_DATA","大屏事故数据"),
BIG_SCREEN_ZZCT_DATA("BIG_SCREEN_ZZCT_DATA","大屏直左冲突车辆数据"),
BIG_SCREEN_XWKF_DATA("BIG_SCREEN_XWKF_DATA","大屏相位空放数据"),
BIG_SCREEN_KYDJZ_DATA("BIG_SCREEN_KYDJZ_DATA","大屏可移动基站数据"),
BIG_SCREEN_MXJT_DATA("BIG_SCREEN_MXJT_DATA","慢行交通数据"),
BIG_SCREEN_WRJ_DATA("BIG_SCREEN_WRJ_DATA","无人机场景数据"),
BIG_SCREEN_GROUP_DATA("BIG_SCREEN_GROUP_DATA","路口群组(带指标、不带指标)"),
BIG_SCREEN_ZZCT_SIGNAL_DATA("BIG_SCREEN_ZZCT_SIGNAL_DATA","直左冲突信号灯态"),
BIG_SCREEN_XWKF_SIGNAL_DATA("BIG_SCREEN_XWKF_SIGNAL_DATA","信号空放信号灯态")
;
private String type;
private String name;
KafkaDataEnum(String type, String name) {
this.type = type;
this.name = name;
}
public static String getName(String type) {
for(KafkaDataEnum typeEnum: KafkaDataEnum.values()){
if(Objects.equals(typeEnum.type,type)){
return typeEnum.name;
}
}
return null;
}
public static String getType(KafkaDataEnum type) {
for(KafkaDataEnum typeEnum: KafkaDataEnum.values()) {
if(Objects.equals(typeEnum,type)){
return typeEnum.type;
}
}
return null;
}
}
package com.wanji.indicators.constant;
import java.util.Objects;
public enum LaneDetectorEnum {
//******************************************************//
DETECTOR_A("DETECTOR_A","感应检测器"),
DETECTOR_B("DETECTOR_B","反溢检测器"),
DETECTOR_C("DETECTOR_C","排队检测器")
;
//******************************************************//
private String type;
private String name;
public String getType() {
return type;
}
public String getName() {
return name;
}
LaneDetectorEnum(String type, String name) {
this.type = type;
this.name = name;
}
public static String getName(String type){
for(LaneDetectorEnum typeEnum: LaneDetectorEnum.values()){
if(Objects.equals(typeEnum.type,type)){
return typeEnum.name;
}
}
return null;
}
public static String getType(String name) {
for(LaneDetectorEnum typeEnum: LaneDetectorEnum.values()) {
if(Objects.equals(typeEnum.getName(),name)){
return typeEnum.type;
}
}
return null;
}
}
package com.wanji.indicators.constant;
import java.util.Objects;
public enum LaneTurnDescEnum {
//******************************************************//
NO_CONGEST("1","左"),
LIGHT_CONGEST("2","直"),
MODERATE_CONGEST("3","右"),
HEAVY_CONGEST("4","掉头")
;
//******************************************************//
private String type;
private String name;
LaneTurnDescEnum(String type, String name) {
this.type = type;
this.name = name;
}
public static String getName(String type){
for(LaneTurnDescEnum typeEnum: LaneTurnDescEnum.values()){
if(Objects.equals(typeEnum.type,type)){
return typeEnum.name;
}
}
return null;
}
public static String getType(LaneTurnDescEnum type) {
for(LaneTurnDescEnum typeEnum: LaneTurnDescEnum.values()) {
if(Objects.equals(typeEnum,type)){
return typeEnum.type;
}
}
return null;
}
}
package com.wanji.indicators.constant;
/**
* @author fengyi
* @date 2023/6/9
* @description
*/
public class ModuleConstant {
public final static String SITUATION_MODULE_NAME = "态势监测";
public final static String EFFICIENCY_MODULE_NAME = "效率评价";
public final static String ORGANIZE_MODULE_NAME = "组织评价";
public final static String SAFETY_MODULE_NAME = "安全评价";
public final static String REPORT_MODULE_NAME = "分析报告";
public final static String SIGNAL_MODULE_NAME = "信号评价";
public final static String TRACK_MODULE_NAME = "轨迹分析";
public final static String ROADNET_BASE_MODULE_NAME = "路网基础数据";
public final static String ROAD_STATUS_BASE_MODULE_NAME = "路况图层模块";
public final static String EVENT_ANA_BASE_MODULE_NAME = "事件分析";
}
package com.wanji.indicators.constant;
import java.util.Objects;
public enum ModuleEnum {
//******************************************************//
EFFICIENCY_EVALUATION("1","效率评价"),
SAFETY_EVALUATION("2","安全评价")
;
//******************************************************//
private String type;
private String name;
ModuleEnum(String type, String name) {
this.type = type;
this.name = name;
}
public static String getName(String type){
for(ModuleEnum typeEnum: ModuleEnum.values()){
if(Objects.equals(typeEnum.type,type)){
return typeEnum.name;
}
}
return null;
}
public static String getType(ModuleEnum type) {
for(ModuleEnum typeEnum: ModuleEnum.values()) {
if(Objects.equals(typeEnum,type)){
return typeEnum.type;
}
}
return null;
}
}
package com.wanji.indicators.constant;
import java.util.Objects;
/**
* @author fengyi
* @date 2023/2/8
* @description
*/
public enum ParticipantEnum {
//******************************************************//
MOTOR("1","机动车"),
NON_MOTOR("2","非机动车"),
PED("3","行人")
;
//******************************************************//
private String type;
private String name;
ParticipantEnum(String type, String name) {
this.type = type;
this.name = name;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public static String getName(String type){
for(ParticipantEnum typeEnum: ParticipantEnum.values()){
if(Objects.equals(typeEnum.type,type)){
return typeEnum.name;
}
}
return null;
}
public static String getType(ParticipantEnum type) {
for(ParticipantEnum typeEnum: ParticipantEnum.values()) {
if(Objects.equals(typeEnum,type)){
return typeEnum.type;
}
}
return null;
}
}
package com.wanji.indicators.constant;
/**
* @ClassName PlateColorEnum
* @Description 车牌颜色
* @Date 2021/4/15 8:46
* @Version 1.0
*/
public enum PlateColorEnum {
//蓝牌
BULE(0),
YELLOW(1),
BLACK(2),
WHITE(3),
//渐变绿
GRADUALGREEN(4),
//黄绿双拼
YELLOWANDGREEN(5),
//蓝白渐变
BLUEANDWHITE(6),
//临时牌
TEMPORARY(7),
UNKOWN(9),
GREEN(11),
RED(12);
int color;
PlateColorEnum(int b) {
this.color=b;
//命令字
}
public int getColor(){
return color;
}
}
package com.wanji.indicators.constant;
import java.util.Objects;
public enum SignalLightStateEnum {
RED("R","1"),
YELLOW("Y","2"),
GREEN("G","3"),
NO_LIGHT("N","0");
private String key;
private String value;
SignalLightStateEnum(String key, String value) {
this.key = key;
this.value = value;
}
public static String getValue(String key){
for(SignalLightStateEnum typeEnum: SignalLightStateEnum.values()){
if(Objects.equals(typeEnum.key,key)){
return typeEnum.value;
}
}
return null;
}
public static String getKey(SignalLightStateEnum key) {
for(SignalLightStateEnum typeEnum: SignalLightStateEnum.values()) {
if(Objects.equals(typeEnum,key)){
return typeEnum.key;
}
}
return null;
}
}
package com.wanji.indicators.constant;
/**
* 时间粒度
*/
public enum TimeGranularityEnum {
FIVE_MINUTE("5m","5分钟",5,"5 MINUTE"),
TEN_MINUTE("10m","10分钟",10,"10 MINUTE"),
FIFTEEN_MINUTE("15m","15分钟",15,"15 MINUTE"),
THIRTY_MINUTE("30m","30分钟",30,"30 MINUTE"),
ONE_HOUR("1h","1小时",60,"60 MINUTE");
private String type;
private String desc;
private Integer minites;
private String enDesc;
TimeGranularityEnum(String type, String desc, int minites,String enDesc) {
this.type = type;
this.desc = desc;
this.minites = minites;
this.enDesc = enDesc;
}
public static TimeGranularityEnum getEnumByType(String type){
for (TimeGranularityEnum timeGranularityEnum : TimeGranularityEnum.values()) {
if(timeGranularityEnum.getType().equals(type)){
return timeGranularityEnum;
}
}
return null;
}
public String getType() {
return type;
}
public String getDesc() {
return desc;
}
public Integer getMinites() {
return minites;
}
public String getEnDesc() {
return enDesc;
}
public void setEnDesc(String enDesc) {
this.enDesc = enDesc;
}
}
package com.wanji.indicators.constant;
import java.util.Objects;
public enum TurnEnum {
//******************************************************//
LEFT("l","1","左"),
STRAIGHT("s","2","直"),
RIGHT("r","3","右"),
UTURN("u","4","掉头");
//******************************************************//
private String key;
private String value;
private String name;
TurnEnum(String key, String value,String name) {
this.key = key;
this.value = value;
this.name = name;
}
public static String getValue(String key){
for(TurnEnum typeEnum: TurnEnum.values()){
if(Objects.equals(typeEnum.key,key)){
return typeEnum.value;
}
}
return null;
}
public static String getKey(TurnEnum key) {
for(TurnEnum typeEnum: TurnEnum.values()) {
if(Objects.equals(typeEnum,key)){
return typeEnum.key;
}
}
return null;
}
/**
* 转换车道功能转向
*
* @param function 车道功能 1左转;2直行;3右转;4掉头;5直左;6直右;7左直右;8左右;9左转掉头;10直行掉头;11右转掉头;12左直掉头;13直右掉头;14左直右掉头;15左右掉头
* @return
*/
public static String[] getLaneTurn(int function) {
String[] turnArr = new String[4];
if (function == 1) {
turnArr[0] = TurnEnum.LEFT.getKey();
} else if (function == 2) {
turnArr[1] = TurnEnum.STRAIGHT.getKey();
} else if (function == 3) {
turnArr[2] = TurnEnum.RIGHT.getKey();
} else if (function == 4) {
turnArr[3] = TurnEnum.UTURN.getKey();
} else if (function == 5) {
turnArr[0] = TurnEnum.LEFT.getKey();
turnArr[1] = TurnEnum.STRAIGHT.getKey();
} else if (function == 6) {
turnArr[1] = TurnEnum.STRAIGHT.getKey();
turnArr[2] = TurnEnum.RIGHT.getKey();
} else if (function == 7) {
turnArr[0] = TurnEnum.LEFT.getKey();
turnArr[1] = TurnEnum.STRAIGHT.getKey();
turnArr[2] = TurnEnum.RIGHT.getKey();
} else if (function == 8) {
turnArr[0] = TurnEnum.LEFT.getKey();
turnArr[2] = TurnEnum.RIGHT.getKey();
} else if (function == 9) {
turnArr[0] = TurnEnum.LEFT.getKey();
turnArr[3] = TurnEnum.UTURN.getKey();
} else if (function == 10) {
turnArr[1] = TurnEnum.STRAIGHT.getKey();
turnArr[3] = TurnEnum.UTURN.getKey();
} else if (function == 11) {
turnArr[2] = TurnEnum.RIGHT.getKey();
turnArr[3] = TurnEnum.UTURN.getKey();
} else if (function == 12) {
turnArr[0] = TurnEnum.LEFT.getKey();
turnArr[1] = TurnEnum.STRAIGHT.getKey();
turnArr[3] = TurnEnum.UTURN.getKey();
} else if (function == 13) {
turnArr[1] = TurnEnum.STRAIGHT.getKey();
turnArr[2] = TurnEnum.RIGHT.getKey();
turnArr[3] = TurnEnum.UTURN.getKey();
} else if (function == 14) {
turnArr[0] = TurnEnum.LEFT.getKey();
turnArr[1] = TurnEnum.STRAIGHT.getKey();
turnArr[2] = TurnEnum.RIGHT.getKey();
turnArr[3] = TurnEnum.UTURN.getKey();
} else if (function == 15) {
turnArr[0] = TurnEnum.LEFT.getKey();
turnArr[2] = TurnEnum.RIGHT.getKey();
turnArr[3] = TurnEnum.UTURN.getKey();
}
return turnArr;
}
/**
* 转换车道功能转向[]
*
* @param function 车道功能 1左转;2直行;3右转;4掉头;5直左;6直右;7左直右;8左右;9左转掉头;10直行掉头;11右转掉头;12左直掉头;13直右掉头;14左直右掉头;15左右掉头
* @return
*/
public static String[] getLaneTurnValue(int function) {
String[] turnArr = new String[4];
if (function == 1) {
turnArr[0] = TurnEnum.LEFT.getValue();
} else if (function == 2) {
turnArr[1] = TurnEnum.STRAIGHT.getValue();
} else if (function == 3) {
turnArr[2] = TurnEnum.RIGHT.getValue();
} else if (function == 4) {
turnArr[3] = TurnEnum.UTURN.getValue();
} else if (function == 5) {
turnArr[0] = TurnEnum.LEFT.getValue();
turnArr[1] = TurnEnum.STRAIGHT.getValue();
} else if (function == 6) {
turnArr[1] = TurnEnum.STRAIGHT.getValue();
turnArr[2] = TurnEnum.RIGHT.getValue();
} else if (function == 7) {
turnArr[0] = TurnEnum.LEFT.getValue();
turnArr[1] = TurnEnum.STRAIGHT.getValue();
turnArr[2] = TurnEnum.RIGHT.getValue();
} else if (function == 8) {
turnArr[0] = TurnEnum.LEFT.getValue();
turnArr[2] = TurnEnum.RIGHT.getValue();
} else if (function == 9) {
turnArr[0] = TurnEnum.LEFT.getValue();
turnArr[3] = TurnEnum.UTURN.getValue();
} else if (function == 10) {
turnArr[1] = TurnEnum.STRAIGHT.getValue();
turnArr[3] = TurnEnum.UTURN.getValue();
} else if (function == 11) {
turnArr[2] = TurnEnum.RIGHT.getValue();
turnArr[3] = TurnEnum.UTURN.getValue();
} else if (function == 12) {
turnArr[0] = TurnEnum.LEFT.getValue();
turnArr[1] = TurnEnum.STRAIGHT.getValue();
turnArr[3] = TurnEnum.UTURN.getValue();
} else if (function == 13) {
turnArr[1] = TurnEnum.STRAIGHT.getValue();
turnArr[2] = TurnEnum.RIGHT.getValue();
turnArr[3] = TurnEnum.UTURN.getValue();
} else if (function == 14) {
turnArr[0] = TurnEnum.LEFT.getValue();
turnArr[1] = TurnEnum.STRAIGHT.getValue();
turnArr[2] = TurnEnum.RIGHT.getValue();
turnArr[3] = TurnEnum.UTURN.getValue();
} else if (function == 15) {
turnArr[0] = TurnEnum.LEFT.getValue();
turnArr[2] = TurnEnum.RIGHT.getValue();
turnArr[3] = TurnEnum.UTURN.getValue();
}
return turnArr;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
package com.wanji.indicators.constant;
/**
* @ClassName VehicleColorEnum
* @Description
* @Date 2021/4/15 8:29
* @Version 1.0
*/
public enum VehicleColorEnum {
//未知
UNKNOWN((byte)0),
//白色
WHITE((byte)1),
//黑色
BLACK((byte)2),
//红色
RED((byte)3),
//银色
SILVERY((byte)4),
//黄色
YELLOW((byte)5),
//蓝色
BLUE((byte)6),
//彩色
PARTICOLOR((byte)7),
//棕色
BROWN((byte)8),
//灰色
GREY((byte)9);
byte color;
VehicleColorEnum(byte b) {
this.color=b;
//命令字
}
public byte getColor(){
return color;
}
}
package com.wanji.indicators.constant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* @ClassName VehicleTypeEnum
* @Description
* @Date 2021/4/14 16:56
* @Version 1.0
*/
public enum VehicleTypeEnum {
//未知
UNKNOWN((short) 0),
//小客车
CAR((short) 1),
//大货车
TRUCK((short) 2),
//大巴车
BUS((short) 3),
//行人
PEDESTRIAN((short) 4),
//自行车
BIKE((short) 5),
//电动车
ELECTROMOBILE((short) 6),
//中巴车
MEDIUMBUS((short) 7),
//危化品
DANGERCAR((short) 8),
//遗撒物
LITTER((short) 9),
//小货车
VAN((short) 10),
//中货车
MIDTRUCK((short) 11),
//救护车
AMBULANCE((short) 12),
//消防车
FIREVEHICEL((short) 13),
//机动摩托车
MOTOR((short) 14),
//外卖车
TACKEOUT((short) 150),
//外卖车-美团
TACKEOUT_MEITUAN((short) 151),
//外卖车-饿了么
TACKEOUT_ELEME((short) 152),
//外卖车-肯德基
TACKEOUT_KFC((short) 153),
//外卖车-麦当劳
TACKEOUT_MCDONALD((short) 154),
//快递车
EXPRESS((short) 160),
//快递车-京东
EXPRESS_JD((short) 161),
//快递车-顺丰
EXPRESS_SF((short) 162),
//快递车-申通
EXPRESS_STO((short) 163),
//快递车-圆通
EXPRESS_YTO((short) 164),
//快递车-中通
EXPRESS_ZTO((short) 165),
//快递车-韵达
EXPRESS_YUNDA((short) 166);
short type;
VehicleTypeEnum(short b) {
this.type = b;
//命令字
}
public short getType() {
return type;
}
/**
* 根据大类获取小类
* @param type
* @return
*/
public static List<Integer> getOriginalTypeList(String type) {
List<Integer> originalTypeList = new ArrayList<>();
if (Objects.isNull(type))
return originalTypeList;
String[] sps = type.split(",");
for (String tp : sps) {
if (Objects.equals(ParticipantEnum.MOTOR.getType(), tp)) {
List<Integer> list = VehicleTypeEnum.VehicleType.stream().map(o -> (int)o.getType()).collect(Collectors.toList());
originalTypeList.addAll(list);
}
if (Objects.equals(ParticipantEnum.NON_MOTOR.getType(), tp)) {
List<Integer> list = VehicleTypeEnum.NonMotorVehicleType.stream().map(o -> (int)o.getType()).collect(Collectors.toList());
originalTypeList.addAll(list);
}
if (Objects.equals(ParticipantEnum.PED.getType(), tp)) {
List<Integer> list = VehicleTypeEnum.PedestrianType.stream().map(o -> (int)o.getType()).collect(Collectors.toList());
originalTypeList.addAll(list);
}
}
return originalTypeList;
}
// 机动车类型
public static final List<VehicleTypeEnum> VehicleType = Arrays.asList(new VehicleTypeEnum[]{
VehicleTypeEnum.CAR,
VehicleTypeEnum.TRUCK,
VehicleTypeEnum.BUS,
VehicleTypeEnum.MEDIUMBUS,
VehicleTypeEnum.DANGERCAR,
VehicleTypeEnum.VAN,
VehicleTypeEnum.MIDTRUCK,
VehicleTypeEnum.AMBULANCE,
VehicleTypeEnum.FIREVEHICEL,
VehicleTypeEnum.MOTOR
});
/**
* 是否为机动车
*
* @param type 车辆类型
* @return
*/
public static boolean isMotorVehicles(short type) {
return VehicleTypeEnum.CAR.type == type
|| VehicleTypeEnum.TRUCK.type == type
|| VehicleTypeEnum.BUS.type == type
|| VehicleTypeEnum.MEDIUMBUS.type == type
|| VehicleTypeEnum.DANGERCAR.type == type
|| VehicleTypeEnum.VAN.type == type
|| VehicleTypeEnum.MIDTRUCK.type == type
|| VehicleTypeEnum.AMBULANCE.type == type
|| VehicleTypeEnum.FIREVEHICEL.type == type
|| VehicleTypeEnum.MOTOR.type == type;
}
// 非机动车类型
public static final List<VehicleTypeEnum> NonMotorVehicleType = Arrays.asList(new VehicleTypeEnum[] {
VehicleTypeEnum.BIKE,
VehicleTypeEnum.ELECTROMOBILE,
VehicleTypeEnum.TACKEOUT,
VehicleTypeEnum.TACKEOUT_MEITUAN,
VehicleTypeEnum.TACKEOUT_ELEME,
VehicleTypeEnum.TACKEOUT_KFC,
VehicleTypeEnum.TACKEOUT_MCDONALD,
VehicleTypeEnum.EXPRESS,
VehicleTypeEnum.EXPRESS_JD,
VehicleTypeEnum.EXPRESS_SF,
VehicleTypeEnum.EXPRESS_STO,
VehicleTypeEnum.EXPRESS_YTO,
VehicleTypeEnum.EXPRESS_ZTO,
VehicleTypeEnum.EXPRESS_YUNDA
});
/**
* 是否为非机动车
*
* @param type 车辆类型
* @return
*/
public static boolean isNonMotorVehicles(short type) {
return VehicleTypeEnum.BIKE.type == type
|| VehicleTypeEnum.ELECTROMOBILE.type == type
|| isTakeout(type)
|| isExpress(type);
}
// 非机动车类型
public static final List<VehicleTypeEnum> PedestrianType = Arrays.asList(new VehicleTypeEnum[] {
VehicleTypeEnum.PEDESTRIAN
});
/**
* 是否为行人
*
* @param type 车辆类型
* @return
*/
public static boolean isPedestrian(short type) {
return VehicleTypeEnum.PEDESTRIAN.type == type;
}
/**
* 是否为 小汽车
*
* @param type 车辆类型
* @return
*/
public static boolean isCar(short type) {
return VehicleTypeEnum.CAR.type == type;
}
/**
* 是否为 大货车
*
* @param type 车辆类型
* @return
*/
public static boolean isTruck(short type) {
return VehicleTypeEnum.TRUCK.type == type;
}
/**
* 是否为 中货车
*
* @param type 车辆类型
* @return
*/
public static boolean isMediumTruck(short type) {
return VehicleTypeEnum.MIDTRUCK.type == type;
}
/**
* 是否为 小货车
*
* @param type 车辆类型
* @return
*/
public static boolean isSmallTruck(short type) {
return VehicleTypeEnum.VAN.type == type;
}
/**
* 是否为 大巴车
*
* @param type 车辆类型
* @return
*/
public static boolean isBus(short type) {
return VehicleTypeEnum.BUS.type == type;
}
/**
* 是否为 中巴车
*
* @param type 车辆类型
* @return
*/
public static boolean isMediumBus(short type) {
return VehicleTypeEnum.MEDIUMBUS.type == type;
}
/**
* 是否为 危化品车
*
* @param type 车辆类型
* @return
*/
public static boolean isDangerCar(short type) {
return VehicleTypeEnum.DANGERCAR.type == type;
}
/**
* 是否为 救护车
*
* @param type 车辆类型
* @return
*/
public static boolean isAmbulance(short type) {
return VehicleTypeEnum.AMBULANCE.type == type;
}
/**
* 是否为 消防车
*
* @param type 车辆类型
* @return
*/
public static boolean isFireEngine(short type) {
return VehicleTypeEnum.FIREVEHICEL.type == type;
}
/**
* 是否为 外卖车
*
* @param type 车辆类型
* @return
*/
public static boolean isTakeout(short type) {
return VehicleTypeEnum.TACKEOUT.type == type
|| VehicleTypeEnum.TACKEOUT_MEITUAN.type == type
|| VehicleTypeEnum.TACKEOUT_ELEME.type == type
|| VehicleTypeEnum.TACKEOUT_KFC.type == type
|| VehicleTypeEnum.TACKEOUT_MCDONALD.type == type;
}
/**
* 是否为 外卖车
*
* @param type 车辆类型
* @return
*/
public static boolean isTakeout_MEITUAN(short type) {
return VehicleTypeEnum.TACKEOUT_MEITUAN.type == type;
}
/**
* 是否为 外卖车
*
* @param type 车辆类型
* @return
*/
public static boolean isTakeout_ELEME(short type) {
return VehicleTypeEnum.TACKEOUT_ELEME.type == type;
}
/**
* 是否为 快递车
*
* @param type 车辆类型
* @return
*/
public static boolean isExpress(short type) {
return VehicleTypeEnum.EXPRESS.type == type
|| VehicleTypeEnum.EXPRESS_JD.type == type
|| VehicleTypeEnum.EXPRESS_SF.type == type
|| VehicleTypeEnum.EXPRESS_STO.type == type
|| VehicleTypeEnum.EXPRESS_YTO.type == type
|| VehicleTypeEnum.EXPRESS_ZTO.type == type
|| VehicleTypeEnum.EXPRESS_YUNDA.type == type;
}
/**
* 是否为 顺丰快递车
*
* @param type 车辆类型
* @return
*/
public static boolean isExpress_SF(short type) {
return VehicleTypeEnum.EXPRESS_SF.type == type;
}
/**
* 是否为 京东快递车
*
* @param type 车辆类型
* @return
*/
public static boolean isExpress_JD(short type) {
return VehicleTypeEnum.EXPRESS_JD.type == type;
}
}
......@@ -8,6 +8,7 @@ import java.io.Serializable;
import lombok.Data;
import lombok.EqualsAndHashCode;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.activerecord.Model;
@Data
@EqualsAndHashCode(callSuper=false)
......
......@@ -2,45 +2,19 @@ package com.wanji.indicators.mapper;
import com.wanji.indicators.entity.BaseLaneInfo;
import java.util.List;
import java.util.Map;
import org.apache.ibatis.annotations.Param;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface BaseLaneInfoMapper extends BaseMapper<BaseLaneInfo>{
/**
* 查询表t_base_lane_info所有信息
*/
List<BaseLaneInfo> findAllBaseLaneInfo();
/**
* 根据主键id查询表t_base_lane_info信息
* @param id
*/
BaseLaneInfo findBaseLaneInfoByid(@Param("id") String id);
/**
* 根据条件查询表t_base_lane_info信息
* @param baseLaneInfo
*/
List<BaseLaneInfo> findBaseLaneInfoByCondition(BaseLaneInfo baseLaneInfo);
* 查询路口进口车道、出口车道信息
* @return
*/
public List<Map<String,Object>> findCrossLaneInfo(Map<String,Object> params);
/**
* 根据主键id查询表t_base_lane_info信息
* @param id
*/
Integer deleteBaseLaneInfoByid(@Param("id") String id);
/**
* 根据主键id更新表t_base_lane_info信息
* @param baseLaneInfo
*/
Integer updateBaseLaneInfoByid(BaseLaneInfo baseLaneInfo);
/**
* 新增表t_base_lane_info信息
* @param baseLaneInfo
*/
Integer addBaseLaneInfo(BaseLaneInfo baseLaneInfo);
}
package com.wanji.indicators.model;
import lombok.Data;
import java.io.Serializable;
/**
* @Author ke.han
* @Date 2020/4/10 19:25
**/
@Data
public class BaseEventResultModel implements Serializable {
/**
* 路口id
*/
private String crossId;
/**
* 路口车辆标识
*/
private Integer trackId;
/**
* 所属车道id
*/
private String laneId;
/**
* rid
*/
private String rid;
/**
* 事件标识
*/
private String eventClass;
/**
* 车辆号牌
*/
private String plateNumber;
/**
* 车牌颜色
*/
private int plateColor;
/**
* 车身颜色
*/
private int vehicleColor;
/**
* 车辆品牌
*/
private String vehicleBrand;
/**
* 车牌类型
*/
private int plateType;
/**
* 车辆类型
*/
private String vehicleType;
/**
* 事件类型
*/
private String eventType;
/**
* 事件类型编码
*/
private int typeCode;
/**
* 开始时间
*/
private long startTime;
/**
* 结束时间
*/
private long endTime;
/**
* 持续时间
*/
private long duration;
/**
* 事件信息
*/
private String desc;
/**
* 事件信息 - 英文
*/
private String descEn;
/**
* 坐标点
*/
private double [] location;
/**
* 车速
*/
private double speed;
/**
* 图片1时间
*/
private long pictureTime1;
/**
* 图片2时间
*/
private long pictureTime2;
/**
* 图片3时间
*/
private long pictureTime3;
private double x;
private double y;
/**
* 时间 必须有
*/
private long time;
/**
* 事件检出时间
*/
private long checkOutTime;
/**
* 路口名称
*/
private String recoveryCross;
/**
* 状态
*/
private int eventState;
/**
* 开始时间,格式:yyyy-MM-dd HH:mm:ss.SSS
*/
private String startDateTime;
/**
* 截止时间,格式:yyyy-MM-dd HH:mm:ss.SSS
*/
private String endDateTime;
/**
* 记录时间,格式:yyyy-MM-dd HH:mm:ss.SSS
*/
private String recordDateTime;
/**
* 影响的rid,多个以逗号分割
*/
private String effectRids;
public BaseEventResultModel() {
}
}
......@@ -3,7 +3,6 @@ package com.wanji.indicators.model;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* @author fengyi
......@@ -62,4 +61,10 @@ public class CarTrackModel implements Serializable {
/*渠化ID*/
private String segmentId;
}
/***********扩展属性*********************************/
//路口内旅行时间,单位秒
private Integer travelTime;
//待转区车道长度,单位米
private Double waitingTurnLaneDist;
}
package com.wanji.indicators.model;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
@Data
public class ConflictPointModelBase extends BaseEventResultModel implements Serializable {
private String trackIdNear;
private List<List<Double>> lines;
private List<List<Double>> linesNear;
private Double xNear;
private Double yNear;
private int num;
}
package com.wanji.indicators.model;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
@Data
public class EventCross implements Serializable {
/**
* 路口唯一id
*/
private String crossId;
/**
* 路口车辆唯一id
*/
private int trackID;
/**
* 经度
*/
private double lng;
/**
* 纬度
*/
private double lat;
/**
* 当前所在车道
*/
private String matchLane;
/**
*
*/
private String matchLaneName;
/**
* 路口入口车道
*/
private String entryLane;
/**
* 当前所在rid
*/
private String rid;
/**
* 速度
*/
private double speed;
/**
* 转向角度
*/
private double driveAngle;
/**
* 车牌
*/
private String plateNumber;
/**
* 车辆类型
*/
private int vehicleType;
/**
* 车牌类型
*/
private int plateType;
/**
* 车牌颜色
*/
private int plateColor;
/**
* 车身颜色
*/
private int vehicleColor;
/**
* 车辆品牌
*/
private String vehicleBrand;
/**
* 数据类型 行人机动车 / 2 / 3
*/
private int objectType;
/**
* 是否在路口面 1 = 在路口面 0 = 不在路口面
*/
private int crossing;
/**
* 车道上 车辆离停止线的距离 过了停止线 = -1
*/
private double stopLineDist;
/**
* 路口面上 车辆离开停止线的距离
*/
private double areaDist;
/**
* 事件时间
*/
private long time;
/**
* 持续时间
*/
private long duration;
/**
* 车道功能 1左转 2直行 3右转 4掉头
*/
private String laneFunction;
/**
* 加速度
*/
private double at;
private long startTime;
private long endTime;
//记录当前车辆与其周围所有运动车辆的距离;
private Map<Integer,Double> mark;
private List<EventCross> list;
private double distance;
private double ratio;
private double maxSpeed;
/**
* 信号灯态
* @return
*/
private String lampState;
/**
* 分类事件 的 类型代码
*/
private int typeCode;
/**
* 累计的出口行车距离
*/
private double areaDistOut;
/**
* 旅行时间
*/
private double travelTime;
/**
* 车道位置字典(Lane location)
*/
private String laneLocation;
private int eventState;
private String recoveryCross;//设备名称
private String trackIdNear;
/*路网数据*/
private CarTrackModel.RoadNet roadnet;
}
......@@ -16,7 +16,7 @@ public class FrameModel implements Serializable {
private String timeStamp;
private Long globalTimeStamp;
private Integer participantNum;
private String orgCode ="";
private String orgCode = "";
private List<CarTrackModel> e1FrameParticipant;
......
package com.wanji.indicators.model;
import com.alibaba.fastjson.annotation.JSONField;
import com.google.gson.JsonObject;
import lombok.Data;
import java.io.Serializable;
......
......@@ -2,7 +2,6 @@ package com.wanji.indicators.model;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import lombok.ToString;
import java.io.Serializable;
import java.util.Date;
......
......@@ -3,7 +3,6 @@ package com.wanji.indicators.model;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* @author fengyi
......
......@@ -30,7 +30,7 @@ public class BaseLaneInfoServiceImpl extends ServiceImpl<BaseLaneInfoMapper, Bas
@Override
public List<BaseLaneInfo> findLaneRidInfo() {
List<BaseLaneInfo> list = baseLaneInfoMapper.findAllBaseLaneInfo();
List<BaseLaneInfo> list = null;//baseLaneInfoMapper.findAllBaseLaneInfo();
for (BaseLaneInfo model : list) {
String wkt = model.getRidWkt();
LineString lineString = GeomsConvertUtil.getLineString(wkt);
......
package com.wanji.indicators.source;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
/**
* 待转区数据源
*/
public class WaitingAreaSource extends RichParallelSourceFunction<
HashMap<String, Integer>> {
private static final Logger log = LoggerFactory.getLogger(WaitingAreaSource.class);
private volatile boolean running = true;
@Override
public void run(SourceContext<HashMap<String, Integer>> out) throws Exception {
HashMap<String, Integer> outMap = new HashMap<>();
if (running) {
String sql = "SELECT * FROM cross_flink_settings WHERE waiting_area <> 0";
try {
if (outMap.size() > 0) {
out.collect(outMap);
}
} catch (Exception e) {
log.error("路口待转配置查询异常:" + e.getMessage());
out.collect(outMap);
}
// Thread.sleep(1000L*30L);
}
}
@Override
public void cancel() {
running = false;
}
}
package com.wanji.indicators.task.conflictpoint;
import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.model.BaseEventResultModel;
import com.wanji.indicators.model.ConflictPointModelBase;
import com.wanji.indicators.model.EventCross;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.source.WaitingAreaSource;
import com.wanji.indicators.task.conflictpoint.func.*;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.util.Properties;
/**
* 冲突点计算
*/
public class ConflictEventMain implements Serializable {
private StreamExecutionEnvironment env;
private PropertiesHelper instance;
private Properties p;
private boolean isDebug;
private String path;
private ConflictEventMain(StreamExecutionEnvironment env) {
this.env = env;
this.instance = PropertiesHelper.getInstance();
this.p = instance.getProperties();
}
public static ConflictEventMain init(StreamExecutionEnvironment env) {
return new ConflictEventMain(env);
}
private ConflictEventMain(StreamExecutionEnvironment env, String path,boolean isDebug) {
this.env = env;
this.instance = PropertiesHelper.getInstance();
this.p = instance.getProperties();
this.path = path;
this.isDebug = isDebug;
}
public static ConflictEventMain init(StreamExecutionEnvironment env, String path,boolean isDebug) {
return new ConflictEventMain(env, path, isDebug);
}
public void run(DataStream<FrameModel> streamSource) {
/**
* 转换数据为实体类 并筛选需要计算的数据(路口面 上的数据)
*/
SingleOutputStreamOperator<FrameModel> eventStream = streamSource
.flatMap(new ConflictFrameFlatMap())
.setParallelism(1)
.name("冲突点-解析数据");
/**
* 读取路口待转区配置 并过滤待转区
*/
// SingleOutputStreamOperator<FrameModel> flatMapWaitingArea = eventStream
// .connect(env.addSource(new WaitingAreaSource()).name("冲突点-查询路口待转区配置").broadcast())
// .flatMap(new ConflictWaitingAreaCoFlatMap())
// .setParallelism(1)
// .name("冲突点-匹配待转区并过滤其数据");
/**
* 过滤拥堵状态下的冲突点数据
*/
// SingleOutputStreamOperator<FrameModel> frameModelOutputStream = streamSource
// .flatMap(new ConflictCongestionFilterFlatMap())
// .setParallelism(1)
// .name("冲突点-过滤拥堵状态下的停驶车辆数据");
/**
* 按照 crossId + time 分组
*/
SingleOutputStreamOperator<EventCross> processWindowStopAndRuning = streamSource
.flatMap(new ConflictCheckConflictFlatMap())
.setParallelism(1)
.name("冲突点-计算是否发生冲突");
/**
* 去重
*/
SingleOutputStreamOperator<EventCross> eventMapState = processWindowStopAndRuning
.keyBy("crossId","trackID")
.process(new ConflictDeduplicationKeyedProcess())
.setParallelism(1)
.name("冲突点-结果去重");
/**
* 构建结果集
*/
SingleOutputStreamOperator<String> eventTestSingleOutput = eventMapState
.flatMap(new ConflictEncapsulationResultFlatMap())
.setParallelism(1)
.name("冲突点-封装结果数据");
if (isDebug) {
/**
* 转换数据
*/
SingleOutputStreamOperator<BaseEventResultModel> stringSingleOutputStream = eventTestSingleOutput
.flatMap(new FlatMapFunction<String, BaseEventResultModel>() {
@Override
public void flatMap(String jsonData, Collector<BaseEventResultModel> collector) throws Exception {
ConflictPointModelBase result = JSONObject.parseObject(jsonData,ConflictPointModelBase.class);
collector.collect(result);
}
})
.setParallelism(1);
//stringSingleOutputStream.addSink(new EventFileSink(this.path));
} else {
/**
* 安全专题数据产出
*/
FlinkKafkaProducer producer1 = new FlinkKafkaProducer<>(
p.getProperty("cross.event.data.topic"),
new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
instance.getProducerProperties(),
FlinkKafkaProducer.Semantic.NONE);
eventTestSingleOutput.addSink(producer1).setParallelism(1).name("冲突点-结果输出至kafka");
eventTestSingleOutput.print("冲突点-结果").name("冲突点-结果打印");
}
}
}
package com.wanji.indicators.task.conflictpoint;
import com.wanji.indicators.model.BaseEventResultModel;
import com.wanji.indicators.model.ConflictPointModelBase;
import com.wanji.indicators.model.EventCross;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.source.WaitingAreaSource;
import com.wanji.indicators.task.conflictpoint.func.*;
import com.wanji.indicators.util.PropertiesHelper;
import com.wanji.indicators.util.PtInPolyUtil;//PtInPolyUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.util.Properties;
public class ConflictEventMainTest implements Serializable {
private StreamExecutionEnvironment env;
private PropertiesHelper instance;
private Properties p;
private String path;
private ConflictEventMainTest(StreamExecutionEnvironment env, String path) {
this.env = env;
this.instance = PropertiesHelper.getInstance();
this.p = instance.getProperties();
this.path = path;
}
public static ConflictEventMainTest init(StreamExecutionEnvironment env, String path) {
return new ConflictEventMainTest(env, path);
}
public void run(DataStream<FrameModel> streamSource) {
/**
* 转换数据为实体类 并筛选需要计算的数据(路口面 上的数据)
*/
SingleOutputStreamOperator<FrameModel> eventStream = streamSource
.flatMap(new ConflictFrameFlatMap())
.setParallelism(1)
.name("冲突点-解析数据");
/**
* 读取路口待转区配置 并过滤待转区
*/
SingleOutputStreamOperator<FrameModel> flatMapWaitingArea = eventStream
.connect(env.addSource(new WaitingAreaSource()).name("冲突点-查询路口待转区配置").broadcast())
.flatMap(new ConflictWaitingAreaCoFlatMap())
.setParallelism(1)
.name("冲突点-匹配待转区并过滤其数据");
/**
* 过滤拥堵状态下的冲突点数据
*/
SingleOutputStreamOperator<FrameModel> frameModelOutputStream = flatMapWaitingArea
.flatMap(new ConflictCongestionFilterFlatMap())
.setParallelism(1)
.name("冲突点-过滤拥堵状态下的停驶车辆数据");
/**
* 按照 crossId + time 分组
*/
SingleOutputStreamOperator<EventCross> processWindowStopAndRuning = frameModelOutputStream
.flatMap(new ConflictCheckConflictFlatMap())
.setParallelism(1)
.name("冲突点-计算是否发生冲突");
/**
* 去重
*/
SingleOutputStreamOperator<EventCross> eventMapState = processWindowStopAndRuning
.keyBy("crossId","trackID")
.process(new ConflictDeduplicationKeyedProcess())
.setParallelism(1)
.name("冲突点-结果去重");
/**
* 构建结果集
*/
SingleOutputStreamOperator<BaseEventResultModel> eventTestSingleOutput = eventMapState
.flatMap(new FlatMapFunction<EventCross, BaseEventResultModel>() {
@Override
public void flatMap(EventCross event, Collector<BaseEventResultModel> collector) throws Exception {
if (StringUtils.isNotBlank(event.getPlateNumber())) {
ConflictPointModelBase model = new ConflictPointModelBase();
model.setCrossId(event.getCrossId());
model.setTrackId(event.getTrackID());
model.setPlateNumber(event.getPlateNumber());
model.setVehicleBrand(event.getVehicleBrand());
model.setPlateColor(event.getPlateColor());
model.setPlateType(event.getPlateType());
model.setVehicleType(event.getVehicleType() + "");
model.setTrackIdNear(event.getTrackIdNear());
// model.setEventClass(EventType.EVENT.getName());
// model.setEventType(EventType.CONFLICT_POINT.getName());
// model.setTypeCode(EventType.CONFLICT_POINT.getCode());
model.setDesc("路口面冲突点: 车辆 " + event.getPlateNumber() + "行驶方向为 " + PtInPolyUtil.getDir(event.getDriveAngle()));
String matchLane = event.getMatchLane();
String entryLane = event.getEntryLane();
if (StringUtils.isNotBlank(matchLane)) {
model.setLaneId(matchLane);
model.setRid(matchLane.substring(0, 23));
} else {
if (StringUtils.isNotBlank(entryLane)) {
model.setLaneId(entryLane);
model.setRid(entryLane.substring(0, 23));
}
}
model.setStartTime(event.getTime() - 1000L);
model.setEndTime(event.getTime() + 1000L);
model.setX(event.getLng());
model.setY(event.getLat());
model.setDuration(2000);
model.setTime(model.getStartTime());
if (model.getEndTime() > 0 &&
model.getStartTime() > 0 &&
model.getEndTime() > model.getStartTime()) {
collector.collect(model);
}
}
}
})
.setParallelism(1);
// eventTestSingleOutput.addSink(new EventFileSink(this.path));
}
}
package com.wanji.indicators.task.conflictpoint.func;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.ConfigModel;
import com.wanji.indicators.model.EventCross;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.util.PtInPolyUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class ConflictCheckConflictFlatMap implements FlatMapFunction<FrameModel, EventCross> {
@Override
public void flatMap(FrameModel frameModel, Collector<EventCross> collector) throws Exception {
ConfigModel configModel = new ConfigModel();
//判断该车道内的是否同时存在行驶车辆和停驶车辆 如果同时存在则 set.size = 2 否则 set.size = 1
Set<Integer> keyStop = new HashSet<>();
Set<Integer> keyRun = new HashSet<>();
//存放停止的车辆
List<EventCross> listStop = new ArrayList<>();
//存放运行的车辆
List<EventCross> listRun = new ArrayList<>();
List<CarTrackModel> trackList = frameModel.getE1FrameParticipant();
for (CarTrackModel carTrack : trackList) {
double speed = carTrack.getSpeed();
int trackID = carTrack.getId();
EventCross eventCross = toEventCross(carTrack);
if (speed <= 0.0) {
if (keyStop.add(trackID)) {
listStop.add(eventCross);
}
} else {
if (keyRun.add(trackID)) {
listRun.add(eventCross);
}
}
}
for (EventCross a : listStop) {
int trackIDA = a.getTrackID();
String ridA = a.getRid();
double driveAngleA = a.getDriveAngle();
double lngA = a.getLng();
double latA = a.getLat();
boolean isContinue = false;
for (CarTrackModel carTrack : trackList) {
if (a.getTrackID() != carTrack.getId() &&
a.getLaneFunction().contains("Constant.STRAIGHT_LINE_CODE") &&
a.getLaneFunction().contains("Constant.LEFT_TURN_CODE") &&
StringUtils.isNotBlank(a.getRid()) &&
a.getRid().equals(carTrack.getRoadnet().getRid())) {
//判定停止车辆前是否有同行车辆
double distance = PtInPolyUtil.getDistance(a.getLng(), a.getLat(), carTrack.getLongitude(), carTrack.getLatitude());
if (distance < configModel.getConflictFilterSameDirTravelDist() && carTrack.getWaitingTurnLaneDist() > a.getAreaDist()) {
double angle = PtInPolyUtil.getAngle(a.getLng(), a.getLat(), carTrack.getLongitude(), carTrack.getLatitude());
//计算a车行驶方向和前车连线方向是否一致 确定前车是否在正前方
double angleX = Math.abs(a.getDriveAngle() - angle);
if (angleX < 10.0 || angleX > 350.0) {
isContinue = true;
break;
}
}
}
}
if (isContinue) {
continue;
}
for (EventCross b : listRun) {
int trackIDB = b.getTrackID();
String ridB = b.getRid();
boolean isRun = true;
//两方向车道功能都是直行的且行车方向相反则排除
if ("Constant.STRAIGHT_LINE_CODE".equals(a.getLaneFunction()) && "Constant.STRAIGHT_LINE_CODE".equals(b.getLaneFunction())) {
double angleX = Math.abs(a.getDriveAngle() - b.getDriveAngle());
if (angleX > 160.0 && angleX < 200.0) {
isRun = false;
}
}
//两方向车道功能都是左转的且行车方向相反则排除
if ("Constant.LEFT_TURN_CODE".equals(a.getLaneFunction()) && "Constant.LEFT_TURN_CODE".equals(b.getLaneFunction())) {
double angleX = Math.abs(a.getDriveAngle() - b.getDriveAngle());
if (angleX > 160.0 && angleX < 200.0) {
isRun = false;
}
}
if (isRun && trackIDA != trackIDB && !ridA.equals(ridB)) {
double lngB = b.getLng();
double latB = b.getLat();
//车头->延长1.2后的点作为车头坐标
double[] head = PtInPolyUtil.calLocationByDistanceAndLocationAndDirection(driveAngleA, lngA, latA, configModel.getConflictStopCarAlongLengOfHead());//延长1.2m
double distance = PtInPolyUtil.getDistance(head[0], head[1], lngB, latB);//车头位置与行驶车辆的距离
double driveAngleB = b.getDriveAngle();
double angleX = Math.abs(driveAngleA - driveAngleB);
if (distance < configModel.getConflictTwoCarMaxDist()
&& angleX > configModel.getConflictTwoCarMinAngle()
&& angleX < configModel.getConflictTwoCarMaxAngle()) {//停车车辆与行驶车辆两车角度差计算范围
//A 反向延长2.5m后的坐标作为车尾坐标
double[] doublesA = PtInPolyUtil.calLocationByDistanceAndLocationAndDirection(getBackAngle(driveAngleA), lngA, latA, configModel.getConflictStopCarAlongLengOfRear());
//A前 停车车辆向前延长线
double[] doublesAS = PtInPolyUtil.calLocationByDistanceAndLocationAndDirection(driveAngleA, lngA, latA, configModel.getConflictStopCarForwardAlongLength());
//A后 停车车辆向后延长线
double[] doublesAE = PtInPolyUtil.calLocationByDistanceAndLocationAndDirection(getBackAngle(driveAngleA), lngA, latA, configModel.getConflictStopCarBackAlongLength());
//B 行驶车辆长度阈值
double[] doublesB = PtInPolyUtil.calLocationByDistanceAndLocationAndDirection(getBackAngle(driveAngleB), lngB, latB, configModel.getConflictTravelCarLength());
//B前 行驶车辆向前延长线长度
double[] doublesBS = PtInPolyUtil.calLocationByDistanceAndLocationAndDirection(driveAngleB, lngB, latB, configModel.getConflictTravelCarForwardAlongLength());
//B后 行驶车辆向后延长线长度
double[] doublesBE = PtInPolyUtil.calLocationByDistanceAndLocationAndDirection(getBackAngle(driveAngleB), lngB, latB, configModel.getConflictTravelCarBackAlongLength());
//A前 交 B后
int isIntersectA = PtInPolyUtil.segIntersect(lngA, latA, doublesAS[0], doublesAS[1], doublesB[0], doublesB[1], doublesBE[0], doublesBE[1]);
//B前 交 A后
int isIntersectB = PtInPolyUtil.segIntersect(doublesA[0], doublesA[1], doublesAE[0], doublesAE[1], lngB, latB, doublesBS[0], doublesBS[1]);
if (isIntersectA != 1 && isIntersectB != 1) {
double angle = PtInPolyUtil.getAngle(head[0], head[1], lngB, latB);
double abs = Math.abs(angle - driveAngleA);
if (abs < configModel.getConflictResultAngleDif()) {//停止车辆车头位置与行驶车辆角度差
a.setTrackIdNear(a.getTrackID() + "Constant.COMMA" + b.getTrackID());
collector.collect(a);
}
}
doublesA = null;
doublesB = null;
doublesAS = null;
doublesBS = null;
doublesAE = null;
doublesBE = null;
}
}
}
}
keyStop = null;
keyRun = null;
listStop = null;
listRun = null;
}
/**
* 计算反向角度
* @param angle
* @return
*/
private static double getBackAngle(double angle) {
if (angle >= 180) {
return angle - 180;
} else {
return angle + 180;
}
}
private static EventCross toEventCross(CarTrackModel carTrack){
EventCross e = new EventCross();
e.setPlateColor(carTrack.getLicenseColor());
e.setVehicleColor(carTrack.getOriginalColor());
e.setRoadnet(carTrack.getRoadnet());
e.setTrackID(carTrack.getId());
e.setLng(carTrack.getLongitude());
e.setLat(carTrack.getLatitude());
e.setSpeed(carTrack.getSpeed());
e.setDriveAngle(carTrack.getCourseAngle());
e.setPlateNumber(carTrack.getPicLicense());
e.setVehicleType(carTrack.getOriginalType());
e.setTime(carTrack.getGlobalTimeStamp());
e.setAreaDist(carTrack.getWaitingTurnLaneDist());
e.setTravelTime((long) carTrack.getTravelTime());
return e;
}
}
package com.wanji.indicators.task.conflictpoint.func;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.ConfigModel;
import com.wanji.indicators.model.FrameModel;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.*;
public class ConflictCongestionFilterFlatMap implements FlatMapFunction<FrameModel, FrameModel> {
@Override
public void flatMap(FrameModel frameModel, Collector<FrameModel> collector) throws Exception {
List<CarTrackModel> trackList = frameModel.getE1FrameParticipant();
ConfigModel configModel = new ConfigModel();
Map<String, List<CarTrackModel>> listMap = new HashMap<>();
for (CarTrackModel carTrack : trackList) {
if (carTrack.getSpeed() < 8.0
/*&& StringUtils.isNotBlank(carTrack.getLaneFunc()) && !carTrack.getLaneFunc().contains(Constant.LEFT_TURN_CODE)*/) {
String rid = carTrack.getRoadnet().getRid();
List<CarTrackModel> carTrackRidList = listMap.get(rid);
if (carTrackRidList == null) {
carTrackRidList = new ArrayList<>();
}
carTrackRidList.add(carTrack);
listMap.put(rid, carTrackRidList);
}
}
Set<String> ridSet = new HashSet<>();
for (String rid : listMap.keySet()) {
List<CarTrackModel> carTracks = listMap.get(rid);
int size = carTracks.size();
if (size > 5) {
//缓存疑似拥堵的方向数据
ridSet.add(rid);
//CongestionVehicleCache.getInstance().put(EventType.CONFLICT_POINT + frameModel.getCrossId() + rid, frameModel.getTime());
}
}
//获取拥堵状态 若发生拥堵则暂时不计算冲突点
boolean bool = true;
List<CarTrackModel> resultList = new ArrayList<>();
for (CarTrackModel carTrack : trackList) {
//过滤疑似拥堵方向的车辆
Long aLong = 0L;//CongestionVehicleCache.getInstance().get(EventType.CONFLICT_POINT + carTrack.getCrossId() + carTrack.getRid());
if (aLong != null) {
long txCongestion = Math.abs(aLong - carTrack.getGlobalTimeStamp());
if (txCongestion < 90000L) {
bool = false;
} else {
//拥堵消散
//CongestionVehicleCache.getInstance().remove(EventType.CONFLICT_POINT + carTrack.getCrossId() + carTrack.getRid());
}
}
if(bool){
resultList.add(carTrack);
}
}
frameModel.setE1FrameParticipant(resultList);
collector.collect(frameModel);
}
}
package com.wanji.indicators.task.conflictpoint.func;
import com.wanji.indicators.model.EventCross;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class ConflictDeduplicationKeyedProcess extends KeyedProcessFunction<Tuple, EventCross, EventCross> {
MapState<String, EventCross> mapState = null;
@Override
public void open(Configuration parameters) {
mapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("eventMapStateQC", String.class, EventCross.class));
}
@Override
public void processElement(EventCross event, Context ctx, Collector<EventCross> out) throws Exception {
String key = event.getCrossId() + "-" + event.getTrackID();
EventCross value = mapState.get(key);
if (value == null) {
mapState.put(key, event);
ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + 180000L);
out.collect(event);
}
key = null;
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<EventCross> out) throws Exception {
mapState.clear();
}
}
package com.wanji.indicators.task.conflictpoint.func;
import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.model.ConflictPointModelBase;
import com.wanji.indicators.model.EventCross;
import com.wanji.indicators.util.PtInPolyUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConflictEncapsulationResultFlatMap implements FlatMapFunction<EventCross, String> {
private static final Logger log = LoggerFactory.getLogger(ConflictEncapsulationResultFlatMap.class);
@Override
public void flatMap(EventCross event, Collector<String> out) throws Exception {
// log.info("冲突点日志: " + event.getTime() + " " + event.getPlateNumber() + " " +
// event.getAreaDist() + " " +
// event.getCrossing() + " " +
// event.getSpeed() + " ");
if(StringUtils.isNotBlank(event.getPlateNumber())){
ConflictPointModelBase model = new ConflictPointModelBase();
model.setCrossId(event.getCrossId());
model.setTrackId(event.getTrackID());
model.setPlateNumber(event.getPlateNumber());
model.setPlateColor(event.getPlateColor());
model.setPlateType(event.getPlateType());
model.setVehicleColor(event.getVehicleColor());
model.setVehicleBrand(event.getVehicleBrand());
model.setVehicleType(event.getVehicleType() + "");
model.setTrackIdNear(event.getTrackIdNear());
// model.setEventClass(EventType.EVENT.getName());
// model.setEventType(EventType.CONFLICT_POINT.getName());
// model.setTypeCode(EventType.CONFLICT_POINT.getCode());
model.setDesc("路口面冲突点: 车辆 " + event.getPlateNumber() + "行驶方向为 " + PtInPolyUtil.getDir(event.getDriveAngle()));
model.setDescEn("Intersection conflict point: the driving direction of vehicle "+ "行驶方向为 " + PtInPolyUtil.getDirEn(event.getDriveAngle()));
model.setSpeed(event.getSpeed());
model.setPictureTime1(event.getTime() - 3000);
model.setPictureTime2(event.getTime());
model.setPictureTime3(event.getTime() + 3000);
String matchLane = event.getMatchLane();
String entryLane = event.getEntryLane();
model.setTime(model.getStartTime());
if (StringUtils.isNotBlank(matchLane)) {
model.setLaneId(matchLane);
model.setRid(matchLane.substring(0, 23));
} else {
if(StringUtils.isNotBlank(entryLane)){
model.setLaneId(entryLane);
model.setRid(entryLane.substring(0, 23));
}
}
long time = event.getTime() / 1000 * 1000;
model.setStartTime(time - 1000L);
model.setEndTime(time + 1000L);
model.setX(event.getLng());
model.setY(event.getLat());
model.setLocation(new double[] {event.getLng(), event.getLat()});
model.setDuration(2000);
model.setTime(model.getStartTime());
model.setRecoveryCross(event.getRecoveryCross());
model.setCheckOutTime(1000);
if (model.getEndTime() > 0L &&
model.getStartTime() > 0L &&
model.getEndTime() > model.getStartTime()) {
out.collect(JSONObject.toJSONString(model));
}
model = null;
}
}
}
package com.wanji.indicators.task.conflictpoint.func;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.ConfigModel;
import com.wanji.indicators.model.FrameModel;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ConflictFrameFlatMap implements FlatMapFunction<FrameModel, FrameModel> {
static Map<Integer, CarTrackModel.RoadNet> carRoadnetMap = new HashMap<>();
@Override
public void flatMap(FrameModel frameModel, Collector<FrameModel> collector) throws Exception {
List<CarTrackModel> trackList = frameModel.getE1FrameParticipant();
ConfigModel configModel = new ConfigModel();
if (configModel.getConflictIsOpen() == 1) {
List<CarTrackModel> resultList = new ArrayList<>();
for (CarTrackModel carTrack : trackList) {
CarTrackModel.RoadNet roadNet = carTrack.getRoadnet();
//路口内
if (StringUtils.equals(carTrack.getRoadnet().getLaneId(), "null")) {
//路口内无车道ID、路段ID信息,从上一次运行缓存获取标记为从哪个车道出来
resultList.add(carTrack);
} else {
carRoadnetMap.put(carTrack.getId(),roadNet);
}
}
frameModel.setE1FrameParticipant(resultList);
collector.collect(frameModel);
}
}
}
package com.wanji.indicators.task.conflictpoint.func;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.FrameModel;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
public class ConflictWaitingAreaCoFlatMap implements CoFlatMapFunction<FrameModel, HashMap<String, Integer>, FrameModel> {
private HashMap<String, Integer> map = new HashMap();
@Override
public void flatMap1(FrameModel frameModel, Collector<FrameModel> collector) throws Exception {
List<CarTrackModel> resultList = new ArrayList<>();
List<CarTrackModel> trackList = frameModel.getE1FrameParticipant();
for(CarTrackModel carTrack : trackList){
String entryLane = carTrack.getRoadnet().getLaneId();
if (StringUtils.isNotBlank(entryLane)) {
Integer integer = map.get(entryLane);
if (integer != null && integer > 0) {
if (carTrack.getWaitingTurnLaneDist() > (1.5 + integer * 1.0)) {
resultList.add(carTrack);
}
} else {
resultList.add(carTrack);
}
}
}
frameModel.setE1FrameParticipant(resultList);
collector.collect(frameModel);
}
@Override
public void flatMap2(HashMap<String, Integer> hashMap, Collector<FrameModel> collector) throws Exception {
this.map = hashMap;
}
}
//package com.wanji.indicators.task.conflictpoint.ttc;
//
//import com.alibaba.fastjson.JSONObject;
//import com.mapabc.constant.Constant;
//import com.mapabc.model.event.EventCross;
//import com.mapabc.project.event.cross.service.minbounding.MinBoundingPolygon;
//import com.mapabc.project.event.cross.service.minbounding.Point;
//import com.mapabc.util.PropertiesHelper;
//import com.mapabc.util.PtInPolyUtil;
//import org.apache.flink.api.common.functions.FlatMapFunction;
//import org.apache.flink.api.java.tuple.Tuple;
//import org.apache.flink.streaming.api.datastream.DataStream;
//import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
//import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
//import org.apache.flink.streaming.api.windowing.time.Time;
//import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
//import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
//import org.apache.flink.util.Collector;
//
//import java.util.*;
//
//public class TTCMain {
//
// private StreamExecutionEnvironment env;
// private PropertiesHelper instance;
// private Properties p;
//
// private TTCMain(StreamExecutionEnvironment env) {
// this.env = env;
// this.instance = PropertiesHelper.getInstance();
// this.p = instance.getProperties();
// }
//
// public static TTCMain init(StreamExecutionEnvironment env) {
// return new TTCMain(env);
// }
//
// public void run(DataStream<String> streamSource) {
// SingleOutputStreamOperator<EventCross> eventCrossStream = streamSource.flatMap(new FlatMapFunction<String, EventCross>() {
// @Override
// public void flatMap(String s, Collector<EventCross> collector) {
// EventCross eventCross = JSONObject.parseObject(s, EventCross.class);
// if (Constant.VEHICLE == eventCross.getObjectType() &&
// eventCross.getCrossing() == Constant.CROSSING) {
// collector.collect(eventCross);
// }
// }
// });
//
// SingleOutputStreamOperator<TTCTrackModel> process = eventCrossStream.keyBy("crossId", "trackID")
// .countWindow(5L)
// .process(new ProcessWindowFunction<EventCross, TTCTrackModel, Tuple, GlobalWindow>() {
// @Override
// public void process(Tuple t, Context ctx, Iterable<EventCross> iterable, Collector<TTCTrackModel> out) {
// Iterator<EventCross> iterator = iterable.iterator();
// long max = Long.MIN_VALUE;
// long min = Long.MAX_VALUE;
// EventCross maxEvent = new EventCross();
// EventCross minEvent = new EventCross();
// while (iterator.hasNext()) {
// EventCross next = iterator.next();
// long time = next.getTime();
// if (time > max) {
// max = time;
// setData(next, maxEvent);
// }
// if (time < min) {
// min = time;
// setData(next, minEvent);
// }
// }
// double distance = PtInPolyUtil.getDistance(maxEvent.getLng(), maxEvent.getLat(), minEvent.getLng(), minEvent.getLat());
// if (distance > 0.0) {
// double mark = 4.5;
// double x = -(maxEvent.getLng() - minEvent.getLng()) * mark;
// double y = -(maxEvent.getLat() - minEvent.getLat()) * mark;
// double lng = maxEvent.getLng() + x;
// double lat = maxEvent.getLat() + y;
// double angle = PtInPolyUtil.getAngle(maxEvent.getLng(), maxEvent.getLat(), lng, lat);
// EventCross endEvent = new EventCross();
// setData(maxEvent, endEvent);
// endEvent.setLng(lng);
// endEvent.setLat(lat);
// endEvent.setDriveAngle(angle);
//// double d = PtInPolyUtil.getDistance(minEvent.getLng(), minEvent.getLat(), lng, lat);
// TTCTrackModel model = new TTCTrackModel();
// model.setCrossId(maxEvent.getCrossId());
// model.setTrackID(maxEvent.getTrackID());
// model.setAngle(angle);
// model.setPlateNumber(maxEvent.getPlateNumber());
// model.setA(minEvent);
// model.setB(endEvent);
// model.setTime(maxEvent.getTime());
// out.collect(model);
// }
// }
//
// private void setData(EventCross setData, EventCross data) {
// data.setLampState(setData.getLampState());
// data.setCrossId(setData.getCrossId());
// data.setTrackID(setData.getTrackID());
// data.setLng(setData.getLng());
// data.setLat(setData.getLat());
// data.setMatchLane(setData.getMatchLane());
// data.setEntryLane(setData.getEntryLane());
// data.setRid(setData.getRid());
// data.setSpeed(setData.getSpeed());
// data.setDriveAngle(setData.getDriveAngle());
// data.setPlateNumber(setData.getPlateNumber());
// data.setVehicleType(setData.getVehicleType());
// data.setTime(setData.getTime());
// data.setStartTime(setData.getStartTime());
// data.setEndTime(setData.getEndTime());
// data.setMark(setData.getMark());
// data.setList(setData.getList());
// data.setCrossing(setData.getCrossing());
// data.setStopLineDist(setData.getStopLineDist());
// data.setAreaDist(setData.getAreaDist());
// data.setDuration(setData.getDuration());
// data.setLaneFunction(setData.getLaneFunction());
// data.setDistance(setData.getDistance());
// data.setAt(setData.getAt());
// data.setRatio(setData.getRatio());
// data.setMaxSpeed(setData.getMaxSpeed());
// data.setTypeCode(setData.getTypeCode());
// data.setObjectType(setData.getObjectType());
// data.setMatchLaneName(setData.getMatchLaneName());
// data.setVehicleColor(setData.getVehicleColor());
// data.setVehicleBrand(setData.getVehicleBrand());
// }
// });
//
//
// process.keyBy("crossId")
// .window(TumblingProcessingTimeWindows.of(Time.milliseconds(450)))
// .process(new ProcessWindowFunction<TTCTrackModel, TTCTrackModel, Tuple, TimeWindow>() {
// @Override
// public void process(Tuple t, Context ctx, Iterable<TTCTrackModel> iterable, Collector<TTCTrackModel> out) throws Exception {
// Iterator<TTCTrackModel> iterator = iterable.iterator();
// List<TTCTrackModel> list = new ArrayList<>();
// Set<Integer> set = new HashSet<>();
// while (iterator.hasNext()) {
// TTCTrackModel next = iterator.next();
// if(set.add(next.getTrackID())){
// list.add(next);
// }
// }
// for(TTCTrackModel a : list){
// for(TTCTrackModel b : list){
// if(a.getTrackID() == b.getTrackID()){
// continue;
// }
// Point p1 = new Point(a.getA().getLng(), a.getA().getLat());
// Point p2 = new Point(a.getB().getLng(),a.getB().getLat());
// Point p3 = new Point(b.getA().getLng(), b.getA().getLat());
// Point p4 = new Point(b.getB().getLng(),b.getB().getLat());
// boolean intersect = MinBoundingPolygon.intersect(p1, p2, p3, p4);
// double angleX = Math.abs(a.getAngle() - b.getAngle());
// if(angleX > 180.0){
// angleX = angleX - 180;
// }
// if(intersect && angleX > 25.0) {
// //log.info(a.getPlateNumber() + " TTC " + b.getPlateNumber() + " 相交角度 " + angleX);
// }
// }
// }
// }
// });
//
// }
//
//
//
//}
//package com.wanji.indicators.task.conflictpoint.ttc;
//
//import com.mapabc.model.event.EventCross;
//
//import java.io.Serializable;
//
//public class TTCTrackModel implements Serializable {
//
// private String crossId;
// private int trackID;
// private double angle;
// private String plateNumber;
//
// private EventCross a;
// private EventCross b;
//
// private long time;
//
// public String getCrossId() {
// return crossId;
// }
//
// public void setCrossId(String crossId) {
// this.crossId = crossId;
// }
//
// public int getTrackID() {
// return trackID;
// }
//
// public void setTrackID(int trackID) {
// this.trackID = trackID;
// }
//
// public double getAngle() {
// return angle;
// }
//
// public void setAngle(double angle) {
// this.angle = angle;
// }
//
// public String getPlateNumber() {
// return plateNumber;
// }
//
// public void setPlateNumber(String plateNumber) {
// this.plateNumber = plateNumber;
// }
//
// public EventCross getA() {
// return a;
// }
//
// public void setA(EventCross a) {
// this.a = a;
// }
//
// public EventCross getB() {
// return b;
// }
//
// public void setB(EventCross b) {
// this.b = b;
// }
//
// public long getTime() {
// return time;
// }
//
// public void setTime(long time) {
// this.time = time;
// }
//}
package com.wanji.indicators.task.track.stream;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.task.track.service.CarRoutePathMainNew;
import com.wanji.indicators.task.track.service.TrackStoreMainNew;
import com.wanji.indicators.task.track.service.func.CarTrackFlatMap;
import com.wanji.indicators.task.track.service.func.FrameFlatMap;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import java.util.Calendar;
import java.util.Properties;
public class CarRoutePathMain {
//private static final Logger log = LoggerFactory.getLogger(TrafficEventMain.class);
public static void main(String[] args) {
//批量轨迹es index,逗号分割:esIndexName,windowTime
String indexName = args[0];
PropertiesHelper instance = PropertiesHelper.getInstance();
Properties properties = instance.getProperties();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
DateTime currentDateTime = new DateTime();
DateTime dateTime = currentDateTime.withMillisOfDay(0).plusDays(-1);
KafkaSource<String> source = KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties())
.setProperty("auto.offset.commit", "true")
.setProperty("auto.commit.interval.ms","1000")
.setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setTopics(properties.getProperty("consumer.topic"))
.setGroupId(properties.getProperty("consumer.group.id")+"-Car-Route-Path")
/* 设置起始偏移量有以下几种情况:
1.从指定的位置消费:OffsetsInitializer.offsets(Map< TopicPartition, Long> offsets)
2.从最新位置消费(最后一条处):OffsetsInitializer.latest()
3.从最早位置消费(第一条处):OffsetsInitializer.earliest()
4.从上次提交的位置消费:OffsetsInitializer.committedOffsets()
5.新的组,从来没有提交过,再指定一个消费方式:OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
*/
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))//设置起始偏移量,也就是从哪里消费
//.setStartingOffsets(OffsetsInitializer.timestamp(dateTime.getMillis()))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
DataStream<String> stream = env
.fromSource(source, WatermarkStrategy.noWatermarks(),"kafka-car-track-source");
SingleOutputStreamOperator<FrameModel> frameModelStream =
stream
.flatMap(new FrameFlatMap())
.setParallelism(1)
.name("轨迹帧数据-JsonToObject");
SingleOutputStreamOperator<CarTrackModel> carTrackModelStream =
frameModelStream.
flatMap(new CarTrackFlatMap())
.setParallelism(1)
.name("轨迹帧数据解析-ToCarTrackModel");
try {
//轨迹路径
CarRoutePathMainNew.init(env,indexName).run(carTrackModelStream);
env.execute("路径计算工作流");
} catch (Exception e) {
e.printStackTrace();
//log.error("交通指标计算任务异常 : " + e);
}
}
}
package com.wanji.indicators.task.track.stream;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.task.track.service.AreaRealTimeIndicatorMainNew;
import com.wanji.indicators.task.track.service.CarRoutePathMainNew;
import com.wanji.indicators.task.track.service.TrackStoreMainNew;
import com.wanji.indicators.task.track.service.func.CarTrackFlatMap;
import com.wanji.indicators.task.track.service.func.FrameFlatMap;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.DateTime;
import java.util.Properties;
/**
* 对单辆车轨迹,每30秒封装一个批次
*/
public class TrackUnionMain {
//private static final Logger log = LoggerFactory.getLogger(TrafficEventMain.class);
public static void main(String[] args) {
//批量轨迹es index,逗号分割:esIndexName,windowTime
String indexName = "";//args[0];
PropertiesHelper instance = PropertiesHelper.getInstance();
Properties properties = instance.getProperties();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DateTime currentDateTime = new DateTime();
//过去一天0点
DateTime dateTime = currentDateTime.withMillisOfDay(0).plusDays(-1);
KafkaSource<String> source = KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties())
.setProperty("auto.offset.commit", "true")
.setProperty("auto.commit.interval.ms","1000")
.setProperty("commit.offsets.on.checkpoint","true")
.setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setTopics(properties.getProperty("consumer.topic"))
.setGroupId(properties.getProperty("consumer.group.id")+"_BATCH_TRACK")
/* 设置起始偏移量有以下几种情况:
1.从指定的位置消费:OffsetsInitializer.offsets(Map< TopicPartition, Long> offsets)
2.从最新位置消费(最后一条处):OffsetsInitializer.latest()
3.从最早位置消费(第一条处):OffsetsInitializer.earliest()
4.从上次提交的位置消费:OffsetsInitializer.committedOffsets()
5.新的组,从来没有提交过,再指定一个消费方式:OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
*/
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))//设置起始偏移量,也就是从哪里消费
// 从大于等于此时间戳开始的偏移量开始
//.setStartingOffsets(OffsetsInitializer.timestamp(dateTime.getMillis()))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
DataStream<String> stream = env
.fromSource(source, WatermarkStrategy.noWatermarks(),"kafka-car-track-source");
SingleOutputStreamOperator<FrameModel> frameModelStream =
stream
.flatMap(new FrameFlatMap())
.setParallelism(1)
.name("轨迹帧数据-JsonToObject");
SingleOutputStreamOperator<CarTrackModel> carTrackModelStream =
frameModelStream.
flatMap(new CarTrackFlatMap())
.setParallelism(1)
.name("轨迹帧数据解析-ToCarTrackModel");
try {
//批量轨迹封装
TrackStoreMainNew.init(env,indexName).run(carTrackModelStream);
//AreaRealTimeIndicatorMainNew.init(env,indexName).run(carTrackModelStream);
env.execute("轨迹处理工作流");
} catch (Exception e) {
e.printStackTrace();
//log.error("交通指标计算任务异常 : " + e);
}
}
}
......@@ -9,6 +9,7 @@ package com.wanji.indicators.udf;
import com.vividsolutions.jts.geom.LineString;
import com.wanji.indicators.util.ArithOfBigDecmial;
import com.wanji.indicators.util.GeomsConvertUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
......@@ -19,6 +20,8 @@ public class LineLengthUdf extends ScalarFunction {
public void open(FunctionContext context) {
}
public double eval(String points) {
if (StringUtils.isEmpty(points))
return 0;
LineString lineString = GeomsConvertUtil.getLineString(points);
//度单位转换为米单位
double length = ArithOfBigDecmial.mul(lineString.getLength(), 6371000 * Math.PI / 180);
......
......@@ -24,7 +24,7 @@ import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import sun.misc.BASE64Encoder;
//import sun.misc.BASE64Encoder;
import java.io.IOException;
import java.net.MalformedURLException;
......@@ -43,7 +43,7 @@ public class ElasticSearchSinkUtil {
private static RestHighLevelClient client = null;
private static Gson gson = new Gson().newBuilder().create();
private static BASE64Encoder base64Encoder = new BASE64Encoder();
//private static BASE64Encoder base64Encoder = new BASE64Encoder();
private PropertiesHelper instance;
static {
......
#kafka settings kafka1:9092,kafka2:9092,kafka3:9092 192.168.11.102:9092
bootstrap.servers=10.102.1.182:9092
#bootstrap.servers=192.168.2.103:9092
#bootstrap.servers=10.102.1.182:9092
bootstrap.servers=192.168.2.103:9092
#consumer-settings
consumer.topic=JNMatchResultMiniData
......@@ -15,7 +14,6 @@ producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#elasticsearch.server=10.102.1.182:9200
elasticsearch.server=192.168.2.103:9200
elasticsearch.username=elastic
elasticsearch.password=Wanji300552
......
......@@ -125,82 +125,41 @@
<result column="rid_length" property="ridLength"/>
</resultMap>
<!-- 查询表t_base_lane_info所有信息 -->
<select id="findAllBaseLaneInfo" resultMap="BaseLaneInfoMap">
SELECT a.id,b.`level` rid_level,b.wkt rid_wkt
from t_base_lane_info a
JOIN t_base_rid_info b
on a.rid=b.id
<!-- 查询路口进口车道、出口车道信息 -->
<select id="findCrossLaneInfo" parameterType="Map" resultType="map">
SELECT t.cross_id,t.road_id,t.rid,t.type,t.dir,count(*) lane_num,t.road_wkt,
GROUP_CONCAT(t.cut_point order by t.lane_id SEPARATOR '@') cut_point,
GROUP_CONCAT(t.turn order by t.lane_id) turn_list
FROM
( <!-- 进口方向RID车道 -->
select t1.id lane_id,substring_index(t1.wkt,';',-3) cut_point,t1.rid,t2.cross_id,t2.wkt road_wkt,t1.type,t1.dir,t2.road_id,t1.turn
from t_base_lane_info t1
JOIN (<!-- 取end_cross_id -->
select id,end_cross_id cross_id,in_dir as dir, 2 as type,wkt,if(road_id='#',concat_ws('',id,'#'),road_id) road_id
from t_base_rid_info
where 1=1
<if test="crossId !=null and crossId != ''">
and end_cross_id=#{crossId}
</if>
) t2 on t1.rid=t2.id and t1.type=t2.type
UNION
<!-- 出口方向RID车道 -->
select t1.id lane_id,substring_index(t1.wkt,';',3) cut_point,t1.rid,t2.cross_id,t2.wkt road_wkt,t1.type,t1.dir,t2.road_id,t1.turn
from t_base_lane_info t1
JOIN (<!-- 取start_cross_id -->
select id,start_cross_id cross_id,in_dir as dir, 3 as type,wkt,if(road_id='#',concat_ws('',id,'#'),road_id) road_id
from t_base_rid_info
where 1=1
<if test="crossId !=null and crossId != ''">
and start_cross_id=#{crossId}
</if>
) t2 on t1.rid=t2.id and t1.type=t2.type
) t
where 1=1
<if test="type !=null and type != ''">
and t.type = #{type}
</if>
GROUP BY t.cross_id,t.road_id,t.rid,t.type,t.dir
</select>
<!-- 根据主键id查询表t_base_lane_info信息 -->
<select id="findBaseLaneInfoByid" resultMap="BaseLaneInfoMap">
SELECT
<include refid="Base_Column_List"/>
FROM t_base_lane_info
WHERE id=#{id}
</select>
<!-- 根据条件查询表t_base_lane_info信息 -->
<select id="findBaseLaneInfoByCondition" resultMap="BaseLaneInfoMap">
SELECT
<include refid="Base_Column_List"/>
FROM t_base_lane_info
WHERE 1=1
<include refid="BaseLaneInfoByCondition" />
</select>
<!-- 根据主键id删除表t_base_lane_info信息 -->
<delete id="deleteBaseLaneInfoByid">
DELETE FROM
t_base_lane_info
WHERE id=#{id}
</delete>
<!-- 根据主键id更新表t_base_lane_info信息 -->
<update id="updateBaseLaneInfoByid" parameterType="com.wanji.indicators.entity.BaseLaneInfo">
UPDATE t_base_lane_info
<set>
<include refid="BaseLaneInfoSetColumns"/>
</set>
WHERE
id=#{id}
</update>
<!-- 新增表t_base_lane_info信息 -->
<insert id="addBaseLaneInfo">
INSERT INTO t_base_lane_info (
id
,code
,sort
,type
,dir
,turn
,category
,cross_id
,rid
,segment_id
,length
,width
,wkt
,gmt_create
,gmt_modified
) VALUES (
#{id}
,#{code}
,#{sort}
,#{type}
,#{dir}
,#{turn}
,#{category}
,#{crossId}
,#{rid}
,#{segmentId}
,#{length}
,#{width}
,#{wkt}
,#{gmtCreate}
,#{gmtModified}
)
</insert>
</mapper>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment