Commit 2046c4a3 authored by zhoushiguang's avatar zhoushiguang

car route path optimize

parent 4b8b40c3
......@@ -93,7 +93,7 @@ public class CarRoutePathMainNew {
// ElasticSearchSinkUtil.addSink(esAddresses, 2, splitLineStream, elasticsearchSinkFunction);
PartitionRoutePathElasticSearchSinkFunction elasticsearchSinkFunction1 = new PartitionRoutePathElasticSearchSinkFunction(indexName);
ElasticSearchSinkUtil.addSink(esAddresses, 2, splitLineStream, elasticsearchSinkFunction1);
ElasticSearchSinkUtil.addSink("Sink Route Path",esAddresses, 5, splitLineStream, elasticsearchSinkFunction1);
} catch (MalformedURLException e) {
e.printStackTrace();
......
......@@ -112,7 +112,7 @@ public class TrackStoreMainNew {
PartitionBatchTrackElasticSearchSinkFunction elasticsearchSinkFunction1 = new PartitionBatchTrackElasticSearchSinkFunction(indexName);
ElasticSearchSinkUtil.addSink(esAddresses, 2, groupByPlateStream, elasticsearchSinkFunction1);
ElasticSearchSinkUtil.addSink("Sink Track Point",esAddresses, 5, groupByPlateStream, elasticsearchSinkFunction1);
} catch (MalformedURLException e) {
......
......@@ -34,7 +34,7 @@ public class SplitRoutePathFunction extends KeyedProcessFunction<Tuple3<String,
public void open(Configuration parameters) throws Exception {
PropertiesHelper instance = PropertiesHelper.getInstance();
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(12))//状态过期时间
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))//状态过期时间
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.cleanupFullSnapshot()
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)//不返回过期值
......@@ -148,6 +148,26 @@ public class SplitRoutePathFunction extends KeyedProcessFunction<Tuple3<String,
int interval = timeOut * 60 * 1000;
long fireTime = context.timestamp() - context.timestamp() % 60000 + interval;
context.timerService().registerEventTimeTimer(fireTime);
/**************************过期key-start**********************************************/
Set<Tuple3<String, Integer, String>> expireKeys = new HashSet<>();
Iterator<Map.Entry<Tuple3<String, Integer, String>, RoutePathCarTrackModel>> iterator = mapStateOfLineSplit.iterator();
while (iterator.hasNext()) {
Map.Entry<Tuple3<String, Integer, String>, RoutePathCarTrackModel> entry = iterator.next();
RoutePathCarTrackModel model = entry.getValue();
long timestamp = model.getStartGlobalTimeStamp();
if (System.currentTimeMillis() - timestamp > timeOut * 1000) {
//缓存超过10分钟
expireKeys.add(entry.getKey());
}
}
if (!expireKeys.isEmpty()){
log.info("Car-Route 过期key:{}",expireKeys);
for (Tuple3<String, Integer, String> key : expireKeys) {
mapStateOfLineSplit.remove(key);
lineNoState.remove(key);
}
}
/**************************过期key-end**********************************************/
}
@Override
......@@ -185,6 +205,7 @@ public class SplitRoutePathFunction extends KeyedProcessFunction<Tuple3<String,
}
for (Tuple3<String, Integer, String> key : expireKeys) {
mapStateOfLineSplit.remove(key);
lineNoState.remove(key);
}
ctx.timerService().deleteProcessingTimeTimer(timestamp);
}
......
......@@ -97,7 +97,9 @@ public class ElasticSearchSinkUtil {
index.mapping(mappingBuilder);
}
//设置嵌套窗口
index.settings(Settings.builder().put("index.max_inner_result_window",2000000000));
index.settings(Settings.builder()
.put("index.max_inner_result_window",2000000000)
);
ElasticSearchSinkUtil.client.indices().create(index, RequestOptions.DEFAULT);
} catch (IOException e) {
......@@ -154,7 +156,7 @@ public class ElasticSearchSinkUtil {
* @param func
* @param <T>
*/
public static <T> void addSink(List<HttpHost> hosts, int parallelism,
public static <T> void addSink(String sinkName,List<HttpHost> hosts, int parallelism,
SingleOutputStreamOperator<T> data, ElasticsearchSinkFunction<T> func) {
ElasticsearchSink.Builder<T> esBuilder = new ElasticsearchSink.Builder<>(hosts, func);
......@@ -172,7 +174,7 @@ public class ElasticSearchSinkUtil {
// 设置失败处理
esBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
data.addSink(esBuilder.build()).setParallelism(parallelism).name("BatchTrack Sink To ES");
data.addSink(esBuilder.build()).setParallelism(parallelism).name(sinkName);
}
public static RestClientFactory restClientFactory = new RestClientFactory() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment