Commit 1eddcfba authored by duanruiming's avatar duanruiming

[update] 数据中心编译优化

parent 2797d73e
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**
!**/src/test/**
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
### VS Code ###
.vscode/
This diff is collapsed.
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id>1.0</id>
<formats>
<format>tar.gz</format>
<format>dir</format>
</formats>
<includeBaseDirectory>true</includeBaseDirectory>
<fileSets>
<!-- for bin -->
<fileSet>
<directory>src/main/bin</directory>
<includes>
<include>*.*</include>
</includes>
<directoryMode>775</directoryMode>
<outputDirectory>bin/</outputDirectory>
</fileSet>
<!-- for configs -->
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>*.yml</include>
<include>*.xml</include>
</includes>
<outputDirectory>/</outputDirectory>
</fileSet>
<!-- for jar -->
<fileSet>
<directory>${project.build.directory}</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>*.jar</include>
</includes>
</fileSet>
</fileSets>
<!-- for lib -->
<dependencySets>
<dependencySet>
<outputDirectory>lib/</outputDirectory>
<scope>runtime</scope>
<excludes>
<exclude>${groupId}:${artifactId}</exclude>
</excludes>
</dependencySet>
</dependencySets>
</assembly>
package net.wanji.datacenter;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@SpringBootApplication(scanBasePackages = {"net.wanji.datacenter", "net.wanji.common"})
@MapperScan(basePackages = {"net.wanji.datacenter.dao.mapper"})
@EnableTransactionManagement
@EnableScheduling
@EnableAsync
public class DataCenterApplication {
public static void main(String[] args) {
SpringApplication.run(DataCenterApplication.class, args);
}
}
package net.wanji.datacenter.cache;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.framework.spring.ServiceBeanContext;
import net.wanji.datacenter.service.dataconvert.DataProcessService;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* @author duanruiming
* @date 2023/03/10 14:04
*/
@Component
@Slf4j
public class DataProcessServiceRegistrator implements CommandLineRunner {
private final Map<String, DataProcessService> dataProcessServiceRegistor = new HashMap<>();
/**
* 获取注册器
*
* @return
*/
public Map<String, DataProcessService> getDataProcessServiceRegistor() {
return dataProcessServiceRegistor;
}
@Override
public void run(String... args) throws Exception {
Map<String, DataProcessService> map = ServiceBeanContext.getInterfaceBeanMap(DataProcessService.class);
if (!map.isEmpty()) {
dataProcessServiceRegistor.putAll(map);
} else {
log.error("初始化数据处理注册器失败");
}
}
}
package net.wanji.datacenter.dao.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* @author duanruiming
* @date 2023/03/10 20:32
*/
public interface CrossBaseLaneInfoMapper extends BaseMapper {
}
package net.wanji.datacenter.dao.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* @author duanruiming
* @date 2023/03/10 20:06
*/
public interface CrossDataRealTimeMapper extends BaseMapper {
}
package net.wanji.datacenter.dao.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* @author duanruiming
* @date 2023/03/10 21:09
*/
public interface CrossDirDataRealTimeMapper extends BaseMapper {
}
package net.wanji.datacenter.dao.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* @author duanruiming
* @date 2023/03/10 21:11
*/
public interface CrossLaneDataRealTimeMapper extends BaseMapper {
}
package net.wanji.datacenter.dao.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* @author duanruiming
* @date 2023/03/10 21:11
*/
public interface CrossTurnDataRealTimeMapper extends BaseMapper {
}
package net.wanji.datacenter.kafka;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author duanruiming
* @date 2023/03/09 16:03
*/
@Component
@Slf4j
public class ConsumerHandler implements KafkaListenerErrorHandler {
/**
* kafka数据缓存
* key:topic value:data
*/
private static final Map<String, String> lanePeriodicDataMap = new ConcurrentHashMap<>();
/**
* 通过消息主题获取消息信息
*
* @param topic
* @return
*/
public String getTopicMessage(String topic) {
if (StringUtils.isNotBlank(topic) && !lanePeriodicDataMap.isEmpty()) {
return lanePeriodicDataMap.get(topic);
}
return null;
}
/**
* 通过主题移除缓存中消息
*
* @param topic
*/
public void removeTopicMessage(String topic) {
lanePeriodicDataMap.remove(topic);
}
@KafkaListener(topics = {"JN040001LanePeriodicData"})
public void receiveLanePeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) {
log.info("收到kafka消息,消息主题:" + record.topic());
log.info("收到kafka消息,消息内容:" + record.value());
lanePeriodicDataMap.put(record.topic(), String.valueOf(record.value()));
acknowledgment.acknowledge();
}
@Override
@NonNull
public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
return new Object();
}
@Override
@NonNull
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
String errorMessage = String.format("监听主题:%s,消费者详情:%s,异常信息:%s,消息详情:%s",
consumer.listTopics(), consumer.groupMetadata(), exception, message);
log.error(errorMessage);
return KafkaListenerErrorHandler.super.handleError(message, exception, consumer);
}
}
package net.wanji.datacenter.pojo.dto;
import lombok.Data;
import java.util.List;
/**
* @author duanruiming
* @date 2023/03/09 17:02
*/
@Data
public class LanePeriodicDataDTO {
/**
* 车道事件集合
*/
private List<EventList> eventList;
/**
* 车道数量
*/
private Integer laneNum;
/**
* 全域编号
*/
private String orgCode;
/**
* 数据生成时间: yyyy-MM-dd HH:mm:ss:SSS
*/
private String timeStamp;
@Data
public class EventList {
/**
* 最大动态排队长度
*/
private double dynamicQueueLengthMax;
/**
* 最小动态排队长度
*/
private Integer dynamicQueueLengthMin;
/**
* 绿灯有效利用率
*/
private Integer greenLightEfficiency;
/**
* 车道号(路网数据)
*/
private String laneId;
/**
* 车道通行能力
*/
private Integer laneCapacity;
/**
* 车道流率
*/
private Integer laneFlowRate;
/**
* 不停车率
*/
private Integer laneNoStopRate;
/**
* 一次停车率
*/
private Integer laneOneStopRate;
/**
* 车道饱和度
*/
private double laneSaturation;
/**
* 车道饱和流率
*/
private Integer laneSaturationFlowRate;
/**
* 三次停车率
*/
private Integer laneThreeStopRate;
/**
* 二次停车率
*/
private Integer laneTwoStopRate;
/**
* 绿灯结束排队长度
*/
private double lightGreenFinishQueueLength;
/**
* 绿灯起亮排队长度
*/
private Integer lightGreenStartQueueLength;
/**
* 平均延误
*/
private Integer meanDelay;
/**
* 平均停车次数
*/
private Integer meanStopsNumber;
/**
* 平均点速度
*/
private double meanV;
/**
* 最大静态排队长度
*/
private Integer staticQueueLengthMax;
/**
* 最小静态排队长度
*/
private Integer staticQueueLengthMin;
/**
* 时间占有率
*/
private double timeOccupancy;
/**
* 自然流量
*/
private Integer trafficFlow;
/**
* 小车当量
*/
private Integer trolleyEquivalent;
/**
* 平均空间密度(长度占比)
*/
private double vehicleLengthRatioMean;
/**
* 平均空间密度(车辆负荷比)
*/
private double vehicleNumsRatioMean;
}
}
package net.wanji.datacenter.pojo.po;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.Date;
/**
* @author duanruiming
* @date 2023/03/10 17:27
*/
@Data
@ApiModel(value = "AreaDataRealPO", description = "区域实时数据")
public class AreaDataRealTimePO {
@ApiModelProperty(name = "区域id",notes = "")
private String areaId;
@ApiModelProperty(name = "交通状态:1畅通 2缓行 3拥挤 4严重拥堵",notes = "")
private Integer status;
@ApiModelProperty(name = "拥堵类型 1常规 2异常",notes = "")
private Integer type;
@ApiModelProperty(name = "交通指数",notes = "")
private Double trafficIndex;
@ApiModelProperty(name = "开始时间",notes = "")
private Date startTime;
@ApiModelProperty(name = "持续时间",notes = "")
private Integer duration;
@ApiModelProperty(name = "平均速度",notes = "")
private Double speed;
@ApiModelProperty(name = "停车次数",notes = "")
private Double stopTimes;
@ApiModelProperty(name = "延误时间",notes = "")
private Integer delayTime;
@ApiModelProperty(name = "饱和度",notes = "")
private Double sturation;
@ApiModelProperty(name = "数据批次 十位日期int",notes = "")
private Integer batchTime;
@ApiModelProperty(name = "创建时间",notes = "")
private Date gmtCreate ;
@ApiModelProperty(name = "修改时间",notes = "")
private Date gmtModified ;
}
package net.wanji.datacenter.pojo.po;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.Date;
/**
* @author duanruiming
* @date 2023/03/10 20:17
*/
@Data
@TableName("t_base_lane_info")
@ApiModel(value = "CrossBaseLaneInfoPO", description = "路口基础车道数据")
public class CrossBaseLaneInfoPO {
@ApiModelProperty(name = "修改时间",notes = "")
private String id;
@ApiModelProperty(name = "修改时间",notes = "")
private String code;
@ApiModelProperty(name = "车道序号,从左车道开始编号11、12、13...",notes = "")
private Integer sort;
@ApiModelProperty(name = "车道类型:1路段车道;2进口车道;3出口车道;4左转弯待转区;6直行待行区",notes = "")
private Integer type;
@ApiModelProperty(name = "车道方向:1北;2东北;3东;4东南;5南;6西南;7西;8西北",notes = "")
private Integer dir;
@ApiModelProperty(name = "车道转向:1左转;2直行;3右转;4掉头;5直左;6直右;7左直右;8左右;9左转掉头;10直行掉头;11右转掉头;12左直掉头;13直右掉头;14左直右掉头;15左右掉头",notes = "")
private Integer turn;
@ApiModelProperty(name = "车道类别:1机动车;2非机动车;3公交专用;4可变;5潮汐",notes = "")
private Integer category;
@ApiModelProperty(name = "路口编号",notes = "")
private String crossId;
@ApiModelProperty(name = "路段编号",notes = "")
private String rid;
@ApiModelProperty(name = "渠化编号",notes = "")
private String segmentId;
@ApiModelProperty(name = "车道长度",notes = "")
private Double length;
@ApiModelProperty(name = "车道宽度",notes = "")
private Double width;
@ApiModelProperty(name = "空间对象",notes = "")
private String wkt;
@ApiModelProperty(name = "创建时间",notes = "")
private Date gmtCreate ;
@ApiModelProperty(name = "修改时间",notes = "")
private Date gmtModified ;
}
package net.wanji.datacenter.pojo.po;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.Date;
/**
* @author duanruiming
* @date 2023/03/10 19:02
*/
@Data
@TableName("t_cross_data_realtime")
@ApiModel(value = "CrossDataRealTimePO", description = "路口实时数据")
public class CrossDataRealTimePO {
@ApiModelProperty(name = "路口编号",notes = "")
private String crossId;
@ApiModelProperty(name = "路口状态:0正常 1失衡 2拥堵 3溢出 4死锁",notes = "")
private Integer status;
@ApiModelProperty(name = "拥堵类型 1常规 2异常",notes = "")
private Integer type;
@ApiModelProperty(name = "交通指数(1-10)",notes = "")
private Double trafficIndex;
@ApiModelProperty(name = "开始时间 yyyy-MM-dd HH:mm:sss",notes = "")
private Date startTime;
@ApiModelProperty(name = "持续时间(分钟)",notes = "")
private Integer duration;
@ApiModelProperty(name = "是否失衡 0否 1是",notes = "")
private Integer unbalanceStatus;
@ApiModelProperty(name = "是否溢出 0否 1是",notes = "")
private Integer spilloverStatus;
@ApiModelProperty(name = "是否拥堵 0否 1是",notes = "")
private Integer congestionStatus;
@ApiModelProperty(name = "失衡指数",notes = "")
private Double unbalanceIndex;
@ApiModelProperty(name = "溢出指数",notes = "")
private Double spilloverIndex;
@ApiModelProperty(name = "拥堵指数",notes = "")
private Double congestionIndex;
@ApiModelProperty(name = "路口失衡方向:1,2,3...",notes = "")
private String unbalanceDirs;
@ApiModelProperty(name = "溢出方向",notes = "")
private String spilloverDirs;
@ApiModelProperty(name = "拥堵方向",notes = "")
private String congestionDirs;
@ApiModelProperty(name = "交通流量(辆)",notes = "")
private Integer flow;
@ApiModelProperty(name = "交通流率/h",notes = "")
private Double flowRate;
@ApiModelProperty(name = "平均速度",notes = "")
private Double speed;
@ApiModelProperty(name = "最大排队长度",notes = "")
private Double queueLength;
@ApiModelProperty(name = "停车次数",notes = "")
private Integer stopTimes;
@ApiModelProperty(name = "延误时间",notes = "")
private Integer delayTime;
@ApiModelProperty(name = "饱和度:欠饱和:<0.6;近饱和:>=0.6 & < 0.8;过饱和:>0.8",notes = "")
private Double sturation;
@ApiModelProperty(name = "数据批次 十位日期int",notes = "")
private Integer batchTime;
@ApiModelProperty(name = "创建时间",notes = "")
private Date gmtCreate ;
@ApiModelProperty(name = "修改时间",notes = "")
private Date gmtModified ;
}
package net.wanji.datacenter.pojo.po;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.Date;
/**
* @author hfx
* @date 2023/1/10 16:36
* @desc CrossDataRealtimePO
*/
@Data
@ApiModel(value = "CrossDirDataHistPO", description = "路口转向实时历史数据")
public class CrossDirDataHistPO {
/** 路口方向ID(路口ID_方向_进出口_主辅路序号) */
@ApiModelProperty(name = "路口方向ID(路口ID_方向_进出口_主辅路序号)",notes = "")
private String id ;
/** 路口方向类型:1北;2东北;3东;4东南;5南;6西南;7西;8西北 */
@ApiModelProperty(name = "路口方向类型:1北;2东北;3东;4东南;5南;6西南;7西;8西北",notes = "")
private Integer dirType ;
/** 进出口类型:1进口;2出口 */
@ApiModelProperty(name = "进出口类型:1进口;2出口",notes = "")
private Integer inOutType ;
/** 路口ID */
@ApiModelProperty(name = "路口ID",notes = "")
private String crossId ;
/** 路段长度 */
@ApiModelProperty(name = "路段长度",notes = "")
private Double length ;
/** 交通状态:1畅通;2缓行;3拥堵;4严重拥堵;5未知 */
@ApiModelProperty(name = "交通状态:1畅通;2缓行;3拥堵;4严重拥堵;5未知",notes = "")
private Integer status ;
/** 交通指数 */
@ApiModelProperty(name = "交通指数",notes = "")
private Double trafficIndex ;
@ApiModelProperty(value = "开始时间:yyyy-MM-dd HH:mm:ss",notes = "")
private Date startTime ;
@ApiModelProperty(value = "持续时间(单位:分钟)",notes = "")
private Integer duration ;
/** 交通流量(辆) */
@ApiModelProperty(name = "交通流量(辆)",notes = "")
private Integer flow ;
/** 平均速度(km/h) */
@ApiModelProperty(name = "平均速度(km/h)",notes = "")
private Double speed ;
/** 排队长度(米) */
@ApiModelProperty(name = "排队长度(米)",notes = "")
private Double queueLength ;
/** 停车次数(次) */
@ApiModelProperty(name = "停车次数(次)",notes = "")
private Double stopTimes ;
/** 延误时间(秒) */
@ApiModelProperty(name = "延误时间(秒)",notes = "")
private Integer delayTime ;
/** 饱和度 */
@ApiModelProperty(name = "饱和度",notes = "")
private Double sturation ;
/** 采集时间(10位时间戳) */
@ApiModelProperty(name = "采集时间(10位时间戳)",notes = "")
private Integer batchTime ;
/** 创建时间 */
@ApiModelProperty(name = "创建时间",notes = "")
private Date gmtCreate ;
/** 修改时间 */
@ApiModelProperty(name = "修改时间",notes = "")
private Date gmtModified ;
}
package net.wanji.datacenter.pojo.po;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.Date;
/**
* @author duanruiming
* @date 2023/03/10 19:22
*/
@Data
@TableName("t_cross_dir_data_realtime")
@ApiModel(value = "CrossDirDataRealTimePO", description = "路口转向实时数据")
public class CrossDirDataRealTimePO {
@ApiModelProperty(name = "路口方向ID(路口ID_方向_进出口_主辅路序号)",notes = "")
private String id;
@ApiModelProperty(name = "路口方向类型:1北;2东北;3东;4东南;5南;6西南;7西;8西北",notes = "")
private Integer dirType;
@ApiModelProperty(name = "进出口类型:1进口;2出口",notes = "")
private Integer inOutType;
@ApiModelProperty(name = "进出口类型:1进口;2出口",notes = "")
private String crossId;
@ApiModelProperty(name = "路段长度",notes = "")
private Double length;
@ApiModelProperty(name = "交通状态:1畅通;2缓行;3拥堵;4严重拥堵;5未知",notes = "")
private Integer status;
@ApiModelProperty(name = "交通指数",notes = "")
private Double trafficIndex;
@ApiModelProperty(name = "开始时间",notes = "")
private Date startTime;
@ApiModelProperty(name = "持续时间",notes = "")
private Integer duration;
@ApiModelProperty(name = "交通流量(辆)",notes = "")
private Integer flow;
@ApiModelProperty(name = "平均速度(km/h)",notes = "")
private Double speed;
@ApiModelProperty(name = "排队长(米)",notes = "")
private Double queueLength;
@ApiModelProperty(name = "停车次数",notes = "")
private Integer stopTimes;
@ApiModelProperty(name = "延误时间",notes = "")
private Integer delayTime;
@ApiModelProperty(name = "饱和度",notes = "")
private Double sturation;
@ApiModelProperty(name = "采集时间(10位时间戳)",notes = "")
private Integer batchTime;
@ApiModelProperty(name = "创建时间",notes = "")
private Date gmtCreate ;
@ApiModelProperty(name = "修改时间",notes = "")
private Date gmtModified ;
@ApiModelProperty(name = "溢流时长,计算溢流率提供;溢流率:交叉口间路段车辆排队长度超出可容纳空间的时长与观测总时长的百分比",notes = "")
private Double effusionTime;
}
package net.wanji.datacenter.pojo.po;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.Date;
/**
* @author duanruiming
* @date 2023/03/10 19:36
*/
@Data
@TableName("t_cross_turn_data_realtime")
@ApiModel(value = "CrossLaneDataRealTimePO", description = "路口车道实时数据")
public class CrossLaneDataRealTimePO {
@ApiModelProperty(name = "车道编号",notes = "")
private String id;
@ApiModelProperty(name = "路口编号",notes = "")
private String crossId;
@ApiModelProperty(name = "交通流量(辆)",notes = "")
private String flow;
@ApiModelProperty(name = "平均速度(km/h)",notes = "")
private String speed;
@ApiModelProperty(name = "驶入速度(km/h)",notes = "")
private String in_speed;
@ApiModelProperty(name = "驶出速度(km/h)",notes = "")
private String out_speed;
@ApiModelProperty(name = "排队长度(米)",notes = "")
private String queue_length;
@ApiModelProperty(name = "停车次数",notes = "")
private String stop_times;
@ApiModelProperty(name = "延误时间",notes = "")
private String delay_time;
@ApiModelProperty(name = "通行能力(辆/某一时段内最大过车数),通行能力=自然流量",notes = "")
private String capacity;
@ApiModelProperty(name = "饱和度",notes = "")
private String sturation;
@ApiModelProperty(name = "车头间距(米)",notes = "")
private String vehhead_dist;
@ApiModelProperty(name = "车头时距(秒)",notes = "")
private String vehhead_time;
@ApiModelProperty(name = "可信度(0-1)",notes = "")
private String quality;
@ApiModelProperty(name = "采集时间(10位时间戳)",notes = "")
private String batch_time;
@ApiModelProperty(name = "创建时间",notes = "")
private Date gmtCreate ;
@ApiModelProperty(name = "修改时间",notes = "")
private Date gmtModified ;
}
package net.wanji.datacenter.pojo.po;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.Date;
/**
* @author hfx
* @date 2023/1/10 16:36
* @desc CrossDataRealtimePO
*/
@Data
@TableName("t_lane_data_realtime")
@ApiModel(value = "CrossTurnDataRealtimePO", description = "路口转向实时数据")
public class CrossTurnDataRealtimePO {
/** 转向ID(路口ID_驶入方向_转向类型) */
@ApiModelProperty(name = "转向ID(路口ID_驶入方向_转向类型)",notes = "")
private String id ;
/** 转向类型:u掉头;l左转;s直行;r右转; */
@ApiModelProperty(name = "转向类型:u掉头;l左转;s直行;r右转;",notes = "")
private String turnType ;
/** 驶入方向:1北;2东北;3东;4东南;5南;6西南;7西;8西北 */
@ApiModelProperty(name = "驶入方向:1北;2东北;3东;4东南;5南;6西南;7西;8西北",notes = "")
private Integer inDir ;
/** 驶出方向:1北;2东北;3东;4东南;5南;6西南;7西;8西北 */
@ApiModelProperty(name = "驶出方向:1北;2东北;3东;4东南;5南;6西南;7西;8西北",notes = "")
private Integer outDir ;
/** 路口ID */
@ApiModelProperty(name = "路口ID",notes = "")
private String crossId ;
/** 交通流量(辆) */
@ApiModelProperty(name = "交通流量(辆)",notes = "")
private Integer flow ;
@ApiModelProperty(name = "交通状态:1畅通;2缓行;3拥堵;4严重拥堵;5未知",notes = "")
private Integer status;
@ApiModelProperty(name = "交通指数",notes = "")
private Double trafficIndex;
/** 平均速度(km/h) */
@ApiModelProperty(name = "平均速度(km/h)",notes = "")
private Double speed ;
/** 驶入速度(km/h) */
@ApiModelProperty(name = "驶入速度(km/h)",notes = "")
private Double inSpeed ;
/** 驶出速度(km/h) */
@ApiModelProperty(name = "驶出速度(km/h)",notes = "")
private Double outSpeed ;
/** 排队长度(米) */
@ApiModelProperty(name = "排队长度(米)",notes = "")
private Double queueLength ;
/** 停车次数(次) */
@ApiModelProperty(name = "停车次数(次)",notes = "")
private Double stopTimes ;
/** 延误时间(秒) */
@ApiModelProperty(name = "延误时间(秒)",notes = "")
private Integer delayTime ;
/** 饱和度 */
@ApiModelProperty(name = "饱和度",notes = "")
private Double sturation ;
/** 车头间距(米) */
@ApiModelProperty(name = "车头间距(米)",notes = "")
private Double vehheadDist ;
/** 车头时距(秒) */
@ApiModelProperty(name = "车头时距(秒)",notes = "")
private Double vehheadTime ;
/** 可信度:0~1 */
@ApiModelProperty(name = "可信度:0~1",notes = "")
private Double quality ;
/** 采集时间(10位时间戳) */
@ApiModelProperty(name = "采集时间(10位时间戳)",notes = "")
private Integer batchTime ;
/** 创建时间 */
@ApiModelProperty(name = "创建时间",notes = "")
private Date gmtCreate ;
/** 修改时间 */
@ApiModelProperty(name = "修改时间",notes = "")
private Date gmtModified ;
}
package net.wanji.datacenter.service.dataconvert;
/**
* @author duanruiming
* @date 2023/03/10 9:48
*/
public interface DataProcessService {
/**
* 数据中心收到数据
*
* @param topic
* @return
*/
String receive(String topic);
/**
* 数据中心发送数据, 通过远程调用调用不同的服务
*
* @param convertData
*/
void send(String topic, Object convertData);
/**
* 数据中心转换数据
*
* @param originalData
* @return
*/
Object convert(String originalData) throws Exception;
/**
* 转换后保存数据
*/
void save(Object convertData) throws Exception;
}
package net.wanji.datacenter.service.dataconvert.impl;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.wanji.datacenter.dao.mapper.*;
import net.wanji.datacenter.kafka.ConsumerHandler;
import net.wanji.datacenter.pojo.dto.LanePeriodicDataDTO;
import net.wanji.datacenter.pojo.po.*;
import net.wanji.datacenter.service.dataconvert.DataProcessService;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author duanruiming
* @date 2023/03/10 10:30
* @description: 车道周期实时数据加工处理服务,优化模块实时监控操作数据
*/
@Slf4j
@Service(value = "JN040001LanePeriodicData")
@RequiredArgsConstructor
public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
private final ConsumerHandler consumerHandler;
private final CrossBaseLaneInfoMapper crossBaseLaneInfoMapper;
private final CrossDirDataRealTimeMapper crossDirDataRealTimeMapper;
private final CrossTurnDataRealTimeMapper crossTurnDataRealTimeMapper;
private final CrossLaneDataRealTimeMapper crossLaneDataRealTimeMapper;
private final CrossDataRealTimeMapper crossDataRealTimeMapper;
@Override
public String receive(String topic) {
return consumerHandler.getTopicMessage(topic);
}
@Override
public void send(String topic, Object convertData) {
consumerHandler.removeTopicMessage(topic);
}
@Override
public Object convert(String originalData) throws Exception {
LanePeriodicDataDTO lanePeriodicDataPO = null;
try {
lanePeriodicDataPO = JSONObject.parseObject(originalData, LanePeriodicDataDTO.class);
} catch (Exception e) {
log.error("车道周期实时数据转换异常", e);
throw new Exception();
}
return lanePeriodicDataPO;
}
@Override
public void save(Object convertData) throws Exception {
LanePeriodicDataDTO lanePeriodicDataDTO = (LanePeriodicDataDTO) convertData;
int laneNum = lanePeriodicDataDTO.getLaneNum();
List<CrossDataRealTimePO> crossDataRealTimePOS = new ArrayList<>(laneNum);
List<CrossDirDataRealTimePO> crossDirDataRealTimePOS = new ArrayList<>();
List<CrossTurnDataRealtimePO> crossTurnDataRealtimePOS = new ArrayList<>();
List<CrossLaneDataRealTimePO> crossLaneDataRealTimePOS = new ArrayList<>();
List<LanePeriodicDataDTO.EventList> eventLists = lanePeriodicDataDTO.getEventList();
if (CollectionUtils.isEmpty(eventLists)) {
log.error("当前车道周期实时数据车道列表为空,参数信息:", lanePeriodicDataDTO);
throw new Exception("当前车道周期实时数据车道列表为空");
}
// 通过车道编号获取车道信息
List<String> LaneIdList = eventLists.stream().map(LanePeriodicDataDTO.EventList::getLaneId).collect(Collectors.toList());
List<CrossBaseLaneInfoPO> crossBaseLaneInfoPOS = crossBaseLaneInfoMapper.selectBatchIds(LaneIdList);
for (LanePeriodicDataDTO.EventList laneRealTimeItem : eventLists) {
String laneId = laneRealTimeItem.getLaneId();
Date now = new Date();
for (CrossBaseLaneInfoPO laneInfoPO : crossBaseLaneInfoPOS) {
String id = laneInfoPO.getId();
String crossId = laneInfoPO.getCrossId();
// todo 路口状态
Double trafficFlow = Double.valueOf(laneRealTimeItem.getTrafficFlow());
String dir = String.valueOf(laneInfoPO.getDir());
String turn = String.valueOf(laneInfoPO.getTurn());
if (StringUtils.endsWithIgnoreCase(laneId, id)) {
CrossDataRealTimePO crossDataRealTimePO = new CrossDataRealTimePO();
crossDataRealTimePO.setCrossId(crossId);
crossDataRealTimePO.setStatus(0000);
crossDataRealTimePO.setType(1); // 1常规 2异常
crossDataRealTimePO.setStartTime(now);
crossDataRealTimePO.setDuration(5); // todo 通过第二批次判断
crossDataRealTimePO.setUnbalanceStatus(0);
crossDataRealTimePO.setUnbalanceIndex(1.0);
crossDataRealTimePO.setUnbalanceDirs(dir);
crossDataRealTimePO.setCongestionStatus(0);
crossDataRealTimePO.setCongestionIndex(1.0);
crossDataRealTimePO.setCongestionDirs(dir);
crossDataRealTimePO.setSpilloverStatus(0);
crossDataRealTimePO.setSpilloverIndex(1.0);
crossDataRealTimePO.setSpilloverDirs(dir);
crossDataRealTimePO.setTrafficIndex(trafficFlow);
crossDataRealTimePOS.add(crossDataRealTimePO);
CrossDirDataRealTimePO crossDirDataRealTimePO = new CrossDirDataRealTimePO();
crossDirDataRealTimePO.setCrossId(crossId);
crossDirDataRealTimePO.setStatus(0000);
crossDirDataRealTimePO.setTrafficIndex(trafficFlow);
crossDirDataRealTimePOS.add(crossDirDataRealTimePO);
CrossTurnDataRealtimePO crossTurnDataRealtimePO = new CrossTurnDataRealtimePO();
crossTurnDataRealtimePO.setCrossId(crossId);
crossTurnDataRealtimePO.setStatus(0000);
crossTurnDataRealtimePOS.add(crossTurnDataRealtimePO);
CrossLaneDataRealTimePO crossLaneDataRealTimePO = new CrossLaneDataRealTimePO();
crossLaneDataRealTimePO.setCrossId(crossId);
crossLaneDataRealTimePO.setId(id);
crossLaneDataRealTimePOS.add(crossLaneDataRealTimePO);
}
}
}
crossDataRealTimeMapper.insert(crossDataRealTimePOS);
crossDirDataRealTimeMapper.insert(crossDirDataRealTimePOS);
crossTurnDataRealTimeMapper.insert(crossTurnDataRealtimePOS);
crossLaneDataRealTimeMapper.insert(crossLaneDataRealTimePOS);
}
}
package net.wanji.datacenter.service.dataconvert.impl;
import net.wanji.datacenter.service.dataconvert.DataProcessService;
import org.springframework.stereotype.Service;
/**
* @author duanruiming
* @date 2023/03/10 14:17
*/
@Service(value = "test")
public class Test implements DataProcessService {
@Override
public String receive(String originalData) {
System.err.println("test receice successful");
return null;
}
@Override
public void send(String topic, Object convertData) {
System.err.println("test send successful");
}
@Override
public String convert(String originalData) {
System.err.println("test convert successful");
return null;
}
@Override
public void save(Object convertData) throws Exception{
System.err.println("test save successful");
}
}
server:
port: 39002
servlet:
display-name: Wanji
context-path: /dataCenter
multipart:
max-file-size: 50MB
max-request-size: 50MB
tomcat:
threads:
max: 10000
#线程池配置
threadPool:
corePoolSize: 5
maxPoolSize: 200
queueCapacity: 5
keepAliveTime: 60000
spring:
datasource:
dynamic:
primary: master
datasource:
master:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://106.120.201.126:14726/t_signal_control?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true
username: root
password: Wanji300552
driverClassName: com.mysql.cj.jdbc.Driver
slave:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://106.120.201.126:14726/t_signal_control?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true
username: root
password: Wanji300552
driverClassName: com.mysql.cj.jdbc.Driver
webService:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://106.120.201.126:14726/t_signal?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true
username: root
password: Wanji300552
type: com.alibaba.druid.pool.DruidDataSource
redis:
host: 106.120.201.126
port: 14728
password: Wanji300552
jedis:
pool:
max-active: 200
max-wait: 5000
max-idle: 20
min-idle: 10
timeout: 5000
database: 3
kafka:
bootstrap-servers: 10.102.1.182:9092
#消费者配置
consumer:
max-poll-records: 1
broker-id: 0
auto-commit-interval: 1S
auto-offset-reset: earliest
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: group
listener:
concurrency: 3
ack-mode: manual_immediate
# 信号平台
system:
address:
# UTC服务地址
utc: http://10.102.1.182:32000/utc
# control服务地址
control: http://10.102.1.182:32001/web
\ No newline at end of file
spring:
# dubbo启动需要程序名称
application:
name: dataCenter
profiles:
active: dev
mvc:
pathmatch:
matching-strategy: ant_path_matcher
main:
allow-circular-references: true
server:
undertow:
url-charset: UTF-8
max-headers: -1
# 设置 HTTP POST 内容的最大长度,默认不做限制
# max-http-post-size: -1
max-http-post-size: 4MB
threads:
# 设置IO线程数, 它主要执行非阻塞的任务,它们会负责多个连接, 默认设置每个CPU核心一个线程
# 不要设置过大,如果过大,启动项目会报错:打开文件数过多
io: 8
# 阻塞任务线程池, 当执行类似servlet请求阻塞IO操作, undertow会从这个线程池中取得线程
# 它的值设置取决于系统线程执行任务的阻塞系数,默认值是IO线程数*8
worker: 256
# 以下的配置会影响buffer,这些buffer会用于服务器连接的IO操作,有点类似netty的池化内存管理
# 每块buffer的空间大小,越小的空间被利用越充分,不要设置太大,以免影响其他应用,合适即可
buffer-size: 1024
# 是否分配的直接内存(NIO直接分配的堆外内存)
direct-buffers: true
accesslog:
# 是否打开 undertow 日志,默认为 false
enabled: true
# 设置访问日志所在目录
dir: logs
mybatis-plus:
mapper-locations: classpath:mapper/*.xml,classpath:mapper/*/*.xml
typeAliasesPackage: net.wanji.datacenter.entity
check-config-location: true
configuration:
#是否开启自动驼峰命名规则(camel case)映射
map-underscore-to-camel-case: true
#全局地开启或关闭配置文件中的所有映射器已经配置的任何缓存
cache-enabled: false
call-setters-on-nulls: true
#配置JdbcTypeForNull, oracle数据库必须配置
jdbc-type-for-null: 'null'
#MyBatis 自动映射时未知列或未知属性处理策略 NONE:不做任何处理 (默认值), WARNING:以日志的形式打印相关警告信息, FAILING:当作映射失败处理,并抛出异常和详细信息
auto-mapping-unknown-column-behavior: warning
#开启SQL打印
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# PageHelper分页插件
pagehelper:
helperDialect: mysql
supportMethodsArguments: true
params: count=countSql
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="net.wanji.opt.dao.mapper.trend.CrossDataRealtimeMapper">
<select id="selectAbnormalCross" resultType="net.wanji.opt.vo.AbnormalCrossListVO">
select
t2.id, t2.name, t1.unbalance_dirs, t1.congestion_dirs, t1.spillover_dirs, t1.status as realtimeStatus,
t1.start_time, t1.duration, t2.location as locationStr, t1.is_unbalance, t1.is_spillover, t1.is_congestion
from t_cross_data_realtime t1 JOIN t_base_cross_info t2
ON t1.cross_id = t2.id
<where>
<if test="status != null">
and (t1.status = #{status}
<if test="status == 1"> or t1.is_unbalance = 1 </if>
<if test="status == 2"> or t1.is_congestion = 1 </if>
<if test="status == 3"> or t1.is_spillover = 1 </if>)
</if>
<if test="name != null and name != ''">
and t2.name like concat('%',#{name},'%')
</if>
<if test="type != null">
and t1.type = #{type}
</if>
<if test="status == null">
and t1.status in (0, 1, 2, 3)
</if>
</where>
</select>
<select id="selectByCrossId" resultType="net.wanji.opt.po.trend.CrossDataRealtimePO">
select
cross_id,status,type,traffic_index,start_time,duration,is_unbalance,is_spillover,is_congestion,unbalance_index,spillover_index,congestion_index,unbalance_dirs,spillover_dirs,congestion_dirs,flow,flow_rate,speed,queue_length,stop_times,delay_time,sturation,batch_time,gmt_create,gmt_modified
from t_cross_data_realtime
where cross_id = #{crossId}
</select>
</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="net.wanji.opt.dao.mapper.trend.CrossDirDataHistMapper">
<select id="selectByCrossIdAndTimestamp" resultType="net.wanji.opt.po.trend.CrossDirDataHistPO">
select
id,dir_type,in_out_type,cross_id,length,status,traffic_index,flow,speed,queue_length,stop_times,delay_time,sturation,batch_time,gmt_create,gmt_modified
from t_cross_dir_data_hist
where cross_id = #{crossId} and in_out_type = 1
and batch_time <![CDATA[ >= ]]> #{preSeconds}
order by batch_time
</select>
<select id="selectByCrossIdDirAndTimestamp" resultType="net.wanji.opt.po.trend.CrossDirDataHistPO">
select
id,dir_type,in_out_type,cross_id,length,status,traffic_index,flow,speed,queue_length,stop_times,delay_time,sturation,batch_time,gmt_create,gmt_modified
from t_cross_dir_data_hist
where cross_id = #{crossId} and in_out_type = 1 and dir_type = #{dir}
and batch_time <![CDATA[ >= ]]> #{preSeconds}
order by batch_time
</select>
</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="net.wanji.opt.dao.mapper.trend.CrossTurnDataRealtimeMapper">
<select id="selectByCrossId" resultType="net.wanji.opt.po.trend.CrossTurnDataRealtimePO">
select
id,turn_type,in_dir,out_dir,cross_id,flow,speed,in_speed,out_speed,queue_length,stop_times,delay_time,sturation,vehhead_dist,vehhead_time,quality,batch_time,gmt_create,gmt_modified
from t_cross_turn_data_realtime
where cross_id = #{crossId}
</select>
</mapper>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment