Commit 8d77cd3b authored by hanbing's avatar hanbing

城市大脑对接-推送设备数据

parent 6c992aae
package com.wanji.indicators.task.citybrain.devicestatus;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wanji.indicators.task.citybrain.vo.DeviceStatusVO;
import com.wanji.indicators.task.citybrain.vo.RequestBodyVO;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Dsl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;
/**
* @author Kent HAN
* @date 2024/3/25 13:12
*/
public class DeviceStatusMain {
private static final Logger log = LoggerFactory.getLogger(DeviceStatusMain.class);
private static final ObjectMapper mapper = new ObjectMapper();
public static final Properties properties = PropertiesHelper.getInstance().getProperties();
// 城市大脑域名
public static final String domain;
// 城市大脑 URL 路径
public static final String urlPath;
public static final String appKey;
public static final String secretKey;
// 数据来源公司名称
public static final String from;
static {
domain = properties.getProperty("city.brain.domain");
urlPath = properties.getProperty("city.brain.device.status");
appKey = properties.getProperty("city.brain.app.key");
secretKey = properties.getProperty("city.brain.secret.key");
from = properties.getProperty("city.brain.from");
}
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Row> deviceStream = env.addSource(new PeriodicJdbcReader());
deviceStream
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.apply(new AllWindowFunction<Row, List<Row>, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<Row> values, Collector<List<Row>> out) {
List<Row> rows = new ArrayList<>();
for (Row value : values) {
rows.add(value);
}
out.collect(rows);
}
}).addSink(new SinkFunction<List<Row>>() {
@Override
public void invoke(List<Row> value, Context context) {
// 将收集到的数据发送至城市大脑
AsyncHttpClient asyncHttpClient = Dsl.asyncHttpClient();
try {
long timestamp = new Date().getTime();
// 生成签名
String signature = makeSignature(secretKey, timestamp);
String jsonBody = buildJsonFromValue(value);
// 发送 POST 请求
asyncHttpClient.preparePost(domain + urlPath)
.setHeader("Content-Type", "application/json;charset=utf-8")
.setHeader("timestamp", timestamp)
.setHeader("appkey", appKey)
.setHeader("signature",signature)
.setBody(jsonBody)
.execute()
.toCompletableFuture()
.thenAccept(response -> {
log.info("发送设备在线状态检测数据收到响应: " + response.getResponseBody());
});
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
asyncHttpClient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private String buildJsonFromValue(List<Row> rows) throws JsonProcessingException {
RequestBodyVO requestBodyVO = new RequestBodyVO();
RequestBodyVO.Header header = new RequestBodyVO.Header();
String[] parts = urlPath.split("/");
String name = parts[parts.length - 1]; // 获取最后一个元素
header.setName(name);
header.setFrom(from);
requestBodyVO.setHeaders(header);
requestBodyVO.setBody(convertBodyStr(rows));
// 将RequestBodyVO对象转换为JSON字符串
String jsonString = mapper.writeValueAsString(requestBodyVO);
// 创建一个只包含一个元素的列表
return "[" + jsonString + "]";
}
});
// 启动Flink作业
env.execute("推送设备在线状态");
}
private static String convertBodyStr(List<Row> rows) throws JsonProcessingException {
List<DeviceStatusVO> deviceStatusVOList = new ArrayList<>();
for (Row row : rows) {
DeviceStatusVO deviceStatusVO = new DeviceStatusVO();
deviceStatusVO.setData_time(new Date().getTime() + "");
deviceStatusVO.setDeviceNo((String) row.getField(0)); // row 只有一个字段
deviceStatusVOList.add(deviceStatusVO);
}
return mapper.writeValueAsString(deviceStatusVOList);
}
private static String makeSignature(String secretKey, long timestamp) {
try {
String toHash = secretKey + timestamp;
// 创建MessageDigest实例并使用MD5算法
MessageDigest md = MessageDigest.getInstance("MD5");
byte[] digest = md.digest(toHash.getBytes());
// 将结果转换为16进制字符串
StringBuilder hexString = new StringBuilder();
for (byte aDigest : digest) {
String hex = Integer.toHexString(0xff & aDigest);
if (hex.length() == 1) {
hexString.append('0');
}
hexString.append(hex);
}
// 提取并返回结果中的第2位到第12位(包含2不包含12,即10个字符)
return hexString.substring(2, 12);
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
return null;
}
}
}
package com.wanji.indicators.task.citybrain.devicestatus;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.types.Row;
import java.util.Properties;
// 自定义数据源
class PeriodicJdbcReader extends RichSourceFunction<Row> {
private volatile boolean isRunning = true;
private JdbcInputFormat jdbcInputFormat;
public static final Properties properties = PropertiesHelper.getInstance().getProperties();
public static final String mysqlIp;
public static final String mysqlDbName;
static {
mysqlIp = properties.getProperty("mysql.ip");
mysqlDbName = properties.getProperty("mysql.db.name");
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 初始化 JDBC 连接和查询
jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat()
.setDrivername("com.mysql.cj.jdbc.Driver")
.setDBUrl("jdbc:mysql://" + mysqlIp + ":3306/" + mysqlDbName + "?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
.setUsername("root")
.setPassword("Wanji300552")
.setQuery("SELECT equip_code FROM t_device_camera UNION ALL SELECT equip_code FROM t_device_radar;")
.setRowTypeInfo(new RowTypeInfo(Types.STRING))
.finish();
}
@Override
public void run(SourceContext<Row> ctx) throws Exception {
while (isRunning) {
// 在这里执行查询
jdbcInputFormat.openInputFormat();
InputSplit[] inputSplits = jdbcInputFormat.createInputSplits(1);
for (InputSplit split : inputSplits) {
jdbcInputFormat.open(split);
while (!jdbcInputFormat.reachedEnd()) {
Row row = jdbcInputFormat.nextRecord(new Row(1));
if (row != null) {
ctx.collect(row);
}
}
}
jdbcInputFormat.close();
jdbcInputFormat.closeInputFormat();
// 等待5秒
Thread.sleep(5000);
}
}
@Override
public void cancel() {
isRunning = false;
}
}
\ No newline at end of file
package com.wanji.indicators.task.citybrain.vo;
import lombok.Data;
@Data
public class DeviceStatusVO {
private String data_time; // 字符串,毫秒级时间戳
private String deviceNo; // 设备编号
}
\ No newline at end of file
package com.wanji.indicators.task.citybrain.vo;
import lombok.Data;
@Data
public class RequestBodyVO {
private Header headers;
private String body;
@Data
public static class Header {
private String name;
private String from;
}
}
\ No newline at end of file
......@@ -102,7 +102,26 @@ cross.dirs=13NF80B5QN0:13NF80B5QN0-1,13NF80B5QN0-2,13NF80B5QN0-4,13NF80B5QN0-6;1
ignore.crosses=13NID0B5RM0
# MySQL IP
mysql.ip=37.12.182.29
mysql.ip=10.102.1.182
# MySQL \u6570\u636E\u5E93\u540D
mysql.db.name=holo_roadnet
\ No newline at end of file
mysql.db.name=holo_roadnet_jn
# 城市大脑接口,交通⽹(视频专⽹)访问,需要配置DNS指向 37.10.238.73,37.10.238.74
city.brain.domain=http://gw-in.jtbrain.gov
city.brain.app.key=bigdata_app
city.brain.secret.key=123456
# 设备在线状态检测数据
city.brain.device.status=/datain/device_test/device_status
# 轨迹数据
city.brain.target.info=/datain/device_test/target_info
# 过车信息数据
city.brain.passcar.info=/datain/device_test/passcar_info
# 静态排队数据
city.brain.static.queue=/datain/device_test/static_queue
# 动态排队数据
city.brain.dynamic.queue=/datain/device_test/dynamic_queue
# 区域数据
city.brain.area.data=/datain/device_test/area_data
# 数据来源公司名称
city.brain.from=wanji
\ No newline at end of file
......@@ -114,4 +114,24 @@ mysql.ip=37.12.182.29
mysql.db.name=holo_roadnet
# 路口非协调方向
cross.nocoord.dirs=13NI00B5RM0:1;13NGH0B5RC0:1,5;13NF80B5QN0:1,4
\ No newline at end of file
cross.nocoord.dirs=13NI00B5RM0:1;13NGH0B5RC0:1,5;13NF80B5QN0:1,4
# 城市大脑接口,交通⽹(视频专⽹)访问,需要配置DNS指向 37.10.238.73,37.10.238.74
city.brain.domain=http://gw-in.jtbrain.gov
city.brain.app.key=bigdata_app
city.brain.secret.key=123456
# 设备在线状态检测数据
# 设备在线状态检测数据
city.brain.device.status=/datain/device_test/device_status
# 轨迹数据
city.brain.target.info=/datain/device_test/target_info
# 过车信息数据
city.brain.passcar.info=/datain/device_test/passcar_info
# 静态排队数据
city.brain.static.queue=/datain/device_test/static_queue
# 动态排队数据
city.brain.dynamic.queue=/datain/device_test/dynamic_queue
# 区域数据
city.brain.area.data=/datain/device_test/area_data
# 数据来源公司名称
city.brain.from=wanji
\ No newline at end of file
12F6L08J630:12F6L08J63012F6S08J7P00:112.96313284188527,28.18318575342956;112.96303313946666,28.183194356219847;112.96305539467383,28.183428386959946;112.96315227754664,28.18342099486973;112.96313284188527,28.18318575342956
12F6L08J630:12F6L08J63012F7O08J5U00:112.96354410000906,28.18283950134074;112.96352853848853,28.18271853213667;112.9633037242585,28.182742264061066;112.96331953166877,28.18286652039383;112.96354410000906,28.18283950134074
12F6L08J630:12F6L08J63012F6F08J4600:112.96292323382805,28.182416762776572;112.96282914613639,28.182429462626725;112.96285131325381,28.182654093569635;112.962945350963,28.182641866754853;112.96292323382805,28.182416762776572
12F6S08J7P0:12F6S08J7P012F6S08J8T00:112.96379954396568,28.188516678001562;112.96370564700435,28.188527356789084;112.96373122467124,28.188746864962972;112.96382613127733,28.18873559293259;112.96379954396568,28.188516678001562
12F6S08J7P0:12F6S08J7P012F8308J7E00:112.96418312352651,28.188152488433946;112.9641502272407,28.18806876123086;112.9639356562015,28.188136413689957;112.96396894420842,28.188225208365164;112.96418312352651,28.188152488433946
12F6S08J7P0:12F6S08J7P012F6L08J6300:112.96357638149044,28.18786559221101;112.9634794865813,28.187875948798126;112.96350842973068,28.18810139099276;112.96360563818898,28.18808988000444;112.96357638149044,28.18786559221101
12F6S08J8T0:12F6S08J8T012F7L08J8S00:112.96418244046701,28.19192859057195;112.96417684807375,28.191833814493464;112.96395118642712,28.191839035877035;112.96395631087961,28.191933726561583;112.96418244046701,28.19192859057195
12F6S08J8T0:12F6S08J8T012F6S08J7P00:112.96374376109364,28.19149557373418;112.96364952758402,28.19148400543331;112.9636140217438,28.191706917471826;112.96370808697927,28.191718930682264;112.96374376109364,28.19149557373418
12F6S08J8T0:12F6S08J8T012F6N08JA100:112.96379087784844,28.192091194225977;112.96369664433882,28.192079625925107;112.9636611384986,28.192302537963624;112.96375520373407,28.19231455117406;112.96379087784844,28.192091194225977
\ No newline at end of file
13NID0B5RM0:13NID0B5RM013NHC0B5R400:117.09604169736885,36.644790032250896;117.09608051785587,36.6448244460881;117.0962419846417,36.6446995386553;117.09620449671407,36.64466409552374;117.09604169736885,36.644790032250896
13NID0B5RM0:13NID0B5RM013NH20B5RH00:117.09624256906865,36.64489034135067;117.09624592651616,36.64496882168675;117.09647304659556,36.64496025015195;117.09646623060874,36.644884251898674;117.09624256906865,36.64489034135067
13NGH0B5RC0:13NGH0B5RC013NF80B5QN00:117.08953880238352,36.64388059783646;117.08950438164997,36.64395233609795;117.08971033045316,36.644030528158645;117.08974645844097,36.64395897597499;117.08953880238352,36.64388059783646
13NID0B5RM0:13NID0B5RM013NI00B5RM00:117.0955355076057,36.64498427650557;117.09553346280966,36.645071521137126;117.09576213835423,36.64507503430101;117.09576377639532,36.644988033569696;117.0955355076057,36.64498427650557
13NGH0B5RC0:13NGH0B5RC013NEP0B5QJ00:117.0898749695255,36.643852310026844;117.08999168758082,36.64389566386026;117.09006533736702,36.643726045334844;117.0899566223769,36.643682252619385;117.0898749695255,36.643852310026844
13NGH0B5RC0:13NGH0B5RC013NI00B5RM00:117.09019785998227,36.644004177469945;117.09016996276503,36.64408081744988;117.09038057675839,36.6441482957196;117.09040920390312,36.644076046259094;117.09019785998227,36.644004177469945
13NF80B5QN0:13NF80B5QN013NED0B5Q900:117.08566152826947,36.64176235952611;117.08560972676949,36.64184040257543;117.08578777591498,36.64195684491826;117.08583397273655,36.64187414171029;117.08566152826947,36.64176235952611
13NF80B5QN0:13NF80B5QN013NGH0B5RC00:117.08605943390197,36.641911479079525;117.08599681823961,36.64198347196667;117.08617136408513,36.642100432473406;117.0862344119634,36.64202920541093;117.08605943390197,36.641911479079525
13NF80B5QN0:13NF80B5QN013NDG0B5RI00:117.08589754298806,36.64204909701344;117.08583946469253,36.642018242918944;117.08574434250956,36.64219449309183;117.08579989280231,36.642216304249715;117.08589754298806,36.64204909701344
13NF80B5QN0:13NF80B5QN013NDT0B5Q900:117.08609025384176,36.64171158042417;117.0860452683286,36.64167647809194;117.08589676368292,36.64179832731728;117.08594619058222,36.64183558031174;117.08609025384176,36.64171158042417
13NGH0B5RC0:13NGH0B5RC013NEH0B5RJ00:117.09006942695913,36.64421338839392;117.08992142320416,36.64416398791302;117.089869036946,36.64434016574915;117.09000808307756,36.64438106167019;117.09006942695913,36.64421338839392
13NI00B5RM0:13NI00B5RM013NGH0B5RC00:117.09423195012252,36.64493383820295;117.09422377093831,36.64501085885425;117.09444590928958,36.6450299635677;117.09445770644335,36.64495343331372;117.09423195012252,36.64493383820295
13NI00B5RM0:13NI00B5RM013NG40B5SK00:117.0948077755471,36.64508885308461;117.09466741966627,36.64509218048696;117.09467498926716,36.64527531914366;117.09481130900397,36.645269866354184;117.0948077755471,36.64508885308461
13NI00B5RM0:13NI00B5RM013NID0B5RM00:117.09485437114998,36.64487766904978;117.094851951221,36.64496660143979;117.09507645089205,36.64497132613058;117.09508054048415,36.64488544469639;117.09485437114998,36.64487766904978
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment