Commit 6b2fdf0c authored by zhoushiguang's avatar zhoushiguang

行人指标

parent 57b86cc7
......@@ -139,8 +139,8 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C
}
if (isCanWalk) {
log.info("行人绿灯长:rid={},行人绿灯剩余时长={}s,行人绿灯总时长={}s,时间:{},当前灯态:{},上次灯态:{},方向:{}",
rid, greenRemainTimeState.get(rid), greenTotalTimeState.get(rid), DateUtil.toDateTime(globalTimeStamp, "yyyy-MM-dd HH:mm:ss.SSS"),nowLightColor,agoLightColor,lightDir);
// log.info("行人绿灯长:rid={},行人绿灯剩余时长={}s,行人绿灯总时长={}s,时间:{},当前灯态:{},上次灯态:{},方向:{}",
// rid, greenRemainTimeState.get(rid), greenTotalTimeState.get(rid), DateUtil.toDateTime(globalTimeStamp, "yyyy-MM-dd HH:mm:ss.SSS"),nowLightColor,agoLightColor,lightDir);
if (runList != null && !runList.isEmpty()) {
//按rid分组
......
......@@ -48,20 +48,20 @@ public class PedestrianMain {
String topic = properties.getProperty("consumer.topic");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.enableCheckpointing(10 * 60 * 1000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(10);
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(properties.getProperty("check.point.uri")));
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
env.setParallelism(1);
// env.enableCheckpointing(10 * 60 * 1000);
// env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
// env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
// env.getCheckpointConfig().setMaxConcurrentCheckpoints(10);
// env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(properties.getProperty("check.point.uri")));
// env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
//============================================================================================//
KafkaSource<String> source = KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties())
.setProperty("auto.offset.commit", "true")
.setProperty("auto.commit.interval.ms", "1000")
.setProperty("commit.offsets.on.checkpoint", "true")
.setProperty("commit.offsets.on.checkpoint", "false")
.setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setTopics(topic)
.setGroupId(GROUP_ID + UUID.randomUUID().toString())
......@@ -75,7 +75,7 @@ public class PedestrianMain {
.setProperties(instance.getConsumerProperties())
.setProperty("auto.offset.commit", "true")
.setProperty("auto.commit.interval.ms", "1000")
.setProperty("commit.offsets.on.checkpoint", "true")
.setProperty("commit.offsets.on.checkpoint", "false")
.setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setTopics("cross_lights_status")
.setGroupId(GROUP_ID + UUID.randomUUID().toString())
......
......@@ -510,10 +510,10 @@ public class GeomsConvertUtil {
public static boolean withinGeo(String poing,String geometry) {
try {
WKTReader reader = new WKTReader(geometryFactory);
Point point = (Point) reader.read(poing);
Polygon polygon = (Polygon) reader.read(geometry);
Point point = createPoint(poing);
Geometry polygon = genGeometry(geometry);
return point.within(polygon);
}catch (ParseException e){
}catch (Exception e){
e.printStackTrace();
}
return false;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment