Commit 39a8f777 authored by duanruiming's avatar duanruiming

[update] 持久化优化

parent fe04dd77
......@@ -5,6 +5,7 @@ import net.wanji.datacenter.pojo.po.CrossDataRealTimePO;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.Collection;
import java.util.List;
/**
......@@ -14,4 +15,5 @@ import java.util.List;
@Mapper
public interface CrossDataRealTimeMapper extends BaseMapper<CrossDataRealTimePO> {
void insertBatch(@Param("list") List<CrossDataRealTimePO> list);
void deleteBatch(@Param("list") Collection<String> crossIds);
}
......@@ -5,6 +5,7 @@ import net.wanji.datacenter.pojo.po.CrossDirDataRealTimePO;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.Collection;
import java.util.List;
/**
......@@ -14,4 +15,6 @@ import java.util.List;
@Mapper
public interface CrossDirDataRealTimeMapper extends BaseMapper<CrossDirDataRealTimePO> {
void insertBatch(@Param("list")List<CrossDirDataRealTimePO> list);
void deleteBatch(@Param("list") Collection<String> crossIds);
}
......@@ -5,6 +5,7 @@ import net.wanji.datacenter.pojo.po.CrossLaneDataRealTimePO;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.Collection;
import java.util.List;
/**
......@@ -14,4 +15,6 @@ import java.util.List;
@Mapper
public interface CrossLaneDataRealTimeMapper extends BaseMapper<CrossLaneDataRealTimePO> {
void insertBatch(@Param("list") List<CrossLaneDataRealTimePO> list);
void deleteBatch(@Param("list") Collection<String> crossIds);
}
package net.wanji.datacenter.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import net.wanji.datacenter.pojo.po.CrossTurnDataRealtimePO;
import net.wanji.datacenter.pojo.po.CrossTurnDataRealTimePO;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.Collection;
import java.util.List;
/**
......@@ -12,6 +13,8 @@ import java.util.List;
* @date 2023/03/10 21:11
*/
@Mapper
public interface CrossTurnDataRealTimeMapper extends BaseMapper<CrossTurnDataRealtimePO> {
void insertBatch(@Param("list") List<CrossTurnDataRealtimePO> list);
public interface CrossTurnDataRealTimeMapper extends BaseMapper<CrossTurnDataRealTimePO> {
void insertBatch(@Param("list") List<CrossTurnDataRealTimePO> list);
void deleteBatch(@Param("list") Collection<String> crossIds);
}
......@@ -100,7 +100,7 @@ public class LanePeriodicDataEventListConvert {
return crossDirDataRealTimePO;
}
public CrossTurnDataRealtimePO convert2CrossTurnDataRealtimePO(LanePeriodicDataDTO.EventList eventList, CrossBaseLaneInfoPO laneInfoPO) {
public CrossTurnDataRealTimePO convert2CrossTurnDataRealtimePO(LanePeriodicDataDTO.EventList eventList, CrossBaseLaneInfoPO laneInfoPO) {
String crossId = laneInfoPO.getCrossId();
// todo 路口状态
int trafficFlow = eventList.getTrafficFlow();
......@@ -116,7 +116,7 @@ public class LanePeriodicDataEventListConvert {
Integer type = laneInfoPO.getType();
Integer turn = laneInfoPO.getTurn();
CrossTurnDataRealtimePO crossTurnDataRealtimePO = new CrossTurnDataRealtimePO();
CrossTurnDataRealTimePO crossTurnDataRealtimePO = new CrossTurnDataRealTimePO();
crossTurnDataRealtimePO.setId( crossId.concat("").concat(String.valueOf(turn)));
crossTurnDataRealtimePO.setTurnType("");
crossTurnDataRealtimePO.setInDir(0000);
......
......@@ -15,7 +15,7 @@ import java.util.Date;
@Data
@TableName("t_lane_data_realtime")
@ApiModel(value = "CrossTurnDataRealtimePO", description = "路口转向实时数据")
public class CrossTurnDataRealtimePO {
public class CrossTurnDataRealTimePO {
/** 转向ID(路口ID_驶入方向_转向类型) */
@ApiModelProperty(name = "转向ID(路口ID_驶入方向_转向类型)",notes = "")
private String id ;
......
package net.wanji.datacenter.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.wanji.datacenter.kafka.ConsumerHandler;
......@@ -10,14 +9,14 @@ import net.wanji.datacenter.pojo.convert.LanePeriodicDataEventListConvert;
import net.wanji.datacenter.pojo.dto.LanePeriodicDataDTO;
import net.wanji.datacenter.pojo.po.*;
import net.wanji.datacenter.service.DataProcessService;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author duanruiming
......@@ -27,6 +26,7 @@ import java.util.List;
@Slf4j
@Service(value = "JN040001LanePeriodicData")
@RequiredArgsConstructor
@SuppressWarnings("all")
public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
private final ConsumerHandler consumerHandler;
......@@ -67,7 +67,7 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
int laneNum = lanePeriodicDataDTO.getLaneNum();
List<CrossDataRealTimePO> crossDataRealTimePOS = new ArrayList<>(laneNum);
List<CrossDirDataRealTimePO> crossDirDataRealTimePOS = new ArrayList<>(laneNum);
List<CrossTurnDataRealtimePO> crossTurnDataRealtimePOS = new ArrayList<>(laneNum);
List<CrossTurnDataRealTimePO> crossTurnDataRealTimePOS = new ArrayList<>(laneNum);
List<CrossLaneDataRealTimePO> crossLaneDataRealTimePOS = new ArrayList<>(laneNum);
List<LanePeriodicDataDTO.EventList> eventLists = lanePeriodicDataDTO.getEventList();
......@@ -77,10 +77,10 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
}
// 通过车道编号获取车道信息
// List<String> LaneIdList = eventLists.stream().map(LanePeriodicDataDTO.EventList::getLaneId).collect(Collectors.toList());
// "14PU609IQA014PUG09IRA0010012"
List<String> LaneIdList = Arrays.asList("14PU609IQA014PUG09IRA0010011");
List<String> LaneIdList = Arrays.asList("14PU609IQA014PUG09IRA0010011", "14PU609IQA014PUG09IRA0010012");
List<CrossBaseLaneInfoPO> crossBaseLaneInfoPOS = crossBaseLaneInfoMapper.selectBatchIds(LaneIdList);
// 将kafka车道数据与路口车道数据对应,获取路口编号,方向,转向,分组计算
for (LanePeriodicDataDTO.EventList laneRealTimeItem : eventLists) {
String laneId = laneRealTimeItem.getLaneId();
for (CrossBaseLaneInfoPO laneInfoPO : crossBaseLaneInfoPOS) {
......@@ -91,25 +91,107 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
crossDirDataRealTimePOS.add(lanePeriodicDataEventListConvert.convert2CrossDirDataRealTimePO(laneRealTimeItem, laneInfoPO));
crossTurnDataRealtimePOS.add(lanePeriodicDataEventListConvert.convert2CrossTurnDataRealtimePO(laneRealTimeItem, laneInfoPO));
crossTurnDataRealTimePOS.add(lanePeriodicDataEventListConvert.convert2CrossTurnDataRealtimePO(laneRealTimeItem, laneInfoPO));
crossLaneDataRealTimePOS.add(lanePeriodicDataEventListConvert.convert2CrossLaneDataRealTimePO(laneRealTimeItem, laneInfoPO));
String crossId = laneInfoPO.getCrossId();
crossDataRealTimeMapper.delete(new LambdaQueryWrapper<CrossDataRealTimePO>().eq(CrossDataRealTimePO::getCrossId, crossId));
crossDirDataRealTimeMapper.delete(new LambdaQueryWrapper<CrossDirDataRealTimePO>().eq(CrossDirDataRealTimePO::getCrossId, crossId));
crossTurnDataRealTimeMapper.delete(new LambdaQueryWrapper<CrossTurnDataRealtimePO>().eq(CrossTurnDataRealtimePO::getCrossId, crossId));
crossLaneDataRealTimeMapper.delete(new LambdaQueryWrapper<CrossLaneDataRealTimePO>().eq(CrossLaneDataRealTimePO::getCrossId, crossId));
}
}
}
crossDataRealTimeMapper.insertBatch(crossDataRealTimePOS);
crossDirDataRealTimeMapper.insertBatch(crossDirDataRealTimePOS);
crossTurnDataRealTimeMapper.insertBatch(crossTurnDataRealtimePOS);
crossLaneDataRealTimeMapper.insertBatch(crossLaneDataRealTimePOS);
// 删除操作 实时数据表只能有一条实时数据
Set<String> crossDataRealTimeSet = crossDataRealTimePOS.stream().map(CrossDataRealTimePO::getCrossId).collect(Collectors.toSet());
crossDataRealTimeMapper.deleteBatch(crossDataRealTimeSet);
Set<String> crossDirDataRealTimeSet = crossDirDataRealTimePOS.stream().map(CrossDirDataRealTimePO::getCrossId).collect(Collectors.toSet());
crossDirDataRealTimeMapper.deleteBatch(crossDirDataRealTimeSet);
Set<String> crossTurnDataRealTimeSet = crossTurnDataRealTimePOS.stream().map(CrossTurnDataRealTimePO::getCrossId).collect(Collectors.toSet());
crossTurnDataRealTimeMapper.deleteBatch(crossTurnDataRealTimeSet);
Set<String> crossLaneDataRealTimeSet = crossLaneDataRealTimePOS.stream().map(CrossLaneDataRealTimePO::getCrossId).collect(Collectors.toSet());
crossLaneDataRealTimeMapper.deleteBatch(crossLaneDataRealTimeSet);
// 将车道转换的数据通过路口,方向,转向维度计算,存储
List<CrossDataRealTimePO> insertCrossDataRealTimePOS = new ArrayList<>();
Map<String, List<CrossDataRealTimePO>> crossDataRealTimePOSMap = crossDataRealTimePOS.stream().collect(Collectors.groupingBy(CrossDataRealTimePO::getCrossId));
for (Map.Entry<String, List<CrossDataRealTimePO>> entry : crossDataRealTimePOSMap.entrySet()) {
String crossId = entry.getKey();
List<CrossDataRealTimePO> value = entry.getValue();
CrossDataRealTimePO insertCrossDataRealTimePO = new CrossDataRealTimePO();
Integer status = 0;
Double trafficIndex = 0.0;
Integer flow = 0;
Double flowRate = 0.0;
Double speed = 0.0;
Double queueLength = 0.0;
Integer stopTimes = 0;
Integer delayTime = 0;
for (CrossDataRealTimePO crossDataRealTimePO : value) {
// 每个路口计算交通流量累加,筛选最大停车长度
flow += crossDataRealTimePO.getFlow();
flowRate += crossDataRealTimePO.getFlowRate();
speed += crossDataRealTimePO.getSpeed();
queueLength = queueLength >= crossDataRealTimePO.getQueueLength() ? queueLength : crossDataRealTimePO.getQueueLength();
stopTimes += crossDataRealTimePO.getStopTimes();
delayTime += crossDataRealTimePO.getDelayTime();
BeanUtils.copyProperties(crossDataRealTimePO, insertCrossDataRealTimePO);
}
insertCrossDataRealTimePO.setStatus(status);
insertCrossDataRealTimePO.setTrafficIndex(trafficIndex);
insertCrossDataRealTimePO.setFlow(flow);
insertCrossDataRealTimePO.setFlowRate(flowRate / value.size());
insertCrossDataRealTimePO.setSpeed(speed / value.size());
insertCrossDataRealTimePO.setQueueLength(queueLength);
insertCrossDataRealTimePO.setStopTimes(stopTimes / value.size());
insertCrossDataRealTimePO.setDelayTime(delayTime / value.size());
insertCrossDataRealTimePOS.add(insertCrossDataRealTimePO);
}
// crossDataRealTimeMapper.insertBatch(insertCrossDataRealTimePOS);
// 方向数据
List<CrossDirDataRealTimePO> insertCrossDirDataRealTimePOS = new ArrayList<>();
Map<String, List<CrossDirDataRealTimePO>> crossDirDataRealTimePOSMap = crossDirDataRealTimePOS.stream().collect(Collectors.groupingBy(CrossDirDataRealTimePO::getCrossId));
for (Map.Entry<String, List<CrossDirDataRealTimePO>> entry : crossDirDataRealTimePOSMap.entrySet()) {
String key = entry.getKey();
List<CrossDirDataRealTimePO> value = entry.getValue();
Map<Integer, List<CrossDirDataRealTimePO>> crossIdMap = value.stream().collect(Collectors.groupingBy(CrossDirDataRealTimePO::getDirType));
CrossDirDataRealTimePO crossDirDataRealTimePO = new CrossDirDataRealTimePO();
Integer status = 0;
Double trafficIndex = 0.0;
Integer flow = 0;
Double flowRate = 0.0;
Double speed = 0.0;
Double queueLength = 0.0;
Integer stopTimes = 0;
Integer delayTime = 0;
for (Map.Entry<Integer, List<CrossDirDataRealTimePO>> dirEntry : crossIdMap.entrySet()) {
Integer dir = dirEntry.getKey();
List<CrossDirDataRealTimePO> dirMap = dirEntry.getValue();
// 每个方向计算交通流量累加,筛选最大停车长度
}
insertCrossDirDataRealTimePOS.add(crossDirDataRealTimePO);
}
// crossDirDataRealTimeMapper.insertBatch(insertCrossDirDataRealTimePOS);
// 转向数据
ArrayList<CrossTurnDataRealTimePO> insertCrossTurnDataRealTimePOS = new ArrayList<>();
Map<String, List<CrossTurnDataRealTimePO>> crossTurnDataRealTimePOSMap = crossTurnDataRealTimePOS.stream().collect(Collectors.groupingBy(CrossTurnDataRealTimePO::getCrossId));
for (Map.Entry<String, List<CrossTurnDataRealTimePO>> entry : crossTurnDataRealTimePOSMap.entrySet()) {
String key = entry.getKey();
List<CrossTurnDataRealTimePO> value = entry.getValue();
CrossTurnDataRealTimePO crossTurnDataRealTimePO = new CrossTurnDataRealTimePO();
Map<String, List<CrossTurnDataRealTimePO>> crossIdMap = value.stream().collect(Collectors.groupingBy(CrossTurnDataRealTimePO::getTurnType));
for (Map.Entry<String, List<CrossTurnDataRealTimePO>> item : crossIdMap.entrySet()) {
String turn = item.getKey();
List<CrossTurnDataRealTimePO> turnMap = item.getValue();
}
insertCrossTurnDataRealTimePOS.add(crossTurnDataRealTimePO);
}
// crossTurnDataRealTimeMapper.insertBatch(insertCrossTurnDataRealTimePOS);
// 车道数据
// crossLaneDataRealTimeMapper.insertBatch(crossLaneDataRealTimePOS);
}
}
......@@ -46,4 +46,12 @@
#{entity.flow},#{entity.flowRate},#{entity.speed},#{entity.queueLength},#{entity.stopTimes},#{entity.delayTime},#{entity.sturation},#{entity.batchTime},#{entity.gmtCreate},#{entity.gmtModified})
</foreach>
</insert>
<delete id="deleteBatch" parameterType="String">
delete from t_cross_data_realtime
where cross_id in
<foreach collection="list" item="crossId" separator=",">
(#{crossId})
</foreach>
</delete>
</mapper>
\ No newline at end of file
......@@ -35,8 +35,16 @@
(<include refid="Base_Column_List"></include>)
values
<foreach collection="list" item="entity" separator=",">
(#{entity.id},#{entity.dirType},#{entity.inOutType},#{entity.crossId},#{entity.length},#{entity.status},#{entity.trafficIndex},#{entity.startTime},#{entity.capacity}, #{entity.duration},#{entity.flow},
#{entity.speed},#{entity.queueLength},#{entity.stopTimes},#{entity.delayTime},#{entity.sturation},#{entity.batchTime},#{entity.gmtCreate},#{entity.gmtModified},#{entity.effusionTime})
( #{entity.id}, #{entity.dirType}, #{entity.inOutType}, #{entity.crossId}, #{entity.length}, #{entity.status}, #{entity.trafficIndex}, #{entity.startTime}, #{entity.capacity}, #{entity.duration}, #{entity.flow},
#{entity.speed}, #{entity.queueLength}, #{entity.stopTimes}, #{entity.delayTime}, #{entity.sturation}, #{entity.batchTime}, #{entity.gmtCreate}, #{entity.gmtModified}, #{entity.effusionTime} )
</foreach>
</insert>
<delete id="deleteBatch" parameterType="String">
delete from t_cross_dir_data_realtime
where cross_id in
<foreach collection="list" item="crossId" separator=",">
(#{crossId})
</foreach>
</delete>
</mapper>
\ No newline at end of file
......@@ -36,4 +36,12 @@
#{entity.vehheadDist},#{entity.vehheadTime},#{entity.quality},#{entity.batchTime},#{entity.gmtCreate},#{entity.gmtModified})
</foreach>
</insert>
<delete id="deleteBatch" parameterType="String">
delete from t_lane_data_realtime
where cross_id in
<foreach collection="list" item="crossId" separator=",">
(#{crossId})
</foreach>
</delete>
</mapper>
\ No newline at end of file
......@@ -2,7 +2,7 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="net.wanji.datacenter.mapper.CrossTurnDataRealTimeMapper">
<resultMap id="BaseResultMap" type="net.wanji.datacenter.pojo.po.CrossTurnDataRealtimePO">
<resultMap id="BaseResultMap" type="net.wanji.datacenter.pojo.po.CrossTurnDataRealTimePO">
<result column="id" property="id"></result>
<result column="turn_type" property="turnType"></result>
<result column="in_dir" property="inDir"></result>
......@@ -31,7 +31,7 @@
stop_times, delay_time, sturation, vehhead_dist, vehhead_time, quality, batch_time, gmt_create, gmt_modified
</sql>
<insert id="insertBatch" parameterType="net.wanji.datacenter.pojo.po.CrossTurnDataRealtimePO">
<insert id="insertBatch" parameterType="net.wanji.datacenter.pojo.po.CrossTurnDataRealTimePO">
insert into t_cross_turn_data_realtime
(<include refid="Base_Column_List"></include>)
values
......@@ -40,4 +40,12 @@
#{entity.stopTimes},#{entity.delayTime},#{entity.sturation},#{entity.vehheadDist},#{entity.vehheadTime},#{entity.quality},#{entity.batchTime},#{entity.gmtCreate},#{entity.gmtModified})
</foreach>
</insert>
<delete id="deleteBatch" parameterType="String">
delete from t_cross_turn_data_realtime
where cross_id in
<foreach collection="list" item="crossId" separator=",">
(#{crossId})
</foreach>
</delete>
</mapper>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment