Commit 9c2c4a55 authored by zhoushiguang's avatar zhoushiguang

rocksdb

parent 4e2c8d78
...@@ -23,6 +23,8 @@ import org.apache.flink.api.java.utils.ParameterTool; ...@@ -23,6 +23,8 @@ import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
...@@ -57,14 +59,21 @@ public class CrossTrafficEventMain { ...@@ -57,14 +59,21 @@ public class CrossTrafficEventMain {
env.enableCheckpointing(2 * 60 * 1000); env.enableCheckpointing(2 * 60 * 1000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3); env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000); env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(10);
// EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
// env.setStateBackend(embeddedRocksDBStateBackend);
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(properties.getProperty("check.point.uri"))); env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(properties.getProperty("check.point.uri")));
//env.setStateBackend(new FsStateBackend(properties.getProperty("check.point.uri")));
env.setStateBackend(new FsStateBackend(properties.getProperty("check.point.uri")));
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION); env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 60 * 1000); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 60 * 1000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
DateTime currentDateTime = new DateTime(); DateTime currentDateTime = new DateTime();
DateTime dateTime = currentDateTime.withMillisOfDay(0).minusHours(-4); DateTime dateTime = currentDateTime.withMillisOfDay(0).minusHours(-4);
long startTimeStamp = DateUtil.StringToMillis("2023-10-23 20:00:00"); long startTimeStamp = DateUtil.StringToMillis("2023-10-23 20:00:00");
...@@ -74,7 +83,7 @@ public class CrossTrafficEventMain { ...@@ -74,7 +83,7 @@ public class CrossTrafficEventMain {
.setProperty("enable.offset.commit", "true") .setProperty("enable.offset.commit", "true")
.setProperty("auto.commit.interval.ms", "1000") .setProperty("auto.commit.interval.ms", "1000")
.setProperty("auto.offset.reset", "latest") .setProperty("auto.offset.reset", "latest")
// .setProperty("commit.offsets.on.checkpoint", "true") .setProperty("commit.offsets.on.checkpoint", "true")
.setBootstrapServers(properties.getProperty("bootstrap.servers")) .setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setTopics(topic) .setTopics(topic)
.setGroupId(GROUP_ID+"-event_track") .setGroupId(GROUP_ID+"-event_track")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment