If I don't want to use auto-commit mode, Spring provides other ways to commit offsets.
spring-kafka/#committing-offsets gives us the following information about committing offsets:
RECORD - commit the offset when the listener returns after processing the record.
BATCH - commit the offset when all the records returned by the poll() have been processed.
TIME - commit the offset when all the records returned by the poll() have been processed as long as the ackTime since the last commit has been exceeded.
COUNT - commit the offset when all the records returned by the poll() have been processed as long as ackCount records have been received since the last commit.
COUNT_TIME - similar to TIME and COUNT but the commit is performed if either condition is true.
MANUAL - the message listener is responsible to acknowledge() the Acknowledgment; after which, the same semantics as BATCH are applied.
MANUAL_IMMEDIATE - commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.
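To make the TIME, COUNT and COUNT_TIME rules above concrete, here is a minimal sketch of the decision taken once every record from one poll() has been processed. It is a toy model, not Spring Kafka's code: the class `AckModeSketch`, the method `shouldCommitAfterBatch` and its bookkeeping parameters are hypothetical, and the exact boundary handling (strictly greater than ackTime, at least ackCount) is an assumption. RECORD and the MANUAL modes are left out because they are not decided this way.

```java
import java.time.Duration;

/** Toy model of the batch-end commit rules quoted above; not Spring Kafka's actual code. */
final class AckModeSketch {

    /** Only the modes whose commit is decided after a whole poll() has been processed. */
    enum AckMode { BATCH, TIME, COUNT, COUNT_TIME }

    /**
     * Decides whether to commit once all records from one poll() are processed.
     * sinceLastCommit and recordsSinceLastCommit are hypothetical bookkeeping values.
     */
    static boolean shouldCommitAfterBatch(AckMode mode,
                                          Duration sinceLastCommit, Duration ackTime,
                                          int recordsSinceLastCommit, int ackCount) {
        // Assumption: "exceeded" means strictly greater, "received" means at least ackCount.
        boolean timeExceeded = sinceLastCommit.compareTo(ackTime) > 0;
        boolean countReached = recordsSinceLastCommit >= ackCount;
        switch (mode) {
            case BATCH:      return true;
            case TIME:       return timeExceeded;
            case COUNT:      return countReached;
            case COUNT_TIME: return timeExceeded || countReached;
            default:         throw new IllegalArgumentException("unmodelled mode: " + mode);
        }
    }

    public static void main(String[] args) {
        Duration ackTime = Duration.ofSeconds(5);
        Duration elapsed = Duration.ofSeconds(3);   // 3 s since the last commit
        // 120 records since the last commit, ackCount = 100
        System.out.println(shouldCommitAfterBatch(AckMode.TIME, elapsed, ackTime, 120, 100));       // false
        System.out.println(shouldCommitAfterBatch(AckMode.COUNT, elapsed, ackTime, 120, 100));      // true
        System.out.println(shouldCommitAfterBatch(AckMode.COUNT_TIME, elapsed, ackTime, 120, 100)); // true
    }
}
```

In this model COUNT_TIME is simply the logical OR of the other two conditions, which matches the "either condition is true" wording.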
I have several questions:
TIME
As I understand it, somewhere in the Spring framework there is a loop that does something like this:
while (true) {
    data = consumer.poll();
    data.forEach(record -> listener.listen(record));
}
How often does the poll happen?
Is time the only criterion for committing the offset? Let's say poll() returned 100 records and, when ackTime expired, only 60 records had been processed.
I didn't catch the difference between MANUAL_IMMEDIATE and MANUAL.
Please clarify these questions for me.
P.S.
As I understand Gary Russell's answer, the loop looks like this:
while (true) {
    data = consumer.poll();
    data.forEach(record -> new Thread(() -> listener.listen(record)).start());
}
It depends on the version; the recent 1.3 version has a much simpler threading model, facilitated by KIP-62.
With that version, the listener is invoked on the caller thread; the next poll doesn't occur until all the current records are consumed. Aside from RECORD (and MANUAL*), the decision to commit is determined after all the records have been sent to the listener.
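For the "100 records, ackTime expires after 60" case from the question, the behaviour described for 1.3 can be sketched with a fake clock. This is only a toy model under stated assumptions: `TimeAckLoopSketch` and `runOnePoll` are hypothetical names, the last commit is assumed to have happened at time 0, and the real container does far more than this.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy single-threaded model of a 1.3-style loop with TIME; not the real container. */
final class TimeAckLoopSketch {

    /**
     * Simulates one poll() returning batchSize records, each costing millisPerRecord
     * on a fake clock. Returns the offsets that were committed (empty if none).
     */
    static List<Long> runOnePoll(int batchSize, long millisPerRecord, long ackTimeMillis) {
        List<Long> commits = new ArrayList<>();
        long clock = 0;        // fake clock in ms; the last commit is assumed at time 0
        long nextOffset = 0;   // Kafka commits "last processed offset + 1"

        for (int i = 0; i < batchSize; i++) {
            clock += millisPerRecord;   // listener runs on the polling thread
            nextOffset = i + 1;
            // No commit check here: with TIME the decision waits for the end of the batch,
            // even if ackTime has already expired part-way through.
        }
        // The decision is made only after every record has been sent to the listener.
        if (clock > ackTimeMillis) {
            commits.add(nextOffset);
        }
        return commits;
    }

    public static void main(String[] args) {
        // 100 records at 100 ms each; ackTime of 6 s expires around record 60.
        System.out.println(runOnePoll(100, 100, 6_000));   // [100]
        // A fast batch finishes before ackTime is exceeded, so nothing is committed yet.
        System.out.println(runOnePoll(100, 10, 6_000));    // []
    }
}
```

In the first call the commit covers all 100 records rather than the 60 that were done when ackTime ran out, which is the point of "after all the records have been sent to the listener".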
MANUAL_IMMEDIATE means just that; the offset is committed immediately when the user acks; with MANUAL, the manual offsets are committed after all records have been sent to the listener.
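The difference between the two manual modes can be illustrated with an event log for one batch in which the listener acknowledges every record. Again this is a sketch, not the framework's implementation: `ManualAckSketch` and `runBatch` are made-up names, and the model assumes the acks arrive in offset order.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model contrasting MANUAL and MANUAL_IMMEDIATE; not Spring Kafka's actual code. */
final class ManualAckSketch {

    /** Returns the ordered events for one batch in which every record is acknowledged. */
    static List<String> runBatch(boolean immediate, int batchSize) {
        List<String> log = new ArrayList<>();
        List<Integer> pendingAcks = new ArrayList<>();
        for (int offset = 0; offset < batchSize; offset++) {
            log.add("listen " + offset);
            // The listener calls acknowledge() at this point.
            if (immediate) {
                log.add("commit " + (offset + 1));   // MANUAL_IMMEDIATE: commit right away
            } else {
                pendingAcks.add(offset);             // MANUAL: only remember the ack
            }
        }
        // MANUAL: commit once, after all records were sent to the listener.
        if (!pendingAcks.isEmpty()) {
            int highest = pendingAcks.get(pendingAcks.size() - 1);
            log.add("commit " + (highest + 1));
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runBatch(false, 3)); // [listen 0, listen 1, listen 2, commit 3]
        System.out.println(runBatch(true, 3));  // [listen 0, commit 1, listen 1, commit 2, listen 2, commit 3]
    }
}
```

Both runs end at the same committed offset; what changes is how many commits are issued and when, which matters if the listener fails part-way through a batch.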
It's a bit more complicated with earlier versions; it is possible for one or two additional batches to be fetched, and the acks are performed before each poll so the offsets might be committed before all records in the first batch have been sent to the listener.
EDIT
Answering your comments below...
Yes; the threading was changed in 1.3. Before that, we had to keep polling the consumer to avoid the broker rebalancing the partitions. With <= 1.2, the ConsumerRecords are handed off to the listener thread via a queue with depth of 1. The poller keeps polling until it can't fit any more ConsumerRecords in the queue; at which time, it pauses the consumer (so that subsequent polls return no records), but we have to still keep calling poll to avoid the rebalance. When the listener catches up, the consumer is resumed and messages start flowing again.
So, worst case is the container holds 3 sets of records - the one currently being processed by the listener, the one in the queue and the one we couldn't put in the queue. Any outstanding offset commits (manual or otherwise) are performed just before each poll.
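The worst case of three held sets can be traced with a small deterministic simulation of the pre-1.3 hand-off. This is a sketch under assumptions, not the old container code: `LegacyPollerSketch` and its methods are hypothetical, the two threads are replaced by explicit steps, and the exact moment of the resume is guessed from the description above.

```java
import java.util.concurrent.ArrayBlockingQueue;

/** Toy step-by-step model of the <= 1.2 poller/listener hand-off; not the real code. */
final class LegacyPollerSketch {
    private final ArrayBlockingQueue<String> handoff = new ArrayBlockingQueue<>(1); // depth 1
    private String inListener;   // set currently being processed by the listener
    private String overflow;     // set that could not be put in the queue
    private boolean paused;      // models consumer.pause() / consumer.resume()
    private int nextBatch = 1;

    /** One iteration of the poller thread. */
    void pollOnce() {
        if (paused) {
            // A paused poll returns no records; it only keeps the consumer alive.
            if (handoff.offer(overflow)) {   // listener caught up, queue has room again
                overflow = null;
                paused = false;              // resume the consumer
            }
            return;
        }
        String batch = "batch-" + nextBatch++;
        if (!handoff.offer(batch)) {         // queue full: hold the set and pause
            overflow = batch;
            paused = true;
        }
    }

    /** The listener thread finishes its current set and takes the next one, if any. */
    void listenerTakesNext() {
        inListener = handoff.poll();
    }

    int heldBatches() {
        return (inListener != null ? 1 : 0) + handoff.size() + (overflow != null ? 1 : 0);
    }

    boolean isPaused() {
        return paused;
    }

    public static void main(String[] args) {
        LegacyPollerSketch p = new LegacyPollerSketch();
        p.pollOnce();            // batch-1 goes into the queue
        p.listenerTakesNext();   // listener starts on batch-1
        p.pollOnce();            // batch-2 goes into the queue
        p.pollOnce();            // batch-3 does not fit: held, consumer paused
        p.pollOnce();            // paused poll, nothing fetched
        System.out.println(p.heldBatches() + " held, paused=" + p.isPaused()); // 3 held, paused=true
        p.listenerTakesNext();   // listener catches up and takes batch-2
        p.pollOnce();            // batch-3 moves into the queue, consumer resumed
        System.out.println(p.heldBatches() + " held, paused=" + p.isPaused()); // 2 held, paused=false
    }
}
```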
Will poll thread await processor thread?
No; we couldn't do that because that would cause a rebalance - it would have been the same as if we'd called the listener on the consumer thread.
KIP-62 was actually implemented in the 0.10.1.0 client, but we didn't change the threading until 1.3; it was a major simplification, thanks to KIP-62, and I would recommend using that version.