Commit ee22abb0 authored by duanruiming's avatar duanruiming

[update] 优化实时数据处理

parent 5af7d9e6
...@@ -41,10 +41,23 @@ public class ConsumerHandler implements KafkaListenerErrorHandler { ...@@ -41,10 +41,23 @@ public class ConsumerHandler implements KafkaListenerErrorHandler {
return realTimeCarInfoMap.get(topic); return realTimeCarInfoMap.get(topic);
} }
@KafkaListener(topics = {"JN040001LanePeriodicData"}) @KafkaListener(topics = {"cross_real_time_data"})
public void receiveLanePeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) { public void receiveCrossRealTimeData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) {
log.info("收到kafka消息,消息主题:" + record.topic()); realTimeCarInfoMap.put(record.topic(), String.valueOf(record.value()));
log.info("收到kafka消息,消息内容:" + record.value()); acknowledgment.acknowledge();
}
@KafkaListener(topics = {"cross_real_time_dir_data"})
public void receiveCrossRealTimeDirData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) {
realTimeCarInfoMap.put(record.topic(), String.valueOf(record.value()));
acknowledgment.acknowledge();
}
@KafkaListener(topics = {"cross_real_time_turn_data"})
public void receiveCrossRealTimeTurnData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) {
realTimeCarInfoMap.put(record.topic(), String.valueOf(record.value()));
acknowledgment.acknowledge();
}
@KafkaListener(topics = {"cross_real_time_lane_data"})
public void receiveCrossRealTimeLaneData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) {
realTimeCarInfoMap.put(record.topic(), String.valueOf(record.value())); realTimeCarInfoMap.put(record.topic(), String.valueOf(record.value()));
acknowledgment.acknowledge(); acknowledgment.acknowledge();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment