Commit 4ae96a01 authored by hanbing's avatar hanbing

Merge remote-tracking branch 'origin/master'

parents d95eb09d bc538598
package net.wanji.opt.task;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.BeanListUtils;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.common.utils.tool.StringUtils;
......@@ -11,6 +13,7 @@ import net.wanji.databus.po.*;
import net.wanji.opt.kafka.ConsumerHandler;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
......@@ -23,6 +26,7 @@ import java.util.stream.Collectors;
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class DataCenterProcessTask {
private final CrossDirDataRealtimeMapper crossDirDataRealTimeMapper;
......@@ -39,35 +43,52 @@ public class DataCenterProcessTask {
* @param
* @return void
*/
@Scheduled(fixedRate = 5 * 60 * 1000)
public void realTimeDataSave() throws Exception {
ObjectMapper instance = JacksonUtils.getInstance();
String crossRealTimeData = ConsumerHandler.getTopicMessage("cross_real_time_data");
if (StringUtils.isNotBlank(crossRealTimeData)) {
List<CrossDataRealtimePO> crossDataRealtimePOS = instance.readValue(crossRealTimeData, new TypeReference<List<CrossDataRealtimePO>>() {});
if (!CollectionUtils.isEmpty(crossDataRealtimePOS)) {
crossDataRealTimeMapper.deleteBatch(crossDataRealtimePOS.stream().map(CrossDataRealtimePO::getCrossId).collect(Collectors.toList()));
crossDataRealTimeMapper.insertBatch(crossDataRealtimePOS);
List<CrossDataHistPO> crossDataHistPOS = new ArrayList<>(crossDataRealtimePOS.size());
BeanListUtils.populateList(crossDataRealtimePOS, crossDataHistPOS, CrossDataHistPO.class);
crossDataHistMapper.insertBatch(crossDataHistPOS);
}
try {
saveCrossRealTimeData(instance);
} catch (Exception e) {
log.error("路口实时数据插入失败", e);
}
String crossRealTimeDirData = ConsumerHandler.getTopicMessage("cross_real_time_dir_data");
if (StringUtils.isNotBlank(crossRealTimeDirData)) {
List<CrossDirDataRealtimePO> crossDirDataRealtimePOS = instance.readValue(crossRealTimeDirData, new TypeReference<List<CrossDirDataRealtimePO>>() {});
if (!CollectionUtils.isEmpty(crossDirDataRealtimePOS)) {
crossDirDataRealTimeMapper.deleteBatch(crossDirDataRealtimePOS.stream().map(CrossDirDataRealtimePO::getCrossId).collect(Collectors.toList()));
crossDirDataRealTimeMapper.insertBatch(crossDirDataRealtimePOS);
List<CrossDirDataHistPO> list = new ArrayList<>(crossDirDataRealtimePOS.size());
BeanListUtils.populateList(crossDirDataRealtimePOS, list, CrossDirDataHistPO.class);
crossDirDataHistMapper.insertBatch(list);
try {
saveCrossRealTimeDirData(instance);
} catch (Exception e) {
log.error("路口方向实时数据插入失败", e);
}
try {
saveCrossRealTimeTurnData(instance);
} catch (Exception e) {
log.error("路口转向实时数据插入失败", e);
}
try {
saveCrossRealTimeLaneData(instance);
} catch (Exception e) {
log.error("路口车道实时数据插入失败", e);
}
}
@Transactional
public void saveCrossRealTimeLaneData(ObjectMapper instance) throws JsonProcessingException {
String crossRealTimeLaneData = ConsumerHandler.getTopicMessage("cross_real_time_lane_data");
if (StringUtils.isNotBlank(crossRealTimeLaneData)) {
List<CrossLaneDataRealTimePO> crossLaneDataRealTimePOS = instance.readValue(crossRealTimeLaneData, new TypeReference<List<CrossLaneDataRealTimePO>>() {});
if (!CollectionUtils.isEmpty(crossLaneDataRealTimePOS)) {
crossLaneDataRealTimeMapper.deleteBatch(crossLaneDataRealTimePOS.stream().map(CrossLaneDataRealTimePO::getCrossId).collect(Collectors.toList()));
crossLaneDataRealTimeMapper.insertBatch(crossLaneDataRealTimePOS);
List<CrossLaneDataHistPO> crossLaneDataHistPOS = new ArrayList<>(crossLaneDataRealTimePOS.size());
BeanListUtils.populateList(crossLaneDataRealTimePOS, crossLaneDataHistPOS, CrossLaneDataHistPO.class);
crossLaneDataHistMapper.insertBatch(crossLaneDataHistPOS);
}
}
}
@Transactional
public void saveCrossRealTimeTurnData(ObjectMapper instance) throws JsonProcessingException {
String crossRealTimeTurnData = ConsumerHandler.getTopicMessage("cross_real_time_turn_data");
if (StringUtils.isNotBlank(crossRealTimeTurnData)) {
List<CrossTurnDataRealtimePO> crossTurnDataRealtimePOS = instance.readValue(crossRealTimeTurnData, new TypeReference<List<CrossTurnDataRealtimePO>>() {});
......@@ -79,16 +100,34 @@ public class DataCenterProcessTask {
crossTurnDataHistMapper.insertBatch(list);
}
}
String crossRealTimeLaneData = ConsumerHandler.getTopicMessage("cross_real_time_lane_data");
if (StringUtils.isNotBlank(crossRealTimeLaneData)) {
List<CrossLaneDataRealTimePO> crossLaneDataRealTimePOS = instance.readValue(crossRealTimeLaneData, new TypeReference<List<CrossLaneDataRealTimePO>>() {});
if (!CollectionUtils.isEmpty(crossLaneDataRealTimePOS)) {
crossLaneDataRealTimeMapper.deleteBatch(crossLaneDataRealTimePOS.stream().map(CrossLaneDataRealTimePO::getCrossId).collect(Collectors.toList()));
crossLaneDataRealTimeMapper.insertBatch(crossLaneDataRealTimePOS);
List<CrossLaneDataHistPO> crossLaneDataHistPOS = new ArrayList<>(crossLaneDataRealTimePOS.size());
BeanListUtils.populateList(crossLaneDataRealTimePOS, crossLaneDataHistPOS, CrossLaneDataHistPO.class);
crossLaneDataHistMapper.insertBatch(crossLaneDataHistPOS);
}
@Transactional
public void saveCrossRealTimeDirData(ObjectMapper instance) throws JsonProcessingException {
String crossRealTimeDirData = ConsumerHandler.getTopicMessage("cross_real_time_dir_data");
if (StringUtils.isNotBlank(crossRealTimeDirData)) {
List<CrossDirDataRealtimePO> crossDirDataRealtimePOS = instance.readValue(crossRealTimeDirData, new TypeReference<List<CrossDirDataRealtimePO>>() {});
if (!CollectionUtils.isEmpty(crossDirDataRealtimePOS)) {
crossDirDataRealTimeMapper.deleteBatch(crossDirDataRealtimePOS.stream().map(CrossDirDataRealtimePO::getCrossId).collect(Collectors.toList()));
crossDirDataRealTimeMapper.insertBatch(crossDirDataRealtimePOS);
List<CrossDirDataHistPO> list = new ArrayList<>(crossDirDataRealtimePOS.size());
BeanListUtils.populateList(crossDirDataRealtimePOS, list, CrossDirDataHistPO.class);
crossDirDataHistMapper.insertBatch(list);
}
}
}
@Transactional
public void saveCrossRealTimeData(ObjectMapper instance) throws JsonProcessingException {
String crossRealTimeData = ConsumerHandler.getTopicMessage("cross_real_time_data");
if (StringUtils.isNotBlank(crossRealTimeData)) {
List<CrossDataRealtimePO> crossDataRealtimePOS = instance.readValue(crossRealTimeData, new TypeReference<List<CrossDataRealtimePO>>() {});
if (!CollectionUtils.isEmpty(crossDataRealtimePOS)) {
crossDataRealTimeMapper.deleteBatch(crossDataRealtimePOS.stream().map(CrossDataRealtimePO::getCrossId).collect(Collectors.toList()));
crossDataRealTimeMapper.insertBatch(crossDataRealtimePOS);
List<CrossDataHistPO> crossDataHistPOS = new ArrayList<>(crossDataRealtimePOS.size());
BeanListUtils.populateList(crossDataRealtimePOS, crossDataHistPOS, CrossDataHistPO.class);
crossDataHistMapper.insertBatch(crossDataHistPOS);
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment