Commit dc458b65 authored by zhoushiguang's avatar zhoushiguang

ADD拥堵区域算子

parent 4def243c
......@@ -185,7 +185,7 @@ public class Constant {
/**
* 常用分隔符 分号
*/
public static final String SEMICOLON = ";";
public static final String SEMICOLON = ";";
/**
* 常用分隔符 冒号
......
package com.wanji.indicators.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import java.io.Serializable;
import lombok.Data;
import lombok.EqualsAndHashCode;
import com.baomidou.mybatisplus.annotation.TableName;
@Data
@EqualsAndHashCode(callSuper=false)
@TableName("t_congestion_area")
public class CongestionArea implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 路口id
*/
private String crossId;
/**
* 出口id
*/
private String rid;
/**
* 出口车道数
*/
private Integer laneNum;
/**
* 可以进入检测区域的进口道rid数组,逗号分隔
*/
private String ridInArr;
/**
* 出口检测面wkt
*/
private String wkt;
}
......@@ -8,6 +8,7 @@ import com.wanji.indicators.model.event.accident.AccidentShowModel;
import com.wanji.indicators.sink.AccidentEventFileSink;
import com.wanji.indicators.sink.EventFileSink;
import com.wanji.indicators.source.ConflictTurnSource;
import com.wanji.indicators.source.ConsulAreaSource;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
......@@ -81,6 +82,13 @@ public class AccidentEventMainNew implements Serializable {
.setParallelism(1)
.name("事故检测-解析数据");
//匹配拥堵检测区域
// SingleOutputStreamOperator<AccidentFrameModel> checkCongestionArea = accidentFrameModelFilter
// .connect(env.addSource(new ConsulAreaSource()).broadcast())
// .flatMap(new AccidentCheckCongestionAreaCoFlatMap())
// .setParallelism(1)
// .name("事故检测-匹配拥堵检测区域");
//加载冲突相位信息
SingleOutputStreamOperator<AccidentFrameModel> loadConflictSourceProcess = accidentFrameModelFilter
.connect(env.addSource(new ConflictTurnSource()).broadcast())
......
package com.wanji.indicators.event.cross.accident.funcnew;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.ConfigModel;
import com.wanji.indicators.model.CongestionArea;
import com.wanji.indicators.model.event.accident.AccidentFrameModel;
import com.wanji.indicators.util.PtInPolyUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.*;
public class AccidentCheckCongestionAreaCoFlatMap implements CoFlatMapFunction<AccidentFrameModel, HashMap<String, Map<String, CongestionArea>>, AccidentFrameModel> {
private HashMap<String, Map<String, CongestionArea>> hashMap = new HashMap();
@Override
public void flatMap1(AccidentFrameModel accidentFrameModel, Collector<AccidentFrameModel> collector) throws Exception {
List<AccidentFrameModel.CarTrack> trackList = accidentFrameModel.getTrackList();
ConfigModel configModel = accidentFrameModel.getConfigModel();
List<AccidentFrameModel.CarTrack> list = new ArrayList<>();
Set<Integer> set = new HashSet<>();
Map<String, CongestionArea> congestionAreaDoMap = hashMap.get(accidentFrameModel.getCrossId());
if (congestionAreaDoMap != null) {
Map<String, Integer> map = new HashMap<>();
for (CongestionArea congestionAreaDo : congestionAreaDoMap.values()) {
//存入各出口rid对应的车道数
int laneNum = congestionAreaDo.getLaneNum();
if (laneNum > 0) {
//存入各出口rid对应的车道数
map.put(congestionAreaDo.getRid(), laneNum);
}
}
accidentFrameModel.setCongestionInfo(map);
for (AccidentFrameModel.CarTrack carTrack : trackList) {
String entryLane = carTrack.getRoadnet().getLaneId();
if (carTrack.getRoadnet().getInCrossFlag() == Constant.CROSSING && carTrack.getSpeed() < configModel.getAbnormalParkCongestSpeed() && StringUtils.isNotBlank(entryLane)) {
for (CongestionArea congestionAreaDo : congestionAreaDoMap.values()) {
String ridIn = entryLane.substring(0, 23);
//判断是否属于该方向验证的入口rid数据
if (congestionAreaDo.getRidInArr().contains(ridIn)) {
PtInPolyUtil.Point point = new PtInPolyUtil.Point(carTrack.getLongitude(), carTrack.getLatitude());
boolean isIn = PtInPolyUtil.isPtInPoly(point, congestionAreaDo.getPoints());
if (isIn) {
if (set.add(carTrack.getId())) {
carTrack.setCongestionOutRid(congestionAreaDo.getRid());
list.add(carTrack);
}
}
}
}
}
}
for (AccidentFrameModel.CarTrack carTrack : trackList) {
if (!set.contains(carTrack.getId())) {
list.add(carTrack);
}
}
accidentFrameModel.setTrackList(list);
}
collector.collect(accidentFrameModel);
}
@Override
public void flatMap2(HashMap<String, Map<String, CongestionArea>> stringCongestionAreaDoHashMap, Collector<AccidentFrameModel> collector) throws Exception {
this.hashMap = stringCongestionAreaDoHashMap;
}
}
......@@ -91,7 +91,7 @@ public class AccidentEncapsulationFlatMap implements FlatMapFunction<AccidentFra
AccidentFrameModel.CarTrack carTrack1,
EventAbnormalEnum eventAbnormalEnum) {
AccidentShowModel m = new AccidentShowModel();
m.setEventType(eventAbnormalEnum.getType());
m.setEventType(EventAbnormalEnum.ACCIDENT_YS.getType());
m.setTypeCode(eventAbnormalEnum.getType());
m.setEventClass(eventAbnormalEnum.getDesc());
m.setCrossId(carTrack0.getRoadnet().getCrossId());
......@@ -242,7 +242,7 @@ public class AccidentEncapsulationFlatMap implements FlatMapFunction<AccidentFra
AccidentFrameModel.CarTrack model = accidentFrameModel.getTrackList().get(0);
AccidentShowModel m = new AccidentShowModel();
m.setEventType(EventAbnormalEnum.ACCIDENT_ONE_YS.getType());
m.setEventType(EventAbnormalEnum.ACCIDENT_YS.getType());
m.setTypeCode(EventAbnormalEnum.ACCIDENT_ONE_YS.getType());
m.setEventClass(EventAbnormalEnum.ACCIDENT_ONE_YS.getDesc());
m.setEndTime(model.getGlobalTimeStamp());
......
package com.wanji.indicators.mapper;
import com.wanji.indicators.entity.CongestionArea;
import java.util.List;
import org.apache.ibatis.annotations.Param;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface CongestionAreaMapper extends BaseMapper<CongestionArea>{
/**
* 查询表t_congestion_area所有信息
*/
List<CongestionArea> findAllCongestionArea();
/**
* 根据条件查询表t_congestion_area信息
* @param congestionArea
*/
List<CongestionArea> findCongestionAreaByCondition(CongestionArea congestionArea);
}
package com.wanji.indicators.model;
import com.wanji.indicators.util.PtInPolyUtil;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
@Data
public class CongestionArea implements Serializable {
private String crossId;
private String rid;
private int type;
private int typeCode;
private int laneNum;
private String ridInArr;
private String wkt;
private List<PtInPolyUtil.Point> points;
}
......@@ -6,11 +6,7 @@ import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* 2022/1/19 15:53
*
* @auther ke.han
*/
@Data
public class CongestionAreaDo implements Serializable {
......
package com.wanji.indicators.service;
import com.wanji.indicators.entity.CongestionArea;
import com.baomidou.mybatisplus.extension.service.IService;
import com.baomidou.mybatisplus.core.metadata.IPage;
import java.util.List;
/**
* <p>
* 服务类
* </p>
*
* @author fengyi
* @since 2023-10-25
*/
public interface CongestionAreaService extends IService<CongestionArea> {
/**
* 详情
* @param id
* @return
*/
CongestionArea info(Long id);
/**
* 新增
* @param param 根据需要进行传值
* @return
*/
void add(CongestionArea param);
/**
* 修改
* @param param 根据需要进行传值
* @return
*/
void modify(CongestionArea param);
/**
* 删除(单个条目)
* @param id
* @return
*/
void remove(Long id);
/**
* 删除(多个条目)
* @param ids
* @return
*/
void removes(List<Long> ids);
}
package com.wanji.indicators.service.impl;
import com.wanji.indicators.entity.CongestionArea;
import com.wanji.indicators.mapper.CongestionAreaMapper;
import com.wanji.indicators.service.CongestionAreaService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.List;
/**
* <p>
* 服务实现类
* </p>
*
* @author fengyi
* @since 2023-10-25
*/
@Service
public class CongestionAreaServiceImpl extends ServiceImpl<CongestionAreaMapper, CongestionArea> implements CongestionAreaService {
@Resource
private CongestionAreaMapper congestionAreaMapper;
/**
* 详情
* @param id
* @return
*/
@Override
public CongestionArea info(Long id) {
return getById(id);
}
/**
* 新增
* @param param 根据需要进行传值
* @return
*/
@Override
public void add(CongestionArea param) {
save(param);
}
/**
* 修改
* @param param 根据需要进行传值
* @return
*/
@Override
public void modify(CongestionArea param) {
updateById(param);
}
/**
* 删除(单个条目)
* @param id
* @return
*/
@Override
public void remove(Long id) {
removeById(id);
}
/**
* 删除(多个条目)
* @param ids
* @return
*/
@Override
public void removes(List<Long> ids) {
removeByIds(ids);
}
}
package com.wanji.indicators.source;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.CongestionArea;
import com.wanji.indicators.util.PropertiesHelper;
import com.wanji.indicators.util.PtInPolyUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.*;
/**
* 查询 溢出 检测区域 数据
*/
public class CongestionSource extends RichParallelSourceFunction<HashMap<String, Map<String, CongestionArea>>> {
private static final Logger log = LoggerFactory.getLogger(CongestionSource.class);
private volatile boolean running = true;
Properties properties = null;
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// ApplicationContext beanConf = new ClassPathXmlApplicationContext("spring-container.xml");
// congestionAreaMapper = beanConf.getBean(CongestionAreaMapper.class);
PropertiesHelper instance = PropertiesHelper.getInstance();
properties = instance.getProperties();
}
@Override
public void run(SourceContext<HashMap<String, Map<String, CongestionArea>>> sourceContext) throws Exception {
HashMap<String, Map<String, CongestionArea>> outMap = new HashMap<>();
if (running) {
try {
String sql = "SELECT cross_id AS crossId,rid,lane_num AS laneNum,rid_in_arr AS ridInArr,wkt FROM t_congestion_area ";
List<CongestionArea> dataList = null;// new JdbcUtil().excuteQueryToList(CongestionAreaDo.class, sql);
if (dataList != null && !dataList.isEmpty()) {
outMap.clear();
for (CongestionArea data : dataList) {
Map<String, CongestionArea> congestionAreaDoMap = outMap.get(data.getCrossId());
if (congestionAreaDoMap == null) {
congestionAreaDoMap = new HashMap<>();
}
String wkt = data.getWkt();
String[] wktPointArr = wkt.split(Constant.SEMICOLON);
List<PtInPolyUtil.Point> points = new ArrayList<>();
for (String p : wktPointArr) {
String[] xyArr = p.split(Constant.COMMA);
PtInPolyUtil.Point point = new PtInPolyUtil.Point(Double.valueOf(xyArr[0]), Double.valueOf(xyArr[1]));
points.add(point);
}
data.setPoints(points);
congestionAreaDoMap.put(data.getRid(), data);
outMap.put(data.getCrossId(), congestionAreaDoMap);
}
}
if (outMap.size() > 0) {
sourceContext.collect(outMap);
}
} catch (Exception e) {
log.error("查询溢出检测区域: " + e.getMessage() , e);
}
// Thread.sleep(1000*30);
}
}
@Override
public void cancel() {
running = false;
}
}
......@@ -10,6 +10,7 @@ import com.wanji.indicators.model.event.ConfigModelDo;
import com.wanji.indicators.model.event.congestion.CongestionAreaDo;
import com.wanji.indicators.util.PropertiesHelper;
import com.wanji.indicators.util.PtInPolyUtil;
import javafx.event.EventType;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
......@@ -45,62 +46,43 @@ public class ConsulAreaSource extends RichParallelSourceFunction<HashMap<String,
@Override
public void open(Configuration parameters) throws Exception {
try {
PropertiesHelper instance = PropertiesHelper.getInstance();
Properties properties = instance.getProperties();
String consul_host_properties = properties.getProperty("consul.host");
String consul_port_properties = properties.getProperty("consul.port");
String consul_host_env = System.getenv("CONSUL_HOST");
String consul_port_env = System.getenv("CONSUL_PORT");
if (StringUtils.isNotBlank(consul_host_env)) {
this.host = consul_host_env;
} else {
this.host = consul_host_properties;
}
if (StringUtils.isNotBlank(consul_port_env)) {
this.port = Integer.parseInt(consul_port_env);
} else {
this.port = Integer.parseInt(consul_port_properties);
}
log.info("areaSource consul 服务链接正常: " + host + ":" + port + " ");
} catch (Exception e) {
log.error("consul 服务异常 链接服务异常: " + e.getMessage());
}
}
@Override
public void run(SourceContext<HashMap<String, Map<String, CongestionAreaDo>>> sourceContext) throws Exception {
//key:crossId value:<rid,CongestionAreaDo>
HashMap<String, Map<String, CongestionAreaDo>> outMap = new HashMap<>();
while (running) {
try {
this.client = Consul.builder().withHostAndPort(HostAndPort.fromParts(host, port)).build();
//通过客户端查询kv值 获取配置
KeyValueClient keyValueClient = client.keyValueClient();
List<String> keys = keyValueClient.getKeys(Constant.CONSUL_EVENT_SETTINGS);
for (String key : keys) {
if (StringUtils.isNotBlank(key) && !key.contains(Constant.CONSUL_EVENT_TEMPLATE) && key.contains(Constant.CONSUL_EVENT_AREA_SET)) {
try {
Optional<String> value = keyValueClient.getValue(key).get().getValueAsString();
if (value.isPresent()) {
List<CongestionAreaDo> dataList = JSONObject.parseArray(value.get(), CongestionAreaDo.class);
for (CongestionAreaDo data : dataList) {
//过滤路口溢出监测区域
}
}
} catch (Exception e) {
log.error("溢出检测区域 ==>> consul 查询key异常", e.getMessage(), e);
//过滤路口溢出监测区域
String listJson = "";
List<CongestionAreaDo> dataList = JSONObject.parseArray(listJson, CongestionAreaDo.class);
for (CongestionAreaDo data : dataList) {
Map<String, CongestionAreaDo> congestionAreaDoMap = outMap.get(data.getCrossId());
if (congestionAreaDoMap == null) {
congestionAreaDoMap = new HashMap<>();
}
String wkt = data.getWkt();
if (StringUtils.isNotBlank(wkt)) {
wkt = wkt.replace("POLYGON((", "").replace("))", "").replace(Constant.COMMA, Constant.SEMICOLON).replace(Constant.SPACE, Constant.COMMA);
String[] wktPointArr = wkt.split(Constant.SEMICOLON);
List<PtInPolyUtil.Point> points = new ArrayList<>();
for (String p : wktPointArr) {
String[] xyArr = p.split(Constant.COMMA);
PtInPolyUtil.Point point = new PtInPolyUtil.Point(Double.valueOf(xyArr[0]), Double.valueOf(xyArr[1]));
points.add(point);
}
data.setPoints(points);
}
congestionAreaDoMap.put(data.getRid(), data);
outMap.put(data.getCrossId(), congestionAreaDoMap);
}
if (outMap.size() > 0) {
sourceContext.collect(outMap);
}
} catch (Exception e) {
log.error("查询溢出检测区域: " + e.getMessage() , e);
} finally {
if (this.client != null) {
this.client.destroy();
}
}
TimeUnit.SECONDS.sleep(60);
}
......
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.wanji.indicators.mapper.CongestionAreaMapper">
<!-- 通用设置 -->
<!-- 通用查询列 -->
<sql id="Base_Column_List">
cross_id, rid, lane_num, rid_in_arr, wkt
</sql>
<!-- 通用条件列 -->
<sql id="CongestionAreaByCondition">
<if test="crossId!=null and crossId!=''">
AND cross_id = #{crossId}
</if>
<if test="rid!=null and rid!=''">
AND rid = #{rid}
</if>
<if test="laneNum!=null and laneNum!=''">
AND lane_num = #{laneNum}
</if>
<if test="ridInArr!=null and ridInArr!=''">
AND rid_in_arr = #{ridInArr}
</if>
<if test="wkt!=null and wkt!=''">
AND wkt = #{wkt}
</if>
</sql>
<!-- 通用设置列 -->
<sql id="CongestionAreaSetColumns">
<if test="crossId!=null and crossId!=''">
cross_id = #{crossId},
</if>
<if test="rid!=null and rid!=''">
rid = #{rid},
</if>
<if test="laneNum!=null and laneNum!=''">
lane_num = #{laneNum},
</if>
<if test="ridInArr!=null and ridInArr!=''">
rid_in_arr = #{ridInArr},
</if>
<if test="wkt!=null and wkt!=''">
wkt = #{wkt},
</if>
</sql>
<!-- 通用查询映射结果 -->
<resultMap id="CongestionAreaMap" type="com.wanji.indicators.entity.CongestionArea">
<result column="cross_id" property="crossId"/>
<result column="rid" property="rid"/>
<result column="lane_num" property="laneNum"/>
<result column="rid_in_arr" property="ridInArr"/>
<result column="wkt" property="wkt"/>
</resultMap>
<!-- 查询表t_congestion_area所有信息 -->
<select id="findAllCongestionArea" resultMap="CongestionAreaMap">
SELECT
<include refid="Base_Column_List"/>
FROM t_congestion_area
</select>
<!-- 根据条件查询表t_congestion_area信息 -->
<select id="findCongestionAreaByCondition" resultMap="CongestionAreaMap">
SELECT
<include refid="Base_Column_List"/>
FROM t_congestion_area
WHERE 1=1
<include refid="CongestionAreaByCondition" />
</select>
</mapper>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment