Commit b16c7cba authored by hanbing's avatar hanbing

减少数据集样本取样数量

parent 4523781d
......@@ -13,5 +13,5 @@ import java.util.List;
@Repository
@DS("holo")
public interface EventInfoMapper {
List<EventInfoPO> selectLastTenMinutes();
List<EventInfoPO> selectHistEvent();
}
......@@ -2,7 +2,6 @@ package net.wanji.datacenter.task;
import lombok.extern.slf4j.Slf4j;
import net.wanji.datacenter.config.EventSaveConfiguration;
import net.wanji.datacenter.constant.EventAbnormalEnum;
import net.wanji.datacenter.mapper.EventInfoMapper;
import net.wanji.datacenter.mapper.HoloMapper;
import net.wanji.datacenter.pojo.po.DeviceCameraPO;
......@@ -21,8 +20,10 @@ import javax.annotation.Resource;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
......@@ -53,58 +54,68 @@ public class EventTask {
@Value("${kafka.producer.bootstrap.servers}")
String kafkaServers;
// 保存事件 Nvr 视频的线程池
private ExecutorService executor = Executors.newFixedThreadPool(40);
// 保存事件数据集时的样本数据编号
private Map<String, Long> eventTypeCountMap = new HashMap<>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
20,
40,
1L,
TimeUnit.MINUTES,
new LinkedBlockingQueue<Runnable>(50),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
/**
* 每10分钟存储事件数据集
* 每30分钟存储事件数据集
*/
@Async
@Scheduled(cron="0 */10 * * * ? ")
@Scheduled(cron="0 */30 * * * ? ")
public void saveEvent() {
// 查询数据库中10分钟内的记录
List<EventInfoPO> eventInfoPOList = eventInfoMapper.selectLastTenMinutes();
// 定义需要保留的事件类型
List<String> validTypes = EventAbnormalEnum.getNotB5List();
List<EventInfoPO> filteredList = eventInfoPOList.stream()
.filter(e -> validTypes.contains(e.getType()))
.collect(Collectors.toList());
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
for (EventInfoPO eventInfoPO : filteredList) {
String eventType = eventInfoPO.getType();
Date startDate = eventInfoPO.getStartTime();
String startDateStr = formatter.format(startDate);
String dateFolderName = startDateStr + "_" + eventType;
String key = dateFolderName + eventSaveConfiguration.getNegativeSample();
Long count = eventTypeCountMap.getOrDefault(key, -1L);
eventTypeCountMap.put(key, ++count);
String pathToDate = eventSaveConfiguration.getRootFolder() + File.separator
+ eventSaveConfiguration.getCity() + File.separator
+ eventType + File.separator
+ dateFolderName + File.separator;
// todo 暂时建立空文件夹
makeEmptyFolders(pathToDate, count);
// 保存标签
saveLabel(eventInfoPO, pathToDate, count);
// 保存轨迹
saveKafka(eventInfoPO, pathToDate, count, matchResultTopic);
// 保存灯态
saveKafka(eventInfoPO, pathToDate, count, lightStatusTopic);
// 保存录像
Long finalCount = count;
executor.submit(() -> saveNvr(eventInfoPO, pathToDate, finalCount));
List<EventInfoPO> eventInfoPOList = eventInfoMapper.selectHistEvent();
// 相同 eventType 只保留一条记录
List<EventInfoPO> uniqueList = eventInfoPOList.stream()
.collect(Collectors.toMap(
EventInfoPO::getType,
Function.identity(),
(existing, replacement) -> existing))
.values().stream()
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(uniqueList)) {
return;
}
// 随机选择一条记录
Random random = new Random();
EventInfoPO eventInfoPO = uniqueList.get(random.nextInt(uniqueList.size()));
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
String eventType = eventInfoPO.getType();
Date startDate = eventInfoPO.getStartTime();
String startDateStr = formatter.format(startDate);
String dateFolderName = startDateStr + "_" + eventType;
String key = dateFolderName + eventSaveConfiguration.getNegativeSample();
Long count = eventTypeCountMap.getOrDefault(key, -1L);
eventTypeCountMap.put(key, ++count);
String pathToDate = eventSaveConfiguration.getRootFolder() + File.separator
+ eventSaveConfiguration.getCity() + File.separator
+ eventType + File.separator
+ dateFolderName + File.separator;
// todo 暂时建立空文件夹
makeEmptyFolders(pathToDate, count);
// 保存标签
saveLabel(eventInfoPO, pathToDate, count);
// 保存轨迹
saveKafka(eventInfoPO, pathToDate, count, matchResultTopic);
// 保存灯态
saveKafka(eventInfoPO, pathToDate, count, lightStatusTopic);
// 保存录像
Long finalCount = count;
executor.submit(() -> saveNvr(eventInfoPO, pathToDate, finalCount));
}
private void saveNvr(EventInfoPO eventInfoPO, String pathToDate, Long count) {
......@@ -125,16 +136,14 @@ public class EventTask {
}
String monitorChannel = deviceCameraPO.getMonitorChannel();
List commend = new ArrayList();
List<String> commend = new ArrayList<>();
commend.add("ffmpeg");
commend.add("-i");
SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss");
// 格式化 startTime 和 endTime
String starttimeStr = sdf.format(startTimeStamp.getTime() - 30 * 1000);
String endtimeStr = sdf.format(endTimeStamp.getTime() + 30 * 1000);
// 时间参数格式 2023_05_16_10_09_00
commend.add(nvrurl + "?channel=" + monitorChannel + "&subtype=" + eventSaveConfiguration.getSubType()
+ "&starttime=" + starttimeStr + "&endtime=" + endtimeStr);
......@@ -151,21 +160,19 @@ public class EventTask {
makeDirIfNotExist(savepath);
commend.add(savepath + "/" + fileName + ".mp4");
try {
ProcessBuilder builder = new ProcessBuilder(new String[0]);
builder.command(commend);
builder.redirectErrorStream(true);
Process p = builder.start();
BufferedReader buf = new BufferedReader(new InputStreamReader(p.getInputStream()));
String line;
while ((line = buf.readLine()) != null) {
log.info("[" + nvrurl + "],ffmpeg输出的信息(视频回放):" + line);
ProcessBuilder builder = new ProcessBuilder(commend);
builder.redirectErrorStream(true);
try (ProcessCloser processCloser = new ProcessCloser(builder.start())) {
// 设置超时时间
if (!processCloser.process.waitFor(15, TimeUnit.MINUTES)) { // 最长执行时间为15分钟
// 超时处理
log.warn("ffmpeg 命令执行超时,已被强制终止。");
}
} catch (Exception e) {
log.error("[" + nvrurl + "],获取失败 !", e);
log.error("[" + nvrurl + "], 获取失败 !", e);
}
}
}
private void saveKafka(EventInfoPO eventInfoPO, String pathToDate, Long count, String topic) {
......@@ -192,7 +199,15 @@ public class EventTask {
B5LabelVO vo = new B5LabelVO();
vo.setCategory(Integer.parseInt(eventInfoPO.getCategory()));
vo.setEventType(Integer.parseInt(eventInfoPO.getType()));
vo.setTargetIdList(new ArrayList<>());
String globalId = eventInfoPO.getGlobalId();
if (globalId == null) {
vo.setTargetIdList(new ArrayList<>());
} else {
List<Integer> targetList = new ArrayList<>();
targetList.add(Integer.parseInt(globalId));
vo.setTargetIdList(targetList);
}
vo.setCrossId(eventInfoPO.getCrossId());
vo.setRid(eventInfoPO.getRid());
......
package net.wanji.datacenter.task;
class ProcessCloser implements AutoCloseable {
Process process;
public ProcessCloser(Process process) {
this.process = process;
}
@Override
public void close() {
process.destroy();
}
}
\ No newline at end of file
......@@ -8,10 +8,10 @@
station_id,event_id
</sql>
<select id="selectLastTenMinutes" resultType="net.wanji.datacenter.pojo.po.EventInfoPO">
<select id="selectHistEvent" resultType="net.wanji.datacenter.pojo.po.EventInfoPO">
SELECT <include refid="Base_Column_List"/>
FROM t_event_info
WHERE ruksj >= NOW() - INTERVAL 10 MINUTE
WHERE ruksj >= NOW() - INTERVAL 30 MINUTE
</select>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment