Commit 9668e4f3 authored by zhoushiguang's avatar zhoushiguang

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	wj-realtime-computing/src/main/java/com/wanji/indicators/entity/EventInfo.java
parents 992909ec f1ead06f
...@@ -203,6 +203,11 @@ public class LanePeriodicDataEventListConvert { ...@@ -203,6 +203,11 @@ public class LanePeriodicDataEventListConvert {
crossLaneDataRealTimePO.setGreenLightEfficiency(greenLightEfficiency); crossLaneDataRealTimePO.setGreenLightEfficiency(greenLightEfficiency);
crossLaneDataRealTimePO.setTimeOccupancy(timeOccupancy); crossLaneDataRealTimePO.setTimeOccupancy(timeOccupancy);
crossLaneDataRealTimePO.setVehicleNumsRatioMean(vehicleNumsRatioMean); crossLaneDataRealTimePO.setVehicleNumsRatioMean(vehicleNumsRatioMean);
crossLaneDataRealTimePO.setNonMotorFlow(eventList.getNonMotorFlow());
crossLaneDataRealTimePO.setV85(eventList.getV85());
crossLaneDataRealTimePO.setTrafficFlowA(eventList.getTrafficFlowA());
crossLaneDataRealTimePO.setTrafficFlowB(eventList.getTrafficFlowB());
crossLaneDataRealTimePO.setTrafficFlowC(eventList.getTrafficFlowC());
return crossLaneDataRealTimePO; return crossLaneDataRealTimePO;
} }
} }
package com.wanji.indicators.constant.citybrain;
import lombok.AllArgsConstructor;
import lombok.Getter;
import net.wanji.common.utils.tool.StringUtils;
/**
* @author duanruiming
* @date 2024/05/16 9:37
*/
@Getter
@AllArgsConstructor
public enum ParkTypeEnum {
ONE("parking", "停车事件"),
TWO("jam", "拥堵事件"),
THREE("accident", "事故事件");
private final String code;
private final String message;
public static String getMsg(String code) {
for (ParkTypeEnum value : ParkTypeEnum.values()) {
if (StringUtils.equalsIgnoreCase(value.getCode(), code)) {
return value.getMessage();
}
}
return ParkTypeEnum.THREE.getMessage();
}
}
\ No newline at end of file
...@@ -2,6 +2,7 @@ package com.wanji.indicators.constant.citybrain; ...@@ -2,6 +2,7 @@ package com.wanji.indicators.constant.citybrain;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
import net.wanji.common.utils.tool.StringUtils;
/** /**
* @author duanruiming * @author duanruiming
...@@ -10,12 +11,21 @@ import lombok.Getter; ...@@ -10,12 +11,21 @@ import lombok.Getter;
@Getter @Getter
@AllArgsConstructor @AllArgsConstructor
public enum SpilloverStateEnum { public enum SpilloverStateEnum {
FINISH("0"), FINISH("0", "消散"),
START("1"), START("1", "触发"),
CONTINUE("2"); CONTINUE("2", "持续");
private final String code; private final String code;
private final String message;
public static String getMsg(String code) {
for (SpilloverStateEnum value : SpilloverStateEnum.values()) {
if (StringUtils.equalsIgnoreCase(value.getCode(), code)) {
return value.getMessage();
}
}
return SpilloverStateEnum.FINISH.getMessage();
}
} }
...@@ -4,8 +4,8 @@ import com.alibaba.fastjson.annotation.JSONField; ...@@ -4,8 +4,8 @@ import com.alibaba.fastjson.annotation.JSONField;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
//import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModel;
//import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
...@@ -16,86 +16,87 @@ import java.time.LocalDateTime; ...@@ -16,86 +16,87 @@ import java.time.LocalDateTime;
@Data @Data
@EqualsAndHashCode(callSuper = false) @EqualsAndHashCode(callSuper = false)
@TableName("t_event_info") @TableName("t_event_info")
//@ApiModel(value = "EventInfo", description = "交通事件信息") @ApiModel(value = "EventInfo", description = "交通事件信息")
public class EventInfo implements Serializable { public class EventInfo implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
//@ApiModelProperty(value = "主键ID", hidden = true) @ApiModelProperty(value = "主键ID", hidden = true)
@JSONField(serialize = false) @JSONField(serialize = false)
private Long oid; private Long oid;
//@ApiModelProperty(value = "车牌号") @ApiModelProperty(value = "车牌号")
private String plateNo; private String plateNo;
//@ApiModelProperty(value = "参与者类别 1:机动车 2:非机动车 3:行人") @ApiModelProperty(value = "参与者类别 1:机动车 2:非机动车 3:行人")
private String objectType; private String objectType;
//@ApiModelProperty(value = "可信度") @ApiModelProperty(value = "可信度")
private Integer confidence; private Integer confidence;
//@ApiModelProperty(value = "检测时间", notes = "格式:yyyy-MM-dd HH:mm:ss") @ApiModelProperty(value = "检测时间", notes = "格式:yyyy-MM-dd HH:mm:ss")
@JSONField(format = "yyyy-MM-dd HH:mm:ss") @JSONField(format = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private LocalDateTime detectTime; private LocalDateTime detectTime;
//@ApiModelProperty(value = "事件等级:1扣分 2 罚款 3警告 0 未知") @ApiModelProperty(value = "事件等级:1扣分 2 罚款 3警告 0 未知")
private Integer grade; private Integer grade;
//@ApiModelProperty(value = "事件地点描述") @ApiModelProperty(value = "事件地点描述")
private String placeDesc; private String placeDesc;
//@ApiModelProperty(value = "发生地点经度") @ApiModelProperty(value = "发生地点经度")
private BigDecimal lng; private BigDecimal lng;
//@ApiModelProperty(value = "发生地点纬度") @ApiModelProperty(value = "发生地点纬度")
private BigDecimal lat; private BigDecimal lat;
//@ApiModelProperty(value = "事件一级类别 1:非机动事件 2:行人事件 3:机动车事件") @ApiModelProperty(value = "事件一级类别 1:非机动事件 2:行人事件 3:机动车事件")
private String category; private String category;
//@ApiModelProperty(value = "二级类别") @ApiModelProperty(value = "二级类别")
@JsonProperty("eventType") @JsonProperty("eventType")
@JSONField(name = "eventType") @JSONField(name = "eventType")
private String type; private String type;
//@ApiModelProperty(value = "事件发生时间") @ApiModelProperty(value = "事件发生时间")
@JSONField(format = "yyyy-MM-dd HH:mm:ss") @JSONField(format = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private LocalDateTime startTime; private LocalDateTime startTime;
//@ApiModelProperty(value = "事件结束时间") @ApiModelProperty(value = "事件结束时间")
@JSONField(format = "yyyy-MM-dd HH:mm:ss") @JSONField(format = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private LocalDateTime endTime; private LocalDateTime endTime;
//@ApiModelProperty(value = "事件持续时长,单位分钟") @ApiModelProperty(value = "事件持续时长,单位分钟")
private Integer duration; private Integer duration;
//@ApiModelProperty(value = "事件来源") @ApiModelProperty(value = "事件来源", notes = "1-系统检测;2-接处警;3-互联网;4-交管部门;5-济南城市大脑;")
private String source; private String source;
//@ApiModelProperty(value = "车道ID") @ApiModelProperty(value = "车道ID")
private String laneId; private String laneId;
//@ApiModelProperty(value = "路段ID") @ApiModelProperty(value = "路段ID")
private String rid; private String rid;
//@ApiModelProperty(value = "路口ID") @ApiModelProperty(value = "路口ID")
private String crossId; private String crossId;
//@ApiModelProperty(value = "渠化ID") @ApiModelProperty(value = "渠化ID")
private String segmentId; private String segmentId;
//@ApiModelProperty(value = "关联的事件摄像头信息ID") @ApiModelProperty(value = "关联的事件摄像头信息ID")
private String cameraOid; private String cameraOid;
//@ApiModelProperty(value = "序列号") @ApiModelProperty(value = "序列号")
private String eventSerialNumber; private String eventSerialNumber;
//@ApiModelProperty(value = "事件状态") @ApiModelProperty(value = "事件状态")
private String dataStatus; private String dataStatus;
private String globalId; private String globalId;
private Integer stationId; private Integer stationId;
private String eventId; private String eventId;
//@ApiModelProperty(value = "入库时间", hidden = true) @ApiModelProperty(value = "入库时间", hidden = true)
@JSONField(serialize = false) @JSONField(serialize = false)
private LocalDateTime ruksj; private LocalDateTime ruksj;
private String remark;
} }
...@@ -3,6 +3,8 @@ package com.wanji.indicators.task.citybrainreceive.event.park; ...@@ -3,6 +3,8 @@ package com.wanji.indicators.task.citybrainreceive.event.park;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.wanji.indicators.constant.Constant; import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.constant.citybrain.ParkTypeEnum;
import com.wanji.indicators.constant.citybrain.SpilloverStateEnum;
import com.wanji.indicators.entity.EventInfo; import com.wanji.indicators.entity.EventInfo;
import com.wanji.indicators.task.citybrain.vo.LaneParkEventVO; import com.wanji.indicators.task.citybrain.vo.LaneParkEventVO;
import com.wanji.indicators.task.citybrainreceive.CommonReceiveModel; import com.wanji.indicators.task.citybrainreceive.CommonReceiveModel;
...@@ -39,12 +41,11 @@ public class LaneParkEventReceiveMap implements MapFunction<String, List<EventIn ...@@ -39,12 +41,11 @@ public class LaneParkEventReceiveMap implements MapFunction<String, List<EventIn
CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class); CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class);
String body = model.getBody(); String body = model.getBody();
String headers = model.getHeaders(); String headers = model.getHeaders();
if (StringUtils.isNotBlank(headers) && headers.contains(Constant.WAN_JI_NAME)) { if (StringUtils.isNotBlank(headers) && headers.contains(Constant.WAN_JI_NAME) || StringUtils.isBlank(body)) {
return Collections.emptyList(); return Collections.emptyList();
} }
List<EventInfo> eventInfos = new ArrayList<>(); List<EventInfo> eventInfos = new ArrayList<>();
List<LaneParkEventVO> laneParkEventVOS = mapper.readValue(body, new TypeReference<List<LaneParkEventVO>>() { List<LaneParkEventVO> laneParkEventVOS = mapper.readValue(body, new TypeReference<List<LaneParkEventVO>>() {});
});
if (!CollectionUtils.isEmpty(laneParkEventVOS)) { if (!CollectionUtils.isEmpty(laneParkEventVOS)) {
for (LaneParkEventVO laneParkEventVO : laneParkEventVOS) { for (LaneParkEventVO laneParkEventVO : laneParkEventVOS) {
...@@ -58,7 +59,11 @@ public class LaneParkEventReceiveMap implements MapFunction<String, List<EventIn ...@@ -58,7 +59,11 @@ public class LaneParkEventReceiveMap implements MapFunction<String, List<EventIn
EventInfo eventInfo = new EventInfo(); EventInfo eventInfo = new EventInfo();
eventInfo.setDetectTime(localDateTime); eventInfo.setDetectTime(localDateTime);
eventInfo.setCategory("1"); eventInfo.setCategory("1");
String type = laneParkEventVO.getType();
eventInfo.setType("401"); eventInfo.setType("401");
if (StringUtils.equalsIgnoreCase(type, "illegal")) {
eventInfo.setType("1");
}
eventInfo.setStartTime(localDateTime); eventInfo.setStartTime(localDateTime);
//eventInfo.setEndTime(localDateTime); //eventInfo.setEndTime(localDateTime);
eventInfo.setDuration(0); eventInfo.setDuration(0);
...@@ -74,7 +79,8 @@ public class LaneParkEventReceiveMap implements MapFunction<String, List<EventIn ...@@ -74,7 +79,8 @@ public class LaneParkEventReceiveMap implements MapFunction<String, List<EventIn
eventInfo.setLng(new BigDecimal(split[0])); eventInfo.setLng(new BigDecimal(split[0]));
eventInfo.setLat(new BigDecimal(split[1])); eventInfo.setLat(new BigDecimal(split[1]));
} }
eventInfo.setPlaceDesc(laneParkEventVO.getType() + "-" + laneParkEventVO.getState()); eventInfo.setSource("5");
eventInfo.setRemark(ParkTypeEnum.getMsg(laneParkEventVO.getType()) + SpilloverStateEnum.getMsg(laneParkEventVO.getState()));
eventInfos.add(eventInfo); eventInfos.add(eventInfo);
} }
} }
......
package com.wanji.indicators.task.citybrainreceive.event.spillover; package com.wanji.indicators.task.citybrainreceive.event.spillover;
import com.wanji.indicators.task.citybrain.vo.SpilloverEventVO; import com.wanji.indicators.entity.EventInfo;
import com.wanji.indicators.task.citybrainreceive.event.park.LaneParkEventSaveFunction;
import com.wanji.indicators.util.PropertiesHelper; import com.wanji.indicators.util.PropertiesHelper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.eventtime.WatermarkStrategy;
...@@ -48,10 +49,11 @@ public class SpilloverEventReceiveMain { ...@@ -48,10 +49,11 @@ public class SpilloverEventReceiveMain {
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
"city-brain-spilloverEvent-receive-data-source"); "city-brain-spilloverEvent-receive-data-source");
SingleOutputStreamOperator<List<SpilloverEventVO>> frameModelStream = stream SingleOutputStreamOperator<List<EventInfo>> frameModelStream = stream
.map(new SpilloverEventReceiveMap()) .map(new SpilloverEventReceiveMap())
.name("接收城市大脑车道溢出事件数据-JsonToObject"); .name("接收城市大脑车道溢出事件数据-JsonToObject");
frameModelStream.addSink(new LaneParkEventSaveFunction());
// 启动 Flink 作业 // 启动 Flink 作业
env.execute("接收城市大脑车道溢出事件数据"); env.execute("接收城市大脑车道溢出事件数据");
} catch (Exception e) { } catch (Exception e) {
......
...@@ -2,12 +2,28 @@ package com.wanji.indicators.task.citybrainreceive.event.spillover; ...@@ -2,12 +2,28 @@ package com.wanji.indicators.task.citybrainreceive.event.spillover;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.constant.citybrain.SpilloverStateEnum;
import com.wanji.indicators.entity.EventInfo;
import com.wanji.indicators.task.citybrain.vo.SpilloverEventVO; import com.wanji.indicators.task.citybrain.vo.SpilloverEventVO;
import com.wanji.indicators.task.citybrainreceive.CommonReceiveModel; import com.wanji.indicators.task.citybrainreceive.CommonReceiveModel;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.framework.Constants;
import net.wanji.common.utils.tool.DateUtil;
import net.wanji.common.utils.tool.JacksonUtils; import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.common.utils.tool.StringUtils;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List; import java.util.List;
/** /**
...@@ -15,15 +31,65 @@ import java.util.List; ...@@ -15,15 +31,65 @@ import java.util.List;
* @date 2024/04/11 10:40 * @date 2024/04/11 10:40
*/ */
@Service @Service
public class SpilloverEventReceiveMap implements MapFunction<String, List<SpilloverEventVO>> { @Slf4j
public class SpilloverEventReceiveMap implements MapFunction<String, List<EventInfo>> {
@Override @Override
public List<SpilloverEventVO> map(String value) throws Exception { public List<EventInfo> map(String value) throws Exception {
try {
ObjectMapper mapper = JacksonUtils.getInstance(); ObjectMapper mapper = JacksonUtils.getInstance();
CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class); CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class);
String body = model.getBody(); String body = model.getBody();
String headers = model.getHeaders();
if (StringUtils.isNotBlank(headers) && headers.contains(Constant.WAN_JI_NAME) || StringUtils.isBlank(body)) {
return Collections.emptyList();
}
List<EventInfo> eventInfos = new ArrayList<>();
List<SpilloverEventVO> spilloverEventVOS = mapper.readValue(body, new TypeReference<List<SpilloverEventVO>>() {}); List<SpilloverEventVO> spilloverEventVOS = mapper.readValue(body, new TypeReference<List<SpilloverEventVO>>() {});
if (!CollectionUtils.isEmpty(spilloverEventVOS)) {
for (SpilloverEventVO spilloverEventVO : spilloverEventVOS) {
String dataTime = spilloverEventVO.getData_time();
Instant instant = Instant.ofEpochMilli(Long.parseLong(dataTime));
ZonedDateTime zonedDateTime = instant.atZone(ZoneId.of("+8"));
LocalDateTime localDateTime = zonedDateTime.toLocalDateTime();
Date date = new Date(Long.parseLong(dataTime));
String dateStr = DateUtil.format(date, Constants.DATE_FORMAT.E_DATE_FORMAT_MILLIS);
EventInfo eventInfo = new EventInfo();
eventInfo.setDetectTime(localDateTime);
eventInfo.setCategory("1");
String type = spilloverEventVO.getEventType();
String typeMeg = null;
if (StringUtils.equalsIgnoreCase(type, "spillback")) {
eventInfo.setType("605");
typeMeg = "溢出";
}
eventInfo.setStartTime(localDateTime);
//eventInfo.setEndTime(localDateTime);
eventInfo.setDuration(0);
eventInfo.setRuksj(localDateTime);
eventInfo.setLaneId(spilloverEventVO.getEuuid());
eventInfo.setCrossId(spilloverEventVO.getDeviceNo());
eventInfo.setEventSerialNumber(spilloverEventVO.getDeviceNo() + dataTime);
String geography = spilloverEventVO.getGeography();
String[] split = null;
if (StringUtils.isNotBlank(geography)) {
split = geography.replaceAll("\\[", "").replaceAll("\\]", "").split(",");
eventInfo.setLng(new BigDecimal(split[0]));
eventInfo.setLat(new BigDecimal(split[1]));
}
//eventInfo.setPlaceDesc(SpilloverEventVO.getType() + "-" + SpilloverEventVO.getState());
eventInfo.setSource("5");
eventInfo.setRemark( typeMeg + SpilloverStateEnum.getMsg(spilloverEventVO.getState()));
eventInfos.add(eventInfo);
}
}
return spilloverEventVOS; return eventInfos;
} catch (Exception e) {
log.error("溢出事件数据转化异常:", e);
throw new RuntimeException(e);
}
} }
} }
\ No newline at end of file
package com.wanji.indicators.task.citybrainreceive.event.spillover;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.task.citybrain.vo.SpilloverEventVO;
import com.wanji.indicators.task.citybrainreceive.CommonReceiveModel;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.common.utils.tool.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.List;
/**
* @author duanruiming
* @date 2024/05/16 10:32
*/
@Service
@Slf4j
public class SpilloverEventTempReceiveMap implements MapFunction<String, List<SpilloverEventVO>> {
@Override
public List<SpilloverEventVO> map(String value) throws Exception {
try {
ObjectMapper mapper = JacksonUtils.getInstance();
CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class);
String body = model.getBody();
String headers = model.getHeaders();
if (StringUtils.isNotBlank(headers) && headers.contains(Constant.WAN_JI_NAME) || StringUtils.isBlank(body)) {
return Collections.emptyList();
}
List<SpilloverEventVO> spilloverEventVOS = mapper.readValue(body, new TypeReference<List<SpilloverEventVO>>() {});
if (!CollectionUtils.isEmpty(spilloverEventVOS)) {
return spilloverEventVOS;
}
} catch (Exception e) {
log.error("溢出事件数据转化异常:", e);
throw new RuntimeException(e);
}
return Collections.emptyList();
}
}
...@@ -35,10 +35,10 @@ public class LaneStatisticsReceive2CrossDataCoFlatMap implements CoFlatMapFuncti ...@@ -35,10 +35,10 @@ public class LaneStatisticsReceive2CrossDataCoFlatMap implements CoFlatMapFuncti
CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class); CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class);
List<CrossDataRealtimePO> results = new ArrayList<>(); List<CrossDataRealtimePO> results = new ArrayList<>();
String headers = model.getHeaders(); String headers = model.getHeaders();
if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME) || headers.contains("yigou"))) { String body = model.getBody();
if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME) || StringUtils.isBlank(body))) {
return; return;
} }
String body = model.getBody();
List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() {}); List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() {});
if (!CollectionUtils.isEmpty(laneStatisticsDataVOS)) { if (!CollectionUtils.isEmpty(laneStatisticsDataVOS)) {
for (LaneStatisticsDataVO inVo : laneStatisticsDataVOS) { for (LaneStatisticsDataVO inVo : laneStatisticsDataVOS) {
......
...@@ -30,10 +30,10 @@ public class LaneStatisticsReceive2CrossDataMap implements MapFunction<String, S ...@@ -30,10 +30,10 @@ public class LaneStatisticsReceive2CrossDataMap implements MapFunction<String, S
CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class); CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class);
List<CrossDataRealtimePO> results = new ArrayList<>(); List<CrossDataRealtimePO> results = new ArrayList<>();
String headers = model.getHeaders(); String headers = model.getHeaders();
if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME) || headers.contains("yigou"))) { String body = model.getBody();
if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME) || StringUtils.isBlank(body))) {
return null; return null;
} }
String body = model.getBody();
List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() { List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() {
}); });
if (!CollectionUtils.isEmpty(laneStatisticsDataVOS)) { if (!CollectionUtils.isEmpty(laneStatisticsDataVOS)) {
......
...@@ -30,10 +30,10 @@ public class LaneStatisticsReceive2DirDataMap implements MapFunction<String, Str ...@@ -30,10 +30,10 @@ public class LaneStatisticsReceive2DirDataMap implements MapFunction<String, Str
CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class); CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class);
List<CrossDirDataRealtimePO> results = new ArrayList<>(); List<CrossDirDataRealtimePO> results = new ArrayList<>();
String headers = model.getHeaders(); String headers = model.getHeaders();
if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME) || headers.contains("yigou"))) { String body = model.getBody();
if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME) || StringUtils.isBlank(body))) {
return null; return null;
} }
String body = model.getBody();
List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() {}); List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() {});
if (!CollectionUtils.isEmpty(laneStatisticsDataVOS)) { if (!CollectionUtils.isEmpty(laneStatisticsDataVOS)) {
for (LaneStatisticsDataVO inVo : laneStatisticsDataVOS) { for (LaneStatisticsDataVO inVo : laneStatisticsDataVOS) {
......
...@@ -30,10 +30,10 @@ public class LaneStatisticsReceive2LaneDataMap implements MapFunction<String, St ...@@ -30,10 +30,10 @@ public class LaneStatisticsReceive2LaneDataMap implements MapFunction<String, St
CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class); CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class);
List<CrossLaneDataRealTimePO> results = new ArrayList<>(); List<CrossLaneDataRealTimePO> results = new ArrayList<>();
String headers = model.getHeaders(); String headers = model.getHeaders();
if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME) || headers.contains("yigou"))) { String body = model.getBody();
if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME) || StringUtils.isBlank(body))) {
return null; return null;
} }
String body = model.getBody();
List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() { List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() {
}); });
if (!CollectionUtils.isEmpty(laneStatisticsDataVOS)) { if (!CollectionUtils.isEmpty(laneStatisticsDataVOS)) {
......
...@@ -30,10 +30,10 @@ public class LaneStatisticsReceive2TurnDataMap implements MapFunction<String, St ...@@ -30,10 +30,10 @@ public class LaneStatisticsReceive2TurnDataMap implements MapFunction<String, St
CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class); CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class);
List<CrossTurnDataRealtimePO> results = new ArrayList<>(); List<CrossTurnDataRealtimePO> results = new ArrayList<>();
String headers = model.getHeaders(); String headers = model.getHeaders();
if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME) || headers.contains("yigou"))) { String body = model.getBody();
if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME) || StringUtils.isBlank(body))) {
return null; return null;
} }
String body = model.getBody();
List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() { List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() {
}); });
if (!CollectionUtils.isEmpty(laneStatisticsDataVOS)) { if (!CollectionUtils.isEmpty(laneStatisticsDataVOS)) {
......
...@@ -2,7 +2,7 @@ package com.wanji.indicators.task.citybrainreceive.lanestatistics; ...@@ -2,7 +2,7 @@ package com.wanji.indicators.task.citybrainreceive.lanestatistics;
import com.wanji.indicators.constant.Constant; import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.task.citybrain.vo.SpilloverEventVO; import com.wanji.indicators.task.citybrain.vo.SpilloverEventVO;
import com.wanji.indicators.task.citybrainreceive.event.spillover.SpilloverEventReceiveMap; import com.wanji.indicators.task.citybrainreceive.event.spillover.SpilloverEventTempReceiveMap;
import com.wanji.indicators.util.PropertiesHelper; import com.wanji.indicators.util.PropertiesHelper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.eventtime.WatermarkStrategy;
...@@ -78,7 +78,7 @@ public class LaneStatisticsReceiveMain { ...@@ -78,7 +78,7 @@ public class LaneStatisticsReceiveMain {
private static void dealCrossRealTimeData(StreamExecutionEnvironment env, DataStream<String> staticticsStream, DataStream<String> spilloverStream) { private static void dealCrossRealTimeData(StreamExecutionEnvironment env, DataStream<String> staticticsStream, DataStream<String> spilloverStream) {
SingleOutputStreamOperator<List<SpilloverEventVO>> spilloverVoStream = spilloverStream.map(new SpilloverEventReceiveMap()); SingleOutputStreamOperator<List<SpilloverEventVO>> spilloverVoStream = spilloverStream.map(new SpilloverEventTempReceiveMap());
SingleOutputStreamOperator<String> laneRealtimeStream = staticticsStream SingleOutputStreamOperator<String> laneRealtimeStream = staticticsStream
.connect(spilloverVoStream.broadcast()) .connect(spilloverVoStream.broadcast())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment