Commit 02a1ac3b authored by hanbing

Main-control migration to Flink: receive E1 data

parent 08bc3541
@@ -423,6 +423,13 @@
        </exclusions>
    </dependency>
    <!-- SparkJava Dependency -->
    <dependency>
        <groupId>com.sparkjava</groupId>
        <artifactId>spark-core</artifactId>
        <version>2.9.3</version>
    </dependency>
</dependencies>
package com.wanji.indicators.task.maincontrol;

import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

import static spark.Spark.port;
import static spark.Spark.post;

public class MainControlMain {

    public static final Properties properties = PropertiesHelper.getInstance().getProperties();
    public static final String originE1Topic;
    public static final String kafkaServerAddress;
    private static final PropertiesHelper instance = PropertiesHelper.getInstance();

    static {
        originE1Topic = properties.getProperty("origin.e1.topic");
        kafkaServerAddress = properties.getProperty("bootstrap.servers");
    }

    public static void main(String[] args) throws Exception {
        // Start the embedded HTTP server that receives E1 frames.
        startHttpServer();

        // Start the Flink job.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Consume the E1 topic configured in the properties file.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setProperties(instance.getConsumerProperties())
                .setProperty("auto.offset.commit", "true")
                .setProperty("auto.commit.interval.ms", "1000")
                .setProperty("commit.offsets.on.checkpoint", "true")
                .setBootstrapServers(kafkaServerAddress)
                .setTopics(originE1Topic)
                .setGroupId(properties.getProperty("consumer.group.id") + "-origin-e1")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
                "origin-e1-topic");
        stream.print();

        env.execute("Main Control Job");
    }

    private static void startHttpServer() {
        port(19355);

        // Kafka producer configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerAddress);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Forward each posted E1 frame to the origin E1 Kafka topic.
        post("/submitE1Frame", (request, response) -> {
            String body = request.body();
            producer.send(new ProducerRecord<>(originE1Topic, body));
            response.status(200);
            return "Main control received E1 data";
        });
    }
}
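For a quick end-to-end check of this change, a frame can be POSTed to the new endpoint and should then appear on the Flink job's stdout via stream.print(). A minimal sketch using java.net.http.HttpClient (Java 11+); the localhost host and the JSON payload are assumptions, while the port and path come from MainControlMain above:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubmitE1FrameExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical E1 payload; the real frame format is defined elsewhere in the project.
        String e1Frame = "{\"frameId\":\"demo\"}";

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                // Port 19355 and path /submitE1Frame come from MainControlMain; localhost is an assumption.
                .uri(URI.create("http://localhost:19355/submitE1Frame"))
                .POST(HttpRequest.BodyPublishers.ofString(e1Frame))
                .build();

        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // Expect HTTP 200 and the acknowledgement string returned by the handler.
        System.out.println(response.statusCode() + " " + response.body());
    }
}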
@@ -175,9 +175,9 @@ device.cross=13NI00B5RM0:13NI00B5RM0,13NGH0B5RC0:13NGH0B5RC0,13NF80B5QN0:13NF80B
# City Brain laneId to Wanji laneId mapping
brain.wanji.lane.id=test:test
jdbc.driver=com.mysql.cj.jdbc.Driver
jdbc.url=jdbc:mysql://localhost:3306/beihang?useUnicode=true&characterEncoding=utf-8
jdbc.username=root
jdbc.password=123456
origin.e1.topic=origin-e1
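For reference, MainControlMain reads three keys from this properties file; a minimal sketch of the relevant block (the broker address and group id values here are placeholders, not the project's real settings):

bootstrap.servers=localhost:9092
consumer.group.id=main-control
origin.e1.topic=origin-e1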