Commit c8bdcfdd authored by duanruiming's avatar duanruiming

[update] 信号机灯态->增加保存历史灯态到ES,未开启使用

parent 08fff978
......@@ -65,11 +65,29 @@
<oauth2-version>2.2.5.RELEASE</oauth2-version>
<jjwt-version>0.9.1</jjwt-version>
<harbor.ip.port>10.102.1.182:5000</harbor.ip.port>
<elastic-version>7.3.0</elastic-version>
<elastic-data-version>2.6.3</elastic-data-version>
</properties>
<!-- 依赖声明 -->
<dependencyManagement>
<dependencies>
<!-- ElasticSearch依赖 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elastic-version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elastic-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
<version>${elastic-data-version}</version>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt</artifactId>
......
......@@ -26,6 +26,19 @@
</properties>
<dependencies>
<!-- ElasticSearch依赖 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
......
......@@ -15,6 +15,11 @@ import java.util.concurrent.ConcurrentHashMap;
@SuppressWarnings("all")
public class Constants {
/**
* kafka elastic 历史灯态存储key
*/
public static final String SIGNAL_LIGHT_HISTORY_KEY = "cross_lights_status";
/**
* 信号机运行模式
*/
......
package net.wanji.utc.service.runninginfo.impl;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
......@@ -28,10 +29,22 @@ import net.wanji.utc.task.SignalStatus4StaticSchemeTask;
import net.wanji.utc.util.ListUtil;
import net.wanji.utc.util.RedisUtil;
import net.wanji.utc.util.StringUtils;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
......@@ -50,6 +63,8 @@ public class SignalStatusServiceImpl implements SignalStatusService {
private final SignalStatusLogMapper signalStatusLogMapper;
private final WanJiCommonRunningStatusService wanJiRunningStatusService;
private final CrossLightsStatusHistMapper crossLightsStatusHistMapper;
@Resource
RestHighLevelClient client;
@Override
@Transactional
......@@ -206,13 +221,21 @@ public class SignalStatusServiceImpl implements SignalStatusService {
@Override
public List<LightsStatusVO2> lightStatusHist(String crossId, Integer startBatchTime, Integer endBatchTime) throws Exception {
if (StringUtils.equals("es", "")) {
return fromElastic(crossId, startBatchTime, endBatchTime);
}
return fromMysql(crossId, startBatchTime, endBatchTime);
}
private List<LightsStatusVO2> fromMysql(String crossId, Integer startBatchTime, Integer endBatchTime) throws JsonProcessingException {
List<LightsStatusVO2> result = new ArrayList<>();
List<CrossLightsStatusHistPO> crossLightsStatusHistPOS = crossLightsStatusHistMapper.selectRangeList(crossId, startBatchTime, endBatchTime);
if (!CollectionUtils.isEmpty(crossLightsStatusHistPOS)) {
for (CrossLightsStatusHistPO crossLightsStatusHistPO : crossLightsStatusHistPOS) {
String lightsStatusJson = crossLightsStatusHistPO.getLightsStatusJson();
ObjectMapper instance = JacksonUtils.getInstance();
List<LightsStatusVO2> lightsStatusVOS = instance.readValue(lightsStatusJson, new TypeReference<List<LightsStatusVO2>>() {});
List<LightsStatusVO2> lightsStatusVOS = instance.readValue(lightsStatusJson, new TypeReference<List<LightsStatusVO2>>() {
});
if (!CollectionUtils.isEmpty(lightsStatusVOS)) {
LightsStatusVO2 lightsStatusVO = lightsStatusVOS.get(0);
result.add(lightsStatusVO);
......@@ -221,4 +244,41 @@ public class SignalStatusServiceImpl implements SignalStatusService {
}
return result;
}
private List<LightsStatusVO2> fromElastic(String crossId, Integer startBatchTime, Integer endBatchTime) throws Exception {
try {
List<LightsStatusVO2> result = new ArrayList<>();
SearchRequest searchRequest = new SearchRequest(Constants.SIGNAL_LIGHT_HISTORY_KEY);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
MatchQueryBuilder matchQuery = QueryBuilders.matchQuery("crossId", crossId);
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("timeStamp")
.gte(startBatchTime)
.lte(endBatchTime);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
.must(matchQuery)
.must(rangeQuery);
searchSourceBuilder
.query(boolQuery);
//.size(10000);
searchRequest.source(searchSourceBuilder);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
for (SearchHit hit : hits) {
String sourceAsString = hit.getSourceAsString();
LightsStatusVO2 lightsStatusVO2 =
JacksonUtils.getInstance().readValue(sourceAsString, LightsStatusVO2.class);
result.add(lightsStatusVO2);
}
return result;
} catch (Exception e) {
log.error("elastic查询历史灯态异常", e);
throw new Exception(e);
}
}
}
......@@ -22,6 +22,14 @@ import net.wanji.utc.service.runninginfo.HkLightsStatusService;
import net.wanji.utc.service.runninginfo.SignalStatusService;
import net.wanji.utc.websocket.RealTimeDataWebSocket;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
......@@ -63,13 +71,15 @@ public class SignalStatusTask {
private CrossLightsStatusHistMapper crossLightsStatusHistMapper;
@Autowired
private ProducerHandler producerHandler;
@Resource
RestHighLevelClient client;
private static final Map<String, Integer> runModeTimeMap = new ConcurrentHashMap<>();
@Qualifier("net.wanji.feign.service.UtcHisenseFeignClients")
@Autowired
private UtcHisenseFeignClients utcHisenseFeignClients;
private static final ConcurrentHashMap<String, List<LightsStatusVO>> produceListMap = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, List<LightsStatusVO2>> produceListMap = new ConcurrentHashMap<>();
// 运行状态、告警,1分钟一次
@Scheduled(fixedRate = 60 * 1000)
......@@ -99,6 +109,7 @@ public class SignalStatusTask {
//});
}
sendKafka();
//sendElastic();
}
} catch (Exception e) {
log.error("实时推送灯态数据失败", e);
......@@ -115,6 +126,7 @@ public class SignalStatusTask {
private void saveAndSendLightsStatus(ObjectMapper mapper, String crossId, List<LightsStatusVO> lightsStatusVOS,
Map.Entry<String, List<Map.Entry<String, RealTimeDataWebSocket>>> entry) {
List<Map.Entry<String, RealTimeDataWebSocket>> value = entry.getValue();
produceListMap.remove(crossId); // 清除上一秒缓存
if (!CollectionUtils.isEmpty(lightsStatusVOS)) {
for (LightsStatusVO lightsStatusVO : lightsStatusVOS) {
if (Objects.nonNull(lightsStatusVO) && Objects.equals(lightsStatusVO.getCrossId(), crossId)) {
......@@ -129,13 +141,11 @@ public class SignalStatusTask {
webSocket.sendInfo(json, crossIdStr);
}
insertIntoHist(crossId, json);
produceListMap.put(crossId, lightsStatusVOS);
produceListMap.put(crossId, listResult);
} catch (Exception e) {
log.error("RealTimeDataWebSocket发送异常,异常信息:", e);
}
}
produceListMap.remove(crossId);
}
}
}
......@@ -156,11 +166,13 @@ public class SignalStatusTask {
}
if (StringUtils.equals(BasicEnum.ManufacturerEnum.DT.getCode(), manufacturerInfoPO.getCode())) {
JsonViewObject jsonViewObject = utcDTFeignClients.lightStatus(crossId);
return mapper.convertValue(jsonViewObject.getContent(), new TypeReference<List<LightsStatusVO>>() {});
return mapper.convertValue(jsonViewObject.getContent(), new TypeReference<List<LightsStatusVO>>() {
});
}
if (StringUtils.endsWithIgnoreCase(BasicEnum.ManufacturerEnum.HISENSE.getCode(), manufacturerInfoPO.getCode())) {
JsonViewObject jsonViewObject = utcHisenseFeignClients.lightStatus(crossId);
return mapper.convertValue(jsonViewObject.getContent(), new TypeReference<List<LightsStatusVO>>() {});
return mapper.convertValue(jsonViewObject.getContent(), new TypeReference<List<LightsStatusVO>>() {
});
}
if (StringUtils.endsWithIgnoreCase("STATIC", manufacturerInfoPO.getCode())) {
LightsStatusVO lightsStatusVO = SignalStatus4StaticSchemeTask.currentCrossLightStatusCache.get(crossId);
......@@ -210,6 +222,7 @@ public class SignalStatusTask {
/**
* 设置方向转向信息实体
*
* @param phaseMap
* @param dir
* @param turnList
......@@ -231,6 +244,7 @@ public class SignalStatusTask {
/**
* 转换转向参数
*
* @param turnEntry
* @return
*/
......@@ -247,6 +261,7 @@ public class SignalStatusTask {
/**
* 设置方向转向倒计时时间
*
* @param phaseMap
* @param dir
* @param turn
......@@ -277,11 +292,37 @@ public class SignalStatusTask {
*/
private void sendKafka() {
if (!produceListMap.isEmpty()) {
List<LightsStatusVO> list = new ArrayList<>(produceListMap.size());
for (Map.Entry<String, List<LightsStatusVO>> entry : produceListMap.entrySet()) {
List<LightsStatusVO2> list = new ArrayList<>(produceListMap.size());
for (Map.Entry<String, List<LightsStatusVO2>> entry : produceListMap.entrySet()) {
list.addAll(entry.getValue());
}
producerHandler.send("cross_lights_status", list);
producerHandler.send(Constants.SIGNAL_LIGHT_HISTORY_KEY, list);
}
}
/**
* 灯态数据发送elastic
*/
private void sendElastic() throws Exception {
if (!produceListMap.isEmpty()) {
List<LightsStatusVO2> list = new ArrayList<>(produceListMap.size());
for (Map.Entry<String, List<LightsStatusVO2>> entry : produceListMap.entrySet()) {
List<LightsStatusVO2> value = entry.getValue();
for (LightsStatusVO2 lightsStatusVO2 : value) {
String json = JacksonUtils.getInstance().writeValueAsString(lightsStatusVO2);
GetIndexRequest getIndexRequest = new GetIndexRequest(Constants.SIGNAL_LIGHT_HISTORY_KEY);
boolean exists = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
if (exists) {
IndexRequest indexRequest = new IndexRequest(Constants.SIGNAL_LIGHT_HISTORY_KEY);
indexRequest.source(json, XContentType.JSON);
IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
} else {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(Constants.SIGNAL_LIGHT_HISTORY_KEY);
createIndexRequest.source(json, XContentType.JSON);
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
}
}
}
}
}
......@@ -310,6 +351,7 @@ public class SignalStatusTask {
/**
* 不同系统获取统一路口灯态,统一处理
*
* @param evenWarnWebSocketMap
* @return
*/
......
......@@ -3,6 +3,8 @@ package net.wanji.databus.vo;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
/**
* @author Kent HAN
......@@ -15,6 +17,7 @@ public class BaseCrossInfo {
private String code;
@ApiModelProperty(value = "路口ID")
@Field(type = FieldType.Keyword, name = "crossId")
private String crossId;
@ApiModelProperty(value = "厂商代码")
......
......@@ -4,6 +4,8 @@ import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import java.util.List;
......@@ -36,6 +38,7 @@ public class LightsStatusVO2 extends BaseCrossInfo {
@ApiModelProperty(value = "灯组状态")
private List<DirInfo> dirLampGroupMapList;
@ApiModelProperty(value = "数据上报时间戳")
@Field(type = FieldType.Keyword, name = "timeStamp")
private String timeStamp;
@Data
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment