Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
T
traffic-signal-platform
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
signal
traffic-signal-platform
Commits
eb220b3d
Commit
eb220b3d
authored
May 26, 2025
by
duwei
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
优化路口灯态倒计时,用kafka消费
parent
ea29bad8
Changes
13
Show whitespace changes
Inline
Side-by-side
Showing
13 changed files
with
447 additions
and
10 deletions
+447
-10
application-demo.properties
...gn-service/src/main/resources/application-demo.properties
+4
-0
bootstrap-demo.yaml
...l-optimize-service/src/main/resources/bootstrap-demo.yaml
+15
-0
bootstrap.yaml
signal-optimize-service/src/main/resources/bootstrap.yaml
+2
-1
pom.xml
signal-utc-service/pom.xml
+5
-0
RedisConfig.java
...rvice/src/main/java/net/wanji/utc/config/RedisConfig.java
+5
-1
Intersection.java
...rc/main/java/net/wanji/utc/dto/twinspat/Intersection.java
+15
-0
Phase.java
...rvice/src/main/java/net/wanji/utc/dto/twinspat/Phase.java
+26
-0
WHSpatData.java
.../src/main/java/net/wanji/utc/dto/twinspat/WHSpatData.java
+28
-0
SignalStatusTask.java
...ce/src/main/java/net/wanji/utc/task/SignalStatusTask.java
+231
-7
DirectionConverter.java
.../src/main/java/net/wanji/utc/util/DirectionConverter.java
+53
-0
TurnDirectionMapper.java
...src/main/java/net/wanji/utc/util/TurnDirectionMapper.java
+46
-0
bootstrap-demo.yaml
signal-utc-service/src/main/resources/bootstrap-demo.yaml
+15
-0
bootstrap.yaml
signal-utc-service/src/main/resources/bootstrap.yaml
+2
-1
No files found.
signal-feign-service/src/main/resources/application-demo.properties
0 → 100644
View file @
eb220b3d
utc.service.url
=
http://192.168.208.43:32000/utc
utc.dt.service.url
=
http://192.168.208.43:39002/utc-dt
utc.hisense.service.url
=
http://192.168.208.43:39003/utc-hisense
control.url
=
http://192.168.208.43:32001/web
signal-optimize-service/src/main/resources/bootstrap-demo.yaml
0 → 100644
View file @
eb220b3d
spring
:
cloud
:
nacos
:
config
:
server-addr
:
10.102.1.163:8848
file-extension
:
yaml
group
:
signal
namespace
:
signal
username
:
nacos
password
:
Wgzonal0sowQjm9
application
:
# dubbo启动需要程序名称
name
:
signal-optimize-service
main
:
allow-circular-references
:
true
signal-optimize-service/src/main/resources/bootstrap.yaml
View file @
eb220b3d
spring
:
spring
:
profiles
:
profiles
:
active
:
docker
#docker是武汉环境,demo是北京环境
\ No newline at end of file
active
:
demo
\ No newline at end of file
signal-utc-service/pom.xml
View file @
eb220b3d
...
@@ -167,6 +167,11 @@
...
@@ -167,6 +167,11 @@
<groupId>
javax.el
</groupId>
<groupId>
javax.el
</groupId>
<artifactId>
javax.el-api
</artifactId>
<artifactId>
javax.el-api
</artifactId>
</dependency>
</dependency>
<dependency>
<groupId>
net.jodah
</groupId>
<artifactId>
expiringmap
</artifactId>
<version>
0.5.10
</version>
</dependency>
</dependencies>
</dependencies>
<build>
<build>
...
...
signal-utc-service/src/main/java/net/wanji/utc/config/RedisConfig.java
View file @
eb220b3d
...
@@ -119,7 +119,11 @@ public class RedisConfig extends CachingConfigurerSupport {
...
@@ -119,7 +119,11 @@ public class RedisConfig extends CachingConfigurerSupport {
@Bean
@Bean
public
Jedis
jedis
()
{
public
Jedis
jedis
()
{
Jedis
jedis
=
new
Jedis
(
redisHost
,
redisPort
);
Jedis
jedis
=
new
Jedis
(
redisHost
,
redisPort
);
if
(
password
!=
null
&&
!
""
.
equals
(
password
))
{
jedis
.
auth
(
password
);
}
else
{
jedis
.
auth
(
"Wanji300552"
);
jedis
.
auth
(
"Wanji300552"
);
}
return
jedis
;
return
jedis
;
}
}
...
...
signal-utc-service/src/main/java/net/wanji/utc/dto/twinspat/Intersection.java
0 → 100644
View file @
eb220b3d
package
net
.
wanji
.
utc
.
dto
.
twinspat
;
import
lombok.Data
;
import
java.util.List
;
/**
* 路口信息
*/
@Data
public
class
Intersection
{
// 路口id (这个id需要与库表中路口id要映射)
private
int
nodeId
;
private
List
<
Phase
>
phases
;
}
\ No newline at end of file
signal-utc-service/src/main/java/net/wanji/utc/dto/twinspat/Phase.java
0 → 100644
View file @
eb220b3d
package
net
.
wanji
.
utc
.
dto
.
twinspat
;
import
lombok.Data
;
/**
* 相位信息
* 解释名词:
* 相位是指在一个信号灯周期中,某一组特定方向的交通流获得通行权(如绿灯)的状态。
* “相位”就是信号灯在某个时间段内允许某些方向车辆通行的一种状态。
* “当前相位”是指当前正在运行的信号灯状态,决定了哪些方向可以通行、哪些需要等待。
*
*/
@Data
public
class
Phase
{
private
String
id
;
// 唯一值 phaseId + "_" + laneId
private
int
phaseId
;
// 相位ID [1-16], 控制一个灯组
private
int
laneId
;
// 车道编号
private
int
state
;
// 转向(灯组状态 [参考表E.3])
private
int
importDirect
;
// 进口方向 [参考表E.2]
private
int
turn
;
// 车道转向
private
int
light
;
// 信号灯颜色 [0..8]
private
int
limitEndTime
;
// 信号灯倒计时, 单位:秒
private
String
log
;
// 经度
private
String
lat
;
// 纬度
private
float
angle
;
// 信号灯角度
}
\ No newline at end of file
signal-utc-service/src/main/java/net/wanji/utc/dto/twinspat/WHSpatData.java
0 → 100644
View file @
eb220b3d
package
net
.
wanji
.
utc
.
dto
.
twinspat
;
import
com.alibaba.fastjson.JSONObject
;
import
lombok.Data
;
import
java.util.List
;
/**
* 孪生专用信号灯数据
*/
@Data
public
class
WHSpatData
{
private
String
rsuId
;
//要配映射路口id
private
int
msgCnt
;
private
long
timestamp
;
private
String
uuid
;
private
String
msgType
;
/**
* 路口列表
*/
private
List
<
Intersection
>
intersections
;
public
static
void
main
(
String
[]
args
)
{
String
json
=
"{\"intersections\":[{\"nodeId\":15,\"phases\":[{\"angle\":261.0,\"id\":\"4_1\",\"importDirect\":0,\"laneId\":1,\"lat\":\"30.4518233\",\"light\":3,\"limitEndTime\":28,\"log\":\"114.0926314\",\"phaseId\":4,\"state\":4},{\"angle\":261.0,\"id\":\"4_2\",\"importDirect\":0,\"laneId\":2,\"lat\":\"30.4518180\",\"light\":3,\"limitEndTime\":28,\"log\":\"114.0925986\",\"phaseId\":4,\"state\":4},{\"angle\":348.0,\"id\":\"66_1\",\"importDirect\":4,\"laneId\":1,\"lat\":\"30.4516888\",\"light\":3,\"limitEndTime\":5,\"log\":\"114.0930230\",\"phaseId\":66,\"state\":2},{\"angle\":348.0,\"id\":\"65_3\",\"importDirect\":4,\"laneId\":3,\"lat\":\"30.4517495\",\"light\":4,\"limitEndTime\":2,\"log\":\"114.0930097\",\"phaseId\":65,\"state\":1},{\"angle\":348.0,\"id\":\"65_2\",\"importDirect\":4,\"laneId\":2,\"lat\":\"30.4517189\",\"light\":4,\"limitEndTime\":2,\"log\":\"114.0930162\",\"phaseId\":65,\"state\":1},{\"angle\":91.0,\"id\":\"132_1\",\"importDirect\":8,\"laneId\":1,\"lat\":\"30.4511704\",\"light\":3,\"limitEndTime\":28,\"log\":\"114.0927570\",\"phaseId\":132,\"state\":4},{\"angle\":91.0,\"id\":\"132_2\",\"importDirect\":8,\"laneId\":2,\"lat\":\"30.4511697\",\"light\":3,\"limitEndTime\":28,\"log\":\"114.0927937\",\"phaseId\":132,\"state\":4},{\"angle\":169.0,\"id\":\"194_1\",\"importDirect\":12,\"laneId\":1,\"lat\":\"30.4515106\",\"light\":3,\"limitEndTime\":5,\"log\":\"114.0924748\",\"phaseId\":194,\"state\":2},{\"angle\":169.0,\"id\":\"193_2\",\"importDirect\":12,\"laneId\":2,\"lat\":\"30.4514799\",\"light\":4,\"limitEndTime\":2,\"log\":\"114.0924808\",\"phaseId\":193,\"state\":1},{\"angle\":170.0,\"id\":\"193_3\",\"importDirect\":12,\"laneId\":3,\"lat\":\"30.4514491\",\"light\":4,\"limitEndTime\":2,\"log\":\"114.0924865\",\"phaseId\":193,\"state\":1}]}],\"msgCnt\":92,\"msgType\":\"spat\",\"rsuId\":\"R-081507\",\"timestamp\":1746685493855,\"uuid\":\"R-081507_1746685493855\"}"
;
WHSpatData
whSpatData
=
JSONObject
.
parseObject
(
json
,
WHSpatData
.
class
);
System
.
out
.
println
(
whSpatData
);
}
}
signal-utc-service/src/main/java/net/wanji/utc/task/SignalStatusTask.java
View file @
eb220b3d
package
net
.
wanji
.
utc
.
task
;
package
net
.
wanji
.
utc
.
task
;
import
com.alibaba.fastjson.JSONObject
;
import
com.fasterxml.jackson.core.type.TypeReference
;
import
com.fasterxml.jackson.core.type.TypeReference
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.google.common.collect.Maps
;
import
lombok.extern.slf4j.Slf4j
;
import
lombok.extern.slf4j.Slf4j
;
import
net.jodah.expiringmap.ExpirationPolicy
;
import
net.jodah.expiringmap.ExpiringMap
;
import
net.wanji.common.framework.rest.JsonViewObject
;
import
net.wanji.common.framework.rest.JsonViewObject
;
import
net.wanji.common.utils.tool.JacksonUtils
;
import
net.wanji.common.utils.tool.JacksonUtils
;
import
net.wanji.databus.dao.entity.CrossLightsStatusHistPO
;
import
net.wanji.databus.dao.entity.CrossLightsStatusHistPO
;
...
@@ -17,11 +21,17 @@ import net.wanji.feign.service.UtcDTFeignClients;
...
@@ -17,11 +21,17 @@ import net.wanji.feign.service.UtcDTFeignClients;
import
net.wanji.feign.service.UtcHisenseFeignClients
;
import
net.wanji.feign.service.UtcHisenseFeignClients
;
import
net.wanji.utc.common.constant.Constants
;
import
net.wanji.utc.common.constant.Constants
;
import
net.wanji.utc.common.typeenum.BasicEnum
;
import
net.wanji.utc.common.typeenum.BasicEnum
;
import
net.wanji.utc.dto.twinspat.Intersection
;
import
net.wanji.utc.dto.twinspat.Phase
;
import
net.wanji.utc.dto.twinspat.WHSpatData
;
import
net.wanji.utc.kafka.ProducerHandler
;
import
net.wanji.utc.kafka.ProducerHandler
;
import
net.wanji.utc.service.runninginfo.HkLightsStatusService
;
import
net.wanji.utc.service.runninginfo.HkLightsStatusService
;
import
net.wanji.utc.service.runninginfo.SignalStatusService
;
import
net.wanji.utc.service.runninginfo.SignalStatusService
;
import
net.wanji.utc.util.DirectionConverter
;
import
net.wanji.utc.util.TurnDirectionMapper
;
import
net.wanji.utc.websocket.RealTimeDataWebSocket
;
import
net.wanji.utc.websocket.RealTimeDataWebSocket
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.kafka.clients.consumer.ConsumerRecord
;
import
org.elasticsearch.action.index.IndexRequest
;
import
org.elasticsearch.action.index.IndexRequest
;
import
org.elasticsearch.action.index.IndexResponse
;
import
org.elasticsearch.action.index.IndexResponse
;
import
org.elasticsearch.client.RequestOptions
;
import
org.elasticsearch.client.RequestOptions
;
...
@@ -33,6 +43,8 @@ import org.elasticsearch.common.xcontent.XContentType;
...
@@ -33,6 +43,8 @@ import org.elasticsearch.common.xcontent.XContentType;
import
org.springframework.beans.BeanUtils
;
import
org.springframework.beans.BeanUtils
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Qualifier
;
import
org.springframework.beans.factory.annotation.Qualifier
;
import
org.springframework.kafka.annotation.KafkaListener
;
import
org.springframework.kafka.support.Acknowledgment
;
import
org.springframework.scheduling.annotation.Scheduled
;
import
org.springframework.scheduling.annotation.Scheduled
;
import
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
;
import
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
...
@@ -43,6 +55,8 @@ import java.time.LocalDateTime;
...
@@ -43,6 +55,8 @@ import java.time.LocalDateTime;
import
java.time.ZoneOffset
;
import
java.time.ZoneOffset
;
import
java.util.*
;
import
java.util.*
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.atomic.AtomicInteger
;
/**
/**
* 订阅信号运行状态、告警、灯态
* 订阅信号运行状态、告警、灯态
...
@@ -65,7 +79,7 @@ public class SignalStatusTask {
...
@@ -65,7 +79,7 @@ public class SignalStatusTask {
private
ManufacturerInfoMapper
manufacturerInfoMapper
;
private
ManufacturerInfoMapper
manufacturerInfoMapper
;
@Resource
@Resource
private
CrossInfoMapper
crossInfoMapper
;
private
CrossInfoMapper
crossInfoMapper
;
@Resource
//
@Resource
private
UtcDTFeignClients
utcDTFeignClients
;
private
UtcDTFeignClients
utcDTFeignClients
;
@Resource
@Resource
private
CrossLightsStatusHistMapper
crossLightsStatusHistMapper
;
private
CrossLightsStatusHistMapper
crossLightsStatusHistMapper
;
...
@@ -81,30 +95,99 @@ public class SignalStatusTask {
...
@@ -81,30 +95,99 @@ public class SignalStatusTask {
private
static
final
ConcurrentHashMap
<
String
,
List
<
LightsStatusVO2
>>
produceListMap
=
new
ConcurrentHashMap
<>();
private
static
final
ConcurrentHashMap
<
String
,
List
<
LightsStatusVO2
>>
produceListMap
=
new
ConcurrentHashMap
<>();
//路口信息过期map<路口id,路口信息>
public
static
final
ExpiringMap
<
String
,
CrossInfoPO
>
crossCacheMap
=
ExpiringMap
.
builder
()
.
expiration
(
1
,
TimeUnit
.
MINUTES
)
.
maxSize
(
100
)
.
variableExpiration
()
.
expirationPolicy
(
ExpirationPolicy
.
CREATED
)
//过期监听
.
asyncExpirationListener
((
rcuIdStr
,
value
)
->
{
}).
build
();
//信号灯倒计时缓存map<crossid_相位id, 倒计时>
public
static
final
ExpiringMap
<
String
,
Integer
>
phaseCountdownCacheMap
=
ExpiringMap
.
builder
()
.
expiration
(
60
*
24
,
TimeUnit
.
MINUTES
)
.
maxSize
(
100
)
.
variableExpiration
()
.
expirationPolicy
(
ExpirationPolicy
.
CREATED
)
//过期监听
.
asyncExpirationListener
((
rcuIdStr
,
value
)
->
{
}).
build
();
// 运行状态、告警,1分钟一次
// 运行状态、告警,1分钟一次
@Scheduled
(
fixedRate
=
60
*
1000
)
@Scheduled
(
fixedRate
=
60
*
1000
)
public
void
runningStatusAlarm
()
throws
Exception
{
public
void
runningStatusAlarm
()
throws
Exception
{
signalStatusService
.
runningStatusAlarm
();
signalStatusService
.
runningStatusAlarm
();
}
}
/**
* 消费孪生灯态kafka数据
*/
@KafkaListener
(
topics
=
{
"WHSpatData"
},
groupId
=
"utc-whspat-consumer-01"
)
public
void
consumeTwinSpat
(
ConsumerRecord
<
Object
,
Object
>
record
,
Acknowledgment
acknowledgment
)
{
try
{
String
lightStatusJson
=
String
.
valueOf
(
record
.
value
());
WHSpatData
light
=
JSONObject
.
parseObject
(
lightStatusJson
,
WHSpatData
.
class
);
if
(
Objects
.
nonNull
(
light
))
{
//--------------------------------------------------------------------------
ConcurrentHashMap
<
String
,
RealTimeDataWebSocket
>
evenWarnWebSocketMap
=
RealTimeDataWebSocket
.
getEvenWarnWebSocketMap
();
if
(!
evenWarnWebSocketMap
.
isEmpty
())
{
//1. 按路口分组后的websocket连接列表:map<crossId, List<Map.Entry<String, RealTimeDataWebSocket>>>
Map
<
String
,
List
<
Map
.
Entry
<
String
,
RealTimeDataWebSocket
>>>
crossEntryMap
=
getCrossEntryMap
(
evenWarnWebSocketMap
);
//2. 循环每一个路口,并且循环每一个websocket连接,发送灯态数据
for
(
Map
.
Entry
<
String
,
List
<
Map
.
Entry
<
String
,
RealTimeDataWebSocket
>>>
entry
:
crossEntryMap
.
entrySet
())
{
ObjectMapper
mapper
=
JacksonUtils
.
getInstance
();
String
crossId
=
entry
.
getKey
();
//2.1 获取路口信息
CrossInfoPO
crossInfoPO
=
crossCacheMap
.
get
(
crossId
);
if
(
Objects
.
isNull
(
crossInfoPO
))
{
crossInfoPO
=
crossInfoMapper
.
selectByPrimaryKey
(
crossId
);
if
(
Objects
.
isNull
(
crossInfoPO
))
{
continue
;
}
crossCacheMap
.
put
(
crossId
,
crossInfoPO
);
}
//2.2 获取灯态信息,传递参数:mapper,crossId,路口信息
List
<
LightsStatusVO
>
lightsStatusVOS
=
getLightsStatusVOS2
(
crossInfoPO
,
light
);
//2.3 保存灯态数据并发送
saveAndSendLightsStatus
(
mapper
,
crossId
,
lightsStatusVOS
,
entry
);
}
sendKafka
();
}
//--------------------------------------------------------------------------
}
}
catch
(
Exception
e
)
{
log
.
error
(
"解析灯态数据异常"
,
e
);
}
}
// 灯态,每秒一次
// 灯态,每秒一次
@Scheduled
(
fixedRate
=
1000
)
//
@Scheduled(fixedRate = 1000)
public
void
lightStatus
()
{
public
void
lightStatus
()
{
ConcurrentHashMap
<
String
,
RealTimeDataWebSocket
>
evenWarnWebSocketMap
=
RealTimeDataWebSocket
.
getEvenWarnWebSocketMap
();
ConcurrentHashMap
<
String
,
RealTimeDataWebSocket
>
evenWarnWebSocketMap
=
RealTimeDataWebSocket
.
getEvenWarnWebSocketMap
();
try
{
try
{
if
(!
evenWarnWebSocketMap
.
isEmpty
())
{
if
(!
evenWarnWebSocketMap
.
isEmpty
())
{
//1. 按路口分组后的websocket连接列表:map<crossId, List<Map.Entry<String, RealTimeDataWebSocket>>>
Map
<
String
,
List
<
Map
.
Entry
<
String
,
RealTimeDataWebSocket
>>>
crossEntryMap
=
getCrossEntryMap
(
evenWarnWebSocketMap
);
Map
<
String
,
List
<
Map
.
Entry
<
String
,
RealTimeDataWebSocket
>>>
crossEntryMap
=
getCrossEntryMap
(
evenWarnWebSocketMap
);
//2. 循环每一个路口,并且循环每一个websocket连接,发送灯态数据
for
(
Map
.
Entry
<
String
,
List
<
Map
.
Entry
<
String
,
RealTimeDataWebSocket
>>>
entry
:
crossEntryMap
.
entrySet
())
{
for
(
Map
.
Entry
<
String
,
List
<
Map
.
Entry
<
String
,
RealTimeDataWebSocket
>>>
entry
:
crossEntryMap
.
entrySet
())
{
//commonThreadPoolExecutor.execute(() -> {
//commonThreadPoolExecutor.execute(() -> {
ObjectMapper
mapper
=
JacksonUtils
.
getInstance
();
ObjectMapper
mapper
=
JacksonUtils
.
getInstance
();
String
crossId
=
entry
.
getKey
();
String
crossId
=
entry
.
getKey
();
//2.1 获取路口信息
CrossInfoPO
crossInfoPO
=
crossInfoMapper
.
selectByPrimaryKey
(
crossId
);
CrossInfoPO
crossInfoPO
=
crossInfoMapper
.
selectByPrimaryKey
(
crossId
);
if
(
Objects
.
isNull
(
crossInfoPO
))
{
if
(
Objects
.
isNull
(
crossInfoPO
))
{
continue
;
continue
;
}
}
//2.2 获取灯态信息,传递参数:mapper,crossId,路口信息
List
<
LightsStatusVO
>
lightsStatusVOS
=
getLightsStatusVOS
(
mapper
,
crossId
,
crossInfoPO
);
List
<
LightsStatusVO
>
lightsStatusVOS
=
getLightsStatusVOS
(
mapper
,
crossId
,
crossInfoPO
);
//2.3 保存灯态数据并发送
saveAndSendLightsStatus
(
mapper
,
crossId
,
lightsStatusVOS
,
entry
);
saveAndSendLightsStatus
(
mapper
,
crossId
,
lightsStatusVOS
,
entry
);
//});
//});
}
}
...
@@ -125,23 +208,28 @@ public class SignalStatusTask {
...
@@ -125,23 +208,28 @@ public class SignalStatusTask {
*/
*/
private
void
saveAndSendLightsStatus
(
ObjectMapper
mapper
,
String
crossId
,
List
<
LightsStatusVO
>
lightsStatusVOS
,
private
void
saveAndSendLightsStatus
(
ObjectMapper
mapper
,
String
crossId
,
List
<
LightsStatusVO
>
lightsStatusVOS
,
Map
.
Entry
<
String
,
List
<
Map
.
Entry
<
String
,
RealTimeDataWebSocket
>>>
entry
)
{
Map
.
Entry
<
String
,
List
<
Map
.
Entry
<
String
,
RealTimeDataWebSocket
>>>
entry
)
{
//1. 获取路口websocket连接列表
List
<
Map
.
Entry
<
String
,
RealTimeDataWebSocket
>>
value
=
entry
.
getValue
();
List
<
Map
.
Entry
<
String
,
RealTimeDataWebSocket
>>
value
=
entry
.
getValue
();
produceListMap
.
remove
(
crossId
);
// 清除上一秒缓存
produceListMap
.
remove
(
crossId
);
// 清除上一秒缓存
if
(!
CollectionUtils
.
isEmpty
(
lightsStatusVOS
))
{
if
(!
CollectionUtils
.
isEmpty
(
lightsStatusVOS
))
{
//2. 循环遍历灯态数据,并保存灯态数据到数据库,并发送灯态数据到websocket
for
(
LightsStatusVO
lightsStatusVO
:
lightsStatusVOS
)
{
for
(
LightsStatusVO
lightsStatusVO
:
lightsStatusVOS
)
{
if
(
Objects
.
nonNull
(
lightsStatusVO
)
&&
Objects
.
equals
(
lightsStatusVO
.
getCrossId
(),
crossId
))
{
if
(
Objects
.
nonNull
(
lightsStatusVO
)
&&
Objects
.
equals
(
lightsStatusVO
.
getCrossId
(),
crossId
))
{
//
calculateControlCountDown
(
crossId
,
lightsStatusVO
);
calculateControlCountDown
(
crossId
,
lightsStatusVO
);
try
{
try
{
// 相同路口不同websocket统一发送灯态
// 相同路口不同websocket统一发送灯态
List
<
LightsStatusVO2
>
listResult
=
convertDirLampGroupMap2Entity
(
lightsStatusVOS
);
List
<
LightsStatusVO2
>
listResult
2
=
convertDirLampGroupMap2Entity
(
lightsStatusVOS
);
String
json
=
mapper
.
writeValueAsString
(
listResult
);
String
json
=
mapper
.
writeValueAsString
(
listResult
2
);
for
(
Map
.
Entry
<
String
,
RealTimeDataWebSocket
>
socketEntry
:
value
)
{
for
(
Map
.
Entry
<
String
,
RealTimeDataWebSocket
>
socketEntry
:
value
)
{
String
crossIdStr
=
socketEntry
.
getKey
();
String
crossIdStr
=
socketEntry
.
getKey
();
RealTimeDataWebSocket
webSocket
=
socketEntry
.
getValue
();
RealTimeDataWebSocket
webSocket
=
socketEntry
.
getValue
();
// 发送数据到websocket
webSocket
.
sendInfo
(
json
,
crossIdStr
);
webSocket
.
sendInfo
(
json
,
crossIdStr
);
}
}
insertIntoHist
(
crossId
,
json
);
insertIntoHist
(
crossId
,
json
);
produceListMap
.
put
(
crossId
,
listResult
);
produceListMap
.
put
(
crossId
,
listResult
2
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"RealTimeDataWebSocket发送异常,异常信息:"
,
e
);
log
.
error
(
"RealTimeDataWebSocket发送异常,异常信息:"
,
e
);
}
}
...
@@ -188,6 +276,139 @@ public class SignalStatusTask {
...
@@ -188,6 +276,139 @@ public class SignalStatusTask {
return
Collections
.
emptyList
();
return
Collections
.
emptyList
();
}
}
public
static
Map
<
String
,
String
>
crossIdMap
=
new
HashMap
();
{
crossIdMap
.
put
(
"R-081507"
,
"12Q5V099C30"
);
}
/**
* 获取路口灯态列表
* @param crossInfoPO 路口信息
* @param light 孪生灯态数据
* @return
*/
private
List
<
LightsStatusVO
>
getLightsStatusVOS2
(
CrossInfoPO
crossInfoPO
,
WHSpatData
light
){
List
<
LightsStatusVO
>
result
=
new
ArrayList
<>();
//1. 获取路口列表
List
<
Intersection
>
intersections
=
light
.
getIntersections
();
//2. 循环每一个路口(实际就一个路口,因为rsuId对应一个路口)
for
(
Intersection
nodeCross
:
intersections
)
{
List
<
Phase
>
phases
=
nodeCross
.
getPhases
();
String
crossId
=
(
String
)
crossIdMap
.
get
(
light
.
getRsuId
());
//转向-方向-灯态
Map
<
String
,
Boolean
>
m
=
Maps
.
newHashMap
();
//循环每一个相位,获取该相位相关字段:如周期时间、周期倒计时、相位灯态倒计时、方向、车道转向、灯态
for
(
Phase
p
:
phases
)
{
LightsStatusVO
infoVo
=
new
LightsStatusVO
();
infoVo
.
setRunMode
(
"8"
);
// 控制模式
infoVo
.
setPhasePlanId
(
"1"
);
// 相位方案号
infoVo
.
setSchemeId
(
"1"
);
infoVo
.
setCode
(
light
.
getRsuId
());
infoVo
.
setCrossId
(
crossId
);
infoVo
.
setCycleCountDown
(
0
);
String
cycleLenKey
=
crossId
+
"_"
+
p
.
getPhaseId
();
Integer
len
=
phaseCountdownCacheMap
.
get
(
cycleLenKey
);
if
(
len
!=
null
){
if
(
len
<
p
.
getLimitEndTime
()){
phaseCountdownCacheMap
.
put
(
cycleLenKey
,
p
.
getLimitEndTime
());
infoVo
.
setCycleLen
(
p
.
getLimitEndTime
());
}
else
{
infoVo
.
setCycleLen
(
len
);
}
}
else
{
phaseCountdownCacheMap
.
put
(
cycleLenKey
,
p
.
getLimitEndTime
());
}
infoVo
.
setPhaseId
(
p
.
getPhaseId
()
+
""
);
infoVo
.
setCyclePhaseCountDown
(
p
.
getLimitEndTime
());
//map<方向,倒计时>
Map
<
String
,
Object
>
phaseMap
=
new
HashMap
<>();
infoVo
.
setPhaseMap
(
phaseMap
);
//后两个参数是map<方向,倒计时>, 倒计时
Map
<
String
,
Object
>
dirLampGroupMap
=
getDirLampGroupMap
(
p
,
m
,
phaseMap
,
p
.
getLimitEndTime
());
infoVo
.
setDirLampGroupMap
(
dirLampGroupMap
);
infoVo
.
setTimeStamp
(
light
.
getTimestamp
()
+
""
);
infoVo
.
setSchemeStartTime
(
light
.
getTimestamp
()
+
""
);
result
.
add
(
infoVo
);
}
}
return
result
;
}
private
Map
<
String
,
Object
>
getDirLampGroupMap
(
Phase
phase
,
Map
<
String
,
Boolean
>
m
,
Map
<
String
,
Object
>
phaseMap
,
int
limitEndTime
)
{
//Map<进口方向, Map<车道转向, 对应灯态>>
Map
<
String
,
Object
>
dirLampGroupMap
=
new
HashMap
<>();
//进口方向
int
importDirect
=
phase
.
getImportDirect
();
int
dir
=
DirectionConverter
.
convertDirection
(
importDirect
);
//进口方向适配
phaseMap
.
put
(
dir
+
""
,
limitEndTime
);
//方向,倒计时
//Map<车道转向, 对应灯态>
Map
<
Integer
,
String
>
turnMap
=
new
HashMap
<>();
dirLampGroupMap
.
put
(
dir
+
""
,
turnMap
);
//获取灯态
// 信号灯颜色 [0..8],0:未知状态,1:信号灯未工作,2:红闪,3:红灯状态,4:绿灯待行状态,
// 5:绿灯状态,6:受保护相位绿灯(箭头灯),7:黄灯状态,8:黄闪。
int
light
=
phase
.
getLight
();
String
color
=
transferLampGroup
(
light
);
//车道灯态适配
int
turn
=
phase
.
getTurn
();
int
turnDir
=
TurnDirectionMapper
.
mapToTargetValue
(
turn
);
//车道转向适配
if
(
m
.
containsKey
(
dir
+
"-"
+
turnDir
+
"-"
+
color
)){
return
null
;
}
else
{
m
.
put
(
dir
+
"-"
+
turnDir
+
"-"
+
color
,
true
);
}
turnMap
.
put
(
turnDir
,
color
);
return
dirLampGroupMap
;
}
/**
* 信号灯状态转颜色
* 信号灯颜色 [0..8],
* 0:未知状态,1:信号灯未工作,2:红闪,3:红灯状态,
* 4:绿灯待行状态,5:绿灯状态,6:受保护相位绿灯(箭头灯),7:黄灯状态,8:黄闪。
* @param state
* @return
*/
private
String
transferLampGroup
(
int
state
)
{
String
color
=
""
;
switch
(
state
)
{
case
1
:
//信号灯未工作
color
=
"green"
;
break
;
case
2
:
color
=
"redFlash"
;
break
;
case
3
:
color
=
"red"
;
break
;
case
4
:
color
=
"greenFlash"
;
break
;
case
5
:
color
=
"green"
;
break
;
case
6
:
color
=
"green"
;
break
;
case
7
:
color
=
"yellow"
;
break
;
case
8
:
color
=
"yellowFlash"
;
break
;
default
:
color
=
""
;
break
;
}
return
color
;
}
private
static
List
<
LightsStatusVO2
>
convertDirLampGroupMap2Entity
(
List
<
LightsStatusVO
>
lightsStatusVOS
)
throws
Exception
{
private
static
List
<
LightsStatusVO2
>
convertDirLampGroupMap2Entity
(
List
<
LightsStatusVO
>
lightsStatusVOS
)
throws
Exception
{
List
<
LightsStatusVO2
>
listResult
=
null
;
List
<
LightsStatusVO2
>
listResult
=
null
;
try
{
try
{
...
@@ -353,10 +574,13 @@ public class SignalStatusTask {
...
@@ -353,10 +574,13 @@ public class SignalStatusTask {
* 不同系统获取统一路口灯态,统一处理
* 不同系统获取统一路口灯态,统一处理
*
*
* @param evenWarnWebSocketMap
* @param evenWarnWebSocketMap
* @return
* @return
返回类型是 Map<String, List<Map.Entry<String, RealTimeDataWebSocket>>>,表示按 crossId 分组后的连接条目。
*/
*/
private
static
Map
<
String
,
List
<
Map
.
Entry
<
String
,
RealTimeDataWebSocket
>>>
getCrossEntryMap
(
ConcurrentHashMap
<
String
,
RealTimeDataWebSocket
>
evenWarnWebSocketMap
)
{
private
static
Map
<
String
,
List
<
Map
.
Entry
<
String
,
RealTimeDataWebSocket
>>>
getCrossEntryMap
(
ConcurrentHashMap
<
String
,
RealTimeDataWebSocket
>
evenWarnWebSocketMap
)
{
//map<crossId, List<Map.Entry<crossIdStr, RealTimeDataWebSocket>>>
Map
<
String
,
List
<
Map
.
Entry
<
String
,
RealTimeDataWebSocket
>>>
tempMap
=
new
ConcurrentHashMap
<>();
Map
<
String
,
List
<
Map
.
Entry
<
String
,
RealTimeDataWebSocket
>>>
tempMap
=
new
ConcurrentHashMap
<>();
//遍历所有路口websocket
for
(
Map
.
Entry
<
String
,
RealTimeDataWebSocket
>
item
:
evenWarnWebSocketMap
.
entrySet
())
{
for
(
Map
.
Entry
<
String
,
RealTimeDataWebSocket
>
item
:
evenWarnWebSocketMap
.
entrySet
())
{
String
crossId
=
""
;
String
crossId
=
""
;
String
crossIdStr
=
item
.
getKey
();
String
crossIdStr
=
item
.
getKey
();
...
...
signal-utc-service/src/main/java/net/wanji/utc/util/DirectionConverter.java
0 → 100644
View file @
eb220b3d
package
net
.
wanji
.
utc
.
util
;
/**
* 灯态的16个方向转换为8个方向类型
* 注:用于计算空放
*/
public
class
DirectionConverter
{
/**
* 将0~15的进口方向编号转换为1~8的方向类型
* @param directionCode 0~15的输入方向编号
* @return 1~8的输出方向类型
*/
public
static
int
convertDirection
(
int
directionCode
)
{
if
(
directionCode
<
0
||
directionCode
>
15
)
{
throw
new
IllegalArgumentException
(
"directionCode must be between 0 and 15"
);
}
switch
(
directionCode
)
{
case
15
:
case
0
:
return
1
;
// 北
case
1
:
case
2
:
return
2
;
// 东北
case
3
:
case
4
:
return
3
;
// 东
case
5
:
case
6
:
return
4
;
// 东南
case
7
:
case
8
:
return
5
;
// 南
case
9
:
case
10
:
return
6
;
// 西南
case
11
:
case
12
:
return
7
;
// 西
case
13
:
case
14
:
return
8
;
// 西北
}
return
0
;
// 未知方向
}
public
static
void
main
(
String
[]
args
)
{
for
(
int
i
=
0
;
i
<=
15
;
i
++)
{
System
.
out
.
println
(
"Input: "
+
i
+
" → Output: "
+
convertDirection
(
i
));
}
}
}
signal-utc-service/src/main/java/net/wanji/utc/util/TurnDirectionMapper.java
0 → 100644
View file @
eb220b3d
package
net
.
wanji
.
utc
.
util
;
/**
* 灯态车道转向映射器
*/
public
class
TurnDirectionMapper
{
// 定义映射关系,索引为源头转向值,值为目标转向值
private
static
final
int
[]
DIRECTION_MAP
=
{
1
,
// 源头值 0 -> 目标值 1(左转)
2
,
// 源头值 1 -> 目标值 2(直行)
1
,
// 源头值 2 -> 目标值 1(左转)
5
,
// 源头值 3 -> 目标值 5(直左)
3
,
// 源头值 4 -> 目标值 3(右转)
6
,
// 源头值 5 -> 目标值 6(直右)
8
,
// 源头值 6 -> 目标值 8(左右)
7
,
// 源头值 7 -> 目标值 7(左直右)
4
,
// 源头值 8 -> 目标值 4(掉头)
10
,
// 源头值 9 -> 目标值 10(直行掉头)
9
,
// 源头值 10 -> 目标值 9(左转掉头)
12
,
// 源头值 11 -> 目标值 12(左直掉头)
11
,
// 源头值 12 -> 目标值 11(右转掉头)
13
,
// 源头值 13 -> 目标值 13(直右掉头)
15
,
// 源头值 14 -> 目标值 15(左右掉头)
14
// 源头值 15 -> 目标值 14(左直右掉头)
};
/**
* 转换源头转向值到目标转向值
*
* @param sourceValue 源头转向值 (0-15)
* @return 目标转向值 (1-15)
*/
public
static
int
mapToTargetValue
(
int
sourceValue
)
{
if
(
sourceValue
<
0
||
sourceValue
>=
DIRECTION_MAP
.
length
)
{
throw
new
IllegalArgumentException
(
"非法源头转向值: "
+
sourceValue
);
}
return
DIRECTION_MAP
[
sourceValue
];
}
public
static
void
main
(
String
[]
args
)
{
for
(
int
i
=
0
;
i
<
16
;
i
++)
{
System
.
out
.
println
(
"源头值 "
+
i
+
" 映射的目标值为: "
+
mapToTargetValue
(
i
));
}
}
}
signal-utc-service/src/main/resources/bootstrap-demo.yaml
0 → 100644
View file @
eb220b3d
spring
:
application
:
# dubbo启动需要程序名称
name
:
utc
main
:
allow-circular-references
:
true
cloud
:
nacos
:
config
:
server-addr
:
10.102.1.163:8848
file-extension
:
yaml
group
:
signal
namespace
:
signal
username
:
nacos
password
:
Wgzonal0sowQjm9
\ No newline at end of file
signal-utc-service/src/main/resources/bootstrap.yaml
View file @
eb220b3d
spring
:
spring
:
profiles
:
profiles
:
active
:
dev
#demo 北京环境
\ No newline at end of file
active
:
demo
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment