
Kafka and Akka Cluster

The following is my use case:

  1. Bunch of applications enqueue messages in Kafka under different topics.
  2. A consumer for each topic distributes the work to a worker in a cluster. The work can be classified as long-running, memory-intensive, simple, etc., and the worker is chosen accordingly.
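The routing decision in step 2 can be sketched as follows. This is a minimal, in-memory model in plain Java, not Akka Cluster code: the class `WorkRoutingSketch`, the `WorkClass` enum, the worker names and the round-robin policy are all assumptions made for illustration.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical names throughout; this models only the routing decision,
// not Akka Cluster routers or Kafka consumers.
class WorkRoutingSketch {

    // The classifications mentioned in the use case.
    enum WorkClass { LONG_RUNNING, MEMORY_INTENSIVE, SIMPLE }

    // Round-robin selection over the workers registered for one class of work.
    static final class Pool {
        private final List<String> workers;
        private int next = 0;

        Pool(List<String> workers) {
            if (workers.isEmpty()) {
                throw new IllegalArgumentException("a pool needs at least one worker");
            }
            this.workers = workers;
        }

        String pick() {
            String worker = workers.get(next);
            next = (next + 1) % workers.size();
            return worker;
        }
    }

    private final Map<WorkClass, Pool> pools = new EnumMap<>(WorkClass.class);

    // Associates a class of work with the workers able to handle it.
    void register(WorkClass workClass, List<String> workers) {
        pools.put(workClass, new Pool(workers));
    }

    // Returns the worker that should receive the next message of this class.
    String route(WorkClass workClass) {
        Pool pool = pools.get(workClass);
        if (pool == null) {
            throw new IllegalArgumentException("no workers registered for " + workClass);
        }
        return pool.pick();
    }

    public static void main(String[] args) {
        WorkRoutingSketch router = new WorkRoutingSketch();
        router.register(WorkClass.SIMPLE, List.of("simple-1", "simple-2"));
        router.register(WorkClass.MEMORY_INTENSIVE, List.of("bigmem-1"));

        System.out.println(router.route(WorkClass.SIMPLE));           // simple-1
        System.out.println(router.route(WorkClass.SIMPLE));           // simple-2
        System.out.println(router.route(WorkClass.MEMORY_INTENSIVE)); // bigmem-1
    }
}
```

In a real cluster the pools would presumably be backed by actors or routers rather than strings, but the mapping from classification to worker pool stays the same.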

This has me exploring Akka Cluster for work distribution, routing, and scaling. I can use an Akka "Supervisor" as a Kafka consumer and assign incoming work to the appropriate worker based on its classification.

What I am still trying to understand is the correct way to make communication between the supervisor and the workers in the Akka cluster resilient. As soon as the supervisor consumes a message from Kafka, the Kafka offset is committed. If an error occurs during processing after the offset commit, is the following an acceptable way to recover and resume from where processing left off?

Make the supervisor a persistent actor by using a durable mailbox backed by Kafka. The supervisor enqueues work in Kafka, and each worker gets its work from Kafka and commits its offset only after completing the work.

Prasanna asked Apr 10 '16 23:04


1 Answer

As Jaakko said, it really depends on the third-party library you are using.

For my part, I have successfully used Akka Streams Kafka, although I left offset auto-commit enabled.

However, this library may meet your needs, since it lets you customize how offsets are committed (see the sections "External Offset Storage" and "Offset Storage in Kafka" in its documentation).

The documentation says:

The Consumer.committableSource makes it possible to commit offset positions to Kafka. Compared to auto-commit this gives exact control of when a message is considered consumed.
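To make the difference concrete, here is a minimal sketch of why the commit point matters. It is an in-memory model in plain Java, not the Kafka or Akka Streams Kafka API: the class `CommitTimingSketch`, the `CommitMode` enum and the simulated crash are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of one partition read by one consumer.
// NOT the Kafka or Akka Streams Kafka API; all names are hypothetical.
class CommitTimingSketch {

    enum CommitMode { ON_RECEIVE, AFTER_PROCESSING }

    /**
     * Consumes `log` starting from committed offset 0. The worker crashes once,
     * while handling the record at offset `crashAt`; the consumer then restarts
     * from the last committed offset. Returns the records that were processed.
     */
    static List<String> run(List<String> log, CommitMode mode, int crashAt) {
        List<String> processed = new ArrayList<>();
        int committed = 0;        // next offset to read after a (re)start
        boolean crashed = false;  // the simulated crash happens only once

        while (committed < log.size()) {
            int offset = committed;  // a (re)start resumes from the committed offset
            try {
                while (offset < log.size()) {
                    if (mode == CommitMode.ON_RECEIVE) {
                        committed = offset + 1;  // committed before the work is done
                    }
                    if (offset == crashAt && !crashed) {
                        crashed = true;
                        throw new IllegalStateException("worker failed at offset " + offset);
                    }
                    processed.add(log.get(offset));
                    if (mode == CommitMode.AFTER_PROCESSING) {
                        committed = offset + 1;  // committed only after the work succeeded
                    }
                    offset++;
                }
            } catch (IllegalStateException e) {
                // Simulated restart: fall through to the outer loop.
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> log = List.of("a", "b", "c", "d");
        System.out.println(run(log, CommitMode.ON_RECEIVE, 2));       // [a, b, d]
        System.out.println(run(log, CommitMode.AFTER_PROCESSING, 2)); // [a, b, c, d]
    }
}
```

When the offset is committed on receipt, the record being processed at the time of the crash ("c") is never retried. Committing only after processing avoids that loss, at the cost that a crash between finishing the work and committing would cause the record to be processed again, so the work should tolerate duplicates.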

To disable auto-commit, add an akka.kafka.consumer section to your Akka application.conf file:

akka.kafka.consumer {

  # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
  # can be defined in this configuration section.

  kafka-clients {
    # Disable auto-commit by default
    enable.auto.commit = false
  }

}

The latest version of akka-stream-kafka_2.11 (version 0.16) is compatible with Akka 2.5.x, but you have to override the akka-stream_2.11 dependency with the one from the Akka toolkit. I am currently using this library with Akka 2.5.3 and it works really well.

I hope you find what you are looking for :)

Antoine answered Sep 30 '22 10:09