Commit cac32575 authored by duanruiming's avatar duanruiming

[update] 城市大脑-路口实时数据保存优化

parent 5c91161c
package net.wanji.opt.dao.mapper;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import net.wanji.opt.po.ForeignDevicePO;
import org.springframework.stereotype.Repository;
import java.util.List;
/**
* @author duanruiming
* @date 2024/07/22 10:54
*/
@Repository
@DS("holo")
public interface HoloForeignDeviceMapper extends BaseMapper<ForeignDevicePO> {
List<ForeignDevicePO> selectDistinctDevice();
}
...@@ -9,6 +9,8 @@ import net.wanji.common.utils.tool.JacksonUtils; ...@@ -9,6 +9,8 @@ import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.common.utils.tool.StringUtils; import net.wanji.common.utils.tool.StringUtils;
import net.wanji.databus.dao.mapper.*; import net.wanji.databus.dao.mapper.*;
import net.wanji.databus.po.*; import net.wanji.databus.po.*;
import net.wanji.opt.dao.mapper.HoloForeignDeviceMapper;
import net.wanji.opt.po.ForeignDevicePO;
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
...@@ -20,10 +22,7 @@ import org.springframework.stereotype.Component; ...@@ -20,10 +22,7 @@ import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.util.ArrayList; import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
...@@ -43,6 +42,7 @@ public class ConsumerHandler implements KafkaListenerErrorHandler { ...@@ -43,6 +42,7 @@ public class ConsumerHandler implements KafkaListenerErrorHandler {
private final CrossLaneDataHistMapper crossLaneDataHistMapper; private final CrossLaneDataHistMapper crossLaneDataHistMapper;
private final CrossDataRealtimeMapper crossDataRealTimeMapper; private final CrossDataRealtimeMapper crossDataRealTimeMapper;
private final CrossDataHistMapper crossDataHistMapper; private final CrossDataHistMapper crossDataHistMapper;
private final HoloForeignDeviceMapper holoForeignDeviceMapper;
@Transactional @Transactional
@KafkaListener(topics = {"cross_real_time_lane_data"}) @KafkaListener(topics = {"cross_real_time_lane_data"})
...@@ -69,6 +69,17 @@ public class ConsumerHandler implements KafkaListenerErrorHandler { ...@@ -69,6 +69,17 @@ public class ConsumerHandler implements KafkaListenerErrorHandler {
} }
} }
private Map<String, String> getDeviceNoMap() {
List<ForeignDevicePO> foreignDevicePOS = holoForeignDeviceMapper.selectDistinctDevice();
Map<String, String> deviceMap = new HashMap<>();
if (!CollectionUtils.isEmpty(foreignDevicePOS)) {
foreignDevicePOS.forEach(item -> {
deviceMap.put(item.getDeviceNo(), item.getCrossId());
});
}
return deviceMap;
}
@Transactional @Transactional
@KafkaListener(topics = {"cross_real_time_data"}) @KafkaListener(topics = {"cross_real_time_data"})
public void receiveCrossRealTimeData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception { public void receiveCrossRealTimeData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
...@@ -79,14 +90,17 @@ public class ConsumerHandler implements KafkaListenerErrorHandler { ...@@ -79,14 +90,17 @@ public class ConsumerHandler implements KafkaListenerErrorHandler {
if (Objects.nonNull(map) && !map.isEmpty()) { if (Objects.nonNull(map) && !map.isEmpty()) {
List<CrossDataRealtimePO> crossDataRealtimePOS = map.get("crossRealTimeData"); List<CrossDataRealtimePO> crossDataRealtimePOS = map.get("crossRealTimeData");
if (!CollectionUtils.isEmpty(crossDataRealtimePOS)) { if (!CollectionUtils.isEmpty(crossDataRealtimePOS)) {
crossDataRealTimeMapper.deleteBatch(crossDataRealtimePOS.stream().map(CrossDataRealtimePO::getCrossId).collect(Collectors.toList())); Map<String, String> deviceNoMap = getDeviceNoMap();
crossDataRealTimeMapper.insertBatch(crossDataRealtimePOS); List<CrossDataRealtimePO> noWanjiList = getNoWjList(crossDataRealtimePOS, deviceNoMap);
List<CrossDataHistPO> crossDataHistPOS = new ArrayList<>(crossDataRealtimePOS.size()); if (!CollectionUtils.isEmpty(noWanjiList)) {
BeanListUtils.populateList(crossDataRealtimePOS, crossDataHistPOS, CrossDataHistPO.class); crossDataRealTimeSave(noWanjiList);
crossDataHistMapper.insertBatch(crossDataHistPOS); }
List<CrossDataRealtimePO> wanjiList = crossDataRealtimePOS.stream().filter(po -> !deviceNoMap.containsKey(po.getCrossId())).collect(Collectors.toList());
if (!CollectionUtils.isEmpty(wanjiList)) {
crossDataRealTimeSave(wanjiList);
}
} }
} }
} }
acknowledgment.acknowledge(); acknowledgment.acknowledge();
} catch (Exception e) { } catch (Exception e) {
...@@ -95,6 +109,26 @@ public class ConsumerHandler implements KafkaListenerErrorHandler { ...@@ -95,6 +109,26 @@ public class ConsumerHandler implements KafkaListenerErrorHandler {
} }
} }
private List<CrossDataRealtimePO> getNoWjList(List<CrossDataRealtimePO> crossDataRealtimePOS, Map<String, String> deviceNoMap) {
List<CrossDataRealtimePO> noWanjiList = new ArrayList<>();
for (CrossDataRealtimePO realtimePO : crossDataRealtimePOS) {
String crossId = realtimePO.getCrossId();
if (deviceNoMap.containsKey(crossId)) {
realtimePO.setCrossId(deviceNoMap.get(crossId));
noWanjiList.add(realtimePO);
}
}
return noWanjiList;
}
private void crossDataRealTimeSave(List<CrossDataRealtimePO> crossDataRealtimePOS) {
crossDataRealTimeMapper.deleteBatch(crossDataRealtimePOS.stream().map(CrossDataRealtimePO::getCrossId).collect(Collectors.toList()));
crossDataRealTimeMapper.insertBatch(crossDataRealtimePOS);
List<CrossDataHistPO> crossDataHistPOS = new ArrayList<>(crossDataRealtimePOS.size());
BeanListUtils.populateList(crossDataRealtimePOS, crossDataHistPOS, CrossDataHistPO.class);
crossDataHistMapper.insertBatch(crossDataHistPOS);
}
@Transactional @Transactional
@KafkaListener(topics = {"cross_real_time_dir_data"}) @KafkaListener(topics = {"cross_real_time_dir_data"})
public void receiveCrossRealTimeDirData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception { public void receiveCrossRealTimeDirData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
......
package net.wanji.opt.po;
import lombok.Data;
import java.util.Date;
/**
* 外部设备表
*/
@Data
public class ForeignDevicePO {
/** */
private Integer id ;
private String laneId ;
private String code ;
private Integer sort ;
private Integer type ;
private Integer dir ;
private Integer turn ;
private Integer category ;
private String crossId ;
private String rid ;
private String segmentId ;
private Double length ;
private Double width ;
private String wkt ;
private Date gmtCreate ;
private Date gmtModified ;
private Double endAngle ;
private String manufacturer ;
private String crossName ;
private String signalIp ;
private Integer signalId ;
private Double crossLon ;
private Double crossLat ;
private String deviceType ;
private Integer sectionNo ;
private Double deviceLon ;
private Double deviceLat ;
private String deviceIp ;
private String deviceNo ;
}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="net.wanji.opt.dao.mapper.HoloForeignDeviceMapper">
<select id="selectDistinctDevice" resultType="net.wanji.opt.po.ForeignDevicePO">
select distinct cross_id, device_no from t_foreign_device where cross_id != device_no
</select>
</mapper>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment