Commit 95f84355 authored by duanruiming's avatar duanruiming

[add] kafka配置

parent e578e6cd
......@@ -64,6 +64,11 @@
<!-- 依赖声明 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
......
......@@ -19,6 +19,10 @@
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>net.wanji</groupId>
<artifactId>signal-feign-service</artifactId>
......
package net.wanji.opt.kafka;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author duanruiming
* @date 2023/03/09 16:03
*/
@Component
@Slf4j
public class ConsumerHandler implements KafkaListenerErrorHandler {
/**
* kafka数据缓存
* key:topic value:data
*/
private static final Map<String, String> lanePeriodicDataMap = new ConcurrentHashMap<>();
/**
* 通过消息主题获取消息信息
*
* @param topic
* @return
*/
public String getTopicMessage(String topic) {
if (lanePeriodicDataMap.isEmpty()) {
return null;
}
return lanePeriodicDataMap.get(topic);
}
@KafkaListener(topics = {"JN040001LanePeriodicData"})
public void receiveLanePeriodicData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) {
log.info("收到kafka消息,消息主题:" + record.topic());
log.info("收到kafka消息,消息内容:" + record.value());
lanePeriodicDataMap.put(record.topic(), String.valueOf(record.value()));
acknowledgment.acknowledge();
}
@Override
@NonNull
public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
return new Object();
}
@Override
@NonNull
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
String errorMessage = String.format("监听主题:%s,消费者详情:%s,异常信息:%s,消息详情:%s",
consumer.listTopics(), consumer.groupMetadata(), exception, message);
log.error(errorMessage);
return KafkaListenerErrorHandler.super.handleError(message, exception, consumer);
}
}
package net.wanji.opt.po.kafka;
import lombok.Data;
import java.util.List;
/**
* @author duanruiming
* @date 2023/03/09 17:02
*/
@Data
public class LanePeriodicDataPO {
private List<Eventlist> eventList;
private int laneNum;
private String orgCode;
private String timeStamp;
@Data
public class Eventlist {
private double dynamicQueueLengthMax;
private int dynamicQueueLengthMin;
private int greenLightEfficiency;
private String laneId;
private int laneCapacity;
private int laneFlowRate;
private int laneNoStopRate;
private int laneOneStopRate;
private double laneSaturation;
private int laneSaturationFlowRate;
private int laneThreeStopRate;
private int laneTwoStopRate;
private double lightGreenFinishQueueLength;
private int lightGreenStartQueueLength;
private int meanDelay;
private int meanStopsNumber;
private double meanV;
private int staticQueueLengthMax;
private int staticQueueLengthMin;
private double timeOccupancy;
private int trafficFlow;
private int trolleyEquivalent;
private double vehicleLengthRatioMean;
private double vehicleNumsRatioMean;
}
}
package net.wanji.opt.task;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.enums.CrossStatusEnum;
import net.wanji.common.framework.Constants;
import net.wanji.opt.dao.mapper.trend.CrossDataRealtimeMapper;
import net.wanji.opt.dto.CrossTurnDataRealtimeDTO;
import net.wanji.opt.kafka.ConsumerHandler;
import net.wanji.opt.po.kafka.LanePeriodicDataPO;
import net.wanji.opt.po.trend.CrossDataRealtimePO;
import net.wanji.opt.service.CrossOptimizeService;
import org.springframework.beans.BeanUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
......@@ -21,21 +24,21 @@ import java.util.List;
* @desc RealtimeOptimizeTask
*/
@RequiredArgsConstructor
@Component
@Slf4j
public class CrossMonitorTask {
@Resource
CrossOptimizeService crossOptimizeService;
@Resource
CrossDataRealtimeMapper crossDataRealtimeMapper;
private final CrossOptimizeService crossOptimizeService;
private final ConsumerHandler consumerHandler;
private final CrossDataRealtimeMapper crossDataRealtimeMapper;
/**
* 路口实时监测
*
*/
@Scheduled(cron = "* 0/1 * * * ? ")
@Scheduled(fixedRate = 1 * 60 *1000)
public void realtimeMonitor() {
log.info("路口实时监测任务...");
// 获取路口实时监测数据
List<CrossDataRealtimePO> crossDataRealtimePOList = listCrossDataRealtime();
......@@ -107,6 +110,8 @@ public class CrossMonitorTask {
crossDataRealtimePOList.add(crossDataRealtimePO);
crossDataRealtimePOList.add(crossDataRealtimePO1);
String lanePeriodicData = consumerHandler.getTopicMessage("JN040001LanePeriodicData");
LanePeriodicDataPO lanePeriodicDataPO = JSONObject.parseObject(lanePeriodicData, LanePeriodicDataPO.class);
// todo 对数据进行处理,可能主要是CrossTurnDataRealtimeDTO中的车间距,车时距等配置修改
CrossTurnDataRealtimeDTO crossTurnDataRealtimeDTO = new CrossTurnDataRealtimeDTO();
......
......@@ -65,6 +65,22 @@ spring:
timeout: 5000
database: 3
kafka:
bootstrap-servers: 10.102.1.182:9092
#消费者配置
consumer:
max-poll-records: 1
broker-id: 0
auto-commit-interval: 1S
auto-offset-reset: earliest
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: group
listener:
concurrency: 3
ack-mode: manual_immediate
# 信号平台
system:
address:
......
......@@ -53,6 +53,22 @@ spring:
timeout: 5000
database: 3
kafka:
bootstrap-servers: 10.102.1.182:9092
#消费者配置
consumer:
max-poll-records: 1
broker-id: 0
auto-commit-interval: 1S
auto-offset-reset: earliest
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: group
listener:
concurrency: 3
ack-mode: manual_immediate
# 信号平台
system:
address:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment