Commit 658cc7cd authored by zhoushiguang's avatar zhoushiguang

filter change

parent dc458b65
......@@ -11,6 +11,7 @@ import com.wanji.indicators.task.track.service.func.CarTrackFlatMap;
import com.wanji.indicators.task.track.service.func.FrameFlatMap;
import com.wanji.indicators.task.trafficstate.LaneTrafficStateMain;
import com.wanji.indicators.task.trafficstate.RidTrafficStateMain;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
......@@ -25,7 +26,9 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import java.util.Properties;
......@@ -63,6 +66,7 @@ public class TrackIndicatorMain {
DateTime currentDateTime = new DateTime();
DateTime dateTime = currentDateTime.withMillisOfDay(0).minusHours(-4);
long startTimeStamp = DateUtil.StringToMillis("2023-10-23 20:00:00");
KafkaSource<String> source = KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties())
......@@ -79,13 +83,11 @@ public class TrackIndicatorMain {
4.从上次提交的位置消费:OffsetsInitializer.committedOffsets()
5.新的组,从来没有提交过,再指定一个消费方式:OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
*/
// .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setStartingOffsets(OffsetsInitializer.latest())
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
// .setStartingOffsets(OffsetsInitializer.latest())
// 从大于等于此时间戳开始的偏移量开始
//.setStartingOffsets(OffsetsInitializer.timestamp(dateTime.getMillis()))
// .setStartingOffsets(OffsetsInitializer.timestamp(startTimeStamp))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
DataStream<String> stream = env
......@@ -103,22 +105,13 @@ public class TrackIndicatorMain {
.setParallelism(1)
.name("轨迹帧数据解析-ToCarTrackModel");
SingleOutputStreamOperator<CarTrackModel> filterStream =
carTrackModelStream.
filter(new FilterFunction<CarTrackModel>() {
@Override
public boolean filter(CarTrackModel value) throws Exception {
return Objects.nonNull(value.getPicLicense()) && !Objects.equals("null",value.getPicLicense());
}
})
.setParallelism(1)
.name("轨迹分析-筛选带车牌的车辆");
try {
//批量轨迹封装
TrackStoreMainNew.init(env, indexName).run(carTrackModelStream);
//车辆行驶路径分析
CarRoutePathMainNew.init(env, indexName1).run(filterStream);
CarRoutePathMainNew.init(env, indexName1).run(carTrackModelStream);
//区域指数相关
AreaIndexAnalysisMainNew.init(env, "indexName",false).run(carTrackModelStream);
//行人非机动车数量对机动车速度影响
......
......@@ -9,6 +9,7 @@ import com.wanji.indicators.task.track.service.sink.RoutePathElasticSearchSinkFu
import com.wanji.indicators.util.ElasticSearchSinkUtil;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.http.HttpHost;
......@@ -17,6 +18,7 @@ import java.io.Serializable;
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
public class CarRoutePathMainNew {
......@@ -43,6 +45,20 @@ public class CarRoutePathMainNew {
return new CarRoutePathMainNew(env, indexName);
}
//本地调试设为true输出结果保存到文件
private boolean isDebug = false;
private CarRoutePathMainNew(StreamExecutionEnvironment env, String path, boolean isDebug) {
this.env = env;
this.instance = PropertiesHelper.getInstance();
this.p = instance.getProperties();
this.path = path;
this.isDebug = isDebug;
}
public static CarRoutePathMainNew init(StreamExecutionEnvironment env, String path, boolean isDebug) {
return new CarRoutePathMainNew(env, path,isDebug);
}
public void run(SingleOutputStreamOperator<CarTrackModel> carTrackModelStream) {
......@@ -51,8 +67,20 @@ public class CarRoutePathMainNew {
//从配置文件中读取 es 的地址
try {
SingleOutputStreamOperator<CarTrackModel> filterStream =
carTrackModelStream.
filter(new FilterFunction<CarTrackModel>() {
@Override
public boolean filter(CarTrackModel value) throws Exception {
return Objects.nonNull(value.getPicLicense()) && !Objects.equals("null",value.getPicLicense());
}
})
.setParallelism(1)
.name("轨迹分析-筛选带车牌的车辆");
//给每个车辆轨迹点进行路径规划分段,连续两点超过半小时的重新生成一条新路线
SingleOutputStreamOperator<RoutePathCarTrackModel> splitLineStream = carTrackModelStream
SingleOutputStreamOperator<RoutePathCarTrackModel> splitLineStream = filterStream
.assignTimestampsAndWatermarks(
WatermarkStrategy.<CarTrackModel>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp()))
......
......@@ -2,13 +2,17 @@ package com.wanji.indicators.task.track.service;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.SingleCarTrackListModel;
import com.wanji.indicators.task.area.service.AreaIndexAnalysisMainNew;
import com.wanji.indicators.task.track.service.func.ProcessCarTrackByKey;
import com.wanji.indicators.task.track.service.key.KeySelectorGlobalIdPlateNo;
import com.wanji.indicators.task.track.service.sink.BatchTrackElasticSearchSinkFunction;
import com.wanji.indicators.task.track.service.sink.PartitionBatchTrackElasticSearchSinkFunction;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.ElasticSearchSinkUtil;
import com.wanji.indicators.util.PropertiesHelper;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
......@@ -19,8 +23,10 @@ import java.io.Serializable;
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
@Slf4j
public class TrackStoreMainNew {
private final StreamExecutionEnvironment env;
......@@ -50,11 +56,42 @@ public class TrackStoreMainNew {
return new TrackStoreMainNew(env,paramInfo);
}
//本地调试设为true输出结果保存到文件
private boolean isDebug = false;
private TrackStoreMainNew(StreamExecutionEnvironment env, String path, boolean isDebug) {
this.env = env;
this.instance = PropertiesHelper.getInstance();
this.p = instance.getProperties();
this.path = path;
this.isDebug = isDebug;
}
public static TrackStoreMainNew init(StreamExecutionEnvironment env, String path, boolean isDebug) {
return new TrackStoreMainNew(env, path,isDebug);
}
public void run(SingleOutputStreamOperator<CarTrackModel> carTrackModelStream) {
//从配置文件中读取 es 的地址
try {
// SingleOutputStreamOperator<CarTrackModel> filterStream =
// carTrackModelStream.
// filter(new FilterFunction<CarTrackModel>() {
// @Override
// public boolean filter(CarTrackModel value) throws Exception {
// String date = "2023-10-19 23:59:59";
// long endTs = DateUtil.StringToMillis(date);
// long nowts = value.getGlobalTimeStamp();
// if(nowts<endTs){
// log.info("");
// }
// return nowts<endTs;
// }
// })
// .setParallelism(1)
// .name("轨迹分析-筛选带车牌的车辆");
//根据车辆上报时间每5分钟一个窗口累积
SingleOutputStreamOperator<SingleCarTrackListModel> groupByPlateStream =
carTrackModelStream.assignTimestampsAndWatermarks(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment