Commit 2bdd1f16 authored by duanruiming's avatar duanruiming

[update] 优化解析实时数据结构

parent 6ad84d48
...@@ -22,6 +22,8 @@ import org.springframework.util.CollectionUtils; ...@@ -22,6 +22,8 @@ import org.springframework.util.CollectionUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
...@@ -48,13 +50,16 @@ public class ConsumerHandler implements KafkaListenerErrorHandler { ...@@ -48,13 +50,16 @@ public class ConsumerHandler implements KafkaListenerErrorHandler {
try { try {
String crossRealTimeLaneData = String.valueOf(record.value()); String crossRealTimeLaneData = String.valueOf(record.value());
if (StringUtils.isNotBlank(crossRealTimeLaneData)) { if (StringUtils.isNotBlank(crossRealTimeLaneData)) {
List<CrossLaneDataRealTimePO> crossLaneDataRealTimePOS = JacksonUtils.getInstance().readValue(crossRealTimeLaneData, new TypeReference<List<CrossLaneDataRealTimePO>>() {}); Map<String, List<CrossLaneDataRealTimePO>> map = JacksonUtils.getInstance().readValue(crossRealTimeLaneData, new TypeReference<Map<String, List<CrossLaneDataRealTimePO>>>() {});
if (!CollectionUtils.isEmpty(crossLaneDataRealTimePOS)) { if (Objects.nonNull(map) && !map.isEmpty()) {
crossLaneDataRealTimeMapper.deleteBatch(crossLaneDataRealTimePOS.stream().map(CrossLaneDataRealTimePO::getCrossId).collect(Collectors.toList())); List<CrossLaneDataRealTimePO> crossLaneDataRealTimePOS = map.get("crossRealTimeLaneData");
crossLaneDataRealTimeMapper.insertBatch(crossLaneDataRealTimePOS); if (!CollectionUtils.isEmpty(crossLaneDataRealTimePOS)) {
List<CrossLaneDataHistPO> crossLaneDataHistPOS = new ArrayList<>(crossLaneDataRealTimePOS.size()); crossLaneDataRealTimeMapper.deleteBatch(crossLaneDataRealTimePOS.stream().map(CrossLaneDataRealTimePO::getCrossId).collect(Collectors.toList()));
BeanListUtils.populateList(crossLaneDataRealTimePOS, crossLaneDataHistPOS, CrossLaneDataHistPO.class); crossLaneDataRealTimeMapper.insertBatch(crossLaneDataRealTimePOS);
crossLaneDataHistMapper.insertBatch(crossLaneDataHistPOS); List<CrossLaneDataHistPO> crossLaneDataHistPOS = new ArrayList<>(crossLaneDataRealTimePOS.size());
BeanListUtils.populateList(crossLaneDataRealTimePOS, crossLaneDataHistPOS, CrossLaneDataHistPO.class);
crossLaneDataHistMapper.insertBatch(crossLaneDataHistPOS);
}
} }
} }
acknowledgment.acknowledge(); acknowledgment.acknowledge();
...@@ -70,14 +75,18 @@ public class ConsumerHandler implements KafkaListenerErrorHandler { ...@@ -70,14 +75,18 @@ public class ConsumerHandler implements KafkaListenerErrorHandler {
try { try {
String crossRealTimeData = String.valueOf(record.value()); String crossRealTimeData = String.valueOf(record.value());
if (StringUtils.isNotBlank(crossRealTimeData)) { if (StringUtils.isNotBlank(crossRealTimeData)) {
List<CrossDataRealtimePO> crossDataRealtimePOS = JacksonUtils.getInstance().readValue(crossRealTimeData, new TypeReference<List<CrossDataRealtimePO>>() {}); Map<String, List<CrossDataRealtimePO>> map = JacksonUtils.getInstance().readValue(crossRealTimeData, new TypeReference<Map<String, List<CrossDataRealtimePO>>>() {});
if (!CollectionUtils.isEmpty(crossDataRealtimePOS)) { if (Objects.nonNull(map) && !map.isEmpty()) {
crossDataRealTimeMapper.deleteBatch(crossDataRealtimePOS.stream().map(CrossDataRealtimePO::getCrossId).collect(Collectors.toList())); List<CrossDataRealtimePO> crossDataRealtimePOS = map.get("crossRealTimeData");
crossDataRealTimeMapper.insertBatch(crossDataRealtimePOS); if (!CollectionUtils.isEmpty(crossDataRealtimePOS)) {
List<CrossDataHistPO> crossDataHistPOS = new ArrayList<>(crossDataRealtimePOS.size()); crossDataRealTimeMapper.deleteBatch(crossDataRealtimePOS.stream().map(CrossDataRealtimePO::getCrossId).collect(Collectors.toList()));
BeanListUtils.populateList(crossDataRealtimePOS, crossDataHistPOS, CrossDataHistPO.class); crossDataRealTimeMapper.insertBatch(crossDataRealtimePOS);
crossDataHistMapper.insertBatch(crossDataHistPOS); List<CrossDataHistPO> crossDataHistPOS = new ArrayList<>(crossDataRealtimePOS.size());
BeanListUtils.populateList(crossDataRealtimePOS, crossDataHistPOS, CrossDataHistPO.class);
crossDataHistMapper.insertBatch(crossDataHistPOS);
}
} }
} }
acknowledgment.acknowledge(); acknowledgment.acknowledge();
} catch (Exception e) { } catch (Exception e) {
...@@ -92,13 +101,16 @@ public class ConsumerHandler implements KafkaListenerErrorHandler { ...@@ -92,13 +101,16 @@ public class ConsumerHandler implements KafkaListenerErrorHandler {
try { try {
String crossRealTimeDirData = String.valueOf(record.value()); String crossRealTimeDirData = String.valueOf(record.value());
if (StringUtils.isNotBlank(crossRealTimeDirData)) { if (StringUtils.isNotBlank(crossRealTimeDirData)) {
List<CrossDirDataRealtimePO> crossDirDataRealtimePOS = JacksonUtils.getInstance().readValue(crossRealTimeDirData, new TypeReference<List<CrossDirDataRealtimePO>>() {}); Map<String, List<CrossDirDataRealtimePO>> map = JacksonUtils.getInstance().readValue(crossRealTimeDirData, new TypeReference<Map<String, List<CrossDirDataRealtimePO>>>() {});
if (!CollectionUtils.isEmpty(crossDirDataRealtimePOS)) { if (Objects.nonNull(map) && !map.isEmpty()) {
crossDirDataRealTimeMapper.deleteBatch(crossDirDataRealtimePOS.stream().map(CrossDirDataRealtimePO::getCrossId).collect(Collectors.toList())); List<CrossDirDataRealtimePO> crossDirDataRealtimePOS = map.get("crossRealTimeDirData");
crossDirDataRealTimeMapper.insertBatch(crossDirDataRealtimePOS); if (!CollectionUtils.isEmpty(crossDirDataRealtimePOS)) {
List<CrossDirDataHistPO> list = new ArrayList<>(crossDirDataRealtimePOS.size()); crossDirDataRealTimeMapper.deleteBatch(crossDirDataRealtimePOS.stream().map(CrossDirDataRealtimePO::getCrossId).collect(Collectors.toList()));
BeanListUtils.populateList(crossDirDataRealtimePOS, list, CrossDirDataHistPO.class); crossDirDataRealTimeMapper.insertBatch(crossDirDataRealtimePOS);
crossDirDataHistMapper.insertBatch(list); List<CrossDirDataHistPO> list = new ArrayList<>(crossDirDataRealtimePOS.size());
BeanListUtils.populateList(crossDirDataRealtimePOS, list, CrossDirDataHistPO.class);
crossDirDataHistMapper.insertBatch(list);
}
} }
} }
acknowledgment.acknowledge(); acknowledgment.acknowledge();
...@@ -114,14 +126,17 @@ public class ConsumerHandler implements KafkaListenerErrorHandler { ...@@ -114,14 +126,17 @@ public class ConsumerHandler implements KafkaListenerErrorHandler {
try { try {
String crossRealTimeTurnData = String.valueOf(record.value()); String crossRealTimeTurnData = String.valueOf(record.value());
if (StringUtils.isNotBlank(crossRealTimeTurnData)) { if (StringUtils.isNotBlank(crossRealTimeTurnData)) {
List<CrossTurnDataRealtimePO> crossTurnDataRealtimePOS = JacksonUtils.getInstance().readValue(crossRealTimeTurnData, new TypeReference<List<CrossTurnDataRealtimePO>>() {}); Map<String, List<CrossTurnDataRealtimePO>> map = JacksonUtils.getInstance().readValue(crossRealTimeTurnData, new TypeReference<Map<String, List<CrossTurnDataRealtimePO>>>() {});
if (!CollectionUtils.isEmpty(crossTurnDataRealtimePOS)) { if (Objects.nonNull(map) && !map.isEmpty()) {
List<String> crossIds = crossTurnDataRealtimePOS.stream().map(CrossTurnDataRealtimePO::getCrossId).distinct().collect(Collectors.toList()); List<CrossTurnDataRealtimePO> crossTurnDataRealtimePOS = map.get("crossRealTimeTurnData");
crossTurnDataRealTimeMapper.deleteBatch(crossIds); if (!CollectionUtils.isEmpty(crossTurnDataRealtimePOS)) {
crossTurnDataRealTimeMapper.insertBatch(crossTurnDataRealtimePOS); List<String> crossIds = crossTurnDataRealtimePOS.stream().map(CrossTurnDataRealtimePO::getCrossId).distinct().collect(Collectors.toList());
List<CrossTurnDataHistPO> list = new ArrayList<>(crossTurnDataRealtimePOS.size()); crossTurnDataRealTimeMapper.deleteBatch(crossIds);
BeanListUtils.populateList(crossTurnDataRealtimePOS, list, CrossTurnDataHistPO.class); crossTurnDataRealTimeMapper.insertBatch(crossTurnDataRealtimePOS);
crossTurnDataHistMapper.insertBatch(list); List<CrossTurnDataHistPO> list = new ArrayList<>(crossTurnDataRealtimePOS.size());
BeanListUtils.populateList(crossTurnDataRealtimePOS, list, CrossTurnDataHistPO.class);
crossTurnDataHistMapper.insertBatch(list);
}
} }
} }
acknowledgment.acknowledge(); acknowledgment.acknowledge();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment