Commit 3ab2a488 authored by duanruiming's avatar duanruiming

[update] 添加异常处理

parent 3838cf4d
...@@ -45,69 +45,90 @@ public class ConsumerHandler implements KafkaListenerErrorHandler { ...@@ -45,69 +45,90 @@ public class ConsumerHandler implements KafkaListenerErrorHandler {
@Transactional @Transactional
@KafkaListener(topics = {"cross_real_time_lane_data"}) @KafkaListener(topics = {"cross_real_time_lane_data"})
public void receiveCrossRealTimeLaneData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception { public void receiveCrossRealTimeLaneData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
String crossRealTimeLaneData = String.valueOf(record.value()); try {
if (StringUtils.isNotBlank(crossRealTimeLaneData)) { String crossRealTimeLaneData = String.valueOf(record.value());
List<CrossLaneDataRealTimePO> crossLaneDataRealTimePOS = JacksonUtils.getInstance().readValue(crossRealTimeLaneData, new TypeReference<List<CrossLaneDataRealTimePO>>() {}); if (StringUtils.isNotBlank(crossRealTimeLaneData)) {
if (!CollectionUtils.isEmpty(crossLaneDataRealTimePOS)) { List<CrossLaneDataRealTimePO> crossLaneDataRealTimePOS = JacksonUtils.getInstance().readValue(crossRealTimeLaneData, new TypeReference<List<CrossLaneDataRealTimePO>>() {});
crossLaneDataRealTimeMapper.deleteBatch(crossLaneDataRealTimePOS.stream().map(CrossLaneDataRealTimePO::getCrossId).collect(Collectors.toList())); if (!CollectionUtils.isEmpty(crossLaneDataRealTimePOS)) {
crossLaneDataRealTimeMapper.insertBatch(crossLaneDataRealTimePOS); crossLaneDataRealTimeMapper.deleteBatch(crossLaneDataRealTimePOS.stream().map(CrossLaneDataRealTimePO::getCrossId).collect(Collectors.toList()));
List<CrossLaneDataHistPO> crossLaneDataHistPOS = new ArrayList<>(crossLaneDataRealTimePOS.size()); crossLaneDataRealTimeMapper.insertBatch(crossLaneDataRealTimePOS);
BeanListUtils.populateList(crossLaneDataRealTimePOS, crossLaneDataHistPOS, CrossLaneDataHistPO.class); List<CrossLaneDataHistPO> crossLaneDataHistPOS = new ArrayList<>(crossLaneDataRealTimePOS.size());
crossLaneDataHistMapper.insertBatch(crossLaneDataHistPOS); BeanListUtils.populateList(crossLaneDataRealTimePOS, crossLaneDataHistPOS, CrossLaneDataHistPO.class);
crossLaneDataHistMapper.insertBatch(crossLaneDataHistPOS);
}
} }
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("kafka消费数据插入路口车道实时表异常", e);
throw new Exception(e);
} }
acknowledgment.acknowledge();
} }
@Transactional @Transactional
@KafkaListener(topics = {"cross_real_time_data"}) @KafkaListener(topics = {"cross_real_time_data"})
public void receiveCrossRealTimeData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception { public void receiveCrossRealTimeData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
String crossRealTimeData = String.valueOf(record.value()); try {
if (StringUtils.isNotBlank(crossRealTimeData)) { String crossRealTimeData = String.valueOf(record.value());
List<CrossDataRealtimePO> crossDataRealtimePOS = JacksonUtils.getInstance().readValue(crossRealTimeData, new TypeReference<List<CrossDataRealtimePO>>() {}); if (StringUtils.isNotBlank(crossRealTimeData)) {
if (!CollectionUtils.isEmpty(crossDataRealtimePOS)) { List<CrossDataRealtimePO> crossDataRealtimePOS = JacksonUtils.getInstance().readValue(crossRealTimeData, new TypeReference<List<CrossDataRealtimePO>>() {});
crossDataRealTimeMapper.deleteBatch(crossDataRealtimePOS.stream().map(CrossDataRealtimePO::getCrossId).collect(Collectors.toList())); if (!CollectionUtils.isEmpty(crossDataRealtimePOS)) {
crossDataRealTimeMapper.insertBatch(crossDataRealtimePOS); crossDataRealTimeMapper.deleteBatch(crossDataRealtimePOS.stream().map(CrossDataRealtimePO::getCrossId).collect(Collectors.toList()));
List<CrossDataHistPO> crossDataHistPOS = new ArrayList<>(crossDataRealtimePOS.size()); crossDataRealTimeMapper.insertBatch(crossDataRealtimePOS);
BeanListUtils.populateList(crossDataRealtimePOS, crossDataHistPOS, CrossDataHistPO.class); List<CrossDataHistPO> crossDataHistPOS = new ArrayList<>(crossDataRealtimePOS.size());
crossDataHistMapper.insertBatch(crossDataHistPOS); BeanListUtils.populateList(crossDataRealtimePOS, crossDataHistPOS, CrossDataHistPO.class);
crossDataHistMapper.insertBatch(crossDataHistPOS);
}
} }
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("kafka消费数据插入路口实时表异常", e);
throw new Exception(e);
} }
acknowledgment.acknowledge();
} }
@Transactional @Transactional
@KafkaListener(topics = {"cross_real_time_dir_data"}) @KafkaListener(topics = {"cross_real_time_dir_data"})
public void receiveCrossRealTimeDirData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception { public void receiveCrossRealTimeDirData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
String crossRealTimeDirData = String.valueOf(record.value()); try {
if (StringUtils.isNotBlank(crossRealTimeDirData)) { String crossRealTimeDirData = String.valueOf(record.value());
List<CrossDirDataRealtimePO> crossDirDataRealtimePOS = JacksonUtils.getInstance().readValue(crossRealTimeDirData, new TypeReference<List<CrossDirDataRealtimePO>>() {}); if (StringUtils.isNotBlank(crossRealTimeDirData)) {
if (!CollectionUtils.isEmpty(crossDirDataRealtimePOS)) { List<CrossDirDataRealtimePO> crossDirDataRealtimePOS = JacksonUtils.getInstance().readValue(crossRealTimeDirData, new TypeReference<List<CrossDirDataRealtimePO>>() {});
crossDirDataRealTimeMapper.deleteBatch(crossDirDataRealtimePOS.stream().map(CrossDirDataRealtimePO::getCrossId).collect(Collectors.toList())); if (!CollectionUtils.isEmpty(crossDirDataRealtimePOS)) {
crossDirDataRealTimeMapper.insertBatch(crossDirDataRealtimePOS); crossDirDataRealTimeMapper.deleteBatch(crossDirDataRealtimePOS.stream().map(CrossDirDataRealtimePO::getCrossId).collect(Collectors.toList()));
List<CrossDirDataHistPO> list = new ArrayList<>(crossDirDataRealtimePOS.size()); crossDirDataRealTimeMapper.insertBatch(crossDirDataRealtimePOS);
BeanListUtils.populateList(crossDirDataRealtimePOS, list, CrossDirDataHistPO.class); List<CrossDirDataHistPO> list = new ArrayList<>(crossDirDataRealtimePOS.size());
crossDirDataHistMapper.insertBatch(list); BeanListUtils.populateList(crossDirDataRealtimePOS, list, CrossDirDataHistPO.class);
crossDirDataHistMapper.insertBatch(list);
}
} }
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("kafka消费数据插入路口方向实时表异常", e);
throw new Exception(e);
} }
acknowledgment.acknowledge();
} }
@Transactional @Transactional
@KafkaListener(topics = {"cross_real_time_turn_data"}) @KafkaListener(topics = {"cross_real_time_turn_data"})
public void receiveCrossRealTimeTurnData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception { public void receiveCrossRealTimeTurnData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
String crossRealTimeTurnData = String.valueOf(record.value()); try {
if (StringUtils.isNotBlank(crossRealTimeTurnData)) { String crossRealTimeTurnData = String.valueOf(record.value());
List<CrossTurnDataRealtimePO> crossTurnDataRealtimePOS = JacksonUtils.getInstance().readValue(crossRealTimeTurnData, new TypeReference<List<CrossTurnDataRealtimePO>>() {}); if (StringUtils.isNotBlank(crossRealTimeTurnData)) {
if (!CollectionUtils.isEmpty(crossTurnDataRealtimePOS)) { List<CrossTurnDataRealtimePO> crossTurnDataRealtimePOS = JacksonUtils.getInstance().readValue(crossRealTimeTurnData, new TypeReference<List<CrossTurnDataRealtimePO>>() {});
crossTurnDataRealTimeMapper.deleteBatch(crossTurnDataRealtimePOS.stream().map(CrossTurnDataRealtimePO::getCrossId).collect(Collectors.toList())); if (!CollectionUtils.isEmpty(crossTurnDataRealtimePOS)) {
crossTurnDataRealTimeMapper.insertBatch(crossTurnDataRealtimePOS); List<String> crossIds = crossTurnDataRealtimePOS.stream().map(CrossTurnDataRealtimePO::getCrossId).distinct().collect(Collectors.toList());
List<CrossTurnDataHistPO> list = new ArrayList<>(crossTurnDataRealtimePOS.size()); crossTurnDataRealTimeMapper.deleteBatch(crossIds);
BeanListUtils.populateList(crossTurnDataRealtimePOS, list, CrossTurnDataHistPO.class); crossTurnDataRealTimeMapper.insertBatch(crossTurnDataRealtimePOS);
crossTurnDataHistMapper.insertBatch(list); List<CrossTurnDataHistPO> list = new ArrayList<>(crossTurnDataRealtimePOS.size());
BeanListUtils.populateList(crossTurnDataRealtimePOS, list, CrossTurnDataHistPO.class);
crossTurnDataHistMapper.insertBatch(list);
}
} }
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("kafka消费数据插入路口转向实时表异常", e);
throw new Exception(e);
} }
acknowledgment.acknowledge();
} }
@Override @Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment