Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
W
wj-datacenter-platform
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
jinan
wj-datacenter-platform
Commits
d51c2793
Commit
d51c2793
authored
Nov 03, 2023
by
xululu@wanji.net.cn
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
交通感知指标计算:拥堵、溢出、失衡、死锁和相位空放
parent
dc2df39a
Changes
27
Show whitespace changes
Inline
Side-by-side
Showing
27 changed files
with
1713 additions
and
3 deletions
+1713
-3
pom.xml
wj-realtime-computing/pom.xml
+7
-2
CarTrackModel.java
...c/main/java/com/wanji/indicators/model/CarTrackModel.java
+4
-0
CarTrajectoryAnalysisMain.java
...indicators/task/trajectory/CarTrajectoryAnalysisMain.java
+122
-0
CarTrajectoryIndexMain.java
...ji/indicators/task/trajectory/CarTrajectoryIndexMain.java
+569
-0
Test2.java
...main/java/com/wanji/indicators/task/trajectory/Test2.java
+16
-0
CarDataValidatorFlatMap.java
...icators/task/trajectory/func/CarDataValidatorFlatMap.java
+93
-0
CarRecordInfoFlatMap.java
...indicators/task/trajectory/func/CarRecordInfoFlatMap.java
+40
-0
CrossRoadLightStatusCoFlatMap.java
...s/task/trajectory/func/CrossRoadLightStatusCoFlatMap.java
+76
-0
ProcessCarInfo.java
...wanji/indicators/task/trajectory/func/ProcessCarInfo.java
+29
-0
ProcessCarTrace.java
...anji/indicators/task/trajectory/func/ProcessCarTrace.java
+10
-0
ProcessOverFlowFunction.java
...icators/task/trajectory/func/ProcessOverFlowFunction.java
+48
-0
VehicleGapTimeProcessFunction.java
...s/task/trajectory/func/VehicleGapTimeProcessFunction.java
+196
-0
FileReadingHelper.java
.../indicators/task/trajectory/helper/FileReadingHelper.java
+37
-0
KeySelectorByPlateAndRid.java
...icators/task/trajectory/key/KeySelectorByPlateAndRid.java
+18
-0
CarRecordInfo.java
.../wanji/indicators/task/trajectory/pojo/CarRecordInfo.java
+30
-0
CarTrackInCrossRoadFlatMap.java
...tors/task/trajectory/pojo/CarTrackInCrossRoadFlatMap.java
+73
-0
CarTrackModelInNorthAndSouth.java
...rs/task/trajectory/pojo/CarTrackModelInNorthAndSouth.java
+55
-0
CrossExitInfo.java
.../wanji/indicators/task/trajectory/pojo/CrossExitInfo.java
+17
-0
CrossRoadLightStatusModel.java
...ators/task/trajectory/pojo/CrossRoadLightStatusModel.java
+45
-0
OverFlowDetail.java
...wanji/indicators/task/trajectory/pojo/OverFlowDetail.java
+21
-0
OverFlowIndexResult.java
.../indicators/task/trajectory/pojo/OverFlowIndexResult.java
+22
-0
RidIndexResultOfEastAndWest.java
...ors/task/trajectory/pojo/RidIndexResultOfEastAndWest.java
+24
-0
SingleCarInfo.java
.../wanji/indicators/task/trajectory/pojo/SingleCarInfo.java
+37
-0
UnbalanceResult.java
...anji/indicators/task/trajectory/pojo/UnbalanceResult.java
+21
-0
GeomsConvertUtil.java
...main/java/com/wanji/indicators/util/GeomsConvertUtil.java
+60
-0
config.properties
wj-realtime-computing/src/main/resources/config.properties
+29
-1
cross_exit_info.txt
...ime-computing/src/main/resources/data/cross_exit_info.txt
+14
-0
No files found.
wj-realtime-computing/pom.xml
View file @
d51c2793
...
...
@@ -26,7 +26,7 @@
<!--<jar.tail>export</jar.tail>-->
<!--<jar.tail>export</jar.tail>-->
<!--<jar.tail>monitor</jar.tail>-->
<jar.tail>
area
</jar.tail>
<jar.tail>
test
</jar.tail>
<!--<jar.tail>event</jar.tail>-->
<!--<jar.tail>period</jar.tail>-->
</properties>
...
...
@@ -176,7 +176,12 @@
<version>0.9.0</version>
</dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-connector-jdbc_2.11
</artifactId>
<version>
1.14.3
</version>
</dependency>
<dependency>
...
...
wj-realtime-computing/src/main/java/com/wanji/indicators/model/CarTrackModel.java
View file @
d51c2793
...
...
@@ -44,6 +44,9 @@ public class CarTrackModel implements Serializable {
/*航向角*/
private
Double
courseAngle
;
//是否位于指定区域内 默认0:不在指定区域内 1:位于指定区域内
private
int
isInSpecificArea
=
0
;
/*******************************************/
@Data
...
...
@@ -60,6 +63,7 @@ public class CarTrackModel implements Serializable {
/*是否在路口范围内*/
private
int
inCrossFlag
;
/*车道功能转向*/
//车道转向:1左转;2直行;3右转;4掉头;5直左;6直右;7左直右;8左右;9左转掉头;10直行掉头;11右转掉头;12左直掉头;13直右掉头;14左直右掉头;15左右掉头
private
Integer
turn
=
0
;
/*车道终点坐标:x,y*/
private
String
laneEndPoint
;
...
...
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/CarTrajectoryAnalysisMain.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
;
import
com.wanji.indicators.model.FrameModel
;
import
com.wanji.indicators.task.track.service.func.FrameFlatMap
;
import
com.wanji.indicators.task.trajectory.func.CarRecordInfoFlatMap
;
import
com.wanji.indicators.task.trajectory.func.ProcessCarInfo
;
import
com.wanji.indicators.task.trajectory.pojo.CarRecordInfo
;
import
com.wanji.indicators.util.PropertiesHelper
;
import
org.apache.flink.api.common.eventtime.WatermarkStrategy
;
import
org.apache.flink.connector.jdbc.JdbcConnectionOptions
;
import
org.apache.flink.connector.jdbc.JdbcSink
;
import
org.apache.flink.connector.jdbc.JdbcStatementBuilder
;
import
org.apache.flink.connector.kafka.source.KafkaSource
;
import
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
;
import
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
;
import
org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
;
import
org.apache.flink.streaming.api.environment.CheckpointConfig
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
;
import
org.apache.flink.streaming.api.windowing.time.Time
;
import
org.apache.kafka.clients.consumer.OffsetResetStrategy
;
import
org.apache.kafka.common.serialization.StringDeserializer
;
import
java.sql.Date
;
import
java.sql.PreparedStatement
;
import
java.sql.SQLException
;
import
java.util.Properties
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/24 15:00
* @Description :
*/
public
class
CarTrajectoryAnalysisMain
{
public
static
void
main
(
String
[]
args
)
{
try
{
PropertiesHelper
instance
=
PropertiesHelper
.
getInstance
();
Properties
properties
=
instance
.
getProperties
();
//获取配置文件中的kafka消费topic
String
topic
=
properties
.
getProperty
(
"consumer.topic"
);
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
//配置检查点机制
env
.
enableCheckpointing
(
60
*
1000
);
env
.
getCheckpointConfig
().
setTolerableCheckpointFailureNumber
(
3
);
env
.
getCheckpointConfig
().
setCheckpointTimeout
(
10
*
60
*
1000
);
env
.
getCheckpointConfig
().
setCheckpointStorage
(
new
FileSystemCheckpointStorage
(
properties
.
getProperty
(
"check.point.uri"
)));
env
.
getCheckpointConfig
().
setExternalizedCheckpointCleanup
(
CheckpointConfig
.
ExternalizedCheckpointCleanup
.
DELETE_ON_CANCELLATION
);
KafkaSource
<
String
>
source
=
KafkaSource
.<
String
>
builder
()
.
setProperties
(
instance
.
getConsumerProperties
())
.
setProperty
(
"auto.offset.commit"
,
"true"
)
.
setProperty
(
"auto.commit.interval.ms"
,
"1000"
)
.
setProperty
(
"commit.offsets.on.checkpoint"
,
"true"
)
.
setBootstrapServers
(
properties
.
getProperty
(
"bootstrap.servers"
))
.
setTopics
(
topic
)
.
setGroupId
(
properties
.
getProperty
(
"consumer.group.id"
)+
"test1"
)
/* 设置起始偏移量有以下几种情况
1.从指定的位置消费:OffsetsInitializer.offsets(Map<TopicPartition, Long> offsets)
2.从最新位置消费(最后一条处):OffsetsInitializer.latest()
3.从最早位置消费(第一条处):OffsetsInitializer.earliest()
4.从上次提交的位置消费:OffsetsInitializer.committedOffsets()
5.新的组,从来没有提交过,再指定一个消费方式:OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
*/
.
setStartingOffsets
(
OffsetsInitializer
.
committedOffsets
(
OffsetResetStrategy
.
LATEST
))
// 从大于等于此时间戳开始的偏移量开始
//.setStartingOffsets(OffsetsInitializer.timestamp(dateTime.getMillis()))
.
setDeserializer
(
KafkaRecordDeserializationSchema
.
valueOnly
(
StringDeserializer
.
class
))
.
build
();
DataStream
<
String
>
stream
=
env
.
fromSource
(
source
,
WatermarkStrategy
.
noWatermarks
(),
"kafka-car-trajectory-source"
);
SingleOutputStreamOperator
<
FrameModel
>
frameModelStream
=
stream
.
flatMap
(
new
FrameFlatMap
())
.
setParallelism
(
1
)
.
name
(
"轨迹帧数据-JsonToObject"
);
SingleOutputStreamOperator
<
CarRecordInfo
>
filteredStream
=
frameModelStream
.
flatMap
(
new
CarRecordInfoFlatMap
())
.
setParallelism
(
1
)
.
name
(
"筛选中有车牌的车辆"
);
SingleOutputStreamOperator
<
CarRecordInfo
>
sink
=
filteredStream
.
keyBy
(
CarRecordInfo:
:
getCarPlate
)
.
window
(
TumblingProcessingTimeWindows
.
of
(
Time
.
seconds
(
60
)))
.
process
(
new
ProcessCarInfo
())
.
setParallelism
(
2
)
.
name
(
"统计每分钟内不同车牌信息"
);
sink
.
addSink
(
JdbcSink
.
sink
(
"insert into car_record_info (car_id, car_plate, create_time) values(?,?,?)"
,
new
JdbcStatementBuilder
<
CarRecordInfo
>()
{
@Override
public
void
accept
(
PreparedStatement
preparedStatement
,
CarRecordInfo
carRecordInfo
)
throws
SQLException
{
preparedStatement
.
setInt
(
1
,
carRecordInfo
.
getCarId
());
preparedStatement
.
setString
(
2
,
carRecordInfo
.
getCarPlate
());
preparedStatement
.
setDate
(
3
,
new
Date
(
carRecordInfo
.
getCreateTime
().
getTime
()));
}
},
new
JdbcConnectionOptions
.
JdbcConnectionOptionsBuilder
()
.
withUrl
(
"jdbc:mysql://10.102.1.182:3306/flink_test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
)
.
withUsername
(
"root"
)
.
withPassword
(
"Wanji300552"
)
.
withDriverName
(
"com.mysql.cj.jdbc.Driver"
)
.
withConnectionCheckTimeoutSeconds
(
60
)
.
build
()
)).
name
(
"写入mysql"
);
env
.
execute
(
"从kafka消费轨迹数据并且写到mysql中"
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/CarTrajectoryIndexMain.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
;
import
com.alibaba.fastjson.JSONObject
;
import
com.wanji.indicators.event.stream.func.CrossFrameFlatMap
;
import
com.wanji.indicators.model.CarTrackModel
;
import
com.wanji.indicators.model.CrossFrameModel
;
import
com.wanji.indicators.model.FrameModel
;
import
com.wanji.indicators.source.LaneInfoSource
;
import
com.wanji.indicators.task.lightstatus.BindCrossLaneInfoCoFlatMap
;
import
com.wanji.indicators.task.lightstatus.model.VehicleHeadTimeModel
;
import
com.wanji.indicators.task.track.service.func.FrameFlatMap
;
import
com.wanji.indicators.task.trajectory.func.CarDataValidatorFlatMap
;
import
com.wanji.indicators.task.trajectory.func.CrossRoadLightStatusCoFlatMap
;
import
com.wanji.indicators.task.trajectory.func.VehicleGapTimeProcessFunction
;
import
com.wanji.indicators.task.trajectory.helper.FileReadingHelper
;
import
com.wanji.indicators.task.trajectory.pojo.*
;
import
com.wanji.indicators.util.DateUtil
;
import
com.wanji.indicators.util.PropertiesHelper
;
import
org.apache.flink.api.common.eventtime.WatermarkStrategy
;
import
org.apache.flink.api.common.functions.FilterFunction
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
import
org.apache.flink.api.common.serialization.SimpleStringSchema
;
import
org.apache.flink.connector.jdbc.JdbcConnectionOptions
;
import
org.apache.flink.connector.jdbc.JdbcSink
;
import
org.apache.flink.connector.jdbc.JdbcStatementBuilder
;
import
org.apache.flink.connector.kafka.source.KafkaSource
;
import
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
;
import
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
;
import
org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
;
import
org.apache.flink.streaming.api.datastream.ConnectedStreams
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
;
import
org.apache.flink.streaming.api.environment.CheckpointConfig
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
;
import
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
;
import
org.apache.flink.streaming.api.functions.windowing.WindowFunction
;
import
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
;
import
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
;
import
org.apache.flink.streaming.api.windowing.time.Time
;
import
org.apache.flink.streaming.api.windowing.windows.TimeWindow
;
import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
;
import
org.apache.flink.util.Collector
;
import
org.apache.kafka.clients.consumer.OffsetResetStrategy
;
import
org.apache.kafka.common.serialization.StringDeserializer
;
import
org.springframework.util.CollectionUtils
;
import
java.sql.Date
;
import
java.sql.PreparedStatement
;
import
java.sql.SQLException
;
import
java.time.Duration
;
import
java.util.*
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.atomic.AtomicReference
;
import
java.util.stream.Collectors
;
import
java.util.stream.StreamSupport
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/25 14:03
* @Description :
*/
public
class
CarTrajectoryIndexMain
{
private
static
final
ConcurrentHashMap
<
String
,
String
>
eastAndWestMap
;
public
static
final
ConcurrentHashMap
<
String
,
String
>
northAndSouthMap
;
private
static
final
Set
<
String
>
crossIdSet
;
public
static
final
Map
<
String
,
String
>
virtualCrossRoadArea
=
new
HashMap
<>();
private
static
final
Map
<
String
,
Double
>
ridFreeTimeMap
=
new
HashMap
<>();
public
static
final
Map
<
String
,
List
<
CrossExitInfo
>>
crossExitMap
=
new
ConcurrentHashMap
<>();
private
static
final
Properties
properties
=
PropertiesHelper
.
getInstance
().
getProperties
();
private
static
final
PropertiesHelper
instance
=
PropertiesHelper
.
getInstance
();
static
{
String
ridInfoOfEastAndWest
=
properties
.
getProperty
(
"east.west.rid.direction.list"
);
Map
<
String
,
String
>
eastAndWest
=
Arrays
.
stream
(
ridInfoOfEastAndWest
.
split
(
","
)).
map
(
string
->
string
.
split
(
":"
)).
collect
(
Collectors
.
toMap
(
array
->
array
[
0
],
array
->
array
[
1
]));
eastAndWestMap
=
new
ConcurrentHashMap
<>(
eastAndWest
);
String
ridInfoOfNorthAndSouth
=
properties
.
getProperty
(
"north.south.rid.direction.list"
);
Map
<
String
,
String
>
northAndSouth
=
Arrays
.
stream
(
ridInfoOfNorthAndSouth
.
split
(
","
)).
map
(
string
->
string
.
split
(
":"
)).
collect
(
Collectors
.
toMap
(
array
->
array
[
0
],
array
->
array
[
1
]));
northAndSouthMap
=
new
ConcurrentHashMap
<>(
northAndSouth
);
String
listOfCrossId
=
properties
.
getProperty
(
"full.area.cross.list"
);
crossIdSet
=
Arrays
.
stream
(
listOfCrossId
.
split
(
","
)).
collect
(
Collectors
.
toSet
());
}
static
{
List
<
String
>
content
=
FileReadingHelper
.
getFileContent
(
"data/cross_exit_info.txt"
);
if
(!
CollectionUtils
.
isEmpty
(
content
)){
//数组结构[路口id][路段id][出口区域经纬度坐标集]
Map
<
String
,
List
<
String
[]>>
groupMapByCrossId
=
content
.
stream
().
map
(
s
->
s
.
split
(
":"
)).
collect
(
Collectors
.
groupingBy
(
array
->
array
[
0
]));
crossIdSet
.
forEach
(
crossId
->
{
if
(
groupMapByCrossId
.
containsKey
(
crossId
)){
List
<
String
[]>
list
=
groupMapByCrossId
.
get
(
crossId
);
List
<
CrossExitInfo
>
exitList
=
list
.
stream
().
map
(
info
->
{
CrossExitInfo
exitInfo
=
new
CrossExitInfo
();
exitInfo
.
setRid
(
info
[
1
]);
exitInfo
.
setCoordinates
(
info
[
2
]);
return
exitInfo
;
}).
collect
(
Collectors
.
toList
());
crossExitMap
.
put
(
crossId
,
exitList
);
}
});
}
}
static
{
String
area1
=
properties
.
getProperty
(
"virtual.crossroad.13NED0B5Q90"
);
String
area2
=
properties
.
getProperty
(
"virtual.crossroad.13NH20B5RH0"
);
String
[]
array1
=
area1
.
split
(
":"
);
virtualCrossRoadArea
.
put
(
array1
[
0
],
array1
[
1
]);
String
[]
array2
=
area2
.
split
(
":"
);
virtualCrossRoadArea
.
put
(
array2
[
0
],
array2
[
1
]);
}
static
{
ridFreeTimeMap
.
put
(
"13NED0B5Q9013NF80B5QN00"
,
5.0
);
ridFreeTimeMap
.
put
(
"13NF80B5QN013NGH0B5RC00"
,
26.0
);
ridFreeTimeMap
.
put
(
"13NGH0B5RC013NI00B5RM00"
,
29.0
);
ridFreeTimeMap
.
put
(
"13NI00B5RM013NID0B5RM00"
,
5.0
);
ridFreeTimeMap
.
put
(
"13NID0B5RM013NH20B5RH00"
,
4.4
);
ridFreeTimeMap
.
put
(
"13NH20B5RH013NID0B5RM00"
,
4.4
);
ridFreeTimeMap
.
put
(
"13NID0B5RM013NI00B5RM00"
,
5.0
);
ridFreeTimeMap
.
put
(
"13NI00B5RM013NGH0B5RC00"
,
27.0
);
ridFreeTimeMap
.
put
(
"13NGH0B5RC013NF80B5QN00"
,
26.0
);
ridFreeTimeMap
.
put
(
"13NF80B5QN013NED0B5Q900"
,
5.0
);
}
public
static
void
main
(
String
[]
args
)
{
try
{
// PropertiesHelper instance = PropertiesHelper.getInstance();
// Properties properties = instance.getProperties();
//获取配置文件中的kafka消费topic
String
topic
=
properties
.
getProperty
(
"consumer.topic"
);
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
//配置检查点机制
env
.
enableCheckpointing
(
60
*
1000
);
env
.
getCheckpointConfig
().
setTolerableCheckpointFailureNumber
(
3
);
env
.
getCheckpointConfig
().
setCheckpointTimeout
(
10
*
60
*
1000
);
env
.
getCheckpointConfig
().
setCheckpointStorage
(
new
FileSystemCheckpointStorage
(
properties
.
getProperty
(
"check.point.uri"
)));
env
.
getCheckpointConfig
().
setExternalizedCheckpointCleanup
(
CheckpointConfig
.
ExternalizedCheckpointCleanup
.
DELETE_ON_CANCELLATION
);
KafkaSource
<
String
>
source
=
KafkaSource
.<
String
>
builder
()
.
setProperties
(
instance
.
getConsumerProperties
())
.
setProperty
(
"auto.offset.commit"
,
"true"
)
.
setProperty
(
"auto.commit.interval.ms"
,
"1000"
)
.
setProperty
(
"commit.offsets.on.checkpoint"
,
"true"
)
.
setBootstrapServers
(
properties
.
getProperty
(
"bootstrap.servers"
))
.
setTopics
(
topic
)
.
setGroupId
(
properties
.
getProperty
(
"consumer.group.id"
)
+
"-car-trace"
)
/* 设置起始偏移量有以下几种情况
1.从指定的位置消费:OffsetsInitializer.offsets(Map<TopicPartition, Long> offsets)
2.从最新位置消费(最后一条处):OffsetsInitializer.latest()
3.从最早位置消费(第一条处):OffsetsInitializer.earliest()
4.从上次提交的位置消费:OffsetsInitializer.committedOffsets()
5.新的组,从来没有提交过,再指定一个消费方式:OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
*/
.
setStartingOffsets
(
OffsetsInitializer
.
committedOffsets
(
OffsetResetStrategy
.
LATEST
))
// 从大于等于此时间戳开始的偏移量开始
//.setStartingOffsets(OffsetsInitializer.timestamp(dateTime.getMillis()))
.
setDeserializer
(
KafkaRecordDeserializationSchema
.
valueOnly
(
StringDeserializer
.
class
))
.
build
();
DataStream
<
String
>
stream
=
env
.
fromSource
(
source
,
WatermarkStrategy
.
noWatermarks
(),
"kafka-car-trajectory-data-source"
);
SingleOutputStreamOperator
<
FrameModel
>
frameModelStream
=
stream
.
flatMap
(
new
FrameFlatMap
())
.
setParallelism
(
1
)
.
name
(
"轨迹帧数据-JsonToObject"
);
SingleOutputStreamOperator
<
CarTrackModel
>
qualifiedStream
=
frameModelStream
.
flatMap
(
new
CarDataValidatorFlatMap
())
.
setParallelism
(
1
)
.
name
(
"校验轨迹帧数据-ToCarTrackModel"
);
SingleOutputStreamOperator
<
CarTrackModel
>
filteredStream
=
qualifiedStream
.
filter
(
new
FilterFunction
<
CarTrackModel
>()
{
@Override
public
boolean
filter
(
CarTrackModel
carTrackModel
)
throws
Exception
{
return
carTrackModel
.
getRoadnet
().
getInCrossFlag
()
==
1
;
}
}).
setParallelism
(
1
).
name
(
"过滤到达路口的轨迹数据"
);
//根据车辆上报时间每隔1分钟统计10分钟的数据
SingleOutputStreamOperator
<
RidIndexResultOfEastAndWest
>
trafficIndexStream
=
filteredStream
.
assignTimestampsAndWatermarks
(
WatermarkStrategy
.<
CarTrackModel
>
forBoundedOutOfOrderness
(
Duration
.
ofSeconds
(
5
))
.
withTimestampAssigner
((
event
,
timestamp
)
->
event
.
getGlobalTimeStamp
()))
.
keyBy
(
CarTrackModel:
:
getPicLicense
)
.
window
(
SlidingEventTimeWindows
.
of
(
Time
.
minutes
(
10
),
Time
.
minutes
(
1
)))
.
apply
(
new
WindowFunction
<
CarTrackModel
,
SingleCarInfo
,
String
,
TimeWindow
>()
{
@Override
public
void
apply
(
String
plate
,
TimeWindow
timeWindow
,
Iterable
<
CarTrackModel
>
iterable
,
Collector
<
SingleCarInfo
>
collector
)
throws
Exception
{
long
windowStart
=
timeWindow
.
getStart
();
long
windowEnd
=
timeWindow
.
getEnd
();
List
<
CarTrackModel
>
models
=
StreamSupport
.
stream
(
iterable
.
spliterator
(),
false
).
collect
(
Collectors
.
toList
());
List
<
CarTrackModel
>
sorted
=
models
.
stream
().
sorted
(
Comparator
.
comparing
(
CarTrackModel:
:
getGlobalTimeStamp
)).
collect
(
Collectors
.
toList
());
if
(!
CollectionUtils
.
isEmpty
(
sorted
)
&&
sorted
.
size
()
>
1
)
{
for
(
int
i
=
0
;
i
<
sorted
.
size
()
-
1
;
i
++)
{
CarTrackModel
current
=
sorted
.
get
(
i
);
CarTrackModel
next
=
sorted
.
get
(
i
+
1
);
if
(!
current
.
getRoadnet
().
getCrossId
().
equals
(
next
.
getRoadnet
().
getCrossId
()))
{
SingleCarInfo
singleCarInfo
=
new
SingleCarInfo
();
singleCarInfo
.
setId
(
next
.
getId
());
singleCarInfo
.
setPlate
(
plate
);
singleCarInfo
.
setCrossId
(
next
.
getRoadnet
().
getCrossId
());
singleCarInfo
.
setRid
(
next
.
getRoadnet
().
getRid
());
singleCarInfo
.
setLaneId
(
next
.
getRoadnet
().
getLaneId
());
singleCarInfo
.
setSegmentId
(
next
.
getRoadnet
().
getSegmentId
());
singleCarInfo
.
setStartTime
(
current
.
getGlobalTimeStamp
());
singleCarInfo
.
setEndTime
(
next
.
getGlobalTimeStamp
());
singleCarInfo
.
setTransitTime
(
next
.
getGlobalTimeStamp
()
-
current
.
getGlobalTimeStamp
());
singleCarInfo
.
setWindowStartTime
(
windowStart
);
singleCarInfo
.
setWindowEndTime
(
windowEnd
);
collector
.
collect
(
singleCarInfo
);
}
}
}
}
}).
windowAll
(
SlidingEventTimeWindows
.
of
(
Time
.
minutes
(
10
),
Time
.
minutes
(
1
)))
.
apply
(
new
AllWindowFunction
<
SingleCarInfo
,
RidIndexResultOfEastAndWest
,
TimeWindow
>()
{
@Override
public
void
apply
(
TimeWindow
timeWindow
,
Iterable
<
SingleCarInfo
>
iterable
,
Collector
<
RidIndexResultOfEastAndWest
>
collector
)
throws
Exception
{
List
<
SingleCarInfo
>
list
=
StreamSupport
.
stream
(
iterable
.
spliterator
(),
false
).
collect
(
Collectors
.
toList
());
if
(!
CollectionUtils
.
isEmpty
(
list
)){
List
<
RidIndexResultOfEastAndWest
>
resultList
=
new
ArrayList
<>();
Map
<
String
,
List
<
SingleCarInfo
>>
mapGroupByRid
=
list
.
stream
().
collect
(
Collectors
.
groupingBy
(
SingleCarInfo:
:
getRid
));
mapGroupByRid
.
forEach
((
rid
,
value
)
->
{
if
(
ridFreeTimeMap
.
containsKey
(
rid
)){
RidIndexResultOfEastAndWest
result
=
new
RidIndexResultOfEastAndWest
();
result
.
setRid
(
rid
);
result
.
setCrossId
(
value
.
get
(
0
).
getCrossId
());
long
totalTransitTime
=
value
.
stream
().
mapToLong
(
SingleCarInfo:
:
getTransitTime
).
sum
();
double
avgIndex
=
Double
.
parseDouble
(
String
.
valueOf
(
totalTransitTime
))
/
(
value
.
size
()
*
1000
);
double
freeTime
=
ridFreeTimeMap
.
get
(
rid
);
double
index
=
avgIndex
/
freeTime
;
result
.
setIndex
(
index
);
String
indexName
;
if
(
index
<=
1.8
)
{
indexName
=
"畅通"
;
}
else
if
(
index
<=
2.5
)
{
indexName
=
"轻度拥堵"
;
}
else
if
(
index
<=
3.5
)
{
indexName
=
"中度拥堵"
;
}
else
{
indexName
=
"重度拥堵"
;
}
result
.
setIndexName
(
indexName
);
result
.
setTimestamp
(
DateUtil
.
getNowTime
(
DateUtil
.
YYYY_MM_DD_HH_MM_SS
));
result
.
setGlobalTimeStamp
(
System
.
currentTimeMillis
());
// resultList.add(result);
collector
.
collect
(
result
);
}
});
//根据crossId进行分组
// Map<String, List<RidIndexResultOfEastAndWest>> collect = resultList.stream().collect(Collectors.groupingBy(RidIndexResultOfEastAndWest::getCrossId));
// collector.collect(collect);
}
}
}).
setParallelism
(
1
).
name
(
"统计每辆车通过路段的通行时间"
);
SingleOutputStreamOperator
<
String
>
toJsonStream
=
trafficIndexStream
.
flatMap
(
new
FlatMapFunction
<
RidIndexResultOfEastAndWest
,
String
>()
{
@Override
public
void
flatMap
(
RidIndexResultOfEastAndWest
result
,
Collector
<
String
>
collector
)
throws
Exception
{
collector
.
collect
(
JSONObject
.
toJSONString
(
result
));
}
}).
setParallelism
(
1
).
name
(
"路段拥堵指数统计转成json字符串"
);
FlinkKafkaProducer
<
String
>
kafkaProducer
=
new
FlinkKafkaProducer
<>(
properties
.
getProperty
(
"bootstrap.servers"
),
properties
.
getProperty
(
"rid.traffic.index.analysis.topic"
),
new
SimpleStringSchema
());
toJsonStream
.
addSink
(
kafkaProducer
).
setParallelism
(
1
).
name
(
"将拥堵指数指标发送至kafka"
);
//写入mysql
trafficIndexStream
.
addSink
(
JdbcSink
.
sink
(
"insert into t_event_info (detect_time, type, rid, cross_id, event_serial_number) values(?,?,?,?,?)"
,
new
JdbcStatementBuilder
<
RidIndexResultOfEastAndWest
>()
{
@Override
public
void
accept
(
PreparedStatement
preparedStatement
,
RidIndexResultOfEastAndWest
carRecordInfo
)
throws
SQLException
{
preparedStatement
.
setDate
(
1
,
new
Date
(
carRecordInfo
.
getGlobalTimeStamp
()));
String
type
=
"501"
;
switch
(
carRecordInfo
.
getIndexName
()){
case
"畅通"
:
type
=
"501"
;
break
;
case
"轻度拥堵"
:
type
=
"502"
;
break
;
case
"中度拥堵"
:
type
=
"503"
;
break
;
case
"重度拥堵"
:
type
=
"504"
;
break
;
}
preparedStatement
.
setString
(
2
,
type
);
preparedStatement
.
setString
(
3
,
carRecordInfo
.
getRid
());
preparedStatement
.
setString
(
4
,
carRecordInfo
.
getCrossId
());
String
eventSerialNumber
=
UUID
.
randomUUID
().
toString
().
replace
(
"-"
,
""
);
preparedStatement
.
setString
(
5
,
eventSerialNumber
);
}
},
new
JdbcConnectionOptions
.
JdbcConnectionOptionsBuilder
()
.
withUrl
(
"jdbc:mysql://37.12.182.29:3306/holo_roadnet?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
)
.
withUsername
(
"root"
)
.
withPassword
(
"Wanji300552"
)
.
withDriverName
(
"com.mysql.cj.jdbc.Driver"
)
.
withConnectionCheckTimeoutSeconds
(
60
)
.
build
()
)).
name
(
"写入mysql"
);
//计算南北方向饱和度
String
lightStatusTopic
=
properties
.
getProperty
(
"light.status.topic"
);
KafkaSource
<
String
>
lightStatusSource
=
KafkaSource
.<
String
>
builder
()
.
setProperties
(
instance
.
getConsumerProperties
())
.
setProperty
(
"auto.offset.commit"
,
"true"
)
.
setProperty
(
"auto.commit.interval.ms"
,
"1000"
)
.
setProperty
(
"commit.offsets.on.checkpoint"
,
"true"
)
.
setBootstrapServers
(
properties
.
getProperty
(
"bootstrap.servers"
))
.
setTopics
(
lightStatusTopic
)
.
setGroupId
(
properties
.
getProperty
(
"consumer.group.id"
)
+
"-cross-light-status"
)
/* 设置起始偏移量有以下几种情况
1.从指定的位置消费:OffsetsInitializer.offsets(Map<TopicPartition, Long> offsets)
2.从最新位置消费(最后一条处):OffsetsInitializer.latest()
3.从最早位置消费(第一条处):OffsetsInitializer.earliest()
4.从上次提交的位置消费:OffsetsInitializer.committedOffsets()
5.新的组,从来没有提交过,再指定一个消费方式:OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
*/
.
setStartingOffsets
(
OffsetsInitializer
.
committedOffsets
(
OffsetResetStrategy
.
LATEST
))
// 从大于等于此时间戳开始的偏移量开始
//.setStartingOffsets(OffsetsInitializer.timestamp(dateTime.getMillis()))
.
setDeserializer
(
KafkaRecordDeserializationSchema
.
valueOnly
(
StringDeserializer
.
class
))
.
build
();
DataStream
<
String
>
lightStatusStream
=
env
.
fromSource
(
lightStatusSource
,
WatermarkStrategy
.
noWatermarks
(),
"cross_lights_status-data-source"
);
SingleOutputStreamOperator
<
CrossFrameModel
>
thinningDataStream
=
stream
.
flatMap
(
new
CrossFrameFlatMap
()).
setParallelism
(
1
).
name
(
"全域轨迹帧数据-按路口分组输出"
);
ConnectedStreams
<
CrossFrameModel
,
String
>
connStream
=
thinningDataStream
.
connect
(
lightStatusStream
);
SingleOutputStreamOperator
<
CrossFrameModel
>
bindLightStatusStream
=
connStream
.
flatMap
(
new
CrossRoadLightStatusCoFlatMap
()).
setParallelism
(
1
).
name
(
"全域轨迹帧数据-路口灯态绑定"
);
SingleOutputStreamOperator
<
CrossFrameModel
>
joinLaneStream
=
bindLightStatusStream
.
connect
(
env
.
addSource
(
new
LaneInfoSource
()).
broadcast
())
.
flatMap
(
new
BindCrossLaneInfoCoFlatMap
())
.
setParallelism
(
1
)
.
name
(
"全域轨迹帧数据-车道属性数据绑定"
);
SingleOutputStreamOperator
<
VehicleHeadTimeModel
>
midResultStream
=
joinLaneStream
.
keyBy
(
CrossFrameModel:
:
getCrossId
)
.
process
(
new
VehicleGapTimeProcessFunction
())
.
setParallelism
(
1
)
.
name
(
"绿灯时间内,进口道转向车道空当时间计算"
);
SingleOutputStreamOperator
<
RidIndexResultOfEastAndWest
>
northAndSouthTrafficIndex
=
null
;
//计算溢出
SingleOutputStreamOperator
<
CarTrackModel
>
carTrackStreamInCrossRoad
=
frameModelStream
.
flatMap
(
new
CarTrackInCrossRoadFlatMap
())
.
setParallelism
(
1
)
.
name
(
"过滤出位于路口和指定路段区域带有车牌的车辆轨迹数据"
);
//计算1分钟内的路口溢出指数和是否死锁
double
avgSpeedEvaluationIndex
=
Double
.
parseDouble
(
properties
.
getProperty
(
"road.overflow.avg.speed"
));
String
durationEvaluationIndex
=
properties
.
getProperty
(
"road.overflow.duration"
);
int
carNumber
=
Integer
.
parseInt
(
properties
.
getProperty
(
"cross.road.deadlock.car.number"
));
double
crossAvgSpeedIndex
=
Double
.
parseDouble
(
properties
.
getProperty
(
"cross.road.deadlock.avg.speed"
));
SingleOutputStreamOperator
<
OverFlowIndexResult
>
overFlowIndexStream
=
carTrackStreamInCrossRoad
.
assignTimestampsAndWatermarks
(
WatermarkStrategy
.<
CarTrackModel
>
forBoundedOutOfOrderness
(
Duration
.
ofSeconds
(
5
))
.
withTimestampAssigner
((
event
,
timestamp
)
->
event
.
getGlobalTimeStamp
()))
.
keyBy
(
carTrackModel
->
carTrackModel
.
getRoadnet
().
getCrossId
())
.
window
(
TumblingEventTimeWindows
.
of
(
Time
.
minutes
(
1
)))
.
process
(
new
ProcessWindowFunction
<
CarTrackModel
,
OverFlowIndexResult
,
String
,
TimeWindow
>()
{
@Override
public
void
process
(
String
crossId
,
ProcessWindowFunction
<
CarTrackModel
,
OverFlowIndexResult
,
String
,
TimeWindow
>.
Context
context
,
Iterable
<
CarTrackModel
>
iterable
,
Collector
<
OverFlowIndexResult
>
collector
)
throws
Exception
{
List
<
CarTrackModel
>
carInCrossRoadList
=
StreamSupport
.
stream
(
iterable
.
spliterator
(),
false
).
collect
(
Collectors
.
toList
());
if
(!
CollectionUtils
.
isEmpty
(
carInCrossRoadList
)){
//过滤出位于指定区域内的数据
List
<
CarTrackModel
>
carTracesInSpecificArea
=
carInCrossRoadList
.
stream
().
filter
(
car
->
car
.
getIsInSpecificArea
()
==
1
).
collect
(
Collectors
.
toList
());
List
<
CarTrackModel
>
carTracesInCrossRoad
=
carInCrossRoadList
.
stream
().
filter
(
car
->
car
.
getRoadnet
().
getInCrossFlag
()
==
1
).
collect
(
Collectors
.
toList
());
//根据路口进行预取划分
Map
<
String
,
List
<
CarTrackModel
>>
groupMapInExitArea
=
carTracesInSpecificArea
.
stream
().
collect
(
Collectors
.
groupingBy
(
car
->
car
.
getRoadnet
().
getRid
()));
//对每个路口区域进行统计
if
(!
groupMapInExitArea
.
isEmpty
()){
List
<
OverFlowDetail
>
details
=
new
ArrayList
<>();
groupMapInExitArea
.
forEach
((
rid
,
cars
)
->
{
//统计路口出口内的车辆数量
Map
<
String
,
List
<
CarTrackModel
>>
carMapGroupByPlate
=
cars
.
stream
().
collect
(
Collectors
.
groupingBy
(
CarTrackModel:
:
getPicLicense
));
if
(
carMapGroupByPlate
.
size
()
>
5
){
//按照轨迹上报时间进行升序排列
List
<
CarTrackModel
>
sorted
=
cars
.
stream
().
sorted
(
Comparator
.
comparing
(
CarTrackModel:
:
getGlobalTimeStamp
)).
collect
(
Collectors
.
toList
());
//根据时间戳进行分组,找出同一时刻的轨迹数据
Map
<
Long
,
List
<
CarTrackModel
>>
groupMapByTimestamp
=
sorted
.
stream
().
collect
(
Collectors
.
groupingBy
(
CarTrackModel:
:
getGlobalTimeStamp
));
//存放时间戳和其对应的平均速度
Map
<
Long
,
Double
>
map
=
new
HashMap
<>();
groupMapByTimestamp
.
forEach
((
timestamp
,
traces
)
->
{
double
avgSpeed
=
traces
.
stream
().
mapToDouble
(
CarTrackModel:
:
getSpeed
).
summaryStatistics
().
getAverage
();
map
.
put
(
timestamp
,
avgSpeed
);
});
double
ridAvgSpeed
=
map
.
values
().
stream
().
mapToDouble
(
number
->
number
).
summaryStatistics
().
getAverage
();
List
<
Map
.
Entry
<
Long
,
Double
>>
entryList
=
map
.
entrySet
().
stream
().
sorted
(
Map
.
Entry
.
comparingByKey
()).
collect
(
Collectors
.
toList
());
int
consecutiveSeconds
=
0
;
//连续时间
long
overflowTimestamp
=
entryList
.
get
(
0
).
getKey
();
//发生溢出的时间戳
for
(
int
i
=
0
;
i
<
entryList
.
size
()
-
1
;
i
++){
Map
.
Entry
<
Long
,
Double
>
current
=
entryList
.
get
(
i
);
Map
.
Entry
<
Long
,
Double
>
next
=
entryList
.
get
(
i
+
1
);
if
(
current
.
getValue
()
<
avgSpeedEvaluationIndex
&&
next
.
getValue
()
<
avgSpeedEvaluationIndex
){
consecutiveSeconds
+=
Math
.
toIntExact
((
next
.
getKey
()
-
current
.
getKey
()));
overflowTimestamp
=
next
.
getKey
();
}
else
{
consecutiveSeconds
=
0
;
}
}
if
(
consecutiveSeconds
>
Integer
.
parseInt
(
durationEvaluationIndex
)){
//计算路口的溢出指数
double
index
=
30
/
ridAvgSpeed
;
if
(
index
>
5
){
//满足溢出判断的所有条件
OverFlowDetail
detail
=
new
OverFlowDetail
();
detail
.
setRid
(
rid
);
detail
.
setTimestamp
(
overflowTimestamp
);
detail
.
setDatetime
(
DateUtil
.
toDateTime
(
overflowTimestamp
,
DateUtil
.
YYYY_MM_DD_HH_MM_SS
));
detail
.
setIndex
(
index
);
details
.
add
(
detail
);
}
}
}
});
//存在路口发生溢出的情况
if
(!
details
.
isEmpty
()){
OverFlowIndexResult
result
=
new
OverFlowIndexResult
();
result
.
setCrossId
(
crossId
);
result
.
setDetails
(
details
);
//判断该路口死锁
if
(
details
.
size
()
>=
2
){
Map
<
String
,
List
<
CarTrackModel
>>
groupMapByPlate
=
carTracesInCrossRoad
.
stream
().
collect
(
Collectors
.
groupingBy
(
CarTrackModel:
:
getPicLicense
));
if
(
groupMapByPlate
.
size
()
>
carNumber
){
AtomicReference
<
Double
>
totalCarSpeed
=
new
AtomicReference
<>((
double
)
0
);
groupMapByPlate
.
forEach
((
carPlate
,
traces
)
->
{
double
carAvgSpeed
=
traces
.
stream
().
mapToDouble
(
CarTrackModel:
:
getSpeed
).
summaryStatistics
().
getAverage
();
totalCarSpeed
.
updateAndGet
(
v
->
(
v
+
carAvgSpeed
));
});
double
crossAvgSpeed
=
totalCarSpeed
.
get
()
/
groupMapByPlate
.
size
();
if
(
crossAvgSpeed
<
crossAvgSpeedIndex
){
result
.
setDeadLock
(
true
);
}
}
}
collector
.
collect
(
result
);
}
}
}
}
})
.
setParallelism
(
2
)
.
name
(
"统计每个路口的溢出指数"
);
SingleOutputStreamOperator
<
String
>
jsonStreamOverFlowIndex
=
overFlowIndexStream
.
flatMap
(
new
FlatMapFunction
<
OverFlowIndexResult
,
String
>()
{
@Override
public
void
flatMap
(
OverFlowIndexResult
overFlowIndexResult
,
Collector
<
String
>
collector
)
throws
Exception
{
collector
.
collect
(
JSONObject
.
toJSONString
(
overFlowIndexResult
));
}
});
String
topicOfOverFlow
=
properties
.
getProperty
(
"road.overflow.deadlock.index.analysis.topic"
);
jsonStreamOverFlowIndex
.
addSink
(
new
FlinkKafkaProducer
<
String
>(
properties
.
getProperty
(
"bootstrap.servers"
),
topicOfOverFlow
,
new
SimpleStringSchema
()
)).
setParallelism
(
1
).
name
(
"将路口溢出指标推送至kafka"
);
//计算失衡
SingleOutputStreamOperator
<
UnbalanceResult
>
unbalanceIndexStream
=
trafficIndexStream
.
union
(
northAndSouthTrafficIndex
)
.
assignTimestampsAndWatermarks
(
WatermarkStrategy
.<
RidIndexResultOfEastAndWest
>
forBoundedOutOfOrderness
(
Duration
.
ofSeconds
(
5
))
.
withTimestampAssigner
((
event
,
timestamp
)
->
event
.
getGlobalTimeStamp
()))
.
keyBy
(
RidIndexResultOfEastAndWest:
:
getCrossId
)
.
window
(
TumblingEventTimeWindows
.
of
(
Time
.
minutes
(
1
)))
.
apply
(
new
WindowFunction
<
RidIndexResultOfEastAndWest
,
UnbalanceResult
,
String
,
TimeWindow
>()
{
@Override
public
void
apply
(
String
crossId
,
TimeWindow
timeWindow
,
Iterable
<
RidIndexResultOfEastAndWest
>
iterable
,
Collector
<
UnbalanceResult
>
collector
)
throws
Exception
{
List
<
RidIndexResultOfEastAndWest
>
collect
=
StreamSupport
.
stream
(
iterable
.
spliterator
(),
false
).
collect
(
Collectors
.
toList
());
if
(!
CollectionUtils
.
isEmpty
(
collect
)
&&
collect
.
size
()
>
1
)
{
//获取最大拥堵指数
double
max
=
collect
.
stream
().
mapToDouble
(
RidIndexResultOfEastAndWest:
:
getIndex
).
max
().
getAsDouble
();
double
min
=
collect
.
stream
().
mapToDouble
(
RidIndexResultOfEastAndWest:
:
getIndex
).
min
().
getAsDouble
();
double
index
=
max
/
min
;
if
(
index
>=
1.5
)
{
UnbalanceResult
result
=
new
UnbalanceResult
();
result
.
setCrossId
(
crossId
);
result
.
setIndex
(
index
);
result
.
setTimestamp
(
DateUtil
.
getNowTime
(
DateUtil
.
YYYY_MM_DD_HH_MM_SS
));
collector
.
collect
(
result
);
}
}
}
}).
setParallelism
(
2
).
name
(
"统计路口失衡指数"
);
SingleOutputStreamOperator
<
String
>
jsonStreamOfUnbalance
=
unbalanceIndexStream
.
flatMap
(
new
FlatMapFunction
<
UnbalanceResult
,
String
>()
{
@Override
public
void
flatMap
(
UnbalanceResult
unbalanceResult
,
Collector
<
String
>
collector
)
throws
Exception
{
String
value
=
JSONObject
.
toJSONString
(
unbalanceResult
);
collector
.
collect
(
value
);
}
}).
setParallelism
(
1
).
name
(
"将失衡指标数据转成json字符串"
);
String
topicOfUnbalanceIndexCalculation
=
properties
.
getProperty
(
"road.unbalance.index.analysis"
);
jsonStreamOfUnbalance
.
addSink
(
new
FlinkKafkaProducer
<
String
>(
properties
.
getProperty
(
"bootstrap.servers"
),
topicOfUnbalanceIndexCalculation
,
new
SimpleStringSchema
()
)).
setParallelism
(
1
).
name
(
"将失衡指数发送至kafka"
);
env
.
execute
(
"计算拥堵-溢出-失衡-死锁-相位空放"
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/Test2.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
;
import
java.util.UUID
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/11/2 19:05
* @Description :
*/
public
class
Test2
{
public
static
void
main
(
String
[]
args
)
{
String
eventSerialNumber
=
UUID
.
randomUUID
().
toString
().
replace
(
"-"
,
""
);
System
.
out
.
println
(
eventSerialNumber
);
}
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/func/CarDataValidatorFlatMap.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
func
;
import
com.wanji.indicators.constant.VehicleTypeEnum
;
import
com.wanji.indicators.model.CarTrackModel
;
import
com.wanji.indicators.model.FrameModel
;
import
com.wanji.indicators.task.trajectory.CarTrajectoryIndexMain
;
import
com.wanji.indicators.util.DateUtil
;
import
com.wanji.indicators.util.GeomsConvertUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
import
org.apache.flink.util.Collector
;
import
org.springframework.util.CollectionUtils
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Objects
;
import
java.util.stream.Collectors
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/25 14:48
* @Description :
*/
public
class
CarDataValidatorFlatMap
implements
FlatMapFunction
<
FrameModel
,
CarTrackModel
>
{
@Override
public
void
flatMap
(
FrameModel
frameModel
,
Collector
<
CarTrackModel
>
collector
)
throws
Exception
{
Long
globalTimeStamp
=
frameModel
.
getGlobalTimeStamp
();
String
globalId
=
frameModel
.
getOrgCode
();
List
<
CarTrackModel
>
trackList
=
frameModel
.
getTrackList
();
if
(!
CollectionUtils
.
isEmpty
(
trackList
)){
List
<
CarTrackModel
>
list
=
trackList
.
stream
()
.
filter
(
CarDataValidatorFlatMap:
:
isValidated
)
.
peek
(
source
->
{
source
.
setTimeStamp
(
DateUtil
.
toDateTime
(
globalTimeStamp
,
"yyyy-MM-dd HH:mm:ss.SSS"
));
source
.
setGlobalTimeStamp
(
globalTimeStamp
);
source
.
setOrgCode
(
globalId
);
//检验车流轨迹数据是否位于虚拟路口区域
Map
<
String
,
String
>
virtualCrossRoadArea
=
CarTrajectoryIndexMain
.
virtualCrossRoadArea
;
for
(
Map
.
Entry
<
String
,
String
>
entry:
virtualCrossRoadArea
.
entrySet
()){
boolean
inPolygon
=
GeomsConvertUtil
.
isInPolygon
(
source
.
getLongitude
(),
source
.
getLatitude
(),
entry
.
getValue
());
if
(
inPolygon
){
source
.
getRoadnet
().
setInCrossFlag
(
1
);
source
.
getRoadnet
().
setCrossId
(
entry
.
getKey
());
break
;
}
}
short
originalType
=
source
.
getOriginalType
().
shortValue
();
int
category
;
if
(
VehicleTypeEnum
.
isMotorVehicles
(
originalType
))
{
category
=
1
;
}
else
if
(
VehicleTypeEnum
.
isNonMotorVehicles
(
originalType
))
{
category
=
2
;
}
else
{
category
=
3
;
}
source
.
setCategory
(
category
);
}).
collect
(
Collectors
.
toList
());
list
.
forEach
(
collector:
:
collect
);
}
}
public
static
boolean
isValidated
(
CarTrackModel
source
)
{
//首先校验是否存在车牌信息
boolean
hasQualifiedPlat
=
isQualifiedPlate
(
source
.
getPicLicense
());
if
(
hasQualifiedPlat
){
//校验路网信息是否齐全
CarTrackModel
.
RoadNet
roadNet
=
source
.
getRoadnet
();
if
(
Objects
.
nonNull
(
roadNet
)){
boolean
qualifiedCrossId
=
isQualified
(
roadNet
.
getCrossId
());
boolean
qualifiedRid
=
isQualified
(
roadNet
.
getRid
());
boolean
qualifiedLaneId
=
isQualified
(
roadNet
.
getLaneId
());
boolean
qualifiedSegmentId
=
isQualified
(
roadNet
.
getSegmentId
());
return
qualifiedCrossId
&&
qualifiedRid
&&
qualifiedLaneId
&&
qualifiedSegmentId
;
}
}
return
false
;
}
public
static
boolean
isQualified
(
String
string
)
{
return
StringUtils
.
isNotEmpty
(
string
)
&&
!
Objects
.
equals
(
"null"
,
string
);
}
public
static
boolean
isQualifiedPlate
(
String
plate
){
return
StringUtils
.
isNotEmpty
(
plate
)
&&
!
Objects
.
equals
(
"null"
,
plate
)
&&
!
Objects
.
equals
(
"默A00000"
,
plate
);
}
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/func/CarRecordInfoFlatMap.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
func
;
import
com.wanji.indicators.model.CarTrackModel
;
import
com.wanji.indicators.model.FrameModel
;
import
com.wanji.indicators.task.trajectory.pojo.CarRecordInfo
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
import
org.apache.flink.util.Collector
;
import
org.springframework.util.CollectionUtils
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.Objects
;
import
java.util.stream.Collectors
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/24 16:21
* @Description :
*/
public
class
CarRecordInfoFlatMap
implements
FlatMapFunction
<
FrameModel
,
CarRecordInfo
>
{
@Override
public
void
flatMap
(
FrameModel
frameModel
,
Collector
<
CarRecordInfo
>
collector
)
throws
Exception
{
List
<
CarTrackModel
>
trackList
=
frameModel
.
getTrackList
();
if
(!
CollectionUtils
.
isEmpty
(
trackList
)){
List
<
CarRecordInfo
>
filteredList
=
trackList
.
stream
().
filter
(
carTrackModel
->
StringUtils
.
isNotEmpty
(
carTrackModel
.
getPicLicense
())
&&
!
Objects
.
equals
(
"null"
,
carTrackModel
.
getPicLicense
()))
.
map
(
carTrackModel
->
{
CarRecordInfo
carRecordInfo
=
new
CarRecordInfo
();
carRecordInfo
.
setCarId
(
carTrackModel
.
getId
());
carRecordInfo
.
setCarPlate
(
carTrackModel
.
getPicLicense
());
carRecordInfo
.
setCreateTime
(
new
Date
());
return
carRecordInfo
;
}).
collect
(
Collectors
.
toList
());
filteredList
.
forEach
(
collector:
:
collect
);
}
}
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/func/CrossRoadLightStatusCoFlatMap.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
func
;
import
com.alibaba.fastjson.JSONArray
;
import
com.wanji.indicators.model.CrossFrameModel
;
import
com.wanji.indicators.model.CrossRidTurnLampStatusModel
;
import
com.wanji.indicators.task.trajectory.pojo.CrossRoadLightStatusModel
;
import
org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
;
import
org.apache.flink.util.Collector
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Objects
;
import
java.util.concurrent.ConcurrentHashMap
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/11/1 13:54
* @Description :
*/
public
class
CrossRoadLightStatusCoFlatMap
implements
CoFlatMapFunction
<
CrossFrameModel
,
String
,
CrossFrameModel
>
{
private
static
final
ConcurrentHashMap
<
String
,
CrossRoadLightStatusModel
>
lightStatusMap
=
new
ConcurrentHashMap
<>();
@Override
public
void
flatMap1
(
CrossFrameModel
crossFrameModel
,
Collector
<
CrossFrameModel
>
collector
)
throws
Exception
{
//将路口的每一帧轨迹数据与当前路口的相位灯信息绑定
if
(
lightStatusMap
.
containsKey
(
crossFrameModel
.
getCrossId
())){
CrossRoadLightStatusModel
lightStatus
=
lightStatusMap
.
get
(
crossFrameModel
.
getCrossId
());
//灯态上报时间戳
long
lightStatusTimeStamp
=
Long
.
parseLong
(
lightStatus
.
getTimeStamp
());
//当前路口车流轨迹数据帧时间戳
long
globalTimeStamp
=
crossFrameModel
.
getGlobalTimeStamp
();
if
(
globalTimeStamp
>=
lightStatusTimeStamp
){
Integer
cycleCountDown
=
lightStatus
.
getCycleCountDown
();
Integer
cyclePhaseCountDown
=
lightStatus
.
getCyclePhaseCountDown
();
Integer
cycleLen
=
lightStatus
.
getCycleLen
();
//存储当前路口不同进口车道的相位灯信息
List
<
CrossRidTurnLampStatusModel
>
ridTurnLampList
=
new
ArrayList
<>();
Map
<
String
,
Map
<
String
,
String
>>
dirLampGroupMap
=
lightStatus
.
getDirLampGroupMap
();
for
(
Map
.
Entry
<
String
,
Map
<
String
,
String
>>
entry:
dirLampGroupMap
.
entrySet
()){
//不同转向车道和相位灯状态
Map
<
String
,
String
>
laneTurnAndLampState
=
entry
.
getValue
();
for
(
Map
.
Entry
<
String
,
String
>
laneEntry:
laneTurnAndLampState
.
entrySet
()){
CrossRidTurnLampStatusModel
ridTurnLampStatusModel
=
new
CrossRidTurnLampStatusModel
();
ridTurnLampStatusModel
.
setCrossId
(
lightStatus
.
getCrossId
());
ridTurnLampStatusModel
.
setCycleCountDown
(
cycleCountDown
);
ridTurnLampStatusModel
.
setCyclePhaseCountDown
(
cyclePhaseCountDown
);
ridTurnLampStatusModel
.
setCycleLen
(
cycleLen
);
ridTurnLampStatusModel
.
setDir
(
Integer
.
parseInt
(
entry
.
getKey
()));
ridTurnLampStatusModel
.
setTurn
(
laneEntry
.
getKey
());
//车道转向
ridTurnLampStatusModel
.
setLampState
(
laneEntry
.
getValue
());
//车道对应灯态
ridTurnLampList
.
add
(
ridTurnLampStatusModel
);
}
}
//绑定灯态信息
crossFrameModel
.
setRidTurnLampList
(
ridTurnLampList
);
collector
.
collect
(
crossFrameModel
);
}
}
}
@Override
public
void
flatMap2
(
String
lightStatusJson
,
Collector
<
CrossFrameModel
>
collector
)
throws
Exception
{
List
<
CrossRoadLightStatusModel
>
list
=
JSONArray
.
parseArray
(
lightStatusJson
,
CrossRoadLightStatusModel
.
class
);
for
(
CrossRoadLightStatusModel
lightStatus:
list
){
lightStatusMap
.
put
(
lightStatus
.
getCrossId
(),
lightStatus
);
}
}
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/func/ProcessCarInfo.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
func
;
import
com.wanji.indicators.task.trajectory.pojo.CarRecordInfo
;
import
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
;
import
org.apache.flink.streaming.api.windowing.windows.TimeWindow
;
import
org.apache.flink.util.Collector
;
import
java.util.HashSet
;
import
java.util.Iterator
;
import
java.util.Set
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/24 17:04
* @Description :
*/
public
class
ProcessCarInfo
extends
ProcessWindowFunction
<
CarRecordInfo
,
CarRecordInfo
,
String
,
TimeWindow
>
{
@Override
public
void
process
(
String
s
,
ProcessWindowFunction
<
CarRecordInfo
,
CarRecordInfo
,
String
,
TimeWindow
>.
Context
context
,
Iterable
<
CarRecordInfo
>
iterable
,
Collector
<
CarRecordInfo
>
collector
)
throws
Exception
{
Set
<
Integer
>
idSets
=
new
HashSet
<>();
for
(
CarRecordInfo
carRecordInfo
:
iterable
)
{
if
(!
idSets
.
contains
(
carRecordInfo
.
getCarId
()))
{
collector
.
collect
(
carRecordInfo
);
idSets
.
add
(
carRecordInfo
.
getCarId
());
}
}
}
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/func/ProcessCarTrace.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
func
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/25 16:09
* @Description :
*/
public
class
ProcessCarTrace
{
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/func/ProcessOverFlowFunction.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
func
;
import
com.wanji.indicators.model.CarTrackModel
;
import
com.wanji.indicators.task.trajectory.pojo.OverFlowIndexResult
;
import
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
;
import
org.apache.flink.streaming.api.windowing.windows.TimeWindow
;
import
org.apache.flink.util.Collector
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Set
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.stream.Collectors
;
import
java.util.stream.Stream
;
import
java.util.stream.StreamSupport
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/27 10:39
* @Description :
*/
public
class
ProcessOverFlowFunction
extends
ProcessWindowFunction
<
CarTrackModel
,
OverFlowIndexResult
,
String
,
TimeWindow
>
{
private
Integer
agvSpeed
=
5
;
private
Integer
duration
=
3
;
private
static
Map
<
String
,
Set
<
String
>>
crossAndRid
;
private
static
Map
<
String
,
Set
<
String
>>
crossArea
;
public
ProcessOverFlowFunction
(){
}
public
ProcessOverFlowFunction
(
String
agvSpeed
,
String
duration
){
super
();
this
.
agvSpeed
=
Integer
.
parseInt
(
agvSpeed
);
this
.
duration
=
Integer
.
parseInt
(
duration
);
}
@Override
public
void
process
(
String
s
,
ProcessWindowFunction
<
CarTrackModel
,
OverFlowIndexResult
,
String
,
TimeWindow
>.
Context
context
,
Iterable
<
CarTrackModel
>
iterable
,
Collector
<
OverFlowIndexResult
>
collector
)
throws
Exception
{
List
<
CarTrackModel
>
carInCrossRoadList
=
StreamSupport
.
stream
(
iterable
.
spliterator
(),
false
).
collect
(
Collectors
.
toList
());
}
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/func/VehicleGapTimeProcessFunction.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
func
;
import
com.wanji.indicators.constant.Constant
;
import
com.wanji.indicators.constant.LightStatusEnum
;
import
com.wanji.indicators.constant.TurnEnum
;
import
com.wanji.indicators.model.CarTrackModel
;
import
com.wanji.indicators.model.CrossFrameModel
;
import
com.wanji.indicators.model.CrossRidTurnLampStatusModel
;
import
com.wanji.indicators.task.lightstatus.model.VehicleHeadTimeModel
;
import
com.wanji.indicators.util.GeomsConvertUtil
;
import
com.wanji.indicators.util.PtInPolyUtil
;
import
org.apache.flink.api.common.state.MapState
;
import
org.apache.flink.api.common.state.MapStateDescriptor
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.KeyedProcessFunction
;
import
org.apache.flink.util.Collector
;
import
java.util.*
;
import
java.util.stream.Collectors
;
import
java.util.stream.StreamSupport
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/11/1 16:16
* @Description :
*/
public
class
VehicleGapTimeProcessFunction
extends
KeyedProcessFunction
<
String
,
CrossFrameModel
,
VehicleHeadTimeModel
>
{
private
static
final
Map
<
String
,
Set
<
Integer
>>
lightTurnControlMap
=
new
HashMap
<>();
//车道上离停止线最近的车辆缓存
private
MapState
<
String
,
CarTrackModel
>
mapState
;
//车道转向绿灯剩余时长缓存
private
MapState
<
String
,
Integer
>
greenRemainTimeState
;
//绿灯总时长
private
MapState
<
String
,
Integer
>
greenTotalTimeState
;
//车道车头时距缓存
private
MapState
<
String
,
VehicleHeadTimeModel
>
resultState
;
static
{
List
<
Integer
>
list1
=
Arrays
.
asList
(
1
,
5
,
7
,
8
,
9
,
12
,
14
,
15
);
Set
<
Integer
>
turn1
=
new
HashSet
<>(
list1
);
lightTurnControlMap
.
put
(
"1"
,
turn1
);
List
<
Integer
>
list2
=
Arrays
.
asList
(
2
,
5
,
6
,
7
,
10
,
12
,
13
,
14
);
Set
<
Integer
>
turn2
=
new
HashSet
<>(
list2
);
lightTurnControlMap
.
put
(
"2"
,
turn2
);
List
<
Integer
>
list3
=
Arrays
.
asList
(
3
,
6
,
7
,
8
,
11
,
13
,
14
,
15
);
Set
<
Integer
>
turn3
=
new
HashSet
<>(
list3
);
lightTurnControlMap
.
put
(
"3"
,
turn3
);
}
@Override
public
void
open
(
Configuration
parameters
)
{
mapState
=
getRuntimeContext
().
getMapState
(
new
MapStateDescriptor
<>(
"light_status_map"
,
String
.
class
,
CarTrackModel
.
class
));
greenRemainTimeState
=
getRuntimeContext
().
getMapState
(
new
MapStateDescriptor
<>(
"light_time_map"
,
String
.
class
,
Integer
.
class
));
resultState
=
getRuntimeContext
().
getMapState
(
new
MapStateDescriptor
<>(
"light_result_map"
,
String
.
class
,
VehicleHeadTimeModel
.
class
));
greenTotalTimeState
=
getRuntimeContext
().
getMapState
(
new
MapStateDescriptor
<>(
"light_time_map"
,
String
.
class
,
Integer
.
class
));
}
@Override
public
void
processElement
(
CrossFrameModel
value
,
Context
ctx
,
Collector
<
VehicleHeadTimeModel
>
out
)
throws
Exception
{
String
crossId
=
ctx
.
getCurrentKey
();
List
<
CarTrackModel
>
trackList
=
value
.
getTrackList
();
//灯态数据
List
<
CrossRidTurnLampStatusModel
>
ridLightStatusList
=
value
.
getRidTurnLampList
();
//当前绿灯放行方向
Map
<
Integer
,
List
<
CrossRidTurnLampStatusModel
>>
groupLightStatus
=
ridLightStatusList
.
stream
()
.
filter
(
o
->
o
.
getLampState
().
equals
(
LightStatusEnum
.
GREEN
.
getType
()))
.
collect
(
Collectors
.
groupingBy
(
CrossRidTurnLampStatusModel:
:
getDir
));
List
<
VehicleHeadTimeModel
>
results
=
new
ArrayList
<>();
for
(
Map
.
Entry
<
Integer
,
List
<
CrossRidTurnLampStatusModel
>>
entry
:
groupLightStatus
.
entrySet
())
{
Integer
dir
=
entry
.
getKey
();
List
<
CrossRidTurnLampStatusModel
>
value1
=
entry
.
getValue
();
//同个路段上绿灯放行的转向
//更新进口道绿灯时长和倒计时
String
combinedKey
=
crossId
+
"-"
+
dir
;
if
(!
greenTotalTimeState
.
contains
(
combinedKey
)){
greenTotalTimeState
.
put
(
combinedKey
,
value1
.
get
(
0
).
getCyclePhaseCountDown
());
}
greenRemainTimeState
.
put
(
combinedKey
,
value1
.
get
(
0
).
getCyclePhaseCountDown
());
for
(
CrossRidTurnLampStatusModel
greenLane:
value1
){
//绿灯放行车道
//绿灯放行进入路口内的车辆
List
<
CarTrackModel
>
runList
=
trackList
.
stream
()
.
filter
(
o
->
entry
.
getKey
().
equals
(
o
.
getRoadnet
().
getRidDir8
())
&&
o
.
getRoadnet
().
getInCrossFlag
()
==
Constant
.
CROSSING
)
.
filter
(
o
->
lightTurnControlMap
.
get
(
greenLane
.
getTurn
()).
contains
(
o
.
getRoadnet
().
getTurn
()))
//绿灯能控制转向的车道
.
collect
(
Collectors
.
toList
());
}
//绿灯放行进入路口内的车辆
List
<
CarTrackModel
>
runList
=
trackList
.
stream
()
.
filter
(
o
->
entry
.
getKey
().
equals
(
o
.
getRoadnet
().
getRidDir8
())
&&
o
.
getRoadnet
().
getInCrossFlag
()
==
Constant
.
CROSSING
)
.
collect
(
Collectors
.
toList
());
if
(!
runList
.
isEmpty
())
{
//按车道分组
Map
<
String
,
List
<
CarTrackModel
>>
groupByLane
=
runList
.
stream
().
collect
(
Collectors
.
groupingBy
(
O
->
O
.
getRoadnet
().
getLaneId
()));
for
(
Map
.
Entry
<
String
,
List
<
CarTrackModel
>>
entry1
:
groupByLane
.
entrySet
())
{
List
<
CarTrackModel
>
laneCarList
=
entry1
.
getValue
();
//计算路口内车辆到停止线距离
laneCarList
.
forEach
(
o
->
{
String
laneEndPoint
=
o
.
getRoadnet
().
getLaneEndPoint
();
if
(
Objects
.
nonNull
(
laneEndPoint
))
{
String
[]
sps
=
laneEndPoint
.
split
(
","
);
//距离停止线距离
double
stopLineDist
=
GeomsConvertUtil
.
getDistance
(
o
.
getLongitude
(),
o
.
getLatitude
(),
Double
.
parseDouble
(
sps
[
0
]),
Double
.
parseDouble
(
sps
[
1
]));
o
.
setStopLineDist
(
stopLineDist
);
}
});
//按距离排序
laneCarList
=
laneCarList
.
stream
().
sorted
(
Comparator
.
comparing
(
CarTrackModel:
:
getStopLineDist
)).
collect
(
Collectors
.
toList
());
//取靠近停止线的一辆车
CarTrackModel
lastCar
=
laneCarList
.
get
(
0
);
String
key
=
lastCar
.
getRoadnet
().
getLaneId
();
String
laneFunction
=
TurnEnum
.
getLaneFunction
(
lastCar
.
getRoadnet
().
getTurn
());
CarTrackModel
agoCar
=
mapState
.
get
(
key
);
Integer
agoGreenTime
=
greenRemainTimeState
.
get
(
key
);
//绿灯区间计算
if
(
agoCar
!=
null
&&
agoGreenTime
!=
null
)
{
//有绿灯时长时监测是否有后车
if
(
agoGreenTime
>
0
){
Integer
agoTrackId
=
agoCar
.
getId
();
Integer
trackId
=
lastCar
.
getId
();
//新的车辆驶出停止线
if
(!
Objects
.
equals
(
agoTrackId
,
trackId
))
{
//车辆类型
Integer
lastCarType
=
lastCar
.
getOriginalType
();
//车长配置,单位米
int
carLength
=
4
;
//前车车辆类型
Integer
agoCarType
=
agoCar
.
getOriginalType
();
int
agoCarLength
=
4
;
//TODO 不同车型长度配置
double
distance
=
PtInPolyUtil
.
getDistance
(
agoCar
.
getLongitude
(),
agoCar
.
getLongitude
(),
lastCar
.
getLongitude
(),
lastCar
.
getLatitude
());
double
lastCarSpeed
=
lastCar
.
getSpeed
()
/
3.6
;
//计算车头时距(空当时间) (前车车尾到后车车头距离)/后车速度
// double time = (distance - carLength / 2 - agoCarLength / 2) / lastCarSpeed;
double
time
=
(
lastCar
.
getGlobalTimeStamp
()
-
agoCar
.
getGlobalTimeStamp
())
/
1000.0
-
carLength
/
lastCarSpeed
;
VehicleHeadTimeModel
agoTimeModel
=
resultState
.
get
(
key
);
if
(
agoTimeModel
!=
null
)
{
//空当时间累加
time
=
agoTimeModel
.
getHeadTime
()
+
time
;
}
VehicleHeadTimeModel
timeModel
=
new
VehicleHeadTimeModel
();
timeModel
.
setLaneId
(
key
);
timeModel
.
setCrossId
(
lastCar
.
getRoadnet
().
getCrossId
());
timeModel
.
setRid
(
lastCar
.
getRoadnet
().
getRid
());
timeModel
.
setDir
(
dir
);
timeModel
.
setHeadTime
(
time
);
timeModel
.
setTurn
(
laneFunction
);
timeModel
.
setGreenTime
(
greenTotalTimeState
.
get
(
key
));
resultState
.
put
(
key
,
timeModel
);
mapState
.
put
(
key
,
lastCar
);
}
}
else
{
//绿灯结束时,输出计算结果
List
<
VehicleHeadTimeModel
>
list
=
StreamSupport
.
stream
(
resultState
.
values
().
spliterator
(),
false
).
collect
(
Collectors
.
toList
());
//筛选当前进口、相同转向的 车道
list
=
list
.
stream
().
filter
(
o
->
o
.
getRid
().
equals
(
lastCar
.
getRoadnet
().
getRid
())
&&
o
.
getTurn
().
equals
(
value1
.
get
(
0
).
getTurn
())).
collect
(
Collectors
.
toList
());
if
(!
list
.
isEmpty
())
{
//所有车道车头时距求平均
double
avgTime
=
list
.
stream
().
mapToDouble
(
VehicleHeadTimeModel:
:
getHeadTime
).
summaryStatistics
().
getAverage
();
VehicleHeadTimeModel
finalOutModel
=
list
.
get
(
0
);
finalOutModel
.
setHeadTime
(
avgTime
);
out
.
collect
(
finalOutModel
);
}
//TODO 输出以后清理缓存
}
}
else
{
mapState
.
put
(
key
,
lastCar
);
//只在第一次进入时缓存作为绿灯时长
greenTotalTimeState
.
put
(
key
,
value1
.
get
(
0
).
getCyclePhaseCountDown
());
//相位时长倒计时
greenRemainTimeState
.
put
(
key
,
value1
.
get
(
0
).
getCyclePhaseCountDown
());
}
}
}
}
}
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/helper/FileReadingHelper.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
helper
;
import
org.apache.commons.io.FileUtils
;
import
org.apache.commons.io.IOUtils
;
import
java.io.IOException
;
import
java.io.InputStream
;
import
java.nio.charset.StandardCharsets
;
import
java.util.List
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/26 22:34
* @Description :
*/
public
class
FileReadingHelper
{
public
static
List
<
String
>
getFileContent
(
String
path
)
{
InputStream
inputStream
=
null
;
try
{
inputStream
=
FileReadingHelper
.
class
.
getClassLoader
().
getResourceAsStream
(
path
);
assert
inputStream
!=
null
;
return
IOUtils
.
readLines
(
inputStream
,
StandardCharsets
.
UTF_8
);
}
catch
(
Exception
e
){
e
.
printStackTrace
();
return
null
;
}
finally
{
if
(
inputStream
!=
null
){
try
{
inputStream
.
close
();
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
}
}
}
}
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/key/KeySelectorByPlateAndRid.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
key
;
import
com.wanji.indicators.model.CarTrackModel
;
import
org.apache.flink.api.java.functions.KeySelector
;
import
org.apache.flink.api.java.tuple.Tuple2
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/25 15:35
* @Description :
*/
public
class
KeySelectorByPlateAndRid
implements
KeySelector
<
CarTrackModel
,
Tuple2
<
String
,
String
>>
{
@Override
public
Tuple2
<
String
,
String
>
getKey
(
CarTrackModel
carTrackModel
)
throws
Exception
{
return
new
Tuple2
<>(
carTrackModel
.
getPicLicense
(),
carTrackModel
.
getRoadnet
().
getRid
());
}
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/pojo/CarRecordInfo.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
pojo
;
import
lombok.Data
;
import
lombok.NoArgsConstructor
;
import
java.util.Date
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/24 16:14
* @Description :
*/
@Data
@NoArgsConstructor
public
class
CarRecordInfo
{
private
Long
id
;
private
Integer
carId
;
private
String
carPlate
;
private
Date
createTime
;
private
Date
updateTime
;
private
String
createBy
;
private
String
updateBy
;
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/pojo/CarTrackInCrossRoadFlatMap.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
pojo
;
import
com.wanji.indicators.constant.VehicleTypeEnum
;
import
com.wanji.indicators.model.CarTrackModel
;
import
com.wanji.indicators.model.FrameModel
;
import
com.wanji.indicators.task.trajectory.CarTrajectoryIndexMain
;
import
com.wanji.indicators.task.trajectory.func.CarDataValidatorFlatMap
;
import
com.wanji.indicators.util.DateUtil
;
import
com.wanji.indicators.util.GeomsConvertUtil
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
import
org.apache.flink.util.Collector
;
import
org.springframework.util.CollectionUtils
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Objects
;
import
java.util.Set
;
import
java.util.stream.Collectors
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/27 9:46
* @Description :
*/
public
class
CarTrackInCrossRoadFlatMap
implements
FlatMapFunction
<
FrameModel
,
CarTrackModel
>
{
@Override
public
void
flatMap
(
FrameModel
frameModel
,
Collector
<
CarTrackModel
>
collector
)
throws
Exception
{
Long
globalTimeStamp
=
frameModel
.
getGlobalTimeStamp
();
String
globalId
=
frameModel
.
getOrgCode
();
List
<
CarTrackModel
>
trackList
=
frameModel
.
getTrackList
();
if
(!
CollectionUtils
.
isEmpty
(
trackList
)){
List
<
CarTrackModel
>
processedList
=
trackList
.
stream
()
.
peek
(
car
->
{
//判断车辆轨迹数据在指定区域内
Map
<
String
,
List
<
CrossExitInfo
>>
crossExitMap
=
CarTrajectoryIndexMain
.
crossExitMap
;
for
(
Map
.
Entry
<
String
,
List
<
CrossExitInfo
>>
entry:
crossExitMap
.
entrySet
()){
Map
<
String
,
String
>
ridAndAreaCoordinates
=
entry
.
getValue
().
stream
().
collect
(
Collectors
.
toMap
(
CrossExitInfo:
:
getRid
,
CrossExitInfo:
:
getCoordinates
));
if
(
ridAndAreaCoordinates
.
containsKey
(
car
.
getRoadnet
().
getRid
())){
String
coordinates
=
ridAndAreaCoordinates
.
get
(
car
.
getRoadnet
().
getRid
());
boolean
inPolygon
=
GeomsConvertUtil
.
isInPolygon
(
car
.
getLongitude
(),
car
.
getLatitude
(),
coordinates
);
if
(
inPolygon
){
car
.
setIsInSpecificArea
(
1
);
car
.
getRoadnet
().
setCrossId
(
entry
.
getKey
());
break
;
}
}
}
})
.
filter
(
car
->
CarDataValidatorFlatMap
.
isQualifiedPlate
(
car
.
getPicLicense
()))
.
filter
(
car
->
(
Objects
.
nonNull
(
car
.
getRoadnet
())
&&
CarDataValidatorFlatMap
.
isQualified
(
car
.
getRoadnet
().
getCrossId
())
&&
(
car
.
getRoadnet
().
getInCrossFlag
()
==
1
))
||
car
.
getIsInSpecificArea
()
==
1
)
.
peek
(
source
->
{
source
.
setTimeStamp
(
DateUtil
.
toDateTime
(
globalTimeStamp
,
"yyyy-MM-dd HH:mm:ss.SSS"
));
source
.
setGlobalTimeStamp
(
globalTimeStamp
);
source
.
setOrgCode
(
globalId
);
short
originalType
=
source
.
getOriginalType
().
shortValue
();
int
category
;
if
(
VehicleTypeEnum
.
isMotorVehicles
(
originalType
))
{
category
=
1
;
}
else
if
(
VehicleTypeEnum
.
isNonMotorVehicles
(
originalType
))
{
category
=
2
;
}
else
{
category
=
3
;
}
source
.
setCategory
(
category
);
}).
collect
(
Collectors
.
toList
());
processedList
.
forEach
(
collector:
:
collect
);
}
}
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/pojo/CarTrackModelInNorthAndSouth.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
pojo
;
import
com.wanji.indicators.constant.VehicleTypeEnum
;
import
com.wanji.indicators.model.CarTrackModel
;
import
com.wanji.indicators.model.FrameModel
;
import
com.wanji.indicators.task.trajectory.CarTrajectoryIndexMain
;
import
com.wanji.indicators.task.trajectory.func.CarDataValidatorFlatMap
;
import
com.wanji.indicators.util.DateUtil
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
import
org.apache.flink.util.Collector
;
import
org.springframework.util.CollectionUtils
;
import
java.util.List
;
import
java.util.stream.Collectors
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/30 10:10
* @Description :
*/
public
class
CarTrackModelInNorthAndSouth
implements
FlatMapFunction
<
FrameModel
,
CarTrackModel
>
{
@Override
public
void
flatMap
(
FrameModel
frameModel
,
Collector
<
CarTrackModel
>
collector
)
throws
Exception
{
Long
globalTimeStamp
=
frameModel
.
getGlobalTimeStamp
();
String
globalId
=
frameModel
.
getOrgCode
();
List
<
CarTrackModel
>
trackList
=
frameModel
.
getTrackList
();
if
(!
CollectionUtils
.
isEmpty
(
trackList
)){
List
<
CarTrackModel
>
list
=
trackList
.
stream
()
.
filter
(
CarDataValidatorFlatMap:
:
isValidated
)
.
filter
(
car
->
CarTrajectoryIndexMain
.
northAndSouthMap
.
containsKey
(
car
.
getRoadnet
().
getRid
()))
.
peek
(
source
->
{
source
.
setTimeStamp
(
DateUtil
.
toDateTime
(
globalTimeStamp
,
"yyyy-MM-dd HH:mm:ss.SSS"
));
source
.
setGlobalTimeStamp
(
globalTimeStamp
);
source
.
setOrgCode
(
globalId
);
short
originalType
=
source
.
getOriginalType
().
shortValue
();
int
category
;
if
(
VehicleTypeEnum
.
isMotorVehicles
(
originalType
))
{
category
=
1
;
}
else
if
(
VehicleTypeEnum
.
isNonMotorVehicles
(
originalType
))
{
category
=
2
;
}
else
{
category
=
3
;
}
source
.
setCategory
(
category
);
}).
collect
(
Collectors
.
toList
());
list
.
forEach
(
collector:
:
collect
);
}
}
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/pojo/CrossExitInfo.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
pojo
;
import
lombok.Data
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/27 11:55
* @Description :
*/
@Data
public
class
CrossExitInfo
{
//出口路段编号
private
String
rid
;
//区域内每个顶点的坐标集
private
String
coordinates
;
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/pojo/CrossRoadLightStatusModel.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
pojo
;
import
lombok.Data
;
import
java.util.Map
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/28 14:08
* @Description :
*/
@Data
public
class
CrossRoadLightStatusModel
{
private
String
code
;
private
String
crossId
;
private
String
manufacturerCode
;
private
String
runMode
;
private
String
controlMode
;
private
String
phasePlanId
;
private
String
schemeStartTime
;
private
Integer
cycleCountDown
;
//绿灯倒计时
private
Integer
cyclePhaseCountDown
;
private
Integer
cycleLen
;
private
String
phaseId
;
private
String
schemeId
;
private
Map
<
String
,
Map
<
String
,
String
>>
dirLampGroupMap
;
// private Map<String, Integer> phaseMap;
private
String
timeStamp
;
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/pojo/OverFlowDetail.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
pojo
;
import
lombok.Data
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/27 10:06
* @Description :
*/
@Data
public
class
OverFlowDetail
{
//出现溢出的路口出口所在路段名称
private
String
rid
;
//溢出指数
private
Double
index
;
//溢出发生时间戳
private
Long
timestamp
;
//溢出时间 时间格式: yyyy-MM-dd HH:mm:ss
private
String
datetime
;
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/pojo/OverFlowIndexResult.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
pojo
;
import
lombok.Data
;
import
java.util.List
;
import
java.util.Map
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/27 10:03
* @Description :
*/
@Data
public
class
OverFlowIndexResult
{
//发生溢出的路口id
private
String
crossId
;
private
List
<
OverFlowDetail
>
details
;
private
boolean
isDeadLock
;
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/pojo/RidIndexResultOfEastAndWest.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
pojo
;
import
lombok.Data
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/26 1:09
* @Description :
*/
@Data
public
class
RidIndexResultOfEastAndWest
{
private
String
rid
;
//该路段的结束路口
private
String
crossId
;
private
Double
index
;
private
String
indexName
;
private
String
timestamp
;
private
Long
globalTimeStamp
;
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/pojo/SingleCarInfo.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
pojo
;
import
lombok.Data
;
import
lombok.NoArgsConstructor
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/25 22:23
* @Description :
*/
@Data
@NoArgsConstructor
public
class
SingleCarInfo
{
private
Integer
id
;
private
String
plate
;
private
String
rid
;
private
String
crossId
;
private
String
laneId
;
//从入口到出口的路段通行时间
private
Long
transitTime
;
private
String
segmentId
;
//旅游轨迹的第一帧数据检测时间
private
Long
startTime
;
//旅游轨迹最后一帧数据检测时间
private
Long
endTime
;
private
Long
windowStartTime
;
private
Long
windowEndTime
;
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trajectory/pojo/UnbalanceResult.java
0 → 100644
View file @
d51c2793
package
com
.
wanji
.
indicators
.
task
.
trajectory
.
pojo
;
import
lombok.Data
;
/**
* @author : jenny
* @version : 1.0
* @createTime : 2023/10/26 17:12
* @Description :
*/
@Data
public
class
UnbalanceResult
{
//路口id
private
String
crossId
;
//失衡发生时间
private
String
timestamp
;
//失衡指数
private
Double
index
;
}
wj-realtime-computing/src/main/java/com/wanji/indicators/util/GeomsConvertUtil.java
View file @
d51c2793
...
...
@@ -15,10 +15,14 @@ import com.vividsolutions.jts.operation.linemerge.LineMerger;
import
com.vividsolutions.jts.util.GeometricShapeFactory
;
import
org.apache.commons.lang.StringUtils
;
import
java.awt.geom.GeneralPath
;
import
java.awt.geom.Point2D
;
import
java.math.BigDecimal
;
import
java.util.ArrayList
;
import
java.util.Arrays
;
import
java.util.Collection
;
import
java.util.List
;
import
java.util.stream.Collectors
;
public
class
GeomsConvertUtil
{
private
static
GeometryFactory
geometryFactory
=
new
GeometryFactory
();
...
...
@@ -1005,6 +1009,62 @@ public class GeomsConvertUtil {
return
lineList
;
}
//判断一个点是否在多边形区域内
private
static
boolean
isInPolygon
(
double
longitude
,
double
latitude
,
double
[][]
points
)
{
//将要判断的点的经纬度组成一个点
Point2D
.
Double
point
=
new
Point2D
.
Double
(
longitude
,
latitude
);
//将区域各个顶点的坐标放到一个点集合里
ArrayList
<
Point2D
.
Double
>
point2DS
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
points
.
length
;
i
++){
double
polygonPoint_x
=
points
[
i
][
0
];
//经度
double
polygonPoint_y
=
points
[
i
][
1
];
//纬度
Point2D
.
Double
polygonPoint
=
new
Point2D
.
Double
(
polygonPoint_x
,
polygonPoint_y
);
point2DS
.
add
(
polygonPoint
);
}
return
check
(
point
,
point2DS
);
}
//points的数据格式是:lon1,lat1;lon2,lat2
public
static
boolean
isInPolygon
(
double
longitude
,
double
latitude
,
String
points
){
String
[]
pointsInOneDimension
=
points
.
trim
().
split
(
";"
);
List
<
double
[]>
collect
=
Arrays
.
stream
(
pointsInOneDimension
).
map
(
s
->
s
.
trim
().
split
(
","
))
.
map
(
string
->
{
double
[]
coordinates
=
new
double
[
2
];
coordinates
[
0
]
=
Double
.
parseDouble
(
string
[
0
]);
coordinates
[
1
]
=
Double
.
parseDouble
(
string
[
1
]);
return
coordinates
;
}).
collect
(
Collectors
.
toList
());
double
[][]
polygon
=
new
double
[
collect
.
size
()][
2
];
for
(
int
i
=
0
;
i
<
collect
.
size
();
i
++){
polygon
[
i
]
=
collect
.
get
(
i
);
}
return
isInPolygon
(
longitude
,
latitude
,
polygon
);
}
private
static
boolean
check
(
Point2D
.
Double
point
,
ArrayList
<
Point2D
.
Double
>
polygon
)
{
GeneralPath
generalPath
=
new
GeneralPath
();
Point2D
.
Double
firstPoint
=
polygon
.
get
(
0
);
generalPath
.
moveTo
(
firstPoint
.
x
,
firstPoint
.
y
);
polygon
.
remove
(
0
);
for
(
Point2D
.
Double
polygonPoint:
polygon
){
generalPath
.
lineTo
(
polygonPoint
.
x
,
polygonPoint
.
y
);
}
//将几何多边形封闭
generalPath
.
lineTo
(
firstPoint
.
x
,
firstPoint
.
y
);
generalPath
.
closePath
();
//测试指定点是否在该区域边界内
return
generalPath
.
contains
(
point
);
}
public
static
void
main
(
String
[]
args
)
throws
Exception
{
...
...
wj-realtime-computing/src/main/resources/config.properties
View file @
d51c2793
...
...
@@ -33,7 +33,7 @@ alarm.feishu.url=https://open.feishu.cn/open-apis/bot/v2/hook/0840f036-299e-4595
signal.status.url
=
ws://37.12.182.29:9000/utc/signalStatus/{crossId},stateMonitor
#区域路口列表
full.area.cross.list
=
13NI00B5RM0,13NGH0B5RC0,13NF80B5QN0
full.area.cross.list
=
13NI00B5RM0,13NGH0B5RC0,13NF80B5QN0
,13NID0B5RM0
#区域指标分析数据
area.analysis.data.topic
=
analysis.area.indicators
#路段参与者影响分析
...
...
@@ -44,3 +44,31 @@ cross.event.data.topic=analysis.cross.event
plate.prefix
=
鲁
#路段默认自由流速度配置值 km/h
rid.default.free.speed
=
80
#路段拥堵指数统计
rid.traffic.index.analysis.topic
=
rid.traffic.index.analysis
#东西方向路段的rid和方向
east.west.rid.direction.list
=
13NED0B5Q9013NF80B5QN00:6,13NGH0B5RC013NF80B5QN00:2,13NI00B5RM013NGH0B5RC00:3,13NF80B5QN013NGH0B5RC00:6,13NGH0B5RC013NI00B5RM00:7,13NID0B5RM013NI00B5RM00:3
#南北方向路段的rid和方向
north.south.rid.direction.list
=
13NG40B5SK013NI00B5RM00:1,13NEH0B5RJ013NGH0B5RC00:1,13NEP0B5QJ013NGH0B5RC00:5,13NDG0B5RI013NF80B5QN00:1,13NDT0B5Q9013NF80B5QN00:4
#路口溢出评价指标
road.overflow.avg.speed
=
5.0
road.overflow.duration
=
3
road.overflow.deadlock.index.analysis.topic
=
crossroad.overflow.deadlock.index.analysis
#路口失衡topic
road.unbalance.index.analysis.topic
=
road.unbalance.index.analysis
#死锁评价指标
cross.road.deadlock.car.number
=
10
cross.road.deadlock.avg.speed
=
5.0
#相位灯的状态数据
light.status.topic
=
cross_lights_status
#虚拟路口区域
virtual.crossroad.13NED0B5Q90
=
13NED0B5Q90:117.08503591467242,36.64125732273356;117.08495255127629,36.641426722875224;117.08499878952986,36.641454206982246;117.08508543702352,36.641286700399476
virtual.crossroad.13NH20B5RH0
=
13NH20B5RH0:117.09669255282627,36.644871615002884;117.09669985552095,36.645055610398025;117.09675474612689,36.64505324101935;117.09674606214631,36.64486930675771
wj-realtime-computing/src/main/resources/data/cross_exit_info.txt
0 → 100644
View file @
d51c2793
13NID0B5RM0:13NID0B5RM013NH20B5RH00:117.09637002802258,36.64488693335725;117.09637338547009,36.644965413693335;117.096424586545,36.64496331528863;117.09642122909746,36.64488609399537
13NGH0B5RC0:13NGH0B5RC013NF80B5QN00:117.08958898682974,36.643899290074785;117.08955379723122,36.64397154605041;117.08960400105843,36.643990313836284;117.08964012904624,36.64391876165263
13NGH0B5RC0:13NGH0B5RC013NEP0B5QJ00:117.0899131390518,36.6437643837966;117.09002985710713,36.64380773763002;117.09005137896341,36.643758573022545;117.08993645702961,36.64371467989449
13NGH0B5RC0:13NGH0B5RC013NI00B5RM00:117.09029328379803,36.64403689420678;117.0902662941944,36.64411114941236;117.09031650772073,36.64412899638859;117.09034442601454,36.64405636578462
13NGH0B5RC0:13NGH0B5RC013NEH0B5RJ00:117.0900452476105,36.644319696002505;117.08989006966468,36.64426827251168;117.089875426555,36.64431973551543;117.09002407323193,36.64436900205546
13NF80B5QN0:13NF80B5QN013NGH0B5RC00:117.0861592881092,36.64197997974727;117.08609667244684,36.64205197263441;117.08614355857083,36.64208312922003;117.08620587174205,36.64201325377074
13NF80B5QN0:13NF80B5QN013NDG0B5RI00:117.08584437829073,36.64214724722395;117.08578629999519,36.642116393129456;117.08576352430352,36.64216220348786;117.08581851529976,36.6421903522089
13NF80B5QN0:13NF80B5QN013NDT0B5Q900:117.08605990284744,36.64173522729118;117.08601072453179,36.64170088034566;117.08596764994606,36.64173562023835;117.08601707684535,36.64177287323281
13NI00B5RM0:13NI00B5RM013NGH0B5RC00:117.09429965504133,36.64494101925379;117.09428967283428,36.64501573456104;117.09434230628959,36.64502178438349;117.09435410344337,36.6449452541295
13NI00B5RM0:13NI00B5RM013NG40B5SK00:117.09481322833656,36.64521631203853;117.09467287245573,36.64521963944088;117.09467426195232,36.645273677983255;117.09481413580993,36.645270760440575
13NI00B5RM0:13NI00B5RM013NID0B5RM00:117.09495729255127,36.644879713845825;117.0949548726223,36.64496864623584;117.09500798547438,36.644968227032756;117.09500962351547,36.64488122630144
13NF80B5QN0:13NF80B5QN013NED0B5Q900:117.08568429578892,36.641777410599644;117.08564440984892,36.64186426813691;117.0856906481025,36.641891752243936;117.08573299685965,36.641806147256275
13NID0B5RM0:13NID0B5RM013NI00B5RM00:117.09559352885879,36.644983794719344;117.09559110892981,36.64507272710936;117.0956442217819,36.645072307906275;117.09564585982298,36.64498530717496
13NID0B5RM0:13NID0B5RM013NHC0B5R400:117.09611905881948,36.64473039236603;117.0961578793065,36.644764806203234;117.09619669979368,36.64473584821827;117.09615515138046,36.644701434381055
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment