Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka Rebalancing and listeners pitfalls

I am reading Kafka: The Definitive Guide and would like to better understand the rebalance listener. The example in the book simple uses a HashMap to maintain the current offsets that have been processed and will commit the current state when a partition is revoked. My concerns are:

There are two issues/questions I have around the code example:

  1. The language used leads me to assume that these callbacks are made on a different thread. So, shouldn't thread safety be considered when applying the current offsets? Additionally, shouldn't the current batch be cancelled after this is committed?
  2. It says to use commitSync to make sure offsets are committed before the rebalance proceeds. However this is only synchronous within that consumer. Is there some mechanism where the coordinator will not proceed until it hears back from all subscribed consumers?
like image 835
Justin Pihony Avatar asked Apr 11 '18 17:04

Justin Pihony


1 Answers

  1. I re-read the section in the book and I agree I was a bit confused too!

    The Javadoc states:

    This callback will only execute in the user thread as part of the poll(long) call whenever partition assignment changes.

    I had a look at the code and confirmed the rebalance listener methods are indeed called in the same thread that owns the Consumer.

  2. Yes you should use commitSync() when committing in the rebalance listener.

    To explain why, let's look at the golden path example. We start with a consumer happily consuming and heartbeating regularly to the coordinator. At some point the coordinator returns a REBALANCE_IN_PROGRESS error to a heartbeat request. This can be caused by a new member wanting to join the group, a member leaving or failing to heartbeat, or new partition being added/removed from the subscription. At this point, all consumers need to rejoin the group.

    Before attempting to rejoin the group, the consumer will synchronously execute ConsumerRebalanceListener.onPartitionsRevoked(). Once the listener returns, the consumer will send a JoinRequest to the coordinator to rejoin the group.

    That said, and I think this is what you were thinking about, if your callback takes too long (> session.timeout.ms) to commit, the group could be already be in another generation and the partitions with offset trying to be committed assigned to another member. In that case, the commit will fail even if it was synchronous. But by using commitSync() in the listener you are guaranteed the consumer won't rejoin the group before completing the commit.

like image 115
Mickael Maison Avatar answered Oct 20 '22 07:10

Mickael Maison