Commit e8364935 authored by zhoushiguang's avatar zhoushiguang
parents b994eb9c a8c061cf
......@@ -460,6 +460,19 @@
</dependency>
<!--swagger ends-->
<!-- SparkJava Dependency -->
<dependency>
<groupId>com.sparkjava</groupId>
<artifactId>spark-core</artifactId>
<version>2.9.3</version>
</dependency>
<dependency>
<groupId>wanji</groupId>
<artifactId>v2xprotocol</artifactId>
<version>0.0.3</version>
</dependency>
</dependencies>
<!--<build>
......
package com.wanji.indicators.task.maincontrol;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.wanji.indicators.task.maincontrol.entity.E1FrameExtend;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import static spark.Spark.port;
import static spark.Spark.post;
public class MainControlMain {
public static final Properties properties = PropertiesHelper.getInstance().getProperties();
public static final String originE1Topic;
public static final String kafkaServerAddress;
private static final Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss:SSS").create();
private static final PropertiesHelper instance = PropertiesHelper.getInstance();
static {
originE1Topic = properties.getProperty("origin.e1.topic");
kafkaServerAddress = properties.getProperty("bootstrap.servers");
}
public static void main(String[] args) throws Exception {
// 启动 HTTP 服务器
startHttpServer();
// 启动 Flink 作业
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 获取配置文件中的kafka消费topic
String sourceTopic = properties.getProperty("origin.e1.topic");
KafkaSource<String> source = KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties())
.setProperty("auto.offset.commit", "true")
.setProperty("auto.commit.interval.ms", "1000")
.setProperty("commit.offsets.on.checkpoint", "true")
.setBootstrapServers(kafkaServerAddress)
.setTopics(sourceTopic)
.setGroupId(properties.getProperty("consumer.group.id") + "-origin-e1")
.setStartingOffsets(OffsetsInitializer.latest())
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
"origin-e1-topic");
SingleOutputStreamOperator<E1FrameExtend> E1FrameStream = stream.map(new MapFunction<String, E1FrameExtend>() {
@Override
public E1FrameExtend map(String value) {
return gson.fromJson(value, E1FrameExtend.class);
}
});
E1FrameStream.print();
env.execute("启动主控 Job");
}
private static void startHttpServer() {
port(19355);
// Kafka producer configuration
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerAddress);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
post("/submitE1Frame", (request, response) -> {
String body = request.body();
producer.send(new ProducerRecord<>(originE1Topic, body));
response.status(200);
return "主控收到 E1 数据";
});
}
}
package com.wanji.indicators.task.maincontrol.entity;
import lombok.Data;
import java.util.Date;
@Data
public class CameraInfo {
private int cameraId;// true int 相机ID,遵循《全息路口基站ID与基站相机ID编号规则》,无归属则为0
private Date cameraTimeStamp;// true String 当前相机图片帧的ms级时间戳: yyyy-MM-dd HH:mm:ss:SSS
}
package com.wanji.indicators.task.maincontrol.entity;
import cn.net.wanji.v2xprotocol.annotation.ParseList;
import cn.net.wanji.v2xprotocol.annotation.ProtocolField;
import cn.net.wanji.v2xprotocol.annotation.ProtocolType;
import cn.net.wanji.v2xprotocol.annotation.ReservedSpace;
import cn.net.wanji.v2xprotocol.entity.E1FrameParticipant;
import lombok.Data;
import java.util.Date;
import java.util.List;
/**
* @ClassName E1Frame
* @Description
* @Date 2021/3/8 9:42
* @Version 1.0
*/
@ProtocolType(isNeedTransfer=false,isVerifyFrameCheck = false,childCmd = 2)
@Data
public class E1Frame extends Org {
//设备ID unit16_t 2
@ProtocolField(length = 2)
int deviceId;
//算法耗时 ms
short algorithmDuration;
//点云数据帧号
@ProtocolField(length = 4)
long frameNo;
@ProtocolField(dateType = 0)
Date timeStamp;
// //时间戳秒 uint32_t 4 1900起,在最终计算时间上添加8h后为标准北京时间
// @ProtocolField(length = 4)
// long unixTimeSec;
// //时间戳微秒 uint32_t 4 时间戳小数部分,单位为电脑晶振时钟周期(➗4.295后即为小数点后有效值)
// @ProtocolField(length = 4)
// long unixTimeDecimal;
//激光器原点经度 nint32_t 4 分辨率1e-7°,东经为正,西经为负
@ProtocolField(length = 4,multiple = 0.0000001)
double longitude;
//激光器原点纬度 nint32_t 4 分辨率1e-7°,北纬为正,南纬为负
@ProtocolField(length = 4,multiple = 0.0000001)
double latitude;
//激光器角度 uint16_t 2 y轴与正北方的夹角(0-360°正北为0)
short angle;
//交通参与者数量 nint8_t 1
short participantNum;
//交通参与者集合
@ParseList(elementClass = E1FrameParticipant.class , field4Size = "participantNum")
@ReservedSpace(frontReservedSpacelength = 4,behindReservedSpacelength = 4)
List<E1FrameParticipant> e1FrameParticipant;
}
package com.wanji.indicators.task.maincontrol.entity;
import lombok.Data;
import java.util.Date;
import java.util.List;
/**
* @ClassName E1Frame
* @Description
* @Date 2021/3/8 9:42
* @Version 1.0
*/
@Data
public class E1FrameExtend extends E1Frame {
Long globalFrameNo;//全域数据帧号
Date globalTimeStamp;//全域上报时间
short stationNum; //基站数量
List<E1FrameSourceInfo> sourceInfoList; //单基站信息集合
public E1FrameExtend(){};
public E1FrameExtend(E1Frame e1Frame){
this.setAlgorithmDuration(e1Frame.getAlgorithmDuration());
this.setAngle(e1Frame.getAngle());
this.setDeviceId(e1Frame.getDeviceId());
this.setE1FrameParticipant(e1Frame.getE1FrameParticipant());
this.setFrameNo(e1Frame.getFrameNo());
this.setLatitude(e1Frame.getLatitude());
this.setLongitude(e1Frame.getLongitude());
this.setParticipantNum(e1Frame.getParticipantNum());
this.setTimeStamp(e1Frame.getTimeStamp());
}
}
package com.wanji.indicators.task.maincontrol.entity;
import lombok.Data;
import java.util.Date;
import java.util.List;
/**
* @ClassName E1FrameParticipant
* @Description
* @Date 2021/3/8 9:44
* @Version 1.0
*/
@Data
public class E1FrameSourceInfo {
private int state; //0正常 1预测 2离线
private int baseStationSource;//基站来源1~255,高速公路按照行驶方向递增;路口按照与正北顺时针夹角最小的基站号为1,顺时针依次增加
private Long sourceframeNo;//单基站点云数据帧号
private Date sourcetimeStamp;//单基站识别结果时间戳
private Date sourceresulttimeStamp ;//融合结果上报时间戳:yyyy-MM-dd HH:mm:ss:SSS
private int pcloudalgorithmconsume;//点云算法耗时:ms,无则填0
private int videoalgorithmconsume;//视频算法耗时:ms,无则填0
private int fusionalgorithmconsume;//融合算法耗时:ms,无则填0
private int totalalgorithmconsume;//感知结果时延:点云组帧完成到融合结果输出
private Date sourceRecievetimeStamp;//全域软件接收单基站融合感知结果时间戳: yyyy-MM-dd HH:mm:ss:SSS
private double sourcelongitude;//单基站激光器原点经度
private double sourcelatitude;//单基站激光器原点纬度
private short sourceangle;//单基站激光器角度
private short cameraNum;//基站相机数量
private List<CameraInfo> cameraList;//基站相机信息集合
private int lidarNum;//雷达数量
private List<LidarInfo> lidarList;//基站雷达信息集合
}
\ No newline at end of file
package com.wanji.indicators.task.maincontrol.entity;
import lombok.Data;
import java.util.Date;
@Data
public class LidarInfo {
private int frameId ;// true int 点云帧最后一包数据中的帧序号
private Date pcloudTimeStamp;// true String 点云帧最后一包数据中的时间戳: yyyy-MM-dd HH:mm:ss:SSS
}
package com.wanji.indicators.task.maincontrol.entity;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* 组织
*/
@EqualsAndHashCode
@Data
public class Org {
private String orgCode;
}
......@@ -4,8 +4,11 @@ import org.apache.commons.lang3.StringUtils;
import java.io.*;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class PropertiesHelper implements Serializable {
private Properties properties;
......@@ -33,6 +36,8 @@ public class PropertiesHelper implements Serializable {
InputStream inputStream = PropertiesHelper.class.getResourceAsStream("/config.properties");
BufferedReader bf = new BufferedReader(new InputStreamReader(inputStream, Charset.defaultCharset()));
properties.load(bf);
// 环境变量解析
resolvePlaceholders();
bf.close();
inputStream.close();
......@@ -48,6 +53,27 @@ public class PropertiesHelper implements Serializable {
}
return this.properties;
}
private void resolvePlaceholders() {
Pattern pattern = Pattern.compile("\\$\\{([^:}]+):([^}]+)\\}");
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
String key = (String) entry.getKey();
String value = (String) entry.getValue();
Matcher matcher = pattern.matcher(value);
StringBuffer resolvedValue = new StringBuffer();
while (matcher.find()) {
String envKey = matcher.group(1);
String defaultValue = matcher.group(2);
String envValue = System.getProperty(envKey, System.getenv(envKey));
if (envValue == null) {
envValue = defaultValue;
}
matcher.appendReplacement(resolvedValue, envValue);
}
matcher.appendTail(resolvedValue);
properties.setProperty(key, resolvedValue.toString());
}
}
public synchronized Properties getConsumerProperties() {
Properties consumerProps = new Properties();
......
config_name=config_dev.properties
\ No newline at end of file
config_name=config_${my.profile:dev}.properties
\ No newline at end of file
......@@ -175,9 +175,9 @@ device.cross=13NI00B5RM0:13NI00B5RM0,13NGH0B5RC0:13NGH0B5RC0,13NF80B5QN0:13NF80B
# \u57CE\u5E02\u5927\u8111 laneId \u4E0E\u4E07\u96C6 laneId \u6620\u5C04
brain.wanji.lane.id=test:test
jdbc.driver=com.mysql.cj.jdbc.Driver
jdbc.url=jdbc:mysql://localhost:3306/beihang?userUnicode=true&characterEncoding=utf-8
jdbc.username=root
jdbc.password=123456
\ No newline at end of file
jdbc.password=123456
origin.e1.topic=origin-e1
\ No newline at end of file
......@@ -10,7 +10,7 @@
<!-- 加载配置文件 -->
<!--<context:property-placeholder location="classpath:db.properties"/>-->
<context:property-placeholder location="classpath:db/dev/db.properties"/>
<context:property-placeholder location="classpath:db/${my.profile:dev}/db.properties"/>
<!-- 配置数据源 -->
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment