Commit 3a3921e0 authored by duanruiming's avatar duanruiming

[add] 消费解析神思绿波带数据

parent b8988c37
package net.wanji.opt.dto;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.Map;
/**
* @author duanruiming
* @date 2024/11/19 10:20
* @description 神思绿波时序图推送数据消息
*/
@Data
@SuppressWarnings("all")
@JsonIgnoreProperties(ignoreUnknown = true)
public class GreenBeltKafkaDTO {
@JsonProperty("greenbelt_id")
private String greenbeltId; // 绿波带id
@JsonProperty("greenbelt_length")
private double greenbeltLength; // 绿波干线长度(m)
@JsonProperty("max_speed_forward")
private double maxSpeedForward; // 正向绿波推荐最大速度(km/h)
@JsonProperty("max_speed_backward")
private double maxSpeedBackward; // 反向绿波推荐最大速度(km/h)
@JsonProperty("min_speed_forward")
private double minSpeedForward; // 正向绿波推荐最小速度(km/h)
@JsonProperty("min_speed_backward")
private double minSpeedBackward; // 反向绿波推荐最小速度(km/h)
@JsonProperty("forward_direction")
private String forwardDirection; // 正向绿波方向(w2e/e2w/n2s/s2n)
@JsonProperty("backward_direction")
private String backwardDirection; // 反向绿波方向(w2e/e2w/n2s/s2n)
@JsonProperty("greenbelt_direction")
private int greenbeltDirection; // 此次策略下发绿波带方向(0=正向,1=反向,2=双向)
@JsonProperty("green_width_forward")
private double greenWidthForward; // 正向绿波带宽(s)
@JsonProperty("green_width_backward")
private double greenWidthBackward; // 反向绿波带宽(s)
@JsonProperty("phase_start_forward")
private Map<String, Double> phaseStartForward; // 每个路口正向绿波相位开始时间(s)
@JsonProperty("phase_start_backward")
private Map<String, Double> phaseStartBackward; // 每个路口反向绿波相位开始时间(s)
@JsonProperty("phase_end_forward")
private Map<String, Double> phaseEndForward; // 每个路口正向绿波相位结束时间(s)
@JsonProperty("phase_end_backward")
private Map<String, Double> phaseEndBackward; // 每个路口反向绿波相位结束时间(s)
@JsonProperty("belt_start_forward")
private Map<String, Double> beltStartForward; // 每个路口正向绿波带宽开始时间(s)
@JsonProperty("belt_start_backward")
private Map<String, Double> beltStartBackward; // 每个路口反向绿波带宽开始时间(s)
@JsonProperty("speed_forward")
private Map<String, Double> speedForward; // 每个路口正向绿波速度(km/h)
@JsonProperty("speed_backward")
private Map<String, Double> speedBackward; // 每个路口反向绿波速度(km/h)
@JsonProperty("offset")
private Map<String, Double> offset; // 每个路口相位差(s)
@JsonProperty("max_cycle")
private int maxCycle; // 最大周期(s)
@JsonProperty("control_time")
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss", timezone = "GMT+8")
private String controlTime; // 策略下发时间
@JsonProperty("control_duration")
private int controlDuration; // 策略执行时长(s)
@JsonProperty("greenbelt_type")
private int greenbeltType; // 执行策略(1=单条, 2=垂直,3=平行)
private int dynamic; // 是否动态绿波(1=是,0=否)
@JsonProperty("control_method")
private int controlMethod; // -1=stop by error, 0=no control, 1=control
}
package net.wanji.opt.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.JacksonUtils;
import net.wanji.common.utils.tool.StringUtils;
import net.wanji.opt.dto.CrossSchedulesDTO;
import net.wanji.opt.dto.GreenBeltKafkaDTO;
import net.wanji.opt.vo.GreenBeltInfoVO;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author duanruiming 第一方案
* @author duanruiming
* @date 2024/11/18 23:40
* @description 消费神思推送kafka数据,不同kafka
*/
@Component
public class KafkaListeners {
@Slf4j
public class KafkaListeners implements KafkaListenerErrorHandler {
@KafkaListener(topics = {"green_belt_info_data"}, groupId = "group2", containerFactory = "kafkaListenerContainerFactory2")
public void listen2(String message) {
// 处理消息
System.err.println("22222222222222" + message);
public void receiveGreenBeltInfoData(String message) throws Exception {
if (StringUtils.isNotBlank(message)) {
message = greenBeltInfoData;
ObjectMapper mapper = JacksonUtils.getInstance();
GreenBeltKafkaDTO greenBeltKafkaDTO = mapper.readValue(greenBeltInfoData, GreenBeltKafkaDTO.class);
}
}
@Override
@NonNull
public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
return new Object();
}
@Override
@NonNull
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
String errorMessage = String.format("监听主题:%s,消费者详情:%s,异常信息:%s,消息详情:%s",
consumer.listTopics(), consumer.groupMetadata(), exception, message);
log.error(errorMessage);
return KafkaListenerErrorHandler.super.handleError(message, exception, consumer);
}
public static void main(String[] args) throws Exception {
String message = greenBeltInfoData;
ObjectMapper mapper = JacksonUtils.getInstance();
GreenBeltKafkaDTO kafkaDTO = mapper.readValue(message, GreenBeltKafkaDTO.class);
// 正反绿波容器
List<GreenBeltInfoVO.DirGreenDetail> dirGreenDetails = new ArrayList<>();
// 构建正向绿波
setForward(kafkaDTO, dirGreenDetails);
// 反向绿波
setBackward(kafkaDTO, dirGreenDetails);
// 返回绿波实体
GreenBeltInfoVO greenBeltInfoVO = new GreenBeltInfoVO();
greenBeltInfoVO.setGreenId(kafkaDTO.getGreenbeltId());
greenBeltInfoVO.setLength(kafkaDTO.getGreenbeltLength());
greenBeltInfoVO.setCycle(kafkaDTO.getMaxCycle());
greenBeltInfoVO.setControlTime(kafkaDTO.getControlTime());
greenBeltInfoVO.setControlDuration(kafkaDTO.getControlDuration());
greenBeltInfoVO.setType(kafkaDTO.getGreenbeltType());
greenBeltInfoVO.setDynamic(kafkaDTO.getDynamic());
greenBeltInfoVO.setControlMethod(kafkaDTO.getControlMethod());
greenBeltInfoVO.setDirGreenDetails(dirGreenDetails);
System.err.println(mapper.writeValueAsString(greenBeltInfoVO));
}
private static void setBackward(GreenBeltKafkaDTO kafkaDTO, List<GreenBeltInfoVO.DirGreenDetail> dirGreenDetails) {
// 反向绿波路口列表
List<GreenBeltInfoVO.CrossGreenDetail> crossGreenDetails = new ArrayList<>();
Map<String, Double> phaseStartBackward = kafkaDTO.getPhaseStartBackward();
for (Map.Entry<String, Double> backEntry : phaseStartBackward.entrySet()) {
String crossId = backEntry.getKey();
Double phaseStartTime = backEntry.getValue();
GreenBeltInfoVO.CrossGreenDetail crossGreenDetail = new GreenBeltInfoVO.CrossGreenDetail();
crossGreenDetail.setCrossId(crossId);
crossGreenDetail.setPhaseStartTime(phaseStartTime);
Double phaseEndTime = kafkaDTO.getPhaseEndBackward().get(crossId);
crossGreenDetail.setPhaseEndTime(phaseEndTime);
Double greenStartTime = kafkaDTO.getBeltStartBackward().get(crossId);
crossGreenDetail.setGreenStartTime(greenStartTime);
Double speed = kafkaDTO.getSpeedBackward().get(crossId);
crossGreenDetail.setSpeed(speed);
Double offset = kafkaDTO.getOffset().get(crossId);
crossGreenDetail.setOffset(offset);
crossGreenDetails.add(crossGreenDetail);
}
// 反向绿波
// 反向绿波实体
GreenBeltInfoVO.DirGreenDetail dirGreenDetail = new GreenBeltInfoVO.DirGreenDetail();
dirGreenDetail.setDirType(0);
String forwardDirection = kafkaDTO.getBackwardDirection();
dirGreenDetail.setDir(forwardDirection);
double maxSpeedForward = kafkaDTO.getMaxSpeedBackward();
dirGreenDetail.setMaxSpeed(maxSpeedForward);
double minSpeedForward = kafkaDTO.getMinSpeedBackward();
dirGreenDetail.setMinSpeed(minSpeedForward);
double greenWidthForward = kafkaDTO.getGreenWidthBackward();
dirGreenDetail.setGreenWithTime(greenWidthForward);
dirGreenDetail.setCrossGreenDetailList(crossGreenDetails);
dirGreenDetails.add(dirGreenDetail);
}
private static void setForward(GreenBeltKafkaDTO kafkaDTO, List<GreenBeltInfoVO.DirGreenDetail> dirGreenDetails) {
// 正向绿波路口列表
List<GreenBeltInfoVO.CrossGreenDetail> crossGreenDetails = new ArrayList<>();
Map<String, Double> phaseStartForward = kafkaDTO.getPhaseStartForward();
for (Map.Entry<String, Double> forwardEntry : phaseStartForward.entrySet()) {
String crossId = forwardEntry.getKey();
Double phaseStartTime = forwardEntry.getValue();
GreenBeltInfoVO.CrossGreenDetail crossGreenDetail = new GreenBeltInfoVO.CrossGreenDetail();
crossGreenDetail.setCrossId(crossId);
crossGreenDetail.setPhaseStartTime(phaseStartTime);
Double phaseEndTime = kafkaDTO.getPhaseEndForward().get(crossId);
crossGreenDetail.setPhaseEndTime(phaseEndTime);
Double greenStartTime = kafkaDTO.getBeltStartForward().get(crossId);
crossGreenDetail.setGreenStartTime(greenStartTime);
Double speed = kafkaDTO.getSpeedForward().get(crossId);
crossGreenDetail.setSpeed(speed);
Double offset = kafkaDTO.getOffset().get(crossId);
crossGreenDetail.setOffset(offset);
crossGreenDetails.add(crossGreenDetail);
}
// 正向绿波
// 正向绿波实体
GreenBeltInfoVO.DirGreenDetail dirGreenDetail = new GreenBeltInfoVO.DirGreenDetail();
dirGreenDetail.setDirType(1);
String forwardDirection = kafkaDTO.getForwardDirection();
dirGreenDetail.setDir(forwardDirection);
double maxSpeedForward = kafkaDTO.getMaxSpeedForward();
dirGreenDetail.setMaxSpeed(maxSpeedForward);
double minSpeedForward = kafkaDTO.getMinSpeedForward();
dirGreenDetail.setMinSpeed(minSpeedForward);
double greenWidthForward = kafkaDTO.getGreenWidthForward();
dirGreenDetail.setGreenWithTime(greenWidthForward);
dirGreenDetail.setCrossGreenDetailList(crossGreenDetails);
dirGreenDetails.add(dirGreenDetail);
}
public static String greenBeltInfoData = "{\n" +
"\t\"backward_direction\": \"e2w\",\n" +
"\t\"belt_start_backward\": {\n" +
"\t\t\"13MOD0B5SI0\": 0.0,\n" +
"\t\t\"13MQJ0B5SI0\": 87.0,\n" +
"\t\t\"13MS20B5SI0\": 7.0,\n" +
"\t\t\"13MUK0B5SH0\": 79.0,\n" +
"\t\t\"13N0F0B5SH0\": 3.0,\n" +
"\t\t\"13N200B5SH0\": 86.0\n" +
"\t},\n" +
"\t\"belt_start_forward\": {\n" +
"\t\t\"13MOD0B5SI0\": 0.0,\n" +
"\t\t\"13MQJ0B5SI0\": 87.0,\n" +
"\t\t\"13MS20B5SI0\": 7.0,\n" +
"\t\t\"13MUK0B5SH0\": 79.0,\n" +
"\t\t\"13N0F0B5SH0\": 3.0,\n" +
"\t\t\"13N200B5SH0\": 86.0\n" +
"\t},\n" +
"\t\"control_duration\": 90,\n" +
"\t\"control_method\": 1,\n" +
"\t\"control_time\": \"2024-11-18T18:39:55\",\n" +
"\t\"dynamic\": 1,\n" +
"\t\"forward_direction\": \"w2e\",\n" +
"\t\"green_width_backward\": 35.9999999999999,\n" +
"\t\"green_width_forward\": 80.0,\n" +
"\t\"greenbelt_direction\": 0,\n" +
"\t\"greenbelt_id\": \"road1\",\n" +
"\t\"greenbelt_length\": 2757.0,\n" +
"\t\"greenbelt_type\": 2,\n" +
"\t\"max_cycle\": 220,\n" +
"\t\"max_speed_backward\": 45.0,\n" +
"\t\"max_speed_forward\": 35.0,\n" +
"\t\"min_speed_backward\": 35.0,\n" +
"\t\"min_speed_forward\": 25.0,\n" +
"\t\"offset\": {\n" +
"\t\t\"13MOD0B5SI0\": 198.0,\n" +
"\t\t\"13MQJ0B5SI0\": 46.0,\n" +
"\t\t\"13MS20B5SI0\": 89.0,\n" +
"\t\t\"13MUK0B5SH0\": 216.0,\n" +
"\t\t\"13N0F0B5SH0\": 201.0,\n" +
"\t\t\"13N200B5SH0\": 89.0\n" +
"\t},\n" +
"\t\"phase_end_backward\": {\n" +
"\t\t\"13MOD0B5SI0\": 132.0,\n" +
"\t\t\"13MQJ0B5SI0\": 165.0,\n" +
"\t\t\"13MS20B5SI0\": 148.0,\n" +
"\t\t\"13MUK0B5SH0\": 115.0,\n" +
"\t\t\"13N0F0B5SH0\": 165.0,\n" +
"\t\t\"13N200B5SH0\": 155.0\n" +
"\t},\n" +
"\t\"phase_end_forward\": {\n" +
"\t\t\"13MOD0B5SI0\": 132.0,\n" +
"\t\t\"13MQJ0B5SI0\": 165.0,\n" +
"\t\t\"13MS20B5SI0\": 148.0,\n" +
"\t\t\"13MUK0B5SH0\": 115.0,\n" +
"\t\t\"13N0F0B5SH0\": 165.0,\n" +
"\t\t\"13N200B5SH0\": 155.0\n" +
"\t},\n" +
"\t\"phase_start_backward\": {\n" +
"\t\t\"13MOD0B5SI0\": 39.0,\n" +
"\t\t\"13MQJ0B5SI0\": 42.0,\n" +
"\t\t\"13MS20B5SI0\": 43.0,\n" +
"\t\t\"13MUK0B5SH0\": 0.0,\n" +
"\t\t\"13N0F0B5SH0\": 42.0,\n" +
"\t\t\"13N200B5SH0\": 40.0\n" +
"\t},\n" +
"\t\"phase_start_forward\": {\n" +
"\t\t\"13MOD0B5SI0\": 39.0,\n" +
"\t\t\"13MQJ0B5SI0\": 42.0,\n" +
"\t\t\"13MS20B5SI0\": 43.0,\n" +
"\t\t\"13MUK0B5SH0\": 0.0,\n" +
"\t\t\"13N0F0B5SH0\": 42.0,\n" +
"\t\t\"13N200B5SH0\": 40.0\n" +
"\t},\n" +
"\t\"speed_backward\": {\n" +
"\t\t\"13MOD0B5SI0\": -1.0,\n" +
"\t\t\"13MQJ0B5SI0\": 36.608,\n" +
"\t\t\"13MS20B5SI0\": 41.257999999999996,\n" +
"\t\t\"13MUK0B5SH0\": 40.976,\n" +
"\t\t\"13N0F0B5SH0\": 38.536,\n" +
"\t\t\"13N200B5SH0\": 42.28\n" +
"\t},\n" +
"\t\"speed_forward\": {\n" +
"\t\t\"13MOD0B5SI0\": 30.315999999999995,\n" +
"\t\t\"13MQJ0B5SI0\": 23.904,\n" +
"\t\t\"13MS20B5SI0\": 41.739999999999995,\n" +
"\t\t\"13MUK0B5SH0\": 27.832000000000004,\n" +
"\t\t\"13N0F0B5SH0\": 25.536,\n" +
"\t\t\"13N200B5SH0\": -1.0\n" +
"\t}\n" +
"}";
}
package net.wanji.opt.vo;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.List;
/**
* @author duanruiming
* @date 2024/11/19 10:48
* @description 绿波带时序图
*/
@Data
@ApiModel(value = "GreenBeltInfoVO", description = "绿波时序图返回实体")
@JsonIgnoreProperties(ignoreUnknown = true)
public class GreenBeltInfoVO {
@ApiModelProperty("绿波id")
private String greenId;
@ApiModelProperty("绿波带长度")
private Double length;
@ApiModelProperty("绿波带周期")
private Integer cycle;
@ApiModelProperty("绿波带下发时间")
private String controlTime;
@ApiModelProperty("策略执行时长")
private Integer controlDuration;
@ApiModelProperty("执行策略(1=单条, 2=垂直,3=平行)")
private Integer type;
@ApiModelProperty("是否动态绿波(1=是,0=否)")
private Integer dynamic;
@ApiModelProperty("-1=stop by error, 0=no control, 1=control")
private Integer controlMethod;
private List<DirGreenDetail> dirGreenDetails;
@Data
public static class DirGreenDetail {
@ApiModelProperty("0-反向 1-正向 2-双向")
private Integer dirType;
@ApiModelProperty("绿波方向")
private String dir;
@ApiModelProperty("最大速度")
private Double maxSpeed;
@ApiModelProperty("最小速度")
private Double minSpeed;
@ApiModelProperty("绿波带宽")
private Double greenWithTime;
@ApiModelProperty("绿波路口详情")
private List<CrossGreenDetail> crossGreenDetailList;
}
@Data
public static class CrossGreenDetail {
@ApiModelProperty("路口")
private String crossId;
@ApiModelProperty("相位开始时间")
private Double phaseStartTime;
@ApiModelProperty("相位结束时间")
private Double phaseEndTime;
@ApiModelProperty("绿波带开始时间")
private Double greenStartTime;
@ApiModelProperty("绿波速度")
private Double speed;
@ApiModelProperty("相位差")
private Double offset;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment