Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Akka.Kafka - warning message - Resuming partitions

Am getting debug messages continuously as resuming the partitions for all the topics. Like below. This message prints every millisecond on my server continuously.

08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-7
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-6
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-9
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-8

This Here is the code

val zookeeperHost = "localhost"
val zookeeperPort = "9092"
// Kafka queue settings
 val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
       .withBootstrapServers(zookeeperHost + ":" + zookeeperPort)
       .withGroupId((groupName))
       .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

 // Streaming the Messages from Kafka queue
 Consumer.committableSource(consumerSettings, Subscriptions.topics(topicName))
     .map(msg => {
       consumed(msg.record.value)
     })
     .runWith(Sink.ignore)

Please help to do the partition correctly to stop the DEBUG messages.

like image 812
Arun Kannan Avatar asked Nov 08 '22 05:11

Arun Kannan


1 Answers

Seems that the reactive-kafka code resumes every partition before starting to fetch:

consumer.assignment().asScala.foreach { tp =>
  if (partitionsToFetch.contains(tp)) consumer.resume(java.util.Collections.singleton(tp))
  else consumer.pause(java.util.Collections.singleton(tp))
}
def tryPoll{...}
checkNoResult(tryPoll(0))

KafkaConsumer.resume method is a no-op if the partitions were not previously paused.

like image 86
amethystic Avatar answered Nov 15 '22 09:11

amethystic