Commit 02ebb804 authored by duanruiming's avatar duanruiming

[update] 优化实时数据入库延时问题

parent 1cdb7e8d
package net.wanji.opt.kafka; package net.wanji.opt.kafka;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.BeanListUtils;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.common.utils.tool.StringUtils;
import net.wanji.databus.dao.mapper.*;
import net.wanji.databus.po.*;
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
...@@ -10,9 +17,12 @@ import org.springframework.kafka.listener.ListenerExecutionFailedException; ...@@ -10,9 +17,12 @@ import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.util.HashMap; import java.util.ArrayList;
import java.util.Map; import java.util.List;
import java.util.stream.Collectors;
/** /**
* @author duanruiming * @author duanruiming
...@@ -20,45 +30,83 @@ import java.util.Map; ...@@ -20,45 +30,83 @@ import java.util.Map;
*/ */
@Component @Component
@Slf4j @Slf4j
@RequiredArgsConstructor
public class ConsumerHandler implements KafkaListenerErrorHandler { public class ConsumerHandler implements KafkaListenerErrorHandler {
/** private final CrossDirDataRealtimeMapper crossDirDataRealTimeMapper;
* kafka数据缓存 private final CrossDirDataHistMapper crossDirDataHistMapper;
* key:topic value:data private final CrossTurnDataRealtimeMapper crossTurnDataRealTimeMapper;
*/ private final CrossTurnDataHistMapper crossTurnDataHistMapper;
private static final Map<String, String> realTimeCarInfoMap = new HashMap<>(); private final CrossLaneDataRealTimeMapper crossLaneDataRealTimeMapper;
private final CrossLaneDataHistMapper crossLaneDataHistMapper;
private final CrossDataRealtimeMapper crossDataRealTimeMapper;
private final CrossDataHistMapper crossDataHistMapper;
/** @Transactional
* 通过消息主题获取消息信息 @KafkaListener(topics = {"cross_real_time_lane_data"})
* public void receiveCrossRealTimeLaneData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
* @param topic String crossRealTimeLaneData = String.valueOf(record.value());
* @return if (StringUtils.isNotBlank(crossRealTimeLaneData)) {
*/ List<CrossLaneDataRealTimePO> crossLaneDataRealTimePOS = JacksonUtils.getInstance().readValue(crossRealTimeLaneData, new TypeReference<List<CrossLaneDataRealTimePO>>() {});
public static String getTopicMessage(String topic) { if (!CollectionUtils.isEmpty(crossLaneDataRealTimePOS)) {
if (realTimeCarInfoMap.isEmpty()) { crossLaneDataRealTimeMapper.deleteBatch(crossLaneDataRealTimePOS.stream().map(CrossLaneDataRealTimePO::getCrossId).collect(Collectors.toList()));
return null; crossLaneDataRealTimeMapper.insertBatch(crossLaneDataRealTimePOS);
List<CrossLaneDataHistPO> crossLaneDataHistPOS = new ArrayList<>(crossLaneDataRealTimePOS.size());
BeanListUtils.populateList(crossLaneDataRealTimePOS, crossLaneDataHistPOS, CrossLaneDataHistPO.class);
crossLaneDataHistMapper.insertBatch(crossLaneDataHistPOS);
}
} }
return realTimeCarInfoMap.get(topic); acknowledgment.acknowledge();
} }
@Transactional
@KafkaListener(topics = {"cross_real_time_data"}) @KafkaListener(topics = {"cross_real_time_data"})
public void receiveCrossRealTimeData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) { public void receiveCrossRealTimeData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
realTimeCarInfoMap.put(record.topic(), String.valueOf(record.value())); String crossRealTimeData = String.valueOf(record.value());
if (StringUtils.isNotBlank(crossRealTimeData)) {
List<CrossDataRealtimePO> crossDataRealtimePOS = JacksonUtils.getInstance().readValue(crossRealTimeData, new TypeReference<List<CrossDataRealtimePO>>() {});
if (!CollectionUtils.isEmpty(crossDataRealtimePOS)) {
crossDataRealTimeMapper.deleteBatch(crossDataRealtimePOS.stream().map(CrossDataRealtimePO::getCrossId).collect(Collectors.toList()));
crossDataRealTimeMapper.insertBatch(crossDataRealtimePOS);
List<CrossDataHistPO> crossDataHistPOS = new ArrayList<>(crossDataRealtimePOS.size());
BeanListUtils.populateList(crossDataRealtimePOS, crossDataHistPOS, CrossDataHistPO.class);
crossDataHistMapper.insertBatch(crossDataHistPOS);
}
}
acknowledgment.acknowledge(); acknowledgment.acknowledge();
} }
@Transactional
@KafkaListener(topics = {"cross_real_time_dir_data"}) @KafkaListener(topics = {"cross_real_time_dir_data"})
public void receiveCrossRealTimeDirData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) { public void receiveCrossRealTimeDirData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
realTimeCarInfoMap.put(record.topic(), String.valueOf(record.value())); String crossRealTimeDirData = String.valueOf(record.value());
if (StringUtils.isNotBlank(crossRealTimeDirData)) {
List<CrossDirDataRealtimePO> crossDirDataRealtimePOS = JacksonUtils.getInstance().readValue(crossRealTimeDirData, new TypeReference<List<CrossDirDataRealtimePO>>() {});
if (!CollectionUtils.isEmpty(crossDirDataRealtimePOS)) {
crossDirDataRealTimeMapper.deleteBatch(crossDirDataRealtimePOS.stream().map(CrossDirDataRealtimePO::getCrossId).collect(Collectors.toList()));
crossDirDataRealTimeMapper.insertBatch(crossDirDataRealtimePOS);
List<CrossDirDataHistPO> list = new ArrayList<>(crossDirDataRealtimePOS.size());
BeanListUtils.populateList(crossDirDataRealtimePOS, list, CrossDirDataHistPO.class);
crossDirDataHistMapper.insertBatch(list);
}
}
acknowledgment.acknowledge(); acknowledgment.acknowledge();
} }
@Transactional
@KafkaListener(topics = {"cross_real_time_turn_data"}) @KafkaListener(topics = {"cross_real_time_turn_data"})
public void receiveCrossRealTimeTurnData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) { public void receiveCrossRealTimeTurnData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
realTimeCarInfoMap.put(record.topic(), String.valueOf(record.value())); String crossRealTimeTurnData = String.valueOf(record.value());
acknowledgment.acknowledge(); if (StringUtils.isNotBlank(crossRealTimeTurnData)) {
} List<CrossTurnDataRealtimePO> crossTurnDataRealtimePOS = JacksonUtils.getInstance().readValue(crossRealTimeTurnData, new TypeReference<List<CrossTurnDataRealtimePO>>() {});
@KafkaListener(topics = {"cross_real_time_lane_data"}) if (!CollectionUtils.isEmpty(crossTurnDataRealtimePOS)) {
public void receiveCrossRealTimeLaneData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) { crossTurnDataRealTimeMapper.deleteBatch(crossTurnDataRealtimePOS.stream().map(CrossTurnDataRealtimePO::getCrossId).collect(Collectors.toList()));
realTimeCarInfoMap.put(record.topic(), String.valueOf(record.value())); crossTurnDataRealTimeMapper.insertBatch(crossTurnDataRealtimePOS);
List<CrossTurnDataHistPO> list = new ArrayList<>(crossTurnDataRealtimePOS.size());
BeanListUtils.populateList(crossTurnDataRealtimePOS, list, CrossTurnDataHistPO.class);
crossTurnDataHistMapper.insertBatch(list);
}
}
acknowledgment.acknowledge(); acknowledgment.acknowledge();
} }
......
package net.wanji.opt.task;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.BeanListUtils;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.common.utils.tool.StringUtils;
import net.wanji.databus.dao.mapper.*;
import net.wanji.databus.po.*;
import net.wanji.opt.kafka.ConsumerHandler;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author duanruiming
* @date 2023/08/17 8:58
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class DataCenterProcessTask {
private final CrossDirDataRealtimeMapper crossDirDataRealTimeMapper;
private final CrossDirDataHistMapper crossDirDataHistMapper;
private final CrossTurnDataRealtimeMapper crossTurnDataRealTimeMapper;
private final CrossTurnDataHistMapper crossTurnDataHistMapper;
private final CrossLaneDataRealTimeMapper crossLaneDataRealTimeMapper;
private final CrossLaneDataHistMapper crossLaneDataHistMapper;
private final CrossDataRealtimeMapper crossDataRealTimeMapper;
private final CrossDataHistMapper crossDataHistMapper;
/**
* @description 收到数据中心kafka中的实时数据插入实时数据表、实时历史表
* @param
* @return void
*/
@Scheduled(fixedRate = 5 * 60 * 1000)
public void realTimeDataSave() throws Exception {
ObjectMapper instance = JacksonUtils.getInstance();
try {
saveCrossRealTimeData(instance);
} catch (Exception e) {
log.error("路口实时数据插入失败", e);
}
try {
saveCrossRealTimeDirData(instance);
} catch (Exception e) {
log.error("路口方向实时数据插入失败", e);
}
try {
saveCrossRealTimeTurnData(instance);
} catch (Exception e) {
log.error("路口转向实时数据插入失败", e);
}
try {
saveCrossRealTimeLaneData(instance);
} catch (Exception e) {
log.error("路口车道实时数据插入失败", e);
}
}
@Transactional
public void saveCrossRealTimeLaneData(ObjectMapper instance) throws JsonProcessingException {
String crossRealTimeLaneData = ConsumerHandler.getTopicMessage("cross_real_time_lane_data");
if (StringUtils.isNotBlank(crossRealTimeLaneData)) {
List<CrossLaneDataRealTimePO> crossLaneDataRealTimePOS = instance.readValue(crossRealTimeLaneData, new TypeReference<List<CrossLaneDataRealTimePO>>() {});
if (!CollectionUtils.isEmpty(crossLaneDataRealTimePOS)) {
crossLaneDataRealTimeMapper.deleteBatch(crossLaneDataRealTimePOS.stream().map(CrossLaneDataRealTimePO::getCrossId).collect(Collectors.toList()));
crossLaneDataRealTimeMapper.insertBatch(crossLaneDataRealTimePOS);
List<CrossLaneDataHistPO> crossLaneDataHistPOS = new ArrayList<>(crossLaneDataRealTimePOS.size());
BeanListUtils.populateList(crossLaneDataRealTimePOS, crossLaneDataHistPOS, CrossLaneDataHistPO.class);
crossLaneDataHistMapper.insertBatch(crossLaneDataHistPOS);
}
}
}
@Transactional
public void saveCrossRealTimeTurnData(ObjectMapper instance) throws JsonProcessingException {
String crossRealTimeTurnData = ConsumerHandler.getTopicMessage("cross_real_time_turn_data");
if (StringUtils.isNotBlank(crossRealTimeTurnData)) {
List<CrossTurnDataRealtimePO> crossTurnDataRealtimePOS = instance.readValue(crossRealTimeTurnData, new TypeReference<List<CrossTurnDataRealtimePO>>() {});
if (!CollectionUtils.isEmpty(crossTurnDataRealtimePOS)) {
crossTurnDataRealTimeMapper.deleteBatch(crossTurnDataRealtimePOS.stream().map(CrossTurnDataRealtimePO::getCrossId).collect(Collectors.toList()));
crossTurnDataRealTimeMapper.insertBatch(crossTurnDataRealtimePOS);
List<CrossTurnDataHistPO> list = new ArrayList<>(crossTurnDataRealtimePOS.size());
BeanListUtils.populateList(crossTurnDataRealtimePOS, list, CrossTurnDataHistPO.class);
crossTurnDataHistMapper.insertBatch(list);
}
}
}
@Transactional
public void saveCrossRealTimeDirData(ObjectMapper instance) throws JsonProcessingException {
String crossRealTimeDirData = ConsumerHandler.getTopicMessage("cross_real_time_dir_data");
if (StringUtils.isNotBlank(crossRealTimeDirData)) {
List<CrossDirDataRealtimePO> crossDirDataRealtimePOS = instance.readValue(crossRealTimeDirData, new TypeReference<List<CrossDirDataRealtimePO>>() {});
if (!CollectionUtils.isEmpty(crossDirDataRealtimePOS)) {
crossDirDataRealTimeMapper.deleteBatch(crossDirDataRealtimePOS.stream().map(CrossDirDataRealtimePO::getCrossId).collect(Collectors.toList()));
crossDirDataRealTimeMapper.insertBatch(crossDirDataRealtimePOS);
List<CrossDirDataHistPO> list = new ArrayList<>(crossDirDataRealtimePOS.size());
BeanListUtils.populateList(crossDirDataRealtimePOS, list, CrossDirDataHistPO.class);
crossDirDataHistMapper.insertBatch(list);
}
}
}
@Transactional
public void saveCrossRealTimeData(ObjectMapper instance) throws JsonProcessingException {
String crossRealTimeData = ConsumerHandler.getTopicMessage("cross_real_time_data");
if (StringUtils.isNotBlank(crossRealTimeData)) {
List<CrossDataRealtimePO> crossDataRealtimePOS = instance.readValue(crossRealTimeData, new TypeReference<List<CrossDataRealtimePO>>() {});
if (!CollectionUtils.isEmpty(crossDataRealtimePOS)) {
crossDataRealTimeMapper.deleteBatch(crossDataRealtimePOS.stream().map(CrossDataRealtimePO::getCrossId).collect(Collectors.toList()));
crossDataRealTimeMapper.insertBatch(crossDataRealtimePOS);
List<CrossDataHistPO> crossDataHistPOS = new ArrayList<>(crossDataRealtimePOS.size());
BeanListUtils.populateList(crossDataRealtimePOS, crossDataHistPOS, CrossDataHistPO.class);
crossDataHistMapper.insertBatch(crossDataHistPOS);
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment