 

Multithreaded Kafka Consumer or PerPartition-PerConsumer

What would be the better approach when implementing a Kafka consumer?

The objective is to read from Kafka and write back to a DB, on the order of millions of rows.

Approach 1: Per partition, per consumer. Wait for each message to be fully consumed (i.e. written back to the DB) before proceeding to the next iteration of the polling loop.

Approach 2: Per partition, per consumer. Hand each record to a worker thread or thread pool to be written back to the DB, commit the offset later, and keep on polling. Offset management needs to be taken care of. In this approach, don't wait for the message to be written back to the DB; just keep polling and pass the message to a worker thread.

Any insights on both of them?

Thanks

asked Dec 06 '16 by bittu

1 Answer

Approach 1: This approach is applicable only if you can estimate the message processing time; otherwise it is not recommended.

Problem: The main problem in this approach is keeping the consumer alive. If you wait for messages to be completely processed before calling poll() again, you have to make sure the consumer stays alive until the next poll(), because Kafka enforces a consumer property named "session.timeout.ms". The broker/cluster acts on the value of this property: if the consumer fails to call poll() again within "session.timeout.ms", the broker marks it dead and kicks it out of the group. When the consumer then finishes its message processing and calls poll() again, it is treated as a new joiner and receives the same set of records again, starting from the offset it had before. With this behavior, the consumer gets stuck in an infinite loop where it never advances its offset.

Possible solution 1: To use this approach you need to choose a good value for the property "session.timeout.ms", keeping the following side effects in mind (a configuration sketch follows the list):

1: Value too low: The consumer is marked dead as described above and never advances its offset. The messages do get processed, but every time it finishes a batch it receives the previous messages plus new messages again.

2: Value too high: The broker is very late in detecting a genuine consumer failure, which results in record duplication and hurts overall throughput.
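
To make this concrete, here is a minimal configuration sketch for a pre-0.10.1 client; the broker address, group id, and the 30-second timeout are illustrative assumptions, not values from the question:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Sketch: tuning session.timeout.ms for Approach 1 on a pre-0.10.1 client,
    // where heartbeats are only sent from inside poll(). Broker address, group
    // id, and the 30-second value are placeholder assumptions; the timeout must
    // comfortably exceed the worst-case processing time of one poll() batch.
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "db-writer-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);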

Possible Solution 2: (Only valid for version 0.10.1.0 and later.) Kafka shipped an official fix in release 0.10.1.0. This fix introduces two notable entities: a new property, "max.poll.interval.ms", which sets the maximum delay between client calls to poll(), and a background thread that is responsible for keeping the consumer alive. So when the consumer calls poll() and then gets busy with message processing, the internal background thread keeps the heartbeat going and the consumer stays alive. However, this only holds while "max.poll.interval.ms" has not elapsed: the background thread waits for the consumer to call poll() within "max.poll.interval.ms", and if it does not, it sends a leave-group request and dies itself as well.

Again, the tricky part of this solution is finding a suitable value for "max.poll.interval.ms" (very important: this is the period for which the background thread will keep the heartbeat alive without an explicit call to poll()).
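
A sketch of how these properties might be combined on a 0.10.1.0+ client; all values below are assumptions to be tuned against your actual processing times:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    // Sketch for clients >= 0.10.1.0: heartbeats now come from the background
    // thread, so session.timeout.ms can stay small while max.poll.interval.ms
    // covers the worst-case gap between poll() calls. All values are assumptions.
    Properties props = new Properties();
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // 5 min allowed between poll() calls
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");        // smaller batches shorten per-poll processing
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");    // real crashes are still detected quickly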

Approach 2: Using a worker thread is a good idea, but then you have to maintain an internal queue or some validation for received messages, which can be complex, and you also need to use manual commits instead of auto-commit. For more information about commits, see this and search for the heading "Commits and Offsets".

Problem: The main problem in this approach is keeping track of which messages were received and which were processed successfully. As your consumer receives a message, it passes it to the respective worker thread, commits the offset, and moves on to receive more messages. During this process you have to take care of the following issues:

  1. What if a message is received and its offset committed, but later, for whatever reason, the worker thread fails to process it? How do you get that message again?
  2. What if messages are received by the consumer but there are no free worker threads to process them?

Solution: There are different ways to resolve these issues, and one is to use an internal queue to hold the messages, combined with manual commits that are sent only when a worker thread reports successful processing of a message. However, a very careful implementation is required, because it can lead to complex code and can also result in memory-management or threading issues.
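
To make the trade-off concrete, here is a rough sketch of Approach 2 using a fixed thread pool and manual commits (the newer poll(Duration) API is assumed; the topic name, pool size, and writeToDb() are placeholders). Note that it deliberately exhibits issue 1 above: offsets are committed while records may still be in flight on the workers.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Sketch of Approach 2: hand records to a pool and keep polling. The topic
    // name, pool size, and writeToDb() are placeholder assumptions; real code
    // needs the queue/retry machinery discussed above.
    public class WorkerPoolConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "db-writer-group");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // manual commits only
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");

            ExecutorService pool = Executors.newFixedThreadPool(4);
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        pool.submit(() -> writeToDb(record)); // hand off; never block the poll loop
                    }
                    // Committing here acknowledges records that may still be in
                    // flight on worker threads -- exactly the risk of issue 1 above.
                    consumer.commitAsync();
                }
            }
        }

        private static void writeToDb(ConsumerRecord<String, String> record) {
            // placeholder: persist record.value() to the database
        }
    }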

Suggestion: Depending on your requirements, you can use either approach, implementing fixes for the possible issues described above. However, a more robust solution that I would recommend is to use partition pause/resume. In a very abstract way, your consumer should follow these steps (a code sketch follows the list):

1: poll() for messages.

2: Pause all the respective topics/partitions.

3: Assign the messages to worker threads and wait for them to finish processing.

4: Keep calling poll(), but since the partitions are paused, no extra messages will be received while the consumer is kept alive. (Make sure no new topic is registered during this period.)

5: Once all worker threads report message processing success/failure, commit the offsets accordingly.

6: Resume all the partitions.
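
A rough sketch of this pause/resume loop, assuming a 0.10.x-or-later client (where pause/resume take a collection of TopicPartitions); the consumer construction, pool size, and writeToDb() are placeholder assumptions:

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // Sketch of the pause/resume pattern (steps 1-6 above). How the consumer is
    // built, the pool size, and writeToDb() are placeholder assumptions.
    public class PauseResumeLoop {
        private static final ExecutorService POOL = Executors.newFixedThreadPool(4);

        static void run(KafkaConsumer<String, String> consumer) throws Exception {
            while (true) {
                // 1: poll for messages
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) {
                    continue;
                }

                // 2: pause all assigned partitions
                Collection<TopicPartition> assigned = consumer.assignment();
                consumer.pause(assigned);

                // 3: hand the batch to worker threads
                List<Future<?>> futures = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    futures.add(POOL.submit(() -> writeToDb(record)));
                }

                // 4: keep polling while paused -- returns no new records but
                //    keeps the consumer alive within max.poll.interval.ms
                while (!futures.stream().allMatch(Future::isDone)) {
                    consumer.poll(Duration.ofMillis(100));
                }

                // 5: surface worker failures, then commit the batch's offsets
                for (Future<?> f : futures) {
                    f.get(); // throws if a worker thread failed
                }
                consumer.commitSync();

                // 6: resume the partitions and loop back to step 1
                consumer.resume(assigned);
            }
        }

        private static void writeToDb(ConsumerRecord<String, String> record) {
            // placeholder: persist record.value() to the database
        }
    }

Because poll() keeps being called in step 4, the consumer honours max.poll.interval.ms even when database writes are slow. A rebalance during the paused phase would still need extra handling, e.g. via a ConsumerRebalanceListener.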

Note: There may be better ways or other possible solutions depending upon your scenario and requirements. This is just an idea, one of the possible solutions.

answered Oct 17 '22 by TechMaster