What's the difference? Can the term KafkaConsumer and KafkaListener be used interchangeably?
The @KafkaListener
is a high level API for the ConcurrentMessageListenerContainer
, which spawns several internal listeners around KafkaConsumer
.
The difference is that that KafkaConsumer
API is pollable on demand when you call its poll()
whenever you need. The listener abstraction is about to have an infinite loop around that poll()
and it produces messages for records whenever they appear from the poll()
. We have there a task executor which runs a logic like this:
while (isRunning()) {
try {
pollAndInvoke();
}
catch (@SuppressWarnings(UNUSED) WakeupException e) {
// Ignore, we're stopping
}
catch (NoOffsetForPartitionException nofpe) {
this.fatalError = true;
ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
break;
}
catch (Exception e) {
handleConsumerException(e);
}
catch (Error e) { // NOSONAR - rethrown
Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
if (runnable != null) {
runnable.run();
}
this.logger.error("Stopping container due to an Error", e);
wrapUp();
throw e;
}
}
The KafkaConsumer.poll()
is called in that pollAndInvoke();
.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With