Commit 0bee2a04 authored by zhoushiguang's avatar zhoushiguang

local commit

parent 2c61242d
......@@ -72,7 +72,7 @@ public class WeekDirFreeFlowSpeedCache implements CommandLineRunner {
}
Map<String, List<CrossDirFreeFlowSpeedDTO>> map = new HashMap<>(1);
map.put(Constants.CROSS_FREE_FLOW_SPEED_KEY, crossDirFreeFlowSpeedDTOS);
producerHandler.send(Constants.CROSS_FREE_FLOW_SPEED_TOPIC, map);
producerHandler.send2FirstKafka(Constants.CROSS_FREE_FLOW_SPEED_TOPIC, map);
}
@Override
......
package net.wanji.datacenter.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* @author duanruiming
* @date 2024/04/24 9:06
*/
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.producer.producer1.bootstrap-servers}")
private String bootstrapServers1;
@Value("${spring.kafka.producer.producer2.bootstrap-servers}")
private String bootstrapServers2;
@Value("${spring.kafka.producer.producer1.key-deserializer}")
private String keySerializer1;
@Value("${spring.kafka.producer.producer1.value-deserializer}")
private String valueSerializer1;
@Value("${spring.kafka.producer.producer2.key-deserializer}")
private String keySerializer2;
@Value("${spring.kafka.producer.producer2.value-deserializer}")
private String valueSerializer2;
@Bean
public Map<String, Object> producerConfigs1() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers1);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer1);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer1);
// 可以添加更多配置
return props;
}
@Bean
public Map<String, Object> producerConfigs2() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers2);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer2);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer2);
// 可以添加更多配置
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory1() {
return new DefaultKafkaProducerFactory<>(producerConfigs1());
}
@Bean
public ProducerFactory<String, String> producerFactory2() {
return new DefaultKafkaProducerFactory<>(producerConfigs2());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate1() {
return new KafkaTemplate<>(producerFactory1());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate2() {
return new KafkaTemplate<>(producerFactory2());
}
}
\ No newline at end of file
......@@ -4,13 +4,12 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import net.wanji.common.utils.tool.JacksonUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import javax.annotation.Resource;
/**
* @author duanruiming
* @date 2023/08/02 16:58
......@@ -19,18 +18,35 @@ import javax.annotation.Resource;
@Slf4j
@SuppressWarnings("all")
public class ProducerHandler {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate1;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate2;
public void send2FirstKafka(String topic, Object msg) {
Page<Object> objectPage = new Page<>();
try {
ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate1.send(topic, JacksonUtils.getInstance().writeValueAsString(msg));
future.addCallback(
success -> {
},
failure -> log.error("消息发送失败:", failure));
} catch (JsonProcessingException e) {
log.error("{}主题发送kafka1消息失败:", e);
return;
}
}
public void send(String topic, Object msg) {
public void send2SecondKafka(String topic, Object msg) {
Page<Object> objectPage = new Page<>();
try {
ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send(topic, JacksonUtils.getInstance().writeValueAsString(msg));
ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate2.send(topic, JacksonUtils.getInstance().writeValueAsString(msg));
future.addCallback(
success -> {},
success -> {
},
failure -> log.error("消息发送失败:", failure));
} catch (JsonProcessingException e) {
log.error("{}主题发送消息失败:", e);
log.error("{}主题发送kafka2消息失败:", e);
return;
}
}
......
......@@ -88,5 +88,9 @@ public class CrossLaneSnapshotDataDTO {
* 末车速度(km/h)
*/
private double endV;
/**
* 溢出区车辆数
*/
private int overflowNums;
}
......@@ -2,6 +2,7 @@ package net.wanji.datacenter.pojo.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import java.util.List;
/**
......@@ -179,5 +180,15 @@ public class LanePeriodicDataDTO {
*/
@JsonProperty("overflow_rate")
private double overflowRate;
/**
* 非机动车流量
*/
@JsonProperty("non_motor_flow")
private int nonMotorFlow;
/**
* 85位速度(km/h)
*/
@JsonProperty("v_85")
private double v85;
}
}
......@@ -118,5 +118,10 @@ public class LaneSnapshotDataDTO {
*/
@JsonProperty("overflow")
private boolean overflow;
/**
* 溢出区车辆数
*/
@JsonProperty("overflow_nums")
private int overflowNums;
}
}
......@@ -21,7 +21,7 @@ public class CrossSnapshotDataProcessService implements DataProcessService {
@Override
public void send(String topic, Object convertData) {
producerHandler.send(topic, convertData);
producerHandler.send2FirstKafka(topic, convertData);
}
@Override
......
......@@ -60,6 +60,21 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
// 五分钟周期内是否有全场红
public static final ConcurrentHashMap<String, Integer> periodHaveAllRedMap = new ConcurrentHashMap<>();
private static final Map<String, String> pedestrianMap = new HashMap<String, String>();
static {
pedestrianMap.put("11", "13NF80B5QN0");
pedestrianMap.put("12", "13NF80B5QN0");
pedestrianMap.put("13", "13NF80B5QN0");
pedestrianMap.put("14", "13NF80B5QN0");
pedestrianMap.put("21", "13NGH0B5RC0");
pedestrianMap.put("22", "13NGH0B5RC0");
pedestrianMap.put("23", "13NGH0B5RC0");
pedestrianMap.put("24", "13NGH0B5RC0");
pedestrianMap.put("31", "13NI00B5RM0");
pedestrianMap.put("32", "13NI00B5RM0");
}
@Autowired
RedisUtils redisUtils;
......@@ -81,7 +96,8 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
@Override
public void send(String topic, Object convertData) {
producerHandler.send(topic, convertData);
producerHandler.send2FirstKafka(topic, convertData);
producerHandler.send2SecondKafka(topic, convertData);
}
@Override
......@@ -128,6 +144,12 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
for (LanePeriodicDataDTO.EventList laneRealTimeItem : eventLists) {
String laneId = laneRealTimeItem.getLaneId();
if (laneId.length() == 2) {
CrossBaseLaneInfoPO laneInfoPO = new CrossBaseLaneInfoPO();
laneInfoPO.setCrossId(pedestrianMap.get(laneId));
laneInfoPO.setId(String.join(Constant.SEPARATOR_MINUS, pedestrianMap.get(laneId),laneId));
crossLaneDataRealTimePOS.add(lanePeriodicDataEventListConvert.convert2CrossLaneDataRealTimePO(laneRealTimeItem, laneInfoPO, startDate));
}
for (CrossBaseLaneInfoPO laneInfoPO : crossBaseLaneInfoPOS) {
String id = laneInfoPO.getId();
if (StringUtils.equals(laneId, id)) {
......@@ -144,12 +166,14 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
}
Map<String, List<CrossLaneDataRealTimePO>> laneDataMap = new HashMap<>();
laneDataMap.put(Constant.REAL_TIME_LANE_DATA_KEY, crossLaneDataRealTimePOS);
producerHandler.send(Constant.REAL_TIME_LANE_DATA_TOPIC, laneDataMap);
producerHandler.send2FirstKafka(Constant.REAL_TIME_LANE_DATA_TOPIC, laneDataMap);
producerHandler.send2SecondKafka(Constant.REAL_TIME_LANE_DATA_TOPIC, laneDataMap);
List<CrossDirDataRealtimePO> insertCrossDirDataRealtimePOS = dealCrossDataDirRealTime(crossDirDataRealtimePOS, decimalFormat, producerHandler, startDate);
Map<String, List<CrossDirDataRealtimePO>> dirDataMap = new HashMap<>();
dirDataMap.put(Constant.REAL_TIME_DIR_DATA_KEY, insertCrossDirDataRealtimePOS);
producerHandler.send(Constant.REAL_TIME_DIR_DATA_TOPIC, dirDataMap);
producerHandler.send2FirstKafka(Constant.REAL_TIME_DIR_DATA_TOPIC, dirDataMap);
producerHandler.send2SecondKafka(Constant.REAL_TIME_DIR_DATA_TOPIC, dirDataMap);
dealCrossDataRealTime(crossDataRealtimePOS, decimalFormat, producerHandler, startDate);
dealCrossDataTurnRealTime(crossTurnDataRealtimePOS, decimalFormat, producerHandler, startDate);
......@@ -283,7 +307,8 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
}
HashMap<String, List<CrossTurnDataRealtimePO>> turnDataMap = new HashMap<>();
turnDataMap.put(Constant.REAL_TIME_TURN_DATA_KEY, insertCrossTurnDataRealtimePOS);
producerHandler.send(Constant.REAL_TIME_TURN_DATA_TOPIC, turnDataMap);
producerHandler.send2FirstKafka(Constant.REAL_TIME_TURN_DATA_TOPIC, turnDataMap);
producerHandler.send2SecondKafka(Constant.REAL_TIME_TURN_DATA_TOPIC, turnDataMap);
}
private List<CrossDirDataRealtimePO> dealCrossDataDirRealTime(List<CrossDirDataRealtimePO> crossDirDataRealtimePOS,
......@@ -437,15 +462,11 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
spilloverIndex = spilloverIndex >= dirSpilloverIndex ? spilloverIndex : dirSpilloverIndex;
Double trafficIndexDir = dirDelayTimeIndex * 0.4 + dirCongextionIndex * 0.4 + spilloverIndex * 0.2;
log.error("trafficIndexDir信息:" + trafficIndexDir + ":" + dirDelayTimeIndex + ":" + dirCongextionIndex
+ ":" + spilloverIndex);
if (trafficIndexDir > 1.0) {
crossDirDataRealTimePO.setTrafficIndex(Double.valueOf(decimalFormat.format(trafficIndexDir)));
} else {
crossDirDataRealTimePO.setTrafficIndex(Double.valueOf(decimalFormat.format(1)));
}
log.error("trafficIndexDir处理后:" + crossDirDataRealTimePO.getTrafficIndex());
crossDirDataRealTimePO.setStatus(status);
crossDirDataRealTimePO.setDuration(5);
// 修改持续时间
......@@ -901,7 +922,8 @@ public class LanePeriodicDataProcessServiceImpl implements DataProcessService {
}
Map<String, List<CrossDataRealtimePO>> crossDataMap = new HashMap<>();
crossDataMap.put(Constant.REAL_TIME_CROSS_DATA_KEY, insertCrossDataRealtimePOS);
producerHandler.send(Constant.REAL_TIME_CROSS_DATA_TOPIC, crossDataMap);
producerHandler.send2FirstKafka(Constant.REAL_TIME_CROSS_DATA_TOPIC, crossDataMap);
producerHandler.send2SecondKafka(Constant.REAL_TIME_CROSS_DATA_TOPIC, crossDataMap);
}
private void replaceCrossPartFields(CrossDataRealtimePO po, Date startDate) {
......
......@@ -81,6 +81,7 @@ public class LaneSnapshotDataProcessService implements DataProcessService {
crossLaneSnapshotDataDTO.setStartV(list.getStartV());
crossLaneSnapshotDataDTO.setTailCarPosition(list.getTailCarPosition());
crossLaneSnapshotDataDTO.setEndV(list.getEndV());
crossLaneSnapshotDataDTO.setOverflowNums(list.getOverflowNums());
result.add(crossLaneSnapshotDataDTO);
}
}
......
......@@ -9,7 +9,7 @@ spring:
elasticsearch:
username: elastic
password: Wanji300552
uris: http://37.12.182.31:9200
uris: http://10.102.1.182:9200
application:
name: wj-datacenter-service
main:
......@@ -21,25 +21,34 @@ spring:
datasource:
master:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://37.12.182.29:3306/t_signal_control_jinan?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true
url: jdbc:mysql://10.102.1.182:3306/t_signal_control_jinan?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true
# url: jdbc:mysql://10.102.1.112:53306/t_signal_control?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true
username: root
password: Wanji300552
driver-class-name: com.mysql.cj.jdbc.Driver
slave:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://37.12.182.29:3306/t_signal_control_jinan?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true
url: jdbc:mysql://10.102.1.182:3306/t_signal_control_jinan?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true
username: root
password: Wanji300552
driver-class-name: com.mysql.cj.jdbc.Driver
holo:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://37.12.182.29:3306/holo_roadnet?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true
url: jdbc:mysql://10.102.1.182:3306/holo_roadnet?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&sessionVariables=sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'&useSSL=false&useCursorFetch=true
username: root
password: Wanji300552
driverClassName: com.mysql.cj.jdbc.Driver
kafka:
bootstrap-servers: 37.12.182.31:9092
#生产者配置
producer:
producer1:
bootstrap-servers: 10.102.1.182:9092
key-deserializer: org.apache.kafka.common.serialization.StringSerializer
value-deserializer: org.apache.kafka.common.serialization.StringSerializer
producer2:
bootstrap-servers: 10.102.1.181:9092
key-deserializer: org.apache.kafka.common.serialization.StringSerializer
value-deserializer: org.apache.kafka.common.serialization.StringSerializer
#消费者配置
consumer:
max-poll-records: 1
......@@ -50,11 +59,12 @@ spring:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: group
bootstrap-servers: 10.102.1.182:9092
listener:
concurrency: 3
ack-mode: manual_immediate
redis:
host: 37.12.182.29
host: 10.102.1.182
port: 6379
password: Wanji300552
jedis:
......@@ -120,7 +130,7 @@ eventsave:
kafka:
producer:
bootstrap.servers: 37.12.182.31:9092 # kafka连接地址
bootstrap.servers: 10.102.1.182:9092 # kafka连接地址
acks: 0
retries: 0
batch.size: 102400
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment