 

Gracefully restart a Reactive-Kafka Consumer Stream on failure

Problem: when I restart/complete/stop the stream, the old consumer does not die/shut down:

[INFO ] a.a.RepointableActorRef -
  Message [akka.kafka.KafkaConsumerActor$Internal$Stop$] 
  from Actor[akka://ufo-sightings/deadLetters]
  to Actor[akka://ufo-sightings/system/kafka-consumer-1#1896610594]
  was not delivered. [1] dead letters encountered.

Description: I'm building a service that receives messages from a Kafka topic and sends each message to an external service via an HTTP request.

  1. The connection with the external service can break, and my service needs to retry the request.

  2. Additionally, if there is an error in the stream, the entire stream needs to restart.

  3. Finally, sometimes I don't need the stream and its corresponding Kafka consumer, and I would like to shut down the entire stream.

So I have a Stream:

Consumer.committableSource(customizedSettings, subscriptions)
  .flatMapConcat(sourceFunction)
  .toMat(Sink.ignore)(Keep.left)
  .run()

The HTTP request is sent inside sourceFunction.

I followed the new Kafka consumer restart instructions in the documentation:

  RestartSource.withBackoff(
      minBackoff = 20.seconds,
      maxBackoff = 5.minutes,
      randomFactor = 0.2) { () =>
    Consumer.committableSource(customizedSettings, subscriptions)
      .watchTermination() {
        case (consumerControl, streamComplete) =>
          logger.info(s"Started watching Kafka consumer id = ${consumer.id} termination: is shutdown: ${consumerControl.isShutdown}, is completed: ${streamComplete.isCompleted}")
          consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer finally happened, id = ${consumer.id} at ${DateTime.now}"))
          streamComplete
            .flatMap { _ =>
              consumerControl.shutdown().map { _ => logger.info(s"Consumer id = ${consumer.id} SHUT DOWN at ${DateTime.now} GRACEFULLY: CLOSED FROM UPSTREAM") }
            }
            .recoverWith {
              case _ =>
                consumerControl.shutdown().map { _ => logger.info(s"Consumer id = ${consumer.id} SHUT DOWN at ${DateTime.now} ERROR: CLOSED FROM UPSTREAM") }
            }
      }
      .flatMapConcat(sourceFunction)
  }
    .viaMat(KillSwitches.single)(Keep.right)
    .toMat(Sink.ignore)(Keep.left)
    .run()
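For context on the backoff parameters above: withBackoff delays each restart, doubling the delay from minBackoff until it is capped at maxBackoff (plus a random jitter given by randomFactor). A minimal plain-Scala sketch of that schedule, with the jitter omitted for clarity:

```scala
import scala.concurrent.duration._

// Exponential backoff schedule as produced by RestartSource.withBackoff,
// ignoring the randomFactor jitter for clarity.
def backoffDelay(restartCount: Int,
                 minBackoff: FiniteDuration,
                 maxBackoff: FiniteDuration): FiniteDuration = {
  val delay = minBackoff * math.pow(2, restartCount).toLong
  if (delay > maxBackoff) maxBackoff else delay
}

// With the settings from the question (20s min, 5min max):
val delays = (0 to 4).map(i => backoffDelay(i, 20.seconds, 5.minutes))
// delays: 20s, 40s, 80s, 160s, then capped at 300s (5 minutes)
```

So a consumer that keeps failing is retried progressively less often, settling at one restart attempt every five minutes.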

There is an open issue that discusses this non-terminating consumer in a complex Akka stream, but there is no solution yet.

Is there a workaround that forces the Kafka consumer to terminate?

Rabzu asked May 22 '18 10:05



1 Answer

How about wrapping the consumer in an actor and registering a KillSwitch? See: https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html#dynamic-stream-handling

Then, in the actor's postStop method, you can terminate the stream. By wrapping the actor in a BackoffSupervisor, you get the exponential backoff.

Example actor: https://github.com/tradecloud/kafka-akka-extension/blob/master/src/main/scala/nl/tradecloud/kafka/KafkaSubscriberActor.scala#L27
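A minimal sketch of this pattern, assuming the Consumer, customizedSettings, subscriptions, and sourceFunction from the question (the actor and child names here are hypothetical, and the akka-stream/akka-stream-kafka dependencies are required):

```scala
import akka.actor.{Actor, Props}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.{ActorMaterializer, KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink}
import scala.concurrent.duration._

// Hypothetical actor that owns one instance of the consumer stream.
class ConsumerStreamActor extends Actor {
  private implicit val mat: ActorMaterializer = ActorMaterializer()

  // Materialize the stream once per actor incarnation, keeping its kill switch.
  private val killSwitch: UniqueKillSwitch =
    Consumer.committableSource(customizedSettings, subscriptions)
      .flatMapConcat(sourceFunction)
      .viaMat(KillSwitches.single)(Keep.right)
      .to(Sink.ignore)
      .run()

  // postStop runs on stop or crash: tear the stream (and its consumer) down.
  override def postStop(): Unit = killSwitch.shutdown()

  override def receive: Receive = Actor.emptyBehavior
}

// The supervisor restarts the actor (and hence the stream) with exponential backoff.
val supervisorProps = BackoffSupervisor.props(
  Backoff.onStop(
    Props[ConsumerStreamActor],
    childName = "consumer-stream",
    minBackoff = 20.seconds,
    maxBackoff = 5.minutes,
    randomFactor = 0.2))
```

Stopping the supervisor actor then shuts the stream down for good (case 3 in the question), while crashes trigger a backoff restart (case 2).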

Bennie Krijger answered Oct 18 '22 19:10