Commit e9d2e77c authored by zhoushiguang's avatar zhoushiguang

事件插入通知

parent 751f96c3
package net.wanji.datacenter.kafka;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import javax.annotation.Resource;
@Component
@Slf4j
public class KafkaProducerService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
public boolean send(String topic, String dataKey, String data) {
boolean result = false;
long s = System.currentTimeMillis();
try {
ListenableFuture<?> lf = kafkaTemplate.send(topic, dataKey, data);
result = true;
long e = System.currentTimeMillis();
int datalen = data.length();
// logger.info("write to kafka success,topic:" + topic + ",key:" + dataKey + ",dataLeng:" + datalen
// + ",costTime:" + (e - s) + "ms\r\n");
//Thread.sleep(100);
} catch (Exception e) {
logger.error("topic=" + topic + ",往kafka生成消息失败", e);
} finally {
}
return result;
}
}
package net.wanji.datacenter.service; package net.wanji.datacenter.service;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.wanji.common.gts.Tools;
import net.wanji.common.utils.tool.JacksonUtils; import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.common.utils.tool.StringUtils; import net.wanji.common.utils.tool.StringUtils;
import net.wanji.datacenter.cache.CrossGreenWaveInfoCache; import net.wanji.datacenter.cache.CrossGreenWaveInfoCache;
import net.wanji.datacenter.constant.EventAbnormalEnum; import net.wanji.datacenter.constant.EventAbnormalEnum;
import net.wanji.datacenter.kafka.KafkaProducerService;
import net.wanji.datacenter.mapper.EventInfoMapper; import net.wanji.datacenter.mapper.EventInfoMapper;
import net.wanji.datacenter.pojo.dto.EventDataDTO; import net.wanji.datacenter.pojo.dto.EventDataDTO;
import net.wanji.datacenter.pojo.po.EventInfoPO; import net.wanji.datacenter.pojo.po.EventInfoPO;
...@@ -28,6 +31,8 @@ public class EventDataServiceImpl implements EventDataService { ...@@ -28,6 +31,8 @@ public class EventDataServiceImpl implements EventDataService {
@Resource @Resource
private EventInfoMapper eventInfoMapper; private EventInfoMapper eventInfoMapper;
@Resource
private KafkaProducerService kafkaProducerService;
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
@Override @Override
...@@ -54,6 +59,12 @@ public class EventDataServiceImpl implements EventDataService { ...@@ -54,6 +59,12 @@ public class EventDataServiceImpl implements EventDataService {
EventInfoPO eventInfoPO = getEventInfoPO(eventDataDTO); EventInfoPO eventInfoPO = getEventInfoPO(eventDataDTO);
eventInfoMapper.insertOne(eventInfoPO); eventInfoMapper.insertOne(eventInfoPO);
// websocket // websocket
//插入通知
JSONObject insertNoticeJson = new JSONObject();
insertNoticeJson.put("insertTime", Tools.dateToStr(new Date()));
insertNoticeJson.put("eventSerialNumber",eventInfoPO.getEventSerialNumber());
kafkaProducerService.send("event.insert.notice.topic",eventInfoPO.getEventSerialNumber(),insertNoticeJson.toJSONString());
} }
} }
} catch (Exception e) { } catch (Exception e) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment