Commit 2cb7973f authored by zhoushiguang's avatar zhoushiguang

checkpoint优化;修改topic数据导出支持分区文件存储

parent 0d8e66c2
package com.wanji.indicators.sink;
import com.wanji.indicators.task.export.KafkaRecordModel;
import com.wanji.indicators.util.FileUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Set;
public class PartitionFileSink implements SinkFunction<Map<Integer,String>> {
private static final Logger log = LoggerFactory.getLogger(PartitionFileSink.class);
private Map<Integer,String> partitionPath;
public PartitionFileSink(Map<Integer,String> partitionPath) {
this.partitionPath = partitionPath;
}
@Override
public void invoke(Map<Integer,String> dataMap, Context context) {
Set<Integer> keys = dataMap.keySet();
for (Integer key : keys) {
String path = partitionPath.get(key);
if (StringUtils.isNotBlank(path)) {
FileUtil.writeApend(path, dataMap.get(key));
}
}
}
}
......@@ -13,6 +13,7 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* 自由流速度
......@@ -44,12 +45,13 @@ public class AreaFreeSpeedSource extends RichParallelSourceFunction<Map<String,
public void run(SourceContext<Map<String, Map<String,Double>>> sourceContext) throws Exception {
try {
if (running) {
while (running) {
String crossList = properties.getProperty("full.area.cross.list");
Double defaultFreeSpeed = Double.parseDouble(properties.getProperty("rid.default.free.speed"));
Map<String, Map<String, Double>> ridFreeSpeed = baseLaneInfoService.getRidFreeSpeed(crossList, defaultFreeSpeed);
sourceContext.collect(ridFreeSpeed);
TimeUnit.MINUTES.sleep(1);
}
} catch (Exception e) {
log.error("查询车道信息异常: " + e.getMessage(), e);
......
......@@ -12,6 +12,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
......@@ -40,14 +41,17 @@ public class ConflictTurnSource extends RichParallelSourceFunction<Map<String, L
public void run(SourceContext<Map<String, List<String>>> sourceContext) throws Exception {
//key:cross-rid-ridDir-turn value:crossid-rid-ridDir-冲突转向
while (running) {
try {
String crossList = properties.getProperty("full.area.cross.list");
Map<String, List<String>> retMap = baseLaneInfoService.getRidTurnConflictInfo(crossList);
sourceContext.collect(retMap);
TimeUnit.MINUTES.sleep(1);
} catch (Exception e) {
log.error("查询 rid 冲突信息数据异常: " + e.getMessage(), e);
}
}
}
@Override
public void cancel() {
......
......@@ -16,6 +16,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* 查询 溢出 检测区域 数据
......@@ -50,7 +51,7 @@ public class CongestionSource extends RichParallelSourceFunction<HashMap<String,
@Override
public void run(SourceContext<HashMap<String, Map<String, CongestionArea>>> sourceContext) throws Exception {
HashMap<String, Map<String, CongestionArea>> outMap = new HashMap<>();
if (running) {
while (running) {
try {
String sql = "SELECT cross_id AS crossId,rid,lane_num AS laneNum,rid_in_arr AS ridInArr,wkt FROM t_congestion_area ";
List<CongestionArea> dataList = congestionAreaMapper.findAllCongestionArea();// new JdbcUtil().excuteQueryToList(CongestionAreaDo.class, sql);
......@@ -93,6 +94,7 @@ public class CongestionSource extends RichParallelSourceFunction<HashMap<String,
if (outMap.size() > 0) {
sourceContext.collect(outMap);
}
TimeUnit.MINUTES.sleep(1);
} catch (Exception e) {
log.error("查询溢出检测区域: " + e.getMessage() , e);
}
......
......@@ -81,6 +81,7 @@ public class ConsulAreaSource extends RichParallelSourceFunction<HashMap<String,
if (outMap.size() > 0) {
sourceContext.collect(outMap);
}
TimeUnit.MINUTES.sleep(1);
} catch (Exception e) {
log.error("查询溢出检测区域: " + e.getMessage() , e);
}
......
......@@ -94,6 +94,7 @@ public class ConsulConfigSource extends RichParallelSourceFunction<HashMap<Strin
if (!hashMap.isEmpty()) {
sourceContext.collect(hashMap);
}
TimeUnit.MINUTES.sleep(1);
} catch (Exception e) {
log.error("consul 服务异常 查询配置异常: " + e.getMessage() , e);
} finally {
......
......@@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* 查询 rid 转向数据
......@@ -25,7 +26,7 @@ public class CrossShapeSource extends RichParallelSourceFunction<HashMap<String,
public void run(SourceContext<HashMap<String, CrossShapeModel>> sourceContext) throws Exception {
HashMap<String, CrossShapeModel> outMap = new HashMap<>();
try {
if (running) {
while (running) {
String sql = "SELECT aa.rid_in,aa.rid_out,aa.turn_dir,aa.wkt,bb.angle AS angle_out,cc.angle AS angle_in " +
" FROM cross_turn_line aa " +
" LEFT JOIN rid bb ON aa.rid_out = bb.rid " +
......@@ -62,6 +63,7 @@ public class CrossShapeSource extends RichParallelSourceFunction<HashMap<String,
if (outMap.size() > 0) {
sourceContext.collect(outMap);
}
TimeUnit.MINUTES.sleep(1);
// Thread.sleep(1000*30);
}
} catch (Exception e) {
......
......@@ -12,6 +12,7 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
......@@ -48,7 +49,7 @@ public class FirdsOfRightTridSource extends RichParallelSourceFunction<HashMap<S
@Override
public void run(SourceContext<HashMap<String, Map<String,String>>> sourceContext) throws Exception {
HashMap<String, Map<String,String>> retMap = new HashMap<>();
if (running) {
while (running) {
List<BaseCrossTurnInfo> dataList = baseCrossTurnInfoMapper.findRidTurnInfo(null);
if (dataList != null && !dataList.isEmpty()) {
......@@ -72,6 +73,7 @@ public class FirdsOfRightTridSource extends RichParallelSourceFunction<HashMap<S
if (retMap.size() > 0) {
sourceContext.collect(retMap);
}
TimeUnit.MINUTES.sleep(1);
}
}
......
......@@ -47,10 +47,11 @@ public class LaneInfoSource extends RichParallelSourceFunction<Map<String, BaseL
public void run(SourceContext<Map<String, BaseLaneInfo>> sourceContext) throws Exception {
try {
if (running) {
while (running) {
String crossList = properties.getProperty("full.area.cross.list");
Map<String, BaseLaneInfo> map = baseLaneInfoService.findLaneInfo(crossList);
sourceContext.collect(map);
TimeUnit.MINUTES.sleep(1);
// TimeUnit.SECONDS.sleep(1);
}
} catch (Exception e) {
......
......@@ -11,6 +11,7 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
......@@ -33,13 +34,14 @@ public class LaneRidLengthSource extends RichParallelSourceFunction<Map<String,L
@Override
public void run(SourceContext<Map<String,List<BaseLaneInfo>>> sourceContext) throws Exception {
if (running) {
while (running) {
List<BaseLaneInfo> list = baseLaneInfoService.findLaneRidInfo();
if (list.size() > 0) {
Map<String,List<BaseLaneInfo>> group = list.stream().collect(Collectors.groupingBy(o->o.getId()));
sourceContext.collect(group);
}
TimeUnit.MINUTES.sleep(1);
}
}
......
......@@ -7,6 +7,7 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @Author ke.han
......@@ -20,7 +21,7 @@ public class RidDirSource extends RichParallelSourceFunction<
@Override
public void run(SourceContext<HashMap<String, String>> out) throws Exception {
HashMap<String, String> outMap = new HashMap<>();
if (running) {
while (running) {
String sql = "SELECT rid,dir_4_no AS dir,angle FROM rid";
List<RidDirDo> ridLaneNumDos = null;//new JdbcUtil().excuteQueryToList(RidDirDo.class, sql);
if (ridLaneNumDos != null && !ridLaneNumDos.isEmpty()) {
......@@ -37,6 +38,7 @@ public class RidDirSource extends RichParallelSourceFunction<
if (outMap.size() > 0) {
out.collect(outMap);
}
TimeUnit.MINUTES.sleep(1);
// Thread.sleep(1000*60);
}
}
......
......@@ -6,6 +6,7 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @Author ke.han
......@@ -23,7 +24,7 @@ public class WaitLaneSource extends RichParallelSourceFunction<
@Override
public void run(SourceContext<HashMap<String, Integer>> out) {
HashMap<String, Integer> outMap = new HashMap<>();
if (running) {
while (running) {
String sql = "SELECT * FROM cross_flink_settings";
List<WaitingAreaDo> ridLaneNumDos = null;//new JdbcUtil().excuteQueryToList(WaitingAreaDo.class, sql);
if (ridLaneNumDos != null && !ridLaneNumDos.isEmpty()) {
......@@ -37,6 +38,11 @@ public class WaitLaneSource extends RichParallelSourceFunction<
if (outMap.size() > 0) {
out.collect(outMap);
}
try {
TimeUnit.MINUTES.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// Thread.sleep(1000*60);
}
}
......
......@@ -5,6 +5,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
/**
* 待转区数据源
......@@ -17,13 +18,14 @@ public class WaitingAreaSource extends RichParallelSourceFunction<
@Override
public void run(SourceContext<HashMap<String, Integer>> out) throws Exception {
HashMap<String, Integer> outMap = new HashMap<>();
if (running) {
while (running) {
String sql = "SELECT * FROM cross_flink_settings WHERE waiting_area <> 0";
try {
if (outMap.size() > 0) {
out.collect(outMap);
}
TimeUnit.MINUTES.sleep(1);
} catch (Exception e) {
log.error("路口待转配置查询异常:" + e.getMessage());
out.collect(outMap);
......
......@@ -3,9 +3,13 @@ package com.wanji.indicators.task.export;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.model.RoutePathCarTrackModel;
import com.wanji.indicators.sink.PartitionFileSink;
import com.wanji.indicators.task.track.service.sink.FileSink;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
......@@ -35,10 +39,13 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.*;
import java.util.stream.Collectors;
public class TrackExportToFileMain {
private static final Logger log = LoggerFactory.getLogger(TrackExportToFileMain.class);
......@@ -54,12 +61,34 @@ public class TrackExportToFileMain {
String startTime = parameter.get("startTime");
String endTime = parameter.get("endTime");
String sinkPath = parameter.get("sinkPath");
//单位分钟,按几分钟一个文件
Integer partitionTime = parameter.getInt("partitionTime");
//目录
if (!sinkPath.endsWith("/")) {
sinkPath += "/";
}
sinkPath += topic+"_"+startTime + "-" + endTime + ".json";
log.info("导出路径:" + sinkPath);
Map<Integer, String> timePathIndex = new HashMap<>();
if (partitionTime > 0) {//指定文件分区
DateTime start = new DateTime(startTime);
DateTime end = new DateTime(endTime);
while (start.isBefore(end)) {
int minute = start.getMinuteOfHour();
int index = minute / partitionTime * partitionTime;
String sdt = start.toString("yyyy_MM_dd_HH_mm_ss");
start = start.plusMinutes(partitionTime);
String edt = start.toString("yyyy_MM_dd_HH_mm_ss");
String filePath = sinkPath + topic + "-" + sdt + "-" + edt + ".json";
timePathIndex.put(index, filePath);
}
} else {
sinkPath += topic + "-" + startTime + "-" + endTime + ".json";
timePathIndex.put(0, sinkPath);
}
long startTimestamp = DateUtil.StringToMillis(startTime, "yyyy_MM_dd_HH_mm_ss");
long endTimestamp = DateUtil.StringToMillis(endTime, "yyyy_MM_dd_HH_mm_ss");
......@@ -158,19 +187,30 @@ public class TrackExportToFileMain {
}
});
SingleOutputStreamOperator<String> toJsonStream =
SingleOutputStreamOperator<Map<Integer,String>> toJsonStream =
timeOutStream.flatMap(
new FlatMapFunction<KafkaRecordModel, String>() {
new FlatMapFunction<KafkaRecordModel, Map<Integer,String>>() {
@Override
public void flatMap(KafkaRecordModel value, Collector<String> out) throws Exception {
public void flatMap(KafkaRecordModel value, Collector<Map<Integer,String>> out) throws Exception {
Long ts = value.getTimestamp();
//Long endTs = DateUtil.StringToMillis("2023-03-30 11:40:00:999","yyyy-MM-dd HH:mm:ss:SSS");
// if (ts > endTimestamp) {
// System.exit(0);
// }
log.info("记录时间:{},数据导出范围【{},{}】", DateUtil.toDateTime(ts, "yyyy-MM-dd HH:mm:ss.SSS"), startTime, endTime);
String json = JSON.toJSONString(value);
out.collect(json);
Map<Integer, String> dataMap = new HashMap<>();
if (partitionTime > 0) {
DateTime dateTime = new DateTime(new Date(ts));
int minute = dateTime.getMinuteOfHour();
int index = minute / partitionTime * partitionTime;
dataMap.put(index, json);
} else {
dataMap.put(0, json);
}
out.collect(dataMap);
}
})
......@@ -179,7 +219,7 @@ public class TrackExportToFileMain {
try {
toJsonStream.addSink(new FileSink(sinkPath));
toJsonStream.addSink(new PartitionFileSink(timePathIndex));
env.execute("根据起始时间范围导出数据到文件");
//env.executeAsync().cancel();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment