Commit 96cf7ad1 authored by zhoushiguang's avatar zhoushiguang

实时计算模块

parent 8a1624b6
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
<modules> <modules>
<module>wj-datacenter-service</module> <module>wj-datacenter-service</module>
<module>wj-realtime-computing</module>
</modules> </modules>
<!-- 依赖版本 --> <!-- 依赖版本 -->
......
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4" />
\ No newline at end of file
This diff is collapsed.
package com.wanji.indicators.config;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import java.util.Properties;
/**
* @author fengyi
* @date 2023/4/7
* @description
*/
public class RedisConfig {
// Shared property reader supplying the redis.* connection settings.
PropertiesHelper instance = PropertiesHelper.getInstance();
Properties properties = instance.getProperties();
/**
 * Builds the Flink jedis pool configuration from the redis.host, redis.password,
 * redis.port and redis.database properties.
 *
 * @return a pooled jedis configuration for Flink's redis connector
 */
public FlinkJedisConfigBase getRedisConfig(){
String host = properties.getProperty("redis.host");
String password = properties.getProperty("redis.password");
int port = Integer.parseInt(properties.getProperty("redis.port"));
int database = Integer.parseInt(properties.getProperty("redis.database"));
return new FlinkJedisPoolConfig.Builder()
.setHost(host)
.setPassword(password)
.setPort(port)
.setDatabase(database)
.build();
}
}
package com.wanji.indicators.constant;
import java.util.HashMap;
import java.util.Map;
public class Constant {
/** Motor-vehicle type codes, comma separated. */
public static final String MOTOR_TYPES= "1,2,3,7,8,10,11,12,13,15";
/** Key for area-level realtime indicators (presumably a Redis key — confirm with writers). */
public static final String AREA_REALTIME_INDICATOR= "AREA_REALTIME_INDICATOR";
/** Hash field name for travel data. */
public static final String HASH_KEY_TRAVEL_DATA = "TRAVEL_DATA";
/** Hash field name for area free-flow speed. */
public static final String HASH_KEY_AREA_FREE_FLOW_SPEED = "AREA_FREE_FLOW_SPEED";
/** Hash field name for area max queue length. */
public static final String HASH_KEY_AREA_MAX_QUEUE_LENGTH = "AREA_MAX_QUEUE_LENGTH";
// Constants holder — not instantiable.
private Constant() {
}
}
package com.wanji.indicators.constant;
import java.util.Objects;
/**
 * Maps Chinese direction labels to their English equivalents.
 */
public enum DirectionFromCnToEn {
North("北方向", "North direction"),
Northeast("东北方向", "Northeast direction"),
East("东方向", "East direction"),
Southeast("东南方向", "Southeast direction"),
South("南方向", "South direction"),
Southwest("西南方向", "Southwest direction"),
West("西方向", "West direction"),
Northwest("西北方向", "Northwest direction");
/** Chinese direction label. */
private String cndir;
/** English direction label. */
private String endir;
DirectionFromCnToEn(String cndir, String endir) {
this.cndir = cndir;
this.endir = endir;
}
public String getCndir() {
return this.cndir;
}
public void setCndir(String cndir) {
this.cndir = cndir;
}
public String getEndir() {
return this.endir;
}
public void setEndir(String endir) {
this.endir = endir;
}
/**
 * Looks up the English label for a Chinese direction name.
 *
 * @param dirName Chinese direction name (may be null)
 * @return the English label, or null when no constant matches
 */
public static String getEnDir(String dirName) {
for (DirectionFromCnToEn candidate : values()) {
if (Objects.equals(dirName, candidate.getCndir())) {
return candidate.getEndir();
}
}
return null;
}
}
package com.wanji.indicators.constant;
/**
 * Event/signal states. Note: GREEN and DISSIPATE both carry code 1 — the
 * light-color codes and the dissipation codes are independent value spaces.
 */
public enum EventState {
GREEN(1, "绿"),
YELLOW(2, "黄"),
RED(3, "红"),
DISSIPATE(1, "已消散"),
NOT_DISSIPATE(0, "未消散");
/** Numeric state code. */
private Integer code;
/** Chinese display name. */
private String name;
EventState(Integer code, String name) {
this.code = code;
this.name = name;
}
public Integer getCode() {
return this.code;
}
public void setCode(Integer code) {
this.code = code;
}
public String getName() {
return this.name;
}
public void setName(String name) {
this.name = name;
}
}
package com.wanji.indicators.constant;
/**
 * Traffic event types; each constant carries a numeric code and a Chinese display name.
 */
public enum EventType {
EVENT(0, "交通事件"),
CONFLICT_POINT(1, "冲突点"),
RAPID_ACCELERATION(2, "急加速"),
RAPID_DECELERATION(3, "急减速"),
SHARP_TURN(4, "急转弯"),
SPEEDING(5, "违法超速"),
ACCIDENT(6, "多车事故"),
ABNORMAL_PARKING(7, "单车事故"),
RETROGRADE(8, "逆向行驶"),
ILLEGAL_PARKING(9, "违规停车"),
CROSSING_THE_ROAD(10, "横穿路段"),
NOT_FOLLOWING_THE_LANE(11, "不按车道行驶"),
CHANGE_LANE(12, "违规变道"),
ABNORMAL_STOP(13, "异常停车"),
OVERFLOW_CONGESTION(14, "路口溢出"),
RUNNING_RED_LIGHT(15, "闯红灯"),
EMERGENCY_LANE(16, "占用应急车道"),
ROAD_CONGESTION(17, "路段拥堵"),
PEDESTRIAN_RUNNING_RED_LIGHT(18, "行人闯红灯"),
NON_VEHICLE_RUNNING_RED_LIGHT(19, "非机动车闯红灯"),
PEDESTRIAN_INVADE(20, "行人入侵机动车道"),
NON_VEHICLE_INVADE(21, "非机动车入侵机动车道"),
RIGHT_TURN_NOT_SLOWING_DOWN(22, "右转弯未减速"),
RIGHT_TURN_BEYOND_SAFE_SPEED(23, "右转弯超出安全车速"),
WARNING_OF_TRUCK_TURNING_RIGHT(24, "大型车右转预警"),
TURN_RIGHT_WITHOUT_STOPPING(25, "大型车右转未停车"),
INTRUSION_RISK_ZONE(26, "大型车入侵危险区"),
DANGEROUS_DRIVING_S(27, "S型危险驾驶"),
FREQUENT_LANE_CHANGES(28, "连续变道"),
CROSS_CONGESTION(29, "路口拥堵"),
CROSS_DEADLOCK(30, "路口死锁"),
CROSS_IMBALANCE(31, "路口失衡"),
OUT_LANE(32, "驶离车道");
/** Numeric event code. */
private Integer code;
/** Chinese display name. */
private String name;
EventType(Integer code, String name) {
this.code = code;
this.name = name;
}
public Integer getCode() {
return this.code;
}
public void setCode(Integer code) {
this.code = code;
}
public String getName() {
return this.name;
}
public void setName(String name) {
this.name = name;
}
/**
 * Returns the Chinese display name for an event code.
 *
 * Replaces the original 150-line switch (one case per constant, each mapping
 * case N to the constant with code N) with a single lookup loop.
 *
 * @param code numeric event code
 * @return the display name, or "" when the code is 0 or unknown
 *         (code 0 / EVENT was absent from the original switch, so it still yields "")
 */
public static String getNameByCode(int code) {
if (code == 0) {
return "";
}
for (EventType type : values()) {
if (type.getCode().intValue() == code) {
return type.getName();
}
}
return "";
}
}
package com.wanji.indicators.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import java.time.LocalDateTime;
import com.baomidou.mybatisplus.annotation.TableId;
import java.io.Serializable;
import lombok.Data;
import lombok.EqualsAndHashCode;
import com.baomidou.mybatisplus.annotation.TableName;
@Data
@EqualsAndHashCode(callSuper=false)
@TableName("t_base_lane_info")
public class BaseLaneInfo extends Model<BaseLaneInfo> {
private static final long serialVersionUID = 1L;
/**
 * Lane id (channelized-segment id + "_" + lane sort no).
 */
@TableId(value = "id", type = IdType.AUTO)
private String id;
/**
 * Lane code.
 */
private String code;
/**
 * Lane sort no, numbered from the leftmost lane: 11, 12, 13...
 */
private Integer sort;
/**
 * Lane type: 1 road-section lane; 2 entrance lane; 3 exit lane; 4 left-turn waiting area; 6 straight-through waiting area.
 */
private Integer type;
/**
 * Lane direction: 1 north; 2 northeast; 3 east; 4 southeast; 5 south; 6 southwest; 7 west; 8 northwest.
 */
private Integer dir;
/**
 * Lane turn: 1 left; 2 straight; 3 right; 4 U-turn; 5 straight+left; 6 straight+right; 7 left+straight+right; 8 left+right; 9 left+U-turn; 10 straight+U-turn; 11 right+U-turn; 12 left+straight+U-turn; 13 straight+right+U-turn; 14 left+straight+right+U-turn; 15 left+right+U-turn.
 */
private Integer turn;
/**
 * Lane category: 1 motor vehicle; 2 non-motor vehicle; 3 bus only; 4 variable; 5 tidal.
 */
private Integer category;
/**
 * Crossing (intersection) id.
 */
private String crossId;
/**
 * Road-section (rid) id.
 */
private String rid;
/**
 * Channelized-segment id.
 */
private String segmentId;
/**
 * Lane length.
 */
private Double length;
/**
 * Lane width.
 */
private Double width;
/**
 * Geometry as WKT text.
 */
private String wkt;
/**
 * Creation time.
 */
private LocalDateTime gmtCreate;
/**
 * Last-modification time.
 */
private LocalDateTime gmtModified;
@Override
protected Serializable pkVal() {
return this.id;
}
// Extra fields populated by the mapper's join queries, not columns of t_base_lane_info itself.
// NOTE(review): presumably joined from t_base_rid_info (ridWkt is consumed by
// BaseLaneInfoServiceImpl.findLaneRidInfo) — confirm against the mapper XML.
private Integer ridLevel;
private String ridWkt;
private Double ridLength;
}
package com.wanji.indicators.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import java.time.LocalDateTime;
import com.baomidou.mybatisplus.annotation.TableId;
import java.io.Serializable;
import lombok.Data;
import lombok.EqualsAndHashCode;
import com.baomidou.mybatisplus.annotation.TableName;
@Data
@EqualsAndHashCode(callSuper=false)
@TableName("t_base_rid_info")
public class BaseRidInfo extends Model<BaseRidInfo> {
private static final long serialVersionUID = 1L;
/**
 * Rid (road-section) id: start crossing id + end crossing id + main/side-road sequence no.
 */
@TableId(value = "id", type = IdType.AUTO)
private String id;
/**
 * Road-section name.
 */
private String name;
/**
 * Road id.
 */
private String roadId;
/**
 * Road name.
 */
private String roadName;
/**
 * Road direction id.
 */
private String roadDirId;
/**
 * Start crossing id.
 */
private String startCrossId;
/**
 * End crossing id.
 */
private String endCrossId;
/**
 * Outbound direction: 1 north; 2 northeast; 3 east; 4 southeast; 5 south; 6 southwest; 7 west; 8 northwest.
 */
private Integer outDir;
/**
 * Inbound direction: same coding as outDir.
 */
private Integer inDir;
/**
 * Outbound angle, clockwise from due north, 0~359.
 */
private Double startAngle;
/**
 * Inbound angle, clockwise from due north, 0~359.
 */
private Double endAngle;
/**
 * Travel direction: 0 upbound; 1 downbound.
 */
private Integer direction;
/**
 * Sequence no of this section along the road direction.
 */
private Integer sort;
/**
 * Section trend: 1 south-to-north; 2 west-to-east; 3 north-to-south; 4 east-to-west; 5 inner ring; 6 outer ring; 99 other (see trend dictionary).
 */
private Integer trend;
/**
 * Road level:
 41000 expressway;
 42000 national road;
 43000 urban expressway;
 44000 urban arterial road;
 45000 urban sub-arterial road;
 47000 ordinary urban road;
 51000 provincial road;
 52000 county road;
 53000 township road;
 54000 county/township/village internal road;
 49 minor road (see road-level dictionary)
 */
private Integer level;
/**
 * Administrative area code; comma separated when the section spans areas.
 */
private String areaCode;
/**
 * Section length (meters).
 */
private Double length;
/**
 * Section width (meters).
 */
private Double width;
/**
 * One-way flag: 0 no; 1 yes; 99 other.
 */
private Integer isOneway;
/**
 * Section type: 1 section; 3 ramp; 4 tunnel; 5 bridge; 6 elevated; 99 other.
 */
private Integer type;
/**
 * Main/side flag: 1 main road; 2 side road; 99 other (see main/side dictionary).
 */
private Integer mainFlag;
/**
 * Geometry as WKT text.
 */
private String wkt;
/**
 * Signal-control section id.
 */
private String scId;
/**
 * Signal-control section name.
 */
private String scName;
/**
 * Signal-control section sequence no.
 */
private Integer scSort;
/**
 * Creation time.
 */
private LocalDateTime gmtCreate;
/**
 * Last-modification time.
 */
private LocalDateTime gmtModified;
@Override
protected Serializable pkVal() {
return this.id;
}
}
package com.wanji.indicators.mapper;
import com.wanji.indicators.entity.BaseLaneInfo;
import java.util.List;
import org.apache.ibatis.annotations.Param;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface BaseLaneInfoMapper extends BaseMapper<BaseLaneInfo>{
/**
 * Returns every row of t_base_lane_info
 * (presumably with joined rid columns such as ridWkt — confirm against the mapper XML).
 */
List<BaseLaneInfo> findAllBaseLaneInfo();
/**
 * Looks up one t_base_lane_info row by primary key.
 * @param id lane id
 */
BaseLaneInfo findBaseLaneInfoByid(@Param("id") String id);
/**
 * Queries t_base_lane_info rows matching the populated fields of the example object.
 * @param baseLaneInfo example/filter object
 */
List<BaseLaneInfo> findBaseLaneInfoByCondition(BaseLaneInfo baseLaneInfo);
/**
 * Deletes the t_base_lane_info row with the given primary key.
 * (The original comment said "query" — copy-paste artifact.)
 * @param id lane id
 */
Integer deleteBaseLaneInfoByid(@Param("id") String id);
/**
 * Updates a t_base_lane_info row by primary key.
 * @param baseLaneInfo row to update (id must be set)
 */
Integer updateBaseLaneInfoByid(BaseLaneInfo baseLaneInfo);
/**
 * Inserts a new t_base_lane_info row.
 * @param baseLaneInfo row to insert
 */
Integer addBaseLaneInfo(BaseLaneInfo baseLaneInfo);
}
package com.wanji.indicators.mapper;
import com.wanji.indicators.entity.BaseRidInfo;
import java.util.List;
import org.apache.ibatis.annotations.Param;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface BaseRidInfoMapper extends BaseMapper<BaseRidInfo>{
/**
 * Returns every row of t_base_rid_info.
 */
List<BaseRidInfo> findAllBaseRidInfo();
/**
 * Looks up one t_base_rid_info row by primary key.
 * @param id rid (road-section) id
 */
BaseRidInfo findBaseRidInfoByid(@Param("id") String id);
/**
 * Queries t_base_rid_info rows matching the populated fields of the example object.
 * @param baseRidInfo example/filter object
 */
List<BaseRidInfo> findBaseRidInfoByCondition(BaseRidInfo baseRidInfo);
/**
 * Deletes the t_base_rid_info row with the given primary key.
 * (The original comment said "query" — copy-paste artifact.)
 * @param id rid (road-section) id
 */
Integer deleteBaseRidInfoByid(@Param("id") String id);
/**
 * Updates a t_base_rid_info row by primary key.
 * @param baseRidInfo row to update (id must be set)
 */
Integer updateBaseRidInfoByid(BaseRidInfo baseRidInfo);
/**
 * Inserts a new t_base_rid_info row.
 * @param baseRidInfo row to insert
 */
Integer addBaseRidInfo(BaseRidInfo baseRidInfo);
}
package com.wanji.indicators.model;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* @author fengyi
* @date 2023/3/1
* @description
*/
@Data
public class CarTrackModel implements Serializable {
// --- static vehicle identity ---
/* global/region id */
private String orgCode = "";
/* vehicle id */
private Integer id;
/* license plate number */
private String picLicense;
/* plate color code */
private Integer licenseColor;
/* vehicle color code */
private Integer originalColor;
/* vehicle type code */
private Integer originalType;
/* vehicle owner */
private String owner;
/* usage nature code */
private Integer function;
/* ownership code */
private Integer ownership;
/* road-network reference of this point */
private RoadNet roadnet;
// --- per-point dynamic state ---
/* formatted timestamp: yyyy-MM-dd HH:mm:ss.SSS */
private String timeStamp;
/* numeric timestamp (epoch millis — assumed; confirm upstream producer) */
private Long globalTimeStamp;
/* longitude */
private Double longitude;
/* latitude */
private Double latitude;
/* speed */
private Double speed;
/* course/heading angle */
private Double courseAngle;
// --- nested road-network reference ---
@Data
public static class RoadNet implements Serializable {
/* crossing (intersection) id */
private String crossId;
/* road-section (rid) id */
private String rid;
/* lane id */
private String laneId;
/* channelized-segment id */
private String segmentId;
}
}
package com.wanji.indicators.model;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* @author fengyi
* @date 2023/3/1
* @description
*/
@Data
public class FrameModel implements Serializable {
// One detection frame: all participant tracks captured at the same instant.
/* formatted frame time */
private String timeStamp;
/* numeric frame timestamp */
private Long globalTimeStamp;
/* number of participants in the frame */
private Integer participantNum;
/* global/region id */
private String orgCode ="";
/* participant tracks belonging to this frame */
private List<CarTrackModel> e1FrameParticipant;
}
package com.wanji.indicators.model;
import com.alibaba.fastjson.annotation.JSONField;
import com.google.gson.JsonObject;
import lombok.Data;
import java.io.Serializable;
import java.util.*;
/**
* @author fengyi
* @date 2023/3/1
* @description
*/
@Data
public class RoutePathCarTrackModel implements Serializable {
// Accumulates one vehicle's route path; id scopes may change between crossing
// coverage areas, hence a set of {"orgCode":"","id":""} maps.
private Set<Map<String,Object>> targetId = new HashSet<>();
/* license plate number */
private String picLicense;
/* vehicle time at the route start point */
private String startTime;
/* lane at the start point */
private String startLaneId;
/* rid (road section) at the start point */
private String startRid;
/* channelized segment at the start point */
private String startSegmentId;
/* crossing at the start point */
private String startCrossId;
/* start longitude */
private Double startLongitude;
/* start latitude (original comment mislabeled this as an end coordinate) */
private Double startLatitude;
/* start timestamp */
@JSONField(serialize = false)
private Long startGlobalTimeStamp;
/* vehicle time at the route end point */
private String endTime;
/* lane at the end point */
private String endLaneId;
/* rid (road section) at the end point */
private String endRid;
/* channelized segment at the end point */
private String endSegmentId;
/* crossing at the end point */
private String endCrossId;
/* end longitude */
private Double endLongitude;
/* end latitude */
private Double endLatitude;
/* end timestamp */
@JSONField(serialize = false)
private Long endGlobalTimeStamp;
/* route line number */
//@JSONField(serialize = false)
private Integer lineNo = 0;
/* sequence number of the point within the route */
//@JSONField(serialize = false)
private Integer seqNo;
/* accumulated route length */
//@JSONField(serialize = false)
private Double routeLength=0D;
// route coordinate list
@JSONField(serialize = false)
private List<Double[]> pointList = new ArrayList<>();
/* previous point longitude */
@JSONField(serialize = false)
private Double preLongitude;
/* previous point latitude */
@JSONField(serialize = false)
private Double preLatitude;
/* previous point timestamp */
@JSONField(serialize = false)
private Long preTimestamp;
/* previous point formatted time */
@JSONField(serialize = false)
private String preDateTime;
/* whether this route has finished computation */
@JSONField(serialize = false)
private boolean isFinished;
/* creation time of this record */
@JSONField(serialize = false)
private Long createTime;
}
package com.wanji.indicators.model;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import lombok.ToString;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
import java.util.Set;
/**
* @author fengyi
* @date 2023/3/1
* @description
*/
@Data
public class SingleCarTrackListModel implements Serializable {
// Per-window aggregation of a single vehicle's track points.
/**
 * Common (static) vehicle properties: start
 */
/* global/region id */
private String orgCode;
/* vehicle id */
private Integer id;
/* license plate number */
private String picLicense;
/* plate color code */
private Integer licenseColor;
/* vehicle color code */
private Integer originalColor;
/* vehicle type code */
private Integer originalType;
/* vehicle owner */
private String owner;
/* usage nature code */
private Integer function;
/* ownership code */
private Integer ownership;
/* window start time */
@JSONField(format="yyyy-MM-dd HH:mm:ss.SSS")
private Date startTime;
/* window end time */
@JSONField(format="yyyy-MM-dd HH:mm:ss.SSS")
private Date endTime;
/* batch global-monitoring start time */
@JSONField(format="yyyy-MM-dd HH:mm:ss.SSS")
private Date globalStartTime;
/* batch global-monitoring end time */
@JSONField(format="yyyy-MM-dd HH:mm:ss.SSS")
private Date globalEndTime;
// road-network references touched by this track
private Set<CarTrackModel.RoadNet> roadnets;
//private CarTrackCommonProperty staticProperty;
/**
 * Dynamic per-point track properties.
 */
private List<CarTrackDynamicProperty> tracks;
// (A dead commented-out CarTrackCommonProperty class duplicating the fields above was removed.)
@Data
public static class CarTrackDynamicProperty implements Serializable {
/* formatted timestamp: yyyy-MM-dd HH:mm:ss.SSS */
private String timeStamp;
/* numeric timestamp (epoch millis — assumed; confirm upstream) */
private Long globalTimeStamp;
/* longitude */
private Double longitude;
/* latitude */
private Double latitude;
/* speed */
private Double speed;
/* course/heading angle */
private Double courseAngle;
}
}
package com.wanji.indicators.model;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* @author fengyi
* @date 2023/3/1
* @description
*/
@Data
public class SnapshotIndicatorModel implements Serializable {
// Per-batch snapshot of lane-level indicators.
/* snapshot time (formatted string) */
private String timeStamp;
/* number of lanes covered by this snapshot */
private Long laneNum;
/* global/region id */
private String orgCode ="";
/* per-lane snapshot entries (field named eventList — kept for JSON compatibility) */
private List<SnapshotInfo> eventList;
}
package com.wanji.indicators.model;
import lombok.Data;
import java.io.Serializable;
/**
* @author fengyi
* @date 2023/4/7
* @description
*/
@Data
public class SnapshotInfo implements Serializable {
// One lane's snapshot indicators.
/* snapshot timestamp (epoch millis — assumed; confirm producer) */
private Long timestamp;
/* global target id */
private String globalId;
// NOTE(review): the fields below are package-private while the two above are
// private; @Data generates accessors either way — consider unifying visibility.
/* number of vehicles in the lane */
Integer carNums;
/* dynamic queue length */
double dynamicQueueLength;
/* lane id */
String laneId;
/* mean space headway */
double meanSpaceHeadway;
/* static queue length */
double staticQueueLength;
/* vehicle length ratio — assumed occupancy-by-length; confirm producer */
double vehicleLengthRatio;
/* vehicle count ratio — assumed; confirm producer */
double vehicleNumsRatio;
}
package com.wanji.indicators.model;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* @author fengyi
* @date 2023/3/10
* @description
*/
@Data
public class TopicLastTimeModel implements Serializable {
// Records the last-consumed position/time for one Kafka topic.
/* topic name */
private String topic;
/* last record timestamp (epoch millis — assumed) */
private Long timestamp;
/* last consumed offset */
private Long offset;
/* Kafka bootstrap servers the topic was read from */
private String bootstrapServers;
/* creation time of this snapshot */
private long createTime;
}
package com.wanji.indicators.service;
import com.wanji.indicators.entity.BaseLaneInfo;
import com.baomidou.mybatisplus.extension.service.IService;
import com.baomidou.mybatisplus.core.metadata.IPage;
import java.util.List;
/**
* <p>
* 车道基础信息 服务类
* </p>
*
* @author fengyi
* @since 2023-04-06
*/
public interface BaseLaneInfoService extends IService<BaseLaneInfo> {
/**
 * Returns all lanes with joined rid info; each lane's ridLength is recomputed
 * from the rid WKT geometry (see BaseLaneInfoServiceImpl).
 */
public List<BaseLaneInfo> findLaneRidInfo();
}
package com.wanji.indicators.service;
import com.wanji.indicators.entity.BaseRidInfo;
import com.baomidou.mybatisplus.extension.service.IService;
import com.baomidou.mybatisplus.core.metadata.IPage;
import java.util.List;
/**
* <p>
* 路段基础信息 服务类
* </p>
*
* @author fengyi
* @since 2023-04-05
*/
public interface BaseRidInfoService extends IService<BaseRidInfo> {
/**
 * Returns all road sections (rids) with their length recomputed from the WKT geometry.
 */
public List<BaseRidInfo> findAll();
}
package com.wanji.indicators.service.impl;
import com.vividsolutions.jts.geom.LineString;
import com.wanji.indicators.entity.BaseLaneInfo;
import com.wanji.indicators.entity.BaseRidInfo;
import com.wanji.indicators.mapper.BaseLaneInfoMapper;
import com.wanji.indicators.service.BaseLaneInfoService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.wanji.indicators.util.GeomsConvertUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.List;
/**
* <p>
* 车道基础信息 服务实现类
* </p>
*
* @author fengyi
* @since 2023-04-06
*/
@Service
public class BaseLaneInfoServiceImpl extends ServiceImpl<BaseLaneInfoMapper, BaseLaneInfo> implements BaseLaneInfoService {
@Resource
private BaseLaneInfoMapper baseLaneInfoMapper;
/**
 * Loads every lane row (with joined rid info) and derives each lane's rid length
 * from the rid WKT geometry.
 * NOTE(review): LineString#getLength() is in the WKT's coordinate units, not
 * necessarily meters — confirm downstream expectations.
 */
@Override
public List<BaseLaneInfo> findLaneRidInfo() {
List<BaseLaneInfo> lanes = baseLaneInfoMapper.findAllBaseLaneInfo();
lanes.forEach(lane -> {
LineString geometry = GeomsConvertUtil.getLineString(lane.getRidWkt());
lane.setRidLength(geometry.getLength());
});
return lanes;
}
}
package com.wanji.indicators.service.impl;
import com.vividsolutions.jts.geom.LineString;
import com.wanji.indicators.entity.BaseRidInfo;
import com.wanji.indicators.mapper.BaseRidInfoMapper;
import com.wanji.indicators.service.BaseRidInfoService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.wanji.indicators.util.ArithOfBigDecmial;
import com.wanji.indicators.util.GeomsConvertUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.List;
/**
* <p>
* 路段基础信息 服务实现类
* </p>
*
* @author fengyi
* @since 2023-04-05
*/
@Service
public class BaseRidInfoServiceImpl extends ServiceImpl<BaseRidInfoMapper, BaseRidInfo> implements BaseRidInfoService {
@Resource
private BaseRidInfoMapper baseRidInfoMapper;
/**
 * Loads every rid row and recomputes each rid's length from its WKT geometry.
 *
 * Bug fix: the original method computed the lengths and then returned {@code null},
 * discarding the populated list; it now returns the list, matching
 * {@code BaseLaneInfoServiceImpl#findLaneRidInfo()}.
 *
 * NOTE(review): LineString#getLength() here is in coordinate (degree) units — see
 * {@link #main(String[])}, which multiplies by 6371000*PI/180 to get meters;
 * confirm whether setLength is expected to store degrees.
 */
@Override
public List<BaseRidInfo> findAll() {
List<BaseRidInfo> list = baseRidInfoMapper.findAllBaseRidInfo();
for (BaseRidInfo model : list) {
String wkt = model.getWkt();
LineString lineString = GeomsConvertUtil.getLineString(wkt);
double ridLength = lineString.getLength();
model.setLength(ridLength);
}
return list;
}
/** Ad-hoc check: converts a degree-based polyline length to an approximate length in meters. */
public static void main(String[] args) {
String wkt = "112.96358875770298,28.188337641846843;112.96340547895414,28.18724784621854;112.96322220020323,28.18615803948525;112.96310925849781,28.184595086003952;112.96292227780222,28.18303210967499";
LineString lineString = GeomsConvertUtil.getLineString(wkt);
double ridLength = lineString.getLength();
System.out.println(ArithOfBigDecmial.mul(ridLength, 6371000 * Math.PI / 180));
}
}
package com.wanji.indicators.source;
import com.wanji.indicators.entity.BaseLaneInfo;
import com.wanji.indicators.service.BaseLaneInfoService;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 查询 进口道对应右转出口rid,及右转出口RID可以进入的各进口RID
*/
/**
 * One-shot Flink source: emits the lane/rid lookup table grouped by lane id, then finishes.
 * (Original class comment referenced right-turn exit rids — kept semantics unchanged.)
 */
@Slf4j
public class LaneRidLengthSource extends RichParallelSourceFunction<Map<String,List<BaseLaneInfo>>> {
private volatile boolean running = true;
@Resource
BaseLaneInfoService baseLaneInfoService;
/** Bootstraps a Spring context so the MyBatis-backed service is usable inside the Flink task. */
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ApplicationContext beanConf = new ClassPathXmlApplicationContext("spring-container.xml");
baseLaneInfoService = beanConf.getBean(BaseLaneInfoService.class);
}
/**
 * Emits the grouped lane list exactly once (unless cancelled before the query returns).
 * NOTE(review): grouping by getId() (the primary key) yields single-element groups;
 * grouping by rid may have been intended — confirm with downstream consumers.
 */
@Override
public void run(SourceContext<Map<String,List<BaseLaneInfo>>> sourceContext) throws Exception {
if (running) {
List<BaseLaneInfo> list = baseLaneInfoService.findLaneRidInfo();
if (!list.isEmpty()) {
Map<String,List<BaseLaneInfo>> group = list.stream().collect(Collectors.groupingBy(BaseLaneInfo::getId));
sourceContext.collect(group);
}
}
}
@Override
public void cancel() {
running = false;
}
}
package com.wanji.indicators.task.export;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonObject;
import lombok.Data;
import org.apache.kafka.common.record.TimestampType;
import java.io.Serializable;
/**
* @author fengyi
* @date 2023/3/10
* @description
*/
@Data
public class KafkaRecordModel implements Serializable {
// Carrier for one consumed Kafka record: JSON-parsed payload plus record metadata.
/* Kafka timestamp type (create-time vs log-append-time) */
private TimestampType timestampType ;
/* record timestamp (epoch millis) */
private Long timestamp;
/* record value parsed as a JSON object */
private JSONObject values;
/* source topic */
private String topic;
}
package com.wanji.indicators.task.export;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonObject;
import com.wanji.indicators.model.TopicLastTimeModel;
import com.wanji.indicators.task.track.service.func.SplitRoutePathFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Deserializes raw Kafka records into {@link KafkaRecordModel}: the value bytes are
 * parsed as UTF-8 JSON and the record's timestamp/type/topic are carried along.
 */
public class ReadKafkaDeserialization implements KafkaDeserializationSchema<KafkaRecordModel> {
// Bug fix: the logger was previously created for SplitRoutePathFunction.class,
// mislabeling this class's log output. (The unused `encoding` field was removed.)
private static final Logger log = LoggerFactory.getLogger(ReadKafkaDeserialization.class);
public ReadKafkaDeserialization( ) {
}
@Override
public TypeInformation<KafkaRecordModel> getProducedType() {
return TypeInformation.of(KafkaRecordModel.class);
}
@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
}
@Override
public boolean isEndOfStream(KafkaRecordModel nextElement) {
return false;
}
/**
 * @return the populated model, or null when the record is null or its value cannot be parsed
 */
@Override
public KafkaRecordModel deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
if (consumerRecord == null) {
return null;
}
try {
String json = new String(consumerRecord.value(), "UTF-8");
JSONObject jsonObject = JSONObject.parseObject(json);
KafkaRecordModel model = new KafkaRecordModel();
model.setTimestamp(consumerRecord.timestamp());
model.setTimestampType(consumerRecord.timestampType());
model.setValues(jsonObject);
model.setTopic(consumerRecord.topic());
return model;
} catch (Exception e) {
// Log the full stack trace instead of only the message.
log.error("deserialize failed", e);
return null;
}
}
}
\ No newline at end of file
package com.wanji.indicators.task.export;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.model.RoutePathCarTrackModel;
import com.wanji.indicators.task.track.service.sink.FileSink;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class TrackExportToFileMain {
private static final Logger log = LoggerFactory.getLogger(TrackExportToFileMain.class);
public static void main(String[] args) {
//批量轨迹es index,逗号分割:esIndexName,windowTime
ParameterTool parameter = ParameterTool.fromArgs(args);
String bootstrapServers = parameter.get("bootstrapServers");
String topic = parameter.get("topic");
String startTime = parameter.get("startTime");
String endTime = parameter.get("endTime");
String sinkPath = parameter.get("sinkPath");//目录
if (!sinkPath.endsWith("/")) {
sinkPath += "/";
}
sinkPath += startTime + "-" + endTime + ".json";
log.info("导出路径:" + sinkPath);
long startTimestamp = DateUtil.StringToMillis(startTime, "yyyyMMddHHmmss");
long endTimestamp = DateUtil.StringToMillis(endTime, "yyyyMMddHHmmss");
PropertiesHelper instance = PropertiesHelper.getInstance();
Properties properties = instance.getProperties();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<KafkaRecordModel> source = KafkaSource.<KafkaRecordModel>builder()
.setProperties(instance.getConsumerProperties())
.setProperty("auto.offset.commit", "true")
.setProperty("auto.commit.interval.ms", "1000")
.setProperty("commit.offsets.on.checkpoint", "true")
.setBootstrapServers(bootstrapServers)
.setTopics(topic)
.setGroupId(properties.getProperty("consumer.group.id") + "-Export")
// 从大于等于此时间戳开始的偏移量开始
.setStartingOffsets(OffsetsInitializer.timestamp(startTimestamp))
.setDeserializer(KafkaRecordDeserializationSchema.of(new ReadKafkaDeserialization()))
.build();
DataStream<KafkaRecordModel> stream = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-car-track-source");
SingleOutputStreamOperator<KafkaRecordModel> frameModelStream =
stream.filter(new FilterFunction<KafkaRecordModel>() {
@Override
public boolean filter(KafkaRecordModel value) throws Exception {
Long ts = value.getTimestamp();
//Long endTs = DateUtil.StringToMillis("2023-03-30 11:40:00:999","yyyy-MM-dd HH:mm:ss:SSS");
if (ts <= endTimestamp) {
return true;
}
return false;
}
}).name("过滤出时间范围内数据");
SingleOutputStreamOperator<KafkaRecordModel> timeOutStream = frameModelStream
.keyBy(KafkaRecordModel::getTopic)
.process(new KeyedProcessFunction<String, KafkaRecordModel, KafkaRecordModel>() {
private transient MapState<String, Long> dataTimeState;
//连续点间隔超时时间,单位分钟
private Integer timeOut = 10;
@Override
public void open(Configuration parameters) throws Exception {
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(12))//状态过期时间
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.cleanupFullSnapshot()
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)//不返回过期值
.build();
MapStateDescriptor mapStateDescriptor = new MapStateDescriptor<>("Kafka_Data_Export",
TypeInformation.of(new TypeHint<Tuple3<String, Integer, String>>() {
}),
TypeInformation.of(new TypeHint<RoutePathCarTrackModel>() {
})
);
mapStateDescriptor.enableTimeToLive(ttlConfig);
dataTimeState = getRuntimeContext().getMapState(mapStateDescriptor);
}
@Override
public void processElement(KafkaRecordModel value, Context context, Collector<KafkaRecordModel> out) throws Exception {
out.collect(value);
int interval = 60 * 1000;
long fireTime = context.timerService().currentProcessingTime() + interval;
context.timerService().registerProcessingTimeTimer(fireTime);
dataTimeState.put(value.getTopic(), context.timerService().currentProcessingTime());
//log.info("register process time:" + DateUtil.toDateTime(fireTime, "yyyy-MM-dd HH:mm:ss"));
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<KafkaRecordModel> out) throws Exception {
Iterator<Map.Entry<String, Long>> iterator = dataTimeState.iterator();
while (iterator.hasNext()) {
Map.Entry<String, Long> entry = iterator.next();
long dataTime = entry.getValue();
//log.info("dataTime:"+DateUtil.toDateTime(dataTime,"yyyy-MM-dd HH:mm:ss")+" onTime:" + DateUtil.toDateTime(timestamp, "yyyy-MM-dd HH:mm:ss"));
if (timestamp - dataTime > 30 * 1000) {
//超时未收到新数据
ctx.timerService().deleteProcessingTimeTimer(timestamp);
System.exit(0);
}
}
}
});
SingleOutputStreamOperator<String> toJsonStream =
timeOutStream.flatMap(
new FlatMapFunction<KafkaRecordModel, String>() {
@Override
public void flatMap(KafkaRecordModel value, Collector<String> out) throws Exception {
Long ts = value.getTimestamp();
//Long endTs = DateUtil.StringToMillis("2023-03-30 11:40:00:999","yyyy-MM-dd HH:mm:ss:SSS");
// if (ts > endTimestamp) {
// System.exit(0);
// }
log.info("记录时间:{},数据导出范围【{},{}】", DateUtil.toDateTime(ts, "yyyy-MM-dd HH:mm:ss.SSS"), startTime, endTime);
String json = JSON.toJSONString(value);
out.collect(json);
}
})
.name("转Json输出");
try {
toJsonStream.addSink(new FileSink(sinkPath));
env.execute("根据起始时间范围导出数据到文件");
env.executeAsync().cancel();
} catch (Exception e) {
e.printStackTrace();
//log.error("交通指标计算任务异常 : " + e);
}
}
}
package com.wanji.indicators.task.freeflow.service.func;
import com.wanji.indicators.task.freeflow.service.model.FrameMaxSpeedModel;
import com.wanji.indicators.task.freeflow.service.model.MaxSpeedResultModel;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author fengyi
* @date 2023/4/6
* @description
*/
/**
 * Incrementally aggregates per-frame maximum speeds into a per-window result
 * holding the window's max speed, speed sum, element count and arithmetic mean.
 */
public class SpeedAggregateFunction implements AggregateFunction<FrameMaxSpeedModel,MaxSpeedResultModel,MaxSpeedResultModel> {

    private static final Logger log = LoggerFactory.getLogger(SpeedAggregateFunction.class);

    /** Creates the zero-valued accumulator for a new window. */
    @Override
    public MaxSpeedResultModel createAccumulator() {
        MaxSpeedResultModel initModel = new MaxSpeedResultModel();
        initModel.setAvgSpeed(0D);
        initModel.setMaxSpeed(0d);
        initModel.setSumSpeed(0D);
        initModel.setCount(0);
        return initModel;
    }

    /**
     * Invoked once per input element.
     *
     * @param value       the element entering the window
     * @param accumulator the partial result accumulated so far
     * @return the updated accumulator
     */
    @Override
    public MaxSpeedResultModel add(FrameMaxSpeedModel value, MaxSpeedResultModel accumulator) {
        accumulator.setMaxSpeed(Math.max(value.getSpeed(), accumulator.getMaxSpeed()));
        accumulator.setSumSpeed(value.getSpeed() + accumulator.getSumSpeed());
        accumulator.setCount(accumulator.getCount() + 1);
        // Bug fix: the previous formula (value + avg) / 2 weighted recent
        // elements exponentially; the true arithmetic mean is sum / count,
        // which the accumulator already tracks.
        accumulator.setAvgSpeed(accumulator.getSumSpeed() / accumulator.getCount());
        return accumulator;
    }

    /** Called when the window fires; the accumulator already is the result. */
    @Override
    public MaxSpeedResultModel getResult(MaxSpeedResultModel accumulator) {
        return accumulator;
    }

    /**
     * Combines two partial accumulators. Only used by merging window
     * assigners (e.g. session windows).
     *
     * Bug fix: previously returned {@code null}, which would NPE downstream
     * if Flink ever merged accumulators.
     */
    @Override
    public MaxSpeedResultModel merge(MaxSpeedResultModel a, MaxSpeedResultModel b) {
        a.setMaxSpeed(Math.max(a.getMaxSpeed(), b.getMaxSpeed()));
        a.setSumSpeed(a.getSumSpeed() + b.getSumSpeed());
        a.setCount(a.getCount() + b.getCount());
        a.setAvgSpeed(a.getCount() == 0 ? 0D : a.getSumSpeed() / a.getCount());
        return a;
    }
}
package com.wanji.indicators.task.freeflow.service.func;
import com.wanji.indicators.task.freeflow.service.model.MaxSpeedResultModel;
import com.wanji.indicators.util.DateUtil;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
* @author fengyi
* @date 2023/4/6
* @description
*/
/**
 * Stamps the single pre-aggregated free-flow-speed result of each window
 * with the window's start/end time and date, then emits it.
 */
public class SpeedProcessWindow extends ProcessWindowFunction<
        MaxSpeedResultModel,
        MaxSpeedResultModel,
        String,
        TimeWindow> {

    private static final Logger log = LoggerFactory.getLogger(SpeedProcessWindow.class);

    /**
     * @param s        the window key (date string, yyyyMMdd)
     * @param context  window context providing the window bounds
     * @param elements the aggregate output; contains at most one element
     * @param out      collector for the stamped result
     */
    @Override
    public void process(String s, Context context, Iterable<MaxSpeedResultModel> elements, Collector<MaxSpeedResultModel> out) throws Exception {
        long windowStartTs = context.window().getStart();
        long windowEndTs = context.window().getEnd();
        // Fix: removed an unused List copy of `elements` (built via
        // StreamSupport) and the duplicate iterator creation.
        for (MaxSpeedResultModel result : elements) {
            result.setStartTime(DateUtil.toDateTime(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS"));
            result.setEndTime(DateUtil.toDateTime(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS"));
            result.setDate(DateUtil.toDateTime(windowEndTs, "yyyy-MM-dd"));
            log.info("FreeFlowSpeed Result:" + result.toString());
            out.collect(result);
            break; // the aggregate emits a single element per window
        }
    }
}
package com.wanji.indicators.task.freeflow.service.model;
import com.wanji.indicators.constant.Constant;
import lombok.Data;
import java.io.Serializable;
/**
* @author fengyi
* @date 2023/4/6
* @description
*/
@Data
public class FrameMaxSpeedModel implements Serializable {
    // Fixed indicator id: the free-flow-speed entry of the area indicator hash.
    private String id= Constant.HASH_KEY_AREA_FREE_FLOW_SPEED;
    // Maximum motor-vehicle speed observed within one track frame.
    private Double speed;
    // Frame timestamp, epoch milliseconds.
    private long timestamp;
    // Frame date formatted as yyyyMMdd; used as the daily window key.
    private String date;
}
package com.wanji.indicators.task.freeflow.service.model;
import com.wanji.indicators.constant.Constant;
import lombok.Data;
import java.io.Serializable;
/**
* @author fengyi
* @date 2023/4/6
* @description
*/
@Data
public class MaxSpeedResultModel implements Serializable {
    // Fixed indicator id: the free-flow-speed entry of the area indicator hash.
    private String id= Constant.HASH_KEY_AREA_FREE_FLOW_SPEED;
    // Window start time, formatted yyyy-MM-dd HH:mm:ss.SSS.
    private String startTime;
    // Window end time, formatted yyyy-MM-dd HH:mm:ss.SSS.
    private String endTime;
    // Maximum speed seen in the window.
    private Double maxSpeed;
    // Average of the per-frame maximum speeds in the window.
    private Double avgSpeed;
    // Sum of the per-frame maximum speeds (used to derive the average).
    private Double sumSpeed;
    // Number of aggregated elements.
    private Integer count;
    // Result timestamp, epoch milliseconds.
    private long timestamp;
    // Window date formatted as yyyyMMdd (the aggregation key).
    private String date;
}
package com.wanji.indicators.task.freeflow.service.sink;
import com.alibaba.fastjson.JSON;
import com.wanji.indicators.task.freeflow.service.model.MaxSpeedResultModel;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
/**
* @author fengyi
* @date 2023/4/7
* @description
*/
/**
 * Maps a {@link MaxSpeedResultModel} onto a Redis hash entry:
 * {@code HSET <hashKey> <model id> <json>}.
 */
public class CustomRedisMapper implements RedisMapper<MaxSpeedResultModel> {

    /** Top-level key of the Redis hash this mapper writes into. */
    private final String hashKey;

    public CustomRedisMapper(String redisKey) {
        this.hashKey = redisKey;
    }

    /** Results are stored as a hash via HSET on the configured key. */
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, hashKey);
    }

    /** Hash field: the model's fixed indicator id. */
    @Override
    public String getKeyFromData(MaxSpeedResultModel data) {
        return data.getId();
    }

    /** Hash value: the model serialized as JSON. */
    @Override
    public String getValueFromData(MaxSpeedResultModel data) {
        return JSON.toJSONString(data);
    }
}
package com.wanji.indicators.task.freeflow.stream;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.task.freeflow.service.sink.CustomRedisMapper;
import com.wanji.indicators.config.RedisConfig;
import com.wanji.indicators.task.freeflow.service.model.FrameMaxSpeedModel;
import com.wanji.indicators.task.freeflow.service.model.MaxSpeedResultModel;
import com.wanji.indicators.task.freeflow.service.func.SpeedAggregateFunction;
import com.wanji.indicators.task.freeflow.service.func.SpeedProcessWindow;
import com.wanji.indicators.task.track.service.func.FrameFlatMap;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
/**
* 自由流速度计算
*/
/**
 * Free-flow speed computation job.
 *
 * Reads vehicle track frames from Kafka starting at 00:00 of the previous
 * day, extracts the maximum motor-vehicle speed per frame, aggregates those
 * maxima per day in a tumbling event-time window, and writes the daily
 * result to Redis (used as the area free-flow speed).
 */
public class FreeFlowSpeedMain {
    private static final Logger log = LoggerFactory.getLogger(FreeFlowSpeedMain.class);

    public static void main(String[] args) {
        PropertiesHelper instance = PropertiesHelper.getInstance();
        Properties properties = instance.getProperties();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DateTime currentDateTime = new DateTime();
        // Midnight (00:00) of the previous day — Kafka consumption starts here.
        DateTime dateTime = currentDateTime.withMillisOfDay(0).plusDays(-1);
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setProperties(instance.getConsumerProperties())
                .setProperty("auto.offset.commit", "true")
                .setProperty("auto.commit.interval.ms", "1000")
                .setProperty("commit.offsets.on.checkpoint", "true")
                .setBootstrapServers(properties.getProperty("bootstrap.servers"))
                .setTopics(properties.getProperty("consumer.topic"))
                .setGroupId(properties.getProperty("consumer.group.id") + "_Free_Flow_Speed")
                /* Starting-offset options:
                   1. explicit offsets:            OffsetsInitializer.offsets(Map<TopicPartition, Long>)
                   2. latest position:             OffsetsInitializer.latest()
                   3. earliest position:           OffsetsInitializer.earliest()
                   4. last committed position:     OffsetsInitializer.committedOffsets()
                   5. committed with a fallback:   OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
                 */
                // Start from offsets whose record timestamp is >= this timestamp.
                .setStartingOffsets(OffsetsInitializer.timestamp(dateTime.getMillis()))
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();
        DataStream<String> stream = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-car-track-source");
        // Parse raw JSON messages into track frames.
        SingleOutputStreamOperator<FrameModel> frameModelStream =
                stream
                        .flatMap(new FrameFlatMap())
                        .setParallelism(1)
                        .name("轨迹帧数据-JsonToObject");
        // For each frame, keep only motor vehicles and emit the frame's max speed.
        SingleOutputStreamOperator<FrameMaxSpeedModel> carTrackModelStream =
                frameModelStream.
                        flatMap(new FlatMapFunction<FrameModel, FrameMaxSpeedModel>() {
                            @Override
                            public void flatMap(FrameModel value, Collector<FrameMaxSpeedModel> out) throws Exception {
                                List<CarTrackModel> list = value.getE1FrameParticipant();
                                // Comma-separated list of motor-vehicle type codes.
                                String motorObjectType = Constant.MOTOR_TYPES;
                                String[] sps = motorObjectType.split(",");
                                List<String> typeList = Arrays.asList(sps);
                                // Maximum motor-vehicle speed within this frame.
                                OptionalDouble doubleStream = list.stream().filter(o->typeList.contains(o.getOriginalType().toString())).mapToDouble(CarTrackModel::getSpeed).max();
                                if (doubleStream.isPresent()) {
                                    double maxSpeed = doubleStream.getAsDouble();
                                    FrameMaxSpeedModel model = new FrameMaxSpeedModel();
                                    model.setSpeed(maxSpeed);
                                    model.setTimestamp(value.getGlobalTimeStamp());
                                    model.setDate(DateUtil.toDateTime(value.getGlobalTimeStamp(),"yyyyMMdd"));
                                    //log.info("Frame MaxSpeed:"+maxSpeed+" "+DateUtil.toDateTime(value.getGlobalTimeStamp(),"yyyy-MM-dd HH:mm:ss.SSS"));
                                    out.collect(model);
                                }
                            }
                        })
                        .setParallelism(1)
                        .name("计算每帧机动车最大速度");
        // Daily tumbling event-time window keyed by date; the per-day maximum
        // is taken as the free-flow speed.
        SingleOutputStreamOperator<MaxSpeedResultModel> freeSpeedStream = carTrackModelStream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<FrameMaxSpeedModel>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((event, timestamp) -> event.getTimestamp()))
                .keyBy(FrameMaxSpeedModel::getDate)
                //.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(16)))
                .window(TumblingEventTimeWindows.of(Time.days(1)))
                .aggregate(new SpeedAggregateFunction(),new SpeedProcessWindow())
                .name("按天取最大速度-作为自由流速度");
        try {
            // Alternative: keep the larger of every 2 results before updating Redis.
            //SingleOutputStreamOperator<MaxSpeedResultModel> maxStream = freeSpeedStream.keyBy(MaxSpeedResultModel::getDate).countWindow(2).maxBy("maxSpeed");
            //maxStream.print();
            freeSpeedStream.addSink( new RedisSink<>(new RedisConfig().getRedisConfig(), new CustomRedisMapper(Constant.AREA_REALTIME_INDICATOR)));
            env.execute("自由流速度计算工作流");
        } catch (Exception e) {
            e.printStackTrace();
            //log.error("traffic indicator job failed : " + e);
        }
    }
}
package com.wanji.indicators.task.monitor.service.func;
import com.wanji.indicators.model.TopicLastTimeModel;
import com.wanji.indicators.task.track.service.func.SplitRoutePathFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Deserializes Kafka records into {@link TopicLastTimeModel} metadata objects
 * (topic, offset, record timestamp and observation time) for topic liveness
 * monitoring. The record payload itself is ignored.
 */
public class CustomKafkaDeserialization implements KafkaDeserializationSchema<TopicLastTimeModel> {

    // Bug fix: the logger was created with SplitRoutePathFunction.class,
    // mis-attributing every log line emitted by this class.
    private static final Logger log = LoggerFactory.getLogger(CustomKafkaDeserialization.class);

    public CustomKafkaDeserialization( ) {
    }

    @Override
    public TypeInformation<TopicLastTimeModel> getProducedType() {
        return TypeInformation.of(TopicLastTimeModel.class);
    }

    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
    }

    /** This stream never ends. */
    @Override
    public boolean isEndOfStream(TopicLastTimeModel nextElement) {
        return false;
    }

    /**
     * Builds the monitoring model from record metadata only.
     *
     * @return the model, or {@code null} (skipped by the consumer) when the
     *         record is null or extraction fails
     */
    @Override
    public TopicLastTimeModel deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        if (consumerRecord != null) {
            try {
                TopicLastTimeModel model = new TopicLastTimeModel();
                model.setTimestamp(consumerRecord.timestamp());
                model.setTopic(consumerRecord.topic());
                model.setOffset(consumerRecord.offset());
                // Wall-clock time at which this record was observed.
                model.setCreateTime(System.currentTimeMillis());
                return model;
            } catch (Exception e) {
                // Fix: log the full exception (stack trace) rather than only
                // getMessage(), which loses the cause.
                log.error("deserialize failed", e);
            }
        }
        return null;
    }
}
\ No newline at end of file
package com.wanji.indicators.task.monitor.service.func;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.HashMap;
import java.util.Map;
/**
* @author fengyi
* @date 2023/3/13
* @description
*/
/**
 * Placeholder source intended to emit Kafka consumer offset information.
 *
 * NOTE(review): both run() and cancel() are empty — this source is an
 * unimplemented stub and currently produces no elements.
 */
public class KafkaCustomerOffsetSource extends RichParallelSourceFunction<Map<String, Integer>> {
    @Override
    public void run(SourceContext<Map<String, Integer>> ctx) throws Exception {
    }
    @Override
    public void cancel() {
    }
}
package com.wanji.indicators.task.monitor.service.func;
import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.model.TopicLastTimeModel;
import com.wanji.indicators.task.track.service.func.ProcessCarTrackByKey;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.HttpUtil;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.joda.time.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
/**
* @author fengyi
* @date 2023/3/12
* @description
*/
/**
 * Per-topic keyed process function that tracks the latest record timestamp
 * seen for each Kafka topic and periodically (every {@code delay} minutes)
 * checks how stale it is. When no data has arrived for longer than
 * {@code timeOut} minutes, an alert is POSTed to the configured Feishu
 * webhook.
 */
public class TopicMonitorKeyProcessFunction extends KeyedProcessFunction<String, TopicLastTimeModel, TopicLastTimeModel> {

    private static final Logger log = LoggerFactory.getLogger(TopicMonitorKeyProcessFunction.class);

    private Properties properties;
    private ValueState<TopicLastTimeModel> valueState;
    // Timer interval between staleness checks, minutes.
    private int delay;
    // No-data duration that triggers an alert, minutes.
    private int timeOut;

    public TopicMonitorKeyProcessFunction(int delay, int timeOut, Properties properties) {
        this.delay = delay;
        this.timeOut = timeOut;
        this.properties = properties;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        TypeInformation<TopicLastTimeModel> topicInfo = TypeInformation.of(new TypeHint<TopicLastTimeModel>() {
        });
        valueState = getRuntimeContext().getState(new ValueStateDescriptor<TopicLastTimeModel>("TOPIC_LAST_STATE", topicInfo));
    }

    /**
     * Records the latest data timestamp per topic. The first element of a
     * topic also registers the initial processing-time timer.
     */
    @Override
    public void processElement(TopicLastTimeModel value, Context ctx, Collector<TopicLastTimeModel> out) throws Exception {
        TopicLastTimeModel currentTopicModel = valueState.value();
        if (currentTopicModel == null) {
            currentTopicModel = new TopicLastTimeModel();
            BeanUtils.copyProperties(value, currentTopicModel);
            currentTopicModel.setBootstrapServers(properties.getProperty("bootstrap.servers"));
            // Timer `delay` minutes from now, rounded down to the whole second.
            long coalescedTime = ((ctx.timerService().currentProcessingTime() + delay * 60 * 1000) / 1000) * 1000;
            log.info("Topic:"+currentTopicModel.getTopic()+" 数据时间:" + DateUtil.toDateTime(currentTopicModel.getTimestamp(), "yyyy-MM-dd HH:mm:ss.SSS") + ",当前时间:" + DateUtil.toDateTime(ctx.timerService().currentProcessingTime(), "yyyy-MM-dd HH:mm:ss.SSS") + " ,定时器下次触发时间:" + DateUtil.toDateTime(coalescedTime, "yyyy-MM-dd HH:mm:ss.SSS"));
            ctx.timerService().registerProcessingTimeTimer(coalescedTime);
        } else {
            currentTopicModel.setTimestamp(value.getTimestamp());
        }
        valueState.update(currentTopicModel);
        out.collect(currentTopicModel);
    }

    /**
     * Fires every {@code delay} minutes per topic: measures data staleness,
     * alerts via Feishu when it exceeds {@code timeOut}, and re-registers
     * the next timer.
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<TopicLastTimeModel> out) throws Exception {
        TopicLastTimeModel current = valueState.value();
        // Bug fix: the previous version logged current.getTimestamp() BEFORE
        // the null check, so expired/missing state caused an NPE and the
        // Objects.nonNull guard below never took effect.
        if (Objects.isNull(current)) {
            return;
        }
        log.info(ctx.getCurrentKey() + "==>onTimer:" + DateUtil.toDateTime(ctx.timestamp(), "yyyy-MM-dd HH:mm:ss.SSS") + ",数据时间:" + DateUtil.toDateTime(current.getTimestamp(), "yyyy-MM-dd HH:mm:ss.SSS"));
        DateTime nowTime = DateTime.now();
        DateTime dataTime = new DateTime(current.getTimestamp());
        long minute = 0;
        if (dataTime.isBeforeNow()) {
            // Minutes elapsed between the last data timestamp and now.
            Interval interval = new Interval(dataTime, nowTime);
            Duration duration = interval.toDuration();
            minute = duration.getStandardMinutes();
        }
        // Feishu "text" message payload.
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("msg_type", "text");
        JSONObject contentObject = new JSONObject();
        contentObject.put("text", "Kafka Topic:" + current.getTopic() + "于" + dataTime.toString("yyyy-MM-dd HH:mm:ss") + "开始无数据");
        jsonObject.put("content", contentObject);
        log.info(jsonObject.toString());
        out.collect(current);
        if (minute > this.timeOut) {
            Map<String, String> propertyMap = new HashMap<>();
            propertyMap.put("Content-Type", "application/json");
            String urlStr = properties.getProperty("alarm.feishu.url");
            byte[] bytes = HttpUtil.httpRequest(urlStr, "POST", jsonObject.toJSONString(), "UTF-8", 10, propertyMap);
            if (bytes != null && bytes.length > 0) {
                String json = new String(bytes, "UTF-8");
                log.info("飞书告警请求:{}\r\n" + "飞书告警响应:{}", jsonObject.toString(), json);
            }
        }
        // Re-register the next check, rounded down to the whole second.
        long coalescedTime = ((ctx.timerService().currentProcessingTime() + delay * 60 * 1000) / 1000) * 1000;
        ctx.timerService().registerProcessingTimeTimer(coalescedTime);
        log.info("Topic:"+current.getTopic()+" 数据时间:" + DateUtil.toDateTime(current.getTimestamp(), "yyyy-MM-dd HH:mm:ss.SSS") + "," + DateUtil.toDateTime(nowTime.getMillis(), "yyyy-MM-dd HH:mm:ss.SSS") + ",定时器下次触发时间: " + DateUtil.toDateTime(coalescedTime, "yyyy-MM-dd HH:mm:ss.SSS"));
    }
}
package com.wanji.indicators.task.monitor.stream;
import com.wanji.indicators.model.TopicLastTimeModel;
import com.wanji.indicators.task.monitor.service.func.CustomKafkaDeserialization;
import com.wanji.indicators.task.monitor.service.func.TopicMonitorKeyProcessFunction;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
/**
 * Kafka topic data monitor.
 *
 * For each configured topic the job resumes from the last existing message
 * (end offset - 1) and feeds record metadata into
 * {@link TopicMonitorKeyProcessFunction}, which raises a Feishu alert when a
 * topic stops receiving data.
 */
public class KafkaMonitorMain {

    // Bug fix: the logger was created with TopicMonitorKeyProcessFunction.class,
    // mis-attributing this class's log output.
    private static final Logger log = LoggerFactory.getLogger(KafkaMonitorMain.class);

    public static void main(String[] args) {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        // Comma-separated list of topics to monitor (required).
        String topicList = parameter.get("topicList");
        // Check frequency and no-data alarm threshold, both in minutes.
        int freqTime = Integer.valueOf(parameter.get("freqTime", "5"));
        int timeOut = Integer.valueOf(parameter.get("timeOut", "5"));
        String[] topicArray = topicList.split(",");
        PropertiesHelper instance = PropertiesHelper.getInstance();
        Properties properties = instance.getProperties();
        // Plain Kafka consumer used only to look up the current end offsets.
        Consumer<String, String> consumer = createConsumer(properties);
        // Start each partition at its last existing message.
        Map<TopicPartition, Long> endingOffsetMap = getEndingOffsets(consumer, topicArray);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<TopicLastTimeModel> source = KafkaSource.<TopicLastTimeModel>builder()
                .setProperties(instance.getConsumerProperties())
                .setProperty("auto.offset.commit", "true")
                .setProperty("auto.commit.interval.ms", "1000")
                .setBootstrapServers(properties.getProperty("bootstrap.servers"))
                .setTopics(topicArray)
                .setGroupId(properties.getProperty("consumer.group.id") + "_DATA_MONITOR")
                .setStartingOffsets(OffsetsInitializer.offsets(endingOffsetMap))
                .setDeserializer(KafkaRecordDeserializationSchema.of(new CustomKafkaDeserialization()))
                .build();
        DataStream<TopicLastTimeModel> stream = env
                .fromSource(source, WatermarkStrategy.<TopicLastTimeModel>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        , "kafka-data-monitor-source");
        SingleOutputStreamOperator<TopicLastTimeModel> frameModelStream =
                stream
                        .keyBy(TopicLastTimeModel::getTopic)
                        .process(new TopicMonitorKeyProcessFunction(freqTime, timeOut, properties))
                        .name("Kafka-Topic 数据监控");
        try {
            env.execute("Kafka-Topic数据监控");
        } catch (Exception e) {
            // Fix: log through SLF4J with the full stack trace instead of
            // printStackTrace().
            log.error("Kafka topic monitor job failed", e);
        }
    }

    /**
     * Returns, per partition of the given topics, the offset of the last
     * existing message (end offset - 1). Topics without partition metadata
     * are skipped with a warning. Always closes the consumer.
     *
     * @param consumer   metadata-lookup consumer; closed before returning
     * @param topicArray topics to resolve
     */
    @SuppressWarnings("unchecked")
    public static Map<TopicPartition, Long> getEndingOffsets(Consumer<String, String> consumer, String[] topicArray) {
        Map<TopicPartition, Long> endingOffsetMap = new HashMap<>();
        try {
            Map<String, List<PartitionInfo>> topics = consumer.listTopics();
            for (String topic : topicArray) {
                List<PartitionInfo> partitionInfos = topics.get(topic);
                if (partitionInfos == null) {
                    log.warn("Partition information was not found for topic {}", topic);
                    continue;
                }
                Collection<TopicPartition> partitions = new ArrayList<>();
                for (PartitionInfo partitionInfo : partitionInfos) {
                    partitions.add(new TopicPartition(topic, partitionInfo.partition()));
                }
                Map<TopicPartition, Long> endingOffsets = consumer.endOffsets(partitions);
                for (Map.Entry<TopicPartition, Long> entry : endingOffsets.entrySet()) {
                    // Bug fix: an empty partition has end offset 0, and 0 - 1 = -1
                    // is not a valid starting offset; clamp to 0.
                    endingOffsetMap.put(entry.getKey(), Math.max(0L, entry.getValue() - 1));
                }
            }
        } finally {
            consumer.close();
        }
        return endingOffsetMap;
    }

    /** Builds the metadata-lookup consumer from the shared properties. */
    private static Consumer<String, String> createConsumer(Properties properties) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("bootstrap.servers"));
        props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getProperty("consumer.group.id") + "_DATA_MONITOR");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }
}
package com.wanji.indicators.task.queuelength.service.func;
import com.wanji.indicators.model.SnapshotInfo;
import com.wanji.indicators.model.SnapshotIndicatorModel;
import com.wanji.indicators.util.DateUtil;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* 帧数据转换
*/
/**
 * Flattens one snapshot indicator frame into its individual event records,
 * stamping each record with the frame's parsed timestamp and the org code
 * as its global id.
 */
public class ParseSnapshotInfoFlatMap implements FlatMapFunction<SnapshotIndicatorModel, SnapshotInfo> {

    private static final Logger log = LoggerFactory.getLogger(ParseSnapshotInfoFlatMap.class);

    @Override
    public void flatMap(SnapshotIndicatorModel snapshotIndicatorModel, Collector<SnapshotInfo> collector) throws Exception {
        long frameTs = DateUtil.StringToMillis(snapshotIndicatorModel.getTimeStamp(), "yyyy-MM-dd HH:mm:ss:SSS");
        String orgCode = snapshotIndicatorModel.getOrgCode();
        for (SnapshotInfo info : snapshotIndicatorModel.getEventList()) {
            info.setTimestamp(frameTs);
            info.setGlobalId(orgCode);
            collector.collect(info);
        }
    }

    /**
     * Returns a random integer in the closed range [min, max].
     *
     * @param min inclusive lower bound
     * @param max inclusive upper bound
     */
    public static int getRandom(int min, int max) {
        return (int) Math.floor(Math.random() * (max - min + 1) + min);
    }
}
package com.wanji.indicators.task.queuelength.service.func;
import com.wanji.indicators.model.SnapshotInfo;
import com.wanji.indicators.task.freeflow.service.model.FrameMaxSpeedModel;
import com.wanji.indicators.task.queuelength.service.model.MaxQueueResultModel;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author fengyi
* @date 2023/4/6
* @description
*/
/**
 * Incrementally tracks the maximum dynamic queue length over a window of
 * snapshot records.
 */
public class QueueAggregateFunction implements AggregateFunction<SnapshotInfo,MaxQueueResultModel, MaxQueueResultModel> {

    private static final Logger log = LoggerFactory.getLogger(QueueAggregateFunction.class);

    /** Creates the zero-valued accumulator for a new window. */
    @Override
    public MaxQueueResultModel createAccumulator() {
        MaxQueueResultModel initModel = new MaxQueueResultModel();
        initModel.setMaxQueueLength(0D);
        return initModel;
    }

    /**
     * Invoked once per input element; keeps the running maximum.
     *
     * @param value       the element entering the window
     * @param accumulator the partial result accumulated so far
     * @return the updated accumulator
     */
    @Override
    public MaxQueueResultModel add(SnapshotInfo value, MaxQueueResultModel accumulator) {
        accumulator.setMaxQueueLength(Math.max(value.getDynamicQueueLength(), accumulator.getMaxQueueLength()));
        return accumulator;
    }

    /** Called when the window fires; the accumulator already is the result. */
    @Override
    public MaxQueueResultModel getResult(MaxQueueResultModel accumulator) {
        return accumulator;
    }

    /**
     * Combines two partial accumulators. Only used by merging window
     * assigners (e.g. session windows).
     *
     * Bug fix: previously returned {@code null}, which would NPE downstream
     * if Flink ever merged accumulators.
     */
    @Override
    public MaxQueueResultModel merge(MaxQueueResultModel a, MaxQueueResultModel b) {
        a.setMaxQueueLength(Math.max(a.getMaxQueueLength(), b.getMaxQueueLength()));
        return a;
    }
}
package com.wanji.indicators.task.queuelength.service.func;
import com.wanji.indicators.task.queuelength.service.model.MaxQueueResultModel;
import com.wanji.indicators.util.DateUtil;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
* @author fengyi
* @date 2023/4/6
* @description
*/
/**
 * Stamps the single pre-aggregated max-queue-length result of each window
 * with the window's start/end time, then emits it.
 */
public class QueueProcessWindow extends ProcessWindowFunction<
        MaxQueueResultModel,
        MaxQueueResultModel,
        String,
        TimeWindow> {

    private static final Logger log = LoggerFactory.getLogger(QueueProcessWindow.class);

    /**
     * @param s        the window key
     * @param context  window context providing the window bounds
     * @param elements the aggregate output; contains at most one element
     * @param out      collector for the stamped result
     */
    @Override
    public void process(String s, Context context, Iterable<MaxQueueResultModel> elements, Collector<MaxQueueResultModel> out) throws Exception {
        long windowStartTs = context.window().getStart();
        long windowEndTs = context.window().getEnd();
        // Fix: removed an unused List copy of `elements` (built via
        // StreamSupport) and the duplicate iterator creation.
        for (MaxQueueResultModel result : elements) {
            result.setStartTime(DateUtil.toDateTime(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS"));
            result.setEndTime(DateUtil.toDateTime(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS"));
            log.info("MaxQueueLength Result:" + result.toString());
            out.collect(result);
            break; // the aggregate emits a single element per window
        }
    }
}
package com.wanji.indicators.task.queuelength.service.func;
import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.model.SnapshotIndicatorModel;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 帧数据转换
*/
/**
 * Parses one raw JSON message into a {@link SnapshotIndicatorModel} and
 * forwards it downstream.
 */
public class SnapshotFlatMap implements FlatMapFunction<String, SnapshotIndicatorModel> {

    private static final Logger log = LoggerFactory.getLogger(SnapshotFlatMap.class);

    @Override
    public void flatMap(String trackJson, Collector<SnapshotIndicatorModel> collector) throws Exception {
        collector.collect(JSONObject.parseObject(trackJson, SnapshotIndicatorModel.class));
    }
}
package com.wanji.indicators.task.queuelength.service.model;
import com.wanji.indicators.constant.Constant;
import lombok.Data;
import java.io.Serializable;
/**
 * @author fengyi
 * @date 2023/4/6
 * @description Result record for the sliding-window maximum queue length
 * indicator; serialized to JSON and stored in a Redis hash.
 */
@Data
public class MaxQueueResultModel implements Serializable {
    // Redis hash field name under which this indicator is stored.
    private String id= Constant.HASH_KEY_AREA_MAX_QUEUE_LENGTH;
    // Window start, formatted "yyyy-MM-dd HH:mm:ss.SSS" (set by QueueProcessWindow).
    private String startTime;
    // Window end, same format as startTime.
    private String endTime;
    // Maximum queue length observed in the window (unit not visible here —
    // presumably meters; TODO confirm against QueueAggregateFunction).
    private Double maxQueueLength;
}
package com.wanji.indicators.task.queuelength.service.sink;
import com.alibaba.fastjson.JSON;
import com.wanji.indicators.task.freeflow.service.model.MaxSpeedResultModel;
import com.wanji.indicators.task.queuelength.service.model.MaxQueueResultModel;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
/**
 * Redis mapper that persists {@link MaxQueueResultModel} results into a hash:
 * {@code HSET <redisKey> <model.id> <json>}.
 *
 * @author fengyi
 * @date 2023/4/7
 */
public class QueueRedisMapper implements RedisMapper<MaxQueueResultModel> {

    /** Top-level Redis key of the hash holding the indicator entries. */
    private String hashKey;

    public QueueRedisMapper(String redisKey) {
        this.hashKey = redisKey;
    }

    /** Store results as hash fields, e.g. {@code hset sensor_temp id temperature}. */
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, hashKey);
    }

    /** Hash field: the model's fixed indicator id. */
    @Override
    public String getKeyFromData(MaxQueueResultModel data) {
        return data.getId();
    }

    /** Hash value: the model serialized as JSON. */
    @Override
    public String getValueFromData(MaxQueueResultModel data) {
        return JSON.toJSONString(data);
    }
}
package com.wanji.indicators.task.queuelength.stream;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.SnapshotIndicatorModel;
import com.wanji.indicators.model.SnapshotInfo;
import com.wanji.indicators.task.freeflow.service.sink.CustomRedisMapper;
import com.wanji.indicators.config.RedisConfig;
import com.wanji.indicators.task.queuelength.service.model.MaxQueueResultModel;
import com.wanji.indicators.task.queuelength.service.func.ParseSnapshotInfoFlatMap;
import com.wanji.indicators.task.queuelength.service.func.QueueAggregateFunction;
import com.wanji.indicators.task.queuelength.service.func.QueueProcessWindow;
import com.wanji.indicators.task.queuelength.service.func.SnapshotFlatMap;
import com.wanji.indicators.task.queuelength.service.sink.QueueRedisMapper;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Properties;
/**
 * Flink job: maximum queue length over a 2-minute sliding event-time window
 * (1-second slide), keyed by global id, with the result written to Redis.
 *
 * <p>NOTE(review): the original header said "free flow speed calculation" —
 * a copy/paste leftover from the free-flow job; this class computes queue length.
 */
public class MaxQueueLengthMain {

    private static final Logger log = LoggerFactory.getLogger(MaxQueueLengthMain.class);

    public static void main(String[] args) {
        PropertiesHelper instance = PropertiesHelper.getInstance();
        Properties properties = instance.getProperties();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setProperties(instance.getConsumerProperties())
                .setProperty("auto.offset.commit", "true")
                .setProperty("auto.commit.interval.ms", "1000")
                .setProperty("commit.offsets.on.checkpoint", "true")
                .setBootstrapServers(properties.getProperty("bootstrap.servers"))
                .setTopics(properties.getProperty("consumer.snapshot.indicator.topic"))
                .setGroupId(properties.getProperty("consumer.group.id") + "_snapshot")
                /* Starting-offset options:
                   1. explicit offsets:  OffsetsInitializer.offsets(Map<TopicPartition, Long>)
                   2. latest:            OffsetsInitializer.latest()
                   3. earliest:          OffsetsInitializer.earliest()
                   4. committed:         OffsetsInitializer.committedOffsets()
                   5. committed, falling back for a brand-new group:
                                         OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST) */
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();

        DataStream<String> stream = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-car-track-source");

        // JSON -> per-second snapshot model.
        SingleOutputStreamOperator<SnapshotIndicatorModel> frameModelStream =
                stream
                        .flatMap(new SnapshotFlatMap())
                        .setParallelism(1)
                        .name("每秒快照数据-JsonToObject");

        // Snapshot frame -> per-lane snapshot info.
        SingleOutputStreamOperator<SnapshotInfo> carTrackModelStream =
                frameModelStream.
                        flatMap(new ParseSnapshotInfoFlatMap())
                        .setParallelism(1)
                        .name("快照车道数据-FlatMap");

        // 2-minute sliding window, sliding every second, keyed by global id.
        SingleOutputStreamOperator<MaxQueueResultModel> maxQueueStream = carTrackModelStream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<SnapshotInfo>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((event, timestamp) -> event.getTimestamp()))
                .keyBy(SnapshotInfo::getGlobalId)
                //.window(TumblingEventTimeWindows.of(Time.minutes(2)))
                .window(SlidingEventTimeWindows.of(Time.minutes(2), Time.seconds(1)))
                .aggregate(new QueueAggregateFunction(), new QueueProcessWindow())
                .name("每5秒计算2分钟最大排队长度");

        try {
            maxQueueStream.addSink(new RedisSink<MaxQueueResultModel>(new RedisConfig().getRedisConfig(), new QueueRedisMapper(Constant.AREA_REALTIME_INDICATOR)));
            env.execute("2分钟最大排队长度计算工作流");
        } catch (Exception e) {
            // Log through SLF4J instead of printStackTrace so failures show up in job logs.
            log.error("最大排队长度计算任务异常", e);
        }
    }
}
package com.wanji.indicators.task.track.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.SingleCarTrackListModel;
import com.wanji.indicators.task.track.service.func.ProcessCarTrackByKey;
import com.wanji.indicators.task.track.service.key.KeySelectorGlobalIdPlateNo;
import com.wanji.indicators.task.track.service.sink.BatchTrackElasticSearchSinkFunction;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.ElasticSearchSinkUtil;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.springframework.beans.factory.annotation.Value;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class AreaRealTimeIndicatorMainNew implements Serializable {
private final StreamExecutionEnvironment env;
private final PropertiesHelper instance;
private Properties p = new Properties();
private String path;
private String indexName = "area.realtime.indicator";
private int windowTime = 2;
private AreaRealTimeIndicatorMainNew(StreamExecutionEnvironment env, String paramInfo) {
this.env = env;
this.instance = PropertiesHelper.getInstance();
this.p = this.instance.getProperties();
}
public static AreaRealTimeIndicatorMainNew init(StreamExecutionEnvironment env, String paramInfo) {
return new AreaRealTimeIndicatorMainNew(env,paramInfo);
}
public void run(SingleOutputStreamOperator<CarTrackModel> carTrackModelStream) {
//筛选机动车类型数据
SingleOutputStreamOperator<CarTrackModel> filterStream =
carTrackModelStream.filter(new FilterFunction<CarTrackModel>() {
@Override
public boolean filter(CarTrackModel value) throws Exception {
String motorObjectType = Constant.MOTOR_TYPES;
String[] sps = motorObjectType.split(",");
List<String> typeList = Arrays.asList(sps);
String carType = value.getOriginalType().toString();
if (typeList.contains(carType)){
return true;
}
return false;
}
}).name("筛选机动车数据");
SingleOutputStreamOperator<JSONObject> groupByPlateStream =
filterStream.assignTimestampsAndWatermarks(
WatermarkStrategy.<CarTrackModel>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp()))
.keyBy(CarTrackModel::getOrgCode)
.window(SlidingEventTimeWindows.of(Time.minutes(this.windowTime),Time.seconds(5)))
.process(new ProcessWindowFunction<CarTrackModel, JSONObject, String, TimeWindow>() {
@Override
public void process(String s, Context context, Iterable<CarTrackModel> elements, Collector<JSONObject> out) throws Exception {
Iterator<CarTrackModel> iterator = elements.iterator();
List<CarTrackModel> list = StreamSupport.stream(elements.spliterator(), false).collect(Collectors.toList());
//平均速度
double avgSpeed = list.stream().filter(o->o.getSpeed()>0).collect(Collectors.averagingDouble(o->o.getSpeed())).doubleValue();
//目标数量
long count = list.stream().collect(Collectors.groupingBy(o->o.getId())).size();
JSONObject jsonObject = new JSONObject();
jsonObject.put("vehicleNum",count);
jsonObject.put("avgSpeed",avgSpeed);
out.collect(jsonObject);
long ws = context.window().getStart();
long we = context.window().getEnd();
System.out.println(DateUtil.toDateTime(ws,"yyyy-MM-dd HH:mm:ss.SSS")+" "+DateUtil.toDateTime(we,"yyyy-MM-dd HH:mm:ss.SSS"));
}
})
.setParallelism(2)
.name("车辆轨迹分组-(全域ID-车辆ID-车牌号)");
//从配置文件中读取 es 的地址
try {
groupByPlateStream.print();
List<HttpHost> esAddresses = ElasticSearchSinkUtil.getEsAddresses(p.getProperty("elasticsearch.server"));
BatchTrackElasticSearchSinkFunction elasticsearchSinkFunction = new BatchTrackElasticSearchSinkFunction(indexName);
//ElasticSearchSinkUtil.addSink(esAddresses, 2, groupByPlateStream, elasticsearchSinkFunction);
} catch (MalformedURLException e) {
e.printStackTrace();
}
}
}
package com.wanji.indicators.task.track.service;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.RoutePathCarTrackModel;
import com.wanji.indicators.task.track.service.func.SplitRoutePathFunction;
import com.wanji.indicators.task.track.service.key.KeySelectorGlobalIdPlateNo;
import com.wanji.indicators.task.track.service.sink.PartitionRoutePathElasticSearchSinkFunction;
import com.wanji.indicators.task.track.service.sink.RoutePathElasticSearchSinkFunction;
import com.wanji.indicators.util.ElasticSearchSinkUtil;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.http.HttpHost;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
/**
 * Extracts per-vehicle travel routes from the raw track stream and writes the
 * route segments to Elasticsearch (both the plain and the partitioned sink).
 */
public class CarRoutePathMainNew {

    private final StreamExecutionEnvironment env;
    private final PropertiesHelper instance;
    private Properties p = new Properties();
    private String path;
    /** Target Elasticsearch index name. */
    private String indexName;

    private CarRoutePathMainNew(StreamExecutionEnvironment env, String indexName) {
        this.env = env;
        this.indexName = indexName;
        this.instance = PropertiesHelper.getInstance();
        this.p = this.instance.getProperties();
    }

    public static CarRoutePathMainNew init(StreamExecutionEnvironment env, String indexName) {
        return new CarRoutePathMainNew(env, indexName);
    }

    /**
     * Wires the route-extraction pipeline onto the given track stream.
     *
     * @param carTrackModelStream source stream of vehicle track points
     */
    public void run(SingleOutputStreamOperator<CarTrackModel> carTrackModelStream) {
        // Assign event time from the global timestamp, key by (orgCode, id, plate),
        // and split each vehicle's points into route segments; two consecutive
        // points more than half an hour apart start a new route.
        SingleOutputStreamOperator<RoutePathCarTrackModel> routeStream = carTrackModelStream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<CarTrackModel>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp()))
                .keyBy(new KeySelectorGlobalIdPlateNo())
                .process(new SplitRoutePathFunction())
                .setParallelism(2).name("提取车辆通行路径");

        // Elasticsearch hosts come from configuration.
        try {
            List<HttpHost> hosts = ElasticSearchSinkUtil.getEsAddresses(p.getProperty("elasticsearch.server"));
            ElasticSearchSinkUtil.addSink(hosts, 2, routeStream, new RoutePathElasticSearchSinkFunction(indexName));
            ElasticSearchSinkUtil.addSink(hosts, 2, routeStream, new PartitionRoutePathElasticSearchSinkFunction(indexName));
        } catch (MalformedURLException e) {
            e.printStackTrace();
        }
    }
}
package com.wanji.indicators.task.track.service;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.SingleCarTrackListModel;
import com.wanji.indicators.task.track.service.func.ProcessCarTrackByKey;
import com.wanji.indicators.task.track.service.key.KeySelectorGlobalIdPlateNo;
import com.wanji.indicators.task.track.service.sink.BatchTrackElasticSearchSinkFunction;
import com.wanji.indicators.task.track.service.sink.PartitionBatchTrackElasticSearchSinkFunction;
import com.wanji.indicators.util.ElasticSearchSinkUtil;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.http.HttpHost;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
/**
 * Accumulates each vehicle's track points into short tumbling event-time
 * windows and writes the grouped tracks to Elasticsearch.
 */
public class TrackStoreMainNew {
    private final StreamExecutionEnvironment env;
    private final PropertiesHelper instance;
    private Properties p = new Properties();
    private String path;
    // Target Elasticsearch index (first element of paramInfo).
    private String indexName ;
    // Window length in SECONDS (optional second element of paramInfo; default 5).
    private Integer windowTime=5;
    private TrackStoreMainNew(StreamExecutionEnvironment env,String paramInfo) {
        this.env = env;
        // paramInfo format: "<indexName>[,<windowTimeSeconds>]".
        String[] sps = paramInfo.split(",");
        this.indexName = sps[0];
        if (sps.length>1)
            this.windowTime = Integer.parseInt(sps[1]);
        this.instance = PropertiesHelper.getInstance();
        this.p = this.instance.getProperties();
    }
    public static TrackStoreMainNew init(StreamExecutionEnvironment env,String paramInfo) {
        return new TrackStoreMainNew(env,paramInfo);
    }
    /**
     * Wires the pipeline onto the given track stream.
     *
     * @param carTrackModelStream source stream of vehicle track points
     */
    public void run(SingleOutputStreamOperator<CarTrackModel> carTrackModelStream) {
        // Group points per (orgCode, id, plate) into tumbling event-time windows
        // of windowTime SECONDS. NOTE(review): the original comment said
        // "every 5 minutes", but the code uses Time.seconds(windowTime).
        SingleOutputStreamOperator<SingleCarTrackListModel> groupByPlateStream =
                carTrackModelStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<CarTrackModel>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp()))
                        .keyBy(new KeySelectorGlobalIdPlateNo())
                        .window(TumblingEventTimeWindows.of(Time.seconds(this.windowTime)))
                        //.trigger(new CountTriggerWithTimeout<>(Integer.MAX_VALUE,TimeCharacteristic.EventTime))
                        .process(new ProcessCarTrackByKey())
                        .setParallelism(2)
                        .name("车辆轨迹分组-(全域ID-车辆ID-车牌号)");
        // Elasticsearch hosts are read from configuration.
        try {
            List<HttpHost> esAddresses = ElasticSearchSinkUtil.getEsAddresses(p.getProperty("elasticsearch.server"));
            BatchTrackElasticSearchSinkFunction elasticsearchSinkFunction = new BatchTrackElasticSearchSinkFunction(indexName);
            ElasticSearchSinkUtil.addSink(esAddresses, 2, groupByPlateStream, elasticsearchSinkFunction);
            PartitionBatchTrackElasticSearchSinkFunction elasticsearchSinkFunction1 = new PartitionBatchTrackElasticSearchSinkFunction(indexName);
            ElasticSearchSinkUtil.addSink(esAddresses, 2, groupByPlateStream, elasticsearchSinkFunction1);
        } catch (MalformedURLException e) {
            e.printStackTrace();
        }
    }
}
package com.wanji.indicators.task.track.service.func;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.model.RoutePathCarTrackModel;
import com.wanji.indicators.util.CarNumGenerator;
import com.wanji.indicators.util.DateUtil;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.protocol.types.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Flattens one frame into individual vehicle track records, copying the frame
 * header's timestamp/org code onto every point.
 */
public class CarTrackFlatMap implements FlatMapFunction<FrameModel, CarTrackModel> {

    private static final Logger log = LoggerFactory.getLogger(CarTrackFlatMap.class);

    // Twin data currently carries no plate number; map each track id to a stable
    // random plate/colour tuple for testing (only used when isMockPlate is true).
    static Map<Integer, String[]> randomCarNumMap = new ConcurrentHashMap<>();

    // Motorised participant type codes, comma separated.
    static String motorObjectType = "1,2,3,7,8,10,11,12,13,15";
    // Parsed once instead of re-splitting on every frame.
    private static final List<String> MOTOR_TYPE_LIST = Arrays.asList(motorObjectType.split(","));

    // Whether to attach mock plate/colour data to motor vehicles.
    boolean isMockPlate = false;

    @Override
    public void flatMap(FrameModel frameModel, Collector<CarTrackModel> collector) throws Exception {
        Long globalTimeStamp = frameModel.getGlobalTimeStamp();
        String globalId = frameModel.getOrgCode();
        List<CarTrackModel> list = frameModel.getE1FrameParticipant();
        list.forEach(o -> {
            String type = o.getOriginalType().toString();
            /***************** mock plate number / colours (test only) ******************/
            if (isMockPlate) {
                if (MOTOR_TYPE_LIST.contains(type)) {
                    Integer id = o.getId();
                    String[] mockData = randomCarNumMap.get(id);
                    if (Objects.isNull(mockData)) {
                        String carNum = CarNumGenerator.getCarNum();
                        Integer picColor = getRandom(0, 7);
                        Integer bodyColor = getRandom(1, 9);
                        Integer function = getRandom(1, 5);
                        Integer ownership = getRandom(1, 5);
                        mockData = new String[]{carNum, picColor.toString(), bodyColor.toString(), function.toString(), ownership.toString()};
                        randomCarNumMap.put(id, mockData);
                    }
                    o.setPicLicense(mockData[0]);
                    o.setLicenseColor(Integer.valueOf(mockData[1]));
                    o.setOriginalColor(Integer.valueOf(mockData[2]));
                    o.setFunction(Integer.valueOf(mockData[3]));
                    o.setOwnership(Integer.valueOf(mockData[4]));
                }
            }
            /***********************************************/
            // Normalize per-point timestamp/org fields from the frame header.
            o.setTimeStamp(DateUtil.toDateTime(globalTimeStamp, "yyyy-MM-dd HH:mm:ss.SSS"));
            o.setGlobalTimeStamp(globalTimeStamp);
            o.setOrgCode(globalId);
            collector.collect(o);
        });
    }

    /**
     * Returns a uniformly distributed random int in [min, max] (both inclusive).
     *
     * @param min lower bound, inclusive
     * @param max upper bound, inclusive
     * @return random value in [min, max]
     */
    public static int getRandom(int min, int max) {
        return (int) Math.floor(Math.random() * (max - min + 1) + min);
    }
}
package com.wanji.indicators.task.track.service.func;
import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.model.FrameModel;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Deserializes one JSON frame record into a {@link FrameModel}.
 *
 * <p>Malformed records are logged and skipped instead of throwing, so a single
 * bad Kafka message cannot fail the whole streaming job.
 */
public class FrameFlatMap implements FlatMapFunction<String, FrameModel> {

    private static final Logger log = LoggerFactory.getLogger(FrameFlatMap.class);

    @Override
    public void flatMap(String trackJson, Collector<FrameModel> collector) throws Exception {
        try {
            FrameModel frameModel = JSONObject.parseObject(trackJson, FrameModel.class);
            // parseObject returns null for blank input — don't emit nulls downstream.
            if (frameModel != null) {
                collector.collect(frameModel);
            }
        } catch (Exception e) {
            log.error("Failed to parse frame record, skipping. payload={}", trackJson, e);
        }
    }
}
package com.wanji.indicators.task.track.service.func;
import com.wanji.indicators.model.CarTrackModel;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
/**
 * Periodic watermark assigner for {@link CarTrackModel} streams: event time is
 * the point's global timestamp, and the watermark trails the wall clock by a
 * fixed 3-second lag.
 *
 * <p>NOTE(review): {@code AssignerWithPeriodicWatermarks} is deprecated in
 * newer Flink releases — consider migrating to {@code WatermarkStrategy}.
 *
 * @author fengyi
 * @date 2023/3/3
 */
public class MyWaterMark implements AssignerWithPeriodicWatermarks<CarTrackModel> {
    // Maximum assumed event-time lag behind processing time, in milliseconds.
    private final long maxTimeLag = 3000; // 3 seconds
    @Override
    public long extractTimestamp(CarTrackModel element, long previousElementTimestamp) {
        return element.getGlobalTimeStamp();
    }
    @Override
    public Watermark getCurrentWatermark() {
        // Watermark = current processing time minus the maximum time lag.
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
}
package com.wanji.indicators.task.track.service.func;
/**
* @author fengyi
* @date 2023/3/1
* @description
*/
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.SingleCarTrackListModel;
import com.wanji.indicators.util.DateUtil;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import java.util.*;
import java.util.stream.Collectors;
/**
 * Aggregates all track points of one vehicle key within a window into a single
 * {@link SingleCarTrackListModel}: de-duplicated, chronologically sorted
 * dynamic points, plus static properties copied from the last point seen.
 */
public class ProcessCarTrackByKey extends ProcessWindowFunction<CarTrackModel, SingleCarTrackListModel, Tuple3<String, Integer, String>, TimeWindow> {

    private static final Logger log = LoggerFactory.getLogger(ProcessCarTrackByKey.class);

    @Override
    public void process(Tuple3<String, Integer, String> key, Context context, Iterable<CarTrackModel> iterable, Collector<SingleCarTrackListModel> out) throws Exception {
        SingleCarTrackListModel result = new SingleCarTrackListModel();
        List<SingleCarTrackListModel.CarTrackDynamicProperty> trackList = new ArrayList<>();
        Set<CarTrackModel.RoadNet> roadNets = new HashSet<>();
        // Last point of the window; its common fields are copied onto the result.
        CarTrackModel lastPoint = null;

        for (CarTrackModel carTrackModel : iterable) {
            // Extract the dynamic (per-frame) properties of this point.
            SingleCarTrackListModel.CarTrackDynamicProperty dynamicProperty = new SingleCarTrackListModel.CarTrackDynamicProperty();
            dynamicProperty.setCourseAngle(carTrackModel.getCourseAngle());
            dynamicProperty.setLongitude(carTrackModel.getLongitude());
            dynamicProperty.setLatitude(carTrackModel.getLatitude());
            dynamicProperty.setSpeed(carTrackModel.getSpeed());
            dynamicProperty.setTimeStamp(carTrackModel.getTimeStamp());
            dynamicProperty.setGlobalTimeStamp(carTrackModel.getGlobalTimeStamp());
            // Skip exact duplicate points.
            if (!trackList.contains(dynamicProperty)) {
                trackList.add(dynamicProperty);
                roadNets.add(carTrackModel.getRoadnet());
            }
            lastPoint = carTrackModel;
        }

        if (!trackList.isEmpty()) {
            // Sort points chronologically.
            trackList = trackList.stream()
                    .sorted(Comparator.comparing(SingleCarTrackListModel.CarTrackDynamicProperty::getGlobalTimeStamp))
                    .collect(Collectors.toList());
            // Static/common properties come from the last point of the window.
            BeanUtils.copyProperties(lastPoint, result);
            // First and last detection time inside the window.
            Date globalStartTime = new Date(trackList.get(0).getGlobalTimeStamp());
            Date globalEndTime = new Date(trackList.get(trackList.size() - 1).getGlobalTimeStamp());
            // Window bounds.
            result.setStartTime(new Date(context.window().getStart()));
            result.setEndTime(new Date(context.window().getEnd()));
            result.setGlobalStartTime(globalStartTime);
            result.setGlobalEndTime(globalEndTime);
        }
        result.setTracks(trackList);
        result.setRoadnets(roadNets);
        out.collect(result);
    }
}
package com.wanji.indicators.task.track.service.func;//package com.wanji.indicators.project.event.track.service.func;
//
//import com.wanji.indicators.model.CarTrackModel;
//import org.apache.flink.api.common.eventtime.WatermarkGenerator;
//import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
//import org.apache.flink.api.common.eventtime.WatermarkOutput;
//import org.apache.flink.api.common.eventtime.WatermarkStrategy;
//import org.apache.flink.api.java.tuple.Tuple2;
//
///**
// * @author fengyi
// * @date 2023/3/3
// * @description
// */
//public class WatermarkDemoFunction implements WatermarkStrategy<CarTrackModel> {
//
// private Tuple2<Long,Boolean> state = Tuple2.of(0L,true);
//
// @Override
// public WatermarkGenerator<CarTrackModel> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
// return new WatermarkGenerator<CarTrackModel>() {
// private long maxWatermark;
//
// @Override
// public void onEvent(CarTrackModel waterSensor, long l, WatermarkOutput watermarkOutput) {
// maxWatermark = Math.max(maxWatermark,waterSensor.getAmount());
// System.out.println("maxWatermark is " + maxWatermark);
// state.f0 = System.currentTimeMillis();
// state.f1 = false;
// }
// @Override
// public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
// //乱序时间
// long outOfTime = 3000L;
// if (maxWatermark - outOfTime <=0){
// } else {
// //10s内没有数据则关闭当前窗口
// if (System.currentTimeMillis() - state.f0 >= 10000L && !state.f1){
// watermarkOutput.emitWatermark(new Watermark(maxWatermark + 5000L));
// state.f1 = true;
// System.out.println("触发窗口");
// } else {
// System.out.println("正常发送水印");
// watermarkOutput.emitWatermark(new Watermark(maxWatermark - outOfTime));
// }
// }
// }
// };
// }
//}
\ No newline at end of file
package com.wanji.indicators.task.track.service.key;
import com.wanji.indicators.model.CarTrackModel;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
public class KeySelectorGlobalIdPlateNo implements KeySelector<CarTrackModel, Tuple3<String, Integer, String>> {
    // Keys a track point by (orgCode, track id, plate number).
    // NOTE(review): the original comment claimed "plate number + colour",
    // which does not match the actual key fields below.
    @Override
    public Tuple3<String, Integer, String> getKey(CarTrackModel carTrackModel) throws Exception {
        return new Tuple3<>(carTrackModel.getOrgCode(),carTrackModel.getId(),carTrackModel.getPicLicense());
    }
}
package com.wanji.indicators.task.track.service.key;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.RoutePathCarTrackModel;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.Map;
public class KeySelectorRoutePath implements KeySelector<RoutePathCarTrackModel, Tuple2<String, Integer>> {
    // Keys a route segment by (plate number, line number).
    // NOTE(review): the original comment claimed "plate number + colour",
    // which does not match the actual key fields below.
    @Override
    public Tuple2<String, Integer> getKey(RoutePathCarTrackModel carTrackModel) throws Exception {
        return new Tuple2<>(carTrackModel.getPicLicense(),carTrackModel.getLineNo());
    }
}
package com.wanji.indicators.task.track.service.sink;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.wanji.indicators.model.SingleCarTrackListModel;
import com.wanji.indicators.util.DateUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.UUID;
/**
 * Elasticsearch sink function that indexes one aggregated vehicle-track
 * document per {@link SingleCarTrackListModel} element.
 *
 * @author fengyi
 * @date 2023/3/2
 */
public class BatchTrackElasticSearchSinkFunction implements ElasticsearchSinkFunction<SingleCarTrackListModel>, Serializable {

    private static final Logger log = LoggerFactory.getLogger(BatchTrackElasticSearchSinkFunction.class);

    // Target index name.
    private String index;

    public BatchTrackElasticSearchSinkFunction(String index) {
        this.index = index;
    }

    /**
     * Serializes the element to JSON (null fields included, field order kept)
     * and stamps it with the current write time under "recordTime".
     */
    public IndexRequest createIndexRequest(SingleCarTrackListModel element) {
        String jsonStr = JSON.toJSONString(element, SerializerFeature.WRITE_MAP_NULL_FEATURES);
        JSONObject jsonObject = JSONObject.parseObject(jsonStr, Feature.OrderedField);
        jsonObject.put("recordTime", DateUtil.getNowTime("yyyy-MM-dd HH:mm:ss.SSS"));
        return Requests.indexRequest()
                .index(index)
                .source(jsonObject.toJSONString(), XContentType.JSON);
    }

    @Override
    public void process(SingleCarTrackListModel row, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
        try {
            requestIndexer.add(createIndexRequest(row));
        } catch (Exception e) {
            // Log through SLF4J instead of printStackTrace so indexing failures
            // are visible in the job logs.
            log.error("Failed to index track document into {}", index, e);
        }
    }
}
package com.wanji.indicators.task.track.service.sink;
import com.wanji.indicators.util.FileUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Simple sink that appends each incoming JSON string to a local file.
 * A blank/null path disables writing entirely.
 */
public class FileSink implements SinkFunction<String> {

    private static final Logger log = LoggerFactory.getLogger(FileSink.class);

    /** Target file path; blank or null means "discard records". */
    private String path;

    public FileSink(String path) {
        this.path = path;
    }

    @Override
    public void invoke(String json, Context context) {
        // Guard clause: nothing to do without a configured path.
        if (StringUtils.isBlank(path)) {
            return;
        }
        FileUtil.writeApend(path, json);
    }
}
package com.wanji.indicators.task.track.service.sink;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.wanji.indicators.model.SingleCarTrackListModel;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.ElasticSearchSinkUtil;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.http.HttpHost;
import org.apache.ibatis.session.SqlSession;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
/**
 * Elasticsearch sink for batched single-car track records. The target index is
 * partitioned by day: {@code <baseIndex>_yyyyMMdd}, derived from each record's
 * global end time.
 *
 * @author fengyi
 * @date 2023/3/2
 */
public class PartitionBatchTrackElasticSearchSinkFunction implements ElasticsearchSinkFunction<SingleCarTrackListModel>, Serializable {

    private static final Logger log = LoggerFactory.getLogger(PartitionBatchTrackElasticSearchSinkFunction.class);

    /** Base ES index name; the actual write target is suffixed with the record date. */
    private String index;

    /** Per-index creation flags (currently written nowhere — kept for compatibility). */
    private static Map<String, Boolean> mappingFlag = new HashMap<>();

    @Override
    public void open() throws Exception {
    }

    public PartitionBatchTrackElasticSearchSinkFunction(String index) {
        this.index = index;
    }

    /**
     * Builds the index request for one record: serializes the model keeping
     * null fields, stamps the current write time, ensures the daily index
     * exists, and targets it.
     */
    public IndexRequest createIndexRequest(SingleCarTrackListModel element) {
        String jsonStr = JSON.toJSONString(element, SerializerFeature.WRITE_MAP_NULL_FEATURES);
        JSONObject jsonObject = JSONObject.parseObject(jsonStr, Feature.OrderedField);
        jsonObject.put("recordTime", DateUtil.getNowTime("yyyy-MM-dd HH:mm:ss.SSS"));
        // Partition the index by day. Renamed from "index" so it no longer shadows the field.
        String dailyIndex = getIndexNameByDate(element);
        ElasticSearchSinkUtil.createIndex(dailyIndex, true, getMappingInfo());
        return Requests.indexRequest()
                .index(dailyIndex)
                .source(jsonObject.toJSONString(), XContentType.JSON);
    }

    /**
     * Queues one record on the bulk indexer. Failures are logged (previously
     * printStackTrace) but still swallowed so a single bad record cannot fail
     * the whole sink.
     */
    @Override
    public void process(SingleCarTrackListModel row, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
        try {
            requestIndexer.add(createIndexRequest(row));
        } catch (Exception e) {
            log.error("Failed to index batched track record", e);
        }
    }

    /**
     * Builds the mapping used when a daily index is first created: orgCode and
     * endTime are text fields with fielddata enabled (so they can be aggregated).
     *
     * @return the mapping builder, or null if building failed
     */
    public XContentBuilder getMappingInfo() {
        try {
            return XContentFactory.jsonBuilder()
                    .startObject()
                    .startObject("properties")
                    .startObject("orgCode").field("type", "text").field("fielddata", true).endObject()
                    .startObject("endTime").field("type", "text").field("fielddata", true).endObject()
                    .endObject()
                    .endObject();
        } catch (IOException e) {
            log.error("Failed to build index mapping", e);
        }
        return null;
    }

    /** Daily index name: {@code <base>_yyyyMMdd} from the record's global end time. */
    private String getIndexNameByDate(SingleCarTrackListModel row) {
        String globalDate = DateUtil.toDateTime(row.getGlobalEndTime().getTime(), "yyyyMMdd");
        return this.index + "_" + globalDate;
    }
}
package com.wanji.indicators.task.track.service.sink;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.wanji.indicators.model.RoutePathCarTrackModel;
import com.wanji.indicators.model.SingleCarTrackListModel;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.ElasticSearchSinkUtil;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
/**
 * Elasticsearch sink for route-path records. The target index is partitioned
 * by day: {@code <baseIndex>_yyyyMMdd}, derived from each record's end time.
 *
 * @author fengyi
 * @date 2023/3/2
 */
public class PartitionRoutePathElasticSearchSinkFunction implements ElasticsearchSinkFunction<RoutePathCarTrackModel>, Serializable {

    private static final Logger log = LoggerFactory.getLogger(PartitionRoutePathElasticSearchSinkFunction.class);

    /** Base ES index name; the actual write target is suffixed with the record date. */
    private String index;

    public PartitionRoutePathElasticSearchSinkFunction(String index) {
        this.index = index;
    }

    /**
     * Builds the index request for one record: serializes the model keeping
     * null fields, stamps the current write time, ensures the daily index
     * exists, and targets it.
     */
    public IndexRequest createIndexRequest(RoutePathCarTrackModel element) {
        // The old code also built an unused "id" string here (which could NPE on
        // a null targetId); removed as dead work.
        String jsonStr = JSON.toJSONString(element, SerializerFeature.WRITE_MAP_NULL_FEATURES);
        JSONObject jsonObject = JSONObject.parseObject(jsonStr, Feature.OrderedField);
        jsonObject.put("recordTime", DateUtil.getNowTime("yyyy-MM-dd HH:mm:ss.SSS"));
        String globalDate = DateUtil.toDateTime(DateUtil.StringToMillis(element.getEndTime(), "yyyy-MM-dd HH:mm:ss.SSS"), "yyyyMMdd");
        // Partition the index by day.
        String dailyIndex = this.index + "_" + globalDate;
        ElasticSearchSinkUtil.createIndex(dailyIndex, true, getMappingInfo());
        return Requests.indexRequest()
                .index(dailyIndex)
                .source(jsonObject.toJSONString(), XContentType.JSON);
    }

    /**
     * Queues one record on the bulk indexer. Failures are logged (previously
     * printStackTrace) but still swallowed so a single bad record cannot fail
     * the whole sink.
     */
    @Override
    public void process(RoutePathCarTrackModel row, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
        try {
            requestIndexer.add(createIndexRequest(row));
        } catch (Exception e) {
            log.error("Failed to index route-path record", e);
        }
    }

    /**
     * Builds the mapping used when a daily index is first created: endTime is a
     * text field with fielddata enabled (so it can be aggregated).
     *
     * @return the mapping builder, or null if building failed
     */
    public XContentBuilder getMappingInfo() {
        try {
            return XContentFactory.jsonBuilder()
                    .startObject()
                    .startObject("properties")
                    .startObject("endTime").field("type", "text").field("fielddata", "true").endObject()
                    .endObject()
                    .endObject();
        } catch (IOException e) {
            log.error("Failed to build index mapping", e);
        }
        return null;
    }
}
package com.wanji.indicators.task.track.service.sink;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.wanji.indicators.model.RoutePathCarTrackModel;
import com.wanji.indicators.util.DateUtil;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
/**
 * Elasticsearch sink for route-path records writing into a single fixed index
 * (no date partitioning — compare PartitionRoutePathElasticSearchSinkFunction).
 *
 * @author fengyi
 * @date 2023/3/2
 */
public class RoutePathElasticSearchSinkFunction implements ElasticsearchSinkFunction<RoutePathCarTrackModel>, Serializable {

    private static final Logger log = LoggerFactory.getLogger(RoutePathElasticSearchSinkFunction.class);

    /** Target ES index name. */
    private String index;

    public RoutePathElasticSearchSinkFunction(String index) {
        this.index = index;
    }

    /**
     * Serializes one record (keeping null fields), stamps the current write
     * time, and targets the configured index.
     */
    public IndexRequest createIndexRequest(RoutePathCarTrackModel element) {
        // The old code also built an unused "id" string here (which could NPE on
        // a null targetId); removed as dead work.
        String jsonStr = JSON.toJSONString(element, SerializerFeature.WRITE_MAP_NULL_FEATURES);
        JSONObject jsonObject = JSONObject.parseObject(jsonStr, Feature.OrderedField);
        jsonObject.put("recordTime", DateUtil.getNowTime("yyyy-MM-dd HH:mm:ss.SSS"));
        return Requests.indexRequest()
                .index(index)
                .source(jsonObject.toJSONString(), XContentType.JSON);
    }

    /**
     * Queues one record on the bulk indexer. Failures are logged (previously
     * printStackTrace) but still swallowed so a single bad record cannot fail
     * the whole sink.
     */
    @Override
    public void process(RoutePathCarTrackModel row, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
        try {
            requestIndexer.add(createIndexRequest(row));
        } catch (Exception e) {
            log.error("Failed to index route-path record", e);
        }
    }
}
package com.wanji.indicators.task.track.stream;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.task.track.service.CarRoutePathMainNew;
import com.wanji.indicators.task.track.service.TrackStoreMainNew;
import com.wanji.indicators.task.track.service.func.CarTrackFlatMap;
import com.wanji.indicators.task.track.service.func.FrameFlatMap;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import java.util.Calendar;
import java.util.Properties;
/**
 * Standalone job: consumes raw track frames from Kafka and runs only the
 * route-path computation pipeline.
 */
public class CarRoutePathMain {

    /**
     * Entry point.
     *
     * @param args args[0] = ES index name for route paths (original comment
     *             suggests "esIndexName,windowTime" comma-separated — confirm)
     */
    public static void main(String[] args) {
        // Fail fast with a usable message instead of an opaque ArrayIndexOutOfBoundsException.
        if (args.length < 1) {
            System.err.println("Usage: CarRoutePathMain <esIndexName>");
            return;
        }
        String indexName = args[0];
        PropertiesHelper instance = PropertiesHelper.getInstance();
        Properties properties = instance.getProperties();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setProperties(instance.getConsumerProperties())
                .setProperty("auto.offset.commit", "true")
                .setProperty("auto.commit.interval.ms", "1000")
                .setBootstrapServers(properties.getProperty("bootstrap.servers"))
                .setTopics(properties.getProperty("consumer.topic"))
                .setGroupId(properties.getProperty("consumer.group.id") + "-Car-Route-Path")
                // Resume from committed offsets; a brand-new consumer group falls
                // back to the earliest available offset.
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();
        DataStream<String> stream = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-car-track-source");
        // Parse raw JSON into per-frame models.
        SingleOutputStreamOperator<FrameModel> frameModelStream =
                stream.flatMap(new FrameFlatMap())
                        .setParallelism(1)
                        .name("轨迹帧数据-JsonToObject");
        // Flatten frames into individual car-track records.
        SingleOutputStreamOperator<CarTrackModel> carTrackModelStream =
                frameModelStream.flatMap(new CarTrackFlatMap())
                        .setParallelism(1)
                        .name("轨迹帧数据解析-ToCarTrackModel");
        try {
            // Route-path computation pipeline.
            CarRoutePathMainNew.init(env, indexName).run(carTrackModelStream);
            env.execute("路径计算工作流");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.wanji.indicators.task.track.stream;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.task.track.service.CarRoutePathMainNew;
import com.wanji.indicators.task.track.service.TrackStoreMainNew;
import com.wanji.indicators.task.track.service.func.CarTrackFlatMap;
import com.wanji.indicators.task.track.service.func.FrameFlatMap;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.DateTime;
import java.util.*;
public class TrackAndRouteMain {
//private static final Logger log = LoggerFactory.getLogger(TrafficEventMain.class);
public static void main(String[] args) {
//批量轨迹es index,逗号分割:esIndexName,windowTime
String indexName = args[0];
String indexName1 = args[1];
// ParameterTool parameter = ParameterTool.fromArgs(args);
// String indexName = parameter.get("Track_ES_IndexName");
// String indexName1 = parameter.get("Route_ES_IndexName");
PropertiesHelper instance = PropertiesHelper.getInstance();
Properties properties = instance.getProperties();
String topic = properties.getProperty("consumer.topic");
if (args.length>2){
topic = args[2];
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60*1000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(properties.getProperty("check.point.uri")));
//env.setStateBackend(new FsStateBackend(properties.getProperty("check.point.uri")));
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
// env.setStateBackend(
// new MemoryStateBackend(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE * 100));
DateTime currentDateTime = new DateTime();
DateTime dateTime = currentDateTime.withMillisOfDay(0).minusHours(-4);
KafkaSource<String> source = KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties())
.setProperty("auto.offset.commit", "true")
.setProperty("auto.commit.interval.ms", "1000")
.setProperty("commit.offsets.on.checkpoint", "true")
.setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setTopics(topic)
.setGroupId(properties.getProperty("consumer.group.id"))
/* 设置起始偏移量有以下几种情况
1.从指定的位置消费:OffsetsInitializer.offsets(Map<TopicPartition, Long> offsets)
2.从最新位置消费(最后一条处):OffsetsInitializer.latest()
3.从最早位置消费(第一条处):OffsetsInitializer.earliest()
4.从上次提交的位置消费:OffsetsInitializer.committedOffsets()
5.新的组,从来没有提交过,再指定一个消费方式:OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
*/
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
// 从大于等于此时间戳开始的偏移量开始
//.setStartingOffsets(OffsetsInitializer.timestamp(dateTime.getMillis()))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
DataStream<String> stream = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-car-track-source");
SingleOutputStreamOperator<FrameModel> frameModelStream =
stream
.flatMap(new FrameFlatMap())
.setParallelism(1)
.name("轨迹帧数据-JsonToObject");
SingleOutputStreamOperator<CarTrackModel> carTrackModelStream =
frameModelStream.
flatMap(new CarTrackFlatMap())
.setParallelism(1)
.name("轨迹帧数据解析-ToCarTrackModel");
//筛选机动车类型数据
SingleOutputStreamOperator<CarTrackModel> filterStream =
carTrackModelStream.filter(new FilterFunction<CarTrackModel>() {
@Override
public boolean filter(CarTrackModel value) throws Exception {
String motorObjectType = Constant.MOTOR_TYPES;
String[] sps = motorObjectType.split(",");
List<String> typeList = Arrays.asList(sps);
String carType = value.getOriginalType().toString();
if (typeList.contains(carType)){
return true;
}
return false;
}
}).name("筛选机动车数据");
try {
//批量轨迹封装
TrackStoreMainNew.init(env, indexName).run(carTrackModelStream);
CarRoutePathMainNew.init(env, indexName1).run(carTrackModelStream);
env.execute("轨迹处理工作流");
} catch (Exception e) {
e.printStackTrace();
//log.error("交通指标计算任务异常 : " + e);
}
}
}
package com.wanji.indicators.task.track.stream;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.task.track.service.AreaRealTimeIndicatorMainNew;
import com.wanji.indicators.task.track.service.CarRoutePathMainNew;
import com.wanji.indicators.task.track.service.TrackStoreMainNew;
import com.wanji.indicators.task.track.service.func.CarTrackFlatMap;
import com.wanji.indicators.task.track.service.func.FrameFlatMap;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.DateTime;
import java.util.Properties;
/**
 * Packs each single vehicle's track points into one batch roughly every 30
 * seconds (the batching itself lives in TrackStoreMainNew).
 */
public class TrackUnionMain {

    public static void main(String[] args) {
        // Batch-track ES index name; intentionally empty here (was args[0] once).
        String esIndex = "";
        PropertiesHelper helper = PropertiesHelper.getInstance();
        Properties props = helper.getProperties();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DateTime now = new DateTime();
        // Midnight of the previous day — only used by the commented-out
        // timestamp-based start offset below.
        DateTime prevMidnight = now.withMillisOfDay(0).plusDays(-1);
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setProperties(helper.getConsumerProperties())
                .setProperty("auto.offset.commit", "true")
                .setProperty("auto.commit.interval.ms", "1000")
                .setProperty("commit.offsets.on.checkpoint", "true")
                .setBootstrapServers(props.getProperty("bootstrap.servers"))
                .setTopics(props.getProperty("consumer.topic"))
                .setGroupId(props.getProperty("consumer.group.id") + "_BATCH_TRACK")
                // Resume from committed offsets; a brand-new consumer group
                // starts at the latest offset.
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                //.setStartingOffsets(OffsetsInitializer.timestamp(prevMidnight.getMillis()))
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();
        DataStream<String> raw =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-car-track-source");
        // Parse raw JSON into per-frame models.
        SingleOutputStreamOperator<FrameModel> frames =
                raw.flatMap(new FrameFlatMap())
                        .setParallelism(1)
                        .name("轨迹帧数据-JsonToObject");
        // Flatten frames into individual car-track records.
        SingleOutputStreamOperator<CarTrackModel> tracks =
                frames.flatMap(new CarTrackFlatMap())
                        .setParallelism(1)
                        .name("轨迹帧数据解析-ToCarTrackModel");
        try {
            // Batch-track packaging pipeline.
            TrackStoreMainNew.init(env, esIndex).run(tracks);
            //AreaRealTimeIndicatorMainNew.init(env, esIndex).run(tracks);
            env.execute("轨迹处理工作流");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.wanji.indicators.task.travelTime.service;
import com.alibaba.fastjson.JSONObject;
import com.wanji.indicators.config.RedisConfig;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.task.queuelength.service.func.QueueAggregateFunction;
import com.wanji.indicators.task.queuelength.service.func.QueueProcessWindow;
import com.wanji.indicators.task.queuelength.service.model.MaxQueueResultModel;
import com.wanji.indicators.task.queuelength.service.sink.QueueRedisMapper;
import com.wanji.indicators.task.track.service.key.KeySelectorGlobalIdPlateNo;
import com.wanji.indicators.task.track.service.sink.BatchTrackElasticSearchSinkFunction;
import com.wanji.indicators.task.travelTime.service.func.TravelAggregateFunction;
import com.wanji.indicators.task.travelTime.service.func.TravelProcessWindow;
import com.wanji.indicators.task.travelTime.service.model.TravelTimeResultModel;
import com.wanji.indicators.task.travelTime.service.sink.TravelRedisMapper;
import com.wanji.indicators.util.ElasticSearchSinkUtil;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
/**
 * Wires a per-vehicle travel time / speed pipeline over event-time session
 * windows and pushes the running maxima to Redis.
 */
public class MaxTravelTimeMainNew implements Serializable {
// Flink execution environment supplied by the caller.
private final StreamExecutionEnvironment env;
// Singleton accessor for the job's configuration properties.
private final PropertiesHelper instance;
private Properties p = new Properties();
// NOTE(review): path, indexName and windowTime are never read in this class.
private String path;
private String indexName = "area.realtime.indicator";
private int windowTime = 2;
// Private: instances are obtained via the static init(...) factory below.
private MaxTravelTimeMainNew(StreamExecutionEnvironment env, String paramInfo) {
this.env = env;
this.instance = PropertiesHelper.getInstance();
this.p = this.instance.getProperties();
}
// Static factory; paramInfo is currently unused.
public static MaxTravelTimeMainNew init(StreamExecutionEnvironment env, String paramInfo) {
return new MaxTravelTimeMainNew(env,paramInfo);
}
/**
 * Attaches the travel-time pipeline to the given track stream:
 * 5s bounded-out-of-orderness watermarks on the global timestamp, keyed by
 * global id + plate number, 2-minute-gap session windows aggregated into
 * travel time/speed, then the per-id max travelTime over each pair of
 * results is printed and written to the Redis hash AREA_REALTIME_INDICATOR.
 */
public void run(SingleOutputStreamOperator<CarTrackModel> filterStream) {
SingleOutputStreamOperator<TravelTimeResultModel> groupByPlateStream =
filterStream.assignTimestampsAndWatermarks(
WatermarkStrategy.<CarTrackModel>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getGlobalTimeStamp()))
.keyBy(new KeySelectorGlobalIdPlateNo())
.window(EventTimeSessionWindows.withGap(Time.minutes(2)))
.aggregate(new TravelAggregateFunction(),new TravelProcessWindow())
.name("旅行时间、速度计算");
try {
// countWindow(2).maxBy keeps the larger travelTime of every two results per key.
SingleOutputStreamOperator<TravelTimeResultModel> travelTimeStream = groupByPlateStream.keyBy(TravelTimeResultModel::getId).countWindow(2).maxBy("travelTime");
travelTimeStream.print();
travelTimeStream.addSink( new RedisSink<TravelTimeResultModel>(new RedisConfig().getRedisConfig(), new TravelRedisMapper(Constant.AREA_REALTIME_INDICATOR)));
// SingleOutputStreamOperator<TravelTimeResultModel> travelSpeedStream = groupByPlateStream.keyBy(TravelTimeResultModel::getId).countWindow(2).maxBy("travelSpeed");
// travelTimeStream.addSink( new RedisSink<TravelTimeResultModel>(new RedisConfig().getRedisConfig(), new TravelRedisMapper(Constant.AREA_REALTIME_INDICATOR)));
} catch (Exception e) {
// NOTE(review): exceptions during graph construction are swallowed after
// printing — consider propagating or logging via a class logger.
e.printStackTrace();
}
}
}
package com.wanji.indicators.task.travelTime.service.func;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.task.travelTime.service.model.TravelTimeResultModel;
import com.wanji.indicators.util.ArithOfBigDecmial;
import com.wanji.indicators.util.DateUtil;
import com.wanji.indicators.util.GeomsConvertUtil;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Incrementally accumulates one vehicle's travel time, distance and speed from
 * consecutive track points inside an event-time session window.
 *
 * @author fengyi
 * @date 2023/4/6
 */
public class TravelAggregateFunction implements AggregateFunction<CarTrackModel, TravelTimeResultModel, TravelTimeResultModel> {

    private static final Logger log = LoggerFactory.getLogger(TravelAggregateFunction.class);

    @Override
    public TravelTimeResultModel createAccumulator() {
        TravelTimeResultModel initModel = new TravelTimeResultModel();
        initModel.setTravelSpeed(0D);
        initModel.setTravelTime(0D);
        initModel.setDistance(0D);
        return initModel;
    }

    /**
     * Folds one track point into the accumulator.
     *
     * @param value       the incoming track point
     * @param accumulator the running result so far
     * @return the updated accumulator
     */
    @Override
    public TravelTimeResultModel add(CarTrackModel value, TravelTimeResultModel accumulator) {
        if (accumulator.getTimestamp() == null) {
            // First point of the session: remember entry time and position.
            accumulator.setTimestamp(value.getGlobalTimeStamp());
            accumulator.setStartTimestamp(value.getGlobalTimeStamp());
            accumulator.setLng(value.getLongitude());
            accumulator.setLat(value.getLatitude());
            accumulator.setGlobalId(value.getOrgCode()+"_"+value.getId()+"_"+value.getPicLicense());
        } else {
            // Milliseconds elapsed since the previously accepted point.
            long travelTime = value.getGlobalTimeStamp() - accumulator.getTimestamp();
            if (travelTime > 0) {
                // Straight-line distance between the previous and current position.
                double distance = GeomsConvertUtil.getDistance(accumulator.getLng(), accumulator.getLat(), value.getLongitude(), value.getLatitude());
                // BUG FIX: the old code did (totalSeconds + deltaMs) / 1000, which
                // re-divided the already-converted running total on every point.
                // Only the new delta needs the ms -> s conversion.
                accumulator.setTravelTime(accumulator.getTravelTime() + travelTime / 1000D);
                accumulator.setDistance(accumulator.getDistance() + distance);
                // Instantaneous speed between the two points (distance / deltaMs, as before).
                double speed = ArithOfBigDecmial.div(distance, travelTime);
                accumulator.setTravelSpeed(speed);
                accumulator.setTimestamp(value.getGlobalTimeStamp());
            }
            log.info("[carId:{},midId:{},travelTime:{},nowTime:{},preTime:{}]",value.getOrgCode()+"_"+value.getId()+"_"+value.getPicLicense(),accumulator.getGlobalId(),accumulator.getTravelTime(), DateUtil.toDateTime(value.getGlobalTimeStamp(),"yyyy-MM-dd HH:mm:ss.SSS"),DateUtil.toDateTime(accumulator.getTimestamp(),"yyyy-MM-dd HH:mm:ss.SSS"));
        }
        return accumulator;
    }

    /** Emitted when the session window fires: the accumulator itself. */
    @Override
    public TravelTimeResultModel getResult(TravelTimeResultModel accumulator) {
        return accumulator;
    }

    /**
     * Merges two accumulators when Flink merges overlapping session windows.
     *
     * BUG FIX: the previous implementation returned null, which would NPE on a
     * session-window merge — and this function IS used with
     * EventTimeSessionWindows, whose merges invoke merge().
     */
    @Override
    public TravelTimeResultModel merge(TravelTimeResultModel a, TravelTimeResultModel b) {
        // An accumulator that never saw a point contributes nothing.
        if (a.getTimestamp() == null) {
            return b;
        }
        if (b.getTimestamp() == null) {
            return a;
        }
        // Keep the earlier accumulator as the base so start time/position survive.
        TravelTimeResultModel first = a.getStartTimestamp() <= b.getStartTimestamp() ? a : b;
        TravelTimeResultModel second = (first == a) ? b : a;
        first.setTravelTime(first.getTravelTime() + second.getTravelTime());
        first.setDistance(first.getDistance() + second.getDistance());
        // The later window supplies the most recent timestamp and speed.
        if (second.getTimestamp() > first.getTimestamp()) {
            first.setTimestamp(second.getTimestamp());
            first.setTravelSpeed(second.getTravelSpeed());
        }
        return first;
    }
}
package com.wanji.indicators.task.travelTime.service.func;
import com.wanji.indicators.task.travelTime.service.model.TravelTimeResultModel;
import com.wanji.indicators.util.DateUtil;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
 * Stamps the aggregated travel record of a session window with the window's
 * start/end time and emits it.
 *
 * @author fengyi
 * @date 2023/4/6
 */
public class TravelProcessWindow extends ProcessWindowFunction<
        TravelTimeResultModel,
        TravelTimeResultModel,
        Tuple3<String, Integer, String>,
        TimeWindow> {

    private static final Logger log = LoggerFactory.getLogger(TravelProcessWindow.class);

    /**
     * Emits the (single) pre-aggregated element of the window, enriched with
     * the window start/end times.
     *
     * @param s        grouping key (unused)
     * @param context  window context providing start/end timestamps
     * @param elements pre-aggregated results from the upstream AggregateFunction
     * @param out      collector for the enriched result
     */
    @Override
    public void process(Tuple3<String, Integer, String> s, Context context, Iterable<TravelTimeResultModel> elements, Collector<TravelTimeResultModel> out) throws Exception {
        long windowStartTs = context.window().getStart();
        long windowEndTs = context.window().getEnd();
        // The old code also materialized the iterable into an unused List via
        // StreamSupport/Collectors — dead work, removed.
        if (elements.iterator().hasNext()) {
            TravelTimeResultModel result = elements.iterator().next();
            result.setStartTime(DateUtil.toDateTime(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS"));
            result.setEndTime(DateUtil.toDateTime(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS"));
            log.info("Travel Data Result:" + result.toString());
            out.collect(result);
        }
    }
}
package com.wanji.indicators.task.travelTime.service.model;
import com.wanji.indicators.constant.Constant;
import lombok.Data;
import java.io.Serializable;
/**
 * Result record for the travel time/speed pipeline; serialized to JSON and
 * stored as a field of a Redis hash (see TravelRedisMapper).
 *
 * @author fengyi
 * @date 2023/4/6
 * @description
 */
@Data
public class TravelTimeResultModel implements Serializable {
// Redis hash field name; fixed to HASH_KEY_TRAVEL_DATA for travel records.
private String id = Constant.HASH_KEY_TRAVEL_DATA;
// private String plateNo;
// private String globalCode;
// Vehicle identity: orgCode_id_picLicense (built in TravelAggregateFunction).
private String globalId;
// Window start, formatted "yyyy-MM-dd HH:mm:ss.SSS".
private String startTime;
// Timestamp (ms) of the first track point of the session.
private Long startTimestamp;
// Window end, formatted "yyyy-MM-dd HH:mm:ss.SSS".
private String endTime;
private Double lng; // longitude of the first track point
private Double lat; // latitude of the first track point
private Double travelTime; // accumulated travel time
private Double travelSpeed; // latest computed travel speed
private Double distance; // accumulated travel distance
// Timestamp (ms) of the most recently processed track point.
private Long timestamp;
}
package com.wanji.indicators.task.travelTime.service.sink;
import com.alibaba.fastjson.JSON;
import com.wanji.indicators.task.freeflow.service.model.MaxSpeedResultModel;
import com.wanji.indicators.task.queuelength.service.model.MaxQueueResultModel;
import com.wanji.indicators.task.travelTime.service.model.TravelTimeResultModel;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
/**
 * Maps {@link TravelTimeResultModel} records onto Redis HSET commands: every
 * record becomes one field of a single configurable hash.
 *
 * @author fengyi
 * @date 2023/4/7
 */
public class TravelRedisMapper implements RedisMapper<TravelTimeResultModel> {

    /** Name of the Redis hash that all travel records are written into. */
    private final String hashKey;

    public TravelRedisMapper(String redisKey) {
        this.hashKey = redisKey;
    }

    /** Store as a hash: HSET &lt;hashKey&gt; &lt;field&gt; &lt;value&gt;. */
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, hashKey);
    }

    /** Hash field: the record's id. */
    @Override
    public String getKeyFromData(TravelTimeResultModel data) {
        return data.getId();
    }

    /** Hash value: the record serialized as JSON. */
    @Override
    public String getValueFromData(TravelTimeResultModel data) {
        return JSON.toJSONString(data);
    }
}
package com.wanji.indicators.task.travelTime.stream;
import com.wanji.indicators.constant.Constant;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.model.FrameModel;
import com.wanji.indicators.task.track.service.CarRoutePathMainNew;
import com.wanji.indicators.task.track.service.TrackStoreMainNew;
import com.wanji.indicators.task.track.service.func.CarTrackFlatMap;
import com.wanji.indicators.task.track.service.func.FrameFlatMap;
import com.wanji.indicators.task.travelTime.service.MaxTravelTimeMainNew;
import com.wanji.indicators.util.PropertiesHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.DateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
public class CalculateTravelTimeMain {
//private static final Logger log = LoggerFactory.getLogger(TrafficEventMain.class);
public static void main(String[] args) {
PropertiesHelper instance = PropertiesHelper.getInstance();
Properties properties = instance.getProperties();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60*1000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(properties.getProperty("check.point.uri")));
//env.setStateBackend(new FsStateBackend(properties.getProperty("check.point.uri")));
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
// env.setStateBackend(
// new MemoryStateBackend(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE * 100));
DateTime currentDateTime = new DateTime();
DateTime dateTime = currentDateTime.withMillisOfDay(0).minusHours(-4);
String topic = properties.getProperty("consumer.topic");
KafkaSource<String> source = KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties())
.setProperty("auto.offset.commit", "true")
.setProperty("auto.commit.interval.ms", "1000")
.setProperty("commit.offsets.on.checkpoint", "true")
.setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setTopics(topic)
.setGroupId(properties.getProperty("consumer.group.id")+"_TRAVEL"+ UUID.randomUUID().toString())
/* 设置起始偏移量有以下几种情况
1.从指定的位置消费:OffsetsInitializer.offsets(Map<TopicPartition, Long> offsets)
2.从最新位置消费(最后一条处):OffsetsInitializer.latest()
3.从最早位置消费(第一条处):OffsetsInitializer.earliest()
4.从上次提交的位置消费:OffsetsInitializer.committedOffsets()
5.新的组,从来没有提交过,再指定一个消费方式:OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
*/
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
// 从大于等于此时间戳开始的偏移量开始
//.setStartingOffsets(OffsetsInitializer.timestamp(dateTime.getMillis()))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
DataStream<String> stream = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-car-track-source");
SingleOutputStreamOperator<FrameModel> frameModelStream =
stream
.flatMap(new FrameFlatMap())
.setParallelism(1)
.name("轨迹帧数据-JsonToObject");
SingleOutputStreamOperator<CarTrackModel> carTrackModelStream =
frameModelStream.
flatMap(new CarTrackFlatMap())
.setParallelism(1)
.name("轨迹帧数据解析-ToCarTrackModel");
//筛选机动车类型数据
SingleOutputStreamOperator<CarTrackModel> filterStream =
carTrackModelStream.filter(new FilterFunction<CarTrackModel>() {
@Override
public boolean filter(CarTrackModel value) throws Exception {
String motorObjectType = Constant.MOTOR_TYPES;
String[] sps = motorObjectType.split(",");
List<String> typeList = Arrays.asList(sps);
String carType = value.getOriginalType().toString();
if (typeList.contains(carType)){
return true;
}
return false;
}
}).name("筛选机动车数据");
try {
//批量轨迹封装
MaxTravelTimeMainNew.init(env, "indexName").run(filterStream);
env.execute("旅行时间速度计算工作流");
} catch (Exception e) {
e.printStackTrace();
//log.error("交通指标计算任务异常 : " + e);
}
}
}
package com.wanji.indicators.thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Lazily initialised, process-wide fixed-size thread pool (5 worker threads).
 *
 * <p>Uses double-checked locking. The {@code volatile} modifier on the field is
 * mandatory for that pattern under the Java memory model — the original
 * non-volatile field could publish a partially constructed executor to other
 * threads.</p>
 */
public class FixedThreadPool {

    // volatile is required for correct double-checked locking (JLS §17.4).
    private static volatile ExecutorService newFixedThreadPool = null;

    /**
     * Returns the shared executor, creating it on first use.
     *
     * @return the singleton {@link ExecutorService}; never {@code null}
     */
    public static ExecutorService getInstance() {
        if (newFixedThreadPool == null) {
            synchronized (FixedThreadPool.class) {
                if (newFixedThreadPool == null) {
                    newFixedThreadPool = Executors.newFixedThreadPool(5);
                }
            }
        }
        return newFixedThreadPool;
    }
}
package com.wanji.indicators.trigger;
/**
 * Placeholder for a custom event-time trigger.
 *
 * <p>Currently empty — no trigger logic is implemented yet.
 * NOTE(review): implement or remove if it stays unused.</p>
 *
 * @author fengyi
 * @date 2023/3/3
 */
public class EventTimeTrigger {
}
This diff is collapsed.
This diff is collapsed.
jdbc.driver=com.mysql.jdbc.Driver
jdbc.url=jdbc:mysql://192.168.2.78:3306/t_roadnetwork_changsha?useUnicode=true&characterEncoding=utf-8
jdbc.username=root
jdbc.password=Wanji300552
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment