Commit 97c5d2b0 authored by duanruiming's avatar duanruiming

[update] 交通大脑-统计数据优化转化数据优化························

parent de142ba3
...@@ -51,7 +51,7 @@ public class ForeignDeviceInfoSource extends RichParallelSourceFunction<Map<Stri ...@@ -51,7 +51,7 @@ public class ForeignDeviceInfoSource extends RichParallelSourceFunction<Map<Stri
while (running) { while (running) {
List<ForeignDevicePO> foreignDevicePOS = foreignDeviceMapper.selectAllDevices(); List<ForeignDevicePO> foreignDevicePOS = foreignDeviceMapper.selectAllDevices();
if (!CollectionUtils.isEmpty(foreignDevicePOS)) { if (!CollectionUtils.isEmpty(foreignDevicePOS)) {
Map<String, List<ForeignDevicePO>> map = foreignDevicePOS.stream().collect(Collectors.groupingBy(ForeignDevicePO::getCrossId)); Map<String, List<ForeignDevicePO>> map = foreignDevicePOS.stream().collect(Collectors.groupingBy(ForeignDevicePO::getDeviceNo));
sourceContext.collect(map); sourceContext.collect(map);
} }
TimeUnit.MINUTES.sleep(1); TimeUnit.MINUTES.sleep(1);
......
...@@ -16,7 +16,10 @@ import org.apache.flink.util.Collector; ...@@ -16,7 +16,10 @@ import org.apache.flink.util.Collector;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.util.*; import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.OptionalDouble;
/** /**
* @author duanruiming * @author duanruiming
...@@ -36,7 +39,8 @@ public class LaneStatisticsReceive2CrossDataCoFlatMap implements CoFlatMapFuncti ...@@ -36,7 +39,8 @@ public class LaneStatisticsReceive2CrossDataCoFlatMap implements CoFlatMapFuncti
List<CrossDataRealtimePO> results = new ArrayList<>(); List<CrossDataRealtimePO> results = new ArrayList<>();
String headers = model.getHeaders(); String headers = model.getHeaders();
String body = model.getBody(); String body = model.getBody();
if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME) || StringUtils.isBlank(body))) { if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME) ||
StringUtils.isBlank(body)) || !body.contains("data_list")) {
return; return;
} }
List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() {}); List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() {});
...@@ -119,16 +123,15 @@ public class LaneStatisticsReceive2CrossDataCoFlatMap implements CoFlatMapFuncti ...@@ -119,16 +123,15 @@ public class LaneStatisticsReceive2CrossDataCoFlatMap implements CoFlatMapFuncti
} }
} }
po.setSpilloverDirs(spilloverDirs); po.setSpilloverDirs(spilloverDirs);
} }
results.add(po); results.add(po);
} }
} }
} }
Map<String, List<CrossDataRealtimePO>> map = new HashMap<>(); //Map<String, List<CrossDataRealtimePO>> map = new HashMap<>();
map.put(Constant.REAL_TIME_CROSS_DATA_KEY, results); //map.put(Constant.REAL_TIME_CROSS_DATA_KEY, results);
out.collect(mapper.writeValueAsString(map)); out.collect(mapper.writeValueAsString(results));
} catch (Exception e) { } catch (Exception e) {
log.error("LaneStatisticsReceive2CrossDataCoFlatMap: ", e); log.error("LaneStatisticsReceive2CrossDataCoFlatMap: ", e);
throw new RuntimeException(e); throw new RuntimeException(e);
......
package com.wanji.indicators.task.citybrainreceive.lanestatistics;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.entity.ForeignDevicePO;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.common.utils.tool.StringUtils;
import net.wanji.databus.po.CrossDataRealtimePO;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author duanruiming
* @date 2024/07/19 15:49
*/
@Service
@Slf4j
public class LaneStatisticsReceive2CrossDataCoFlatMapVersion2 implements CoFlatMapFunction<String, Map<String, List<ForeignDevicePO>>, String> {
private static final Map<String, List<ForeignDevicePO>> foreignDeviceMap = new HashMap<>();
@Override
public void flatMap1(String value, Collector<String> out) throws Exception {
try {
ObjectMapper mapper = JacksonUtils.getInstance();
List<CrossDataRealtimePO> results = new ArrayList<>();
List<CrossDataRealtimePO> paramList = mapper.readValue(value, new TypeReference<List<CrossDataRealtimePO>>() {});
replaceDeviceNO(paramList);
Map<String, List<CrossDataRealtimePO>> crossMap = paramList.stream().collect(Collectors.groupingBy(CrossDataRealtimePO::getCrossId));
if (!crossMap.isEmpty()) {
for (Map.Entry<String, List<CrossDataRealtimePO>> entry : crossMap.entrySet()) {
String crossId = entry.getKey();
List<CrossDataRealtimePO> realtimePOS = entry.getValue();
CrossDataRealtimePO po = new CrossDataRealtimePO();
Integer status = 0;
Integer flow = 0;
Double speed = 0.0;
Double speed85 = 0.0;
Double queueLengthMax = 0.0;
Double stopTimes = 0.0;
int duration = 0;
String unbalaDirs = "";
String conDirs = "";
String spillDirs = "";
Integer delayTime = 0;
Double sturation = 0.0;
Double noStopRate = 0.0;
Double oneStopRate = 0.0;
Double twoStopRate = 0.0;
Double threeStopRate = 0.0;
Double effusionRate = 0.0;
Double greenLightEfficiency = 0.0;
Double timeOccupancy = 0.0;
Double vehicleLengthRatioMean = 0.0;
int trafficA = 0;
int trafficB = 0;
int trafficC = 0;
int nonMotorFlow = 0;
for (CrossDataRealtimePO realtimePO : realtimePOS) {
status = status >= realtimePO.getStatus() ? status : realtimePO.getStatus();
flow += realtimePO.getFlow();
speed += realtimePO.getSpeed();
speed85 += realtimePO.getV85();
duration = duration >= realtimePO.getDuration() ? duration : realtimePO.getDuration();
queueLengthMax = queueLengthMax >= realtimePO.getQueueLength() ? queueLengthMax : realtimePO.getQueueLength();
stopTimes = stopTimes >= realtimePO.getStopTimes() ? stopTimes : realtimePO.getStopTimes();
delayTime += realtimePO.getDelayTime();
sturation += realtimePO.getSturation();
noStopRate += realtimePO.getNoStopRate();
oneStopRate += realtimePO.getOneStopRate();
twoStopRate += realtimePO.getTwoStopRate();
threeStopRate += realtimePO.getThreeStopRate();
effusionRate += realtimePO.getEffusionRate();
timeOccupancy += realtimePO.getTimeOccupancy();
vehicleLengthRatioMean += realtimePO.getVehicleLengthRatioMean();
if (StringUtils.isNotBlank(realtimePO.getUnbalanceDirs())) {
unbalaDirs = unbalaDirs.concat(realtimePO.getUnbalanceDirs());
}
if (StringUtils.isNotBlank(realtimePO.getCongestionDirs())) {
conDirs = conDirs.concat(realtimePO.getCongestionDirs());
}
if (StringUtils.isNotBlank(realtimePO.getSpilloverDirs())) {
spillDirs = spillDirs.concat(realtimePO.getSpilloverDirs());
}
nonMotorFlow += realtimePO.getNonMotorFlow();
trafficA += realtimePO.getTrafficFlowA();
trafficB += realtimePO.getTrafficFlowB();
trafficC += realtimePO.getTrafficFlowC();
}
po.setCrossId(crossId);
po.setStatus(status);
po.setType(1);
if (status > 1) {
po.setType(2);
}
double delayIndex = Math.round(60 / speed * 100) / 100.0;
po.setTrafficIndex(delayIndex);
po.setStartTime(realtimePOS.get(0).getStartTime());
po.setDuration(duration);
if (status == 1) {
po.setIsUnbalance(1);
} else if (status == 2) {
po.setIsCongestion(1);
} else if (status == 3) {
po.setIsSpillover(1);
}
po.setUnbalanceIndex(delayIndex);
po.setSpilloverIndex(delayIndex);
po.setCongestionIndex(delayIndex);
po.setUnbalanceDirs(unbalaDirs);
po.setCongestionDirs(conDirs);
po.setSpilloverDirs(spillDirs);
po.setFlow(flow);
po.setFlowRate(Math.round(flow * 12 * 100) / 100.0);
po.setSpeed(Math.round(speed / realtimePOS.size() * 100) / 100.0);
po.setQueueLength(queueLengthMax);
po.setStopTimes(Math.round(stopTimes / realtimePOS.size() * 100) / 100.0);
po.setDelayTime(Math.round(delayTime / realtimePOS.size() * 100) / 100);
po.setSturation(Math.round(sturation / realtimePOS.size() * 100) / 100.0);
po.setBatchTime(realtimePOS.get(0).getBatchTime());
po.setGmtCreate(realtimePOS.get(0).getGmtCreate());
po.setGmtModified(realtimePOS.get(0).getGmtModified());
po.setEndTime(realtimePOS.get(0).getEndTime());
po.setLoadBalance(Math.round(Math.sqrt(sturation / realtimePOS.size()) * 100) / 100.0);
po.setNoStopRate(Math.round(noStopRate / realtimePOS.size() * 100) / 100.0);
po.setOneStopRate(Math.round(oneStopRate / realtimePOS.size() * 100) / 100.0);
po.setTwoStopRate(Math.round(twoStopRate / realtimePOS.size() * 100) / 100.0);
po.setThreeStopRate(Math.round(threeStopRate / realtimePOS.size() * 100) / 100.0);
po.setNonMotorFlow(nonMotorFlow);
po.setV85(Math.round(speed85 / realtimePOS.size() * 100) / 100.0);
po.setTrafficFlowA(trafficA);
po.setTrafficFlowB(trafficB);
po.setTrafficFlowC(trafficC);
po.setTimeOccupancy(Math.round(timeOccupancy / realtimePOS.size() * 100) / 100.0);
results.add(po);
}
}
if (!CollectionUtils.isEmpty(results)) {
Map<String, List<CrossDataRealtimePO>> map = new HashMap<>();
map.put(Constant.REAL_TIME_CROSS_DATA_KEY, results);
out.collect(mapper.writeValueAsString(map));
}
} catch (Exception e) {
log.error("LaneStatisticsReceive2CrossDataCoFlatMapVersion2-异常: ", e);
throw new RuntimeException(e);
}
}
private void replaceDeviceNO(List<CrossDataRealtimePO> paramList) {
if (!CollectionUtils.isEmpty(paramList)) {
List<ForeignDevicePO> foreignDevicePOS = foreignDeviceMap.get(paramList.get(0).getCrossId());
paramList.forEach(po -> {
if (!CollectionUtils.isEmpty(foreignDevicePOS)) {
po.setCrossId(foreignDevicePOS.get(0).getCrossId());
}
});
}
}
@Override
public void flatMap2(Map<String, List<ForeignDevicePO>> value, Collector<String> out) throws Exception {
foreignDeviceMap.putAll(value);
}
}
\ No newline at end of file
...@@ -31,7 +31,8 @@ public class LaneStatisticsReceive2DirDataMap implements MapFunction<String, Str ...@@ -31,7 +31,8 @@ public class LaneStatisticsReceive2DirDataMap implements MapFunction<String, Str
List<CrossDirDataRealtimePO> results = new ArrayList<>(); List<CrossDirDataRealtimePO> results = new ArrayList<>();
String headers = model.getHeaders(); String headers = model.getHeaders();
String body = model.getBody(); String body = model.getBody();
if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME) || StringUtils.isBlank(body))) { if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME)
|| StringUtils.isBlank(body))) {
return null; return null;
} }
List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() {}); List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() {});
......
package com.wanji.indicators.task.citybrainreceive.lanestatistics;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.entity.ForeignDevicePO;
import com.wanji.indicators.task.citybrain.vo.LaneStatisticsDataVO;
import com.wanji.indicators.task.citybrain.vo.LaneStatisticsDetail;
import com.wanji.indicators.task.citybrainreceive.CommonReceiveModel;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.common.utils.tool.StringUtils;
import net.wanji.databus.po.CrossDirDataRealtimePO;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.*;
/**
* @author duanruiming
* @date 2024/07/19 14:38
*/
@Service
@Slf4j
public class LaneStatisticsReceive2DirDataMapVersion2 implements CoFlatMapFunction<String, Map<String, List<ForeignDevicePO>>, String> {
private static final Map<String, List<ForeignDevicePO>> foreignDeviceMap = new HashMap<>();
@Override
public void flatMap1(String value, Collector<String> out) throws Exception {
try {
ObjectMapper mapper = JacksonUtils.getInstance();
CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class);
List<CrossDirDataRealtimePO> results = new ArrayList<>();
String headers = model.getHeaders();
String body = model.getBody();
if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME)
|| StringUtils.isBlank(body)) || !body.contains("data_list")) {
return;
}
List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() {});
if (!CollectionUtils.isEmpty(laneStatisticsDataVOS)) {
for (LaneStatisticsDataVO inVo : laneStatisticsDataVOS) {
String deviceNo = inVo.getDeviceNo();
String startTime = inVo.getData_time();
Date dateTime = new Date(Long.parseLong(startTime));
String dir = inVo.getSectionNo();
List<LaneStatisticsDetail> dataList = inVo.getData_list();
if (!CollectionUtils.isEmpty(dataList)) {
ForeignDevicePO foreignDevicePO = getForeignDevicePO(deviceNo, dir, foreignDeviceMap);
if (Objects.isNull(foreignDevicePO)) {
continue;
}
String crossId = foreignDevicePO.getCrossId();
CrossDirDataRealtimePO po = new CrossDirDataRealtimePO();
po.setId(String.join(Constant.MIDDLE_LINE, crossId, dir, "2"));
po.setDirType(Integer.parseInt(dir));
po.setInOutType(2);
po.setCrossId(crossId);
po.setLength(0.0);
po.setStatus(1);
po.setStartTime(dateTime);
int flowSum = dataList.stream().filter(po1 -> StringUtils.isNotBlank(po1.getWArrivalFlow())).map(LaneStatisticsDetail::getWArrivalFlow).mapToInt(Integer::parseInt).sum();
po.setFlow(flowSum);
po.setCapacity(flowSum);
po.setDuration(5);
OptionalDouble speedAverage = dataList.stream().map(LaneStatisticsDetail::getSpeedAvg).mapToDouble(Double::parseDouble).average();
po.setSpeed(0.0);
if (speedAverage.isPresent()) {
po.setSpeed(speedAverage.getAsDouble());
}
if (po.getSpeed() > 0) {
po.setTrafficIndex(Math.round(60 / po.getSpeed() / dataList.size() * 100.0) / 100.0);
} else {
po.setTrafficIndex(1.0);
}
OptionalDouble queueLength = dataList.stream().map(LaneStatisticsDetail::getByQueueLen).mapToDouble(Double::parseDouble).max();
po.setQueueLength(0.0);
if (queueLength.isPresent()) {
po.setQueueLength(queueLength.getAsDouble());
}
OptionalDouble stopTimeAverage = dataList.stream().map(LaneStatisticsDetail::getByStoppingTimes).mapToDouble(Double::parseDouble).average();
po.setStopTimes(0.0);
if (stopTimeAverage.isPresent()) {
po.setStopTimes(stopTimeAverage.getAsDouble());
}
OptionalDouble delayTime = dataList.stream().filter(l -> StringUtils.isNotBlank(l.getWDelay())).map(LaneStatisticsDetail::getWDelay).mapToDouble(Double::parseDouble).average();
po.setDelayTime(0);
if (delayTime.isPresent()) {
po.setDelayTime((int) delayTime.getAsDouble());
}
po.setSturation(0.7);
po.setEffusionRate(0.0);
po.setNoStopRate(0.0);
po.setOneStopRate(0.0);
po.setTwoStopRate(0.0);
po.setThreeStopRate(0.0);
po.setBatchTime(Integer.parseInt(startTime.substring(0, 10)));
po.setEffusionRate(0.0);
po.setGreenLightEfficiency(0.0);
results.add(po);
}
}
}
if (!CollectionUtils.isEmpty(results)) {
Map<String, List<CrossDirDataRealtimePO>> map = new HashMap<>();
map.put(Constant.REAL_TIME_DIR_DATA_KEY, results);
out.collect(mapper.writeValueAsString(map));
}
} catch (Exception e) {
log.error("LaneStatisticsReceive2DirDataMap-异常", e);
throw new RuntimeException(e);
}
}
private ForeignDevicePO getForeignDevicePO(String deviceNo, String dir, Map<String, List<ForeignDevicePO>> foreignDeviceMap) {
if (!foreignDeviceMap.isEmpty()) {
List<ForeignDevicePO> foreignDevicePOS = foreignDeviceMap.get(deviceNo);
if (!CollectionUtils.isEmpty(foreignDevicePOS)) {
for (ForeignDevicePO foreignDevicePO : foreignDevicePOS) {
Integer type = foreignDevicePO.getType();
String currentDir = String.valueOf(foreignDevicePO.getDir());
String laneId = foreignDevicePO.getLaneId();
String sort = laneId.substring(laneId.length() -1);
if (Objects.equals(2, type) && StringUtils.equalsIgnoreCase(dir, currentDir)) {
return foreignDevicePO;
}
}
}
}
return null;
}
@Override
public void flatMap2(Map<String, List<ForeignDevicePO>> value, Collector<String> out) throws Exception {
foreignDeviceMap.putAll(value);
}
}
...@@ -2,6 +2,7 @@ package com.wanji.indicators.task.citybrainreceive.lanestatistics; ...@@ -2,6 +2,7 @@ package com.wanji.indicators.task.citybrainreceive.lanestatistics;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import com.wanji.indicators.constant.Constant; import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.entity.ForeignDevicePO; import com.wanji.indicators.entity.ForeignDevicePO;
import com.wanji.indicators.task.citybrain.vo.LaneStatisticsDataVO; import com.wanji.indicators.task.citybrain.vo.LaneStatisticsDataVO;
...@@ -36,13 +37,19 @@ public class LaneStatisticsReceive2LaneDataMapVersion2 implements CoFlatMapFunct ...@@ -36,13 +37,19 @@ public class LaneStatisticsReceive2LaneDataMapVersion2 implements CoFlatMapFunct
List<CrossLaneDataRealTimePO> results = new ArrayList<>(); List<CrossLaneDataRealTimePO> results = new ArrayList<>();
String headers = model.getHeaders(); String headers = model.getHeaders();
String body = model.getBody(); String body = model.getBody();
if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME) || StringUtils.isBlank(body))) { if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME)
|| StringUtils.isBlank(body)) || !body.contains("data_list")) {
return; return;
} }
List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() {}); List<LaneStatisticsDataVO> laneStatisticsDataVOS = null;
try {
laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() {});
} catch (MismatchedInputException e) {
log.error("==============:" + body);
}
if (!CollectionUtils.isEmpty(laneStatisticsDataVOS)) { if (!CollectionUtils.isEmpty(laneStatisticsDataVOS)) {
for (LaneStatisticsDataVO inVo : laneStatisticsDataVOS) { for (LaneStatisticsDataVO inVo : laneStatisticsDataVOS) {
String crossId = inVo.getDeviceNo(); String deviceNo = inVo.getDeviceNo();
String startTime = inVo.getData_time(); String startTime = inVo.getData_time();
Date dateTime = new Date(Long.parseLong(startTime)); Date dateTime = new Date(Long.parseLong(startTime));
String dir = inVo.getSectionNo(); String dir = inVo.getSectionNo();
...@@ -50,12 +57,15 @@ public class LaneStatisticsReceive2LaneDataMapVersion2 implements CoFlatMapFunct ...@@ -50,12 +57,15 @@ public class LaneStatisticsReceive2LaneDataMapVersion2 implements CoFlatMapFunct
if (!CollectionUtils.isEmpty(dataList)) { if (!CollectionUtils.isEmpty(dataList)) {
for (LaneStatisticsDetail detail : dataList) { for (LaneStatisticsDetail detail : dataList) {
String laneNo = detail.getLaneNo(); String laneNo = detail.getLaneNo();
String wjLaneId = getWjLaneId(crossId, dir, laneNo, foreignDeviceMap); ForeignDevicePO foreignDevicePO = getWjLaneId(deviceNo, dir, laneNo, foreignDeviceMap);
if (Objects.isNull(foreignDevicePO)) {
continue;
}
CrossLaneDataRealTimePO po = new CrossLaneDataRealTimePO(); CrossLaneDataRealTimePO po = new CrossLaneDataRealTimePO();
// todo // todo
po.setId(wjLaneId); po.setId(foreignDevicePO.getLaneId());
//po.setId(String.join(Constant.MIDDLE_LINE, crossId, dir, laneNo)); //po.setId(String.join(Constant.MIDDLE_LINE, crossId, dir, laneNo));
po.setCrossId(crossId); po.setCrossId(foreignDevicePO.getCrossId());
po.setStartTime(dateTime); po.setStartTime(dateTime);
String wArrivalFlow = detail.getWArrivalFlow(); String wArrivalFlow = detail.getWArrivalFlow();
wArrivalFlow = StringUtils.isBlank(wArrivalFlow) ? "0" : wArrivalFlow; wArrivalFlow = StringUtils.isBlank(wArrivalFlow) ? "0" : wArrivalFlow;
...@@ -86,18 +96,20 @@ public class LaneStatisticsReceive2LaneDataMapVersion2 implements CoFlatMapFunct ...@@ -86,18 +96,20 @@ public class LaneStatisticsReceive2LaneDataMapVersion2 implements CoFlatMapFunct
} }
} }
Map<String, List<CrossLaneDataRealTimePO>> map = new HashMap<>(); if (!CollectionUtils.isEmpty(results)) {
map.put(Constant.REAL_TIME_LANE_DATA_KEY, results); Map<String, List<CrossLaneDataRealTimePO>> map = new HashMap<>();
out.collect(mapper.writeValueAsString(map)); map.put(Constant.REAL_TIME_LANE_DATA_KEY, results);
out.collect(mapper.writeValueAsString(map));
}
} catch (Exception e) { } catch (Exception e) {
log.error("LaneStatisticsReceive2LaneDataMap ", e); log.error("LaneStatisticsReceive2LaneDataMap ", e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
private String getWjLaneId(String crossId, String dir, String laneNo, Map<String, List<ForeignDevicePO>> foreignDeviceMap) { private ForeignDevicePO getWjLaneId(String deviceNo, String dir, String laneNo, Map<String, List<ForeignDevicePO>> foreignDeviceMap) {
if (!foreignDeviceMap.isEmpty()) { if (!foreignDeviceMap.isEmpty()) {
List<ForeignDevicePO> foreignDevicePOS = foreignDeviceMap.get(crossId); List<ForeignDevicePO> foreignDevicePOS = foreignDeviceMap.get(deviceNo);
if (!CollectionUtils.isEmpty(foreignDevicePOS)) { if (!CollectionUtils.isEmpty(foreignDevicePOS)) {
for (ForeignDevicePO foreignDevicePO : foreignDevicePOS) { for (ForeignDevicePO foreignDevicePO : foreignDevicePOS) {
Integer type = foreignDevicePO.getType(); Integer type = foreignDevicePO.getType();
...@@ -106,12 +118,12 @@ public class LaneStatisticsReceive2LaneDataMapVersion2 implements CoFlatMapFunct ...@@ -106,12 +118,12 @@ public class LaneStatisticsReceive2LaneDataMapVersion2 implements CoFlatMapFunct
String sort = laneId.substring(laneId.length() -1); String sort = laneId.substring(laneId.length() -1);
if (Objects.equals(2, type) && StringUtils.equalsIgnoreCase(dir, currentDir) if (Objects.equals(2, type) && StringUtils.equalsIgnoreCase(dir, currentDir)
&& StringUtils.equalsIgnoreCase(sort, laneNo) ) { && StringUtils.equalsIgnoreCase(sort, laneNo) ) {
return foreignDevicePO.getLaneId(); return foreignDevicePO;
} }
} }
} }
} }
return ""; return null;
} }
@Override @Override
......
...@@ -32,7 +32,8 @@ public class LaneStatisticsReceive2TurnDataMap implements MapFunction<String, St ...@@ -32,7 +32,8 @@ public class LaneStatisticsReceive2TurnDataMap implements MapFunction<String, St
List<CrossTurnDataRealtimePO> results = new ArrayList<>(); List<CrossTurnDataRealtimePO> results = new ArrayList<>();
String headers = model.getHeaders(); String headers = model.getHeaders();
String body = model.getBody(); String body = model.getBody();
if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME) || StringUtils.isBlank(body))) { if (StringUtils.isNotBlank(headers)
&& (headers.contains(Constant.WAN_JI_NAME) || StringUtils.isBlank(body)) || !body.contains("data_list")) {
return null; return null;
} }
List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() {}); List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() {});
...@@ -61,9 +62,10 @@ public class LaneStatisticsReceive2TurnDataMap implements MapFunction<String, St ...@@ -61,9 +62,10 @@ public class LaneStatisticsReceive2TurnDataMap implements MapFunction<String, St
Map<String, List<CrossTurnDataRealtimePO>> map = new HashMap<>(); Map<String, List<CrossTurnDataRealtimePO>> map = new HashMap<>();
map.put(Constant.REAL_TIME_TURN_DATA_KEY, results); map.put(Constant.REAL_TIME_TURN_DATA_KEY, results);
log.error("字符串序列化: " + mapper.writeValueAsString(map));
return mapper.writeValueAsString(map); return mapper.writeValueAsString(map);
} catch (Exception e) { } catch (Exception e) {
log.error("LaneStatisticsReceive2TurnDataMap:", e); log.error("LaneStatisticsReceive2TurnDataMap-异常:", e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
...@@ -124,14 +126,14 @@ public class LaneStatisticsReceive2TurnDataMap implements MapFunction<String, St ...@@ -124,14 +126,14 @@ public class LaneStatisticsReceive2TurnDataMap implements MapFunction<String, St
wArrivalFlow += Integer.parseInt(laneStatisticsDetail.getWArrivalFlow()); wArrivalFlow += Integer.parseInt(laneStatisticsDetail.getWArrivalFlow());
fSpaceOccupyRation += Double.parseDouble(laneStatisticsDetail.getFSpaceOccupyRation()); fSpaceOccupyRation += Double.parseDouble(laneStatisticsDetail.getFSpaceOccupyRation());
Double currrentQueue = Double.parseDouble(laneStatisticsDetail.getByQueueLen()); Double currrentQueue = Double.parseDouble(laneStatisticsDetail.getByQueueLen());
byQueueLen = byQueueLen >= currrentQueue ? byQueueLen :currrentQueue; byQueueLen = byQueueLen >= currrentQueue ? byQueueLen : currrentQueue;
wDelay += Integer.parseInt(laneStatisticsDetail.getWDelay()); wDelay += (int) Double.parseDouble(StringUtils.isBlank(laneStatisticsDetail.getWDelay()) ? "0" : laneStatisticsDetail.getWDelay());
byStoppingTimes += Double.parseDouble(laneStatisticsDetail.getByStoppingTimes()); byStoppingTimes += Double.parseDouble(laneStatisticsDetail.getByStoppingTimes());
dwLeftVehNum += laneStatisticsDetail.getDwLeftVehNum(); dwLeftVehNum += laneStatisticsDetail.getDwLeftVehNum();
dwRightVehNum += laneStatisticsDetail.getDwRightVehNum(); dwRightVehNum += laneStatisticsDetail.getDwRightVehNum();
dwStraightVehNum += laneStatisticsDetail.getDwStraightVehNum(); dwStraightVehNum += laneStatisticsDetail.getDwStraightVehNum();
} }
String id = String.join("_", crossId, dir, turn); String id = String.join("_", crossId, dir, turn);
realtimePO.setId(id); realtimePO.setId(id);
realtimePO.setTurnType(turn); realtimePO.setTurnType(turn);
realtimePO.setInDir(Integer.valueOf(dir)); realtimePO.setInDir(Integer.valueOf(dir));
...@@ -148,7 +150,7 @@ public class LaneStatisticsReceive2TurnDataMap implements MapFunction<String, St ...@@ -148,7 +150,7 @@ public class LaneStatisticsReceive2TurnDataMap implements MapFunction<String, St
realtimePO.setSpeed(Math.round(speedAvg / dataList.size() * 100.0) / 100.0); realtimePO.setSpeed(Math.round(speedAvg / dataList.size() * 100.0) / 100.0);
realtimePO.setQueueLength(byQueueLen); realtimePO.setQueueLength(byQueueLen);
realtimePO.setStopTimes(Math.round(byStoppingTimes / dataList.size() * 100.0) / 100.0); realtimePO.setStopTimes(Math.round(byStoppingTimes / dataList.size() * 100.0) / 100.0);
realtimePO.setDelayTime( (int) Math.round(wDelay / dataList.size() * 100.0) / 100); realtimePO.setDelayTime((int) Math.round(wDelay / dataList.size() * 100.0) / 100);
realtimePO.setSturation(1.0); realtimePO.setSturation(1.0);
//realtimePO.setVehheadDist(); //realtimePO.setVehheadDist();
realtimePO.setVehheadTime(Math.round(headway / dataList.size() * 100.0) / 100.0); realtimePO.setVehheadTime(Math.round(headway / dataList.size() * 100.0) / 100.0);
......
package com.wanji.indicators.task.citybrainreceive.lanestatistics;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.entity.ForeignDevicePO;
import com.wanji.indicators.task.citybrain.vo.LaneStatisticsDataVO;
import com.wanji.indicators.task.citybrain.vo.LaneStatisticsDetail;
import com.wanji.indicators.task.citybrainreceive.CommonReceiveModel;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.common.utils.tool.StringUtils;
import net.wanji.databus.po.CrossTurnDataRealtimePO;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author duanruiming
* @date 2024/07/19 14:10
*/
@Service
@Slf4j
public class LaneStatisticsReceive2TurnDataMapVersion2 implements CoFlatMapFunction<String, Map<String, List<ForeignDevicePO>>, String> {
private static final Map<String, List<ForeignDevicePO>> foreignDeviceMap = new HashMap<>();
@Override
public void flatMap1(String value, Collector<String> out) throws Exception {
try {
ObjectMapper mapper = JacksonUtils.getInstance();
CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class);
List<CrossTurnDataRealtimePO> results = new ArrayList<>();
String headers = model.getHeaders();
String body = model.getBody();
if (StringUtils.isNotBlank(headers)
&& (headers.contains(Constant.WAN_JI_NAME) || StringUtils.isBlank(body))
|| !body.contains("data_list")) {
return;
}
List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() {});
if (!CollectionUtils.isEmpty(laneStatisticsDataVOS)) {
for (LaneStatisticsDataVO inVo : laneStatisticsDataVOS) {
String deviceNo = inVo.getDeviceNo();
String startTime = inVo.getData_time();
Date dateTime = new Date(Long.parseLong(startTime));
String dir = inVo.getSectionNo();
List<LaneStatisticsDetail> dataList = inVo.getData_list();
if (!CollectionUtils.isEmpty(dataList)) {
ForeignDevicePO foreignDevicePO = getForeignDevicePO(deviceNo, dir, foreignDeviceMap);
if (Objects.isNull(foreignDevicePO)) {
continue;
}
String crossId = foreignDevicePO.getCrossId();
List<LaneStatisticsDetail> leftList = dataList.stream().filter(po -> po.getDwLeftVehNum() > 0).collect(Collectors.toList());
List<LaneStatisticsDetail> straightList = dataList.stream().filter(po -> po.getDwStraightVehNum() > 0).collect(Collectors.toList());
List<LaneStatisticsDetail> rightList = dataList.stream().filter(po -> po.getDwRightVehNum() > 0).collect(Collectors.toList());
CrossTurnDataRealtimePO leftPO = getCrossTurnDataRealtimePO(leftList, dir, "l", dateTime, crossId);
CrossTurnDataRealtimePO straightPO = getCrossTurnDataRealtimePO(straightList, dir, "s", dateTime, crossId);
CrossTurnDataRealtimePO rightPO = getCrossTurnDataRealtimePO(rightList, dir, "r", dateTime, crossId);
results.add(leftPO);
results.add(straightPO);
results.add(rightPO);
}
}
}
if (!CollectionUtils.isEmpty(results)) {
Map<String, List<CrossTurnDataRealtimePO>> map = new HashMap<>();
map.put(Constant.REAL_TIME_TURN_DATA_KEY, results);
out.collect(mapper.writeValueAsString(map));
}
} catch (Exception e) {
log.error("LaneStatisticsReceive2TurnDataMap-异常:", e);
throw new RuntimeException(e);
}
}
private CrossTurnDataRealtimePO getCrossTurnDataRealtimePO(List<LaneStatisticsDetail> dataList, String dir, String turn, Date startTime, String crossId) {
CrossTurnDataRealtimePO realtimePO = new CrossTurnDataRealtimePO();
if (!CollectionUtils.isEmpty(dataList)) {
// 不区分车型机动车总流量
int volumeSum = 0;
// 行人流量
int volumePerson = 0;
// 非机动车流量
int volumeCarNon = 0;
// 小车流量
int volumeCarSmall = 0;
// 中车流量
int volumeCarMiddle = 0;
// 大车流量
int volumeCarBig = 0;
// 平均速度(km/h)
double speedAvg = 0;
// 时间占有率(%)
double occupancy = 0;
// 平均车头时距(s)
double headway = 0;
// 平均车身间距(s)
double gap = 0;
// 85位速度(km/h)
double speed85 = 0;
// 车流量总和
int wArrivalFlow = 0;
// 平均空间占有率
double fSpaceOccupyRation = 0;
// 堵塞状态下最大排队长度
double byQueueLen = 0;
// 平均延误
int wDelay = 0;
// 平均停车次数
double byStoppingTimes = 0;
// 左转总流量
int dwLeftVehNum = 0;
// 右转总流量
int dwRightVehNum = 0;
// 直行总流量
int dwStraightVehNum = 0;
for (LaneStatisticsDetail laneStatisticsDetail : dataList) {
volumeSum += laneStatisticsDetail.getVolumeSum();
volumePerson += laneStatisticsDetail.getVolumePerson();
volumeCarNon += laneStatisticsDetail.getVolumeCarNon();
volumeCarSmall += laneStatisticsDetail.getVolumeCarSmall();
volumeCarMiddle += laneStatisticsDetail.getVolumeCarMiddle();
volumeCarBig += laneStatisticsDetail.getVolumeCarBig();
speedAvg += Double.parseDouble(laneStatisticsDetail.getSpeedAvg());
occupancy += Double.parseDouble(laneStatisticsDetail.getOccupancy());
headway += Double.parseDouble(laneStatisticsDetail.getHeadway());
gap += Double.parseDouble(laneStatisticsDetail.getGap());
speed85 += Double.parseDouble(laneStatisticsDetail.getSpeed85());
wArrivalFlow += Integer.parseInt(laneStatisticsDetail.getWArrivalFlow());
fSpaceOccupyRation += Double.parseDouble(laneStatisticsDetail.getFSpaceOccupyRation());
Double currrentQueue = Double.parseDouble(laneStatisticsDetail.getByQueueLen());
byQueueLen = byQueueLen >= currrentQueue ? byQueueLen : currrentQueue;
wDelay += (int) Double.parseDouble(StringUtils.isBlank(laneStatisticsDetail.getWDelay()) ? "0" : laneStatisticsDetail.getWDelay());
byStoppingTimes += Double.parseDouble(laneStatisticsDetail.getByStoppingTimes());
dwLeftVehNum += laneStatisticsDetail.getDwLeftVehNum();
dwRightVehNum += laneStatisticsDetail.getDwRightVehNum();
dwStraightVehNum += laneStatisticsDetail.getDwStraightVehNum();
}
String id = String.join("_", crossId, dir, turn);
realtimePO.setId(id);
realtimePO.setTurnType(turn);
realtimePO.setInDir(Integer.valueOf(dir));
realtimePO.setCrossId(crossId);
realtimePO.setStartTime(startTime);
realtimePO.setFlow(dwLeftVehNum + dwRightVehNum + dwStraightVehNum);
realtimePO.setStatus(1);
// 默认自由流速度60km/h
if (speedAvg > 0) {
realtimePO.setTrafficIndex(Math.round(60 / speedAvg / dataList.size() * 100.0) / 100.0);
} else {
realtimePO.setTrafficIndex(1.0);
}
realtimePO.setSpeed(Math.round(speedAvg / dataList.size() * 100.0) / 100.0);
realtimePO.setQueueLength(byQueueLen);
realtimePO.setStopTimes(Math.round(byStoppingTimes / dataList.size() * 100.0) / 100.0);
realtimePO.setDelayTime((int) Math.round(wDelay / dataList.size() * 100.0) / 100);
realtimePO.setSturation(1.0);
//realtimePO.setVehheadDist();
realtimePO.setVehheadTime(Math.round(headway / dataList.size() * 100.0) / 100.0);
realtimePO.setQuality(1.0);
//realtimePO.setNoStopRate();
realtimePO.setBatchTime((int) startTime.getTime());
realtimePO.setGmtCreate(new Date());
realtimePO.setGmtModified(new Date());
}
return realtimePO;
}
private ForeignDevicePO getForeignDevicePO(String deviceNo, String dir, Map<String, List<ForeignDevicePO>> foreignDeviceMap) {
if (!foreignDeviceMap.isEmpty()) {
List<ForeignDevicePO> foreignDevicePOS = foreignDeviceMap.get(deviceNo);
if (!CollectionUtils.isEmpty(foreignDevicePOS)) {
for (ForeignDevicePO foreignDevicePO : foreignDevicePOS) {
Integer type = foreignDevicePO.getType();
String currentDir = String.valueOf(foreignDevicePO.getDir());
String laneId = foreignDevicePO.getLaneId();
String sort = laneId.substring(laneId.length() -1);
if (Objects.equals(2, type) && StringUtils.equalsIgnoreCase(dir, currentDir)) {
return foreignDevicePO;
}
}
}
}
return null;
}
@Override
public void flatMap2(Map<String, List<ForeignDevicePO>> value, Collector<String> out) throws Exception {
foreignDeviceMap.putAll(value);
}
}
...@@ -49,9 +49,9 @@ public class LaneStatisticsReceiveMain { ...@@ -49,9 +49,9 @@ public class LaneStatisticsReceiveMain {
dealLaneRealTimeData(statisticsStream, env); dealLaneRealTimeData(statisticsStream, env);
// 车道数据中不能处理转向,暂不处理 数据不匹配,统计数据没办法计算 // 车道数据中不能处理转向,暂不处理 数据不匹配,统计数据没办法计算
dealTurnRealTimeData(statisticsStream); dealTurnRealTimeData(statisticsStream, env);
dealDirRealTimeData(statisticsStream); dealDirRealTimeData(statisticsStream, env);
dealCrossRealTimeData(env, statisticsStream, spilloverStream); dealCrossRealTimeData(statisticsStream, spilloverStream, env);
env.execute("接收城市大脑统计数据发送到kafka准备入库"); env.execute("接收城市大脑统计数据发送到kafka准备入库");
} catch (Exception e) { } catch (Exception e) {
...@@ -77,54 +77,82 @@ public class LaneStatisticsReceiveMain { ...@@ -77,54 +77,82 @@ public class LaneStatisticsReceiveMain {
.build(); .build();
} }
private static void dealCrossRealTimeData(StreamExecutionEnvironment env, DataStream<String> staticticsStream, DataStream<String> spilloverStream) { private static void dealCrossRealTimeData(DataStream<String> staticticsStream, DataStream<String> spilloverStream, StreamExecutionEnvironment env) throws Exception {
SingleOutputStreamOperator<List<SpilloverEventVO>> spilloverVoStream = spilloverStream.map(new SpilloverEventTempReceiveMap()); try {
SingleOutputStreamOperator<List<SpilloverEventVO>> spilloverVoStream = spilloverStream.map(new SpilloverEventTempReceiveMap());
SingleOutputStreamOperator<String> laneRealtimeStream = staticticsStream
.connect(spilloverVoStream.broadcast())
.flatMap(new LaneStatisticsReceive2CrossDataCoFlatMap())
.name("接收城市大脑统计数据-JsonToObject");
// 同周期数据一致,发送Kafka,准备入库 SingleOutputStreamOperator<String> laneRealtimeStream = staticticsStream
KafkaSink<String> sink = getStringKafkaSink(Constant.REAL_TIME_CROSS_DATA_TOPIC); .connect(spilloverVoStream.broadcast())
laneRealtimeStream.sinkTo(sink).name("接收城市大脑统计数据转化路口实时数据发送至kafka").setParallelism(1); .flatMap(new LaneStatisticsReceive2CrossDataCoFlatMap())
} .name("接收城市大脑统计数据-JsonToObject");
private static void dealDirRealTimeData(DataStream<String> stream) { SingleOutputStreamOperator<String> crossStream = laneRealtimeStream
SingleOutputStreamOperator<String> laneRealtimeStream = stream .connect(env.addSource(new ForeignDeviceInfoSource()).broadcast())
.map(new LaneStatisticsReceive2DirDataMap()) .flatMap(new LaneStatisticsReceive2CrossDataCoFlatMapVersion2())
.name("接收城市大脑统计数据-JsonToObject"); .name("接收城市大脑统计数据计算各个方向数据-JsonToObject");
// 同周期数据一致,发送Kafka,准备入库 // 同周期数据一致,发送Kafka,准备入库
KafkaSink<String> sink = getStringKafkaSink(Constant.REAL_TIME_DIR_DATA_TOPIC); KafkaSink<String> sink = getStringKafkaSink(Constant.REAL_TIME_CROSS_DATA_TOPIC);
laneRealtimeStream.sinkTo(sink).name("接收城市大脑统计数据转化方向实时数据发送至kafka").setParallelism(1); crossStream.sinkTo(sink).name("接收城市大脑统计数据转化路口实时数据发送至kafka").setParallelism(1);
} catch (Exception e) {
log.error("接收城市大脑统计数据转化路口实时数据发送至kafka-异常: ", e);
throw new RuntimeException(e);
}
} }
private static void dealTurnRealTimeData(DataStream<String> stream) { private static void dealDirRealTimeData(DataStream<String> stream, StreamExecutionEnvironment env) throws Exception {
SingleOutputStreamOperator<String> laneRealtimeStream = stream try {
.map(new LaneStatisticsReceive2TurnDataMap()) SingleOutputStreamOperator<String> laneRealtimeStream = stream
.name("接收城市大脑统计数据-JsonToObject"); .connect(env.addSource(new ForeignDeviceInfoSource()).broadcast())
.flatMap(new LaneStatisticsReceive2DirDataMapVersion2())
.name("接收城市大脑统计数据-JsonToObject");
// 同周期数据一致,发送Kafka,准备入库
KafkaSink<String> sink = getStringKafkaSink(Constant.REAL_TIME_DIR_DATA_TOPIC);
laneRealtimeStream.sinkTo(sink).name("接收城市大脑统计数据转化方向实时数据发送至kafka").setParallelism(1);
} catch (Exception e) {
log.error("接收城市大脑统计数据转化方向实时数据发送至kafka-异常: ", e);
throw new RuntimeException(e);
}
}
// 同周期数据一致,发送Kafka,准备入库 private static void dealTurnRealTimeData(DataStream<String> stream, StreamExecutionEnvironment env) throws Exception {
KafkaSink<String> sink = getStringKafkaSink(Constant.REAL_TIME_TURN_DATA_TOPIC); try {
laneRealtimeStream.sinkTo(sink).name("接收城市大脑统计数据转化转向实时数据发送至kafka").setParallelism(1); SingleOutputStreamOperator<String> laneRealtimeStream = stream
.connect(env.addSource(new ForeignDeviceInfoSource()).broadcast())
.flatMap(new LaneStatisticsReceive2TurnDataMapVersion2())
.name("接收城市大脑统计数据-JsonToObject");
// 同周期数据一致,发送Kafka,准备入库
KafkaSink<String> sink = getStringKafkaSink(Constant.REAL_TIME_TURN_DATA_TOPIC);
laneRealtimeStream.sinkTo(sink).name("接收城市大脑统计数据转化转向实时数据发送至kafka").setParallelism(1);
} catch (Exception e) {
log.error("接收城市大脑统计数据转化转向实时数据发送至kafka-异常:", e);
throw new RuntimeException(e);
}
} }
private static void dealLaneRealTimeData(DataStream<String> stream, StreamExecutionEnvironment env) { private static void dealLaneRealTimeData(DataStream<String> stream, StreamExecutionEnvironment env) throws Exception {
// todo 添加溢出事件,将溢出状态写入,默认最小溢出指数 try {
SingleOutputStreamOperator<String> laneRealtimeStream = stream // todo 添加溢出事件,将溢出状态写入,默认最小溢出指数
.map(new LaneStatisticsReceive2LaneDataMap()) //SingleOutputStreamOperator<String> laneRealtimeStream = stream
.name("接收城市大脑统计数据-JsonToObject"); // .map(new LaneStatisticsReceive2LaneDataMap())
// todo 修改车道入库版本2 // .name("接收城市大脑统计数据-JsonToObject");
SingleOutputStreamOperator<String> name = stream.connect(env.addSource(new ForeignDeviceInfoSource()).broadcast()) // todo 修改车道入库版本2
.flatMap(new LaneStatisticsReceive2LaneDataMapVersion2()) SingleOutputStreamOperator<String> laneRealtimeStream = stream
.setParallelism(1) .connect(env.addSource(new ForeignDeviceInfoSource()).broadcast())
.name("车道数据处理路口车道编号"); .flatMap(new LaneStatisticsReceive2LaneDataMapVersion2())
.setParallelism(1)
// 同周期数据一致,发送Kafka,准备入库 .name("车道数据处理路口车道编号");
KafkaSink<String> sink = getStringKafkaSink(Constant.REAL_TIME_LANE_DATA_TOPIC);
laneRealtimeStream.sinkTo(sink).name("接收城市大脑统计数据转化车道实时数据发送至kafka").setParallelism(1); // 同周期数据一致,发送Kafka,准备入库
KafkaSink<String> sink = getStringKafkaSink(Constant.REAL_TIME_LANE_DATA_TOPIC);
laneRealtimeStream.sinkTo(sink).name("接收城市大脑统计数据转化车道实时数据发送至kafka").setParallelism(1);
} catch (Exception e) {
log.error("接收城市大脑统计数据转化车道实时数据发送至kafka-异常:", e);
throw new RuntimeException(e);
}
} }
private static KafkaSink<String> getStringKafkaSink(String realTimeTurnDataTopic) { private static KafkaSink<String> getStringKafkaSink(String realTimeTurnDataTopic) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment