Commit 70142148 authored by duanruiming's avatar duanruiming

[update] 优化实时数据,经过数据中心处理后保存

parent ee22abb0
......@@ -34,7 +34,7 @@ public class ConsumerHandler implements KafkaListenerErrorHandler {
* @param topic
* @return
*/
public String getTopicMessage(String topic) {
public static String getTopicMessage(String topic) {
if (realTimeCarInfoMap.isEmpty()) {
return null;
}
......
package net.wanji.opt.task;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.common.utils.tool.StringUtils;
import net.wanji.databus.dao.mapper.*;
import net.wanji.databus.po.CrossDataRealtimePO;
import net.wanji.databus.po.CrossDirDataRealtimePO;
import net.wanji.databus.po.CrossLaneDataRealTimePO;
import net.wanji.databus.po.CrossTurnDataRealtimePO;
import net.wanji.opt.kafka.ConsumerHandler;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* @author duanruiming
* @date 2023/08/17 8:58
*/
@Component
@RequiredArgsConstructor
public class DataCenterProcessTask {
private final CrossBaseLaneInfoMapper crossBaseLaneInfoMapper;
private final CrossDirDataRealtimeMapper crossDirDataRealTimeMapper;
private final CrossTurnDataRealtimeMapper crossTurnDataRealTimeMapper;
private final CrossLaneDataRealTimeMapper crossLaneDataRealTimeMapper;
private final CrossDataRealtimeMapper crossDataRealTimeMapper;
@Scheduled(fixedRate = 1 * 60 * 1000)
public void realTimeDataSave() throws Exception {
ObjectMapper instance = JacksonUtils.getInstance();
String crossRealTimeData = ConsumerHandler.getTopicMessage("cross_real_time_data");
if (StringUtils.isNotBlank(crossRealTimeData)) {
List<CrossDataRealtimePO> crossDataRealtimePOS = instance.readValue(crossRealTimeData, new TypeReference<List<CrossDataRealtimePO>>() {});
if (!CollectionUtils.isEmpty(crossDataRealtimePOS)) {
// crossDataRealTimeMapper.insertBatch(crossDataRealtimePOS);
}
}
String crossRealTimeDirData = ConsumerHandler.getTopicMessage("cross_real_time_dir_data");
if (StringUtils.isNotBlank(crossRealTimeDirData)) {
List<CrossDirDataRealtimePO> crossDirDataRealtimePOS = instance.readValue(crossRealTimeDirData, new TypeReference<List<CrossDirDataRealtimePO>>() {});
if (!CollectionUtils.isEmpty(crossDirDataRealtimePOS)) {
// crossDirDataRealTimeMapper.insertBatch(crossDirDataRealtimePOS);
}
}
String crossRealTimeTurnData = ConsumerHandler.getTopicMessage("cross_real_time_turn_data");
if (StringUtils.isNotBlank(crossRealTimeTurnData)) {
List<CrossTurnDataRealtimePO> crossTurnDataRealtimePOS = instance.readValue(crossRealTimeTurnData, new TypeReference<List<CrossTurnDataRealtimePO>>() {});
if (!CollectionUtils.isEmpty(crossTurnDataRealtimePOS)) {
// crossTurnDataRealTimeMapper.insertBatch(crossTurnDataRealtimePOS);
}
}
String crossRealTimeLaneData = ConsumerHandler.getTopicMessage("cross_real_time_lane_data");
if (StringUtils.isNotBlank(crossRealTimeLaneData)) {
List<CrossLaneDataRealTimePO> crossLaneDataRealTimePOS = instance.readValue(crossRealTimeLaneData, new TypeReference<List<CrossLaneDataRealTimePO>>() {});
if (!CollectionUtils.isEmpty(crossLaneDataRealTimePOS)) {
// crossLaneDataRealTimeMapper.insertBatch(crossLaneDataRealTimePOS);
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment