Commit aa67a4c4 authored by duanruiming's avatar duanruiming

Merge remote-tracking branch 'origin/master'

parents 458fe1c4 a2db0c0c
......@@ -50,6 +50,8 @@ public class BaseGisLaneCache {
private static Map<String, CarTrackModel.RoadNet> roadnetMap = new ConcurrentHashMap<>();
private static Map<String, String> laneRefMap = new ConcurrentHashMap<>();
private BaseGisLaneCache() {
}
......@@ -66,8 +68,9 @@ public class BaseGisLaneCache {
dataDirectory += "/";
}
String f4 = dataDirectory + "gis-lane.geojson";
;//BaseLaneInfo
instance.loadLaneData(f4);
String f5 = dataDirectory + "gis-lane-ref.csv";
laneRefMap.putAll(instance.loadGlobalLaneRef(f5));
}
return instance;
}
......@@ -185,6 +188,27 @@ public class BaseGisLaneCache {
return retMap;
}
private Map<String, String> loadGlobalLaneRef(String file) {
Map<String, String> retMap = new HashMap<>();
log.info("加载车道关系数据信息,路径:{},数据:{}", file);
try (InputStream inputStream = PropertiesHelper.class.getResourceAsStream(file);
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, Charset.defaultCharset()));
) {
String str = br.readLine();
// 按行读取字符串
while ((str = br.readLine()) != null) {
String globalLaneNo = str.substring(str.lastIndexOf(",")+1);
String crossLane = str.substring(0,str.lastIndexOf(","));
retMap.put(globalLaneNo,crossLane);
}
} catch (Exception e) {
e.printStackTrace();
}
return retMap;
}
public CarTrackModel.RoadNet queryRoadNet(String pxy) {
Point point = GeomsConvertUtil.createPoint(pxy) ;
......@@ -201,10 +225,11 @@ public class BaseGisLaneCache {
}
public Map<String, CarTrackModel.RoadNet> getRoadnetMap() {
return roadnetMap;
}
public Map<String, String> getLaneRefMap() {
return laneRefMap;
}
}
package com.wanji.indicators.task.beihang;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.util.ArithOfBigDecmial;
import com.wanji.indicators.util.CommonUtil;
import com.wanji.indicators.util.DateUtil;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
public class BeiHangTrackFrameB3DataFile implements SourceFunction<QueueLengthBean> {
private static final Logger log = LoggerFactory.getLogger(BeiHangTrackFrameB3DataFile.class);
private volatile boolean running = true;
private String directory;
private long startTime;
private long endTime;
public BeiHangTrackFrameB3DataFile(String directory, long startTime, long endTime) {
this.directory = directory;
this.startTime = startTime;
this.endTime = endTime;
}
//获取文件完整路径
private String[] getFilePath() {
List<String> indexList = new ArrayList<>();
///data/track_partitions/dt=2023-08-09/ht=11/mt=00
DateTime start = new DateTime(this.startTime);
DateTime end = new DateTime(this.endTime);
while (start.isBefore(end)) {
String dt = start.toString("yyyyMMdd");
String directory = this.directory + "/" + dt;
if (!new File(directory).exists()) {
start = start.plusDays(1);
continue;
}
IOFileFilter ioFileFilter = FileFilterUtils.prefixFileFilter("2024");
List<File> fileList = FileUtils.listFiles(new File(directory), ioFileFilter, null).stream().collect(Collectors.toList());
fileList = fileList.stream().sorted(Comparator.comparing(File::getName)).collect(Collectors.toList());
String fileName = null;
for (File dataFile : fileList) {
fileName = dataFile.getName();
String filePath = directory + "/" + fileName;
if (!indexList.contains(filePath)) {
indexList.add(filePath);
}
}
start = start.plusDays(1);
// System.out.println("start:"+start.toString("yyyy-MM-dd HH:mm:ss"));
}
// System.out.println("end:"+start.toString("yyyy-MM-dd HH:mm:ss"));
// System.out.println(indexList.toString());
return indexList.toArray(new String[0]);
}
@Override
public void run(SourceContext<QueueLengthBean> sourceContext) throws Exception {
List<String> sortList = Arrays.asList(getFilePath());
log.info("文件总数:{},文件列表:{}", sortList.size(), sortList.toString());
while (running) {
int size = 0;
for (String str : sortList) {
File file = new File(str.trim());
if (file.exists()) {
long st = System.currentTimeMillis();
try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
ZipEntry entry;
while (zis != null && (entry = zis.getNextEntry()) != null) {
try (InputStream is = zis;
BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
String line;
int e1total = 0;
String lastTime = null;
while ((line = reader.readLine()) != null) {
long t1 = System.currentTimeMillis();
if (StringUtils.isNotBlank(line)) {
String[] dataRec = line.split("@");
String timestamp = dataRec[0];
String e1Flag = dataRec[1];
if (!StringUtils.equalsIgnoreCase("b3", e1Flag)) {
continue;
}
lastTime = timestamp;
long time = DateUtil.StringToMillis(timestamp,"yyyy-MM-dd HH:mm:ss.SSS");
String e1Data = dataRec[3];
JSONObject jsonObject = JSONObject.parseObject(e1Data);
JSONArray eventList = jsonObject.getJSONArray("eventList");
for (int i = 0; i < eventList.size(); i++) {
JSONObject jsonObject1 = eventList.getJSONObject(i);
String globalLaneNo = jsonObject1.getString("laneNo");
String crossLane = BaseGisLaneCache.getInstance().getLaneRefMap().get(globalLaneNo);
if (crossLane != null) {
String[] sps = crossLane.split(",");
QueueLengthBean bean = new QueueLengthBean();
bean.setCrossName(sps[0]);
bean.setDir(sps[1]);
bean.setLaneNo(sps[2]);
bean.setQueueLength(jsonObject1.getDoubleValue("queueLength"));
bean.setTime(time);
bean.setGlobalLaneNo(globalLaneNo);
sourceContext.collect(bean);
}
}
}
}
long et = System.currentTimeMillis();
log.info("读取文件:{},耗时:{}ms,扫描E1帧数:{},最后一帧时间:{}", str, (et - st), e1total, lastTime);
}
}
long et = System.currentTimeMillis();
log.info("读取文件:{},耗时:{}ms", str, (et - st));
} catch (IOException e) {
e.printStackTrace();
}
}
size++;
Thread.sleep(100);
}
if (size == sortList.size()) {
sortList.clear();
Thread.sleep(60 * 1000);
break;
}
}
}
@Override
public void cancel() {
running = false;
}
private void cacheData(CarTrackModel carTrack, ConcurrentHashMap<String, CarTrackModel> vehicleInfoCache) {
String crossId = carTrack.getRoadnet().getCrossId();
int trackId = carTrack.getId();
CarTrackModel cache = vehicleInfoCache.get(crossId + trackId);
if (cache == null && !carTrack.getRoadnet().getLaneId().equals("null")) {
carTrack.getRoadnet().setRid(carTrack.getRoadnet().getRid());
carTrack.getRoadnet().setLaneId(carTrack.getRoadnet().getLaneId());
carTrack.getRoadnet().setCrossId(carTrack.getRoadnet().getCrossId());
vehicleInfoCache.put(crossId + trackId, carTrack);
}
if (cache != null) {
if (carTrack.getRoadnet().getLaneId().equals("null")) {
carTrack.getRoadnet().setRid(cache.getRoadnet().getRid());
carTrack.getRoadnet().setLaneId(cache.getRoadnet().getLaneId());
carTrack.getRoadnet().setCrossId(cache.getRoadnet().getCrossId());
carTrack.getRoadnet().setInCrossFlag(1);
vehicleInfoCache.put(crossId + trackId, carTrack);
}
}
}
}
package com.wanji.indicators.task.beihang;
import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.sink.JsonStringFileSink;
import com.wanji.indicators.task.laneIndicators.CrossLaneIndicatorMainNew;
import com.wanji.indicators.task.laneIndicators.func.LaneDataProcessWindow;
import com.wanji.indicators.task.laneIndicators.model.CrossLaneResultModel;
import com.wanji.indicators.task.laneIndicators.model.LaneResultModel;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
/**
* 离线计算入口
*/
public class OfflineBeiHangQueueLengthTaskMain {
private static final Logger log = LoggerFactory.getLogger(OfflineBeiHangQueueLengthTaskMain.class);
public static void main(String[] args) {
Properties properties = PropertiesHelper.getInstance().getProperties();
//初始化环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//加载配置项 并获取工作路径
String inPath = "d:/flink/workspace";
String outPath = "d:/flink/out/";
ParameterTool parameter = ParameterTool.fromArgs(args);
inPath = parameter.get("inPath","D:\\WanJi-Work\\07 项目\\02 济南\\数据");
outPath = parameter.get("outPath","d:/flink/out/");
String st = parameter.get("startTime","20240412");
String et = parameter.get("endTime","20240414");
// --inPath D:\上地西路数据\后厂村+九街20240412\20240412
inPath = "D:\\上地西路数据\\七街20240412\\";
// inPath = "D:\\上地西路数据\\后厂村+九街20240412\\20240412";
long startTime = DateUtil.StringToMillis(st,"yyyyMMdd");
long endTime = DateUtil.StringToMillis(et,"yyyyMMdd");
int parallelism = parameter.getInt("parallelism",1);
env.setParallelism(parallelism);
//读取离线数据
DataStreamSource<QueueLengthBean> streamSource = env.addSource(new BeiHangTrackFrameB3DataFile(inPath,startTime,endTime));
try {
SingleOutputStreamOperator<QueueLengthBean> laneStream = streamSource
.assignTimestampsAndWatermarks(
WatermarkStrategy.<QueueLengthBean>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((event, timestamp) -> event.getTime()))
.keyBy(o -> o.getLaneNo())
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.maxBy("queueLength")
.process(new ProcessFunction<QueueLengthBean, QueueLengthBean>() {
@Override
public void processElement(QueueLengthBean value, Context ctx, Collector<QueueLengthBean> out) throws Exception {
// value.setStartTime(DateUtil.toDateTime(ctx.timestamp(),"yyyy-MM-dd HH:00:00"));
// value.setEndTime(DateUtil.toDateTime(ctx.timestamp()+60*60*1000,"yyyy-MM-dd HH:00:00"));
value.setStartTime(DateUtil.toDateTime(ctx.timestamp()-1*60*1000,"yyyy-MM-dd HH:mm:00"));
value.setEndTime(DateUtil.toDateTime(ctx.timestamp(),"yyyy-MM-dd HH:mm:00"));
out.collect(value);
}
})
.name("车道排队指标计算");
SingleOutputStreamOperator<String> resultStream = laneStream.flatMap(new FlatMapFunction<QueueLengthBean, String>() {
@Override
public void flatMap(QueueLengthBean value, Collector<String> out) throws Exception {
String s = value.getCrossName()+","+value.getDir()+","+value.getLaneNo()+","+value.getQueueLength()+","+value.getStartTime()+","+value.getEndTime()+","+value.getGlobalLaneNo();
out.collect(s);
}
});
String tpath = outPath + "七街-排队长度.csv";
resultStream.addSink(new JsonStringFileSink(tpath)).setParallelism(1).name("车道排队指标计算-结果输出至文件");
JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()
.withMaxRetries(3) // 异常重试次数
.withBatchSize(1) // 批量大小设置为1,表示一条数据都会写入mysql
.withBatchIntervalMs(0) // 批次的间隔为0,表示关闭定时写入
.build();
JdbcConnectionOptions jdbcBuild = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(properties.getProperty("jdbc.url"))
.withUsername(properties.getProperty("jdbc.username"))
.withPassword(properties.getProperty("jdbc.password"))
.withDriverName(properties.getProperty("jdbc.driver"))
.withConnectionCheckTimeoutSeconds(60)
.build();;
queueLengthToMysql(jdbcExecutionOptions,jdbcBuild,laneStream);
env.execute("离线调试测试");
} catch (Exception e) {
log.error("事件测试 - 计算任务异常 : " + e);
}
}
private static void queueLengthToMysql(JdbcExecutionOptions jdbcExecutionOptions, JdbcConnectionOptions jdbcBuild, SingleOutputStreamOperator<QueueLengthBean> laneCsvStream) {
laneCsvStream.addSink(JdbcSink.sink(
"insert into t_analysis_cross_queue_length (cross_name,rid_dir,lane_no,max_queue_length,window_start_time,window_end_time,global_lane_no)" +
" values(?,?,?,?,?,?,?) " +
"on duplicate key update " +
"cross_name=values(cross_name)," +
"rid_dir=values(rid_dir)," +
"lane_no=values(lane_no)," +
"max_queue_length=values(max_queue_length)," +
"window_start_time=values(window_start_time)," +
"window_end_time=values(window_end_time)," +
"global_lane_no=values(global_lane_no)"
,
new JdbcStatementBuilder<QueueLengthBean>() {
@Override
public void accept(PreparedStatement preparedStatement, QueueLengthBean resultModel) throws SQLException {
preparedStatement.setString(1, resultModel.getCrossName());
preparedStatement.setString(2, resultModel.getDir());
preparedStatement.setString(3, resultModel.getLaneNo());
preparedStatement.setDouble(4, resultModel.getQueueLength());
preparedStatement.setTimestamp(5, new Timestamp(DateUtil.StringToMillis(resultModel.getStartTime())));
preparedStatement.setTimestamp(6, new Timestamp(DateUtil.StringToMillis(resultModel.getEndTime())));
preparedStatement.setString(7, resultModel.getGlobalLaneNo());
preparedStatement.addBatch();
}
},
jdbcExecutionOptions,
jdbcBuild
)).setParallelism(1).name("queue length to mysql");
}
}
package com.wanji.indicators.task.beihang;
import lombok.Data;
import java.io.Serializable;
/**
* @author fengyi
* @date 2024/5/8
* @description
*/
@Data
public class QueueLengthBean implements Serializable {
private String crossName;
private String dir;
private String laneNo;
private String globalLaneNo;
private Double queueLength;
//周期开始时间
private String startTime;
//周期截止时间
private String endTime;
private Long time;
}
......@@ -56,7 +56,13 @@ public class CrossLaneIndicatorMainNew implements Serializable {
.withBatchSize(1) // 批量大小设置为1,表示一条数据都会写入mysql
.withBatchIntervalMs(0) // 批次的间隔为0,表示关闭定时写入
.build();
JdbcConnectionOptions jdbcBuild = null;
JdbcConnectionOptions jdbcBuild = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(properties.getProperty("jdbc.url"))
.withUsername(properties.getProperty("jdbc.username"))
.withPassword(properties.getProperty("jdbc.password"))
.withDriverName(properties.getProperty("jdbc.driver"))
.withConnectionCheckTimeoutSeconds(60)
.build();;
private CrossLaneIndicatorMainNew(StreamExecutionEnvironment env, String indexName) {
this.env = env;
......
package com.wanji.indicators.task.trajectory.func;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wanji.indicators.constant.CongestEnum;
import com.wanji.indicators.constant.Constant;
......@@ -431,6 +432,8 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String,
private LaneSnapshotDataDTO readLaneSnapFromKafka() throws JsonProcessingException {
TopicPartition partition = new TopicPartition(snapShotTopic, 0);
ObjectMapper mapper = JacksonUtils.getInstance();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
// 指定消费的主题和分区
consumer.assign(Collections.singletonList(partition));
......@@ -442,7 +445,6 @@ public class VehicleGapTimeProcessFunction extends KeyedProcessFunction<String,
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// 获取到实体类中
ObjectMapper mapper = JacksonUtils.getInstance();
LaneSnapshotDataDTO laneSnapshotDataDTO = mapper.readValue(String.valueOf(record.value()),
LaneSnapshotDataDTO.class);
return laneSnapshotDataDTO;
......
......@@ -167,4 +167,17 @@ citybrain.snapshot.sink.topic=JN050001LaneSnapshotDataCityBrain
#\u81EA\u5B9A\u4E49\u884C\u4EBA\u8F66\u9053\u7F16\u53F7
consume.pedestrian.baseLaneInfo=13NF80B5QN0-11,13NF80B5QN0-12,13NF80B5QN0-13,13NF80B5QN0-14,13NGH0B5RC0-21,13NGH0B5RC0-22,13NGH0B5RC0-23,13NGH0B5RC0-24,13NI00B5RM0-31,13NI00B5RM0-32
line.congestion.sink.topic="line_congestion_sink"
\ No newline at end of file
line.congestion.sink.topic="line_congestion_sink"
# \u57CE\u5E02\u5927\u8111\u8BBE\u5907\u7F16\u53F7\u4E0E\u4E07\u96C6\u8DEF\u53E3ID\u6620\u5C04
device.cross=13NI00B5RM0:13NI00B5RM0,13NGH0B5RC0:13NGH0B5RC0,13NF80B5QN0:13NF80B5QN0
# \u57CE\u5E02\u5927\u8111 laneId \u4E0E\u4E07\u96C6 laneId \u6620\u5C04
brain.wanji.lane.id=test:test
jdbc.driver=com.mysql.cj.jdbc.Driver
jdbc.url=jdbc:mysql://localhost:3306/beihang?userUnicode=true&characterEncoding=utf-8
jdbc.username=root
jdbc.password=123456
\ No newline at end of file
路口名称,方向,车道号,全域车道号
上地西路与后厂村路交叉口,1,11,6
上地西路与后厂村路交叉口,1,12,5
上地西路与后厂村路交叉口,1,13,4
上地西路与后厂村路交叉口,1,14,3
上地西路与后厂村路交叉口,1,15,2
上地西路与后厂村路交叉口,1,16,1
上地西路与后厂村路交叉口,3,11,16
上地西路与后厂村路交叉口,3,12,15
上地西路与后厂村路交叉口,3,13,14
上地西路与后厂村路交叉口,3,14,13
上地西路与后厂村路交叉口,3,15,12
上地西路与后厂村路交叉口,3,16,11
上地西路与后厂村路交叉口,5,11,25
上地西路与后厂村路交叉口,5,12,24
上地西路与后厂村路交叉口,5,13,23
上地西路与后厂村路交叉口,5,14,22
上地西路与后厂村路交叉口,5,15,21
上地西路与后厂村路交叉口,7,11,35
上地西路与后厂村路交叉口,7,12,34
上地西路与后厂村路交叉口,7,13,33
上地西路与后厂村路交叉口,7,14,32
上地西路与后厂村路交叉口,7,15,31
上地西路与后厂村路交叉口,7,16,30
上地西路与上地九街交叉口,1,11,43
上地西路与上地九街交叉口,1,12,42
上地西路与上地九街交叉口,1,13,41
上地西路与上地九街交叉口,1,14,40
上地西路与上地九街交叉口,3,11,51
上地西路与上地九街交叉口,3,12,50
上地西路与上地九街交叉口,3,13,49
上地西路与上地九街交叉口,3,14,48
上地西路与上地九街交叉口,5,11,58
上地西路与上地九街交叉口,5,12,57
上地西路与上地九街交叉口,5,13,56
上地西路与上地九街交叉口,5,14,55
上地西路与上地七街交叉口,1,11,68
上地西路与上地七街交叉口,1,12,67
上地西路与上地七街交叉口,1,13,66
上地西路与上地七街交叉口,1,14,65
上地西路与上地七街交叉口,1,15,64
上地西路与上地七街交叉口,1,16,63
上地西路与上地七街交叉口,3,11,74
上地西路与上地七街交叉口,3,12,73
上地西路与上地七街交叉口,5,11,82
上地西路与上地七街交叉口,5,12,81
上地西路与上地七街交叉口,5,13,80
上地西路与上地七街交叉口,5,14,79
上地西路与上地七街交叉口,5,15,78
上地西路与上地七街交叉口,5,16,77
上地西路与上地七街交叉口,7,11,89
上地西路与上地七街交叉口,7,12,88
上地西路与上地七街交叉口,7,13,87
上地西路与上地七街交叉口,7,14,86
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment