I try configure apache kafka in spring boot application. I read this documentation and follow the steps:
1) I add this lines to aplication.yaml
:
spring:
kafka:
bootstrap-servers: kafka_host:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringDeserializer
value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
2) I create new Topic:
@Bean
public NewTopic responseTopic() {
return new NewTopic("new-topic", 5, (short) 1);
}
And now I want use KafkaTemplate
:
private final KafkaTemplate<String, byte[]> kafkaTemplate;
public KafkaEventBus(KafkaTemplate<String, byte[]> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
But Intellij IDE highlights:
To fix this I need create bean:
@Bean
public KafkaTemplate<String, byte[]> myMessageKafkaTemplate() {
return new KafkaTemplate<>(greetingProducerFactory());
}
And pass to constructor propirties greetingProducerFactory()
:
@Bean
public ProducerFactory<String, byte[]> greetingProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka_hist4:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
But then what's the point of setting in application.yaml if I need create ProducerFactory manual?
Spring for Apache Kafka's KafkaTemplate is a thin wrapper around a Kafka producer that plays nicely with other Spring features, like dependency injection and automatic configuration. It provides a number of convenience methods for producing to Kafka topics.
Add the “Spring for Apache Kafka” dependency to your Spring Boot project. Step 2: Create a Configuration file named KafkaConfig. Below is the code for the KafkaConfig. java file.
@KafkaListener designates a method as a listener in a KafkaMessageListenerContainer. A KafkaMessageListenerContainer is how Spring Boot connects and polls records from Kafka under the hood. Remember that the @Component annotation tells Spring Boot to register our KafkaConsumer class as a managed Spring Bean.
By default KafkaTemplate<Object, Object>
is created by Spring Boot in KafkaAutoConfiguration
class. Since Spring considers generic type information during dependency injection the default bean can't be autowired into KafkaTemplate<String, byte[]>
.
I think you can safely ignore IDEA's warning; I have no problems wiring in Boot's template with different generic types...
@SpringBootApplication
public class So55280173Application {
public static void main(String[] args) {
SpringApplication.run(So55280173Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template, Foo foo) {
return args -> {
template.send("so55280173", "foo");
if (foo.template == template) {
System.out.println("they are the same");
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("so55280173", 1, (short) 1);
}
}
@Component
class Foo {
final KafkaTemplate<String, String> template;
@Autowired
Foo(KafkaTemplate<String, String> template) {
this.template = template;
}
}
and
they are the same
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With