Commit cc678e8d authored by duanruiming's avatar duanruiming

[add] kafka消费者处理器

parent 317b7d5c
...@@ -11,8 +11,8 @@ import org.springframework.kafka.support.Acknowledgment; ...@@ -11,8 +11,8 @@ import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* @author duanruiming * @author duanruiming
...@@ -26,7 +26,7 @@ public class ConsumerHandler implements KafkaListenerErrorHandler { ...@@ -26,7 +26,7 @@ public class ConsumerHandler implements KafkaListenerErrorHandler {
* kafka数据缓存 * kafka数据缓存
* key:topic value:data * key:topic value:data
*/ */
private static final Map<String, String> lanePeriodicDataMap = new ConcurrentHashMap<>(); private static final Map<String, String> realTimeCarInfoMap = new HashMap<>();
/** /**
* 通过消息主题获取消息信息 * 通过消息主题获取消息信息
...@@ -35,17 +35,17 @@ public class ConsumerHandler implements KafkaListenerErrorHandler { ...@@ -35,17 +35,17 @@ public class ConsumerHandler implements KafkaListenerErrorHandler {
* @return * @return
*/ */
public String getTopicMessage(String topic) { public String getTopicMessage(String topic) {
if (lanePeriodicDataMap.isEmpty()) { if (realTimeCarInfoMap.isEmpty()) {
return null; return null;
} }
return lanePeriodicDataMap.get(topic); return realTimeCarInfoMap.get(topic);
} }
@KafkaListener(topics = {"JN040001LanePeriodicData"}) @KafkaListener(topics = {"JN040001LanePeriodicData"})
public void receiveLanePeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) { public void receiveLanePeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) {
log.info("收到kafka消息,消息主题:" + record.topic()); log.info("收到kafka消息,消息主题:" + record.topic());
log.info("收到kafka消息,消息内容:" + record.value()); log.info("收到kafka消息,消息内容:" + record.value());
lanePeriodicDataMap.put(record.topic(), String.valueOf(record.value())); realTimeCarInfoMap.put(record.topic(), String.valueOf(record.value()));
acknowledgment.acknowledge(); acknowledgment.acknowledge();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment