Commit 39975e3a authored by duanruiming's avatar duanruiming

[update] 失衡优化记录优化时间处理逻辑

parent d34dcb95
Pipeline #52 failed with stages
...@@ -70,7 +70,8 @@ public class CrossStrategyOptConsumerHandler implements KafkaListenerErrorHandle ...@@ -70,7 +70,8 @@ public class CrossStrategyOptConsumerHandler implements KafkaListenerErrorHandle
crossStrategyOptService.save(imbalanceAlgorithmDTO); crossStrategyOptService.save(imbalanceAlgorithmDTO);
} }
} catch (Exception e) { } catch (Exception e) {
log.error("当前失衡策略数据处理异常:{}", message); log.error("当前失衡策略数据处理异常:{}", e);
log.error("当前失衡策略数据内容:{}", message);
throw new Exception(e); throw new Exception(e);
} }
acknowledgment.acknowledge(); acknowledgment.acknowledge();
......
...@@ -59,6 +59,9 @@ public class ImbalanceAlgorithmDTO { ...@@ -59,6 +59,9 @@ public class ImbalanceAlgorithmDTO {
// 方案具体信息 // 方案具体信息
private SchemeData data; private SchemeData data;
private Integer isSuccessed; private Integer isSuccessed;
// 开始 = 1 结束 = -1
private Integer control_method;
private Integer control_duration;
@Data @Data
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
......
...@@ -61,4 +61,6 @@ public class CrossStrategyResultEntity { ...@@ -61,4 +61,6 @@ public class CrossStrategyResultEntity {
private String emptyTurn; private String emptyTurn;
@TableField("dt") @TableField("dt")
private Integer dt; private Integer dt;
@TableField("duration")
private Integer duration;
} }
package net.wanji.datacenter.service.impl; package net.wanji.datacenter.service.impl;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.JacksonUtils; import net.wanji.common.utils.tool.JacksonUtils;
...@@ -11,11 +12,14 @@ import net.wanji.datacenter.pojo.dto.ImbalanceAlgorithmDTO; ...@@ -11,11 +12,14 @@ import net.wanji.datacenter.pojo.dto.ImbalanceAlgorithmDTO;
import net.wanji.datacenter.pojo.dto.PhaseEmptyDataDTO; import net.wanji.datacenter.pojo.dto.PhaseEmptyDataDTO;
import net.wanji.datacenter.pojo.entity.CrossStrategyResultEntity; import net.wanji.datacenter.pojo.entity.CrossStrategyResultEntity;
import net.wanji.datacenter.service.CrossStrategyOptService; import net.wanji.datacenter.service.CrossStrategyOptService;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date; import java.util.Date;
import java.util.List;
import java.util.Objects; import java.util.Objects;
/** /**
...@@ -97,6 +101,28 @@ public class CrossStrategyOptServiceImpl implements CrossStrategyOptService { ...@@ -97,6 +101,28 @@ public class CrossStrategyOptServiceImpl implements CrossStrategyOptService {
@Override @Override
public void save(ImbalanceAlgorithmDTO dto) { public void save(ImbalanceAlgorithmDTO dto) {
try { try {
if (Objects.equals(-1, dto.getControl_method())) {
String crossId = dto.getCrossId();
SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
Date issueTime = dto.getIssueTime();
Date date = new Date();
String currentDate = format.format(date);
LambdaQueryWrapper<CrossStrategyResultEntity> query = new LambdaQueryWrapper<CrossStrategyResultEntity>();
query.eq(CrossStrategyResultEntity::getCrossId, crossId);
query.eq(CrossStrategyResultEntity::getDt, currentDate);
query.orderByDesc(CrossStrategyResultEntity::getIssueTime);
query.last("limit 1");
List<CrossStrategyResultEntity> results = crossStrategyResultMapper.selectList(query);
if (!CollectionUtils.isEmpty(results) && results.size() == 1) {
CrossStrategyResultEntity crossStrategyResultEntity = results.get(0);
long offset = date.getTime() - issueTime.getTime();
int duration = (int) (offset / 1000);
log.warn("当前失衡路口:{},当前路口发生事件:{}, 计算时间差:{}", crossId, issueTime, duration);
crossStrategyResultEntity.setDuration(duration + crossStrategyResultEntity.getDuration());
crossStrategyResultMapper.updateById(crossStrategyResultEntity);
}
return;
}
ObjectMapper mapper = JacksonUtils.getInstance(); ObjectMapper mapper = JacksonUtils.getInstance();
CrossStrategyResultEntity crossStrategyResultEntity = new CrossStrategyResultEntity(); CrossStrategyResultEntity crossStrategyResultEntity = new CrossStrategyResultEntity();
crossStrategyResultEntity.setCrossId(dto.getPlatformId()); crossStrategyResultEntity.setCrossId(dto.getPlatformId());
...@@ -118,6 +144,7 @@ public class CrossStrategyOptServiceImpl implements CrossStrategyOptService { ...@@ -118,6 +144,7 @@ public class CrossStrategyOptServiceImpl implements CrossStrategyOptService {
crossStrategyResultEntity.setControlDir(dto.getControlDir()); crossStrategyResultEntity.setControlDir(dto.getControlDir());
crossStrategyResultEntity.setData(mapper.writeValueAsString(dto.getData())); crossStrategyResultEntity.setData(mapper.writeValueAsString(dto.getData()));
crossStrategyResultEntity.setResponseContent(dto.getResponseContent()); crossStrategyResultEntity.setResponseContent(dto.getResponseContent());
crossStrategyResultEntity.setDuration(dto.getControl_duration());
if (Objects.nonNull(issueTime)) { if (Objects.nonNull(issueTime)) {
String parse = DateUtil.format(issueTime, "yyyyMMdd"); String parse = DateUtil.format(issueTime, "yyyyMMdd");
crossStrategyResultEntity.setDt(Integer.valueOf(parse)); crossStrategyResultEntity.setDt(Integer.valueOf(parse));
......
package net.wanji.datacenter.service.impl; package net.wanji.datacenter.service.impl;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.BeanListUtils; import net.wanji.common.utils.tool.BeanListUtils;
import net.wanji.databus.dao.mapper.*; import net.wanji.databus.dao.mapper.*;
...@@ -166,7 +165,7 @@ public class DataProcessServiceImpl implements DataProcessService { ...@@ -166,7 +165,7 @@ public class DataProcessServiceImpl implements DataProcessService {
if (Objects.nonNull(eventInfoPO)) { if (Objects.nonNull(eventInfoPO)) {
Double trafficIndex = crossDataRealtimePO.getTrafficIndex(); Double trafficIndex = crossDataRealtimePO.getTrafficIndex();
eventInfoPO.setTrafficIndex(trafficIndex); eventInfoPO.setTrafficIndex(trafficIndex);
while (retryCount <= 3) { while (retryCount <= 5) {
try { try {
eventInfoMapper.updateOne(eventInfoPO); eventInfoMapper.updateOne(eventInfoPO);
return; return;
...@@ -175,9 +174,9 @@ public class DataProcessServiceImpl implements DataProcessService { ...@@ -175,9 +174,9 @@ public class DataProcessServiceImpl implements DataProcessService {
log.warn("检测到死锁,进行第 {} 次重试", retryCount); log.warn("检测到死锁,进行第 {} 次重试", retryCount);
try { try {
// 等待一会,避免立刻冲突 // 等待一会,避免立刻冲突
Thread.sleep(100); Thread.sleep(300);
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
log.warn("检测到死锁, Thread.sleep 100ms,异常:{}", ignored); log.warn("检测到死锁, Thread.sleep 300ms,异常:{}", ignored);
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment