Commit 47b391b1 authored by duanruiming's avatar duanruiming

[add] 添加多kafka配置

parent 80ee780d
......@@ -101,6 +101,11 @@
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.8</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
......
......@@ -3,6 +3,7 @@ package net.wanji.opt;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
......@@ -16,6 +17,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
@EnableTransactionManagement
@EnableScheduling
@EnableAsync
@EnableKafka
public class SignalOptimizeApplication {
public static void main(String[] args) {
......
package net.wanji.opt.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* @author duanruiming
* @date 2024/11/18 21:48
* 多个kafka配置,不能使用Yml了,会有冲突,找不到groupId
*/
@Configuration
@EnableKafka
public class KafkaConsumersConfig {
//@Bean
//public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory1() {
// ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
// factory.setConsumerFactory(consumerFactory1());
// // 可以在此设置其他属性,如concurrency, pollTimeout等
// return factory;
//}
//
//@Bean
//public ConsumerFactory<Integer, String> consumerFactory1() {
// return new DefaultKafkaConsumerFactory<>(consumerConfigs1());
//}
//
//@Bean
//public Map<String, Object> consumerConfigs1() {
// Map<String, Object> props = new HashMap<>();
// // 设置消费者所需的所有必要属性
// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "37.12.182.31:9092");
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
// props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// // ... 其他必要属性
// return props;
//}
@Bean
public ConsumerFactory<Integer, String> consumerFactory2() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs2());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory2() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory2());
// 可以在此设置其他属性,如concurrency, pollTimeout等
return factory;
}
@Bean
public Map<String, Object> consumerConfigs2() {
Map<String, Object> props = new HashMap<>();
// 设置消费者所需的所有必要属性
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "37.12.182.31:9093");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// ... 其他必要属性
return props;
}
}
\ No newline at end of file
package net.wanji.opt.kafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @author duanruiming 第一方案
* @date 2024/11/18 23:40
*/
@Component
public class KafkaListeners {
@KafkaListener(topics = {"green_belt_info_data"}, groupId = "group2", containerFactory = "kafkaListenerContainerFactory2")
public void listen2(String message) {
// 处理消息
System.err.println("22222222222222" + message);
}
}
......@@ -54,7 +54,7 @@ import java.util.stream.Stream;
@Service
public class RunningEvaluateServiceImpl implements RunningEvaluateService {
@Value("${spring.kafka.clusters.cluster1.bootstrap-servers}")
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.empty-phase-topic}")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment