Commit 45271677 authored by hanbing's avatar hanbing

路口事件数据接入Redis

parent 12711daa
package net.wanji.datacenter.config;
import net.wanji.datacenter.util.FastJson2JsonRedisSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.cache.RedisCacheWriter;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
@Configuration
@EnableAutoConfiguration
public class RedisConfig extends CachingConfigurerSupport {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private int port;
@Value("${spring.redis.password}")
private String password;
@Value("${spring.redis.database}")
private int database;
@Value("${spring.redis.timeout}")
private int timeout;
@Value("${spring.redis.jedis.pool.max-idle}")
private int maxIdle;
@Value("${spring.redis.jedis.pool.max-active}")
private int maxTotal;
@Value("${spring.redis.jedis.pool.min-idle}")
private int minIdle;
@Value("${spring.redis.jedis.pool.max-wait}")
private long maxWaitMillis;
/**
* 获取Jedis连接工厂
* <p>用于创建Jedis对象</p>
* @return JedisPool
*/
@Bean
public JedisPool redisPoolFactory() {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(maxTotal);
jedisPoolConfig.setMaxIdle(maxIdle);
jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
jedisPoolConfig.setMinIdle(minIdle);
jedisPoolConfig.setTestOnBorrow(false);
jedisPoolConfig.setTestOnReturn(false);
return new JedisPool(jedisPoolConfig, host, port,timeout, password, database);
}
/**
* retemplate相关配置
* @param factory
* @return
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
// 配置连接工厂
template.setConnectionFactory(factory);
//不开启事务
template.setEnableTransactionSupport(false);
// 值采用json序列化value
template.setValueSerializer(fastJson2JsonRedisSerializer());
//使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
// 设置hash key 和value序列化模式
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
template.afterPropertiesSet();
return template;
}
/**
* 选择redis作为默认缓存工具
* @param redisConnectionFactory
* @return
*/
@Bean
public CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {
RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig();
return RedisCacheManager
.builder(RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory))
.cacheDefaults(redisCacheConfiguration).build();
}
/**
* @description: 自定义Redis序列化类
* @date 2019/4/29 20:23
*/
@Bean
public RedisSerializer fastJson2JsonRedisSerializer() {
return new FastJson2JsonRedisSerializer(Object.class);
}
@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.port}")
private Integer redisPort;
@Bean
public Jedis jedis() {
Jedis jedis = new Jedis(redisHost, redisPort);
jedis.auth("Wanji300552");
return jedis;
}
}
\ No newline at end of file
package net.wanji.datacenter.constant;
/**
* @author duanruiming
* @date 2023/01/16 10:45
*/
public class Constant {
/**
* Redis存储Key前缀
*/
public static final String KEY_PREFIX = "utcsystem_";
/**
* 分隔符:减号
**/
public static final String SEPARATOR_MINUS = "-";
public static final String WEBSOCKET_TOPIC_EVENT_WARN = "eventWarn";
public static final String WEBSOCKET_TOPIC_SYS_SERVICE_STATUS = "sysStatus";
public static final String WEBSOCKET_TOPIC_CAR_TRAIL_INFO = "carTrailInfo";
// WebSocket自动解锁路口topic
public static final String WEBSOCKET_AUTO_UNLOCK = "autoUnlock";
}
package net.wanji.datacenter.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.datacenter.constant.EventAbnormalEnum;
import net.wanji.datacenter.pojo.dto.CrossEventDTO;
import net.wanji.datacenter.pojo.dto.CrossLaneSnapshotDataDTO;
import net.wanji.datacenter.pojo.dto.CrossSnapshotDataDTO;
import net.wanji.datacenter.service.DataProcessService;
import net.wanji.datacenter.util.RedisUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
......@@ -19,6 +25,7 @@ import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
......@@ -29,6 +36,8 @@ import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class ConsumerHandler implements KafkaListenerErrorHandler {
// Redis缓存过期时间
public static final int TTL_IN_MILLIS = 10 * 60 * 1000;
@Resource(name = "lanePeriodicDataProcessService")
DataProcessService lanePeriodicDataProcessService;
@Resource(name = "crossSnapshotDataProcessService")
......@@ -36,10 +45,13 @@ public class ConsumerHandler implements KafkaListenerErrorHandler {
@Resource(name = "laneSnapshotDataProcessService")
DataProcessService laneSnapshotDataProcessService;
@Autowired
RedisUtils redisUtils;
public static final Map<String, List<CrossSnapshotDataDTO>> crossSnapshotDataMap = new ConcurrentHashMap<>();
@KafkaListener(topics = {"JN050001LanePeriodicData"}, groupId = "group")
public void receiveLanePeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception{
public void receiveLanePeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
Object convert = lanePeriodicDataProcessService.convert(String.valueOf(record.value()));
// 修改逻辑,将保存逻辑改为将方向,转向数据返回kafka
lanePeriodicDataProcessService.save(convert);
......@@ -77,6 +89,64 @@ public class ConsumerHandler implements KafkaListenerErrorHandler {
acknowledgment.acknowledge();
}
@KafkaListener(topics = {"cross.event.index"}, groupId = "group2")
public void receiveCrossEventData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment)
throws Exception {
try {
ObjectMapper objectMapper = new ObjectMapper();
String recordStr = String.valueOf(record.value());
CrossEventDTO crossEventDTO = JacksonUtils.getInstance().readValue(recordStr, CrossEventDTO.class);
String msgType = crossEventDTO.getMsgType();
String crossId = crossEventDTO.getCrossId();
if (Objects.equals(msgType, EventAbnormalEnum.CROSS_UNBALANCE.getType())) {
String redisListElement = objectMapper.writeValueAsString(crossEventDTO);
// 失衡事件
// 方向缓存
Integer unbalanceDir = crossEventDTO.getUnbalanceDir();
String redisKey = crossId + ":" + unbalanceDir;
redisUtils.addToSortedSetWithExpiry(redisKey, redisListElement, TTL_IN_MILLIS);
// 路口缓存
redisUtils.addToSortedSetWithExpiry(crossId, redisListElement, TTL_IN_MILLIS);
} else if (msgType.startsWith("50")) {
// 拥堵事件
// 方向缓存
List<CrossEventDTO> details = crossEventDTO.getDetails();
for (CrossEventDTO detail : details) {
detail.setMsgType(detail.getCongestionCode());
Integer dir = detail.getDir();
String redisKey = crossId + ":" + dir;
String redisListElement = objectMapper.writeValueAsString(detail);
redisUtils.addToSortedSetWithExpiry(redisKey, redisListElement, TTL_IN_MILLIS);
}
// 路口缓存
String redisListElement = objectMapper.writeValueAsString(crossEventDTO);
redisUtils.addToSortedSetWithExpiry(crossId, redisListElement, TTL_IN_MILLIS);
} else if (Objects.equals(msgType, EventAbnormalEnum.CROSS_OVERFLOW.getType())
|| Objects.equals(msgType, EventAbnormalEnum.CROSS_DEADLOCK.getType())) {
// 溢出和死锁事件
// 方向缓存
List<CrossEventDTO> details = crossEventDTO.getDetails();
for (CrossEventDTO detail : details) {
detail.setMsgType(EventAbnormalEnum.CROSS_OVERFLOW.getType());
Integer dir = detail.getDir();
String redisKey = crossId + ":" + dir;
String redisListElement = objectMapper.writeValueAsString(detail);
redisUtils.addToSortedSetWithExpiry(redisKey, redisListElement, TTL_IN_MILLIS);
}
// 路口缓存
String redisListElement = objectMapper.writeValueAsString(crossEventDTO);
redisUtils.addToSortedSetWithExpiry(crossId, redisListElement, TTL_IN_MILLIS);
}
} catch (Exception e) {
log.error("Kafka收到路口事件数据异常", e);
throw new Exception(e);
}
acknowledgment.acknowledge();
}
@Override
@NonNull
public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
......
package net.wanji.datacenter.pojo.dto;
import lombok.Data;
import java.util.List;
@Data
public class CrossEventDTO {
private Integer dir;
private String rid;
//该路段的结束路口
private String crossId;
private Double index;
private String indexName;
// 501畅通 502轻微拥堵 503中度拥堵 504重度拥堵
private String congestionCode;
private String timestamp;
private Long globalTimeStamp;
private Long startTime;
private Long endTime;
private Integer duration;
// 事件序列号
private String eventSerialNumber;
// 检测时间
private Long detectTime;
private List<CrossEventDTO> details;
private String msgType;
// 失衡方向
private Integer unbalanceDir;
}
......@@ -98,22 +98,27 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
List<CrossDirDataRealtimePO> crossDirDataRealtimePOS = new ArrayList<>(laneNum);
List<CrossTurnDataRealtimePO> crossTurnDataRealtimePOS = new ArrayList<>(laneNum);
List<CrossLaneDataRealTimePO> crossLaneDataRealTimePOS = new ArrayList<>(laneNum);
List<LanePeriodicDataDTO.EventList> eventLists = lanePeriodicDataDTO.getEventList();
String timeStamp = lanePeriodicDataDTO.getTimeStamp();
Date endDate = DateUtil.parse(timeStamp, DateStyle.YYYY_MM_DD_HH_MM_SS.getValue());
Date startDate = DateUtil.offsetMinute(endDate, -5);
if (CollectionUtils.isEmpty(eventLists)) {
log.error("当前车道周期实时数据车道列表为空,参数信息:", lanePeriodicDataDTO);
throw new Exception("当前车道周期实时数据车道列表为空");
}
// 通过车道编号获取车道信息
List<String> LaneIdList = eventLists.stream().map(LanePeriodicDataDTO.EventList::getLaneId).collect(Collectors.toList());
List<String> LaneIdList = eventLists.stream()
.map(LanePeriodicDataDTO.EventList::getLaneId)
.collect(Collectors.toList());
List<CrossBaseLaneInfoPO> crossBaseLaneInfoPOS = crossBaseLaneInfoMapper.selectBatchIds(LaneIdList);
DecimalFormat decimalFormat = new DecimalFormat("#.##");
// 将kafka车道数据与路口车道数据对应,获取路口编号,方向,转向,分组计算
for (LanePeriodicDataDTO.EventList laneRealTimeItem : eventLists) {
String laneId = laneRealTimeItem.getLaneId();
for (CrossBaseLaneInfoPO laneInfoPO : crossBaseLaneInfoPOS) {
String id = laneInfoPO.getId();
if (StringUtils.equals(laneId, id)) {
......@@ -270,11 +275,13 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
DecimalFormat decimalFormat, ProducerHandler producerHandler, Date startDate) throws Exception {
// 方向数据
List<CrossDirDataRealtimePO> insertCrossDirDataRealtimePOS = new ArrayList<>();
Map<String, List<CrossDirDataRealtimePO>> crossDirDataRealTimePOSMap = crossDirDataRealtimePOS.stream().collect(Collectors.groupingBy(CrossDirDataRealtimePO::getCrossId));
Map<String, List<CrossDirDataRealtimePO>> crossDirDataRealTimePOSMap = crossDirDataRealtimePOS.stream()
.collect(Collectors.groupingBy(CrossDirDataRealtimePO::getCrossId));
for (Map.Entry<String, List<CrossDirDataRealtimePO>> entry : crossDirDataRealTimePOSMap.entrySet()) {
String crossId = entry.getKey();
List<CrossDirDataRealtimePO> value = entry.getValue();
Map<Integer, List<CrossDirDataRealtimePO>> crossIdMap = value.stream().collect(Collectors.groupingBy(CrossDirDataRealtimePO::getDirType));
Map<Integer, List<CrossDirDataRealtimePO>> crossIdMap = value.stream()
.collect(Collectors.groupingBy(CrossDirDataRealtimePO::getDirType));
Double congestionIndex = 0.0;
Double spilloverIndex = 0.0;
Double saturationAverage = 0.0;
......@@ -448,7 +455,7 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
if (delayTimeIndexMin <= 0.0) {
delayTimeIndexMin = 1.0;
}
// 负载均衡
// 负载均衡
Double loadBalance = Math.sqrt(saturationSum / crossIdMap.size());
CrossDataIndexDTO crossDataIndexDTO = new CrossDataIndexDTO();
crossDataIndexDTO.setCongestionIndex(Double.valueOf(decimalFormat.format(congestionIndex)));
......
package net.wanji.datacenter.task;
import lombok.extern.slf4j.Slf4j;
import net.wanji.datacenter.util.RedisUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Set;
/**
* @author Kent HAN
* @date 2023/12/15 16:41
*/
@Component
@Slf4j
public class RedisTask {
@Resource
private RedisUtils redisUtils;
/**
* 每分钟清理路口事件缓存
* @author Kent HAN
* @date 2023/12/15 16:42
*/
@Scheduled(fixedRate = 1000 * 60)
public void cleanCrossEventCache() throws Exception {
try {
Set<String> keys = redisUtils.keys();
for (String key : keys) {
redisUtils.removeExpiredElements(key);
}
} catch (Exception e) {
log.error("清理路口事件缓存失败", e);
throw new Exception(e);
}
}
}
package net.wanji.datacenter.util;
/**
* @author wanji
* @version 1.0
* @Description: [日期风格工具类]
* Created on 2019/4/16 15:07
*/
public enum DateStyle {
YYYY_MM("yyyy-MM", false),
YYYY_MM_DD("yyyy-MM-dd", false),
YYYY_MM_DD_HH_MM("yyyy-MM-dd HH:mm", false),
YYYY_MM_DD_HH_MM_SS("yyyy-MM-dd HH:mm:ss", false),
YYYYMMDDHHMMSS("yyyyMMddHHmmss", false),
YYYYMMDDHHMMSSSSS("yyyyMMddHHmmssSSS", false),
YYYYMMDD("yyyyMMdd", false),
YYYY_MM_EN("yyyy/MM", false),
YYYY_MM_DD_EN("yyyy/MM/dd", false),
YYYY_MM_DD_HH_MM_EN("yyyy/MM/dd HH:mm", false),
YYYY_MM_DD_HH_MM_SS_EN("yyyy/MM/dd HH:mm:ss", false),
DD_MM_YYYY_EN("dd/MM/yyyy", false),
YYYY_MM_CN("yyyy年MM月", false),
YYYY_MM_DD_CN("yyyy年MM月dd日", false),
YYYY_MM_DD_HH_MM_CN("yyyy年MM月dd日 HH:mm", false),
YYYY_MM_DD_HH_MM_SS_CN("yyyy年MM月dd日 HH:mm:ss", false),
HH_MM("HH:mm", true),
HH_MM_SS("HH:mm:ss", true),
MM_DD("MM-dd", true),
MM_DD_HH_MM("MM-dd HH:mm", true),
MM_DD_HH_MM_SS("MM-dd HH:mm:ss", true),
MM_DD_EN("MM/dd", true),
MM_DD_HH_MM_EN("MM/dd HH:mm", true),
MM_DD_HH_MM_SS_EN("MM/dd HH:mm:ss", true),
MM_DD_CN("MM月dd日", true),
MM_DD_HH_MM_CN("MM月dd日 HH:mm", true),
MM_DD_HH_MM_SS_CN("MM月dd日 HH:mm:ss", true);
private String value;
private boolean isShowOnly;
DateStyle(String value, boolean isShowOnly) {
this.value = value;
this.isShowOnly = isShowOnly;
}
public String getValue() {
return value;
}
public boolean isShowOnly() {
return isShowOnly;
}
}
\ No newline at end of file
package net.wanji.datacenter.util;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import java.nio.charset.Charset;
public class FastJson2JsonRedisSerializer<T> implements RedisSerializer<T> {
public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
private Class<T> clazz;
public FastJson2JsonRedisSerializer(Class<T> clazz) {
super(); this.clazz = clazz;
}
@Override
public byte[] serialize(T t) throws SerializationException {
if (t == null) {
return new byte[0];
}
return JSON.toJSONString(t, SerializerFeature.WriteClassName).getBytes(DEFAULT_CHARSET);
}
@Override
public T deserialize(byte[] bytes) throws SerializationException {
if (bytes == null || bytes.length <= 0) {
return null;
}
String str = new String(bytes, DEFAULT_CHARSET);
return (T) JSON.parseObject(str, clazz);
}
}
\ No newline at end of file
package net.wanji.datacenter.util;
import net.wanji.datacenter.constant.Constant;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.support.atomic.RedisAtomicLong;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.text.DecimalFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* @description: redis工具类
* @author wanji
* @version 1.0
* Created on 2019/4/29 20:34
*/
@Component
public class RedisUtils {
/**
* 注入redisTemplate
*/
@Resource
private RedisTemplate<String, Object> redisTemplate;
/**
* @description: 判断是否存在Key
* @param key redis的Key
* @return boolean true:有 false:无
*/
public boolean hasKey(String key) {
return redisTemplate.hasKey(key);
}
/**
* @description: 添加字符串
* @param key redis的Key
* @param value 添加redis的value
*/
public void set(String key, String value) {
redisTemplate.opsForValue().set(key, value);
}
/**
* @description: 添加对象
* @param key redis的Key
* @param object 添加redis的value
*/
public void set(String key, Object object) {
redisTemplate.opsForValue().set(key, object);
}
/**
* @description: 添加带生命周期的对象
* @param key redis的Key
* @param object 添加redis的value
* @param seconds 失效时间
*/
public void setAndExpire(String key, Object object, int seconds) {
redisTemplate.opsForValue().set(key, object);
redisTemplate.expire(key, seconds, TimeUnit.SECONDS);
}
/**
* @description: 添加带生命周期的对象
* @param key redis的Key
* @param value 添加redis的value
* @param seconds 失效时间
*/
public void setAndExpire(String key, String value, int seconds) {
redisTemplate.opsForValue().set(key, value);
redisTemplate.expire(key, seconds, TimeUnit.SECONDS);
}
/**
* @description: 获取对象
* @param key redis的Key
* @return Object 返回对象
*/
public Object get(String key) {
return redisTemplate.opsForValue().get(key);
}
/**
* @description: 存入redis的hash
* @param key redis的Key
* @param field 字段值
* @param value 存入的值
*/
public void setHash(String key, String field, String value) {
redisTemplate.opsForHash().put(key, field, value);
}
/**
* @description: 存入redis的hash
* @param key redis的Key
* @param field 字段值
* @param value 存入的值
*/
public void setHash(String key, String field, Object value) {
redisTemplate.opsForHash().put(key, field, value);
}
/**
* @description: 根据key和字段值获取内容值
* @param key redis的Key
* @param field 字段值
* @return String 返回字符串
*/
public String getHash(String key, String field) {
return (String)redisTemplate.opsForHash().get(key, field);
}
/**
* @description: 根据field删除值
* @param key redis的Key
* @param field 字段值
*/
public void delHashMap(String key, String field) {
redisTemplate.boundHashOps(key).delete(field);
}
/**
* @description: 存入hash集合
* @param key redis的Key
* @param hashmap 存入的Map集合
*/
public void setHashMap(String key, Map<String, Object> hashmap){
redisTemplate.opsForHash().putAll(key, hashmap);
}
/**
* @description: 取出hash集合
* @param key redis的Key
* @return Map<Object, Object> 返回Map集合
*/
public Map<Object, Object> getHashMap(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* @description: 存入redis的Set
* @param key redis的Key
* @param object 对象
*/
public void setSet(String key,Object object){
redisTemplate.opsForSet().add(key, object);
}
/**
* @description: 获取redis的Set
* @param key redis的Key
* @return Set<Object> Set集合
*/
public Set<Object> getSet(String key){
return redisTemplate.opsForSet().members(key);
}
/**
* @discription: 查看值是否是set成员
* @param key set的key
* @param value set的成员
* @return 是否是set成员
*/
public Boolean isSetMember(String key, Object value) {
return redisTemplate.opsForSet().isMember(key, value);
}
/**
* @description:设置key的过期时间,endTime格式:yyyy-MM-dd hh:mm:ss
* @param key redis的Key
* @param endTime 结束时间
*/
public void setExpire(String key, Date endTime) {
long seconds = endTime.getTime() - System.currentTimeMillis();
redisTemplate.expire(key, (int) (seconds / 1000), TimeUnit.SECONDS);
}
/**
* @description: 设置key的过期时间
* @param key redis的Key
* @param time 过期时间(秒)
*/
public void setExpire(String key, int time) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
/**
* <p>Discription:获取key的过期时间
* @param key redis的Key
* @return 过期时间(秒)
*/
public Long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* @description: 在redis消息队列队尾插入数据
* @param key redis的Key
* @param object 添加的对象
*/
public void tailPush(String key, Object object){
redisTemplate.opsForList().rightPush(key, object);
}
/**
* @description: 在redis消息队列对头插入数据
* @param key redis的Key
* @param object 添加的对象
*/
public void headPush(String key,Object object){
redisTemplate.opsForList().leftPush(key, object);
}
/**
* @description: 在redis消息队列队尾删除数据
* @param key redis的Key
* @return Object 删除的对象
*/
public Object tailPop(String key){
return redisTemplate.opsForList().rightPop(key);
}
/**
* @description: 在redis消息队列队头删除数据
* @param key redis的Key
* @return Object 删除的对象
*/
public Object headPop(String key){
return redisTemplate.opsForList().leftPop(key);
}
/**
* @description: 删除redis的值
* @param key redis的Key
*/
public void del(String key) {
if (hasKey(key)) {
redisTemplate.delete(key);
}
}
/**
* @description: 清理redis缓存
*/
public void flushDB(){
redisTemplate.getConnectionFactory().getConnection().flushDb();
}
/**
* @description: 根据类型生成版本号
* @param type 类型key区分
* @return 版本号
* @author wanji
* @date 2019/5/5 19:26
*/
public String getVersion(String type) {
String formatDate = DateUtils.dateToString(new Date(), DateStyle.YYYYMMDD);
String key = Constant.KEY_PREFIX + type + formatDate;
//当前时间到第二天还剩多少时间
Date newDate = DateUtils.dateForMat(DateUtils.addDay(new Date(), 1), DateStyle.YYYY_MM_DD);
int liveTime = DateUtils.dayDiff(newDate, new Date(), Calendar.MILLISECOND);
//获取自增号
Long incr = getIncr(key, liveTime);
if(incr == 0) {
incr = getIncr(key, liveTime);//从001开始
}
DecimalFormat df = new DecimalFormat("000");//三位序列号
return formatDate + Constant.SEPARATOR_MINUS + df.format(incr);
}
/**
* 自增ID
* @param key 建
* @param liveTime 过期时间
* @return 自增结果
*/
public Long getIncr(String key, long liveTime) {
RedisAtomicLong entityIdCounter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
Long increment = entityIdCounter.getAndIncrement();
if ((null == increment || increment.longValue() == 0) && liveTime > 0) {//初始设置过期时间
entityIdCounter.expire(liveTime, TimeUnit.MILLISECONDS);//单位毫秒
}
return increment;
}
/**
* 获取全部Redis的key
* @return
*/
public Set<String> keys() {
return redisTemplate.keys("*");
}
/**
* 获取全局ID范围
*
* @param key 建
* @param increment ID范围
* @return
*/
public Long getGlobalIdRange(String key, long increment) {
ValueOperations<String, Object> ops = redisTemplate.opsForValue();
Long newIdRangeStart = ops.increment(key, increment);
// The range of IDs that are now reserved are from newIdRangeStart - increment + 1 to newIdRangeStart.
return newIdRangeStart - increment + 1;
}
/**
* 带过期时间存入 zset
* @param ttlInMillis 存活时间毫秒数
*/
public void addToSortedSetWithExpiry(String key, String value, long ttlInMillis) {
long score = new Date().getTime() + ttlInMillis;
redisTemplate.opsForZSet().add(key, value, score);
}
/**
* 删除过期的 zset 元素
*/
public void removeExpiredElements(String key) {
long now = new Date().getTime();
redisTemplate.opsForZSet().removeRangeByScore(key, 0, now);
}
}
......@@ -43,7 +43,7 @@ spring:
max-poll-records: 1
broker-id: 0
auto-commit-interval: 1S
auto-offset-reset: earliest
auto-offset-reset: latest
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
......@@ -52,6 +52,18 @@ spring:
concurrency: 3
ack-mode: manual_immediate
missing-topics-fatal: false
redis:
host: 37.12.182.29
port: 6379
password: Wanji300552
jedis:
pool:
max-active: 200
max-wait: 5000
max-idle: 20
min-idle: 10
timeout: 5000
database: 7
mybatis-plus:
mapper-locations: classpath*:mapper/*.xml,classpath:mapper/*/*.xml
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment