Commit d2bb982a authored by duanruiming's avatar duanruiming

[update] 交通大脑-统计数据优化车道编号

parent 76aa4b5f
package com.wanji.indicators.source;
import com.wanji.indicators.entity.ForeignDevicePO;
import com.wanji.indicators.mapper.ForeignDeviceMapper;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author duanruiming
* @date 2024/07/17 11:22
*/
@Service
public class ForeignDeviceInfoSource extends RichParallelSourceFunction<Map<String, List<ForeignDevicePO>>> {
private static final Logger log = LoggerFactory.getLogger(ForeignDeviceInfoSource.class);
private volatile boolean running = true;
Properties properties = null;
private ForeignDeviceMapper foreignDeviceMapper;
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ApplicationContext beanConf = new ClassPathXmlApplicationContext("spring-container.xml");
foreignDeviceMapper = beanConf.getBean(ForeignDeviceMapper.class);
PropertiesHelper instance = PropertiesHelper.getInstance();
properties = instance.getProperties();
}
@Override
public void run(SourceContext<Map<String, List<ForeignDevicePO>>> sourceContext) throws Exception {
try {
while (running) {
List<ForeignDevicePO> foreignDevicePOS = foreignDeviceMapper.selectAllDevices();
if (!CollectionUtils.isEmpty(foreignDevicePOS)) {
Map<String, List<ForeignDevicePO>> map = foreignDevicePOS.stream().collect(Collectors.groupingBy(ForeignDevicePO::getCrossId));
sourceContext.collect(map);
}
TimeUnit.MINUTES.sleep(1);
}
} catch (Exception e) {
log.error("查询外部设备表信息异常: " + e.getMessage(), e);
}
}
@Override
public void cancel() {
running = false;
}
}
package com.wanji.indicators.task.citybrainreceive.lanestatistics;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.entity.ForeignDevicePO;
import com.wanji.indicators.task.citybrain.vo.LaneStatisticsDataVO;
import com.wanji.indicators.task.citybrain.vo.LaneStatisticsDetail;
import com.wanji.indicators.task.citybrainreceive.CommonReceiveModel;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.common.utils.tool.StringUtils;
import net.wanji.databus.po.CrossLaneDataRealTimePO;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.*;
/**
* @author duanruiming
* @date 2024/07/17 13:51
*/
@Service
@Slf4j
public class LaneStatisticsReceive2LaneDataMapVersion2 implements CoFlatMapFunction<String, Map<String, List<ForeignDevicePO>>, String> {
private static final Map<String, List<ForeignDevicePO>> foreignDeviceMap = new HashMap<>();
@Override
public void flatMap1(String value, Collector<String> out) throws Exception {
try {
ObjectMapper mapper = JacksonUtils.getInstance();
CommonReceiveModel model = mapper.readValue(value, CommonReceiveModel.class);
List<CrossLaneDataRealTimePO> results = new ArrayList<>();
String headers = model.getHeaders();
String body = model.getBody();
if (StringUtils.isNotBlank(headers) && (headers.contains(Constant.WAN_JI_NAME) || StringUtils.isBlank(body))) {
return;
}
List<LaneStatisticsDataVO> laneStatisticsDataVOS = mapper.readValue(body, new TypeReference<List<LaneStatisticsDataVO>>() {});
if (!CollectionUtils.isEmpty(laneStatisticsDataVOS)) {
for (LaneStatisticsDataVO inVo : laneStatisticsDataVOS) {
String crossId = inVo.getDeviceNo();
String startTime = inVo.getData_time();
Date dateTime = new Date(Long.parseLong(startTime));
String dir = inVo.getSectionNo();
List<LaneStatisticsDetail> dataList = inVo.getData_list();
if (!CollectionUtils.isEmpty(dataList)) {
for (LaneStatisticsDetail detail : dataList) {
String laneNo = detail.getLaneNo();
CrossLaneDataRealTimePO po = new CrossLaneDataRealTimePO();
po.setId(String.join(Constant.MIDDLE_LINE, crossId, dir, laneNo));
po.setCrossId(crossId);
po.setStartTime(dateTime);
String wArrivalFlow = detail.getWArrivalFlow();
wArrivalFlow = StringUtils.isBlank(wArrivalFlow) ? "0" : wArrivalFlow;
po.setFlow((int) Double.parseDouble(wArrivalFlow));
po.setSpeed(Double.parseDouble(detail.getSpeedAvg()));
po.setInSpeed(Double.parseDouble(detail.getSpeedAvg()));
po.setOutSpeed(Double.parseDouble(detail.getSpeedAvg()));
po.setQueueLength(Double.parseDouble(detail.getByQueueLen()));
po.setStopTimes(Double.parseDouble(detail.getByStoppingTimes()));
String wDelay = detail.getWDelay();
wDelay = StringUtils.isNotBlank(wDelay) ? wDelay : "0";
po.setDelayTime((int) Double.parseDouble(wDelay));
po.setCapacity((int) Double.parseDouble(wArrivalFlow));
po.setVehheadDist(Double.parseDouble(detail.getGap()));
po.setVehheadTime(Double.parseDouble(detail.getHeadway()));
po.setQuality(0.6);
po.setNoStopRate(0.0);
po.setOneStopRate(0.0);
po.setTwoStopRate(0.0);
po.setThreeStopRate(0.0);
po.setSturation(0.7);
po.setBatchTime(Integer.parseInt(startTime.substring(0, 10)));
po.setEffusionRate(0.0);
po.setGreenLightEfficiency(0.0);
results.add(po);
}
}
}
}
Map<String, List<CrossLaneDataRealTimePO>> map = new HashMap<>();
map.put(Constant.REAL_TIME_LANE_DATA_KEY, results);
out.collect(mapper.writeValueAsString(map));
} catch (Exception e) {
log.error("LaneStatisticsReceive2LaneDataMap ", e);
throw new RuntimeException(e);
}
}
@Override
public void flatMap2(Map<String, List<ForeignDevicePO>> value, Collector<String> out) throws Exception {
foreignDeviceMap.putAll(value);
}
}
package com.wanji.indicators.task.citybrainreceive.lanestatistics;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.source.ForeignDeviceInfoSource;
import com.wanji.indicators.task.citybrain.vo.SpilloverEventVO;
import com.wanji.indicators.task.citybrainreceive.event.spillover.SpilloverEventTempReceiveMap;
import com.wanji.indicators.util.PropertiesHelper;
......@@ -46,7 +47,7 @@ public class LaneStatisticsReceiveMain {
DataStream<String> spilloverStream = env.fromSource(spilloverSource, WatermarkStrategy.noWatermarks(),
"city-brain-spillover-receive-data-source");
dealLaneRealTimeData(statisticsStream);
dealLaneRealTimeData(statisticsStream, env);
// 车道数据中不能处理转向,暂不处理
//dealTurnRealTimeData(statisticsStream);
dealDirRealTimeData(statisticsStream);
......@@ -110,11 +111,16 @@ public class LaneStatisticsReceiveMain {
laneRealtimeStream.sinkTo(sink).name("接收城市大脑统计数据转化转向实时数据发送至kafka").setParallelism(1);
}
private static void dealLaneRealTimeData(DataStream<String> stream) {
private static void dealLaneRealTimeData(DataStream<String> stream, StreamExecutionEnvironment env) {
// todo 添加溢出事件,将溢出状态写入,默认最小溢出指数
SingleOutputStreamOperator<String> laneRealtimeStream = stream
.map(new LaneStatisticsReceive2LaneDataMap())
.name("接收城市大脑统计数据-JsonToObject");
// todo 修改车道入库版本2
SingleOutputStreamOperator<String> name = stream.connect(env.addSource(new ForeignDeviceInfoSource()).broadcast())
.flatMap(new LaneStatisticsReceive2LaneDataMapVersion2())
.setParallelism(1)
.name("车道数据处理路口车道编号");
// 同周期数据一致,发送Kafka,准备入库
KafkaSink<String> sink = getStringKafkaSink(Constant.REAL_TIME_LANE_DATA_TOPIC);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment