 

Spring-kafka: ways of committing offsets

If I don't want to use auto-commit mode, Spring provides other ways to commit offsets.

spring-kafka/#committing-offsets gives us the following information about committing offsets:

RECORD - commit the offset when the listener returns after processing the record.
BATCH - commit the offset when all the records returned by the poll() have been processed.
TIME - commit the offset when all the records returned by the poll() have been processed as long as the ackTime since the last commit has been exceeded.
COUNT - commit the offset when all the records returned by the poll() have been processed as long as ackCount records have been received since the last commit.
COUNT_TIME - similar to TIME and COUNT but the commit is performed if either condition is true.
MANUAL - the message listener is responsible to acknowledge() the Acknowledgment; after which, the same semantics as BATCH are applied.
MANUAL_IMMEDIATE - commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.
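
To make the batch-end conditions above concrete, here is a minimal sketch of them as a predicate. It is only a reading of the quoted descriptions, not spring-kafka's actual code: the names `BatchAckMode`, `CommitDecision` and `commitAfterBatch` are hypothetical, and RECORD and the MANUAL modes are left out because they are not decided purely at the end of a batch.

```java
// Hypothetical model of the batch-end ack modes; NOT spring-kafka's real classes.
enum BatchAckMode { BATCH, TIME, COUNT, COUNT_TIME }

final class CommitDecision {
    private CommitDecision() {}

    /**
     * Called once all records returned by the last poll() have been processed.
     * recordsSinceCommit and msSinceCommit are measured from the previous commit.
     */
    static boolean commitAfterBatch(BatchAckMode mode,
                                    int recordsSinceCommit, int ackCount,
                                    long msSinceCommit, long ackTimeMs) {
        boolean countReached = recordsSinceCommit >= ackCount; // "ackCount records have been received"
        boolean timeExceeded = msSinceCommit > ackTimeMs;      // "ackTime ... has been exceeded"
        switch (mode) {
            case BATCH:      return true;
            case TIME:       return timeExceeded;
            case COUNT:      return countReached;
            case COUNT_TIME: return countReached || timeExceeded;
            default:         throw new IllegalArgumentException("unknown mode " + mode);
        }
    }

    public static void main(String[] args) {
        // 100 records processed, 3 s since the last commit, ackTime = 5 s, ackCount = 10
        for (BatchAckMode mode : BatchAckMode.values()) {
            System.out.println(mode + " -> " + commitAfterBatch(mode, 100, 10, 3000L, 5000L));
        }
    }
}
```

Note that every branch is evaluated only after the whole batch has been processed, which is the point the TIME and COUNT descriptions make with "when all the records returned by the poll() have been processed".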

I have several questions:

TIME: As I understand it, somewhere in the Spring framework there is a loop that does something like this:

while (true) {
   data = consumer.poll();
   data.forEach(record -> listener.listen(record)); // listener invoked on the polling thread
}

How often does the poll happen?

Is time the only criterion for committing the offset? Let's say poll() returned 100 records and only 60 of them had been processed when ackTime expired?

I didn't catch the difference between MANUAL_IMMEDIATE and MANUAL.

Please clarify these questions for me.

P.S.

As I understand Gary Russell's answer, the loop looks like this:

while (true) {
   data = consumer.poll();
   data.forEach(record -> new Thread(() -> listener.listen(record)).start()); // one thread per record
}
gstackoverflow asked Oct 30 '22 00:10


1 Answer

It depends on the version; the recent 1.3 version has a much simpler threading model, facilitated by KIP-62.

With that version, the listener is invoked on the caller thread; the next poll doesn't occur until all the current records are consumed. Aside from RECORD (and MANUAL*), the decision to commit is determined after all the records have been sent to the listener.
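
As a rough illustration of the 1.3 model described above (not the container's real code), here is a single-threaded simulation of TIME mode with a fake clock. The class name `SingleThreadedLoopSketch`, the `run` method and its parameters are all made up for this sketch; the only behavior it tries to show is that the ackTime check happens after the whole batch, never in the middle of it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of a poll -> listen -> commit-check loop on ONE thread.
final class SingleThreadedLoopSketch {
    private SingleThreadedLoopSketch() {}

    /** Returns every committed offset, in order, using a simulated clock in milliseconds. */
    static List<Integer> run(int batches, int batchSize, long msPerRecord, long ackTimeMs) {
        List<Integer> commits = new ArrayList<>();
        long clock = 0;       // simulated time
        long lastCommit = 0;  // simulated time of the previous commit
        int nextOffset = 0;   // next offset to hand to the listener

        for (int b = 0; b < batches; b++) {
            int end = nextOffset + batchSize;        // "poll()" returned offsets [nextOffset, end)
            for (; nextOffset < end; nextOffset++) {
                clock += msPerRecord;                // listener runs on this same thread
            }
            // TIME mode: the check happens only here, after the whole batch.
            if (clock - lastCommit > ackTimeMs) {
                commits.add(nextOffset);             // Kafka commits the NEXT offset to read
                lastCommit = clock;
            }
        }
        return commits;
    }

    public static void main(String[] args) {
        // 100 records per poll, 10 ms each, ackTime = 600 ms: ackTime "expires" after
        // about 60 records, yet the commit covers all 100 records of the batch.
        System.out.println(run(2, 100, 10L, 600L));
    }
}
```

So for the "100 records, ackTime expired after 60" case in the question, nothing is committed at record 60 in this model; the commit happens once the 100th record has been handed to the listener.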

MANUAL_IMMEDIATE means just that; the offset is committed immediately when the user acks; with MANUAL, the manual offsets are committed after all records have been sent to the listener.
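
The difference can be seen in a small simulation. This is a sketch under the assumption that the listener acknowledges every record; `ManualAckSketch` and its event log are invented for illustration and do not use the real `Acknowledgment` interface.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event log comparing MANUAL and MANUAL_IMMEDIATE for one batch.
final class ManualAckSketch {
    private ManualAckSketch() {}

    /** "listen:N" = record N delivered; "commit:N" = offset N committed. */
    static List<String> run(boolean immediate, int batchSize) {
        List<String> log = new ArrayList<>();
        int pendingAck = -1; // highest acknowledged offset not yet committed

        for (int offset = 0; offset < batchSize; offset++) {
            log.add("listen:" + offset);
            // Assumption: the listener calls acknowledge() for every record.
            if (immediate) {
                log.add("commit:" + (offset + 1)); // MANUAL_IMMEDIATE: commit right away
            } else {
                pendingAck = offset;               // MANUAL: remember it, commit later
            }
        }
        if (pendingAck >= 0) {
            log.add("commit:" + (pendingAck + 1)); // MANUAL: commit after the whole batch
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println("MANUAL_IMMEDIATE: " + run(true, 3));
        System.out.println("MANUAL:           " + run(false, 3));
    }
}
```

With MANUAL_IMMEDIATE the commits are interleaved with the listener calls; with MANUAL there is a single commit at the end of the batch.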

It's a bit more complicated with earlier versions; it is possible for one or two additional batches to be fetched, and the acks are performed before each poll so the offsets might be committed before all records in the first batch have been sent to the listener.

EDIT

Answering your comments below...

Yes; the threading was changed in 1.3. Before that, we had to keep polling the consumer to avoid the broker rebalancing the partitions. With <= 1.2, the ConsumerRecords are handed off to the listener thread via a queue with a depth of 1. The poller keeps polling until it can't fit any more ConsumerRecords in the queue; at which time, it pauses the consumer (so that subsequent polls return no records), but we still have to keep calling poll to avoid the rebalance. When the listener catches up, the consumer is resumed and messages start flowing again.

So, worst case is the container holds 3 sets of records - the one currently being processed by the listener, the one in the queue and the one we couldn't put in the queue. Any outstanding offset commits (manual or otherwise) are performed just before each poll.
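
A deterministic, single-threaded sketch of that hand-off, to show where the "3 sets of records" come from. Everything here (`LegacyPollerSketch`, `pollOnce`, `listenerTakesNext`) is hypothetical and heavily simplified: batches are just integers, and the real container runs the poller and the listener on separate threads.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of the <= 1.2 hand-off: poller -> queue (depth 1) -> listener.
final class LegacyPollerSketch {
    private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1); // depth of 1
    private Integer overflow;    // batch that did not fit in the queue
    private Integer inListener;  // batch the listener is currently processing
    private boolean paused;      // models the consumer being paused
    private int nextBatchId;

    /** One poller iteration; poll() is always called so the broker does not rebalance. */
    void pollOnce() {
        if (paused) {
            // A paused consumer returns no records; just retry the pending hand-off.
            if (queue.offer(overflow)) {
                overflow = null;
                paused = false;  // listener caught up: resume the consumer
            }
            return;
        }
        Integer batch = nextBatchId++;  // pretend poll() returned a new batch
        if (!queue.offer(batch)) {
            overflow = batch;           // queue full: hold the batch and pause
            paused = true;
        }
    }

    /** The listener finishes its current batch and takes the next one, if any. */
    void listenerTakesNext() {
        inListener = queue.poll();
    }

    int batchesHeld() {
        return (inListener != null ? 1 : 0) + queue.size() + (overflow != null ? 1 : 0);
    }

    boolean isPaused() {
        return paused;
    }

    public static void main(String[] args) {
        LegacyPollerSketch s = new LegacyPollerSketch();
        s.pollOnce();           // batch 0 goes into the queue
        s.listenerTakesNext();  // listener starts on batch 0
        s.pollOnce();           // batch 1 goes into the queue
        s.pollOnce();           // batch 2 does not fit: consumer is paused
        System.out.println("held=" + s.batchesHeld() + " paused=" + s.isPaused());
    }
}
```

The worst case is reached after the third poll: one batch in the listener, one in the queue, and one that could not be queued.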

Will poll thread await processor thread?

No; we couldn't do that because that would cause a rebalance - it would have been the same as if we'd called the listener on the consumer thread.

KIP-62 was actually implemented in the 0.10.1.0 client, but we didn't change the threading until 1.3; it was a major simplification, thanks to KIP-62, and I would recommend using 1.3.

Gary Russell answered Nov 15 '22 05:11