Commit cc150029 authored by duanruiming's avatar duanruiming

[update] 绿波更新

parent 067e0c40
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
</parent> </parent>
<modules> <modules>
<!--<module>wj-datacenter-service</module>--> <module>wj-datacenter-service</module>
<module>wj-realtime-computing</module> <module>wj-realtime-computing</module>
</modules> </modules>
......
package net.wanji.datacenter.config; package net.wanji.datacenter.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import net.wanji.datacenter.util.FastJson2JsonRedisSerializer; import net.wanji.datacenter.util.FastJson2JsonRedisSerializer;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
...@@ -13,6 +18,7 @@ import org.springframework.data.redis.cache.RedisCacheWriter; ...@@ -13,6 +18,7 @@ import org.springframework.data.redis.cache.RedisCacheWriter;
import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer;
import redis.clients.jedis.Jedis; import redis.clients.jedis.Jedis;
...@@ -67,25 +73,25 @@ public class RedisConfig extends CachingConfigurerSupport { ...@@ -67,25 +73,25 @@ public class RedisConfig extends CachingConfigurerSupport {
return new JedisPool(jedisPoolConfig, host, port,timeout, password, database); return new JedisPool(jedisPoolConfig, host, port,timeout, password, database);
} }
/**
* retemplate相关配置
* @param factory
* @return
*/
@Bean @Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>(); RedisTemplate<String, Object> template = new RedisTemplate<>();
// 配置连接工厂 template.setConnectionFactory(connectionFactory);
template.setConnectionFactory(factory);
//不开启事务 // 使用 Jackson2JsonRedisSerializer 替代默认的 JDK 序列化器
template.setEnableTransactionSupport(false); Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
// 值采用json序列化value ObjectMapper objectMapper = new ObjectMapper();
template.setValueSerializer(fastJson2JsonRedisSerializer()); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
//使用StringRedisSerializer来序列化和反序列化redis的key值 objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
template.setKeySerializer(new StringRedisSerializer()); ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.WRAPPER_ARRAY);
// 设置hash key 和value序列化模式 serializer.setObjectMapper(objectMapper);
template.setKeySerializer(new StringRedisSerializer()); // 设置 Key 的序列化器
template.setValueSerializer(serializer); // 设置 Value 的序列化器
template.setHashKeySerializer(new StringRedisSerializer()); template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer()); template.setHashValueSerializer(serializer);
template.afterPropertiesSet(); template.afterPropertiesSet();
return template; return template;
} }
...@@ -102,21 +108,4 @@ public class RedisConfig extends CachingConfigurerSupport { ...@@ -102,21 +108,4 @@ public class RedisConfig extends CachingConfigurerSupport {
.builder(RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory)) .builder(RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory))
.cacheDefaults(redisCacheConfiguration).build(); .cacheDefaults(redisCacheConfiguration).build();
} }
/**
* @description: 自定义Redis序列化类
* @date 2019/4/29 20:23
*/
@Bean
public RedisSerializer fastJson2JsonRedisSerializer() {
return new FastJson2JsonRedisSerializer(Object.class);
}
@Bean
public Jedis jedis() {
Jedis jedis = new Jedis(host, port);
jedis.auth(password);
return jedis;
}
} }
\ No newline at end of file
...@@ -36,16 +36,32 @@ public class GreenBeltDataConsumerHandler implements KafkaListenerErrorHandler { ...@@ -36,16 +36,32 @@ public class GreenBeltDataConsumerHandler implements KafkaListenerErrorHandler {
@KafkaListener(topics = {"${kafka-consumer.greenBeltInfoDataTopic}"}, groupId = "group1") @KafkaListener(topics = {"${kafka-consumer.greenBeltInfoDataTopic}"}, groupId = "group1")
public void receiveGreenBeltInfoData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception { public void receiveGreenBeltInfoData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
String message = String.valueOf(record.value()); try {
if (StringUtils.isNotBlank(message)) { String message = String.valueOf(record.value());
GreenBeltOptInfoVO greenBeltOptInfoVO = greenBeltInfoService.convertData(message); if (StringUtils.isNotBlank(message)) {
greenBeltInfoService.save(greenBeltOptInfoVO); GreenBeltOptInfoVO greenBeltOptInfoVO = greenBeltInfoService.convertData(message);
String key = Constants.GREEN_ID_OPT_KEY.concat(greenBeltOptInfoVO.getGreenId()); greenBeltInfoService.save(greenBeltOptInfoVO);
redisUtils.set(key, JacksonUtils.getInstance().writeValueAsString(greenBeltOptInfoVO)); String key = Constants.GREEN_ID_OPT_KEY.concat(greenBeltOptInfoVO.getGreenId());
//redisUtils.setAndExpire(key, greenBeltOptInfoVO, 3600 * 24);
GreenBeltChartVO greenBeltChartVO = greenBeltInfoService.convertChartData(greenBeltOptInfoVO); GreenBeltChartVO greenBeltChartVO = greenBeltInfoService.convertChartData(greenBeltOptInfoVO);
String chartKey = Constants.GREEN_ID_OPT_CHART_KEY.concat(greenBeltOptInfoVO.getGreenId()); String chartKey = Constants.GREEN_ID_OPT_CHART_KEY.concat(greenBeltOptInfoVO.getGreenId());
redisUtils.set(chartKey, JacksonUtils.getInstance().writeValueAsString(greenBeltChartVO)); redisUtils.setAndExpire(chartKey, JacksonUtils.getInstance().writeValueAsString(greenBeltChartVO), 3600 * 24);
}
acknowledgment.acknowledge();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@KafkaListener(topics = {"${kafka-consumer.greenWavePeriodTopic}"}, groupId = "group")
public void receiveGreenPeriodfoData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
String originalData = String.valueOf(record.value());
try {
greenBeltInfoService.savePeriod(originalData);
} catch (Exception e) {
log.error("绿波GreenRealTime实时数据转换异常", e);
throw new Exception();
} }
acknowledgment.acknowledge(); acknowledgment.acknowledge();
} }
......
...@@ -28,10 +28,8 @@ public class PeriodDataConsumerHandler implements KafkaListenerErrorHandler { ...@@ -28,10 +28,8 @@ public class PeriodDataConsumerHandler implements KafkaListenerErrorHandler {
@Resource @Resource
private DataProcessService dataProcessService; private DataProcessService dataProcessService;
public static final Map<String, List<CrossSnapshotDataDTO>> crossSnapshotDataMap = new ConcurrentHashMap<>();
@KafkaListener(topics = {"${kafka-consumer.lanePeriodicDataTopic}"}, groupId = "group")
@KafkaListener(topics = {"${kafka-consumer.lanePeriodicDataTopic}"}, groupId = "group1")
public void receiveLanePeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception { public void receiveLanePeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
String originalData = String.valueOf(record.value()); String originalData = String.valueOf(record.value());
try { try {
...@@ -43,28 +41,29 @@ public class PeriodDataConsumerHandler implements KafkaListenerErrorHandler { ...@@ -43,28 +41,29 @@ public class PeriodDataConsumerHandler implements KafkaListenerErrorHandler {
acknowledgment.acknowledge(); acknowledgment.acknowledge();
} }
@KafkaListener(topics = {"${kafka-consumer.crossPeriodicDataTopic}"}, groupId = "group1") @KafkaListener(topics = {"${kafka-consumer.crossPeriodicDataTopic}"}, groupId = "group")
public void receiveCrossPeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception { public void receiveCrossPeriodicData(String message) throws Exception {
String originalData = String.valueOf(record.value()); String originalData = String.valueOf(message);
try { try {
dataProcessService.crossSave(originalData); dataProcessService.crossSave(originalData);
//acknowledgment.acknowledge();
} catch (Exception e) { } catch (Exception e) {
//acknowledgment.acknowledge();
log.error("路口Cross实时数据转换异常", e); log.error("路口Cross实时数据转换异常", e);
throw new Exception(); throw new Exception();
} }
acknowledgment.acknowledge();
} }
@KafkaListener(topics = {"${kafka-consumer.dirPeriodicDataTopic}"}, groupId = "group1") @KafkaListener(topics = {"${kafka-consumer.dirPeriodicDataTopic}"}, groupId = "group")
public void receiveDirPeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception { public void receiveDirPeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) throws Exception {
String originalData = String.valueOf(record.value()); String originalData = String.valueOf(record.value());
try { try {
dataProcessService.dirSave(originalData); dataProcessService.dirSave(originalData);
acknowledgment.acknowledge();
} catch (Exception e) { } catch (Exception e) {
log.error("方向Dir周期实时数据转换异常", e); log.error("方向Dir周期实时数据转换异常", e);
throw new Exception(); throw new Exception();
} }
acknowledgment.acknowledge();
} }
@KafkaListener(topics = {"${kafka-consumer.turnPeriodicDataTopic}"}, groupId = "group") @KafkaListener(topics = {"${kafka-consumer.turnPeriodicDataTopic}"}, groupId = "group")
...@@ -72,11 +71,11 @@ public class PeriodDataConsumerHandler implements KafkaListenerErrorHandler { ...@@ -72,11 +71,11 @@ public class PeriodDataConsumerHandler implements KafkaListenerErrorHandler {
String originalData = String.valueOf(record.value()); String originalData = String.valueOf(record.value());
try { try {
dataProcessService.turnSave(originalData); dataProcessService.turnSave(originalData);
acknowledgment.acknowledge();
} catch (Exception e) { } catch (Exception e) {
log.error("转向Turn周期实时数据转换异常", e); log.error("转向Turn周期实时数据转换异常", e);
throw new Exception(); throw new Exception();
} }
acknowledgment.acknowledge();
} }
......
package net.wanji.datacenter.pojo.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import net.wanji.databus.dao.entity.GreenwaveRealtimePO;
import net.wanji.databus.po.CrossTurnDataRealtimePO;
import java.util.List;
/**
* @author duanruiming
* @date 2024/11/25 11:09
*/
@Data
public class GreenWavePeriodDTO {
@JsonProperty("eventList")
private List<GreenwaveRealtimePO> eventList;
/**
* 车道数量
*/
private Integer laneNum;
/**
* 全域编号
*/
private String orgCode;
/**
* 数据生成时间: yyyy-MM-dd HH:mm:ss:SSS
*/
private String timeStamp;
}
package net.wanji.datacenter.pojo.vo; package net.wanji.datacenter.pojo.vo;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;
import lombok.Data; import lombok.Data;
...@@ -16,16 +17,21 @@ import java.util.Map; ...@@ -16,16 +17,21 @@ import java.util.Map;
@ApiModel(value = "GreenBeltChartVO", description = "绿波时序图返回实体") @ApiModel(value = "GreenBeltChartVO", description = "绿波时序图返回实体")
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
public class GreenBeltChartVO { public class GreenBeltChartVO {
private static final long serialVersionUID = 1L; // 推荐添加一个序列化版本号
@ApiModelProperty("绿波路口红灯范围") @ApiModelProperty("绿波路口红灯范围")
Map<String, List<Double[]>> crossRedTimesMap; @JsonProperty("crossRedTimesMap")
String crossRedTimesMap;
@ApiModelProperty("反向绿波路口红灯范围") @ApiModelProperty("反向绿波路口红灯范围")
Map<String, List<Double[]>> backCrossRedTimesMap; @JsonProperty("backCrossRedTimesMap")
String backCrossRedTimesMap;
@ApiModelProperty("绿波路口绿波开始时间") @ApiModelProperty("绿波路口绿波开始时间")
Map<String, Double> greenStartMap; @JsonProperty("greenStartMap")
String greenStartMap;
@ApiModelProperty("反向绿波路口绿波开始时间") @ApiModelProperty("反向绿波路口绿波开始时间")
Map<String, Double> BackGreenStartMap; String BackGreenStartMap;
@JsonProperty("distanceMap")
@ApiModelProperty("绿波路口距离下一个路口距离") @ApiModelProperty("绿波路口距离下一个路口距离")
Map<String, Double> distanceMap; String distanceMap;
@ApiModelProperty("绿波带宽") @ApiModelProperty("绿波带宽")
private Double greenWidthTime; private Double greenWidthTime;
@ApiModelProperty("反向绿波带宽") @ApiModelProperty("反向绿波带宽")
......
...@@ -23,4 +23,5 @@ public interface GreenBeltInfoService { ...@@ -23,4 +23,5 @@ public interface GreenBeltInfoService {
* @throws Exception * @throws Exception
*/ */
void save(GreenBeltOptInfoVO infoVO) throws Exception; void save(GreenBeltOptInfoVO infoVO) throws Exception;
void savePeriod(String message) throws Exception;
} }
...@@ -6,7 +6,7 @@ spring: ...@@ -6,7 +6,7 @@ spring:
cloud: cloud:
nacos: nacos:
config: config:
server-addr: 10.102.1.182:8848 server-addr: 37.12.182.29:8848
file-extension: yaml file-extension: yaml
group: wanji group: wanji
namespace: wanji namespace: wanji
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment