Commit 819d87fa authored by zhoushiguang's avatar zhoushiguang

冲突点计算任务

parent afbcf6c6
......@@ -2,11 +2,11 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--<parent>
<artifactId>wj-platform</artifactId>
<groupId>com.wanji</groupId>
<version>1.0</version>
</parent>-->
<!-- <parent>-->
<!-- <artifactId>wj-datacenter-platform</artifactId>-->
<!-- <groupId>net.wanji</groupId>-->
<!-- <version>1.0-SNAPSHOT</version>-->
<!-- </parent>-->
<modelVersion>4.0.0</modelVersion>
<groupId>com.wanji</groupId>
......@@ -21,9 +21,9 @@
<elasticsearch.version>7.10.1</elasticsearch.version>
<es.client.version>7.10.1</es.client.version>
<!-- jar包名尾部标识 -->
<jar.tail>all</jar.tail>
<!--<jar.tail>all</jar.tail>-->
<!--<jar.tail>batch-track</jar.tail>-->
<!--<jar.tail>export</jar.tail>-->
<jar.tail>export</jar.tail>
<!--<jar.tail>export</jar.tail>-->
<!--<jar.tail>monitor</jar.tail>-->
</properties>
......@@ -59,32 +59,32 @@
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>4.1.4.RELEASE</version>
<version>5.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.1.4.RELEASE</version>
<version>5.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>4.1.4.RELEASE</version>
<version>5.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>4.1.4.RELEASE</version>
<version>5.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>4.1.4.RELEASE</version>
<version>5.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>4.1.4.RELEASE</version>
<version>5.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>javax.transaction</groupId>
......@@ -214,7 +214,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<artifactId>slf4j-reload4j</artifactId>
<version>1.7.36</version>
<scope>runtime</scope>
</dependency>
......
package com.wanji.indicators.base.bean;
/**
* github地址: https://github.com/intsmaze
* 博客地址:https://www.cnblogs.com/intsmaze/
* 出版书籍《深入理解Flink核心设计与实践原理》
*
* @auther: intsmaze(刘洋)
* @date: 2020/10/15 18:33
*/
public class FlowData {
private String uuid;
private String subTestItem;
private String billNumber;
private String barcode;
private String flowName;
private String flowStatus;
public String getFlowStatus() {
return flowStatus;
}
public void setFlowStatus(String flowStatus) {
this.flowStatus = flowStatus;
}
public String getUuid() {
return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
public String getSubTestItem() {
return subTestItem;
}
public void setSubTestItem(String subTestItem) {
this.subTestItem = subTestItem;
}
public String getBillNumber() {
return billNumber;
}
public void setBillNumber(String billNumber) {
this.billNumber = billNumber;
}
public String getBarcode() {
return barcode;
}
public void setBarcode(String barcode) {
this.barcode = barcode;
}
public String getFlowName() {
return flowName;
}
public void setFlowName(String flowName) {
this.flowName = flowName;
}
@Override
public String toString() {
return "FlowData{" +
"uuid='" + uuid + '\'' +
", subTestItem='" + subTestItem + '\'' +
", billNumber='" + billNumber + '\'' +
", barcode='" + barcode + '\'' +
", flowName='" + flowName + '\'' +
", flowStatus='" + flowStatus + '\'' +
'}';
}
}
package com.wanji.indicators.base.bean;
import java.sql.Timestamp;
/**
* github地址: https://github.com/intsmaze
* 博客地址:https://www.cnblogs.com/intsmaze/
* 出版书籍《深入理解Flink核心设计与实践原理》
*
* @auther: intsmaze(刘洋)
* @date: 2020/10/15 18:33
*/
public class MainData {
private String subTestItem;
private String billNumber;
private String barcode;
private String flowName;
private Timestamp testTime;
private int seqId;
private int stepNumber;
private String stepType;
public String getSubTestItem() {
return subTestItem;
}
public void setSubTestItem(String subTestItem) {
this.subTestItem = subTestItem;
}
public String getBillNumber() {
return billNumber;
}
public void setBillNumber(String billNumber) {
this.billNumber = billNumber;
}
public String getBarcode() {
return barcode;
}
public void setBarcode(String barcode) {
this.barcode = barcode;
}
public String getFlowName() {
return flowName;
}
public void setFlowName(String flowName) {
this.flowName = flowName;
}
public Timestamp getTestTime() {
return testTime;
}
public void setTestTime(Timestamp testTime) {
this.testTime = testTime;
}
public int getSeqId() {
return seqId;
}
public void setSeqId(int seqId) {
this.seqId = seqId;
}
public int getStepNumber() {
return stepNumber;
}
public void setStepNumber(int stepNumber) {
this.stepNumber = stepNumber;
}
public String getStepType() {
return stepType;
}
public void setStepType(String stepType) {
this.stepType = stepType;
}
}
package com.wanji.indicators.base.bean;
/**
* github地址: https://github.com/intsmaze
* 博客地址:https://www.cnblogs.com/intsmaze/
* 出版书籍《深入理解Flink核心设计与实践原理》
*
* @auther: intsmaze(刘洋)
* @date: 2020/10/15 18:33
*/
public class ResultBean {
private String flowId;
private int cycleNumber;
private long total;
private int stepNumber;
private String stepType;
public String getFlowId() {
return flowId;
}
public void setFlowId(String flowId) {
this.flowId = flowId;
}
public int getCycleNumber() {
return cycleNumber;
}
public void setCycleNumber(int cycleNumber) {
this.cycleNumber = cycleNumber;
}
public long getTotal() {
return total;
}
public void setTotal(long total) {
this.total = total;
}
public int getStepNumber() {
return stepNumber;
}
public void setStepNumber(int stepNumber) {
this.stepNumber = stepNumber;
}
public String getStepType() {
return stepType;
}
public void setStepType(String stepType) {
this.stepType = stepType;
}
@Override
public String toString() {
return "ResultBean{" +
"flowId='" + flowId + '\'' +
", cycleNumber=" + cycleNumber +
", total=" + total +
", stepNumber=" + stepNumber +
", stepType='" + stepType + '\'' +
'}';
}
}
package com.wanji.indicators.base.bean;
import java.sql.Timestamp;
/**
* github地址: https://github.com/intsmaze
* 博客地址:https://www.cnblogs.com/intsmaze/
* 出版书籍《深入理解Flink核心设计与实践原理》
*
* @auther: intsmaze(刘洋)
* @date: 2020/10/15 18:33
*/
public class SourceData {
/**
* 是否触发执行的标志
*/
private String flag;
private String uuid;
/**
* sqlClient也用的这个字段,RowNumberBean 中进行比较,字段类型也是它
*/
private Timestamp testTime;
private int seqId;
private int stepNumber;
private String stepType;
private int cycleNumber;
private int batchNumber;
private String flowId;
private String threadName;
/**
* 用于eventtime
*/
private Timestamp receiveTime;
public SourceData()
{}
public SourceData(String flag, String uuid, Timestamp testTime, int seqId, int stepNumber, String stepType, int cycleNumber,
int batchNumber, String flowId, String threadName, Timestamp receiveTime) {
this.flag = flag;
this.uuid = uuid;
this.testTime = testTime;
this.seqId = seqId;
this.stepNumber = stepNumber;
this.stepType = stepType;
this.cycleNumber = cycleNumber;
this.batchNumber = batchNumber;
this.flowId = flowId;
this.threadName = threadName;
this.receiveTime = receiveTime;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
public String getUuid() {
return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
public Timestamp getTestTime() {
return testTime;
}
public void setTestTime(Timestamp testTime) {
this.testTime = testTime;
}
public int getSeqId() {
return seqId;
}
public void setSeqId(int seqId) {
this.seqId = seqId;
}
public int getStepNumber() {
return stepNumber;
}
public void setStepNumber(int stepNumber) {
this.stepNumber = stepNumber;
}
public String getStepType() {
return stepType;
}
public void setStepType(String stepType) {
this.stepType = stepType;
}
public int getCycleNumber() {
return cycleNumber;
}
public void setCycleNumber(int cycleNumber) {
this.cycleNumber = cycleNumber;
}
public int getBatchNumber() {
return batchNumber;
}
public void setBatchNumber(int batchNumber) {
batchNumber = batchNumber;
}
public String getFlowId() {
return flowId;
}
public void setFlowId(String flowId) {
this.flowId = flowId;
}
public String getThreadName() {
return threadName;
}
public void setThreadName(String threadName) {
this.threadName = threadName;
}
public Timestamp getReceiveTime() {
return receiveTime;
}
public void setReceiveTime(Timestamp receiveTime) {
this.receiveTime = receiveTime;
}
@Override
public String toString() {
return "SourceData{" +
"flowId='" + flowId + '\'' +
", cycleNumber=" + cycleNumber +
", flag='" + flag + '\'' +
", seqId=" + seqId +
", stepNumber='" + stepNumber + '\'' +
", stepType='" + stepType + '\'' +
'}';
}
}
package com.wanji.indicators.base.env;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.io.Resources;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.beans.factory.xml.XmlBeanDefinitionReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.Resource;
import java.net.URL;
/**
* github地址: https://github.com/intsmaze
* 博客地址:https://www.cnblogs.com/intsmaze/
* 出版书籍《深入理解Flink核心设计与实践原理》
*
* @auther: intsmaze(刘洋)
* @date: 2020/10/15 18:33
*/
public class BeanFactory extends DefaultListableBeanFactory {
public static Logger LOG = LoggerFactory.getLogger(BeanFactory.class);
public static final String SPRING_BEAN_FACTORY_XML = "springframework.bean.factory.xml";
public static final String SPRING_BEAN_FACTORY_NAME = "springframework.bean.factory.name";
public static final String DUBBO_SPRING_BEAN_FACTORY_NAME = "dubbo.springframework.bean.factory.name";
private static BeanFactory instance;
public static BeanFactory getInstance() {
return instance;
}
public static void setInstance(BeanFactory instance) {
BeanFactory.instance = instance;
}
private transient final XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(this);
private String xml;
/**
* github地址: https://github.com/intsmaze
* 博客地址:https://www.cnblogs.com/intsmaze/
* 出版书籍《深入理解Flink核心设计与实践原理》
*
* @auther: intsmaze(刘洋)
* @date: 2020/10/15 18:33
*/
public BeanFactory(String location) {
super();
try {
Resource resource = reader.getResourceLoader().getResource(location);
URL url = resource.getURL();
this.xml = Resources.toString(url, Charsets.UTF_8);
this.reader.setValidating(false);
this.reader.loadBeanDefinitions(resource);
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
/**
* github地址: https://github.com/intsmaze
* 博客地址:https://www.cnblogs.com/intsmaze/
* 出版书籍《深入理解Flink核心设计与实践原理》
*
* @auther: intsmaze(刘洋)
* @date: 2020/10/15 18:33
*/
public BeanFactory(Configuration stormConf) {
this.xml = stormConf.getString(SPRING_BEAN_FACTORY_XML,null);
System.out.println("----->>>>> xml content is :"+xml);
LOG.info("----->>>>> xml content is :{}"+xml);
this.reader.setValidating(false);
this.reader.loadBeanDefinitions(new ByteArrayResource(xml.getBytes(Charsets.UTF_8)));
}
/**
* github地址: https://github.com/intsmaze
* 博客地址:https://www.cnblogs.com/intsmaze/
* 出版书籍《深入理解Flink核心设计与实践原理》
*
* @auther: intsmaze(刘洋)
* @date: 2020/10/15 18:33
*/
public static ApplicationContext getBeanFactory(Configuration globalJobParameters) {
String xmlName = globalJobParameters.getString(BeanFactory.SPRING_BEAN_FACTORY_NAME, null);
System.out.println("xmlName is :"+xmlName);
ApplicationContext beanConf = new ClassPathXmlApplicationContext(xmlName);
return beanConf;
}
/**
* github地址: https://github.com/intsmaze
* 博客地址:https://www.cnblogs.com/intsmaze/
* 出版书籍《深入理解Flink核心设计与实践原理》
*
* @auther: intsmaze(刘洋)
* @date: 2020/10/15 18:33
*/
public static ApplicationContext getDubboBeanFactory(Configuration globalJobParameters) {
String xmlName = globalJobParameters.getString(BeanFactory.DUBBO_SPRING_BEAN_FACTORY_NAME, null);
System.out.println("xmlName is :"+xmlName);
ApplicationContext beanConf = new ClassPathXmlApplicationContext(xmlName);
return beanConf;
}
/**
* github地址: https://github.com/intsmaze
* 博客地址:https://www.cnblogs.com/intsmaze/
* 出版书籍《深入理解Flink核心设计与实践原理》
*
* @auther: intsmaze(刘洋)
* @date: 2020/10/15 18:33
*/
public String getXml() {
return xml;
}
}
package com.wanji.indicators.base.jdbc;//package com.intsmaze.flink.base.jdbc;
//
//import org.springframework.jdbc.core.JdbcTemplate;
//
//import java.util.List;
//import java.util.Map;
//
//
///**
// * github地址: https://github.com/intsmaze
// * 博客地址:https://www.cnblogs.com/intsmaze/
// * 出版书籍《深入理解Flink核心设计与实践原理》
// *
// * @auther: intsmaze(刘洋)
// * @date: 2020/10/15 18:33
// */
//public class VariableJdbc {
//
// private JdbcTemplate jdbcTemplate;
//
// public JdbcTemplate getJdbcTemplate() {
// return jdbcTemplate;
// }
//
// public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
// this.jdbcTemplate = jdbcTemplate;
// }
//
// /**
// * github地址: https://github.com/intsmaze
// * 博客地址:https://www.cnblogs.com/intsmaze/
// * 出版书籍《深入理解Flink核心设计与实践原理》
// *
// * @auther: intsmaze(刘洋)
// * @date: 2020/10/15 18:33
// */
// public List<Map<String, Object>> findVariable() {
// String sql = "select name,value from variable ";
// List<Map<String, Object>> variableList = jdbcTemplate.queryForList(sql);
// return variableList;
// }
//
//}
package com.wanji.indicators.base.thread;//package com.intsmaze.flink.base.thread;
//
//import com.google.common.collect.Maps;
//import com.intsmaze.flink.base.jdbc.VariableJdbc;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//
//import java.util.List;
//import java.util.Map;
//import java.util.concurrent.ConcurrentMap;
//import java.util.concurrent.TimeUnit;
//
///**
// * github地址: https://github.com/intsmaze
// * 博客地址:https://www.cnblogs.com/intsmaze/
// * 出版书籍《深入理解Flink核心设计与实践原理》
// *
// * @auther: intsmaze(刘洋)
// * @date: 2020/10/15 18:33
// */
//public class SwitchThread {
//
// public static String isSleep = "isSleep";
//
// private final Logger logger = LoggerFactory.getLogger(this.getClass());
//
// public static ConcurrentMap<String, Object> activityConfigs = Maps.newConcurrentMap();
//
// private static volatile boolean inited = false;
//
// private VariableJdbc variableJdbc;
//
// private static volatile Thread updateThread;
//
// /**
// * github地址: https://github.com/intsmaze
// * 博客地址:https://www.cnblogs.com/intsmaze/
// * 出版书籍《深入理解Flink核心设计与实践原理》
// *
// * @auther: intsmaze(刘洋)
// * @date: 2020/10/15 18:33
// */
// public void init() {
// synchronized (SwitchThread.class) {
// if (inited) {
// return;
// }
// logger.info("初始化配置数据......");
// initVariableDictTry();
// inited = true;
// start();
// }
// }
//
// /**
// * github地址: https://github.com/intsmaze
// * 博客地址:https://www.cnblogs.com/intsmaze/
// * 出版书籍《深入理解Flink核心设计与实践原理》
// *
// * @auther: intsmaze(刘洋)
// * @date: 2020/10/15 18:33
// */
// private void initVariableDictTry() {
// List<Map<String, Object>> variableList = variableJdbc.findVariable();
// for (int i = 0; i < variableList.size(); i++) {
// Map<String, Object> stringObjectMap = variableList.get(i);
// String name = (String) stringObjectMap.get("name");
// String value = (String) stringObjectMap.get("value");
// activityConfigs.put(name, value);
// }
// }
//
// /**
// * github地址: https://github.com/intsmaze
// * 博客地址:https://www.cnblogs.com/intsmaze/
// * 出版书籍《深入理解Flink核心设计与实践原理》
// *
// * @auther: intsmaze(刘洋)
// * @date: 2020/10/15 18:33
// */
// public void start() {
// synchronized (SwitchThread.class) {
// if (updateThread != null) {
// return;
// }
// updateThread = new UpdateThread();
// updateThread.start();
// }
// }
//
// /**
// * github地址: https://github.com/intsmaze
// * 博客地址:https://www.cnblogs.com/intsmaze/
// * 出版书籍《深入理解Flink核心设计与实践原理》
// *
// * @auther: intsmaze(刘洋)
// * @date: 2020/10/15 18:33
// */
// private class UpdateThread extends Thread {
// /**
// * github地址: https://github.com/intsmaze
// * 博客地址:https://www.cnblogs.com/intsmaze/
// * 出版书籍《深入理解Flink核心设计与实践原理》
// *
// * @auther: intsmaze(刘洋)
// * @date: 2020/10/15 18:33
// */
// @Override
// public void run() {
// while (true) {
// try {
// TimeUnit.MINUTES.sleep(1);
// } catch (InterruptedException e1) {
// }
// logger.info("更新配置数据......");
// try {
// initVariableDictTry();
// } catch (Exception e) {
// logger.error("", e);
// }
// }
// }
// }
//
// public VariableJdbc getVariableJdbc() {
// return variableJdbc;
// }
//
// public void setVariableJdbc(VariableJdbc variableJdbc) {
// this.variableJdbc = variableJdbc;
// }
//}
package com.wanji.indicators.base.util;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
* github地址: https://github.com/intsmaze
* 博客地址:https://www.cnblogs.com/intsmaze/
* 出版书籍《深入理解Flink核心设计与实践原理》
*
* @auther: intsmaze(刘洋)
* @date: 2020/10/15 18:33
*/
public class PropertiesUtils {
/**
* github地址: https://github.com/intsmaze
* 博客地址:https://www.cnblogs.com/intsmaze/
* 出版书籍《深入理解Flink核心设计与实践原理》
*
* @auther: intsmaze(刘洋)
* @date: 2020/10/15 18:33
*/
public static Properties getProperties(String path) throws IOException {
InputStream ips = PropertiesUtils.class
.getClassLoader()
.getResourceAsStream(path);
Properties props = new Properties();
props.load(ips);
ips.close();
return props;
}
}
......@@ -12,7 +12,6 @@ import com.baomidou.mybatisplus.extension.activerecord.Model;
@Data
@EqualsAndHashCode(callSuper=false)
@TableName("t_base_rid_info")
public class BaseRidInfo extends Model<BaseRidInfo> {
private static final long serialVersionUID = 1L;
......
package com.wanji.indicators.task.conflictpoint;
package com.wanji.indicators.event.cross.conflictpoint;
import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.model.BaseEventResultModel;
import com.wanji.indicators.model.ConflictPointModelBase;
import com.wanji.indicators.model.EventCross;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.source.WaitingAreaSource;
import com.wanji.indicators.task.conflictpoint.func.*;
import com.wanji.indicators.event.cross.conflictpoint.func.*;
import com.wanji.indicators.model.*;
import com.wanji.indicators.model.event.confilct.EventCross;
import com.wanji.indicators.sink.EventFileSink;
import com.wanji.indicators.source.LaneInfoSource;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
......@@ -48,16 +47,22 @@ public class ConflictEventMain implements Serializable {
this.isDebug = isDebug;
}
public static ConflictEventMain init(StreamExecutionEnvironment env, String path,boolean isDebug) {
public static ConflictEventMain init(StreamExecutionEnvironment env, String path, boolean isDebug) {
return new ConflictEventMain(env, path, isDebug);
}
public void run(DataStream<FrameModel> streamSource) {
public void run(DataStream<CrossFrameModel> streamSource) {
SingleOutputStreamOperator<CrossFrameModel> joinLaneStream = streamSource
.connect(env.addSource(new LaneInfoSource()).broadcast())
.flatMap(new ConflictLaneInfoCoFlatMap())
.setParallelism(1)
.name("冲突点-解析数据");
/**
* 转换数据为实体类 并筛选需要计算的数据(路口面 上的数据)
*/
SingleOutputStreamOperator<FrameModel> eventStream = streamSource
SingleOutputStreamOperator<CrossFrameModel> eventStream = joinLaneStream
.flatMap(new ConflictFrameFlatMap())
.setParallelism(1)
.name("冲突点-解析数据");
......@@ -82,7 +87,7 @@ public class ConflictEventMain implements Serializable {
/**
* 按照 crossId + time 分组
*/
SingleOutputStreamOperator<EventCross> processWindowStopAndRuning = streamSource
SingleOutputStreamOperator<EventCross> processWindowStopAndRuning = eventStream
.flatMap(new ConflictCheckConflictFlatMap())
.setParallelism(1)
.name("冲突点-计算是否发生冲突");
......@@ -117,7 +122,7 @@ public class ConflictEventMain implements Serializable {
}
})
.setParallelism(1);
//stringSingleOutputStream.addSink(new EventFileSink(this.path));
stringSingleOutputStream.addSink(new EventFileSink(this.path));
} else {
/**
* 安全专题数据产出
......
package com.wanji.indicators.task.conflictpoint.func;
package com.wanji.indicators.event.cross.conflictpoint.func;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.ConfigModel;
import com.wanji.indicators.model.EventCross;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.constant.TurnEnum;
import com.wanji.indicators.model.*;
import com.wanji.indicators.model.event.confilct.EventCross;
import com.wanji.indicators.util.PtInPolyUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
public class ConflictCheckConflictFlatMap implements FlatMapFunction<FrameModel, EventCross> {
public class ConflictCheckConflictFlatMap implements FlatMapFunction<CrossFrameModel, EventCross> {
@Override
public void flatMap(FrameModel frameModel, Collector<EventCross> collector) throws Exception {
public void flatMap(CrossFrameModel frameModel, Collector<EventCross> collector) throws Exception {
ConfigModel configModel = new ConfigModel();
//判断该车道内的是否同时存在行驶车辆和停驶车辆 如果同时存在则 set.size = 2 否则 set.size = 1
Set<Integer> keyStop = new HashSet<>();
......@@ -26,11 +24,11 @@ public class ConflictCheckConflictFlatMap implements FlatMapFunction<FrameModel,
//存放运行的车辆
List<EventCross> listRun = new ArrayList<>();
List<CarTrackModel> trackList = frameModel.getE1FrameParticipant();
List<CarTrackModel> trackList = frameModel.getTrackList();
for (CarTrackModel carTrack : trackList) {
double speed = carTrack.getSpeed();
int trackID = carTrack.getId();
EventCross eventCross = toEventCross(carTrack);
EventCross eventCross = toEventCross(carTrack,frameModel.getGlobalTimeStamp());
if (speed <= 0.0) {
if (keyStop.add(trackID)) {
listStop.add(eventCross);
......@@ -45,6 +43,7 @@ public class ConflictCheckConflictFlatMap implements FlatMapFunction<FrameModel,
for (EventCross a : listStop) {
int trackIDA = a.getTrackID();
String ridA = a.getRid();
//停止车辆的运行角度
double driveAngleA = a.getDriveAngle();
double lngA = a.getLng();
double latA = a.getLat();
......@@ -52,8 +51,8 @@ public class ConflictCheckConflictFlatMap implements FlatMapFunction<FrameModel,
for (CarTrackModel carTrack : trackList) {
if (a.getTrackID() != carTrack.getId() &&
a.getLaneFunction().contains("Constant.STRAIGHT_LINE_CODE") &&
a.getLaneFunction().contains("Constant.LEFT_TURN_CODE") &&
a.getLaneFunction().contains(Constant.STRAIGHT_LINE_CODE) &&
a.getLaneFunction().contains(Constant.LEFT_TURN_CODE) &&
StringUtils.isNotBlank(a.getRid()) &&
a.getRid().equals(carTrack.getRoadnet().getRid())) {
//判定停止车辆前是否有同行车辆
......@@ -69,6 +68,7 @@ public class ConflictCheckConflictFlatMap implements FlatMapFunction<FrameModel,
}
}
}
//从直左混合车道驶出的停车车辆
if (isContinue) {
continue;
}
......@@ -77,14 +77,14 @@ public class ConflictCheckConflictFlatMap implements FlatMapFunction<FrameModel,
String ridB = b.getRid();
boolean isRun = true;
//两方向车道功能都是直行的且行车方向相反则排除
if ("Constant.STRAIGHT_LINE_CODE".equals(a.getLaneFunction()) && "Constant.STRAIGHT_LINE_CODE".equals(b.getLaneFunction())) {
if (Constant.STRAIGHT_LINE_CODE.equals(a.getLaneFunction()) && Constant.STRAIGHT_LINE_CODE.equals(b.getLaneFunction())) {
double angleX = Math.abs(a.getDriveAngle() - b.getDriveAngle());
if (angleX > 160.0 && angleX < 200.0) {
isRun = false;
}
}
//两方向车道功能都是左转的且行车方向相反则排除
if ("Constant.LEFT_TURN_CODE".equals(a.getLaneFunction()) && "Constant.LEFT_TURN_CODE".equals(b.getLaneFunction())) {
if (Constant.LEFT_TURN_CODE.equals(a.getLaneFunction()) && Constant.LEFT_TURN_CODE.equals(b.getLaneFunction())) {
double angleX = Math.abs(a.getDriveAngle() - b.getDriveAngle());
if (angleX > 160.0 && angleX < 200.0) {
isRun = false;
......@@ -99,6 +99,7 @@ public class ConflictCheckConflictFlatMap implements FlatMapFunction<FrameModel,
double distance = PtInPolyUtil.getDistance(head[0], head[1], lngB, latB);//车头位置与行驶车辆的距离
double driveAngleB = b.getDriveAngle();
double angleX = Math.abs(driveAngleA - driveAngleB);
//默认条件:两车距离小于5米,两车视野角度在(60,120)范围内
if (distance < configModel.getConflictTwoCarMaxDist()
&& angleX > configModel.getConflictTwoCarMinAngle()
&& angleX < configModel.getConflictTwoCarMaxAngle()) {//停车车辆与行驶车辆两车角度差计算范围
......@@ -121,8 +122,10 @@ public class ConflictCheckConflictFlatMap implements FlatMapFunction<FrameModel,
if (isIntersectA != 1 && isIntersectB != 1) {
double angle = PtInPolyUtil.getAngle(head[0], head[1], lngB, latB);
double abs = Math.abs(angle - driveAngleA);
if (abs < configModel.getConflictResultAngleDif()) {//停止车辆车头位置与行驶车辆角度差
a.setTrackIdNear(a.getTrackID() + "Constant.COMMA" + b.getTrackID());
if (abs < configModel.getConflictResultAngleDif()) {
//停止车辆车头位置与行驶车辆角度差
a.setTrackIdNear(a.getTrackID() + Constant.COMMA + b.getTrackID());
a.setNearTrackObj(b);
collector.collect(a);
}
}
......@@ -155,12 +158,22 @@ public class ConflictCheckConflictFlatMap implements FlatMapFunction<FrameModel,
}
}
private static EventCross toEventCross(CarTrackModel carTrack){
private static EventCross toEventCross(CarTrackModel carTrack,long timeTamp){
EventCross e = new EventCross();
e.setPlateColor(carTrack.getLicenseColor());
e.setVehicleColor(carTrack.getOriginalColor());
e.setRoadnet(carTrack.getRoadnet());
e.setCrossId(carTrack.getRoadnet().getCrossId());
e.setRid(carTrack.getRoadnet().getRid());
e.setEntryLane(carTrack.getRoadnet().getLaneId());
try {
String[] turns = TurnEnum.getLaneTurn(carTrack.getRoadnet().getTurn());
List<String> turnList = Arrays.asList(turns);
String laneFunction = turnList.stream().filter(o -> Objects.nonNull(o)).collect(Collectors.joining(","));
e.setLaneFunction(laneFunction);
} catch (Exception e1) {
e1.printStackTrace();
}
e.setTrackID(carTrack.getId());
e.setLng(carTrack.getLongitude());
e.setLat(carTrack.getLatitude());
......@@ -169,12 +182,8 @@ public class ConflictCheckConflictFlatMap implements FlatMapFunction<FrameModel,
e.setDriveAngle(carTrack.getCourseAngle());
e.setPlateNumber(carTrack.getPicLicense());
e.setVehicleType(carTrack.getOriginalType());
e.setTime(carTrack.getGlobalTimeStamp());
e.setTime(timeTamp);
e.setAreaDist(carTrack.getWaitingTurnLaneDist());
e.setTravelTime((long) carTrack.getTravelTime());
return e;
......
package com.wanji.indicators.task.conflictpoint.func;
package com.wanji.indicators.event.cross.conflictpoint.func;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.ConfigModel;
import com.wanji.indicators.model.FrameModel;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
......@@ -12,7 +11,7 @@ import java.util.*;
public class ConflictCongestionFilterFlatMap implements FlatMapFunction<FrameModel, FrameModel> {
@Override
public void flatMap(FrameModel frameModel, Collector<FrameModel> collector) throws Exception {
List<CarTrackModel> trackList = frameModel.getE1FrameParticipant();
List<CarTrackModel> trackList = frameModel.getTrackList();
ConfigModel configModel = new ConfigModel();
Map<String, List<CarTrackModel>> listMap = new HashMap<>();
......@@ -59,7 +58,7 @@ public class ConflictCongestionFilterFlatMap implements FlatMapFunction<FrameMod
resultList.add(carTrack);
}
}
frameModel.setE1FrameParticipant(resultList);
frameModel.setTrackList(resultList);
collector.collect(frameModel);
}
}
package com.wanji.indicators.task.conflictpoint.func;
package com.wanji.indicators.event.cross.conflictpoint.func;
import com.wanji.indicators.model.EventCross;
import com.wanji.indicators.model.event.confilct.EventCross;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
......
package com.wanji.indicators.task.conflictpoint.func;
package com.wanji.indicators.event.cross.conflictpoint.func;
import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.constant.EventType;
import com.wanji.indicators.model.ConflictPointModelBase;
import com.wanji.indicators.model.EventCross;
import com.wanji.indicators.model.event.confilct.EventCross;
import com.wanji.indicators.util.PtInPolyUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
......@@ -14,12 +15,13 @@ public class ConflictEncapsulationResultFlatMap implements FlatMapFunction<Event
private static final Logger log = LoggerFactory.getLogger(ConflictEncapsulationResultFlatMap.class);
@Override
public void flatMap(EventCross event, Collector<String> out) throws Exception {
// log.info("冲突点日志: " + event.getTime() + " " + event.getPlateNumber() + " " +
// event.getAreaDist() + " " +
// event.getCrossing() + " " +
// event.getSpeed() + " ");
if(StringUtils.isNotBlank(event.getPlateNumber())){
log.info("冲突点日志: " + event.getTime() + " " + event.getPlateNumber() + " " +
event.getAreaDist() + " " +
event.getCrossing() + " " +
event.getSpeed() + " ");
// if(StringUtils.isNotBlank(event.getPlateNumber())){
ConflictPointModelBase model = new ConflictPointModelBase();
model.setCrossId(event.getCrossId());
model.setTrackId(event.getTrackID());
model.setPlateNumber(event.getPlateNumber());
......@@ -28,10 +30,12 @@ public class ConflictEncapsulationResultFlatMap implements FlatMapFunction<Event
model.setVehicleColor(event.getVehicleColor());
model.setVehicleBrand(event.getVehicleBrand());
model.setVehicleType(event.getVehicleType() + "");
model.setTrackIdNear(event.getTrackIdNear());
// model.setEventClass(EventType.EVENT.getName());
// model.setEventType(EventType.CONFLICT_POINT.getName());
// model.setTypeCode(EventType.CONFLICT_POINT.getCode());
model.setEventClass(EventType.EVENT.getName());
model.setEventType(EventType.CONFLICT_POINT.getName());
model.setTypeCode(EventType.CONFLICT_POINT.getCode());
model.setDesc("路口面冲突点: 车辆 " + event.getPlateNumber() + "行驶方向为 " + PtInPolyUtil.getDir(event.getDriveAngle()));
model.setDescEn("Intersection conflict point: the driving direction of vehicle "+ "行驶方向为 " + PtInPolyUtil.getDirEn(event.getDriveAngle()));
model.setSpeed(event.getSpeed());
......@@ -66,9 +70,10 @@ public class ConflictEncapsulationResultFlatMap implements FlatMapFunction<Event
model.getEndTime() > model.getStartTime()) {
out.collect(JSONObject.toJSONString(model));
}
model.setNearTackObj(event.getNearTrackObj());
model = null;
}
// }
}
......
package com.wanji.indicators.task.conflictpoint.func;
package com.wanji.indicators.event.cross.conflictpoint.func;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.ConfigModel;
import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.util.CommonUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
......@@ -12,27 +15,25 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ConflictFrameFlatMap implements FlatMapFunction<FrameModel, FrameModel> {
static Map<Integer, CarTrackModel.RoadNet> carRoadnetMap = new HashMap<>();
public class ConflictFrameFlatMap implements FlatMapFunction<CrossFrameModel, CrossFrameModel> {
@Override
public void flatMap(FrameModel frameModel, Collector<FrameModel> collector) throws Exception {
List<CarTrackModel> trackList = frameModel.getE1FrameParticipant();
public void flatMap(CrossFrameModel frameModel, Collector<CrossFrameModel> collector) throws Exception {
List<CarTrackModel> trackList = frameModel.getTrackList();
ConfigModel configModel = new ConfigModel();
if (configModel.getConflictIsOpen() == 1) {
List<CarTrackModel> resultList = new ArrayList<>();
for (CarTrackModel carTrack : trackList) {
CarTrackModel.RoadNet roadNet = carTrack.getRoadnet();
//路口内
if (StringUtils.equals(carTrack.getRoadnet().getLaneId(), "null")) {
//路口内无车道ID、路段ID信息,从上一次运行缓存获取标记为从哪个车道出来
//筛选在路口内的车辆
if (roadNet.getInCrossFlag()== Constant.CROSSING) {
resultList.add(carTrack);
} else {
carRoadnetMap.put(carTrack.getId(),roadNet);
}
}
frameModel.setE1FrameParticipant(resultList);
frameModel.setTrackList(resultList);
collector.collect(frameModel);
}
}
......
package com.wanji.indicators.event.cross.conflictpoint.func;
import com.wanji.indicators.entity.BaseLaneInfo;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.model.FrameModel;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ConflictLaneInfoCoFlatMap implements CoFlatMapFunction<CrossFrameModel, Map<String, BaseLaneInfo>, CrossFrameModel> {
private Map<String, BaseLaneInfo> map = new HashMap();
@Override
public void flatMap1(CrossFrameModel frameModel, Collector<CrossFrameModel> collector) throws Exception {
List<CarTrackModel> trackList = frameModel.getTrackList();
List<CarTrackModel> resList = new ArrayList<>();
for (CarTrackModel carTrack : trackList) {
String entryLane = carTrack.getRoadnet().getLaneId();
BaseLaneInfo baseLaneInfo = map.get(entryLane);
if (baseLaneInfo != null) {
carTrack.getRoadnet().setTurn(baseLaneInfo.getTurn());
} else {
continue;
}
resList.add(carTrack);
}
frameModel.setTrackList(resList);
collector.collect(frameModel);
}
@Override
public void flatMap2(Map<String, BaseLaneInfo> hashMap, Collector<CrossFrameModel> collector) throws Exception {
this.map = hashMap;
}
}
package com.wanji.indicators.task.conflictpoint.func;
package com.wanji.indicators.event.cross.conflictpoint.func;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.FrameModel;
......@@ -17,7 +17,7 @@ public class ConflictWaitingAreaCoFlatMap implements CoFlatMapFunction<FrameMode
@Override
public void flatMap1(FrameModel frameModel, Collector<FrameModel> collector) throws Exception {
List<CarTrackModel> resultList = new ArrayList<>();
List<CarTrackModel> trackList = frameModel.getE1FrameParticipant();
List<CarTrackModel> trackList = frameModel.getTrackList();
for(CarTrackModel carTrack : trackList){
String entryLane = carTrack.getRoadnet().getLaneId();
if (StringUtils.isNotBlank(entryLane)) {
......@@ -31,7 +31,7 @@ public class ConflictWaitingAreaCoFlatMap implements CoFlatMapFunction<FrameMode
}
}
}
frameModel.setE1FrameParticipant(resultList);
frameModel.setTrackList(resultList);
collector.collect(frameModel);
}
......
//package com.wanji.indicators.task.conflictpoint.ttc;
package com.wanji.indicators.event.cross.conflictpoint.ttc;//package com.wanji.indicators.task.conflictpoint.ttc;
//
//import com.alibaba.fastjson.JSONObject;
//import com.mapabc.constant.Constant;
......
//package com.wanji.indicators.task.conflictpoint.ttc;
package com.wanji.indicators.event.cross.conflictpoint.ttc;//package com.wanji.indicators.task.conflictpoint.ttc;
//
//import com.mapabc.model.event.EventCross;
//
......
package com.wanji.indicators.event.cross.congestion;
import com.wanji.indicators.event.cross.congestion.funcnew.*;
import com.wanji.indicators.model.BaseEventResultModel;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.model.event.congestion.CongestionFrameModel;
import com.wanji.indicators.model.event.congestion.CongestionModel;
import com.wanji.indicators.model.event.congestion.CongestionOverflowModel;
import com.wanji.indicators.source.ConsulAreaSource;
import com.wanji.indicators.source.CrossShapeSource;
import com.wanji.indicators.source.RidDirSource;
import com.wanji.indicators.source.WaitLaneSource;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.Properties;
/**
* 拥堵溢出检测
* @author fengyi
*/
public class CongestionEventMainNew implements Serializable {
private static final Logger log = LoggerFactory.getLogger(CongestionEventMainNew.class);
private StreamExecutionEnvironment env;
private PropertiesHelper instance;
private Properties p;
private String path;
//本地调试设为true输出结果保存到文件
private boolean isDebug = false;
private CongestionEventMainNew(StreamExecutionEnvironment env, String path, boolean isDebug) {
this.env = env;
this.instance = PropertiesHelper.getInstance();
this.p = instance.getProperties();
this.path = path;
this.isDebug = isDebug;
}
public static CongestionEventMainNew init(StreamExecutionEnvironment env, String path, boolean isDebug) {
return new CongestionEventMainNew(env, path,isDebug);
}
private CongestionEventMainNew(StreamExecutionEnvironment env) {
this.env = env;
this.instance = PropertiesHelper.getInstance();
this.p = instance.getProperties();
}
public static CongestionEventMainNew init(StreamExecutionEnvironment env) {
return new CongestionEventMainNew(env);
}
public void run(DataStream<FrameModel> thinningDataStream) {
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
p.getProperty("cross.event.data.topic"),
new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
instance.getProducerProperties(), FlinkKafkaProducer.Semantic.NONE);
//解析数据
SingleOutputStreamOperator<CongestionFrameModel> accidentFrameModelFilter = null;
// thinningDataStream
// .flatMap(new CongestionFlatMap())
// .setParallelism(1)
// .name("路口溢出-解析数据");
//计算车辆在路口内的旅行时间
SingleOutputStreamOperator<CongestionFrameModel> checkDataKeyedProcess = accidentFrameModelFilter
.keyBy(CongestionFrameModel::getCrossId)
.process(new CongestionCheckDataKeyedProcess())
.setParallelism(1)
.name("路口溢出-计算车辆在路口内的旅行时间");
//匹配待转区
SingleOutputStreamOperator<CongestionFrameModel> checkWaitLaneKeyedProcess = checkDataKeyedProcess
.connect(env.addSource(new WaitLaneSource()).broadcast())
.flatMap(new CongestionCheckWaitLaneCoFlatMap())
.setParallelism(1)
.name("路口溢出-匹配待转区");
//匹配拥堵检测区域
SingleOutputStreamOperator<CongestionFrameModel> checkCongestionArea = checkWaitLaneKeyedProcess
.connect(env.addSource(new ConsulAreaSource()).broadcast())
.flatMap(new CongestionAreaCheckCoFlatMap())
.setParallelism(1)
.name("事故检测-匹配拥堵检测区域");
//匹配对应拥堵出口rid的方向
SingleOutputStreamOperator<CongestionFrameModel> checkDirCoFlatMap = checkCongestionArea
.connect(env.addSource(new RidDirSource()).broadcast())
.flatMap(new CongestionCheckDirCoFlatMap())
.setParallelism(1)
.name("路口溢出-匹配车辆对应的出口rid的方向");
//计算是否疑似路口溢出 开始/消散
SingleOutputStreamOperator<CongestionModel> checkCongestionKeyedProcess = checkDirCoFlatMap
.keyBy(CongestionFrameModel::getCrossId)
.process(new CongestionKeyedProcess())
.setParallelism(1)
.name("路口溢出-计算是否路口溢出");
//读取路口形状并根据 "车道转向功能" 和 "进口rid" 匹配对应的 "出口rid"
// SingleOutputStreamOperator<CongestionModel> checkRidInCoFlatMap = checkCongestionKeyedProcess
// .connect(env.addSource(new CrossShapeSource()).broadcast())
// //.flatMap(new CongestionCheckOutRidCoFlatMap())
// //.setParallelism(1)
// .name("路口溢出-匹配车辆对应的出口rid");
//路口溢出-逻辑去重
SingleOutputStreamOperator<CongestionOverflowModel> congestionOverflowStream = checkCongestionKeyedProcess
.keyBy("crossId", "ridOut", "startTime")
.process(new CongestionValueStateKeyedProcess())
.setParallelism(1)
.name("路口溢出-逻辑去重");
if (!isDebug) {
SingleOutputStreamOperator<String> stringSingleOutputStreamOperator = congestionOverflowStream
.flatMap(new CongestionToStringFlatMap())
.setParallelism(1)
.name("路口溢出-封装结果集");
stringSingleOutputStreamOperator.addSink(kafkaProducer).setParallelism(1).name("路口溢出-结果输出至kafka");
stringSingleOutputStreamOperator.print("拥堵溢出检测-结果").name("路口溢出-结果打印");
} else {
SingleOutputStreamOperator<BaseEventResultModel> baseEventResultModelStream = congestionOverflowStream.flatMap(new FlatMapFunction<CongestionOverflowModel, BaseEventResultModel>() {
@Override
public void flatMap(CongestionOverflowModel congestionOverflowModel, Collector<BaseEventResultModel> collector) throws Exception {
collector.collect(congestionOverflowModel);
}
});
// baseEventResultModelStream.addSink(new EventFileSink(this.path));
}
}
}
package com.wanji.indicators.event.cross.congestion.funcnew;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.ConfigModel;
import com.wanji.indicators.model.event.congestion.CongestionAreaDo;
import com.wanji.indicators.model.event.congestion.CongestionFrameModel;
import com.wanji.indicators.util.PtInPolyUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.*;
/**
* 2022/3/17 15:52
*
* @auther fengyi
*/
public class CongestionAreaCheckCoFlatMap implements CoFlatMapFunction<CongestionFrameModel, HashMap<String, Map<String, CongestionAreaDo>>, CongestionFrameModel> {
private HashMap<String, Map<String, CongestionAreaDo>> hashMap = new HashMap();
@Override
public void flatMap1(CongestionFrameModel congestionFrameModel, Collector<CongestionFrameModel> collector) throws Exception {
List<CongestionFrameModel.CarTrack> trackList = congestionFrameModel.getTrackList();
ConfigModel configModel = congestionFrameModel.getConfigModel();
List<CongestionFrameModel.CarTrack> list = new ArrayList<>();
Set<Integer> set = new HashSet<>();
Map<String, CongestionAreaDo> congestionAreaDoMap = hashMap.get(congestionFrameModel.getCrossId());
if (congestionAreaDoMap != null) {
Map<String, Integer> map = new HashMap<>();
for (CongestionAreaDo congestionAreaDo : congestionAreaDoMap.values()) {
int laneNum = congestionAreaDo.getLaneNum();
if (laneNum > 0) {
//存入各出口rid对应的车道数
map.put(congestionAreaDo.getRid(), laneNum);
}
}
congestionFrameModel.setCongestionInfo(map);
for (CongestionFrameModel.CarTrack carTrack : trackList) {
String entryLane = carTrack.getEntryLane();
if (carTrack.getCrossing() == Constant.CROSSING && carTrack.getSpeed() < configModel.getAbnormalParkCongestSpeed() && StringUtils.isNotBlank(entryLane)) {
for (CongestionAreaDo congestionAreaDo : congestionAreaDoMap.values()) {
String ridIn = entryLane.substring(0, 23);
//判断是否属于该方向验证的入口rid数据
if (congestionAreaDo.getRidInArr().contains(ridIn)) {
PtInPolyUtil.Point point = new PtInPolyUtil.Point(carTrack.getLng(), carTrack.getLat());
boolean isIn = PtInPolyUtil.isPtInPoly(point, congestionAreaDo.getPoints());
if (isIn) {
if (set.add(carTrack.getTrackID())) {
carTrack.setRidOut(congestionAreaDo.getRid());
list.add(carTrack);
}
}
}
}
}
}
for (CongestionFrameModel.CarTrack carTrack : trackList) {
if (!set.contains(carTrack.getTrackID())) {
list.add(carTrack);
}
}
congestionFrameModel.setTrackList(list);
}
collector.collect(congestionFrameModel);
}
@Override
public void flatMap2(HashMap<String, Map<String, CongestionAreaDo>> stringCongestionAreaDoHashMap, Collector<CongestionFrameModel> collector) throws Exception {
this.hashMap = stringCongestionAreaDoHashMap;
}
}
package com.wanji.indicators.event.cross.congestion.funcnew;
import com.wanji.indicators.model.event.congestion.CongestionFrameModel;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.*;
/**
* 2021/11/18 16:47
*
* @auther fengyi
*/
public class CongestionCheckDataKeyedProcess extends KeyedProcessFunction<String, CongestionFrameModel, CongestionFrameModel> {
private MapState<Integer, CongestionFrameModel.CarTrack> mapState;
@Override
public void open(Configuration parameters) throws Exception {
mapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("mapStateCongestion_ridCheck_11x", Integer.class, CongestionFrameModel.CarTrack.class));
}
@Override
public void processElement(CongestionFrameModel congestionFrameModel, Context context, Collector<CongestionFrameModel> collector) throws Exception {
long timeNow = congestionFrameModel.getTime();
Map<Integer,CongestionFrameModel.CarTrack> map = new HashMap<>();
List<CongestionFrameModel.CarTrack> trackList = congestionFrameModel.getTrackList();
List<CongestionFrameModel.CarTrack> list = new ArrayList<>();
for (CongestionFrameModel.CarTrack carTrack : trackList) {
int trackID = carTrack.getTrackID();
map.put(trackID, carTrack);
CongestionFrameModel.CarTrack carTrackFirst = mapState.get(trackID);
if (carTrackFirst == null) {
carTrack.setTravelTime(0);
mapState.put(trackID, carTrack);
} else {
long duration = carTrack.getTime() - carTrackFirst.getTime();
if (duration > 0L) {
carTrack.setTravelTime(duration/1000.0);
}
}
list.add(carTrack);
}
//清除过期数据
Iterator<CongestionFrameModel.CarTrack> iterator = mapState.values().iterator();
List<CongestionFrameModel.CarTrack> listMapState = new ArrayList<>();
while (iterator.hasNext()) {
CongestionFrameModel.CarTrack next = iterator.next();
listMapState.add(next);
}
for (CongestionFrameModel.CarTrack next : listMapState) {
int trackID = next.getTrackID();
CongestionFrameModel.CarTrack carTrack = map.get(trackID);
//查询当前帧数据中没有此trackID 说明该目标 上一帧有 当前帧没有
if (carTrack == null) {
//计算当前此目标与时间差多少毫秒 若超过10秒则清除该目标
long timeX = timeNow - next.getTime();
if (timeX > 10000L) {
mapState.remove(trackID);
// System.out.println(" 数据过期 " + trackID + " " + timeX + " " + list.size());
}
}
}
//发送数据
if (!list.isEmpty()) {
congestionFrameModel.setTrackList(list);
collector.collect(congestionFrameModel);
}
}
}
package com.wanji.indicators.event.cross.congestion.funcnew;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.event.congestion.CongestionFrameModel;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* 2022/1/12 1:01
*
* @auther fengyi
*/
public class CongestionCheckDirCoFlatMap implements CoFlatMapFunction<CongestionFrameModel, HashMap<String, String>, CongestionFrameModel> {
private HashMap<String, String> ridDirMap = new HashMap<>();
@Override
public void flatMap1(CongestionFrameModel congestionFrameModel, Collector<CongestionFrameModel> collector) throws Exception {
List<CongestionFrameModel.CarTrack> trackList = congestionFrameModel.getTrackList();
List<CongestionFrameModel.CarTrack> list = new ArrayList<>();
for (CongestionFrameModel.CarTrack carTrack : trackList) {
String ridOut = carTrack.getRidOut();
if (StringUtils.isNotBlank(ridOut)) {
String dirCode = ridDirMap.get(ridOut + Constant.MARK);
if (StringUtils.isNotBlank(dirCode)){
carTrack.setRidOutDir(dirCode);
} else {
carTrack.setRidOutDir("");
}
}
list.add(carTrack);
}
congestionFrameModel.setTrackList(list);
collector.collect(congestionFrameModel);
}
@Override
public void flatMap2(HashMap<String, String> stringStringHashMap, Collector<CongestionFrameModel> collector) throws Exception {
this.ridDirMap = stringStringHashMap;
}
}
package com.wanji.indicators.event.cross.congestion.funcnew;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.event.congestion.CongestionModel;
import com.wanji.indicators.model.event.congestion.CrossShapeModel;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.HashMap;
/**
* 2022/1/11 18:13
*
* @auther fengyi
*/
public class CongestionCheckOutRidCoFlatMap implements CoFlatMapFunction<CongestionModel, HashMap<String, CrossShapeModel>, CongestionModel> {
private HashMap<String, CrossShapeModel> hashMap = new HashMap<>();
@Override
public void flatMap1(CongestionModel congestionFrameModel, Collector<CongestionModel> collector) throws Exception {
String ridOut = congestionFrameModel.getRidOut();
String key = ridOut + Constant.MARK + Constant.STRAIGHT_LINE_CODE;
CrossShapeModel crossShapeModel = hashMap.get(key);
if (crossShapeModel != null) {
congestionFrameModel.setRidIn(crossShapeModel.getRidIn());
}
collector.collect(congestionFrameModel);
}
@Override
public void flatMap2(HashMap<String, CrossShapeModel> stringCrossShapeModelHashMap, Collector<CongestionModel> collector) throws Exception {
this.hashMap = stringCrossShapeModelHashMap;
}
}
package com.wanji.indicators.event.cross.congestion.funcnew;
import com.wanji.indicators.model.event.congestion.CongestionFrameModel;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* 2021/11/19 17:31
*
* @auther fengyi
*/
public class CongestionCheckWaitLaneCoFlatMap implements CoFlatMapFunction<CongestionFrameModel, HashMap<String, Integer>, CongestionFrameModel> {
private HashMap<String, Integer> mapWaitLane = new HashMap<>();
@Override
public void flatMap1(CongestionFrameModel accidentCongestionFrameModel, Collector<CongestionFrameModel> collector) throws Exception {
List<CongestionFrameModel.CarTrack> trackList = accidentCongestionFrameModel.getTrackList();
List<CongestionFrameModel.CarTrack> list = new ArrayList<>();
for(CongestionFrameModel.CarTrack carTrack : trackList) {
String entryLane = carTrack.getEntryLane();
if (StringUtils.isNotBlank(entryLane)) {
//待转区距离停止线的距离
Integer waitLaneDistance = mapWaitLane.get(entryLane);
if( waitLaneDistance != null ) {
if (carTrack.getAreaDist() > waitLaneDistance + 2.5) {
carTrack.setWaitLaneDistance(waitLaneDistance);
list.add(carTrack);
}
} else {
list.add(carTrack);
}
}
}
accidentCongestionFrameModel.setTrackList(list);
collector.collect(accidentCongestionFrameModel);
}
@Override
public void flatMap2(HashMap<String, Integer> stringIntegerHashMap, Collector<CongestionFrameModel> collector) throws Exception {
this.mapWaitLane = stringIntegerHashMap;
}
}
package com.wanji.indicators.event.cross.congestion.funcnew;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.ConfigModel;
import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.model.FrameModel;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.HashMap;
/**
* 2022/2/16 16:42
*
* @auther fengyi
*/
public class CongestionConfigCoFlatMap implements CoFlatMapFunction<CrossFrameModel, HashMap<String, ConfigModel>, CrossFrameModel> {
private HashMap<String, ConfigModel> hashMap;
@Override
public void flatMap1(CrossFrameModel frameModel, Collector<CrossFrameModel> collector) throws Exception {
if (hashMap != null) {
ConfigModel model = hashMap.get(frameModel.getCrossId());
if (model != null) {
if (model.getOverFlowCongestIsOpen() == Constant.OPEN) {
frameModel.setConfigModel(model);
collector.collect(frameModel);
}
} else {
frameModel.setConfigModel(new ConfigModel());
collector.collect(frameModel);
}
}
}
@Override
public void flatMap2(HashMap<String, ConfigModel> stringConfigModelHashMap, Collector<CrossFrameModel> collector) throws Exception {
this.hashMap = stringConfigModelHashMap;
}
}
package com.wanji.indicators.event.cross.congestion.funcnew;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.ConfigModel;
import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.model.event.congestion.CongestionFrameModel;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.springframework.beans.BeanUtils;
import java.util.ArrayList;
import java.util.List;
/**
* 2021/11/18 16:44
*
* @auther fengyi
*/
public class CongestionFlatMap implements FlatMapFunction<CrossFrameModel, CongestionFrameModel> {
@Override
public void flatMap(CrossFrameModel frameModel, Collector<CongestionFrameModel> collector) throws Exception {
List<CarTrackModel> trackList = frameModel.getTrackList();
ConfigModel configModel = frameModel.getConfigModel();
if (configModel.getOverFlowCongestIsOpen() == Constant.OPEN) {
List<CongestionFrameModel.CarTrack> list = new ArrayList<>();
for (CarTrackModel carTrack : trackList) {
if (StringUtils.equals(carTrack.getRoadnet().getLaneId(), "null")) {
CongestionFrameModel.CarTrack car = new CongestionFrameModel.CarTrack();
BeanUtils.copyProperties(carTrack,car);
list.add(car);
}
}
CongestionFrameModel model = new CongestionFrameModel();
model.setConfigModel(configModel);
model.setCrossId(frameModel.getCrossId());
model.setTime(frameModel.getGlobalTimeStamp());
model.setTrackList(list);
collector.collect(model);
}
}
}
package com.wanji.indicators.event.cross.congestion.funcnew;
import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.model.event.congestion.CongestionOverflowModel;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
public class CongestionToStringFlatMap implements FlatMapFunction<CongestionOverflowModel, String> {
@Override
public void flatMap(CongestionOverflowModel model, Collector<String> collector) throws Exception {
String s = JSONObject.toJSONString(model);
collector.collect(s);
}
}
package com.wanji.indicators.event.cross.congestion.funcnew;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.constant.DirectionFromCnToEn;
import com.wanji.indicators.constant.EventType;
import com.wanji.indicators.model.event.congestion.CongestionModel;
import com.wanji.indicators.model.event.congestion.CongestionOverflowModel;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* 2022/1/12 17:48
*
* @auther fengyi
*/
public class CongestionValueStateKeyedProcess extends KeyedProcessFunction<Tuple, CongestionModel, CongestionOverflowModel> {
private ValueState<CongestionModel> valueState;
@Override
public void open(Configuration parameters) {
valueState = getRuntimeContext().getState(new ValueStateDescriptor<>("congestionOverFlowValueState_1111III_ox",CongestionModel.class));
}
@Override
public void processElement(CongestionModel congestionModel, Context context, Collector<CongestionOverflowModel> collector) throws Exception {
CongestionModel value = valueState.value();
if (value == null) {
CongestionOverflowModel model = new CongestionOverflowModel();
model.setCrossId(congestionModel.getCrossId());
model.setRid(congestionModel.getRidIn());
model.setRidOut(congestionModel.getRidOut());
model.setTrackId(0);
model.setEventClass(EventType.EVENT.getName());
model.setTypeCode(EventType.OVERFLOW_CONGESTION.getCode());
model.setEventType(EventType.OVERFLOW_CONGESTION.getName());
model.setStartTime(congestionModel.getStartTime());
model.setEndTime(congestionModel.getEndTime());
model.setTime(congestionModel.getStartTime());
model.setDesc("路口溢出: 路口" + congestionModel.getDirection() + "车辆行驶缓慢,疑似路口溢出。");
model.setRecoveryCross(congestionModel.getRecoveryCross());
if (StringUtils.isNotBlank(congestionModel.getDirection())) {
String dirCnDesc = congestionModel.getDirection();
String dirCn = dirCnDesc.substring(0,dirCnDesc.length()-2);
String dirEn = DirectionFromCnToEn.getEnDir(dirCn);
model.setDescEn("Overflow congestion: vehicles at the "+dirEn+" exit of the intersection drive slowly, which is suspected of overflow congestion");
model.setPlateNumber(congestionModel.getDirection());
} else {
model.setPlateNumber("路口溢出");
}
model.setVehicleType("");
model.setX(congestionModel.getLng());
model.setY(congestionModel.getLat());
model.setLocation(new double[]{congestionModel.getLng(), congestionModel.getLat()});
model.setDuration(congestionModel.getDuration());
model.setDir(congestionModel.getDirection());
model.setPictureTime1(model.getStartTime() - 3000L);
model.setPictureTime2(model.getStartTime());
model.setPictureTime3(model.getStartTime() + 3000L);
model.setCheckOutTime(model.getDuration());
model.setId(model.getCrossId() + Constant.MARK + model.getRidOut() + Constant.MARK + model.getTypeCode() + Constant.MARK + model.getStartTime());
collector.collect(model);
valueState.update(congestionModel);
context.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + 7200000L);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<CongestionOverflowModel> out) throws Exception {
valueState.clear();
}
}
package com.wanji.indicators.event.cross.deadlock;
import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.event.cross.deadlock.func.*;
import com.wanji.indicators.model.BaseEventResultModel;
import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.model.event.deadlock.CrossDeadlockResultModel;
import com.wanji.indicators.model.event.deadlock.DeadLockFrameModel;
import com.wanji.indicators.sink.EventFileSink;
import com.wanji.indicators.source.ConflictTurnSource;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import org.springframework.beans.BeanUtils;
import java.io.Serializable;
import java.util.Properties;
/**
* 死锁监测
*
* @author fengyi
*/
public class DeadlockEventMainNew implements Serializable {
private StreamExecutionEnvironment env;
private PropertiesHelper instance;
private Properties p;
private String path;
//本地调试设为true输出结果保存到文件
private boolean isDebug = false;
private DeadlockEventMainNew(StreamExecutionEnvironment env, String path, boolean isDebug) {
this.env = env;
this.instance = PropertiesHelper.getInstance();
this.p = instance.getProperties();
this.path = path;
this.isDebug = isDebug;
}
private DeadlockEventMainNew(StreamExecutionEnvironment env) {
this.env = env;
this.instance = PropertiesHelper.getInstance();
this.p = instance.getProperties();
}
public static DeadlockEventMainNew init(StreamExecutionEnvironment env, String path, boolean isDebug) {
return new DeadlockEventMainNew(env, path, isDebug);
}
public static DeadlockEventMainNew init(StreamExecutionEnvironment env) {
return new DeadlockEventMainNew(env);
}
public void run(DataStream<CrossFrameModel> thinningDataStream) {
SingleOutputStreamOperator<CrossFrameModel> time = thinningDataStream
.keyBy(CrossFrameModel::getCrossId)
.countWindow(3)
.maxBy("time");
//解析数据
SingleOutputStreamOperator<DeadLockFrameModel> frameModelFilter = time
.flatMap(new DeadlockFlatMap())
.setParallelism(1)
.name("路口死锁-解析数据");
//处理RID挂接、车道功能转换
SingleOutputStreamOperator<DeadLockFrameModel> ridFuncProcess = frameModelFilter
.keyBy(DeadLockFrameModel::getCrossId)
.process(new DeadlockCheckDataKeyedProcess())
.setParallelism(1)
.name("路口死锁-处理RID挂接、车道功能转换");
//加载冲突相位信息
SingleOutputStreamOperator<DeadLockFrameModel> loadConflictSourceProcess = ridFuncProcess
.connect(env.addSource(new ConflictTurnSource()).broadcast())
.flatMap(new DeadlockConflictSlotCoFlatMap())
.setParallelism(1)
.name("路口死锁-进口转向关联的冲突转向");
//冲突方向检测
SingleOutputStreamOperator<DeadLockFrameModel> conflictDireProcess = loadConflictSourceProcess
.keyBy(DeadLockFrameModel::getCrossId)
.process(new DeadlockConflictCheckKeyedProcess())
.setParallelism(1)
.name("路口死锁-转向冲突线检测");
//死锁检测
SingleOutputStreamOperator<CrossDeadlockResultModel> deadlockCheckProcess = conflictDireProcess
.keyBy(DeadLockFrameModel::getCrossId)
.process(new DeadlockStopCheckKeyedProcess())
.setParallelism(1)
.name("路口死锁-死锁检测");
//路口死锁-逻辑去重
SingleOutputStreamOperator<CrossDeadlockResultModel> deadlockDistinctProcess = deadlockCheckProcess
.keyBy("crossId", "startTime")
.process(new DeadlockDistinctProcess())
.setParallelism(1)
.name("路口死锁-逻辑去重");
if (isDebug) {
SingleOutputStreamOperator<BaseEventResultModel> resultOutProcess = deadlockDistinctProcess
.flatMap(new FlatMapFunction<CrossDeadlockResultModel, BaseEventResultModel>() {
@Override
public void flatMap(CrossDeadlockResultModel baseEventResultModel, Collector<BaseEventResultModel> collector) throws Exception {
BaseEventResultModel target = new BaseEventResultModel();
BeanUtils.copyProperties(baseEventResultModel, target);
target.setPictureTime1(baseEventResultModel.getStartTime());
target.setPictureTime2(baseEventResultModel.getTime() + baseEventResultModel.getDuration()/2);
target.setPictureTime3(baseEventResultModel.getEndTime());
collector.collect(target);
}
})
.setParallelism(1)
.name("路口死锁-死锁检测");
resultOutProcess.addSink(new EventFileSink(path)).setParallelism(1).name("路口死锁-结果输出至本地文件");
resultOutProcess.print("路口死锁-结果").name("路口死锁-结果打印");
} else {
//输出json
SingleOutputStreamOperator<String> jsonOutProcess = deadlockDistinctProcess
.flatMap(new FlatMapFunction<CrossDeadlockResultModel, String>() {
@Override
public void flatMap(CrossDeadlockResultModel baseEventResultModel, Collector<String> collector) throws Exception {
BaseEventResultModel target = new BaseEventResultModel();
BeanUtils.copyProperties(baseEventResultModel, target);
target.setPictureTime1(baseEventResultModel.getStartTime());
target.setPictureTime2(baseEventResultModel.getTime() + baseEventResultModel.getDuration()/2);
target.setPictureTime3(baseEventResultModel.getEndTime());
collector.collect(JSONObject.toJSONString(target));
}
})
.setParallelism(1)
.name("路口死锁-死锁检测");
/**
* 数据输出 -> kafka
*/
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
p.getProperty("cross.event.data.topic"),
new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
instance.getProducerProperties(),
FlinkKafkaProducer.Semantic.NONE);
jsonOutProcess.addSink(kafkaProducer).setParallelism(1).name("路口死锁-结果输出至kafka");
jsonOutProcess.print("路口死锁-结果").name("路口死锁-结果打印");
}
}
}
package com.wanji.indicators.event.cross.deadlock.func;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.event.deadlock.DeadLockFrameModel;
import com.wanji.indicators.util.CommonUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.*;
/**
* 处理RID挂接、车道功能转换
*/
public class DeadlockCheckDataKeyedProcess extends KeyedProcessFunction<String, DeadLockFrameModel, DeadLockFrameModel> {
private MapState<Integer, DeadLockFrameModel.CarTrack> mapState;
@Override
public void open(Configuration parameters) throws Exception {
mapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("mapStateDeadlock_ridCheck_1", Integer.class, DeadLockFrameModel.CarTrack.class));
}
@Override
public void processElement(DeadLockFrameModel deadLockFrameModel, Context context, Collector<DeadLockFrameModel> collector) throws Exception {
long timeNow = deadLockFrameModel.getTime();
List<DeadLockFrameModel.CarTrack> trackList = deadLockFrameModel.getTrackList();
Map<Integer, DeadLockFrameModel.CarTrack> map = new HashMap<>();
List<DeadLockFrameModel.CarTrack> list = new ArrayList<>();
for (DeadLockFrameModel.CarTrack carTrack : trackList) {
map.put(carTrack.getTrackID(), carTrack);
DeadLockFrameModel.CarTrack carTrackState = mapState.get(carTrack.getTrackID());
String matchLane = carTrack.getMatchLane();
if (StringUtils.isNotBlank(matchLane)) {
if (carTrackState == null) {
carTrack.setEntryLane(matchLane);
carTrack.setRid(carTrack.getMatchLane().substring(0, 23));
carTrack.setLaneFunc(CommonUtil.getLaneFunc(carTrack.getMatchLaneName()));
mapState.put(carTrack.getTrackID(), carTrack);
} else {
carTrack.setMatchLane(carTrackState.getMatchLane());
carTrack.setEntryLane(carTrackState.getEntryLane());
carTrack.setRid(carTrackState.getRid());
carTrack.setLaneFunc(carTrackState.getLaneFunc());
}
} else {
if (carTrackState == null) {
String entryLane = carTrack.getEntryLane();
if (StringUtils.isNotBlank(entryLane)) {
carTrack.setRid(entryLane.substring(0, 23));
carTrack.setLaneFunc(Constant.STRAIGHT_LINE_CODE);
double stopLineDist = carTrack.getStopLineDist();
if (stopLineDist > 0.0 && stopLineDist < 6.0) {
carTrack.setRid(entryLane.substring(0, 23));
carTrack.setLaneFunc(CommonUtil.getLaneFunc(carTrack.getMatchLaneName()));
mapState.put(carTrack.getTrackID(), carTrack);
}
}
} else {
if (StringUtils.isNotBlank(carTrack.getEntryLane())) {
carTrack.setRid(carTrack.getEntryLane().substring(0, 23));
carTrack.setLaneFunc(carTrackState.getLaneFunc());
}
}
}
if (StringUtils.isNotBlank(carTrack.getLaneFunc())) {
list.add(carTrack);
} else {
//补录无车道属性的数据
if (carTrack.getCrossing() == Constant.CROSSING) {
carTrack.setMatchLane(Constant.RID_MARK_LANE);
carTrack.setEntryLane(Constant.RID_MARK_LANE);
carTrack.setRid(Constant.RID_MARK);
carTrack.setLaneFunc(Constant.STRAIGHT_LINE_CODE);
list.add(carTrack);
}
}
}
//清除过期数据
Iterator<DeadLockFrameModel.CarTrack> iterator = mapState.values().iterator();
List<DeadLockFrameModel.CarTrack> listMapState = new ArrayList<>();
while (iterator.hasNext()) {
DeadLockFrameModel.CarTrack next = iterator.next();
listMapState.add(next);
}
for (DeadLockFrameModel.CarTrack next : listMapState) {
int trackID = next.getTrackID();
DeadLockFrameModel.CarTrack carTrack = map.get(trackID);
//查询当前帧数据中没有此trackID 说明该目标 上一帧有 当前帧没有
if (carTrack == null) {
//计算当前此目标与时间差多少毫秒 若超过3秒则清除该目标
long timeX = timeNow - next.getTime();
if (timeX > 3000L) {
mapState.remove(trackID);
}
}
}
//发送数据
if (!list.isEmpty()) {
deadLockFrameModel.setTrackList(list);
collector.collect(deadLockFrameModel);
}
}
}
package com.wanji.indicators.event.cross.deadlock.func;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.ConfigModel;
import com.wanji.indicators.model.event.deadlock.DeadLockFrameModel;
import com.wanji.indicators.util.PtInPolyUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.*;
import java.util.stream.Collectors;
/**
* 检测是否存在冲突运行的车辆
*/
public class DeadlockConflictCheckKeyedProcess extends KeyedProcessFunction<String, DeadLockFrameModel, DeadLockFrameModel> {
@Override
public void open(Configuration parameters) {
}
@Override
public void processElement(DeadLockFrameModel deadLockFrameModel, Context context, Collector<DeadLockFrameModel> collector) throws Exception {
ConfigModel configModel = deadLockFrameModel.getConfigModel();
long nowTime = deadLockFrameModel.getTime();
List<DeadLockFrameModel.CarTrack> trackList = deadLockFrameModel.getTrackList();
List<DeadLockFrameModel.CarTrack> conflictTrackList = new ArrayList<>();
//冲突基础数据key:rid-turn
Map<String, List<Map<String, Object>>> conflictMap = deadLockFrameModel.getConflictSlotMap();
//按RID分组
Map<String, List<DeadLockFrameModel.CarTrack>> groupList = trackList.stream().collect(Collectors.groupingBy(o -> o.getRid()));
if (groupList.size() >= 2) {
//按RID+转向分组
Map<String, List<DeadLockFrameModel.CarTrack>> ridTurnCarList = new HashMap<>();
for (Map.Entry<String, List<DeadLockFrameModel.CarTrack>> entry : groupList.entrySet()) {
String rid = entry.getKey();
List<DeadLockFrameModel.CarTrack> value = entry.getValue();
for (DeadLockFrameModel.CarTrack ridCar : value) {
String laneFunc = ridCar.getLaneFunc();
String[] sps = laneFunc.split(Constant.COMMA);
for (String func : sps) {
String ridTurnKey = null;
if (func.contains(Constant.LEFT_TURN_CODE)) {
ridTurnKey = rid + Constant.MARK + Constant.LEFT_TURN_CODE;
} else if (func.contains(Constant.STRAIGHT_LINE_CODE)) {
ridTurnKey = rid + Constant.MARK + Constant.STRAIGHT_LINE_CODE;
}
List<DeadLockFrameModel.CarTrack> tmplist = ridTurnCarList.get(ridTurnKey);
if (Objects.isNull(tmplist)) {
tmplist = new ArrayList<>();
}
tmplist.add(ridCar);
ridTurnCarList.put(ridTurnKey, tmplist);
}
}
}
for (Map.Entry<String, List<DeadLockFrameModel.CarTrack>> entry : ridTurnCarList.entrySet()) {
String ridTurnKey = entry.getKey();
//冲突进口及转向
List<Map<String, Object>> maplist = conflictMap.get(ridTurnKey);
if (Objects.nonNull(maplist)) {
for (Map<String, Object> cmap : maplist) {
String rid = cmap.get("f_rid").toString();
String turn = cmap.get("turn_dir_no").toString();
String ckey = rid + Constant.MARK + turn;
//检测冲突方向是否有车辆
boolean isMatched = ridTurnCarList.containsKey(ckey);
if (isMatched) {
List<DeadLockFrameModel.CarTrack> carList = entry.getValue();
List<DeadLockFrameModel.CarTrack> carConflictList = ridTurnCarList.get(ckey);
long curDirStopSize = carList.stream().filter(o -> o.getSpeed() < configModel.getDeadlockStopCarSpeed()).count();
long conflictDirStopSize = carConflictList.stream().filter(o -> o.getSpeed() < configModel.getDeadlockStopCarSpeed()).count();
double minDist = this.calculateConlictCarMinDist(carList, carConflictList);
long conflictMinStopCarNums = configModel.getDeadlockConflictMinStopCarNums();
long conflictCarMaxDist = configModel.getDeadlockConflictCarMaxDistance();
//每个冲突方向至少停车数量,所有冲突车辆间最小距离满足阈值
if (curDirStopSize >= conflictMinStopCarNums && conflictDirStopSize >= conflictMinStopCarNums && minDist < conflictCarMaxDist) {
conflictTrackList.addAll(carList);
}
}
}
}
}
}
deadLockFrameModel.setTrackList(conflictTrackList);
collector.collect(deadLockFrameModel);
}
//计算所有冲突车辆的最小距离
private double calculateConlictCarMinDist(List<DeadLockFrameModel.CarTrack> carList, List<DeadLockFrameModel.CarTrack> carConflictList) {
double minDist = Double.MAX_VALUE;
for (DeadLockFrameModel.CarTrack carTrack1 : carList) {
double x1 = carTrack1.getLng();
double y1 = carTrack1.getLat();
for (DeadLockFrameModel.CarTrack carTrack2 : carConflictList) {
double x2 = carTrack2.getLng();
double y2 = carTrack2.getLat();
double distance = PtInPolyUtil.getDistance(x1, y1, x2, y2);
if (distance < minDist) {
minDist = distance;
}
}
}
return minDist;
}
/**
* 获取key
*
* @param carTrack
* @return
*/
private static String getKey(DeadLockFrameModel.CarTrack carTrack) {
return carTrack.getCrossId() + Constant.MARK + carTrack.getTrackID();
}
}
package com.wanji.indicators.event.cross.deadlock.func;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.event.deadlock.DeadLockFrameModel;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.*;
/**
* 2021/11/19 17:31
*
* @auther fengyi
*/
public class DeadlockConflictSlotCoFlatMap implements CoFlatMapFunction<DeadLockFrameModel, Map<String, List<Map<String, Object>>>, DeadLockFrameModel> {
private Map<String, List<Map<String, Object>>> conflictSlotMap = new HashMap<>();
@Override
public void flatMap1(DeadLockFrameModel frameModel, Collector<DeadLockFrameModel> collector) throws Exception {
if (conflictSlotMap != null) {
Map<String, List<Map<String, Object>>> filterMap = new HashMap<>();
Set<String> keys = conflictSlotMap.keySet();
//过滤出当前路口的数据
for (String key : keys) {
String[] sps = key.split(Constant.MARK);
String crossId = sps[2];
if (Objects.equals(crossId, frameModel.getCrossId())) {
String subKey = key.substring(0,key.lastIndexOf(Constant.MARK));
filterMap.put(subKey,conflictSlotMap.get(key));
}
}
frameModel.setConflictSlotMap(filterMap);
collector.collect(frameModel);
}
}
@Override
public void flatMap2(Map<String, List<Map<String, Object>>> sourceMap, Collector<DeadLockFrameModel> collector) throws Exception {
this.conflictSlotMap = sourceMap;
}
}
package com.wanji.indicators.event.cross.deadlock.func;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.event.deadlock.CrossDeadlockResultModel;
import com.wanji.indicators.util.DateUtil;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 2022/1/12 17:48
*
* @auther fengyi
*/
public class DeadlockDistinctProcess extends KeyedProcessFunction<Tuple, CrossDeadlockResultModel, CrossDeadlockResultModel> {
private static final Logger log = LoggerFactory.getLogger(DeadlockDistinctProcess.class);
private ValueState<CrossDeadlockResultModel> valueState;
@Override
public void open(Configuration parameters) {
valueState = getRuntimeContext().getState(new ValueStateDescriptor<>("DeadlockDistinctProcess",CrossDeadlockResultModel.class));
}
@Override
public void processElement(CrossDeadlockResultModel deadlockResult, Context context, Collector<CrossDeadlockResultModel> collector) throws Exception {
CrossDeadlockResultModel value = valueState.value();
if (value == null) {
deadlockResult.setId(deadlockResult.getCrossId() + Constant.MARK + deadlockResult.getCrossId() + Constant.MARK + deadlockResult.getTypeCode() + Constant.MARK + deadlockResult.getStartTime());
log.info("路口发生死锁事件:crossId={},startTime={},endTime={},持续时长:{}ms",deadlockResult.getCrossId(), DateUtil.toDateTime(deadlockResult.getStartTime(),"yyyy-MM-dd HH:mm:ss"),DateUtil.toDateTime(deadlockResult.getEndTime(),"yyyy-MM-dd HH:mm:ss"),deadlockResult.getDuration());
collector.collect(deadlockResult);
valueState.update(deadlockResult);
context.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + 7200000L);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<CrossDeadlockResultModel> out) throws Exception {
valueState.clear();
}
}
package com.wanji.indicators.event.cross.deadlock.func;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.ConfigModel;
import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.model.event.deadlock.DeadLockFrameModel;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.springframework.beans.BeanUtils;
import java.util.ArrayList;
import java.util.List;
/**
* 死锁事件--过滤出路口内轨迹
*/
public class DeadlockFlatMap implements FlatMapFunction<CrossFrameModel, DeadLockFrameModel> {
@Override
public void flatMap(CrossFrameModel frameModel, Collector<DeadLockFrameModel> collector) throws Exception {
List<CarTrackModel> trackList = frameModel.getTrackList();
List<DeadLockFrameModel.CarTrack> list = new ArrayList<>();
ConfigModel configModel = frameModel.getConfigModel();
if (configModel.getThreeColorAlarmIsOpen() == Constant.OPEN) {
for (CarTrackModel carTrack : trackList) {
if (/*carTrack.getObjectType() == Constant.VEHICLE
&& carTrack.getCrossing() == Constant.CROSSING
&& carTrack.getStopLineDist() < 0
&& carTrack.getAreaDist() > 8*/
true
) {
DeadLockFrameModel.CarTrack car = new DeadLockFrameModel.CarTrack();
BeanUtils.copyProperties(carTrack, car);
list.add(car);
}
}
DeadLockFrameModel model = new DeadLockFrameModel();
model.setCrossId(frameModel.getCrossId());
model.setTime(frameModel.getGlobalTimeStamp());
model.setTrackList(list);
model.setRidTurnLampList(frameModel.getRidTurnLampList());
model.setConfigModel(frameModel.getConfigModel());
collector.collect(model);
}
}
}
package com.wanji.indicators.event.cross.deadlock.func;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.constant.EventType;
import com.wanji.indicators.model.ConfigModel;
import com.wanji.indicators.model.event.deadlock.CrossDeadLockStatusModel;
import com.wanji.indicators.model.event.deadlock.CrossDeadlockResultModel;
import com.wanji.indicators.model.event.deadlock.DeadLockFrameModel;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
* 检测是否存在冲突运行的车辆
*/
public class DeadlockStopCheckKeyedProcess extends KeyedProcessFunction<String, DeadLockFrameModel, CrossDeadlockResultModel> {
private static final Logger log = LoggerFactory.getLogger(DeadlockStopCheckKeyedProcess.class);
private MapState<String, CrossDeadLockStatusModel> mapState;
private MapState<String, CrossDeadlockResultModel> resultMapState;
@Override
public void open(Configuration parameters) {
mapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("mapStateDeadlock_stop_1", String.class, CrossDeadLockStatusModel.class));
resultMapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("mapStateDeadlock_result_1", String.class, CrossDeadlockResultModel.class));
}
@Override
public void processElement(DeadLockFrameModel deadLockFrameModel, Context context, Collector<CrossDeadlockResultModel> collector) throws Exception {
ConfigModel configModel = deadLockFrameModel.getConfigModel();
long nowTime = deadLockFrameModel.getTime();
List<DeadLockFrameModel.CarTrack> conflictTrackList = deadLockFrameModel.getTrackList();
//去重
conflictTrackList = conflictTrackList.stream().distinct().collect(Collectors.toList());
List<String> ridList = conflictTrackList.stream().map(DeadLockFrameModel.CarTrack::getRid).distinct().collect(Collectors.toList());
int allSize = conflictTrackList.size();
//停止车辆数
List<DeadLockFrameModel.CarTrack> stopList = conflictTrackList.stream().filter(o -> o.getSpeed() < configModel.getDeadlockStopCarSpeed()).collect(Collectors.toList());
double stopCarRate = stopList.size() / Double.valueOf(allSize);
if (ridList.size() > 1 && !stopList.isEmpty() && allSize > configModel.getDeadlockMinCarNums() && stopCarRate > configModel.getDeadlockStopCarRate()) {
//log.info(deadLockFrameModel.getCrossId() + "=>满足条件帧时间:" + DateUtil.toDateTime(nowTime, "yyyy-MM-dd HH:mm:ss.SSS"));
long duration = 0;
String effectRids = String.join(",",ridList);
CrossDeadLockStatusModel statusModel = mapState.get(deadLockFrameModel.getCrossId());
if (Objects.isNull(statusModel)) {
statusModel = new CrossDeadLockStatusModel();
statusModel.setCrossId(deadLockFrameModel.getCrossId());
statusModel.setStartTime(nowTime / 1000 * 1000);
statusModel.setEndTime(nowTime);
statusModel.setDuration(0);
statusModel.setTime(nowTime);
statusModel.setEffectRids(effectRids);
mapState.put(deadLockFrameModel.getCrossId(), statusModel);
} else {
duration = nowTime - statusModel.getStartTime();
statusModel.setDuration(duration);
statusModel.setEndTime(nowTime);
statusModel.setTime(nowTime);
mapState.put(deadLockFrameModel.getCrossId(), statusModel);
}
if (duration > configModel.getDeadlockStopCarDuration()) {
CrossDeadlockResultModel resultModel = new CrossDeadlockResultModel();
BeanUtils.copyProperties(statusModel, resultModel);
resultModel.setEventClass(EventType.EVENT.getName());
resultModel.setEventType(EventType.CROSS_DEADLOCK.getName());
resultModel.setTypeCode(EventType.CROSS_DEADLOCK.getCode());
resultModel.setRecoveryCross(stopList.get(0).getRecoveryCross());
resultModel.setDesc("死锁事件:路口内存在冲突方向车辆缓慢运行,疑似出现死锁现象");
resultModel.setDescEn("Deadlock event: there are vehicles running slowly in the conflicting direction at the intersection, which is suspected of deadlock");
CrossDeadlockResultModel disspModel = resultMapState.get(deadLockFrameModel.getCrossId());
if (disspModel == null) {
resultModel.setCheckOutTime(duration);
}else{
resultModel.setCheckOutTime(disspModel.getCheckOutTime());
}
resultModel.setDuration(duration);
//resultModel.setRid(stopList.get(0).getRid());
resultModel.setLocation(new double[]{stopList.get(0).getLng(), stopList.get(0).getLat()});
resultModel.setId(statusModel.getCrossId() + Constant.MARK + statusModel.getCrossId() + Constant.MARK + EventType.CROSS_DEADLOCK.getCode() + Constant.MARK + statusModel.getStartTime());
// List<String> xyList = conflictTrackList.stream().map(o->o.getLng()+","+o.getLat()).distinct().collect(Collectors.toList());
// String xys = String.join(";",xyList);
// Point point = GeomsConvertUtil.genGeometry(xys).getCentroid();
// resultModel.setLocation(new double[]{point.getX(),point.getY()});
resultMapState.put(resultModel.getCrossId(), resultModel);
collector.collect(resultModel);
//log.info("路口发生死锁事件:crossId={},startTime={},endTime={},持续时长:{},停车车辆数:{},总车辆数:{}",resultModel.getCrossId(),DateUtil.toDateTime(resultModel.getStartTime(),"yyyy-MM-dd HH:mm:ss"),DateUtil.toDateTime(resultModel.getEndTime(),"yyyy-MM-dd HH:mm:ss"),duration,stopList.size(),allSize);
} else {
dissp(collector, deadLockFrameModel, configModel, nowTime);
}
} else {//消散清理
//log.info(deadLockFrameModel.getCrossId() + "=>未满足条件帧时间:" + DateUtil.toDateTime(nowTime, "yyyy-MM-dd HH:mm:ss.SSS"));
dissp(collector, deadLockFrameModel, configModel, nowTime);
}
if (!mapState.isEmpty()) {
//缓存清理
Iterable<CrossDeadLockStatusModel> iterable = mapState.values();
List<CrossDeadLockStatusModel> list = StreamSupport.stream(iterable.spliterator(),false).collect(Collectors.toList());
list = list.stream().filter(o->(nowTime-o.getTime())>5000).collect(Collectors.toList());
//缓存超过5秒没有新数据,
for (CrossDeadLockStatusModel model : list){
mapState.remove(model.getCrossId());
}
}
}
//消散清理
private void dissp(Collector<CrossDeadlockResultModel> collector, DeadLockFrameModel deadLockFrameModel, ConfigModel configModel, long nowTime) throws Exception {
CrossDeadlockResultModel disspModel = resultMapState.get(deadLockFrameModel.getCrossId());
if (disspModel != null) {//消散清理
long disspDuration = nowTime - disspModel.getEndTime();
if (disspDuration > configModel.getDeadlockDissipationDuration()) {
mapState.remove(deadLockFrameModel.getCrossId());
resultMapState.remove(deadLockFrameModel.getCrossId());
disspModel.setIsDissipate(1);
collector.collect(disspModel);
//log.info("路口死锁事件消散,清理缓存,当前帧时间{},死锁事件终止i时间{}",DateUtil.toDateTime(nowTime,"yyyy-MM-dd HH:mm:ss"),DateUtil.toDateTime(disspModel.getEndTime(),"yyyy-MM-dd HH:mm:ss"));
}
} else {
CrossDeadLockStatusModel statusModel = mapState.get(deadLockFrameModel.getCrossId());
if (statusModel != null) {
long difTime = Math.abs(nowTime - statusModel.getEndTime());
//未满足持续时间清理状态时间阈值
if (difTime > configModel.getDeadlockStateUnsatisfiedDurationClear()) {
mapState.remove(deadLockFrameModel.getCrossId());
resultMapState.remove(deadLockFrameModel.getCrossId());
}
}
}
}
/**
* 获取key
*
* @param carTrack
* @return
*/
private static String getKey(DeadLockFrameModel.CarTrack carTrack) {
return carTrack.getCrossId() + Constant.MARK + carTrack.getTrackID();
}
}
package com.wanji.indicators.event.cross.stream;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.task.track.service.func.CarTrackFlatMap;
import com.wanji.indicators.task.track.service.func.FrameFlatMap;
import com.wanji.indicators.task.travelTime.service.MaxTravelTimeMainNew;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.DateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
public class CalculateTravelTimeMain {
//private static final Logger log = LoggerFactory.getLogger(TrafficEventMain.class);
public static void main(String[] args) {
PropertiesHelper instance = PropertiesHelper.getInstance();
Properties properties = instance.getProperties();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60*1000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(properties.getProperty("check.point.uri")));
//env.setStateBackend(new FsStateBackend(properties.getProperty("check.point.uri")));
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
// env.setStateBackend(
// new MemoryStateBackend(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE * 100));
DateTime currentDateTime = new DateTime();
DateTime dateTime = currentDateTime.withMillisOfDay(0).minusHours(-4);
String topic = properties.getProperty("consumer.topic");
KafkaSource<String> source = KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties())
.setProperty("auto.offset.commit", "true")
.setProperty("auto.commit.interval.ms", "1000")
.setProperty("commit.offsets.on.checkpoint", "true")
.setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setTopics(topic)
.setGroupId(properties.getProperty("consumer.group.id")+"_TRAVEL"+ UUID.randomUUID().toString())
/* 设置起始偏移量有以下几种情况
1.从指定的位置消费:OffsetsInitializer.offsets(Map<TopicPartition, Long> offsets)
2.从最新位置消费(最后一条处):OffsetsInitializer.latest()
3.从最早位置消费(第一条处):OffsetsInitializer.earliest()
4.从上次提交的位置消费:OffsetsInitializer.committedOffsets()
5.新的组,从来没有提交过,再指定一个消费方式:OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
*/
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
// 从大于等于此时间戳开始的偏移量开始
//.setStartingOffsets(OffsetsInitializer.timestamp(dateTime.getMillis()))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
DataStream<String> stream = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-car-track-source");
SingleOutputStreamOperator<FrameModel> frameModelStream =
stream
.flatMap(new FrameFlatMap())
.setParallelism(1)
.name("轨迹帧数据-JsonToObject");
SingleOutputStreamOperator<CarTrackModel> carTrackModelStream =
frameModelStream.
flatMap(new CarTrackFlatMap())
.setParallelism(1)
.name("轨迹帧数据解析-ToCarTrackModel");
//筛选机动车类型数据
SingleOutputStreamOperator<CarTrackModel> filterStream =
carTrackModelStream.filter(new FilterFunction<CarTrackModel>() {
@Override
public boolean filter(CarTrackModel value) throws Exception {
String motorObjectType = Constant.MOTOR_TYPES;
String[] sps = motorObjectType.split(",");
List<String> typeList = Arrays.asList(sps);
String carType = value.getOriginalType().toString();
if (typeList.contains(carType)){
return true;
}
return false;
}
}).name("筛选机动车数据");
try {
//批量轨迹封装
MaxTravelTimeMainNew.init(env, "indexName").run(filterStream);
env.execute("旅行时间速度计算工作流");
} catch (Exception e) {
e.printStackTrace();
//log.error("交通指标计算任务异常 : " + e);
}
}
}
package com.wanji.indicators.event.cross.stream;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.task.track.service.CarRoutePathMainNew;
import com.wanji.indicators.task.track.service.TrackStoreMainNew;
import com.wanji.indicators.task.track.service.func.CarTrackFlatMap;
import com.wanji.indicators.task.track.service.func.FrameFlatMap;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.DateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class TrackAndRouteMain {
//private static final Logger log = LoggerFactory.getLogger(TrafficEventMain.class);
public static void main(String[] args) {
//批量轨迹es index,逗号分割:esIndexName,windowTime
String indexName = args[0];
String indexName1 = args[1];
// ParameterTool parameter = ParameterTool.fromArgs(args);
// String indexName = parameter.get("Track_ES_IndexName");
// String indexName1 = parameter.get("Route_ES_IndexName");
PropertiesHelper instance = PropertiesHelper.getInstance();
Properties properties = instance.getProperties();
String topic = properties.getProperty("consumer.topic");
if (args.length>2){
topic = args[2];
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60*1000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(properties.getProperty("check.point.uri")));
//env.setStateBackend(new FsStateBackend(properties.getProperty("check.point.uri")));
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
// env.setStateBackend(
// new MemoryStateBackend(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE * 100));
DateTime currentDateTime = new DateTime();
DateTime dateTime = currentDateTime.withMillisOfDay(0).minusHours(-4);
KafkaSource<String> source = KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties())
.setProperty("auto.offset.commit", "true")
.setProperty("auto.commit.interval.ms", "1000")
.setProperty("commit.offsets.on.checkpoint", "true")
.setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setTopics(topic)
.setGroupId(properties.getProperty("consumer.group.id"))
/* 设置起始偏移量有以下几种情况
1.从指定的位置消费:OffsetsInitializer.offsets(Map<TopicPartition, Long> offsets)
2.从最新位置消费(最后一条处):OffsetsInitializer.latest()
3.从最早位置消费(第一条处):OffsetsInitializer.earliest()
4.从上次提交的位置消费:OffsetsInitializer.committedOffsets()
5.新的组,从来没有提交过,再指定一个消费方式:OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
*/
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
// 从大于等于此时间戳开始的偏移量开始
//.setStartingOffsets(OffsetsInitializer.timestamp(dateTime.getMillis()))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
DataStream<String> stream = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-car-track-source");
SingleOutputStreamOperator<FrameModel> frameModelStream =
stream
.flatMap(new FrameFlatMap())
.setParallelism(1)
.name("轨迹帧数据-JsonToObject");
SingleOutputStreamOperator<CarTrackModel> carTrackModelStream =
frameModelStream.
flatMap(new CarTrackFlatMap())
.setParallelism(1)
.name("轨迹帧数据解析-ToCarTrackModel");
//筛选机动车类型数据
SingleOutputStreamOperator<CarTrackModel> filterStream =
carTrackModelStream.filter(new FilterFunction<CarTrackModel>() {
@Override
public boolean filter(CarTrackModel value) throws Exception {
String motorObjectType = Constant.MOTOR_TYPES;
String[] sps = motorObjectType.split(",");
List<String> typeList = Arrays.asList(sps);
String carType = value.getOriginalType().toString();
if (typeList.contains(carType)){
return true;
}
return false;
}
}).name("筛选机动车数据");
try {
//批量轨迹封装
TrackStoreMainNew.init(env, indexName).run(carTrackModelStream);
CarRoutePathMainNew.init(env, indexName1).run(carTrackModelStream);
env.execute("轨迹处理工作流");
} catch (Exception e) {
e.printStackTrace();
//log.error("交通指标计算任务异常 : " + e);
}
}
}
package com.wanji.indicators.event.stream;
import com.wanji.indicators.event.cross.conflictpoint.ConflictEventMain;
import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.source.ConsulConfigSource;
import com.wanji.indicators.source.debug.DebugSourceCarFrame;
import com.wanji.indicators.util.CommonUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 离线计算入口 com.mapabc.project.event.cross.stream.OfflineTestMain
* @author ke.han
*/
public class OfflineTestMain {
private static final Logger log = LoggerFactory.getLogger(OfflineTestMain.class);
public static void main(String[] args) {
//初始化环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//加载配置项 并获取工作路径
String path = new CommonUtil().getWorkSpasePath();
String outPath = "d:/flink/out/";
String osName = System.getProperties().getProperty("os.name");
if (StringUtils.isNotBlank(osName) && osName.contains("Windows")) {
//此处是 Windows 配置
} else {
path = "/opt/flink/workspace/";
}
//读取离线数据
DataStreamSource<CrossFrameModel> streamSource = env.addSource(new DebugSourceCarFrame(path));
SingleOutputStreamOperator<CrossFrameModel> thinningStream = streamSource
.keyBy(CrossFrameModel::getCrossId)
.countWindow(1)
.maxBy("globalTimeStamp");
//离线调试模式
boolean isDebug = true;
ConflictEventMain.init(env,outPath,isDebug).run(thinningStream);
try {
env.execute("事故事件测试");
} catch (Exception e) {
log.error("事件测试 - 计算任务异常 : " + e);
}
}
}
package com.wanji.indicators.event.stream;
import com.wanji.indicators.event.cross.conflictpoint.ConflictEventMain;
import com.wanji.indicators.event.stream.func.CrossFrameFlatMap;
import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class TrafficEventMain {
//private static final Logger log = LoggerFactory.getLogger(TrafficEventMain.class);
private static final Logger log = LoggerFactory.getLogger(TrafficEventMain.class);
public static void main(String[] args) {
//初始化配置文件
PropertiesHelper instance = PropertiesHelper.getInstance();
Properties properties = instance.getProperties();
//初始化环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
String topic = properties.getProperty("consumer.topic");
KafkaSource<String> source = KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties())
.setProperty("auto.offset.commit", "true")
.setProperty("auto.commit.interval.ms", "1000")
.setProperty("commit.offsets.on.checkpoint", "true")
.setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setTopics(topic)
.setGroupId(properties.getProperty("consumer.group.id")+"-event")
/* 设置起始偏移量有以下几种情况
1.从指定的位置消费:OffsetsInitializer.offsets(Map<TopicPartition, Long> offsets)
2.从最新位置消费(最后一条处):OffsetsInitializer.latest()
3.从最早位置消费(第一条处):OffsetsInitializer.earliest()
4.从上次提交的位置消费:OffsetsInitializer.committedOffsets()
5.新的组,从来没有提交过,再指定一个消费方式:OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
*/
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
// 从大于等于此时间戳开始的偏移量开始
//.setStartingOffsets(OffsetsInitializer.timestamp(dateTime.getMillis()))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
DataStream<String> streamSource = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-car-track-source-event");
SingleOutputStreamOperator<CrossFrameModel> thinningDataStream =
streamSource
.flatMap(new CrossFrameFlatMap())
.setParallelism(1)
.name("全域轨迹帧数据-按路口分组输出");
//抽帧
SingleOutputStreamOperator<CrossFrameModel> thinningStream = thinningDataStream
.keyBy(CrossFrameModel::getCrossId)
.countWindow(3)
.maxBy("globalTimeStamp")
.name("数据抽帧");
//路口内冲突点计算
ConflictEventMain.init(env).run(thinningStream);
try {
env.execute("全域安全专题计算");
} catch (Exception e) {
log.error("全域安全专题计算任务异常 : " + e);
}
}
}
package com.wanji.indicators.event.stream.func;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.model.*;
import com.wanji.indicators.model.event.confilct.EventCross;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.*;
public class CrossFrameFlatMap implements FlatMapFunction<String, CrossFrameModel> {
@Override
public void flatMap(String value, Collector<CrossFrameModel> out) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(value);
long dataTime = jsonObject.getLongValue("globalTimeStamp");
JSONArray track = jsonObject.getJSONArray("e1FrameParticipant");
if (!track.isEmpty()) {
Map<String, CrossFrameModel> crossFrameModelMap = new HashMap<>();
for (Object obj : track) {
CarTrackModel carTrack = JSONObject.parseObject(obj.toString(), CarTrackModel.class);
String crossId = carTrack.getRoadnet().getCrossId();
CrossFrameModel crossFrameModel = crossFrameModelMap.get(crossId);
if (Objects.isNull(crossFrameModel)) {
crossFrameModel = new CrossFrameModel();
}
crossFrameModel.setCrossId(crossId);
crossFrameModel.setGlobalTimeStamp(dataTime);
crossFrameModel.getTrackList().add(carTrack);
crossFrameModelMap.put(crossId,crossFrameModel);
}
for (Map.Entry<String,CrossFrameModel> entry : crossFrameModelMap.entrySet()){
out.collect(entry.getValue());
}
}
}
/**
* 计算反向角度
* @param angle
* @return
*/
private static double getBackAngle(double angle) {
if (angle >= 180) {
return angle - 180;
} else {
return angle + 180;
}
}
private static EventCross toEventCross(CarTrackModel carTrack){
EventCross e = new EventCross();
e.setPlateColor(carTrack.getLicenseColor());
e.setVehicleColor(carTrack.getOriginalColor());
e.setTrackID(carTrack.getId());
e.setLng(carTrack.getLongitude());
e.setLat(carTrack.getLatitude());
e.setSpeed(carTrack.getSpeed());
e.setDriveAngle(carTrack.getCourseAngle());
e.setPlateNumber(carTrack.getPicLicense());
e.setVehicleType(carTrack.getOriginalType());
e.setTime(carTrack.getGlobalTimeStamp());
e.setAreaDist(carTrack.getWaitingTurnLaneDist());
e.setTravelTime((long) carTrack.getTravelTime());
return e;
}
}
package com.wanji.indicators.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.wanji.indicators.entity.BaseLaneInfo;
import java.util.List;
import java.util.Map;
import org.apache.ibatis.annotations.Param;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface BaseLaneInfoMapper extends BaseMapper<BaseLaneInfo>{
/**
* 查询路口进口车道、出口车道信息
* 根据条件查询表t_base_lane_info信息
* @param baseLaneInfo
*/
List<BaseLaneInfo> findBaseLaneInfoByCondition(BaseLaneInfo baseLaneInfo);
/**
* 查询路口道路方向坐标、车道坐标,
* @param params
* @return
*/
public List<Map<String,Object>> findCrossLaneInfo(Map<String,Object> params);
List<Map<String,Object>> findCrossLaneInfo(Map<String,Object> params);
}
......
......@@ -5,12 +5,12 @@ import lombok.Data;
import java.io.Serializable;
/**
* @Author ke.han
* @Author fengyi
* @Date 2020/4/10 19:25
**/
@Data
public class BaseEventResultModel implements Serializable {
String id;
/**
* 路口id
*/
......
......@@ -24,12 +24,6 @@ public class CarTrackModel implements Serializable {
private Integer originalColor;
/*车辆类型*/
private Integer originalType;
/*车辆所有人*/
private String owner;
/*使用性质*/
private Integer function;
/*所有权*/
private Integer ownership;
/*路网数据*/
private RoadNet roadnet;
......@@ -60,11 +54,16 @@ public class CarTrackModel implements Serializable {
private String laneId;
/*渠化ID*/
private String segmentId;
/*是否在路口范围内*/
private int inCrossFlag;
/*车道功能转向*/
private Integer turn=0;
}
/***********扩展属性*********************************/
//路口内旅行时间,单位秒
private Integer travelTime;
private Integer travelTime=0;
//待转区车道长度,单位米
private Double waitingTurnLaneDist;
private Double waitingTurnLaneDist=0D;
}
......@@ -423,4 +423,5 @@ public class ConfigModel implements Serializable {
private long abnormalParkingSpecialTime;
}
package com.wanji.indicators.model;
import com.wanji.indicators.model.event.confilct.EventCross;
import lombok.Data;
import java.io.Serializable;
......@@ -20,4 +21,6 @@ public class ConflictPointModelBase extends BaseEventResultModel implements Seri
private int num;
private EventCross nearTackObj;
}
package com.wanji.indicators.model;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* 单路口帧数据
* @author fengyi
* @date 2023/3/1
* @description
*/
@Data
public class CrossFrameModel implements Serializable {
private String crossId;
private ConfigModel configModel = new ConfigModel();
//路口各进口道灯态数据
private List<CrossRidTurnLampStatusModel> ridTurnLampList = new ArrayList<>();
private String timeStamp;
private Long globalTimeStamp;
private Integer participantNum;
private String orgCode = "";
private List<CarTrackModel> trackList = new ArrayList<>();
}
package com.wanji.indicators.model;
import java.io.Serializable;
public class CrossRidTurnLampStatusModel implements Serializable {
private String rid;
private String lampState;
private int remainTime;
private String turn;
private String type;
public String getRid() {
return rid;
}
public void setRid(String rid) {
this.rid = rid;
}
public String getLampState() {
return lampState;
}
public void setLampState(String lampState) {
this.lampState = lampState;
}
public int getRemainTime() {
return remainTime;
}
public void setRemainTime(int remainTime) {
this.remainTime = remainTime;
}
public String getTurn() {
return turn;
}
public void setTurn(String turn) {
this.turn = turn;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
@Override
public String toString() {
return "CrossRidTurnLampStatusModel{" +
"rid='" + rid + '\'' +
", lampState='" + lampState + '\'' +
", remainTime=" + remainTime +
", turn='" + turn + '\'' +
", type='" + type + '\'' +
'}';
}
}
package com.wanji.indicators.model;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* 全域帧数据
* @author fengyi
* @date 2023/3/1
* @description
......@@ -13,11 +16,16 @@ import java.util.List;
@Data
public class FrameModel implements Serializable {
private ConfigModel configModel = new ConfigModel();
//路口各进口道灯态数据
private List<CrossRidTurnLampStatusModel> ridTurnLampList = new ArrayList<>();
private String timeStamp;
private Long globalTimeStamp;
private Integer participantNum;
private String orgCode = "";
private List<CarTrackModel> e1FrameParticipant;
@JSONField(name="e1FrameParticipant")
private List<CarTrackModel> trackList;
}
......@@ -49,7 +49,7 @@ public class SingleCarTrackListModel implements Serializable {
@JSONField(format="yyyy-MM-dd HH:mm:ss.SSS")
private Date globalEndTime;
//路网数据
private Set<CarTrackModel.RoadNet> roadnets;
private Set<CarTrackModel.RoadNet> roadNets;
//private CarTrackCommonProperty staticProperty;
......
package com.wanji.indicators.model.event;
import java.io.Serializable;
/**
* 2022/2/15 15:08
*
* @auther ke.han
*/
public class ConfigModelDo implements Serializable {
private String crossId;
private int typeCode;
private String key;
private String value;
private int isOpen;
public String getCrossId() {
return crossId;
}
public void setCrossId(String crossId) {
this.crossId = crossId;
}
public int getTypeCode() {
return typeCode;
}
public void setTypeCode(int typeCode) {
this.typeCode = typeCode;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public int getIsOpen() {
return isOpen;
}
public void setIsOpen(int isOpen) {
this.isOpen = isOpen;
}
@Override
public String toString() {
return "ConfigModelDo{" +
"crossId='" + crossId + '\'' +
", typeCode=" + typeCode +
", key='" + key + '\'' +
", value='" + value + '\'' +
", isOpen=" + isOpen +
'}';
}
}
package com.wanji.indicators.model.event;
import com.wanji.indicators.util.PtInPolyUtil;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* 2022/1/19 15:53
*
* @auther ke.han
*/
@Data
public class CongestionAreaDo implements Serializable {
private String crossId;
private String rid;
private int type;
private int typeCode;
private int laneNum;
private String ridInArr;
private String wkt;
private List<PtInPolyUtil.Point> points;
}
package com.wanji.indicators.model.event;
import lombok.Data;
import java.io.Serializable;
@Data
public class CrossShapeDo implements Serializable {
private String rid_in;
private String rid_out;
private int turn_dir;
private String wkt;
private double angle_out;
private double angle_in;
}
package com.wanji.indicators.model.event;
import lombok.Data;
import java.io.Serializable;
@Data
public class CrossShapeModel implements Serializable {
private double length;
private double angle;
private double angleIn;
private String ridIn;
private String ridOut;
private double[] pointOutStart;
private int turnDir;
}
package com.wanji.indicators.model.event;
import lombok.Data;
import java.io.Serializable;
@Data
public class RidDirDo implements Serializable {
private String rid;
private String dir;
private Double angle;
}
package com.wanji.indicators.model.event;
import lombok.Data;
import java.io.Serializable;
@Data
public class WaitingAreaDo implements Serializable {
private String interId;
private String rid;
private String laneId;
private Integer waitingArea;
}
package com.wanji.indicators.model;
package com.wanji.indicators.model.event.confilct;
import lombok.Data;
......@@ -143,7 +143,8 @@ public class EventCross implements Serializable {
private String trackIdNear;
/*路网数据*/
private CarTrackModel.RoadNet roadnet;
//冲突临近对象
private EventCross nearTrackObj;
}
package com.wanji.indicators.model.event.congestion;
import lombok.Data;
import java.io.Serializable;
/**
* 2021/11/19 19:12
*
* @auther ke.han
*/
@Data
public class AccidentCongestionModel implements Serializable {
private String key;
private long startTime;
private long time;
private long duration;
private long traveTimeAvg;
private String ridOut;
private String ridOutDir;
private double lng;
private double lat;
private String recoveryCross;//设备名称
}
package com.wanji.indicators.model.event.congestion;
import com.wanji.indicators.util.PtInPolyUtil;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* 2022/1/19 15:53
*
* @auther ke.han
*/
@Data
public class CongestionAreaDo implements Serializable {
private String crossId;
private String rid;
private int type;
private int typeCode;
private int laneNum;
private String ridInArr;
private String wkt;
private List<PtInPolyUtil.Point> points;
}
package com.wanji.indicators.model.event.congestion;
import com.wanji.indicators.model.ConfigModel;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
@Data
public class CongestionFrameModel implements Serializable {
private String crossId;
private String rid;
private long time;
private List<CarTrack> trackList;
private ConfigModel configModel;
private Map<String,Integer> congestionInfo;
public CongestionFrameModel() {
}
public CongestionFrameModel(String crossId, long time, List<CarTrack> trackList) {
this.crossId = crossId;
this.time = time;
this.trackList = trackList;
}
public CongestionFrameModel(String crossId, String rid, long time, List<CarTrack> trackList) {
this.crossId = crossId;
this.rid = rid;
this.time = time;
this.trackList = trackList;
}
@Data
public static class CarTrack implements Serializable {
//待转区长度 >> 待转区停止线距离入口车道停止线的距离
private int waitLaneDistance;
//是否拥堵
private boolean isCongestion;
//拥堵开始时间
private long congestionStartTime;
//拥堵结束时间
private long congestionEndTime;
//根据车道功能和进口rid匹配的出口rid
private String ridOut;
//出口rid方向
private String ridOutDir;
//路口当前rid方向行车距离长度
private double crossLength;
private String recoveryCross;//设备名称
/**
* 轨迹数据字段
*/
private String crossId;//: "10I4M0830K0"
private int crossing;// 1
private String lampState;// G
private int trackID;//: 408732
private int objectType;//: 1
private int vehicleColor;//: 4
private int vehicleType;//: 1
private String vehicleBrand;//: "\346\234\254\347\224\260"
private String vehicleSubbrand;//: "\351\233\205\351\230\201"
private String vehicleYearBrand;//: "2003,2004,2007,2008,2010,2011,2012,2013,2014"
private String entryLane;//: "10I6S082VM010I4M0830K0090012"
private String exitLane;//: "10I4M0830K010I360830D0010011"
private String matchLane;//: "10I4M0830K010I360830D0020011"
private String matchLaneName;//: "\350\245\277 1"
private String laneFunc;
private String rid;
private double driveAngle;//: 262.88
private double lat;//: 26.52352862096375
private double lng;//: 106.71488115347212
private int plateColor;//: 6
private String plateNumber;//: "\350\264\265CEA922"
private int plateType;//: 2
private double speed;//: 49.37
private double at;//: -0.09
private double areaDist;//: 61.68
private double stopLineDist;//: -1.0
private double delayTime;//: 0.72
private double travelTime;//: 10.1
private long time;//: 1617279936374
private String plateBgColor;//: "#0000ff"
private String plateTextColor;//: "#ffffff"
}
@Override
public String toString() {
return "FrameModel{" +
"crossId='" + crossId + '\'' +
", time=" + time +
'}';
}
}
package com.wanji.indicators.model.event.congestion;
import com.wanji.indicators.model.ConfigModel;
import lombok.Data;
import java.io.Serializable;
/**
* 2022/1/11 20:36
*
* @auther ke.han
*/
@Data
public class CongestionModel implements Serializable {
private String crossId;
private String ridIn;
private String ridOut;
private String direction;
private long startTime;
private long endTime;
private long duration;
private double lng;
private double lat;
private int eventState;
private int isDissipation;
private ConfigModel configModel;
private String recoveryCross;//设备名称
}
package com.wanji.indicators.model.event.congestion;
import com.wanji.indicators.model.BaseEventResultModel;
import lombok.Data;
import java.io.Serializable;
@Data
public class CongestionOverflowModel extends BaseEventResultModel implements Serializable {
private long duration;
private int eventState;
private String dir;
private String ridOut;
}
package com.wanji.indicators.model.event.congestion;
import lombok.Data;
import java.io.Serializable;
@Data
public class CrossShapeDo implements Serializable {
private String rid_in;
private String rid_out;
private int turn_dir;
private String wkt;
private double angle_out;
private double angle_in;
}
package com.wanji.indicators.model.event.congestion;
import lombok.Data;
import java.io.Serializable;
@Data
public class CrossShapeModel implements Serializable {
private double length;
private double angle;
private double angleIn;
private String ridIn;
private String ridOut;
private double[] pointOutStart;
private int turnDir;
}
package com.wanji.indicators.model.event.congestion;
import lombok.Data;
import java.io.Serializable;
/**
* 2022/1/12 3:02
*
* @auther ke.han
*/
@Data
public class IsCongestionModel implements Serializable {
private double lng;
private double lat;
private String ridOut;
private String ridOutDir;
private boolean isCongestion;
}
package com.wanji.indicators.model.event.deadlock;
import lombok.Data;
import java.io.Serializable;
@Data
public class CrossDeadLockStatusModel implements Serializable {
private String crossId;
private String rid;
private long time;
/**
* 事件开始事件
*/
private long startTime;
/**
* 事件结束事件
*/
private long endTime;
/**
* 持续事件
*/
private long duration;
/**
* 坐标
*/
private double [] location;
/**
* 唯一id
*/
private String id;
/**
* 三色预警状态
*/
private int eventState;
/**
* 影响的rid,多个以逗号分割
*/
private String effectRids="";
}
package com.wanji.indicators.model.event.deadlock;
import lombok.Data;
import java.io.Serializable;
@Data
public class CrossDeadlockResultModel extends CrossDeadLockStatusModel implements Serializable {
/**
* 事件标识
*/
private String eventClass;
/**
* 事件类型
*/
private String eventType;
/**
* 事件类型编码
*/
private int typeCode;
/**
* 事件信息
*/
private String desc;
/**
* 事件信息 - 英文
*/
private String descEn;
/**
* 事件检出时间
*/
private long checkOutTime;
/**
* 路口名称
*/
private String recoveryCross;
/**
* 是否消散 0否 1是
*/
private int isDissipate;
}
package com.wanji.indicators.model.event.deadlock;
import com.wanji.indicators.model.ConfigModel;
import com.wanji.indicators.model.CrossRidTurnLampStatusModel;
import lombok.Data;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 2021/11/9 14:53
*
* @auther ke.han
*/
@Data
public class DeadLockFrameModel implements Serializable {
private int accidentType;
private String crossId;
private String rid;
private long time;
private List<CarTrack> trackList;
private List<CarTrack> conflictTrackList;
private Map<String,Integer> congestionInfo;
private ConfigModel configModel;
//路口各进口道灯态数据
private List<CrossRidTurnLampStatusModel> ridTurnLampList = new ArrayList<>();
//冲突相位信息
private Map<String, List<Map<String, Object>>> conflictSlotMap = new HashMap<>();
public DeadLockFrameModel() {
}
public DeadLockFrameModel(String crossId, long time, List<CarTrack> trackList) {
this.crossId = crossId;
this.time = time;
this.trackList = trackList;
}
public DeadLockFrameModel(String crossId, String rid, long time, List<CarTrack> trackList) {
this.crossId = crossId;
this.rid = rid;
this.time = time;
this.trackList = trackList;
}
@Data
public static class CarTrack implements Serializable {
/**
* 辅助计算字段
*/
//当前帧是否停车 true停车 false行驶
private boolean isStop;
//停车持续时间 单位毫秒
private long stopDuration;
//停车开始时间 时间戳
private long stopTime;
//车辆绕行累计持续时间
private long detourDuration;
//绕行车辆数
private int detourNum;
//待转区长度 >> 待转区停止线距离入口车道停止线的距离
private int waitLaneDistance;
//是否拥堵
private boolean isCongestion;
//拥堵开始时间
private long congestionStartTime;
//拥堵结束时间
private long congestionEndTime;
//拥堵区域出口rid
private String congestionOutRid;
//是否紧急制动
private boolean isEmergencyBraking;
//紧急制动最大加速度
private double emergencyBrakingAt;
//紧急制动车速
private double emergencyBrakingSpeed;
//ttc碰撞预警
private boolean isTTC;
//ttc碰撞预警时的车速
private double ttcSpeed;
//ttc检测到的相交车辆trackId
private int ttcTrackId;
//事故车辆组合后匹配的最近距离trackId
private int accidentTrackId;
//事故车辆组合后与其他车辆的距离
private double accidentDis;
//停车轨迹稳定持续时间
private long stopStableDuration;
//停车时间阈值
private long checkTime;
//是否绿灯停车
private boolean isGreenStop;
//绿灯停车持续时间
private long greenStopDuration;
//绿灯停车开始时间
private long greenStopStartTime;
//分组key = crossId + trackId
private String key;
//key:frid value:右转出口道可进入的其他方向进口道,格式:rid;turn_dir_no
private Map<String,String> ridTurnRefMap = new HashMap<>();
//三色预警蓝色预警开关
private String isOpenBlue;
/**
* 轨迹数据字段
*/
private String crossId;//: "10I4M0830K0"
private int crossing;// 1
private String lampState;// G
private int trackID;//: 408732
private int objectType;//: 1
private int vehicleColor;//: 4
private int vehicleType;//: 1
private String vehicleBrand;//: "\346\234\254\347\224\260"
private String vehicleSubbrand;//: "\351\233\205\351\230\201"
private String vehicleYearBrand;//: "2003,2004,2007,2008,2010,2011,2012,2013,2014"
private String entryLane;//: "10I6S082VM010I4M0830K0090012"
private String exitLane;//: "10I4M0830K010I360830D0010011"
private String matchLane;//: "10I4M0830K010I360830D0020011"
private String matchLaneName;//: "\350\245\277 1"
private String laneFunc;
private String rid;
private double driveAngle;//: 262.88
private double lat;//: 26.52352862096375
private double lng;//: 106.71488115347212
private int plateColor;//: 6
private String plateNumber;//: "\350\264\265CEA922"
private int plateType;//: 2
private double speed;//: 49.37
private double at;//: -0.09
private double areaDist;//: 61.68
private double stopLineDist;//: -1.0
private double delayTime;//: 0.72
private double travelTime;//: 10.1
private long time;//: 1617279936374
private String plateBgColor;//: "#0000ff"
private String plateTextColor;//: "#ffffff
/**
* 路口名称
*/
private String recoveryCross;
}
@Override
public String toString() {
return "FrameModel{" +
"crossId='" + crossId + '\'' +
", time=" + time +
'}';
}
}
......@@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.extension.service.IService;
import com.baomidou.mybatisplus.core.metadata.IPage;
import java.util.List;
import java.util.Map;
/**
* <p>
......@@ -17,4 +18,6 @@ import java.util.List;
public interface BaseLaneInfoService extends IService<BaseLaneInfo> {
public List<BaseLaneInfo> findLaneRidInfo();
public Map<String,BaseLaneInfo> findLaneInfo(String crossId);
}
package com.wanji.indicators.service.impl;
import com.vividsolutions.jts.geom.LineString;
import com.vividsolutions.jts.geom.*;
import com.wanji.indicators.entity.BaseLaneInfo;
import com.wanji.indicators.entity.BaseRidInfo;
import com.wanji.indicators.mapper.BaseLaneInfoMapper;
......@@ -9,10 +9,14 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.wanji.indicators.util.GeomsConvertUtil;
import com.wanji.indicators.util.Tools;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.List;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
/**
* <p>
......@@ -25,7 +29,7 @@ import java.util.List;
@Service
public class BaseLaneInfoServiceImpl extends ServiceImpl<BaseLaneInfoMapper, BaseLaneInfo> implements BaseLaneInfoService {
@Autowired
@Resource
private BaseLaneInfoMapper baseLaneInfoMapper;
@Override
......@@ -39,4 +43,32 @@ public class BaseLaneInfoServiceImpl extends ServiceImpl<BaseLaneInfoMapper, Bas
}
return list;
}
@Override
public Map<String,BaseLaneInfo> findLaneInfo(String crossList) {
String[] arr = StringUtils.split(crossList,",");
List<String> crosslist = Arrays.asList(arr);
Map<String, Object> params = new HashMap<>();
params.put("crossList", crosslist);
params.put("type",2);
List<Map<String, Object>> list = this.baseLaneInfoMapper.findCrossLaneInfo(params);
// t.cross_id,t.road_id,t.rid,t.type,t.dir,count(*) lane_num,t.road_wkt
Map<String,BaseLaneInfo> laneInfoMap = new HashMap<>();
for (Map<String, Object> dataMap : list) {
String crossId = Tools.getMapValue("cross_id", dataMap);
String rid = Tools.getMapValue("rid", dataMap);
String laneId = Tools.getMapValue("lane_id", dataMap);
String turn = Tools.getMapValue("turn", dataMap);
BaseLaneInfo baseLaneInfo = new BaseLaneInfo();
baseLaneInfo.setCrossId(crossId);
baseLaneInfo.setRid(rid);
baseLaneInfo.setId(laneId);
baseLaneInfo.setTurn(Integer.valueOf(turn));
laneInfoMap.put(laneId,baseLaneInfo);
}
return laneInfoMap ;
}
}
......@@ -13,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.List;
/**
......@@ -26,7 +27,7 @@ import java.util.List;
@Service
public class BaseRidInfoServiceImpl extends ServiceImpl<BaseRidInfoMapper, BaseRidInfo> implements BaseRidInfoService {
@Autowired
@Resource
private BaseRidInfoMapper baseRidInfoMapper;
@Override
......
package com.wanji.indicators.sink;
import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.model.BaseEventResultModel;
import com.wanji.indicators.util.FileUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EventFileSink implements SinkFunction<BaseEventResultModel> {
private static final Logger log = LoggerFactory.getLogger(EventFileSink.class);
private String path;
public EventFileSink(String path) {
this.path = path;
}
@Override
public void invoke(BaseEventResultModel accidentShowModel, Context context) {
if(StringUtils.isNotBlank(path)){
String eventType = accidentShowModel.getEventType();
String plateNumber = accidentShowModel.getPlateNumber();
String timeStr = DateFormatUtils.format(accidentShowModel.getStartTime(),"yyyyMMdd_HHmmss");
String timeStrs = DateFormatUtils.format(accidentShowModel.getStartTime(),"yyyy-MM-dd HH:mm:ss");
FileUtil.writeApend(path + "event_" + eventType + "_" + plateNumber + "_" + timeStr + ".json", JSONObject.toJSONString(accidentShowModel));
log.info("\n{********** The result has been written to the file ********** ==> " + path +
"event_" + eventType + "_" + plateNumber + "_" + timeStrs + ".json}\n");
}
}
}
package com.wanji.indicators.source;
import com.wanji.indicators.constant.Constant;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
* 查询 rid 转向数据 对应的冲突转向
*/
public class ConflictTurnSource extends RichParallelSourceFunction<Map<String, List<Map<String, Object>>>> {
private static final Logger log = LoggerFactory.getLogger(ConflictTurnSource.class);
private volatile boolean running = true;
public void open(Configuration parameters) throws Exception {
}
public static void main(String[] args) {
String s = getQuerySql();
System.out.println(s);
}
private static String getQuerySql() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("SELECT t1.inter_id,t1.f_rid,t1.t_rid,t1.turn_dir_no as f_turn_dir_no,t2.clash_f_rid,t2.turn_dir_no as t_turn_dir_no ");
stringBuilder.append("FROM ( ");
stringBuilder.append("SELECT b.inter_id,a.f_rid,a.t_rid,b.turn_dir_no,a.clash_f_rid,a.clash_t_rid,a.rr_id,a.clash_rr_id ");
stringBuilder.append("FROM gis_turn_clash a JOIN cross_relation b ");
stringBuilder.append("on a.rr_id=b.rr_id and b.turn_dir_no !=4 ");
stringBuilder.append(") t1 JOIN ( ");
stringBuilder.append("SELECT b.inter_id,a.f_rid,a.t_rid,b.turn_dir_no,a.clash_f_rid,a.clash_t_rid,a.rr_id,a.clash_rr_id ");
stringBuilder.append("FROM gis_turn_clash a JOIN cross_relation b ");
stringBuilder.append("on a.clash_rr_id=b.rr_id and a.f_rid != a.clash_f_rid and b.turn_dir_no !=4 ");
stringBuilder.append(") t2 ON t1.f_rid = t2.f_rid and t1.t_rid=t2.t_rid and t1.clash_f_rid=t2.clash_f_rid and t1.clash_t_rid=t2.clash_t_rid ");
stringBuilder.append("AND t1.inter_id=t2.inter_id ");
stringBuilder.append("JOIN (SELECT DISTINCT inter_id from lane_obj where data_source='2' and inter_id is not null) t3 ");
stringBuilder.append("ON t1.inter_id=t3.inter_id and t2.inter_id=t3.inter_id");
return stringBuilder.toString();
}
@Override
public void run(SourceContext<Map<String, List<Map<String, Object>>>> sourceContext) throws Exception {
//key:rid->turn value:冲突rid及转向
Map<String, List<Map<String, Object>>> retMap = new HashMap<>();
try {
List<HashMap> dataList = null;//new JdbcUtil().excuteQueryToList(HashMap.class, getQuerySql());
if (dataList != null) {
retMap.clear();
for (HashMap<String, Object> map : dataList) {
String crossId = map.get("inter_id").toString();
String rid = map.get("f_rid").toString();
String f_turn_dir_no = map.get("f_turn_dir_no").toString();
String clash_f_rid = map.get("clash_f_rid").toString();//冲突方向rid
String t_turn_dir_no = map.get("t_turn_dir_no").toString();//冲突方向转向
String key = rid + Constant.MARK + f_turn_dir_no + Constant.MARK + crossId;
List<Map<String, Object>> tmplist = retMap.get(key);
if (Objects.isNull(tmplist)) {
tmplist = new ArrayList<>();
}
Map<String, Object> conflictTurnMap = new HashMap<>();
conflictTurnMap.put("f_rid", clash_f_rid);
conflictTurnMap.put("turn_dir_no", t_turn_dir_no);
tmplist.add(conflictTurnMap);
retMap.put(key, tmplist);
sourceContext.collect(retMap);
}
}
} catch (Exception e) {
log.error("查询 rid 冲突信息数据异常: " + e.getMessage(), e);
}
}
@Override
public void cancel() {
running = false;
}
/**
* 理论冲突相位数据,
* 相位标号定义 以北方向为基准定义 1:北直行相位 2:东直行相位 3:南直行相位 4:西直行相位 5:北左转相位 6:东左转 7:南左转 8:西左转
*/
public Map<Integer, List<Integer>> createConflictSlots() {
Map<Integer, List<Integer>> maplist = new HashMap<Integer, List<Integer>>();
for (int i = 1; i < 9; i++) {
Integer sidewalkDir = null;
List<Integer> conflictList = maplist.get(i);
if (Objects.isNull(conflictList)) {
conflictList = new ArrayList<>();
}
if (i < 5) { // 直行相位
// 直行相位的冲突相位点
// 顺时针直行相位
int clockwise = (i + 1) % 4 == 0 ? 4 : (i + 1) % 4;
// 逆时针直行相位
int antiClockWise = (i - 1) % 4 == 0 ? 4 : (i - 1) % 4;
// 对向左转冲突点
int acrossLeft = ((i + 2) % 4 == 0 ? 4 : (i + 2) % 4) + 4;
// 逆时针左转冲突相位点
int antiLeft = ((i - 1) % 4 == 0 ? 4 : (i - 1) % 4) + 4;
conflictList.add(clockwise);
conflictList.add(antiClockWise);
conflictList.add(acrossLeft);
conflictList.add(antiLeft);
} else {// 左转相位冲突相位
// 顺时针直行相位
int antiClockWiseStraight = (i + 1) % 4 == 0 ? 4 : (i + 1) % 4;
// 对向直行冲突点
int acrossStraight = (i + 2) % 4 == 0 ? 4 : (i + 2) % 4;
// 顺时针左转相位
int clockwiseLeft = ((i + 1) % 4 == 0 ? 4 : (i + 1) % 4) + 4;
// 逆时针左转相位
int antiClockWiseLeft = ((i - 1) % 4 == 0 ? 4 : (i - 1) % 4) + 4;
conflictList.add(antiClockWiseStraight);
conflictList.add(acrossStraight);
conflictList.add(clockwiseLeft);
conflictList.add(antiClockWiseLeft);
}
maplist.put(i, conflictList);
}
return maplist;
}
// private static String getQuerySql1() {
// StringBuilder stringBuilder = new StringBuilder();
// stringBuilder.append("SELECT a.inter_id,a.f_rid,a.turn_dir_no,b.dir_4_no, ");
// stringBuilder.append("( ");
// stringBuilder.append("CASE ");
// stringBuilder.append("WHEN b.dir_4_no=3 and turn_dir_no=2 THEN 1 ");
// stringBuilder.append("WHEN b.dir_4_no=3 and turn_dir_no=1 THEN 5 ");
// stringBuilder.append("WHEN b.dir_4_no=4 and turn_dir_no=2 THEN 2 ");
// stringBuilder.append("WHEN b.dir_4_no=4 and turn_dir_no=1 THEN 6 ");
// stringBuilder.append("WHEN b.dir_4_no=1 and turn_dir_no=2 THEN 3 ");
// stringBuilder.append("WHEN b.dir_4_no=1 and turn_dir_no=1 THEN 7 ");
// stringBuilder.append("WHEN b.dir_4_no=2 and turn_dir_no=2 THEN 4 ");
// stringBuilder.append("WHEN b.dir_4_no=2 and turn_dir_no=1 THEN 8 ");
// stringBuilder.append("END ");
// stringBuilder.append(") as slot_no ");
// stringBuilder.append("FROM cross_relation a ");
// stringBuilder.append("JOIN rid b on a.f_rid=b.rid ");
// stringBuilder.append("WHERE turn_dir_no not in (3,4)");
// return stringBuilder.toString();
// }
// public void run1(SourceContext<Map<String, List<Map<String, Object>>>> sourceContext) throws Exception {
// //key:rid->turn value:冲突rid及转向
// Map<String, List<Map<String, Object>>> retMap = new HashMap<>();
// try {
// List<HashMap> dataList = new JdbcUtil().excuteQueryToList(HashMap.class, getQuerySql1());
// Map<Integer, List<Integer>> baseConflictInfo = createConflictSlots();
//
// if (dataList != null && !dataList.isEmpty()) {
// retMap.clear();
// for (HashMap<String, Object> map : dataList) {
// String inter_id = map.get("inter_id").toString();
// String rid = map.get("f_rid").toString();
// String turn_dir_no = map.get("turn_dir_no").toString();
//
// //当前相位编号
// Integer slot_no = Integer.valueOf(map.get("slot_no").toString());
// //冲突相位
// List<Integer> conflictSlot = baseConflictInfo.get(slot_no);
// //冲突相位关联rid等信息
// List<HashMap<String, Object>> conflictSlotList = dataList.stream().filter(m ->Objects.equals(inter_id,m.get("inter_id")) && conflictSlot.contains(Integer.valueOf(m.get("slot_no").toString()))).collect(Collectors.toList());
// if (!conflictSlotList.isEmpty()) {
// String key = rid + Constant.MARK + turn_dir_no;
// List<Map<String, Object>> tmplist = retMap.get(key);
// if (Objects.isNull(tmplist)) {
// tmplist = new ArrayList<>();
// }
// tmplist.addAll(conflictSlotList);
// retMap.put(key, tmplist);
// sourceContext.collect(retMap);
// }
// }
//
//
// }
// } catch (Exception e) {
// log.error("查询 rid 冲突信息数据异常: " + e.getMessage(), e);
// }
// }
}
package com.wanji.indicators.source;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.event.CrossShapeDo;
import com.wanji.indicators.model.event.CrossShapeModel;
import com.wanji.indicators.util.PtInPolyUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
/**
* 查询 rid 转向数据
*/
public class CrossShapeSource extends RichParallelSourceFunction<HashMap<String, CrossShapeModel>> {
private static final Logger log = LoggerFactory.getLogger(CrossShapeSource.class);
private volatile boolean running = true;
@Override
public void run(SourceContext<HashMap<String, CrossShapeModel>> sourceContext) throws Exception {
HashMap<String, CrossShapeModel> outMap = new HashMap<>();
try {
if (running) {
String sql = "SELECT aa.rid_in,aa.rid_out,aa.turn_dir,aa.wkt,bb.angle AS angle_out,cc.angle AS angle_in " +
" FROM cross_turn_line aa " +
" LEFT JOIN rid bb ON aa.rid_out = bb.rid " +
" LEFT JOIN rid cc ON aa.rid_in = cc.rid " +
" WHERE aa.lane_num > 0";
List<CrossShapeDo> ridLaneNumDos = null;//new JdbcUtil().excuteQueryToList(CrossShapeDo.class, sql);
if (ridLaneNumDos != null && !ridLaneNumDos.isEmpty()) {
outMap.clear();
for (CrossShapeDo data : ridLaneNumDos) {
String rid_in = data.getRid_in();
String wkt = data.getWkt();
int turn_dir = data.getTurn_dir();
String rid_out = data.getRid_out();
double angle_out = data.getAngle_out();
double angle_in = data.getAngle_in();
CrossShapeModel m = new CrossShapeModel();
m.setAngle(angle_out);
m.setAngleIn(angle_in);
m.setRidIn(rid_in);
m.setRidOut(rid_out);
m.setTurnDir(turn_dir);
if (StringUtils.isNotBlank(wkt)) {
String[] wktArr = wkt.split(";");
String[] firstPoint = wktArr[0].split(",");
String[] lastPoint = wktArr[wktArr.length - 1].split(",");
double distance = PtInPolyUtil.getDistance(Double.parseDouble(firstPoint[0]), Double.parseDouble(firstPoint[1]), Double.parseDouble(lastPoint[0]), Double.parseDouble(lastPoint[1]));
m.setLength(distance);
m.setPointOutStart(new double[]{Double.parseDouble(lastPoint[0]), Double.parseDouble(lastPoint[1])});
}
outMap.put(rid_in + Constant.MARK + turn_dir, m);
outMap.put(rid_out + Constant.MARK + turn_dir, m);
}
}
if (outMap.size() > 0) {
sourceContext.collect(outMap);
}
// Thread.sleep(1000*30);
}
} catch (Exception e) {
log.error("查询 rid 转向数据异常: " + e.getMessage(), e);
}
}
@Override
public void cancel() {
running = false;
}
}
set table.exec.sink.not-null-enforcer=drop;
-- cron:0 0 */1 * * ?
-- 任务名称:EVENT_BIG_CATEGORY_STATISTIC
-- 任务描述:事件大类统计
-- 输出数据表:t_analysis_event_trend_hour、t_analysis_event_trend_day
-- 依赖包:flink-connector-jdbc_2.11-1.14.3.jar、mysql-connector-java-8.0.21.jar
set table.exec.sink.not-null-enforcer=drop;
CREATE TABLE t_event_info(
oid BIGINT NOT NULL,
type STRING,
......@@ -72,6 +77,7 @@ CREATE TABLE t_analysis_event_trend_hour (
'username' = 'root',
'password' = 'Wanji300552'
);
insert into t_analysis_event_trend_hour(category,number,window_start_time,window_end_time,granularity)
SELECT event_category,SUM(event_number),window_start_time,window_end_time,'1h'
FROM t_analysis_event
......
-- Flink CDC 参考:https://blog.csdn.net/qq_44326412/article/details/127064981
SET 'execution.checkpointing.interval' = '30s';
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment