Commit 317b7d5c authored by duanruiming's avatar duanruiming

[add] 通过车牌查询kafka轨迹信息

parent f45fa8dd
...@@ -26,6 +26,10 @@ ...@@ -26,6 +26,10 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency> <dependency>
<groupId>net.wanji</groupId> <groupId>net.wanji</groupId>
<artifactId>signal-feign-service</artifactId> <artifactId>signal-feign-service</artifactId>
...@@ -151,7 +155,8 @@ ...@@ -151,7 +155,8 @@
<artifactId>mybatis-generator-maven-plugin</artifactId> <artifactId>mybatis-generator-maven-plugin</artifactId>
<version>${mybatis.generator.version}</version> <version>${mybatis.generator.version}</version>
<configuration> <configuration>
<configurationFile>${basedir}/src/main/resources/mybatis-generator/generatorConfig.xml</configurationFile> <configurationFile>${basedir}/src/main/resources/mybatis-generator/generatorConfig.xml
</configurationFile>
<verbose>true</verbose> <verbose>true</verbose>
<overwrite>true</overwrite> <overwrite>true</overwrite>
</configuration> </configuration>
......
...@@ -20,5 +20,6 @@ public class Constant { ...@@ -20,5 +20,6 @@ public class Constant {
public static final String WEBSOCKET_TOPIC_EVENT_WARN = "eventWarn"; public static final String WEBSOCKET_TOPIC_EVENT_WARN = "eventWarn";
public static final String WEBSOCKET_TOPIC_SYS_SERVICE_STATUS = "sysStatus"; public static final String WEBSOCKET_TOPIC_SYS_SERVICE_STATUS = "sysStatus";
public static final String WEBSOCKET_TOPIC_CAR_TRAIL_INFO = "carTrailInfo";
} }
package net.wanji.web.kafka;
import com.alibaba.fastjson.JSONObject;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author duanruiming
* @date 2023/03/09 16:03
*/
@Slf4j
@Component
public class ConsumerHandler implements KafkaListenerErrorHandler {
/**
* kafka数据缓存
* key:topic value:data
*/
private static final Map<String, JNMatchResultMiniData> carInfoMap = new ConcurrentHashMap<>();
/**
* 通过消息主题获取消息信息
*
* @param topic
* @return
*/
public JNMatchResultMiniData getTopicMessage(String topic) {
if (carInfoMap.isEmpty()) {
return null;
}
return carInfoMap.get(topic);
}
@KafkaListener(topics = {"CSMatchResultMiniData_0"})
public void receiveCarInfoData(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) {
try {
log.info("收到kafka消息,消息主题:" + record.topic());
log.info("收到kafka消息,消息内容:" + record.value());
JNMatchResultMiniData jnMatchResultMiniData = JSONObject.parseObject((String)record.value(), JNMatchResultMiniData.class);
carInfoMap.put(record.topic(), jnMatchResultMiniData);
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("监听主题消息异常, 异常信息", e);
}
}
@Override
@NonNull
public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
return new Object();
}
@Override
@NonNull
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
String errorMessage = String.format("监听主题:%s,消费者详情:%s,异常信息:%s,消息详情:%s",
consumer.listTopics(), consumer.groupMetadata(), exception, message);
log.error(errorMessage);
return KafkaListenerErrorHandler.super.handleError(message, exception, consumer);
}
}
package net.wanji.web.kafka;
/**
* @author duanruiming
* @date 2023/05/17 16:52
*/
import lombok.Data;
import java.util.List;
@Data
public class JNMatchResultMiniData {
private String timeStamp;
private String globalTimeStamp;
private int participantNum;
private List<E1FrameParticipant> e1FrameParticipant;
private String orgCode;
@Data
public class E1FrameParticipant {
private int id;
private Roadnet roadnet;
private int originalType;
private int originalColor;
private double longitude;
private double latitude;
private double altitude;
private double speed;
private double courseAngle;
private String picLicense;
private int licenseColor;
private int baseLocation;
private int waitingTime;
private int stopNum;
@Data
public class Roadnet {
private String laneId;
private String rid;
private String segmentId;
private String crossId;
}
}
}
package net.wanji.web.task;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import net.wanji.web.common.constant.Constant;
import net.wanji.web.kafka.ConsumerHandler;
import net.wanji.web.kafka.JNMatchResultMiniData;
import net.wanji.web.websocket.WebSocketServer;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
* @author duanruiming
* @date 2023/05/17 16:08
*/
@Component
@Slf4j
public class RealTimeCarTask {
@Resource
private ConsumerHandler consumerHandler;
@Scheduled(fixedRate = 1000)
public void pushRealTimeCarInfo() {
try {
Set<WebSocketServer> events = WebSocketServer.getWebSocketSet(Constant.WEBSOCKET_TOPIC_CAR_TRAIL_INFO);
for (WebSocketServer socketServer : events) {
String picLicense = socketServer.getUserId();
JNMatchResultMiniData jnMatchResultMiniData = consumerHandler.getTopicMessage("CSMatchResultMiniData_0");
if (Objects.nonNull(jnMatchResultMiniData) ) {
List<JNMatchResultMiniData.E1FrameParticipant> list = jnMatchResultMiniData.getE1FrameParticipant();
if (!CollectionUtils.isEmpty(list)) {
for (JNMatchResultMiniData.E1FrameParticipant e1FrameParticipant : list) {
int id = e1FrameParticipant.getId();
String license = e1FrameParticipant.getPicLicense();
if (3554 == e1FrameParticipant.getId() || StringUtils.equals(license, picLicense)) {
socketServer.sendMessage(JSONObject.toJSONString(e1FrameParticipant));
}
}
}
}
}
} catch (Exception e) {
log.error("通过车牌推送数据失败,异常信息", e);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment