Commit 45ce11e6 authored by duanruiming's avatar duanruiming

[add] 灯态发送kafka

parent 6c35c9d1
......@@ -26,6 +26,10 @@
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>net.wanji</groupId>
<artifactId>signal-feign-service</artifactId>
......
package net.wanji.utc.kafka;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.JacksonUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import javax.annotation.Resource;
/**
* @author duanruiming
* @date 2023/08/02 16:58
*/
@Component
@Slf4j
@SuppressWarnings("all")
public class ProducerHandler {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, Object msg) {
Page<Object> objectPage = new Page<>();
try {
ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send(topic, JacksonUtils.getInstance().writeValueAsString(msg));
future.addCallback(
success -> {},
failure -> log.error("消息发送失败:", failure));
} catch (JsonProcessingException e) {
log.error("{}主题发送消息失败:", e);
return;
}
}
}
......@@ -17,6 +17,7 @@ import net.wanji.feign.service.UtcDTFeignClients;
import net.wanji.feign.service.UtcHisenseFeignClients;
import net.wanji.utc.common.constant.Constants;
import net.wanji.utc.common.typeenum.BasicEnum;
import net.wanji.utc.kafka.ProducerHandler;
import net.wanji.utc.service.runninginfo.HkLightsStatusService;
import net.wanji.utc.service.runninginfo.SignalStatusService;
import net.wanji.utc.websocket.RealTimeDataWebSocket;
......@@ -33,6 +34,7 @@ import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* 订阅信号运行状态、告警、灯态
......@@ -59,6 +61,8 @@ public class SignalStatusTask {
private UtcDTFeignClients utcDTFeignClients;
@Resource
private CrossLightsStatusHistMapper crossLightsStatusHistMapper;
@Autowired
private ProducerHandler producerHandler;
private static Map<String, List<LightsStatusVO>> lightStatusCache = new HashMap<>();
private static final Map<String, Integer> runModeTimeMap = new ConcurrentHashMap<>();
......@@ -77,6 +81,7 @@ public class SignalStatusTask {
public void lightStatus() {
ConcurrentHashMap<String, RealTimeDataWebSocket> evenWarnWebSocketMap = RealTimeDataWebSocket.getEvenWarnWebSocketMap();
try {
List<LightsStatusVO> produceList = new ArrayList<>();
if (!evenWarnWebSocketMap.isEmpty()) {
for (Map.Entry<String, RealTimeDataWebSocket> entry : evenWarnWebSocketMap.entrySet()) {
commonThreadPoolExecutor.execute(() -> {
......@@ -122,6 +127,7 @@ public class SignalStatusTask {
String json = mapper.writeValueAsString(lightsStatusVOS);
webSocket.sendInfo(json, crossIdStr);
insertIntoHist(crossId, json);
produceList.addAll(lightsStatusVOS);
} catch (Exception e) {
log.error("RealTimeDataWebSocket发送异常,异常信息:", e);
}
......@@ -131,6 +137,10 @@ public class SignalStatusTask {
});
}
}
if (!CollectionUtils.isEmpty(produceList)) {
List<LightsStatusVO> list = produceList.stream().distinct().collect(Collectors.toList());
producerHandler.send("cross_lights_status", list);
}
} catch (Exception e) {
log.error("实时推送灯态数据失败", e);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment