
How to wait for full Kafka-message batch with Spring Boot?

When batch-consuming Kafka messages, one can limit the batch size using max.poll.records.

If the consumer is very fast and its commit offset does not lag significantly, most batches will be much smaller than that limit. I'd like to receive only "full" batches, i.e., to have my consumer function invoked only when the batch size is reached. So I'm looking for something like min.poll.records, which does not exist in that form.

Here is a minimal example of what I'm doing:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.stereotype.Component

@SpringBootApplication
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args)
}

@Component
class TestConsumer {
    @Bean
    fun kafkaBatchListenerContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String> {
        val configs = kafkaProperties.buildConsumerProperties()
        configs[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 1000
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = DefaultKafkaConsumerFactory(configs)
        factory.isBatchListener = true
        return factory
    }

    @KafkaListener(
        topics = ["myTopic"],
        containerFactory = "kafkaBatchListenerContainerFactory"
    )
    fun batchListen(values: List<ConsumerRecord<String, String>>) {
        println(values.count())
    }
}

When started with a bit of consumer lag, it outputs something like:

[...]
1000
1000
1000
[...]
1000
1000
1000
256
27
8
9
3
1
1
23
[...]

Is there any way (without manually sleep-ing in the consumer handler in case of "incomplete" batches) to have the function invoked only when one of the following two conditions is met?

- at least n messages are available, or
- at least m milliseconds were spent waiting

asked Aug 31 '25 by Tobias Hermann

2 Answers

Kafka has no min.poll.records; you can approximate it using fetch.min.bytes if your records are of a similar size. Also see fetch.max.wait.ms.
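To make the approximation concrete, here is a sketch of the relevant consumer settings. The average record size (200 bytes) and the helper name batchFetchConfig are illustrative assumptions, not part of the question's setup; measure your own payload sizes before relying on this.

```kotlin
// Sketch: approximate a "min.poll.records" of ~1000 by asking the broker to
// hold the fetch response until roughly that many bytes have accumulated.
// avgRecordBytes = 200 is an assumption -- measure your actual payloads.
fun batchFetchConfig(avgRecordBytes: Int = 200, minRecords: Int = 1000): Map<String, Any> =
    mapOf(
        // the broker answers the fetch only once this many bytes are available...
        "fetch.min.bytes" to avgRecordBytes * minRecords,
        // ...or after this wait, whichever comes first
        "fetch.max.wait.ms" to 500,
        // upper bound per poll, as in the question
        "max.poll.records" to minRecords
    )
```

These entries can be merged into the configs map built from kafkaProperties.buildConsumerProperties() in the question's container factory. Note this only shapes how much data one broker fetch returns; it is a heuristic, not a hard guarantee of batch size.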

answered Sep 02 '25 by Gary Russell

Since, as Gary Russell nicely pointed out, it's currently not possible to make Kafka do what I was looking for, here is my solution with manual buffering, which achieves the desired behavior:

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.text.SimpleDateFormat
import java.util.Date
import javax.annotation.PreDestroy

@SpringBootApplication
@EnableScheduling // required, otherwise @Scheduled never fires
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args)
}

@Component
class TestConsumer {
    @KafkaListener(topics = ["myTopic"])
    fun listen(value: String) {
        addToBuffer(value)
    }

    private val buffer = mutableSetOf<String>()

    @Synchronized
    fun addToBuffer(message: String) {
        buffer.add(message)
        if (buffer.size >= 300) {
            flushBuffer()
        }
    }

    @Synchronized
    @Scheduled(fixedDelay = 700)
    @PreDestroy
    fun flushBuffer() {
        if (buffer.isEmpty()) {
            return
        }
        val timestamp = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS").format(Date())
        println("$timestamp: ${buffer.count()}")
        buffer.clear()
    }
}

Example output:

[...]
2020-01-03T07:01:13.032: 300
2020-01-03T07:01:13.041: 300
2020-01-03T07:01:13.078: 300
2020-01-03T07:01:13.133: 300
2020-01-03T07:01:13.143: 300
2020-01-03T07:01:13.188: 300
2020-01-03T07:01:13.197: 300
2020-01-03T07:01:13.321: 300
2020-01-03T07:01:13.352: 300
2020-01-03T07:01:13.359: 300
2020-01-03T07:01:13.399: 300
2020-01-03T07:01:13.407: 300
2020-01-03T07:01:13.533: 300
2020-01-03T07:01:13.571: 300
2020-01-03T07:01:13.580: 300
2020-01-03T07:01:13.607: 300
2020-01-03T07:01:13.611: 300
2020-01-03T07:01:13.632: 300
2020-01-03T07:01:13.682: 300
2020-01-03T07:01:13.687: 300
2020-01-03T07:01:13.708: 300
2020-01-03T07:01:13.712: 300
2020-01-03T07:01:13.738: 300
2020-01-03T07:01:13.880: 300
2020-01-03T07:01:13.884: 300
2020-01-03T07:01:13.911: 300
2020-01-03T07:01:14.301: 300
2020-01-03T07:01:14.714: 300
2020-01-03T07:01:15.029: 300
2020-01-03T07:01:15.459: 300
2020-01-03T07:01:15.888: 300
2020-01-03T07:01:16.359: 300
[...]

So we see that, after catching up with the consumer lag, it delivers batches of 300, matching the topic throughput.

Yes, the @Synchronized does prevent concurrent processing, but in my use case this part is far from being the bottleneck.
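The size-or-timeout flush policy above can be sketched without Spring, to make the two flush conditions explicit. The class and parameter names here are illustrative, not from the answer; the timeout check is driven by an external periodic tick() call, standing in for the @Scheduled method.

```kotlin
// Minimal sketch of the size-or-timeout buffering used in the answer:
// flush when the buffer reaches maxSize, or when maxAgeMs have elapsed
// since the last flush. Thread safety via a single lock, mirroring the
// @Synchronized methods above.
class BatchBuffer<T>(
    private val maxSize: Int = 300,
    private val maxAgeMs: Long = 700,
    private val onFlush: (List<T>) -> Unit
) {
    private val items = mutableListOf<T>()
    private var lastFlush = System.currentTimeMillis()

    @Synchronized
    fun add(item: T) {
        items.add(item)
        if (items.size >= maxSize) flush() // condition 1: enough messages
    }

    // Call periodically (e.g. from a scheduler) to enforce the timeout.
    @Synchronized
    fun tick() {
        if (items.isNotEmpty() && System.currentTimeMillis() - lastFlush >= maxAgeMs) {
            flush() // condition 2: waited long enough
        }
    }

    @Synchronized
    private fun flush() {
        onFlush(items.toList())
        items.clear()
        lastFlush = System.currentTimeMillis()
    }
}
```

One difference from the answer's code: this uses a list rather than a set, so duplicate message payloads within one batch are preserved instead of being collapsed.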

answered Sep 02 '25 by Tobias Hermann