Commit e19121e3 authored by zhoushiguang's avatar zhoushiguang

行人指标区域加载问题修改

parent ede5bef5
...@@ -46,10 +46,8 @@ public class PedestrianDataCache { ...@@ -46,10 +46,8 @@ public class PedestrianDataCache {
private Map<String, String> loadData(String file, int type) { private Map<String, String> loadData(String file, int type) {
Map<String, String> retMap = new HashMap<>(); Map<String, String> retMap = new HashMap<>();
String filePath = PedestrianDataCache.class.getResource(file).getPath(); try (InputStream inputStream = PropertiesHelper.class.getResourceAsStream(file);
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, Charset.defaultCharset()));
try (FileReader fr = new FileReader(filePath);
BufferedReader br = new BufferedReader(fr);
) { ) {
StringBuilder jsonBuilder = new StringBuilder(); StringBuilder jsonBuilder = new StringBuilder();
String str = null; String str = null;
...@@ -57,6 +55,7 @@ public class PedestrianDataCache { ...@@ -57,6 +55,7 @@ public class PedestrianDataCache {
while ((str = br.readLine()) != null) { while ((str = br.readLine()) != null) {
jsonBuilder.append(str); jsonBuilder.append(str);
} }
log.info("加载行人区域信息,路径:{},数据:{}",file,jsonBuilder.toString());
if (jsonBuilder.length() > 0) { if (jsonBuilder.length() > 0) {
JSONObject jsonObject = JSON.parseObject(jsonBuilder.toString()); JSONObject jsonObject = JSON.parseObject(jsonBuilder.toString());
JSONArray jsonArray = jsonObject.getJSONArray("features"); JSONArray jsonArray = jsonObject.getJSONArray("features");
......
...@@ -8,6 +8,7 @@ import com.wanji.indicators.model.event.accident.AccidentShowModel; ...@@ -8,6 +8,7 @@ import com.wanji.indicators.model.event.accident.AccidentShowModel;
import com.wanji.indicators.sink.AccidentEventFileSink; import com.wanji.indicators.sink.AccidentEventFileSink;
import com.wanji.indicators.sink.EventFileSink; import com.wanji.indicators.sink.EventFileSink;
import com.wanji.indicators.source.ConflictTurnSource; import com.wanji.indicators.source.ConflictTurnSource;
import com.wanji.indicators.source.CongestionSource;
import com.wanji.indicators.source.ConsulAreaSource; import com.wanji.indicators.source.ConsulAreaSource;
import com.wanji.indicators.util.PropertiesHelper; import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
...@@ -84,7 +85,7 @@ public class AccidentEventMainNew implements Serializable { ...@@ -84,7 +85,7 @@ public class AccidentEventMainNew implements Serializable {
//匹配拥堵检测区域 //匹配拥堵检测区域
// SingleOutputStreamOperator<AccidentFrameModel> checkCongestionArea = accidentFrameModelFilter // SingleOutputStreamOperator<AccidentFrameModel> checkCongestionArea = accidentFrameModelFilter
// .connect(env.addSource(new ConsulAreaSource()).broadcast()) // .connect(env.addSource(new CongestionSource()).broadcast())
// .flatMap(new AccidentCheckCongestionAreaCoFlatMap()) // .flatMap(new AccidentCheckCongestionAreaCoFlatMap())
// .setParallelism(1) // .setParallelism(1)
// .name("事故检测-匹配拥堵检测区域"); // .name("事故检测-匹配拥堵检测区域");
......
package com.wanji.indicators.source;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wanji.indicators.service.BaseLaneInfoService;
import com.wanji.indicators.service.impl.BaseLaneInfoServiceImpl;
import com.wanji.indicators.util.PropertiesHelper;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.ws.WebSocket;
import org.asynchttpclient.ws.WebSocketListener;
import org.asynchttpclient.ws.WebSocketUpgradeHandler;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import javax.xml.bind.DatatypeConverter;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* @author fengyi
* @date 2023/10/22
* @description
*/
@Slf4j
public class LightStatusKafkaSource extends RichSourceFunction<String> {
private volatile boolean running = true;
KafkaSource<String> lightStatusSource = null;
public void open(Configuration parameters) throws Exception {
super.open(parameters);
PropertiesHelper instance = PropertiesHelper.getInstance();
lightStatusSource = KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties())
.setProperty("auto.offset.commit", "true")
.setProperty("auto.commit.interval.ms", "1000")
.setProperty("commit.offsets.on.checkpoint", "false")
.setBootstrapServers(instance.getProperties().getProperty("bootstrap.servers"))
.setTopics("cross_lights_status")
.setGroupId(UUID.randomUUID().toString())
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
}
@Override
public void cancel() {
log.info("cancel function called");
running = false;
}
}
...@@ -631,7 +631,7 @@ FROM ...@@ -631,7 +631,7 @@ FROM
tb.rid, tb.rid,
tb.in_dir, tb.in_dir,
tb.lane_turn, tb.lane_turn,
CAST(ROUND(SUM( tb.traffic_flow * tb.flow_coefficient )) AS INT) traffic_flow, CAST(ROUND(SUM( tb.traffic_flow * tb.flow_coefficient ),2) AS INT) traffic_flow,
AVG( tb.mean_v * tb.flow_coefficient) mean_v, AVG( tb.mean_v * tb.flow_coefficient) mean_v,
MAX( tb.queue_length ) queue_length, MAX( tb.queue_length ) queue_length,
AVG( tb.green_light_efficiency ) green_light_efficiency, AVG( tb.green_light_efficiency ) green_light_efficiency,
......
...@@ -47,7 +47,7 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C ...@@ -47,7 +47,7 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C
//绿灯总时长 //绿灯总时长
private transient MapState<String, Integer> greenTotalTimeState; private transient MapState<String, Integer> greenTotalTimeState;
//绿灯期间各方向上斑马线内人数 //绿灯期间各方向上斑马线内人数
private MapState<String, List<PedCrossingWalkModel>> bmxAreaResultState; private transient MapState<String, List<PedCrossingWalkModel>> bmxAreaResultState;
//各方向上等待区内的行人目标 //各方向上等待区内的行人目标
private transient MapState<String, List<PedCrossingWalkModel>> waitingAreaResultState; private transient MapState<String, List<PedCrossingWalkModel>> waitingAreaResultState;
...@@ -101,6 +101,7 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C ...@@ -101,6 +101,7 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C
// System.out.println(); // System.out.println();
// } // }
List<CrossRidTurnLampStatusModel> value1 = lightEntry.getValue(); List<CrossRidTurnLampStatusModel> value1 = lightEntry.getValue();
Integer ridDir = value1.get(0).getDir();
//绿灯放行出现的行人目标 //绿灯放行出现的行人目标
List<CarTrackModel> runList = groupTrackByDir.get(rid); List<CarTrackModel> runList = groupTrackByDir.get(rid);
Integer agoGreenTime = greenRemainTimeState.get(rid); Integer agoGreenTime = greenRemainTimeState.get(rid);
...@@ -131,7 +132,6 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C ...@@ -131,7 +132,6 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C
// =========================================================================================================// // =========================================================================================================//
// 相位时长倒计时 // 相位时长倒计时
greenRemainTimeState.put(rid, value1.get(0).getCyclePhaseCountDown()); greenRemainTimeState.put(rid, value1.get(0).getCyclePhaseCountDown());
lightStatusState.put(rid, value1.get(0).getLampState());
List<PedCrossingWalkModel> agoMidResult = bmxAreaResultState.get(rid); List<PedCrossingWalkModel> agoMidResult = bmxAreaResultState.get(rid);
if (agoMidResult == null) { if (agoMidResult == null) {
...@@ -139,9 +139,10 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C ...@@ -139,9 +139,10 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C
} }
if (isCanWalk) { if (isCanWalk) {
// log.info("行人绿灯长:rid={},行人绿灯剩余时长={}s,行人绿灯总时长={}s,时间:{},当前灯态:{},上次灯态:{},方向:{}", // if (rid.equals("13NG40B5SK013NI00B5RM00")) {
// rid, greenRemainTimeState.get(rid), greenTotalTimeState.get(rid), DateUtil.toDateTime(globalTimeStamp, "yyyy-MM-dd HH:mm:ss.SSS"),nowLightColor,agoLightColor,lightDir); // log.info("行人绿灯长:rid={},行人绿灯剩余时长={}s,行人绿灯总时长={}s,时间:{},当前灯态:{},上次灯态:{},方向:{}",
// rid, greenRemainTimeState.get(rid), greenTotalTimeState.get(rid), DateUtil.toDateTime(globalTimeStamp, "yyyy-MM-dd HH:mm:ss.SSS"), nowLightColor, agoLightColor, lightDir);
// }
if (runList != null && !runList.isEmpty()) { if (runList != null && !runList.isEmpty()) {
//按rid分组 //按rid分组
Map<String, List<CarTrackModel>> groupByRid = runList.stream().collect(Collectors.groupingBy(o -> o.getRoadnet().getRid())); Map<String, List<CarTrackModel>> groupByRid = runList.stream().collect(Collectors.groupingBy(o -> o.getRoadnet().getRid()));
...@@ -157,8 +158,9 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C ...@@ -157,8 +158,9 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C
if (bmxGeom != null) { if (bmxGeom != null) {
//是否在斑马线区域内 //是否在斑马线区域内
boolean isInBmxArea = GeomsConvertUtil.withinGeo(lng + "," + lat, bmxGeom); boolean isInBmxArea = GeomsConvertUtil.withinGeo(lng + "," + lat, bmxGeom);
if (isInBmxArea) if (isInBmxArea) {
inBmxAreaPedList.add(pedModel); inBmxAreaPedList.add(pedModel);
}
} }
} }
...@@ -173,21 +175,32 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C ...@@ -173,21 +175,32 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C
if (Objects.equals(nowId, agoId)) { if (Objects.equals(nowId, agoId)) {
agoModel.setNowTime(globalTimeStamp); agoModel.setNowTime(globalTimeStamp);
//偏离区域后又进入到区域,重置离开标记 //偏离区域后又进入到区域,重置离开标记
if (agoModel.isLeave()) { // if (agoModel.isLeave()) {
agoModel.setLeave(false); // agoModel.setLeave(false);
agoModel.setLeaveTime(0); // agoModel.setLeaveTime(0);
} // }
isNew = false; isNew = false;
double speedDiff = nowModel.getSpeed() - agoModel.getSpeed(); double speedDiff = nowModel.getSpeed() - agoModel.getAccelerateStartSpeed();
//速度差大于0.5m/s,转换为km/h //速度差大于0.5m/s,转换为km/h
if (speedDiff >= 0.5 * 3.6) { if (speedDiff >= 0.5 * 3.6) {
agoModel.setAcceleratedPassDuration(globalTimeStamp - agoModel.getAcceleratedPassStartTime()); //第一次加速
agoModel.setAcceleratedPassStartTime(globalTimeStamp); if (agoModel.getAcceleratedPassStartTime()==0) {
agoModel.setAcceleratedPassDuration(0);
agoModel.setAcceleratedPassStartTime(globalTimeStamp);
agoModel.setAccelerateStartSpeed(agoModel.getSpeed());
}else {
agoModel.setAcceleratedPassDuration(globalTimeStamp - agoModel.getAcceleratedPassStartTime());
}
//持续时长大于5秒,设置为加速通过标记 //持续时长大于5秒,设置为加速通过标记
if (agoModel.getAcceleratedPassDuration() > 5 * 1000) { if (agoModel.getAcceleratedPassDuration() > 5 * 1000) {
agoModel.setAcceleratedPass(true); agoModel.setAcceleratedPass(true);
} }
} else {
agoModel.setAcceleratedPass(false);
agoModel.setAcceleratedPassDuration(0);
agoModel.setAcceleratedPassStartTime(0);
agoModel.setAccelerateStartSpeed(0);
} }
} }
...@@ -208,6 +221,14 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C ...@@ -208,6 +221,14 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C
} }
} }
agoMidResult.addAll(newList); agoMidResult.addAll(newList);
//时间戳format
for (PedCrossingWalkModel model : agoMidResult){
model.setArriveTimeStr(DateUtil.toDateTime(model.getArriveTime(),DateUtil.YYYY_MM_DD_HH_MM_SS));
model.setNowTimeStr(DateUtil.toDateTime(model.getNowTime(),DateUtil.YYYY_MM_DD_HH_MM_SS));
if (model.getLeaveTime()>0) {
model.setLeaveTimeStr(DateUtil.toDateTime(model.getLeaveTime(), DateUtil.YYYY_MM_DD_HH_MM_SS));
}
}
if (flag) { if (flag) {
//记录第一次绿灯时进入斑马线区域的对象 //记录第一次绿灯时进入斑马线区域的对象
...@@ -222,20 +243,28 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C ...@@ -222,20 +243,28 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C
} }
} }
} else {//绿灯结束时,输出计算结果 } else {//进入红灯状态
if (!agoMidResult.isEmpty()) {
PedCrossingWalkIndicatorResult result = this.getOutResult(value.getCrossId(), rid, globalTimeStamp); //上一次存储的灯态
String lampState = lightStatusState.get(rid);
if (lampState != null && (lampState.equals(LightStatusEnum.GREEN.getType())||lampState.equals(LightStatusEnum.YELLOW.getType()))){
// if (rid.equals("13NG40B5SK013NI00B5RM00")) {
// log.info("灯态由绿灯或黄灯变为红灯时:rid={},行人绿灯剩余时长={}s,行人绿灯总时长={}s,时间:{},当前灯态:{},上次灯态:{},方向:{}",
// rid, greenRemainTimeState.get(rid), greenTotalTimeState.get(rid), DateUtil.toDateTime(globalTimeStamp, "yyyy-MM-dd HH:mm:ss.SSS"), nowLightColor, agoLightColor, lightDir);
// }
//灯态由绿灯或黄灯变为红灯时
PedCrossingWalkIndicatorResult result = this.getOutResult(value.getCrossId(), rid, ridDir, globalTimeStamp,greenTotalTimeState.get(rid));
out.collect(result); out.collect(result);
} }
// if (agoGreenTime != null && agoGreenTime==1) {
// greenTotalTimeState.remove(rid);
// }
greenTotalTimeState.remove(rid);
bmxAreaResultState.remove(rid); bmxAreaResultState.remove(rid);
greenTotalTimeState.remove(rid);
lightStatusState.remove(rid); lightStatusState.remove(rid);
greenRemainTimeState.remove(rid); greenRemainTimeState.remove(rid);
} }
lightStatusState.put(rid, value1.get(0).getLampState());
} }
Iterator<Map.Entry<String, List<PedCrossingWalkModel>>> waitIterator = waitingAreaResultState.iterator(); Iterator<Map.Entry<String, List<PedCrossingWalkModel>>> waitIterator = waitingAreaResultState.iterator();
...@@ -261,6 +290,8 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C ...@@ -261,6 +290,8 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C
private PedCrossingWalkModel toPedInfo(CarTrackModel pedModel) { private PedCrossingWalkModel toPedInfo(CarTrackModel pedModel) {
PedCrossingWalkModel pedCrossingWalkModel = new PedCrossingWalkModel(); PedCrossingWalkModel pedCrossingWalkModel = new PedCrossingWalkModel();
pedCrossingWalkModel.setRid(pedModel.getRoadnet().getRid());
pedCrossingWalkModel.setDir(pedModel.getRoadnet().getRidDir8());
pedCrossingWalkModel.setObjectId(pedModel.getId()); pedCrossingWalkModel.setObjectId(pedModel.getId());
pedCrossingWalkModel.setArriveTime(pedModel.getGlobalTimeStamp()); pedCrossingWalkModel.setArriveTime(pedModel.getGlobalTimeStamp());
pedCrossingWalkModel.setDir(pedModel.getRoadnet().getRidDir8()); pedCrossingWalkModel.setDir(pedModel.getRoadnet().getRidDir8());
...@@ -269,6 +300,7 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C ...@@ -269,6 +300,7 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C
pedCrossingWalkModel.setNowTime(pedModel.getGlobalTimeStamp()); pedCrossingWalkModel.setNowTime(pedModel.getGlobalTimeStamp());
pedCrossingWalkModel.setSpeed(pedModel.getSpeed()); pedCrossingWalkModel.setSpeed(pedModel.getSpeed());
pedCrossingWalkModel.setArriveSpeed(pedModel.getSpeed()); pedCrossingWalkModel.setArriveSpeed(pedModel.getSpeed());
return pedCrossingWalkModel; return pedCrossingWalkModel;
} }
...@@ -280,7 +312,7 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C ...@@ -280,7 +312,7 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C
* @param globalTimeStamp * @param globalTimeStamp
* @return * @return
*/ */
private PedCrossingWalkIndicatorResult getOutResult(String crossId, String rid, Long globalTimeStamp) throws Exception { private PedCrossingWalkIndicatorResult getOutResult(String crossId, String rid, Integer ridDir,Long globalTimeStamp,Integer pedGreenTime) throws Exception {
//绿灯时间内路口总人数 //绿灯时间内路口总人数
int size = 0; int size = 0;
//离开区域的人数 //离开区域的人数
...@@ -299,6 +331,7 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C ...@@ -299,6 +331,7 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C
//各方向斑马线上行人目标 //各方向斑马线上行人目标
Iterator<Map.Entry<String, List<PedCrossingWalkModel>>> iterator = bmxAreaResultState.iterator(); Iterator<Map.Entry<String, List<PedCrossingWalkModel>>> iterator = bmxAreaResultState.iterator();
Set<String> ridKeys = new HashSet<>(); Set<String> ridKeys = new HashSet<>();
//斑马线行人
List<PedCrossingWalkModel> walkingList = new ArrayList<>(); List<PedCrossingWalkModel> walkingList = new ArrayList<>();
while (iterator.hasNext()) { while (iterator.hasNext()) {
Map.Entry<String, List<PedCrossingWalkModel>> entry = iterator.next(); Map.Entry<String, List<PedCrossingWalkModel>> entry = iterator.next();
...@@ -308,43 +341,61 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C ...@@ -308,43 +341,61 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C
outSize += ridData.stream().filter(o -> o.isLeave()).count(); outSize += ridData.stream().filter(o -> o.isLeave()).count();
totalPassTime += ridData.stream().filter(o -> o.isLeave()).mapToDouble(o -> o.getLeaveTime() - o.getArriveTime()).sum(); totalPassTime += ridData.stream().filter(o -> o.isLeave()).mapToDouble(o -> o.getLeaveTime() - o.getArriveTime()).sum();
accelerateCount += ridData.stream().filter(o -> o.isAcceleratedPass()).count(); accelerateCount += ridData.stream().filter(o -> o.isAcceleratedPass()).count();
ridKeys.add(key); ridKeys.add(key);
walkingList.addAll(ridData); walkingList.addAll(ridData);
} }
long totalWaitTime = 0; long totalWaitTime = 0;
//从等待区进入到斑马线区域的行人目标id
Set<Integer> idSets = new HashSet<>(); Set<Integer> idSets = new HashSet<>();
for (PedCrossingWalkModel walkModel : walkingList) { for (PedCrossingWalkModel walkModel : walkingList) {
Integer id = walkModel.getObjectId(); Integer id = walkModel.getObjectId();
Optional<PedCrossingWalkModel> waitOption = waitingList.stream().filter(o -> o.getObjectId().equals(id)).findAny(); Optional<PedCrossingWalkModel> waitOption = waitingList.stream().filter(o -> o.getObjectId().equals(id)).findAny();
if (waitOption.isPresent()) { if (waitOption.isPresent()) {
//第一次进入斑马线时间 - 第一次进入等待区时间 //第一次进入斑马线时间 - 第一次进入等待区时间
totalWaitTime += walkModel.getArriveTime() - waitOption.get().getArriveTime(); if (walkModel.getArriveTime()>waitOption.get().getArriveTime()) {
totalWaitTime += walkModel.getArriveTime() - waitOption.get().getArriveTime();
idSets.add(id);
}
} }
idSets.add(id);
} }
//平均等待时长 //平均等待时长
double avgWaitTime = totalWaitTime / walkingList.size(); double avgWaitTime = 0;
if (avgWaitTime < 0){ if (!idSets.isEmpty()) {
avgWaitTime = 0; avgWaitTime = totalWaitTime / idSets.size();
if (avgWaitTime < 0) {
avgWaitTime = 0;
}
} }
//平均等待人数 //平均等待人数
long waitingCount = waitingList.stream().filter(o -> o.isWaiting()).count(); long waitingCount = waitingList.stream().filter(o -> o.isWaiting()).count();
//绿灯开始时间 //绿灯开始时间
long startTime = globalTimeStamp - greenTotalTimeState.get(rid) * 1000; long startTime = globalTimeStamp - pedGreenTime * 1000;
//绿灯结束时间 //绿灯结束时间
long endTime = globalTimeStamp; long endTime = globalTimeStamp;
//平均通过率 //平均通过率
double avgPassedRate = ArithOfBigDecmial.div(outSize, size); double avgPassedRate = 0;
//平均通过时长 //平均通过时长
double avgPassTime = ArithOfBigDecmial.div(totalPassTime, size); double avgPassTime = 0;
//绿灯计算时加速通过率 //绿灯计算时加速通过率
double accRate = ArithOfBigDecmial.div(accelerateCount, size); double accRate = 0;
if (size>0) {
avgPassedRate = ArithOfBigDecmial.div(outSize, size);
//绿灯计算时加速通过率
accRate = ArithOfBigDecmial.div(accelerateCount, size);
}
if (outSize>0){
//平均通过时长
avgPassTime = ArithOfBigDecmial.div(totalPassTime, outSize);
}
PedCrossingWalkIndicatorResult result = new PedCrossingWalkIndicatorResult(); PedCrossingWalkIndicatorResult result = new PedCrossingWalkIndicatorResult();
result.setCrossId(crossId); result.setCrossId(crossId);
result.setRid(rid);
result.setRidDir(ridDir);
result.setStartTime(startTime); result.setStartTime(startTime);
result.setStartTimeStr(DateUtil.toDateTime(startTime,DateUtil.YYYY_MM_DD_HH_MM_SS)); result.setStartTimeStr(DateUtil.toDateTime(startTime,DateUtil.YYYY_MM_DD_HH_MM_SS));
result.setEndTime(endTime); result.setEndTime(endTime);
...@@ -355,6 +406,7 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C ...@@ -355,6 +406,7 @@ public class PedIndicationProcessFunction extends KeyedProcessFunction<String, C
result.setPedCount(size); result.setPedCount(size);
result.setWaitingCount(waitingCount); result.setWaitingCount(waitingCount);
result.setAvgWaitTime(avgWaitTime); result.setAvgWaitTime(avgWaitTime);
result.setPedGreenTime(pedGreenTime);
log.info("绿灯结束时指标结果输出:{}", result); log.info("绿灯结束时指标结果输出:{}", result);
......
...@@ -13,7 +13,10 @@ import java.io.Serializable; ...@@ -13,7 +13,10 @@ import java.io.Serializable;
public class PedCrossingWalkIndicatorResult implements Serializable { public class PedCrossingWalkIndicatorResult implements Serializable {
private String crossId; private String crossId;
private String rid;
private Integer ridDir ;
//行人绿灯时长
Integer pedGreenTime;
//绿灯开始时间 //绿灯开始时间
long startTime ; long startTime ;
String startTimeStr; String startTimeStr;
...@@ -28,7 +31,6 @@ public class PedCrossingWalkIndicatorResult implements Serializable { ...@@ -28,7 +31,6 @@ public class PedCrossingWalkIndicatorResult implements Serializable {
double accRate ; double accRate ;
//人数 //人数
private int pedCount; private int pedCount;
//平均等待时长 //平均等待时长
double avgWaitTime ; double avgWaitTime ;
//平均等待人数 //平均等待人数
......
...@@ -19,12 +19,15 @@ public class PedCrossingWalkModel implements Serializable { ...@@ -19,12 +19,15 @@ public class PedCrossingWalkModel implements Serializable {
private double passTime; private double passTime;
//第一次进入时间 //第一次进入时间
private long arriveTime; private long arriveTime;
private String arriveTimeStr;
//离开时间 //离开时间
private long leaveTime; private long leaveTime;
private String leaveTimeStr;
//是否离开斑马线区域标记 //是否离开斑马线区域标记
private boolean isLeave; private boolean isLeave;
//当前时间 //当前时间
private long nowTime; private long nowTime;
private String nowTimeStr;
//实时速度 //实时速度
private double speed; private double speed;
//初始进入时速度 //初始进入时速度
...@@ -35,6 +38,8 @@ public class PedCrossingWalkModel implements Serializable { ...@@ -35,6 +38,8 @@ public class PedCrossingWalkModel implements Serializable {
private long acceleratedPassDuration; private long acceleratedPassDuration;
//加速通过开始时间 //加速通过开始时间
private long acceleratedPassStartTime; private long acceleratedPassStartTime;
//加速开始速度
private double accelerateStartSpeed;
//经度 //经度
private double lng; private double lng;
//纬度 //纬度
......
package com.wanji.indicators.task.pedestrian.stream; package com.wanji.indicators.task.pedestrian.stream;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.event.cross.conflictpoint.func.ConflictLaneInfoCoFlatMap;
import com.wanji.indicators.event.stream.func.CrossFrameFlatMap; import com.wanji.indicators.event.stream.func.CrossFrameFlatMap;
import com.wanji.indicators.model.CrossFrameModel; import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.model.CrossRidTurnLampStatusModel; import com.wanji.indicators.model.CrossRidTurnLampStatusModel;
import com.wanji.indicators.source.LaneInfoSource; import com.wanji.indicators.source.LaneInfoSource;
import com.wanji.indicators.source.websocket.LightStatusWebSocketSource; import com.wanji.indicators.source.websocket.LightStatusWebSocketSource;
import com.wanji.indicators.task.pedestrian.service.func.PedIndicationProcessFunction; import com.wanji.indicators.task.pedestrian.service.func.*;
import com.wanji.indicators.task.pedestrian.service.func.RefPedRidInfoCoFlatMap;
import com.wanji.indicators.task.pedestrian.service.func.TrafficLightStatusFlatMap;
import com.wanji.indicators.task.pedestrian.service.func.TrafficLightStatusInfoCoFlatMap;
import com.wanji.indicators.task.pedestrian.service.model.PedCrossingWalkIndicatorResult; import com.wanji.indicators.task.pedestrian.service.model.PedCrossingWalkIndicatorResult;
import com.wanji.indicators.util.PropertiesHelper; import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
...@@ -46,6 +47,7 @@ public class PedestrianMain { ...@@ -46,6 +47,7 @@ public class PedestrianMain {
PropertiesHelper instance = PropertiesHelper.getInstance(); PropertiesHelper instance = PropertiesHelper.getInstance();
Properties properties = instance.getProperties(); Properties properties = instance.getProperties();
String topic = properties.getProperty("consumer.topic"); String topic = properties.getProperty("consumer.topic");
String kafkaServer = properties.getProperty("bootstrap.servers");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); env.setParallelism(1);
...@@ -62,7 +64,7 @@ public class PedestrianMain { ...@@ -62,7 +64,7 @@ public class PedestrianMain {
.setProperty("auto.offset.commit", "true") .setProperty("auto.offset.commit", "true")
.setProperty("auto.commit.interval.ms", "1000") .setProperty("auto.commit.interval.ms", "1000")
.setProperty("commit.offsets.on.checkpoint", "false") .setProperty("commit.offsets.on.checkpoint", "false")
.setBootstrapServers(properties.getProperty("bootstrap.servers")) .setBootstrapServers(kafkaServer)
.setTopics(topic) .setTopics(topic)
.setGroupId(GROUP_ID + UUID.randomUUID().toString()) .setGroupId(GROUP_ID + UUID.randomUUID().toString())
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
...@@ -76,12 +78,13 @@ public class PedestrianMain { ...@@ -76,12 +78,13 @@ public class PedestrianMain {
.setProperty("auto.offset.commit", "true") .setProperty("auto.offset.commit", "true")
.setProperty("auto.commit.interval.ms", "1000") .setProperty("auto.commit.interval.ms", "1000")
.setProperty("commit.offsets.on.checkpoint", "false") .setProperty("commit.offsets.on.checkpoint", "false")
.setBootstrapServers(properties.getProperty("bootstrap.servers")) .setBootstrapServers(kafkaServer)
.setTopics("cross_lights_status") .setTopics("cross_lights_status")
.setGroupId(GROUP_ID + UUID.randomUUID().toString()) .setGroupId(GROUP_ID + UUID.randomUUID().toString())
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)) .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build(); .build();
DataStream<String> lightStatusStream = env DataStream<String> lightStatusStream = env
.fromSource(lightStatusSource, WatermarkStrategy.noWatermarks(), "cross_lights_status-data-source"); .fromSource(lightStatusSource, WatermarkStrategy.noWatermarks(), "cross_lights_status-data-source");
...@@ -103,13 +106,18 @@ public class PedestrianMain { ...@@ -103,13 +106,18 @@ public class PedestrianMain {
// }); // });
//连接灯态数据源 //连接灯态数据源
ConnectedStreams<CrossFrameModel, Map<String,List<CrossRidTurnLampStatusModel>>> connStream = thinningDataStream.connect(turnLightStatusStream); ConnectedStreams<CrossFrameModel, Map<String,List<CrossRidTurnLampStatusModel>>> connStream =
thinningDataStream.connect(turnLightStatusStream);
SingleOutputStreamOperator<CrossFrameModel> bindLightStatusStream = SingleOutputStreamOperator<CrossFrameModel> bindLightStatusStream =
connStream.flatMap(new TrafficLightStatusInfoCoFlatMap()).setParallelism(1).name("全域轨迹帧数据-路口灯态绑定"); connStream
.flatMap(new TrafficLightStatusInfoCoFlatMap())
.setParallelism(1)
.name("全域轨迹帧数据-路口灯态绑定");
SingleOutputStreamOperator<CrossFrameModel> joinLaneStream = bindLightStatusStream SingleOutputStreamOperator<CrossFrameModel> joinLaneStream = bindLightStatusStream
.flatMap(new RefPedRidInfoCoFlatMap()) .connect(env.addSource(new LaneInfoSource()).broadcast())
.flatMap(new BindPedRidInfoCoFlatMap())
.setParallelism(1) .setParallelism(1)
.name("全域轨迹帧数据-车道属性数据绑定"); .name("全域轨迹帧数据-车道属性数据绑定");
...@@ -129,11 +137,17 @@ public class PedestrianMain { ...@@ -129,11 +137,17 @@ public class PedestrianMain {
try { try {
FlinkKafkaProducer producer = new FlinkKafkaProducer<>("analysis.pedestrian.indicators", KafkaSink<String> sink = KafkaSink.<String>builder()
new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), .setBootstrapServers(kafkaServer)
instance.getProducerProperties(), FlinkKafkaProducer.Semantic.NONE); .setRecordSerializer(KafkaRecordSerializationSchema.builder()
//存储到kafka .setTopic("analysis.pedestrian.indicators")
toJsonStream.addSink(producer).name("行人指标计算-数据发送至kafka").setParallelism(1); .setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
toJsonStream.sinkTo(sink).name("行人指标计算-数据发送至kafka").setParallelism(1);
env.execute("行人指标计算"); env.execute("行人指标计算");
} catch (Exception e) { } catch (Exception e) {
......
package com.wanji.indicators.util; package com.wanji.indicators.util;
import org.joda.time.DateTime;
/** /**
* @author fengyi * @author fengyi
* @date 2023/1/12 * @date 2023/1/12
...@@ -7,6 +9,14 @@ package com.wanji.indicators.util; ...@@ -7,6 +9,14 @@ package com.wanji.indicators.util;
*/ */
public class CarNumGenerator { public class CarNumGenerator {
public static void main(String[] args) {
DateTime currentDateTime = new DateTime();
DateTime dateTime = currentDateTime.withMillisOfDay(0).minusHours(-4);
dateTime = currentDateTime.withTimeAtStartOfDay();
System.out.println(DateUtil.toDateTime(dateTime.getMillis(),DateUtil.YYYY_MM_DD_HH_MM_SS));
}
/** /**
* 中国各个地区的数组 * 中国各个地区的数组
*/ */
...@@ -45,8 +55,4 @@ public class CarNumGenerator { ...@@ -45,8 +55,4 @@ public class CarNumGenerator {
return area + cityCode + sb; return area + cityCode + sb;
} }
public static void main(String[] args) {
String carNum = getCarNum();
System.out.println(carNum);
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment