Commit be064f1b authored by duanruiming's avatar duanruiming

[update] 实时数据优化

parent fecdea7c
......@@ -37,7 +37,7 @@ public class CrossStrategyOptConsumerHandler implements KafkaListenerErrorHandle
* @param acknowledgment
* @throws Exception
*/
@KafkaListener(topics = {"${kafka-consumer.imbalanceAlgorithmResultTopic}"}, groupId = "group1")
@KafkaListener(topics = {"${kafka-consumer.imbalanceAlgorithmResultTopic}"}, groupId = "group")
public void receiveImbalanceData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
String message = String.valueOf(record.value());
if (StringUtils.isNotBlank(message)) {
......@@ -48,7 +48,7 @@ public class CrossStrategyOptConsumerHandler implements KafkaListenerErrorHandle
acknowledgment.acknowledge();
}
@KafkaListener(topics = {"${kafka-consumer.overflowAlgorithmResultTopic}"}, groupId = "group1")
@KafkaListener(topics = {"${kafka-consumer.overflowAlgorithmResultTopic}"}, groupId = "group")
public void receiveOverFlowData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
String message = String.valueOf(record.value());
if (StringUtils.isNotBlank(message)) {
......
......@@ -2,13 +2,9 @@ package net.wanji.datacenter.kafka;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.common.utils.tool.StringUtils;
import net.wanji.datacenter.common.Constants;
import net.wanji.datacenter.pojo.vo.GreenBeltChartVO;
import net.wanji.datacenter.pojo.vo.GreenBeltOptInfoVO;
import net.wanji.datacenter.service.GreenBeltInfoService;
import net.wanji.datacenter.util.RedisUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
......@@ -30,43 +26,35 @@ public class GreenBeltDataConsumerHandler implements KafkaListenerErrorHandler {
@Resource
private GreenBeltInfoService greenBeltInfoService;
@Resource
private RedisUtils redisUtils;
@KafkaListener(topics = {"${kafka-consumer.greenBeltInfoDataTopic}"}, groupId = "group1")
@KafkaListener(topics = {"${kafka-consumer.greenBeltInfoDataTopic}"}, groupId = "group-blet")
public void receiveGreenBeltInfoData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
try {
String message = String.valueOf(record.value());
if (StringUtils.isNotBlank(message)) {
GreenBeltOptInfoVO greenBeltOptInfoVO = greenBeltInfoService.convertData(message);
greenBeltInfoService.save(greenBeltOptInfoVO);
String key = Constants.GREEN_ID_OPT_KEY.concat(greenBeltOptInfoVO.getGreenId());
//redisUtils.setAndExpire(key, greenBeltOptInfoVO, 3600 * 24);
GreenBeltChartVO greenBeltChartVO = greenBeltInfoService.convertChartData(greenBeltOptInfoVO);
String chartKey = Constants.GREEN_ID_OPT_CHART_KEY.concat(greenBeltOptInfoVO.getGreenId());
redisUtils.setAndExpire(chartKey, JacksonUtils.getInstance().writeValueAsString(greenBeltChartVO), 3600 * 24);
}
acknowledgment.acknowledge();
} catch (Exception e) {
throw new RuntimeException(e);
log.error("绿波GreenOpt实时数据转换异常", e);
throw new Exception(e);
}
}
@KafkaListener(topics = {"${kafka-consumer.greenWavePeriodTopic}"}, groupId = "group")
public void receiveGreenPeriodfoData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
public void receiveGreenPeriodData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
String originalData = String.valueOf(record.value());
try {
greenBeltInfoService.savePeriod(originalData);
} catch (Exception e) {
log.error("绿波GreenRealTime实时数据转换异常", e);
log.error("绿波GreenPeriod实时数据转换异常", e);
throw new Exception();
}
acknowledgment.acknowledge();
}
@Override
@NonNull
public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
......
......@@ -2,7 +2,6 @@ package net.wanji.datacenter.kafka;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import net.wanji.datacenter.pojo.dto.CrossSnapshotDataDTO;
import net.wanji.datacenter.service.DataProcessService;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
......@@ -14,9 +13,6 @@ import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author duanruiming
......@@ -46,24 +42,23 @@ public class PeriodDataConsumerHandler implements KafkaListenerErrorHandler {
String originalData = String.valueOf(record.value());
try {
dataProcessService.crossSave(originalData);
//acknowledgment.acknowledge();
} catch (Exception e) {
//acknowledgment.acknowledge();
log.error("路口Cross实时数据转换异常", e);
throw new Exception();
}
acknowledgment.acknowledge();
}
@KafkaListener(topics = {"${kafka-consumer.dirPeriodicDataTopic}"}, groupId = "group")
@KafkaListener(topics = {"${kafka-consumer.dirPeriodicDataTopic}"}, groupId = "group-dir")
public void receiveDirPeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
String originalData = String.valueOf(record.value());
try {
dataProcessService.dirSave(originalData);
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("方向Dir周期实时数据转换异常", e);
throw new Exception();
}
acknowledgment.acknowledge();
}
@KafkaListener(topics = {"${kafka-consumer.turnPeriodicDataTopic}"}, groupId = "group")
......@@ -71,15 +66,14 @@ public class PeriodDataConsumerHandler implements KafkaListenerErrorHandler {
String originalData = String.valueOf(record.value());
try {
dataProcessService.turnSave(originalData);
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("转向Turn周期实时数据转换异常", e);
throw new Exception();
}
acknowledgment.acknowledge();
}
@Override
@NonNull
public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
......
......@@ -3,7 +3,6 @@ package net.wanji.datacenter.pojo.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import net.wanji.databus.dao.entity.GreenwaveRealtimePO;
import net.wanji.databus.po.CrossTurnDataRealtimePO;
import java.util.List;
......
......@@ -7,6 +7,7 @@ import net.wanji.datacenter.pojo.dto.ImbalanceAlgorithmDTO;
import net.wanji.datacenter.pojo.entity.CrossStrategyResultEntity;
import net.wanji.datacenter.service.CrossStrategyOptService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
......@@ -20,6 +21,7 @@ public class CrossStrategyOptServiceImpl implements CrossStrategyOptService {
@Resource
private CrossStrategyResultMapper crossStrategyResultMapper;
@Transactional(rollbackFor = Exception.class)
@Override
public void save(ImbalanceAlgorithmDTO dto) {
try {
......
......@@ -121,10 +121,7 @@ public class DataProcessServiceImpl implements DataProcessService {
public void crossSave(String originalData) throws Exception {
try {
CrossPeriodDataDTO crossPeriodDataDTO = JSONObject.parseObject(originalData, CrossPeriodDataDTO.class);
String timeStamp = crossPeriodDataDTO.getTimeStamp();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
List<CrossDataRealtimePO> eventList = crossPeriodDataDTO.getEventList();
CrossDataRealtimePO crossDataRealtimePO = new CrossDataRealtimePO();
if (!CollectionUtils.isEmpty(eventList)) {
List<CrossDataRealtimePO> crossDataRealtimePOS = eventList;
crossDataRealtimeMapper.deleteBatch(crossDataRealtimePOS.stream().map(CrossDataRealtimePO::getCrossId).collect(Collectors.toList()));
......@@ -148,7 +145,14 @@ public class DataProcessServiceImpl implements DataProcessService {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
List<CrossDirDataRealtimePO> eventList = crossPeriodDirDTO.getEventList();
if (!CollectionUtils.isEmpty(eventList)) {
List<CrossDirDataRealtimePO> crossDirDataRealtimePOS = eventList;
List<CrossDirDataRealtimePO> crossDirDataRealtimePOS = new ArrayList<>(eventList.size());
for (CrossDirDataRealtimePO crossDirDataRealtimePO : eventList) {
String crossId = crossDirDataRealtimePO.getCrossId();
Integer dirType = crossDirDataRealtimePO.getDirType() + 1;
crossDirDataRealtimePO.setId(crossId.concat(String.valueOf(dirType)));
crossDirDataRealtimePO.setDirType(dirType);
crossDirDataRealtimePOS.add(crossDirDataRealtimePO);
}
crossDirDataRealTimeMapper.deleteBatch(crossDirDataRealtimePOS.stream().map(CrossDirDataRealtimePO::getCrossId).collect(Collectors.toList()));
crossDirDataRealTimeMapper.insertBatch(crossDirDataRealtimePOS);
List<CrossDirDataHistPO> list = new ArrayList<>(crossDirDataRealtimePOS.size());
......
package net.wanji.datacenter.service.impl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import net.wanji.common.utils.tool.BeanListUtils;
import net.wanji.common.utils.tool.JacksonUtils;
......@@ -9,21 +8,19 @@ import net.wanji.databus.dao.entity.GreenwaveHistPO;
import net.wanji.databus.dao.entity.GreenwaveRealtimePO;
import net.wanji.databus.dao.mapper.GreenwaveHistMapper;
import net.wanji.databus.dao.mapper.GreenwaveRealtimeMapper;
import net.wanji.databus.po.CrossDirDataHistPO;
import net.wanji.databus.po.CrossDirDataRealtimePO;
import net.wanji.datacenter.common.Constants;
import net.wanji.datacenter.mapper.StrategyGreenOptHistMapper;
import net.wanji.datacenter.pojo.dto.GreenBeltKafkaDTO;
import net.wanji.datacenter.pojo.dto.GreenWavePeriodDTO;
import net.wanji.datacenter.pojo.entity.StrategyGreenOptHistEntity;
import net.wanji.datacenter.pojo.vo.GreenBeltChartVO;
import net.wanji.datacenter.pojo.vo.GreenBeltOptInfoVO;
import net.wanji.datacenter.service.GreenBeltInfoService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
......@@ -41,7 +38,7 @@ public class GreenBeltInfoServiceImpl implements GreenBeltInfoService {
private GreenwaveHistMapper greenwaveHistMapper;
public static void main(String[] args) throws Exception {
String message = greenBeltInfoData;
String message = null;
ObjectMapper mapper = JacksonUtils.getInstance();
GreenBeltKafkaDTO kafkaDTO = mapper.readValue(message, GreenBeltKafkaDTO.class);
// 正反绿波容器
......@@ -204,7 +201,7 @@ public class GreenBeltInfoServiceImpl implements GreenBeltInfoService {
@Override
public GreenBeltOptInfoVO convertData(String message) throws Exception {
ObjectMapper mapper = JacksonUtils.getInstance();
GreenBeltKafkaDTO kafkaDTO = mapper.readValue(greenBeltInfoData, GreenBeltKafkaDTO.class);
GreenBeltKafkaDTO kafkaDTO = mapper.readValue(message, GreenBeltKafkaDTO.class);
// 正反绿波容器
List<GreenBeltOptInfoVO.DirGreenDetail> dirGreenDetails = new ArrayList<>();
// 构建正向绿波
......@@ -225,6 +222,7 @@ public class GreenBeltInfoServiceImpl implements GreenBeltInfoService {
return greenBeltOptInfoVO;
}
@Transactional(rollbackFor = Exception.class)
@Override
public void save(GreenBeltOptInfoVO infoVO) throws Exception {
if (Objects.nonNull(infoVO)) {
......@@ -249,7 +247,6 @@ public class GreenBeltInfoServiceImpl implements GreenBeltInfoService {
entity.setGreenWidthTime(dirGreenDetail.getGreenWidthTime());
entity.setCrossGreenDetail(JacksonUtils.getInstance().writeValueAsString(dirGreenDetail.getCrossGreenDetailList()));
strategyGreenOptHistMapper.insert(entity);
String key = Constants.GREEN_ID_OPT_KEY.concat(infoVO.getGreenId());
}
}
}
......@@ -337,10 +334,12 @@ public class GreenBeltInfoServiceImpl implements GreenBeltInfoService {
dirGreenDetails.add(dirGreenDetail);
}
@Transactional(rollbackFor = Exception.class)
@Override
public void savePeriod(String message) throws Exception {
ObjectMapper instance = JacksonUtils.getInstance();
List<GreenwaveRealtimePO> greenwaveRealtimePOS = instance.readValue(message, new TypeReference<List<GreenwaveRealtimePO>>() {});
GreenWavePeriodDTO greenWavePeriodDTO = instance.readValue(message, GreenWavePeriodDTO.class);
List<GreenwaveRealtimePO> greenwaveRealtimePOS = greenWavePeriodDTO.getEventList();
if (!CollectionUtils.isEmpty(greenwaveRealtimePOS)) {
List<Integer> collect = greenwaveRealtimePOS.stream().map(GreenwaveRealtimePO::getGreenId).collect(Collectors.toList());
greenwaveRealtimeMapper.deleteBatch(collect);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment