Commit a4e338c1 authored by zhoushiguang's avatar zhoushiguang

分类统计

parent 9f1caa5d
......@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.sink.JsonStringFileSink;
import com.wanji.indicators.task.area.service.func.LaneCarProcessWindow;
import com.wanji.indicators.task.area.service.key.ExactCategoryKeySelector;
import com.wanji.indicators.task.area.service.model.LaneCarCountAnalysisOutputModel;
import com.wanji.indicators.util.PropertiesHelper;
import lombok.extern.slf4j.Slf4j;
......@@ -75,7 +76,7 @@ public class LaneNonMotorAnalysisMainNew {
filterStream.assignTimestampsAndWatermarks(
WatermarkStrategy.<CarTrackModel>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp()))
.keyBy(o -> (o.getRoadnet().getLaneId()+"_"+o.getCategory()+"_"+o.getOriginalType()))
.keyBy(new ExactCategoryKeySelector())
.window(TumblingEventTimeWindows.of(Time.minutes(this.windowTime)))
.process(new LaneCarProcessWindow())
.name("车道非机动车数量计算-按车道分组统计");
......
......@@ -8,6 +8,7 @@ import com.wanji.indicators.util.ArithOfBigDecmial;
import com.wanji.indicators.util.CommonUtil;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
......@@ -25,7 +26,7 @@ import java.util.stream.StreamSupport;
* @date 2023/4/6
* @description
*/
public class LaneCarProcessWindow extends ProcessWindowFunction<CarTrackModel, LaneCarCountAnalysisOutputModel, String, TimeWindow> {
public class LaneCarProcessWindow extends ProcessWindowFunction<CarTrackModel, LaneCarCountAnalysisOutputModel, Tuple5<String, String, String,Integer,Integer>, TimeWindow> {
private static final Logger log = LoggerFactory.getLogger(LaneCarProcessWindow.class);
......@@ -37,10 +38,9 @@ public class LaneCarProcessWindow extends ProcessWindowFunction<CarTrackModel, L
}
@Override
public void process(String key, Context context, Iterable<CarTrackModel> elements, Collector<LaneCarCountAnalysisOutputModel> out) throws Exception {
public void process(Tuple5<String, String, String,Integer,Integer> key, Context context, Iterable<CarTrackModel> elements, Collector<LaneCarCountAnalysisOutputModel> out) throws Exception {
long windowStartTs = context.window().getStart();
long windowEndTs = context.window().getEnd();
String[] sps = key.split("_");
List<CarTrackModel> list = StreamSupport.stream(elements.spliterator(), false).collect(Collectors.toList());
//窗口期内车辆数量
......@@ -48,11 +48,11 @@ public class LaneCarProcessWindow extends ProcessWindowFunction<CarTrackModel, L
if (count > 0) {
LaneCarCountAnalysisOutputModel outModel = new LaneCarCountAnalysisOutputModel();
outModel.setCrossId(list.get(0).getRoadnet().getCrossId());
outModel.setRid(sps[0]);
outModel.setLaneId(list.get(0).getRoadnet().getLaneId());
outModel.setCategory(Integer.parseInt(sps[1]));
outModel.setOriginalType(Integer.parseInt(sps[2]));
outModel.setCrossId(key.f0);
outModel.setRid(key.f1);
outModel.setLaneId(key.f2);
outModel.setCategory(key.f3);
outModel.setOriginalType(key.f4);
outModel.setNumber(count);
outModel.setTimestamp(context.window().maxTimestamp());
outModel.setStartTimestamp(windowStartTs);
......
package com.wanji.indicators.task.area.service.key;
import com.wanji.indicators.model.CarTrackModel;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
public class ExactCategoryKeySelector implements KeySelector<CarTrackModel, Tuple5<String, String, String,Integer,Integer>> {
@Override
public Tuple5<String, String, String,Integer,Integer> getKey(CarTrackModel carTrackModel) throws Exception {
return new Tuple5<>(carTrackModel.getRoadnet().getCrossId(),carTrackModel.getRoadnet().getRid(),carTrackModel.getRoadnet().getLaneId(),carTrackModel.getCategory(),carTrackModel.getOriginalType());
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment