Commit 04a97b9a authored by zhoushiguang's avatar zhoushiguang

groupid param config

parent 5d7cdb3a
...@@ -64,6 +64,11 @@ public class TrackIndicatorMain { ...@@ -64,6 +64,11 @@ public class TrackIndicatorMain {
String topic = properties.getProperty("consumer.topic"); String topic = properties.getProperty("consumer.topic");
String groupId = properties.getProperty("consumer.group.id");
if (StringUtils.isEmpty(groupId)) {
groupId = GROUP_ID + "_run_route_new";
}
//根据指定时间消费 //根据指定时间消费
long startTimeStampOffset = 0; long startTimeStampOffset = 0;
long endTimeStampOffset = 0; long endTimeStampOffset = 0;
...@@ -105,7 +110,7 @@ public class TrackIndicatorMain { ...@@ -105,7 +110,7 @@ public class TrackIndicatorMain {
.setProperty("commit.offsets.on.checkpoint", "true") .setProperty("commit.offsets.on.checkpoint", "true")
.setBootstrapServers(properties.getProperty("bootstrap.servers")) .setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setTopics(topic) .setTopics(topic)
.setGroupId(GROUP_ID + "_run_route") .setGroupId(groupId)
// 从大于等于此时间戳开始的偏移量开始 // 从大于等于此时间戳开始的偏移量开始
.setStartingOffsets(OffsetsInitializer.timestamp(startTimeStampOffset)) .setStartingOffsets(OffsetsInitializer.timestamp(startTimeStampOffset))
.setBounded(OffsetsInitializer.timestamp(endTimeStampOffset)) .setBounded(OffsetsInitializer.timestamp(endTimeStampOffset))
...@@ -121,7 +126,7 @@ public class TrackIndicatorMain { ...@@ -121,7 +126,7 @@ public class TrackIndicatorMain {
// .setProperty("commit.offsets.on.checkpoint", "true") // .setProperty("commit.offsets.on.checkpoint", "true")
.setBootstrapServers(properties.getProperty("bootstrap.servers")) .setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setTopics(topic) .setTopics(topic)
.setGroupId(GROUP_ID + "_run_route_new") .setGroupId(groupId)
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)) .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build(); .build();
......
...@@ -29,7 +29,7 @@ public class SplitRoutePathFunction extends KeyedProcessFunction<Tuple3<String, ...@@ -29,7 +29,7 @@ public class SplitRoutePathFunction extends KeyedProcessFunction<Tuple3<String,
//连续点间隔超时时间,单位分钟 //连续点间隔超时时间,单位分钟
private Integer timeOut=10; private Integer timeOut=5;
@Override @Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
PropertiesHelper instance = PropertiesHelper.getInstance(); PropertiesHelper instance = PropertiesHelper.getInstance();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment