Commit d4287251 authored by duanruiming's avatar duanruiming

Merge remote-tracking branch 'origin/master'

parents aa67a4c4 ef4813aa
...@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.StringDeserializer; ...@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
...@@ -103,6 +102,7 @@ public class MatchResultDataMain { ...@@ -103,6 +102,7 @@ public class MatchResultDataMain {
List<TargetInfoVO> targetInfoVOList = new ArrayList<>(); List<TargetInfoVO> targetInfoVOList = new ArrayList<>();
String orgCode = frameModel.getOrgCode(); String orgCode = frameModel.getOrgCode();
Long globalTimeStamp = frameModel.getGlobalTimeStamp();
for (CarTrackModel participant : participantList) { for (CarTrackModel participant : participantList) {
TargetInfoVO targetInfoVO = new TargetInfoVO(); TargetInfoVO targetInfoVO = new TargetInfoVO();
...@@ -123,7 +123,7 @@ public class MatchResultDataMain { ...@@ -123,7 +123,7 @@ public class MatchResultDataMain {
targetInfoVO.setLaneNo(Integer.parseInt(s)); targetInfoVO.setLaneNo(Integer.parseInt(s));
} }
targetInfoVO.setData_time(new Date().getTime()); targetInfoVO.setData_time(globalTimeStamp);
targetInfoVO.setDeviceNo(orgCode); targetInfoVO.setDeviceNo(orgCode);
targetInfoVO.setId(participant.getId()); targetInfoVO.setId(participant.getId());
targetInfoVO.setCarType(CarTypeConverter.convertWanjiToBrain(participant.getOriginalType())); targetInfoVO.setCarType(CarTypeConverter.convertWanjiToBrain(participant.getOriginalType()));
......
...@@ -12,14 +12,15 @@ import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDe ...@@ -12,14 +12,15 @@ import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDe
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
...@@ -63,27 +64,27 @@ public class TargetInfoReceiveMain { ...@@ -63,27 +64,27 @@ public class TargetInfoReceiveMain {
SingleOutputStreamOperator<TargetInfoReceiveModel> frameModelStream = stream SingleOutputStreamOperator<TargetInfoReceiveModel> frameModelStream = stream
.map(new TargetInfoReceiveMap()) .map(new TargetInfoReceiveMap())
.name("接收城市大脑轨迹数据-JsonToObject") .name("接收城市大脑轨迹数据-JsonToObject")
// .assignTimestampsAndWatermarks( .assignTimestampsAndWatermarks(
// WatermarkStrategy WatermarkStrategy
// .<TargetInfoReceiveModel>forBoundedOutOfOrderness(Duration.ofMillis(50)) .<TargetInfoReceiveModel>forBoundedOutOfOrderness(Duration.ofSeconds(6))
// .withTimestampAssigner((element, recordTimestamp) .withTimestampAssigner((element, recordTimestamp)
// -> element.getBody().get(0).getData_time()) -> element.getBody().get(0).getData_time())
// ) )
; ;
SingleOutputStreamOperator<E1FrameMiniVO> resultStream = frameModelStream SingleOutputStreamOperator<E1FrameMiniVO> resultStream = frameModelStream
// .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(100))) .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(100)))
.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(100))) // .windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
.apply(new AllWindowFunction<TargetInfoReceiveModel, E1FrameMiniVO, TimeWindow>() { .process(new ProcessAllWindowFunction<TargetInfoReceiveModel, E1FrameMiniVO, TimeWindow>() {
@Override @Override
public void apply(TimeWindow window, Iterable<TargetInfoReceiveModel> values, public void process(Context context, Iterable<TargetInfoReceiveModel> elements, Collector<E1FrameMiniVO> out) throws Exception {
Collector<E1FrameMiniVO> out) throws Exception {
E1FrameMiniVO e1FrameMiniVO = new E1FrameMiniVO(); E1FrameMiniVO e1FrameMiniVO = new E1FrameMiniVO();
TimeWindow window = context.window();
long windowEnd = window.getEnd(); long windowEnd = window.getEnd();
Date date = new Date(windowEnd); Date date = new Date(windowEnd);
e1FrameMiniVO.setTimeStamp(date); e1FrameMiniVO.setTimeStamp(date);
e1FrameMiniVO.setGlobalTimeStamp(windowEnd); e1FrameMiniVO.setGlobalTimeStamp(windowEnd);
List<E1FrameParticipantMiniVO> participantMiniVOList = buildE1FrameParticipant(values); List<E1FrameParticipantMiniVO> participantMiniVOList = buildE1FrameParticipant(elements);
e1FrameMiniVO.setE1FrameParticipant(participantMiniVOList); e1FrameMiniVO.setE1FrameParticipant(participantMiniVOList);
e1FrameMiniVO.setOrgCode(""); e1FrameMiniVO.setOrgCode("");
e1FrameMiniVO.setParticipantNum(participantMiniVOList.size()); e1FrameMiniVO.setParticipantNum(participantMiniVOList.size());
......
...@@ -758,33 +758,33 @@ public class CarTrajectoryIndexMain { ...@@ -758,33 +758,33 @@ public class CarTrajectoryIndexMain {
.name("将失衡事件写入mysql"); .name("将失衡事件写入mysql");
SingleOutputStreamOperator<LineCongestion> lineCongestion = splitCongestionStream // SingleOutputStreamOperator<LineCongestion> lineCongestion = splitCongestionStream
.keyBy(value -> "constantKey") // .keyBy(value -> "constantKey")
.process(new LineCongestionFunction()) // .process(new LineCongestionFunction())
.setParallelism(1) // .setParallelism(1)
.name("计算干线拥堵事件"); // .name("计算干线拥堵事件");
//
SingleOutputStreamOperator<LineCongestion> lineCongestionDuration = lineCongestion // SingleOutputStreamOperator<LineCongestion> lineCongestionDuration = lineCongestion
.keyBy(value -> "constantKey") // .keyBy(value -> "constantKey")
.process(new LineCongestionDuration()) // .process(new LineCongestionDuration())
.setParallelism(1) // .setParallelism(1)
.name("给干线拥堵事件添加开始时间和持续时长"); // .name("给干线拥堵事件添加开始时间和持续时长");
//
SingleOutputStreamOperator<String> lineCongestionResult = lineCongestionDuration // SingleOutputStreamOperator<String> lineCongestionResult = lineCongestionDuration
.flatMap(new FlatMapFunction<LineCongestion, String>() { // .flatMap(new FlatMapFunction<LineCongestion, String>() {
@Override // @Override
public void flatMap(LineCongestion result, Collector<String> collector) { // public void flatMap(LineCongestion result, Collector<String> collector) {
String value = JSONObject.toJSONString(result); // String value = JSONObject.toJSONString(result);
collector.collect(value); // collector.collect(value);
} // }
}) // })
.setParallelism(1).name("将干线拥堵事件转成json字符串"); // .setParallelism(1).name("将干线拥堵事件转成json字符串");
//
lineCongestionResult.addSink(new FlinkKafkaProducer<String>( // lineCongestionResult.addSink(new FlinkKafkaProducer<String>(
kafkaServerAddress, // kafkaServerAddress,
lineSinkTopic, // lineSinkTopic,
new SimpleStringSchema() // new SimpleStringSchema()
)).setParallelism(1).name("将将干线拥堵事件发送至kafka"); // )).setParallelism(1).name("将干线拥堵事件发送至kafka");
env.execute("计算拥堵-溢出-失衡-死锁-相位空放"); env.execute("计算拥堵-溢出-失衡-死锁-相位空放");
} catch (Exception e) { } catch (Exception e) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment