Commit ba9ce0e2 authored by zhoushiguang

Merge tasks into the repository

parent 2046c4a3
package com.wanji.indicators.task;
import com.wanji.indicators.event.cross.accident.AccidentEventMainNew;
import com.wanji.indicators.event.cross.conflictpoint.ConflictEventMain;
import com.wanji.indicators.event.stream.func.CrossFrameFlatMap;
import com.wanji.indicators.event.stream.func.RefBaseRidInfoCoFlatMap;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.CrossFrameModel;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.source.AreaFreeSpeedSource;
import com.wanji.indicators.task.area.service.AreaIndexAnalysisMainNew;
import com.wanji.indicators.task.area.service.LaneNonMotorAnalysisMainNew;
import com.wanji.indicators.task.area.service.RidParticipantAnalysisMainNew;
import com.wanji.indicators.task.area.service.func.RidFreeSpeedCoFlatMap;
import com.wanji.indicators.task.motor.MotorEfficiencyMainNew;
import com.wanji.indicators.task.track.service.CarRoutePathMainNew;
import com.wanji.indicators.task.track.service.TrackStoreMainNew;
import com.wanji.indicators.task.track.service.func.CarTrackFlatMap;
import com.wanji.indicators.task.track.service.func.FrameFlatMap;
import com.wanji.indicators.task.trafficstate.RidTrafficStateMain;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.DateTime;
import java.util.Properties;
public class AllTaskMain {
//private static final Logger log = LoggerFactory.getLogger(TrafficEventMain.class);
private static final String GROUP_ID = "FlinkConsumerGroup-V56-TRACK";
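/**
 * Aggregated entry point that wires the track, area, event and efficiency analysis jobs
 * into a single Flink pipeline reading from one Kafka track topic.
 *
 * Command-line arguments (parsed via ParameterTool below):
 *   --trackEsName        ES index name and window seconds for batched tracks, e.g. "car-track-30s,30"
 *   --routeEsName        ES index name for vehicle route paths, e.g. "car-route-path"
 *   --isMockPlateNumber  whether to mock plate numbers (default false)
 *   --isStartLightStatus whether to also consume the signal-light status topic (default false)
 *   --consumerStartDate  optional consumption start time in "yyyyMMdd_HHmmss"
 *
 * Example submission (hypothetical jar name and argument values):
 *   flink run -c com.wanji.indicators.task.AllTaskMain wj-indicators.jar \
 *     --trackEsName car-track-30s,30 --consumerStartDate 20230101_000000
 */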
public static void main(String[] args) {
//Batched-track ES index, comma separated: esIndexName,windowTime
String indexName = "car-track-30s,30";
String indexName1 = "car-route-path";
ParameterTool parameter = ParameterTool.fromArgs(args);
boolean isMockPlateNumber = parameter.getBoolean("isMockPlateNumber",false);
if (parameter.get("trackEsName") != null)
indexName = parameter.get("trackEsName");
if (parameter.get("routeEsName") != null)
indexName1 = parameter.get("routeEsName");
boolean isStartLightStatus = false;
String startLightStatusFlag = parameter.get("isStartLightStatus", "false");
isStartLightStatus = Boolean.parseBoolean(startLightStatusFlag);
PropertiesHelper instance = PropertiesHelper.getInstance();
Properties properties = instance.getProperties();
String topic = properties.getProperty("consumer.topic");
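// Kafka broker addresses and the track consumer topic come from the properties file loaded by PropertiesHelper.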
//Optionally consume from a specified start time
long startTimeStampOffset = 0;
//yyyyMMdd_HHmmss
String consumerStartDate = parameter.get("consumerStartDate");
if (StringUtils.isNotEmpty(consumerStartDate)) {
startTimeStampOffset = DateUtil.stringToDate(consumerStartDate, "yyyyMMdd_HHmmss").getTime();
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setParallelism(4);
// env.getCheckpointConfig().disableCheckpointing();
// env.enableCheckpointing(3 * 60 * 1000);
// env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
// env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
// env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(properties.getProperty("check.point.uri")));
// env.setStateBackend(new FsStateBackend(properties.getProperty("check.point.uri")));
// env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
// //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1 * 60 * 1000);
// env.getCheckpointConfig().setMaxConcurrentCheckpoints(3);
// env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//Restart strategy: restart every 30s, at most 3 times
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(30, TimeUnit.SECONDS)));
DateTime currentDateTime = new DateTime();
DateTime startTimeStamp = currentDateTime.withTimeAtStartOfDay();
KafkaSource<String> source = null;
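// Two source configurations: if a start timestamp was supplied, consume from the offsets at or
// after that timestamp; otherwise resume from the committed group offsets, falling back to the
// latest offsets when none are committed.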
//Consume starting from the specified time
if (startTimeStampOffset > 0) {
source = KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties())
// .setProperty("enable.auto.commit", "true")
// .setProperty("auto.commit.interval.ms", "1000")
.setProperty("auto.offset.reset", "latest")
.setProperty("commit.offsets.on.checkpoint", "true")
.setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setTopics(topic)
.setGroupId(GROUP_ID + "_run_route")
// Start from the offsets whose timestamps are greater than or equal to this timestamp
.setStartingOffsets(OffsetsInitializer.timestamp(startTimeStampOffset))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
} else {
source = KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties())
.setProperty("enable.auto.commit", "true")
.setProperty("auto.commit.interval.ms", "1000")
.setProperty("auto.offset.reset", "latest")
// .setProperty("commit.offsets.on.checkpoint", "true")
.setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setTopics(topic)
.setGroupId(GROUP_ID + "_run_route_new")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
}
DataStream<String> stream = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-car-track-source-route");
SingleOutputStreamOperator<FrameModel> frameModelStream =
stream
.flatMap(new FrameFlatMap())
.setParallelism(1)
.name("路径计算-轨迹帧数据-JsonToObject");
SingleOutputStreamOperator<CarTrackModel> carTrackModelStream =
frameModelStream
.flatMap(new CarTrackFlatMap(isMockPlateNumber))
.setParallelism(1)
.name("路径计算-轨迹帧数据解析-ToCarTrackModel");
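// Two-stage parse of the raw Kafka payload: JSON string -> FrameModel -> CarTrackModel records (flattened per vehicle).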
try {
//Batched track assembly (written to the ES track index)
TrackStoreMainNew.init(env, indexName).run(carTrackModelStream);
//Vehicle route path analysis
CarRoutePathMainNew.init(env, indexName1).run(carTrackModelStream);
/**********************************************************************************/
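// Broadcast the free-speed reference data from AreaFreeSpeedSource and join it onto every track
// record, so the area, participant and traffic-state analyses below see the lane attributes inline.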
SingleOutputStreamOperator<CarTrackModel> joinFreeSpeedStream = carTrackModelStream
.connect(env.addSource(new AreaFreeSpeedSource()).broadcast())
.flatMap(new RidFreeSpeedCoFlatMap())
.setParallelism(1)
.name("全域轨迹帧数据-车道属性数据绑定");
//Area index metrics
AreaIndexAnalysisMainNew.init(env, "indexName", false).run(joinFreeSpeedStream);
//Impact of pedestrian and non-motor-vehicle counts on motor vehicle speed
RidParticipantAnalysisMainNew.init(env, null, false).run(joinFreeSpeedStream);
//Road segment traffic state
RidTrafficStateMain.init(env, null, false).run(joinFreeSpeedStream);
SingleOutputStreamOperator<CrossFrameModel> crossDataStream =
stream
.flatMap(new CrossFrameFlatMap())
.setParallelism(1)
.name("全域轨迹帧数据-按路口分组输出");
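// The per-intersection frame stream feeds the motor-vehicle metrics below and, after frame
// sampling and lane binding, the conflict-point and accident analyses.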
//Motor vehicle operation metrics calculation
MotorEfficiencyMainNew.init(env, null, false).run(crossDataStream);
//Non-motor vehicle analysis on lanes
LaneNonMotorAnalysisMainNew.init(env, null, false).run(carTrackModelStream);
/*****************************************************************************************/
//Frame sampling
SingleOutputStreamOperator<CrossFrameModel> chouzhenStream = crossDataStream
.keyBy(CrossFrameModel::getCrossId)
.countWindow(1)
.maxBy("globalTimeStamp")
.name("数据抽帧");
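// Keyed by intersection id, a count window of size 1 with maxBy on globalTimeStamp emits the
// latest record of each (single-element) window; this serves as the frame-sampling step.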
SingleOutputStreamOperator<CrossFrameModel> joinLaneStream = chouzhenStream
.flatMap(new RefBaseRidInfoCoFlatMap())
.setParallelism(1)
.name("全域轨迹帧数据-车道属性数据绑定");
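// Bind the base rid/lane reference info onto the sampled cross frames before the conflict-point
// and accident analyses consume them.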
//Conflict point calculation within the intersection
ConflictEventMain.init(env).run(joinLaneStream);
DataStream<String> lightStatusStream = null;
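// When enabled, a second Kafka source reads the "cross_lights_status" topic and the signal-light
// stream is passed into the accident analysis together with the lane-joined frame stream;
// otherwise the accident analysis runs without light-status input.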
if (isStartLightStatus) {
KafkaSource<String> lightStatusSource = KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties())
.setProperty("enable.auto.commit", "true")
.setProperty("auto.commit.interval.ms", "1000")
.setProperty("commit.offsets.on.checkpoint", "false")
.setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setTopics("cross_lights_status")
.setGroupId(GROUP_ID + "-event_light")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
lightStatusStream = env
.fromSource(lightStatusSource, WatermarkStrategy.noWatermarks(), "cross_lights_status-data-source");
//Accident calculation (with signal-light status)
AccidentEventMainNew.init(env).run(joinLaneStream, lightStatusStream);
} else {
//Accident calculation (without signal-light status)
AccidentEventMainNew.init(env).run(joinLaneStream, null);
}
/*****************************************************************************************/
env.execute("轨迹路径分析-工作流");
} catch (Exception e) {
e.printStackTrace();
//log.error("交通指标计算任务异常 : " + e);
}
}
}