I developed a @KafkaListener that also implements the ConsumerAwareRebalanceListener interface, using Spring Boot 2.0.6. I implemented the onPartitionsAssigned method, in which I rewind the offset by a fixed amount of time, let's say 60 seconds.
So far so good.
How can I test the above use case using the tools that Spring Kafka gives me? I suppose I need to start a Kafka broker (i.e., an EmbeddedKafka), then stop the listener and start it again, to test that it re-reads the messages that arrived in the last 60 seconds.
Can somebody help me? I googled a little, but I didn't find anything. Thanks a lot.
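import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

public class MyRebalanceListener implements ConsumerAwareRebalanceListener {
    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        long rewindTo = System.currentTimeMillis() - 60000;
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(partitions.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> rewindTo)));
        // A partition maps to null when it has no record at or after rewindTo; skip it.
        offsetsForTimes.forEach((k, v) -> { if (v != null) consumer.seek(k, v.offset()); });
    }
}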
and
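import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class So52973119ApplicationTests {
    @Test
    public void rebalanceListenerTests() {
        MyRebalanceListener listener = new MyRebalanceListener();
        Consumer<?, ?> consumer = mock(Consumer.class);
        // The listener computes its own timestamp slightly later, so allow a 1 second window.
        AtomicLong expected = new AtomicLong(System.currentTimeMillis() - 60_000);
        given(consumer.offsetsForTimes(anyMap())).willAnswer(i -> {
            Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = new HashMap<>();
            Map<TopicPartition, Long> argument = i.getArgument(0);
            argument.forEach((k, v) -> {
                // Derive the offset from the partition number so the result
                // does not depend on the map's iteration order.
                offsetsForTimes.put(k, new OffsetAndTimestamp(k.partition() + 1L, 0L));
                assertThat(v).isBetween(expected.get(), expected.get() + 1_000);
            });
            return offsetsForTimes;
        });
        TopicPartition t1 = new TopicPartition("foo", 0);
        TopicPartition t2 = new TopicPartition("foo", 1);
        List<TopicPartition> partitions = new ArrayList<>();
        partitions.add(t1);
        partitions.add(t2);
        listener.onPartitionsAssigned(consumer, partitions);
        verify(consumer).seek(t1, 1);
        verify(consumer).seek(t2, 2);
    }
}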
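One thing to watch for when rewinding by timestamp: per the KafkaConsumer Javadoc, offsetsForTimes gives you, for each partition, the earliest offset whose timestamp is greater than or equal to the one you asked for, and maps the partition to null when there is no such record. Here is a minimal plain-Java model of that lookup, just to make the two cases concrete. PartitionLog and its methods are made up for illustration and are not part of the Kafka client API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for one partition; NOT a Kafka class.
class PartitionLog {

    // Record timestamps in append order, so the list index is the offset.
    private final List<Long> timestamps = new ArrayList<>();

    // Appends a record and returns the offset it was stored at.
    long append(long timestampMs) {
        timestamps.add(timestampMs);
        return timestamps.size() - 1;
    }

    // Earliest offset whose timestamp is >= targetMs, or null if there is
    // none (the case a rebalance listener has to guard against).
    Long offsetForTime(long targetMs) {
        for (int offset = 0; offset < timestamps.size(); offset++) {
            if (timestamps.get(offset) >= targetMs) {
                return (long) offset;
            }
        }
        return null;
    }
}
```

With records appended 120, 30 and 5 seconds before "now", looking up "now minus 60 seconds" lands on the second record (offset 1), while looking up a time after the last record yields null, so calling offset() on that result would throw a NullPointerException.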
The @KafkaListener has an:
/**
* The unique identifier of the container managing for this endpoint.
* <p>If none is specified an auto-generated one is provided.
* @return the {@code id} for the container managing for this endpoint.
* @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
*/
String id() default "";
attribute, so you can get access to its MessageListenerContainer via the mentioned KafkaListenerEndpointRegistry, which you can simply @Autowired into a test class based on the Spring Testing Framework. Then you can stop() and start() that MessageListenerContainer in your test method.
Also note that @KafkaListener has an autoStartup() attribute.
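As a rough sketch of that approach, the test could look something like the following. It assumes spring-kafka-test is on the test classpath, and that the listener is declared as @KafkaListener(id = "myListener", topics = "foo") on a single-partition topic; the id, the topic name and the test class name are made up here. The final assertion is left as a comment because it depends on how your listener records what it has received.

```java
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
// Point Boot's Kafka auto-configuration at the embedded broker.
@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@EmbeddedKafka(partitions = 1, topics = "foo")
public class ListenerRestartTests {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void rereadsRecentRecordsAfterRestart() throws Exception {
        // "myListener" is the assumed id attribute of the @KafkaListener.
        MessageListenerContainer container = registry.getListenerContainer("myListener");
        // Wait until the single partition of "foo" has been assigned.
        ContainerTestUtils.waitForAssignment(container, 1);

        // Block until the broker has acknowledged the record.
        template.send("foo", "bar").get();

        // Restarting the container triggers a new assignment, and with it
        // the onPartitionsAssigned rewind.
        container.stop();
        container.start();

        // Assert here that the listener received "bar" a second time,
        // e.g. by waiting on a CountDownLatch that the listener counts down.
    }
}
```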