Commit 370538b2 authored by duanruiming's avatar duanruiming

[update] 测试平台-停车事件接收入库优化

parent d4287251
package com.wanji.indicators.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
......@@ -9,6 +10,7 @@ import java.util.Date;
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("t_analysis_event")
public class EventAnalysis implements Serializable {
private static final long serialVersionUID = 1L;
......
package com.wanji.indicators.entity;
import com.alibaba.fastjson.annotation.JSONField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDateTime;
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("t_event_info")
@ApiModel(value = "EventInfo", description = "交通事件信息")
public class EventInfo implements Serializable {
private static final long serialVersionUID = 1L;
@ApiModelProperty(value = "主键ID", hidden = true)
@JSONField(serialize = false)
private Long oid;
@ApiModelProperty(value = "车牌号")
private String plateNo;
@ApiModelProperty(value = "参与者类别 1:机动车 2:非机动车 3:行人")
private String objectType;
@ApiModelProperty(value = "可信度")
private Integer confidence;
@ApiModelProperty(value = "检测时间", notes = "格式:yyyy-MM-dd HH:mm:ss")
@JSONField(format = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private LocalDateTime detectTime;
@ApiModelProperty(value = "事件等级:1扣分 2 罚款 3警告 0 未知")
private Integer grade;
@ApiModelProperty(value = "事件地点描述")
private String placeDesc;
@ApiModelProperty(value = "发生地点经度")
private BigDecimal lng;
@ApiModelProperty(value = "发生地点纬度")
private BigDecimal lat;
@ApiModelProperty(value = "事件一级类别 1:非机动事件 2:行人事件 3:机动车事件")
private String category;
@ApiModelProperty(value = "二级类别")
@JsonProperty("eventType")
@JSONField(name = "eventType")
private String type;
@ApiModelProperty(value = "事件发生时间")
@JSONField(format = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private LocalDateTime startTime;
@ApiModelProperty(value = "事件结束时间")
@JSONField(format = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private LocalDateTime endTime;
@ApiModelProperty(value = "事件持续时长,单位分钟")
private Integer duration;
@ApiModelProperty(value = "事件来源")
private String source;
@ApiModelProperty(value = "车道ID")
private String laneId;
@ApiModelProperty(value = "路段ID")
private String rid;
@ApiModelProperty(value = "路口ID")
private String crossId;
@ApiModelProperty(value = "渠化ID")
private String segmentId;
@ApiModelProperty(value = "关联的事件摄像头信息ID")
private String cameraOid;
@ApiModelProperty(value = "序列号")
private String eventSerialNumber;
@ApiModelProperty(value = "事件状态")
private String dataStatus;
private String globalId;
private Integer stationId;
private String eventId;
@ApiModelProperty(value = "入库时间", hidden = true)
@JSONField(serialize = false)
private LocalDateTime ruksj;
}
package com.wanji.indicators.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.wanji.indicators.entity.EventAnalysis;
import com.wanji.indicators.entity.EventInfo;
public interface EventAnalysisMapper extends BaseMapper<EventAnalysis> {
/**
* @author duanruiming
* @date 2024/05/10 10:34
*/
public interface EventInfoMapper extends BaseMapper<EventInfo> {
}
package com.wanji.indicators.task.citybrainreceive.event.park;
import com.wanji.indicators.entity.EventAnalysis;
import com.wanji.indicators.entity.EventInfo;
import com.wanji.indicators.util.PropertiesHelper;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
......@@ -48,11 +48,11 @@ public class LaneParkEventReceiveMain {
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
"city-brain-laneParkEvent-receive-data-source");
SingleOutputStreamOperator<List<EventAnalysis>> eventAnalysisStream = stream
SingleOutputStreamOperator<List<EventInfo>> eventInfoStream = stream
.map(new LaneParkEventReceiveMap())
.name("接收城市大脑车道停车事件数据-JsonToObject");
eventAnalysisStream.addSink(new LaneParkEventSaveFunction());
eventInfoStream.addSink(new LaneParkEventSaveFunction());
// 启动 Flink 作业
env.execute("接收城市大脑车道停车事件数据存入数据库");
......
......@@ -2,19 +2,26 @@ package com.wanji.indicators.task.citybrainreceive.event.park;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wanji.indicators.entity.EventAnalysis;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.entity.EventInfo;
import com.wanji.indicators.task.citybrain.vo.LaneParkEventVO;
import com.wanji.indicators.task.citybrainreceive.CommonReceiveModel;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.framework.Constants;
import net.wanji.common.utils.tool.DateUtil;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.common.utils.tool.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
......@@ -23,35 +30,59 @@ import java.util.List;
* @date 2024/04/11 9:34
*/
@Service
public class LaneParkEventReceiveMap implements MapFunction<String, List<EventAnalysis>> {
@Slf4j
public class LaneParkEventReceiveMap implements MapFunction<String, List<EventInfo>> {
@Override
public List<EventAnalysis> map(String value) throws Exception {
public List<EventInfo> map(String value) throws Exception {
try {
ObjectMapper mapper = JacksonUtils.getInstance();
CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class);
String body = model.getBody();
List<EventAnalysis> eventAnalyses = new ArrayList<>();
List<LaneParkEventVO> laneParkEventVOS = mapper.readValue(body, new TypeReference<List<LaneParkEventVO>>() {});
String headers = model.getHeaders();
if (StringUtils.isNotBlank(headers) && headers.contains(Constant.WAN_JI_NAME)) {
return Collections.emptyList();
}
List<EventInfo> eventInfos = new ArrayList<>();
List<LaneParkEventVO> laneParkEventVOS = mapper.readValue(body, new TypeReference<List<LaneParkEventVO>>() {
});
if (!CollectionUtils.isEmpty(laneParkEventVOS)) {
for (LaneParkEventVO laneParkEventVO : laneParkEventVOS) {
String dataTime = laneParkEventVO.getData_time();
Date date = new Date(Integer.parseInt(dataTime));
Instant instant = Instant.ofEpochMilli(Long.parseLong(dataTime));
ZonedDateTime zonedDateTime = instant.atZone(ZoneId.of("+8"));
LocalDateTime localDateTime = zonedDateTime.toLocalDateTime();
Date date = new Date(Long.parseLong(dataTime));
String dateStr = DateUtil.format(date, Constants.DATE_FORMAT.E_DATE_FORMAT_MILLIS);
LocalDateTime localDateTime = date.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
EventAnalysis eventAnalysis = new EventAnalysis();
eventAnalysis.setOid(Integer.parseInt(dataTime));
eventAnalysis.setEventCategory("1");
eventAnalysis.setEventType("401");
eventAnalysis.setRuksj(localDateTime);
eventAnalysis.setWindowStartTime(date);
eventAnalysis.setWindowEndTime(DateUtil.addMinute(dateStr, 5));
eventAnalysis.setGranularity("5m");
eventAnalysis.setCrossId(laneParkEventVO.getDeviceNo());
eventAnalysis.setLaneId(laneParkEventVO.getLaneId());
eventAnalyses.add(eventAnalysis);
EventInfo eventInfo = new EventInfo();
eventInfo.setDetectTime(localDateTime);
eventInfo.setCategory("1");
eventInfo.setType("401");
eventInfo.setStartTime(localDateTime);
//eventInfo.setEndTime(localDateTime);
eventInfo.setDuration(0);
eventInfo.setRuksj(localDateTime);
eventInfo.setLaneId(laneParkEventVO.getLaneId());
eventInfo.setCrossId(laneParkEventVO.getDeviceNo());
eventInfo.setEventSerialNumber(laneParkEventVO.getDeviceNo() + dataTime);
eventInfo.setPlateNo(laneParkEventVO.getPlateNumber());
String geography = laneParkEventVO.getGeography();
String[] split = null;
if (StringUtils.isNotBlank(geography)) {
split = geography.replaceAll("\\[", "").replaceAll("\\]", "").split(",");
eventInfo.setLng(new BigDecimal(split[0]));
eventInfo.setLat(new BigDecimal(split[1]));
}
eventInfo.setPlaceDesc(laneParkEventVO.getType() + "-" + laneParkEventVO.getState());
eventInfos.add(eventInfo);
}
}
return eventAnalyses;
return eventInfos;
} catch (Exception e) {
log.error("停车事件数据转化异常:", e);
throw new RuntimeException(e);
}
}
}
\ No newline at end of file
package com.wanji.indicators.task.citybrainreceive.event.park;
import com.wanji.indicators.entity.EventAnalysis;
import com.wanji.indicators.mapper.EventAnalysisMapper;
import com.wanji.indicators.entity.EventInfo;
import com.wanji.indicators.mapper.EventInfoMapper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.context.ApplicationContext;
......@@ -16,22 +16,22 @@ import java.util.List;
* @date 2024/04/16 16:15
*/
@Service
public class LaneParkEventSaveFunction extends RichSinkFunction<List<EventAnalysis>> {
public class LaneParkEventSaveFunction extends RichSinkFunction<List<EventInfo>> {
EventAnalysisMapper eventAnalysisMapper;
EventInfoMapper eventInfoMapper;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ApplicationContext beanConf = new ClassPathXmlApplicationContext("spring-container.xml");
eventAnalysisMapper = beanConf.getBean(EventAnalysisMapper.class);
eventInfoMapper = beanConf.getBean(EventInfoMapper.class);
}
@Override
public void invoke(List<EventAnalysis> value, Context context) throws Exception {
public void invoke(List<EventInfo> value, Context context) throws Exception {
if (!CollectionUtils.isEmpty(value)) {
for (EventAnalysis eventAnalysis : value) {
eventAnalysisMapper.insert(eventAnalysis);
for (EventInfo eventInfo : value) {
eventInfoMapper.insert(eventInfo);
}
}
}
......
......@@ -38,19 +38,19 @@ public class LaneStatisticsReceiveMain {
String statisticsTopic = properties.getProperty("lane.statistics.receive.topic");
String spilloverTopic = properties.getProperty("lane.spillover.event.receive.topic");
KafkaSource<String> staticticsSource = getStringKafkaSource(statisticsTopic);
KafkaSource<String> spilloverRource = getStringKafkaSource(spilloverTopic);
KafkaSource<String> statisticsSource = getStringKafkaSource(statisticsTopic);
KafkaSource<String> spilloverSource = getStringKafkaSource(spilloverTopic);
DataStream<String> staticticsStream = env.fromSource(staticticsSource, WatermarkStrategy.noWatermarks(),
DataStream<String> statisticsStream = env.fromSource(statisticsSource, WatermarkStrategy.noWatermarks(),
"city-brain-laneStatistics-receive-data-source");
DataStream<String> spilloverStream = env.fromSource(spilloverRource, WatermarkStrategy.noWatermarks(),
DataStream<String> spilloverStream = env.fromSource(spilloverSource, WatermarkStrategy.noWatermarks(),
"city-brain-spillover-receive-data-source");
dealLaneRealTimeData(staticticsStream);
dealLaneRealTimeData(statisticsStream);
// 车道数据中不能处理转向,暂不处理
//dealTurnRealTimeData(staticticsStream);
dealDirRealTimeData(staticticsStream);
dealCrossRealTimeData(env, staticticsStream, spilloverStream);
//dealTurnRealTimeData(statisticsStream);
dealDirRealTimeData(statisticsStream);
dealCrossRealTimeData(env, statisticsStream, spilloverStream);
env.execute("接收城市大脑统计数据发送到kafka准备入库");
} catch (Exception e) {
......@@ -63,7 +63,7 @@ public class LaneStatisticsReceiveMain {
// 获取配置文件中的kafka消费topic
String kafkaServerAddress = properties.getProperty("bootstrap.servers.citybrain.receive");
KafkaSource<String> source = KafkaSource.<String>builder()
return KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties())
.setProperty("auto.offset.commit", "true")
.setProperty("auto.commit.interval.ms", "1000")
......@@ -74,7 +74,6 @@ public class LaneStatisticsReceiveMain {
.setStartingOffsets(OffsetsInitializer.latest())
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
return source;
}
private static void dealCrossRealTimeData(StreamExecutionEnvironment env, DataStream<String> staticticsStream, DataStream<String> spilloverStream) {
......@@ -112,6 +111,7 @@ public class LaneStatisticsReceiveMain {
}
private static void dealLaneRealTimeData(DataStream<String> stream) {
// todo 添加溢出事件,将溢出状态写入,默认最小溢出指数
SingleOutputStreamOperator<String> laneRealtimeStream = stream
.map(new LaneStatisticsReceive2LaneDataMap())
.name("接收城市大脑统计数据-JsonToObject");
......
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.wanji.indicators.mapper.EventAnalysisMapper">
<mapper namespace="com.wanji.indicators.mapper.EventInfoMapper">
</mapper>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment