I have created a simple Kafka consumer that returns a Flux of objects (the received messages) and I'm trying to test it using the StepVerifier.
In my test, I do something like this:
Flux<Pojo> flux = consumer.start();
StepVerifier.create(flux)
    .expectNextMatches(p -> p.getList().size() == 3)
    .verifyComplete();
The assertion works OK (if I change the value from 3 to something else, the test fails). But if the assertion passes, then the test never exits.
I have also tried to use the verify method like so:
StepVerifier.create(flux)
    .expectNextMatches(f -> f.getEntitlements().size() == 3)
    .expectComplete()
    .verify(Duration.ofSeconds(3));
In this case, I get this error:
java.lang.AssertionError: VerifySubscriber timed out on false
Any idea what I'm doing wrong?
The Kafka Flux is probably infinite, so it never emits the onComplete signal, which the test waits for. You can call .thenCancel().verify() in place of verifyComplete() if you're only interested in testing that first value.
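As a rough illustration of that suggestion, here is a minimal sketch that assumes reactor-core and reactor-test are on the classpath. The class name, the method names, and the Flux.interval source are hypothetical stand-ins for consumer.start(), chosen only because Flux.interval never completes on its own, much like a Kafka stream would not.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class InfiniteFluxCancelExample {

    // Hypothetical stand-in for consumer.start(): an infinite Flux
    // that keeps emitting and never signals onComplete.
    static Flux<Long> infiniteSource() {
        return Flux.interval(Duration.ofMillis(10));
    }

    // Checks the first element, then cancels the subscription
    // instead of waiting for an onComplete that never arrives.
    static Duration verifyFirstValueThenCancel() {
        return StepVerifier.create(infiniteSource())
                .expectNextMatches(v -> v == 0L)
                .thenCancel()
                // The timeout is only a safety net in case nothing is emitted.
                .verify(Duration.ofSeconds(3));
    }

    public static void main(String[] args) {
        Duration elapsed = verifyFirstValueThenCancel();
        System.out.println("Verified in " + elapsed.toMillis() + " ms");
    }
}
```

Another option, if the test should still end with verifyComplete(), might be to bound the source first, for example with flux.take(1), which completes after one element and cancels upstream.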