Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
W
wj-datacenter-platform
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
jinan
wj-datacenter-platform
Commits
282152b1
Commit
282152b1
authored
Oct 18, 2023
by
zhoushiguang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
路段路况计算
parent
48c173a5
Changes
15
Hide whitespace changes
Inline
Side-by-side
Showing
15 changed files
with
440 additions
and
22 deletions
+440
-22
Constant.java
...src/main/java/com/wanji/indicators/constant/Constant.java
+2
-0
RoadClassMap.java
...main/java/com/wanji/indicators/constant/RoadClassMap.java
+34
-0
CarTrackModel.java
...c/main/java/com/wanji/indicators/model/CarTrackModel.java
+4
-0
BaseLaneInfoService.java
...ava/com/wanji/indicators/service/BaseLaneInfoService.java
+1
-1
BaseLaneInfoServiceImpl.java
...anji/indicators/service/impl/BaseLaneInfoServiceImpl.java
+16
-7
AreaFreeSpeedSource.java
...java/com/wanji/indicators/source/AreaFreeSpeedSource.java
+3
-3
OfflinePeriodIndicatorTaskMain.java
...wanji/indicators/task/OfflinePeriodIndicatorTaskMain.java
+1
-1
OfflineTestIndicatorTaskMain.java
...m/wanji/indicators/task/OfflineTestIndicatorTaskMain.java
+2
-1
RidFreeSpeedCoFlatMap.java
...icators/task/area/service/func/RidFreeSpeedCoFlatMap.java
+20
-8
ParticipantAnalysisMidResultModel.java
...area/service/model/ParticipantAnalysisMidResultModel.java
+1
-0
RidTrafficStateMain.java
...nji/indicators/task/trafficstate/RidTrafficStateMain.java
+139
-0
RidMotorCarProcessWindow.java
...tors/task/trafficstate/func/RidMotorCarProcessWindow.java
+132
-0
RidProcessWindow.java
...i/indicators/task/trafficstate/func/RidProcessWindow.java
+81
-0
LaneTrafficStateResult.java
...ators/task/trafficstate/model/LaneTrafficStateResult.java
+1
-0
BaseLaneInfoMapper.xml
...omputing/src/main/resources/mapper/BaseLaneInfoMapper.xml
+3
-1
No files found.
wj-realtime-computing/src/main/java/com/wanji/indicators/constant/Constant.java
View file @
282152b1
...
...
@@ -13,6 +13,8 @@ public class Constant {
public
static
final
String
LANE_TRAFFIC_STATE
=
"LANE_TRAFFIC_STATE"
;
public
static
final
String
RID_TRAFFIC_STATE
=
"RID_TRAFFIC_STATE"
;
public
static
final
String
RID_REALTIME_INDICATOR
=
"RID_REALTIME_INDICATOR"
;
public
static
final
String
HASH_KEY_TRAVEL_DATA
=
"TRAVEL_DATA"
;
...
...
wj-realtime-computing/src/main/java/com/wanji/indicators/constant/RoadClassMap.java
0 → 100644
View file @
282152b1
package
com
.
wanji
.
indicators
.
constant
;
import
java.util.HashMap
;
import
java.util.Map
;
public
enum
RoadClassMap
{
GAOSU_GONGLU
(
41000
,
"高速公路"
),
GUOD_AO
(
42000
,
"国道"
),
SHENG_DAO
(
51000
,
"省道"
),
XIAN_DAO
(
52000
,
"县道"
),
XIANG_DAO
(
53000
,
"乡公路"
),
XIAN_XIANG_DAO
(
54000
,
"县乡内道路"
),
CITY_KUAISU_DAO
(
43000
,
"主要大街、城市快速道"
),
MAIN_DAO
(
44000
,
"主要道路"
),
CIYAO_DAO
(
45000
,
"次要道路"
),
PUTONG_DAO
(
47000
,
"普通道路"
),
NON_DAOHANG_DAO
(
49
,
"小路非导航道路"
);
private
Integer
type
;
private
String
desc
;
RoadClassMap
(
Integer
type
,
String
desc
)
{
this
.
type
=
type
;
this
.
desc
=
desc
;
}
public
Integer
getType
()
{
return
type
;
}
public
String
getDesc
()
{
return
desc
;
}
}
wj-realtime-computing/src/main/java/com/wanji/indicators/model/CarTrackModel.java
View file @
282152b1
...
...
@@ -75,6 +75,10 @@ public class CarTrackModel implements Serializable {
private
Double
freeSpeed
;
/*车道角度*/
private
Double
laneAngle
;
/*道路等级*/
private
int
ridLevel
;
/*路段长度*/
private
double
ridLength
;
}
/***********扩展属性*********************************/
...
...
wj-realtime-computing/src/main/java/com/wanji/indicators/service/BaseLaneInfoService.java
View file @
282152b1
...
...
@@ -28,5 +28,5 @@ public interface BaseLaneInfoService extends IService<BaseLaneInfo> {
public
double
getAreaFreeSpeed
(
String
crossList
,
Double
defaultFreeSpeed
);
public
Map
<
String
,
Double
>
getRidFreeSpeed
(
String
crossList
,
Double
defaultFreeSpeed
);
public
Map
<
String
,
Map
<
String
,
Double
>
>
getRidFreeSpeed
(
String
crossList
,
Double
defaultFreeSpeed
);
}
wj-realtime-computing/src/main/java/com/wanji/indicators/service/impl/BaseLaneInfoServiceImpl.java
View file @
282152b1
...
...
@@ -169,14 +169,14 @@ public class BaseLaneInfoServiceImpl extends ServiceImpl<BaseLaneInfoMapper, Bas
@Override
public
double
getAreaFreeSpeed
(
String
crossList
,
Double
defaultFreeSpeed
)
{
Map
<
String
,
Double
>
ridFreeSpeed
=
this
.
getRidFreeSpeed
(
crossList
,
defaultFreeSpeed
);
Collection
<
Double
>
collection
=
ridFreeSpeed
.
values
();
return
collection
.
stream
().
mapToDouble
(
o
->
o
).
summaryStatistics
().
getAverage
();
Map
<
String
,
Map
<
String
,
Double
>
>
ridFreeSpeed
=
this
.
getRidFreeSpeed
(
crossList
,
defaultFreeSpeed
);
Collection
<
Map
<
String
,
Double
>
>
collection
=
ridFreeSpeed
.
values
();
return
collection
.
stream
().
mapToDouble
(
o
->
o
.
get
(
"freeSpeed"
)
).
summaryStatistics
().
getAverage
();
}
@Override
public
Map
<
String
,
Double
>
getRidFreeSpeed
(
String
crossList
,
Double
defaultFreeSpeed
)
{
Map
<
String
,
Double
>
retmap
=
new
HashMap
<>();
public
Map
<
String
,
Map
<
String
,
Double
>
>
getRidFreeSpeed
(
String
crossList
,
Double
defaultFreeSpeed
)
{
Map
<
String
,
Map
<
String
,
Double
>
>
retmap
=
new
HashMap
<>();
String
[]
sps
=
crossList
.
split
(
","
);
Map
<
String
,
Object
>
params
=
new
HashMap
<>();
params
.
put
(
"crossList"
,
Arrays
.
asList
(
sps
));
...
...
@@ -188,10 +188,19 @@ public class BaseLaneInfoServiceImpl extends ServiceImpl<BaseLaneInfoMapper, Bas
String
fs
=
Tools
.
getMapValue
(
"free_speed"
,
map
);
fs
=
fs
==
null
?
defaultFreeSpeed
.
toString
()
:
fs
;
String
level
=
Tools
.
getMapValue
(
"level"
,
map
);
String
wkt
=
Tools
.
getMapValue
(
"wkt"
,
map
);
double
ridLength
=
GeomsConvertUtil
.
getSegmentTotalLeng
(
wkt
);
double
freeSpeed
=
Double
.
parseDouble
(
fs
);
String
key
=
crossId
+
Constant
.
MARK
+
rid
;
retmap
.
put
(
key
,
freeSpeed
);
String
key
=
rid
;
Map
<
String
,
Double
>
dataMap
=
new
HashMap
<>();
dataMap
.
put
(
"level"
,
Double
.
parseDouble
(
level
));
dataMap
.
put
(
"ridLength"
,
ridLength
);
dataMap
.
put
(
"freeSpeed"
,
freeSpeed
);
retmap
.
put
(
key
,
dataMap
);
}
return
retmap
;
}
...
...
wj-realtime-computing/src/main/java/com/wanji/indicators/source/AreaFreeSpeedSource.java
View file @
282152b1
...
...
@@ -18,7 +18,7 @@ import java.util.*;
* 自由流速度
*/
@Service
public
class
AreaFreeSpeedSource
extends
RichParallelSourceFunction
<
Map
<
String
,
Double
>>
{
public
class
AreaFreeSpeedSource
extends
RichParallelSourceFunction
<
Map
<
String
,
Map
<
String
,
Double
>
>>
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
AreaFreeSpeedSource
.
class
);
...
...
@@ -41,13 +41,13 @@ public class AreaFreeSpeedSource extends RichParallelSourceFunction<Map<String,
@Override
public
void
run
(
SourceContext
<
Map
<
String
,
Double
>>
sourceContext
)
throws
Exception
{
public
void
run
(
SourceContext
<
Map
<
String
,
Map
<
String
,
Double
>
>>
sourceContext
)
throws
Exception
{
try
{
String
crossList
=
properties
.
getProperty
(
"full.area.cross.list"
);
Double
defaultFreeSpeed
=
Double
.
parseDouble
(
properties
.
getProperty
(
"rid.default.free.speed"
));
Map
<
String
,
Double
>
ridFreeSpeed
=
baseLaneInfoService
.
getRidFreeSpeed
(
crossList
,
defaultFreeSpeed
);
Map
<
String
,
Map
<
String
,
Double
>
>
ridFreeSpeed
=
baseLaneInfoService
.
getRidFreeSpeed
(
crossList
,
defaultFreeSpeed
);
sourceContext
.
collect
(
ridFreeSpeed
);
}
catch
(
Exception
e
)
{
...
...
wj-realtime-computing/src/main/java/com/wanji/indicators/task/OfflinePeriodIndicatorTaskMain.java
View file @
282152b1
...
...
@@ -53,7 +53,7 @@ public class OfflinePeriodIndicatorTaskMain {
.
setParallelism
(
1
)
.
name
(
"车道周期数据-车道属性数据绑定"
);
LaneTrafficStateMain
.
init
(
env
,
outPath
,
tru
e
).
run
(
joinLaneStream
);
LaneTrafficStateMain
.
init
(
env
,
outPath
,
fals
e
).
run
(
joinLaneStream
);
try
{
env
.
execute
(
"离线调试测试"
);
}
catch
(
Exception
e
)
{
...
...
wj-realtime-computing/src/main/java/com/wanji/indicators/task/OfflineTestIndicatorTaskMain.java
View file @
282152b1
...
...
@@ -7,6 +7,7 @@ import com.wanji.indicators.task.area.service.AreaIndexAnalysisMainNew;
import
com.wanji.indicators.task.area.service.RidParticipantAnalysisMainNew
;
import
com.wanji.indicators.task.track.service.func.CarTrackFlatMap
;
import
com.wanji.indicators.task.track.service.func.FrameFlatMap
;
import
com.wanji.indicators.task.trafficstate.RidTrafficStateMain
;
import
com.wanji.indicators.util.CommonUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
...
...
@@ -54,7 +55,7 @@ public class OfflineTestIndicatorTaskMain {
.
setParallelism
(
1
)
.
name
(
"轨迹帧数据解析-ToCarTrackModel"
);
Rid
ParticipantAnalysisMainNew
.
init
(
env
,
outPath
,
tru
e
).
run
(
carTrackModelStream
);
Rid
TrafficStateMain
.
init
(
env
,
outPath
,
fals
e
).
run
(
carTrackModelStream
);
try
{
env
.
execute
(
"离线调试测试"
);
}
catch
(
Exception
e
)
{
...
...
wj-realtime-computing/src/main/java/com/wanji/indicators/task/area/service/func/RidFreeSpeedCoFlatMap.java
View file @
282152b1
package
com
.
wanji
.
indicators
.
task
.
area
.
service
.
func
;
import
com.wanji.indicators.constant.Constant
;
import
com.wanji.indicators.constant.RoadClassMap
;
import
com.wanji.indicators.entity.BaseLaneInfo
;
import
com.wanji.indicators.model.CarTrackModel
;
import
com.wanji.indicators.model.CrossFrameModel
;
...
...
@@ -8,26 +10,36 @@ import org.apache.flink.util.Collector;
import
java.util.*
;
public
class
RidFreeSpeedCoFlatMap
implements
CoFlatMapFunction
<
CarTrackModel
,
Map
<
String
,
Double
>,
CarTrackModel
>
{
public
class
RidFreeSpeedCoFlatMap
implements
CoFlatMapFunction
<
CarTrackModel
,
Map
<
String
,
Map
<
String
,
Double
>
>,
CarTrackModel
>
{
private
Map
<
String
,
Double
>
map
=
new
HashMap
();
private
Map
<
String
,
Map
<
String
,
Double
>
>
map
=
new
HashMap
();
@Override
public
void
flatMap1
(
CarTrackModel
carTrackModel
,
Collector
<
CarTrackModel
>
collector
)
throws
Exception
{
if
(
map
!=
null
)
{
if
(
map
!=
null
&&
!
map
.
isEmpty
()
)
{
String
rid
=
carTrackModel
.
getRoadnet
().
getRid
();
Double
freeSpeed
=
map
.
get
(
rid
);
Collection
<
Double
>
collection
=
map
.
values
();
double
areaFreeSpeed
=
collection
.
stream
().
mapToDouble
(
o
->
o
).
summaryStatistics
().
getAverage
();
String
crossId
=
carTrackModel
.
getRoadnet
().
getCrossId
();
String
key
=
rid
;
if
(!
map
.
containsKey
(
key
))
{
return
;
}
Double
freeSpeed
=
map
.
get
(
key
).
get
(
"freeSpeed"
);
Double
ridLevel
=
map
.
get
(
key
).
get
(
"level"
)
/*==null? RoadClassMap.CIYAO_DAO.getType():map.get(key).get("level")*/
;
Double
ridLength
=
map
.
get
(
key
).
get
(
"ridLength"
);
Collection
<
Map
<
String
,
Double
>>
collection
=
map
.
values
();
double
areaFreeSpeed
=
collection
.
stream
().
mapToDouble
(
o
->
o
.
get
(
"freeSpeed"
)).
summaryStatistics
().
getAverage
();
carTrackModel
.
setAreaFreeSpeed
(
areaFreeSpeed
);
carTrackModel
.
getRoadnet
().
setFreeSpeed
(
freeSpeed
);
carTrackModel
.
getRoadnet
().
setRidLevel
(
ridLevel
.
intValue
());
carTrackModel
.
getRoadnet
().
setRidLength
(
ridLength
);
collector
.
collect
(
carTrackModel
);
}
collector
.
collect
(
carTrackModel
);
}
@Override
public
void
flatMap2
(
Map
<
String
,
Double
>
hashMap
,
Collector
<
CarTrackModel
>
collector
)
throws
Exception
{
public
void
flatMap2
(
Map
<
String
,
Map
<
String
,
Double
>
>
hashMap
,
Collector
<
CarTrackModel
>
collector
)
throws
Exception
{
this
.
map
=
hashMap
;
}
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/area/service/model/ParticipantAnalysisMidResultModel.java
View file @
282152b1
...
...
@@ -27,6 +27,7 @@ public class ParticipantAnalysisMidResultModel implements Serializable {
private
Double
speed
=
0
D
;
//平均速度
private
Long
count
;
//参与者目标数量
private
Double
trafficIndex
;
private
String
trafficState
;
private
String
crossId
;
private
String
rid
;
...
...
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trafficstate/RidTrafficStateMain.java
0 → 100644
View file @
282152b1
package
com
.
wanji
.
indicators
.
task
.
trafficstate
;
import
com.alibaba.fastjson.JSONObject
;
import
com.wanji.indicators.config.RedisConfig
;
import
com.wanji.indicators.constant.Constant
;
import
com.wanji.indicators.constant.VehicleTypeEnum
;
import
com.wanji.indicators.model.CarTrackModel
;
import
com.wanji.indicators.sink.JsonStringFileSink
;
import
com.wanji.indicators.source.AreaFreeSpeedSource
;
import
com.wanji.indicators.task.area.service.func.RidCarProcessWindow
;
import
com.wanji.indicators.task.area.service.func.RidFreeSpeedCoFlatMap
;
import
com.wanji.indicators.task.area.service.func.RidOutProcessWindow
;
import
com.wanji.indicators.task.area.service.model.ParticipantAnalysisMidResultModel
;
import
com.wanji.indicators.task.area.service.model.ParticipantAnalysisOutputModel
;
import
com.wanji.indicators.task.trafficstate.func.LaneProcessWindow
;
import
com.wanji.indicators.task.trafficstate.func.RidMotorCarProcessWindow
;
import
com.wanji.indicators.task.trafficstate.model.LanePeriodData
;
import
com.wanji.indicators.task.trafficstate.model.LaneTrafficStateResult
;
import
com.wanji.indicators.task.trafficstate.sink.TrafficStateRedisMapper
;
import
com.wanji.indicators.util.PropertiesHelper
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.flink.api.common.eventtime.AscendingTimestampsWatermarks
;
import
org.apache.flink.api.common.eventtime.WatermarkStrategy
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
import
org.apache.flink.api.common.serialization.SimpleStringSchema
;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction
;
import
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
;
import
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
;
import
org.apache.flink.streaming.api.windowing.time.Time
;
import
org.apache.flink.streaming.api.windowing.windows.TimeWindow
;
import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
;
import
org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
;
import
org.apache.flink.streaming.connectors.redis.RedisSink
;
import
org.apache.flink.util.Collector
;
import
java.io.Serializable
;
import
java.time.Duration
;
import
java.util.List
;
import
java.util.Properties
;
import
java.util.stream.Collectors
;
import
java.util.stream.StreamSupport
;
@Slf4j
public
class
RidTrafficStateMain
implements
Serializable
{
private
final
StreamExecutionEnvironment
env
;
private
final
PropertiesHelper
instance
;
private
Properties
p
=
new
Properties
();
private
String
path
;
private
int
windowTime
=
10
;
//本地调试设为true输出结果保存到文件
private
boolean
isDebug
=
false
;
private
RidTrafficStateMain
(
StreamExecutionEnvironment
env
,
String
path
,
boolean
isDebug
)
{
this
.
env
=
env
;
this
.
instance
=
PropertiesHelper
.
getInstance
();
this
.
p
=
instance
.
getProperties
();
this
.
path
=
path
;
this
.
isDebug
=
isDebug
;
}
public
static
RidTrafficStateMain
init
(
StreamExecutionEnvironment
env
,
String
path
,
boolean
isDebug
)
{
return
new
RidTrafficStateMain
(
env
,
path
,
isDebug
);
}
public
void
run
(
SingleOutputStreamOperator
<
CarTrackModel
>
filterStream
)
{
try
{
SingleOutputStreamOperator
<
CarTrackModel
>
joinLaneStream
=
filterStream
.
connect
(
env
.
addSource
(
new
AreaFreeSpeedSource
()).
broadcast
())
.
flatMap
(
new
RidFreeSpeedCoFlatMap
())
.
setParallelism
(
1
)
.
name
(
"路段交通路况-车道属性数据绑定"
);
SingleOutputStreamOperator
<
CarTrackModel
>
motorStream
=
filterStream
.
filter
(
o
->
VehicleTypeEnum
.
isMotorVehicles
(
o
.
getOriginalType
().
shortValue
()))
.
name
(
"路段交通路况-筛选机动车数据"
);
SingleOutputStreamOperator
<
LaneTrafficStateResult
>
groupByRidStream
=
motorStream
.
assignTimestampsAndWatermarks
(
WatermarkStrategy
.<
CarTrackModel
>
forBoundedOutOfOrderness
(
Duration
.
ofSeconds
(
5
))
.
withTimestampAssigner
((
event
,
timestamp
)
->
event
.
getGlobalTimeStamp
()))
.
keyBy
(
o
->(
o
.
getRoadnet
().
getRid
()))
.
window
(
SlidingEventTimeWindows
.
of
(
Time
.
minutes
(
10
),
Time
.
minutes
(
1
)))
.
process
(
new
RidMotorCarProcessWindow
())
.
name
(
"路段交通路况-按RID分组车辆"
);
SingleOutputStreamOperator
<
List
<
LaneTrafficStateResult
>>
listStream
=
groupByRidStream
.
keyBy
(
o
->
o
.
getId
()+
"_"
+
o
.
getStartTime
())
.
windowAll
(
TumblingEventTimeWindows
.
of
(
Time
.
seconds
(
1
)))
.
process
(
new
ProcessAllWindowFunction
<
LaneTrafficStateResult
,
List
<
LaneTrafficStateResult
>,
TimeWindow
>()
{
@Override
public
void
process
(
Context
context
,
Iterable
<
LaneTrafficStateResult
>
elements
,
Collector
<
List
<
LaneTrafficStateResult
>>
out
)
throws
Exception
{
List
<
LaneTrafficStateResult
>
list
=
StreamSupport
.
stream
(
elements
.
spliterator
(),
false
).
collect
(
Collectors
.
toList
());
out
.
collect
(
list
);
}
})
.
name
(
"路段路况计算-RID路况数据集合封装"
);
SingleOutputStreamOperator
<
JSONObject
>
jsonObjectStream
=
listStream
.
flatMap
(
new
FlatMapFunction
<
List
<
LaneTrafficStateResult
>,
JSONObject
>()
{
@Override
public
void
flatMap
(
List
<
LaneTrafficStateResult
>
value
,
Collector
<
JSONObject
>
out
)
throws
Exception
{
JSONObject
jsonObject
=
new
JSONObject
();
jsonObject
.
put
(
"list"
,
value
);
jsonObject
.
put
(
"id"
,
value
.
get
(
0
).
getId
());
log
.
info
(
"路段路况集合:{}"
,
jsonObject
);
out
.
collect
(
jsonObject
);
}
}).
name
(
"路段路况计算-车道路况集合转JsonObject"
);
SingleOutputStreamOperator
<
String
>
toJsonStream
=
listStream
.
flatMap
(
new
FlatMapFunction
<
List
<
LaneTrafficStateResult
>,
String
>()
{
@Override
public
void
flatMap
(
List
<
LaneTrafficStateResult
>
value
,
Collector
<
String
>
out
)
throws
Exception
{
out
.
collect
(
JSONObject
.
toJSONString
(
value
));
}
}).
name
(
"路段路况计算-转换为json"
);
if
(!
isDebug
)
{
//存储到redis
jsonObjectStream
.
addSink
(
new
RedisSink
<
JSONObject
>(
new
RedisConfig
().
getRedisConfig
(),
new
TrafficStateRedisMapper
(
Constant
.
RID_TRAFFIC_STATE
)));
}
else
{
String
path
=
this
.
path
+
"rid.traffic.state"
;
toJsonStream
.
addSink
(
new
JsonStringFileSink
(
path
)).
setParallelism
(
1
).
name
(
"路段路况计算-结果输出至文件"
);
}
}
catch
(
Exception
e
)
{
log
.
error
(
"路段路况计算"
,
e
);
}
}
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trafficstate/func/RidMotorCarProcessWindow.java
0 → 100644
View file @
282152b1
package
com
.
wanji
.
indicators
.
task
.
trafficstate
.
func
;
import
com.wanji.indicators.constant.CongestEnum
;
import
com.wanji.indicators.constant.RoadClassMap
;
import
com.wanji.indicators.model.CarTrackModel
;
import
com.wanji.indicators.task.area.service.model.ParticipantAnalysisMidResultModel
;
import
com.wanji.indicators.task.trafficstate.model.LaneTrafficStateResult
;
import
com.wanji.indicators.util.ArithOfBigDecmial
;
import
com.wanji.indicators.util.CommonUtil
;
import
com.wanji.indicators.util.DateUtil
;
import
com.wanji.indicators.util.PropertiesHelper
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
;
import
org.apache.flink.streaming.api.windowing.windows.TimeWindow
;
import
org.apache.flink.util.Collector
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.math.BigDecimal
;
import
java.util.*
;
import
java.util.stream.Collectors
;
import
java.util.stream.StreamSupport
;
/**
* @author fengyi
* @date 2023/4/6
* @description
*/
public
class
RidMotorCarProcessWindow
extends
ProcessWindowFunction
<
CarTrackModel
,
LaneTrafficStateResult
,
String
,
TimeWindow
>
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
RidMotorCarProcessWindow
.
class
);
private
Properties
properties
=
new
Properties
();
public
void
open
(
Configuration
parameters
)
throws
Exception
{
PropertiesHelper
instance
=
PropertiesHelper
.
getInstance
();
this
.
properties
=
instance
.
getProperties
();
}
@Override
public
void
process
(
String
key
,
Context
context
,
Iterable
<
CarTrackModel
>
elements
,
Collector
<
LaneTrafficStateResult
>
out
)
throws
Exception
{
long
windowStartTs
=
context
.
window
().
getStart
();
long
windowEndTs
=
context
.
window
().
getEnd
();
List
<
CarTrackModel
>
list
=
StreamSupport
.
stream
(
elements
.
spliterator
(),
false
).
collect
(
Collectors
.
toList
());
long
count
=
list
.
stream
().
map
(
CarTrackModel:
:
getId
).
distinct
().
count
();
double
speed
=
0
;
//getFrameAvgSpeed(list);
double
totalSpeed
=
0
;
int
size
=
0
;
List
<
Double
>
timeList
=
new
ArrayList
<>();
Map
<
String
,
List
<
CarTrackModel
>>
groupByPlateNo
=
list
.
stream
().
filter
(
o
->
Objects
.
nonNull
(
o
.
getId
())).
collect
(
Collectors
.
groupingBy
(
o
->
o
.
getId
().
toString
()));
for
(
Map
.
Entry
<
String
,
List
<
CarTrackModel
>>
entry
:
groupByPlateNo
.
entrySet
())
{
List
<
CarTrackModel
>
carList
=
entry
.
getValue
();
if
(
carList
.
size
()
>
1
)
{
carList
=
carList
.
stream
().
sorted
(
Comparator
.
comparing
(
o
->
o
.
getGlobalTimeStamp
())).
collect
(
Collectors
.
toList
());
double
distance
=
CommonUtil
.
getTravelDistance
(
carList
);
//秒
double
travelTime
=
(
carList
.
get
(
carList
.
size
()
-
1
).
getGlobalTimeStamp
()
-
carList
.
get
(
0
).
getGlobalTimeStamp
())
/
1000
;
if
(
travelTime
>
0
)
{
double
travelSpeed
=
(
distance
/
1000
)
/
(
travelTime
/
60
/
60.0
);
totalSpeed
+=
travelSpeed
;
size
++;
timeList
.
add
(
travelTime
);
}
}
}
double
travelTime
=
this
.
getFrameAvgSpeed
(
timeList
);
Double
freeSpeed
=
list
.
get
(
0
).
getRoadnet
().
getFreeSpeed
();
if
(
Objects
.
isNull
(
freeSpeed
))
{
freeSpeed
=
Double
.
parseDouble
(
this
.
properties
.
getProperty
(
"rid.default.free.speed"
));
}
double
ridLength
=
list
.
get
(
0
).
getRoadnet
().
getRidLength
()/
1000
;
int
ridLevel
=
list
.
get
(
0
).
getRoadnet
().
getRidLevel
();
String
trafficState
=
"1"
;
double
freeTime
=
ridLength
/
freeSpeed
;
double
trafficIndex
=
travelTime
/
freeTime
;
//主要道路
if
(
ridLevel
==
RoadClassMap
.
MAIN_DAO
.
getType
())
{
if
(
trafficIndex
<=
1.8
)
{
trafficState
=
CongestEnum
.
NO_CONGEST
.
getType
();
}
else
if
(
trafficIndex
>
1.8
&&
trafficIndex
<=
2.5
)
{
trafficState
=
CongestEnum
.
LIGHT_CONGEST
.
getType
();
}
else
if
(
trafficIndex
>
2.5
&&
trafficIndex
<=
3.5
)
{
trafficState
=
CongestEnum
.
MODERATE_CONGEST
.
getType
();
}
else
if
(
trafficIndex
>
3.5
)
{
trafficState
=
CongestEnum
.
HEAVY_CONGEST
.
getType
();
}
}
else
if
(
ridLevel
==
RoadClassMap
.
CIYAO_DAO
.
getType
())
{
//次要道路
if
(
trafficIndex
<=
1.5
)
{
trafficState
=
CongestEnum
.
NO_CONGEST
.
getType
();
}
else
if
(
trafficIndex
>
1.5
&&
trafficIndex
<=
2
)
{
trafficState
=
CongestEnum
.
LIGHT_CONGEST
.
getType
();
}
else
if
(
trafficIndex
>
2
&&
trafficIndex
<=
3
)
{
trafficState
=
CongestEnum
.
MODERATE_CONGEST
.
getType
();
}
else
if
(
trafficIndex
>
3
)
{
trafficState
=
CongestEnum
.
HEAVY_CONGEST
.
getType
();
}
}
LaneTrafficStateResult
outModel
=
new
LaneTrafficStateResult
();
outModel
.
setRid
(
key
);
outModel
.
setTrafficIndex
(
trafficIndex
);
outModel
.
setTrafficState
(
Integer
.
valueOf
(
trafficState
));
outModel
.
setStartTimestamp
(
windowStartTs
);
outModel
.
setEndTimestamp
(
windowEndTs
);
outModel
.
setStartTime
(
DateUtil
.
toDateTime
(
windowStartTs
,
"yyyy-MM-dd HH:mm:ss"
));
outModel
.
setEndTime
(
DateUtil
.
toDateTime
(
windowEndTs
,
"yyyy-MM-dd HH:mm:ss"
));
outModel
.
setRidLength
(
ridLength
);
out
.
collect
(
outModel
);
}
private
int
getFrameAvgSpeed
(
List
<
Double
>
list
)
{
//当前帧机动车平均速度,所有速度大于5km/h的速度进行排序,去掉低速10%,然后进行平均
List
<
Double
>
speedList
=
list
.
stream
()
.
filter
(
speed
->
speed
>
0
).
distinct
()
.
sorted
(
Comparator
.
comparing
(
Double:
:
doubleValue
))
.
collect
(
Collectors
.
toList
());
int
startIndex
=
new
BigDecimal
(
speedList
.
size
()
*
0.1
).
intValue
();
int
endIndex
=
(
int
)
ArithOfBigDecmial
.
round
(
speedList
.
size
()
*
0.9
,
0
);
List
<
Double
>
subList
=
speedList
.
subList
(
startIndex
,
endIndex
);
double
avgSpeed
=
subList
.
stream
().
collect
(
Collectors
.
averagingDouble
(
Double:
:
doubleValue
));
return
(
int
)
ArithOfBigDecmial
.
round
(
avgSpeed
,
0
);
}
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trafficstate/func/RidProcessWindow.java
0 → 100644
View file @
282152b1
package
com
.
wanji
.
indicators
.
task
.
trafficstate
.
func
;
/**
* @author fengyi
* @date 2023/3/1
* @description
*/
import
com.wanji.indicators.constant.CongestEnum
;
import
com.wanji.indicators.task.trafficstate.model.LanePeriodData
;
import
com.wanji.indicators.task.trafficstate.model.LaneTrafficStateResult
;
import
com.wanji.indicators.util.DateUtil
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
;
import
org.apache.flink.streaming.api.windowing.windows.TimeWindow
;
import
org.apache.flink.util.Collector
;
import
java.util.List
;
import
java.util.stream.Collectors
;
import
java.util.stream.StreamSupport
;
@Slf4j
public
class
RidProcessWindow
extends
ProcessWindowFunction
<
LanePeriodData
,
LaneTrafficStateResult
,
String
,
TimeWindow
>
{
@Override
public
void
process
(
String
s
,
Context
context
,
Iterable
<
LanePeriodData
>
elements
,
Collector
<
LaneTrafficStateResult
>
out
)
throws
Exception
{
long
windowStartTs
=
context
.
window
().
getStart
();
long
windowEndTs
=
context
.
window
().
getEnd
();
List
<
LanePeriodData
>
list
=
StreamSupport
.
stream
(
elements
.
spliterator
(),
false
).
collect
(
Collectors
.
toList
());
double
avgQueueLengthOfGreenStart
=
list
.
stream
().
mapToDouble
(
LanePeriodData:
:
getLightGreenStartQueueLength
).
average
().
getAsDouble
();
double
laneLength
=
list
.
get
(
0
).
getLaneLength
();
double
laneIndex
=
avgQueueLengthOfGreenStart
/
laneLength
;
double
avgStopNumber
=
list
.
stream
().
mapToDouble
(
LanePeriodData:
:
getMeanStopsNumber
).
average
().
getAsDouble
();
LaneTrafficStateResult
result
=
new
LaneTrafficStateResult
();
result
.
setNowTimeStamp
(
context
.
currentProcessingTime
());
result
.
setNowDateTime
(
DateUtil
.
toDateTime
(
result
.
getNowTimeStamp
(),
DateUtil
.
YYYY_MM_DD_HH_MM_SS
));
result
.
setRid
(
list
.
get
(
0
).
getRid
());
result
.
setLaneId
(
list
.
get
(
0
).
getLaneId
());
result
.
setLaneLength
(
laneLength
);
result
.
setRidLength
(
list
.
get
(
0
).
getRidLength
());
result
.
setAvgStopNum
(
avgStopNumber
);
result
.
setLaneIndex
(
laneIndex
);
result
.
setStartTimestamp
(
windowStartTs
);
result
.
setEndTimestamp
(
windowEndTs
);
result
.
setStartTime
(
DateUtil
.
toDateTime
(
windowStartTs
,
DateUtil
.
YYYY_MM_DD_HH_MM_SS
));
result
.
setEndTime
(
DateUtil
.
toDateTime
(
windowEndTs
,
DateUtil
.
YYYY_MM_DD_HH_MM_SS
));
result
.
setTrafficState
(
Integer
.
valueOf
(
this
.
getTrafficState
(
laneIndex
,
avgStopNumber
)));
log
.
info
(
"车道交通状态信息:{}"
,
result
.
toString
());
out
.
collect
(
result
);
}
/**
* 计算交通状态
* @param laneIndex
* @param avgStopNumber
* @return
*/
private
String
getTrafficState
(
double
laneIndex
,
double
avgStopNumber
)
{
if
(
laneIndex
<=
0.9
||
avgStopNumber
<=
0.8
)
{
return
CongestEnum
.
NO_CONGEST
.
getType
();
}
else
if
(
laneIndex
>
0.9
){
if
(
avgStopNumber
>
0.8
&&
avgStopNumber
<=
1.5
)
{
return
CongestEnum
.
LIGHT_CONGEST
.
getType
();
}
else
if
(
avgStopNumber
>
1.5
&&
avgStopNumber
<=
2
)
{
return
CongestEnum
.
MODERATE_CONGEST
.
getType
();
}
else
if
(
avgStopNumber
>
2
)
{
return
CongestEnum
.
HEAVY_CONGEST
.
getType
();
}
}
return
CongestEnum
.
NO_CONGEST
.
getType
();
}
}
wj-realtime-computing/src/main/java/com/wanji/indicators/task/trafficstate/model/LaneTrafficStateResult.java
View file @
282152b1
...
...
@@ -31,5 +31,6 @@ public class LaneTrafficStateResult implements Serializable {
private
Double
laneIndex
;
private
Double
avgStopNum
;
private
int
trafficState
;
private
Double
trafficIndex
;
}
wj-realtime-computing/src/main/resources/mapper/BaseLaneInfoMapper.xml
View file @
282152b1
...
...
@@ -204,7 +204,9 @@
SELECT
a.end_cross_id cross_id,
a.id AS rid,
max(b.free_speed) free_speed
max(b.free_speed) free_speed,
a.level,
a.wkt
FROM
t_base_rid_info a
LEFT JOIN ( SELECT cross_id, rid, free_speed FROM t_cross_rid_freespeed WHERE window_start_time >= DATE_SUB( DATE_FORMAT( NOW(), '%Y-%m-%d' ), INTERVAL 7 DAY ) ) b ON a.id = b.rid
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment