Sorry for the question being too generic, but does anyone have a tutorial or guide on how to test producers and consumers with embedded Kafka? I've tried several, but there are many versions of the dependencies and none of them actually works.
I'm using Spring Cloud Stream Kafka.
Testing a Kafka consumer: consuming data from Kafka consists of two main steps. First, we subscribe to topics or assign topic partitions manually. Second, we poll batches of records using the poll method. The polling is usually done in a loop that runs until the consumer is shut down.
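As a rough, broker-free illustration of those two steps, the sketch below uses a `BlockingQueue` as a stand-in for a subscribed topic and polls it in a bounded loop. `PollLoopSketch`, `pollBatch` and `consume` are hypothetical names, not Kafka APIs; a real consumer would call `subscribe` and `poll` on a Kafka `Consumer` instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a consumer: a queue plays the role of a subscribed topic.
class PollLoopSketch {

    // Step 2: fetch one batch, waiting up to timeoutMs for the first record.
    static List<String> pollBatch(BlockingQueue<String> topic, long timeoutMs) {
        List<String> batch = new ArrayList<>();
        try {
            String first = topic.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                topic.drainTo(batch); // take whatever else is already available
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return batch;
    }

    // Poll repeatedly until `expected` records arrive or `maxPolls` is exhausted.
    // Production loops are often unbounded; a test needs a bound so it can fail.
    static List<String> consume(BlockingQueue<String> topic, int expected, int maxPolls) {
        List<String> received = new ArrayList<>();
        for (int i = 0; i < maxPolls && received.size() < expected; i++) {
            received.addAll(pollBatch(topic, 100));
        }
        return received;
    }

    public static void main(String[] args) {
        // Step 1: "subscribe" by obtaining a handle on the topic.
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        topic.add("a");
        topic.add("b");
        System.out.println(consume(topic, 2, 5));
    }
}
```

Bounding the loop by both an expected count and a maximum number of polls is one way to keep a test from hanging when a record never arrives.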
In ReadyAPI, you test Kafka APIs with the API Connection test step. Adding it to a test case requires a ReadyAPI Test Pro license. If you do not have one, you can try a ReadyAPI trial.
If you want to write integration tests using EmbeddedKafka, then you can do something like this. Assume we have some KafkaListener, which accepts a RequestDto as a Payload. In your test class you should create a TestConfiguration in order to create producer beans and to autowire KafkaTemplate into your test.
First, we start by decorating our test class with two pretty standard Spring annotations: The @SpringBootTest annotation will ensure that our test bootstraps the Spring application context. We also use the @DirtiesContext annotation, which will make sure this context is cleaned and reset between different tests.
We generally recommend using the Test Binder in tests but if you want to use an embedded kafka server, it can be done...
Add this to your POM...
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
Test app...
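import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

// Annotation-based binding model; newer Spring Cloud Stream releases favor functional bindings.
@SpringBootApplication
@EnableBinding(Processor.class)
public class So43330544Application {
    public static void main(String[] args) {
        SpringApplication.run(So43330544Application.class, args);
    }

    // Upper-cases each payload read from the input binding and sends it to the output binding.
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public byte[] handle(byte[] in) {
        return new String(in).toUpperCase().getBytes();
    }
}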
application.properties...
spring.cloud.stream.bindings.output.destination=so0544out
spring.cloud.stream.bindings.input.destination=so0544in
spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.input.group=so0544
Test case...
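import static org.assertj.core.api.Assertions.assertThat;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class So43330544ApplicationTests {
    // Starts a single embedded broker for the whole test class.
    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);
    @Autowired
    private KafkaTemplate<byte[], byte[]> template;
    @Autowired
    private KafkaProperties properties;

    // Point Spring Boot's Kafka configuration at the embedded broker before the context starts.
    @BeforeClass
    public static void setup() {
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    }

    @Test
    public void testSendReceive() {
        // Assumes byte[] (de)serializers are configured under spring.kafka.*; Boot defaults to String.
        template.send("so0544in", "foo".getBytes());
        Map<String, Object> configs = properties.buildConsumerProperties();
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test0544");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(configs);
        Consumer<byte[], byte[]> consumer = cf.createConsumer();
        consumer.subscribe(Collections.singleton("so0544out"));
        // Wait up to 10 seconds for the processed record to appear on the output topic.
        ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
        consumer.commitSync();
        consumer.close();
        assertThat(records.count()).isEqualTo(1);
        assertThat(new String(records.iterator().next().value())).isEqualTo("FOO");
    }
}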
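Since the handler in the test app is a plain method, its transformation can also be checked without Spring or a broker. A minimal sketch, assuming a hypothetical standalone class `UpperCaseHandler` that mirrors `handle(byte[])`; unlike the original, it pins the charset to UTF-8 and the locale to `Locale.ROOT` so the result does not depend on the machine running the test.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Locale;

// Hypothetical standalone copy of the handler logic, testable without Spring or Kafka.
class UpperCaseHandler {

    // Same transformation as handle(byte[]) in the test app, with explicit charset and locale.
    static byte[] handle(byte[] in) {
        return new String(in, StandardCharsets.UTF_8)
                .toUpperCase(Locale.ROOT)
                .getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] out = handle("foo".getBytes(StandardCharsets.UTF_8));
        if (!Arrays.equals(out, "FOO".getBytes(StandardCharsets.UTF_8))) {
            throw new AssertionError("unexpected payload: " + new String(out, StandardCharsets.UTF_8));
        }
        System.out.println(new String(out, StandardCharsets.UTF_8));
    }
}
```

A check like this covers only the transformation; the embedded-broker test is still what verifies the bindings and topic wiring.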