Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to test a ConsumerAwareRebalanceListener?

I developed a @KafkaListener that is also marked with the ConsumerAwareRebalanceListener interface, using Spring Boot 2.0.6. I implemented the onPartitionsAssigned method, in which I rewind the offset of a fixed amount of time, let's say 60 seconds.

So far so good.

How can I test the above use case using the tools that Spring Kafka gives me? I supposed I need to start a Kafka broker (i.e., an EmbeddedKafka), then stopping the listener and then rebooting it again, to test that it read again the messages arrived in the last 60 seconds.

Can somebody help me? I googled a little, but I didn't find anything. Thanks a lot.

like image 536
riccardo.cardin Avatar asked Mar 06 '23 05:03

riccardo.cardin


2 Answers

public class MyRebalanceListener implements ConsumerAwareRebalanceListener {

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        long rewindTo = System.currentTimeMillis() - 60000;
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(partitions.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> rewindTo)));
        offsetsForTimes.forEach((k, v) -> consumer.seek(k, v.offset()));
    }

}

and

@RunWith(SpringRunner.class)
@SpringBootTest
public class So52973119ApplicationTests {

    @Test
    public void rebalanceListenerTests() {
        MyRebalanceListener listener = new MyRebalanceListener();
        Consumer<?, ?> consumer = mock(Consumer.class);
        AtomicLong expected = new AtomicLong(System.currentTimeMillis() - 60_000);
        given(consumer.offsetsForTimes(anyMap())).willAnswer(i -> {
            AtomicLong offset = new AtomicLong();
            Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = new HashMap<>();
            Map<TopicPartition, Long> argument = i.getArgument(0);
            argument.forEach((k, v) -> {
                offsetsForTimes.put(k, new OffsetAndTimestamp(offset.incrementAndGet(), 0L));
                assertThat(v).isBetween(expected.get(), expected.get() + 1_000);
            });
            return offsetsForTimes ;
        });
        TopicPartition t1 = new TopicPartition("foo", 0);
        TopicPartition t2 = new TopicPartition("foo", 1);
        List<TopicPartition> partitions = new ArrayList<>();
        partitions.add(t1);
        partitions.add(t2);
        listener.onPartitionsAssigned(consumer, partitions);
        verify(consumer).seek(t1, 1);
        verify(consumer).seek(t2, 2);
    }

}
like image 71
Gary Russell Avatar answered Mar 08 '23 23:03

Gary Russell


The @KafkaListener has an:

/**
 * The unique identifier of the container managing for this endpoint.
 * <p>If none is specified an auto-generated one is provided.
 * @return the {@code id} for the container managing for this endpoint.
 * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
 */
String id() default "";

attribute, so you can get an access to its MessageListenerContainer via mentioned KafkaListenerEndpointRegistry, which you can simply @Autowired into the test class based on Spring Testing Framework. Then, you can really stop() and start() that MessageListenerContainer in your test method.

Also pay attention how @KafkaListener has an autoStartup() attribute also.

like image 31
Artem Bilan Avatar answered Mar 08 '23 22:03

Artem Bilan