Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Proper way to programmatically stop an Alpakka Kafka stream

We are trying to use Akka Streams with Alpakka Kafka to consume a stream of events in a service. For handling event processing errors we are using Kafka autocommit and more than one queue. For example, if we have the topic user_created, which we want to consume from a products service, we also create user_created_for_products_failed and user_created_for_products_dead_letter. These two extra topics are coupled to a specific Kafka consumer group. If an event fails to be processed, it goes to the failed queue, where we try to consume again in five minutes--if it fails again it goes to dead letters.

On deployment we want to ensure that we don't lose events. So we are trying to stop the stream before stopping the application. As I said, we are using autocommit, but all of these events that are "flying" are not processed yet. Once the stream and application are stopped, we can deploy the new code and start the application again.

After reading the documentation, we have seen the KillSwitch feature. The problem that we are seeing in it is that the shutdown method returns Unit instead Future[Unit] as we expect. We are not sure that we won't lose events using it, because in tests it looks like it goes too fast to be working properly.

As a workaround, we create an ActorSystem for each stream and use the terminate method (which returns a Future[Terminate]). The problem with this solution is that we don't think that creating an ActorSystem per stream will scale well, and terminate takes a lot of time to resolve (in tests it takes up to one minute to shut down).

Have you faced a problem like this? Is there a faster way (compared to ActorSystem.terminate) to stop a stream and ensure that all the events that the Source has emitted have been processed?

like image 781
SergiGP Avatar asked Mar 07 '19 14:03

SergiGP


People also ask

How do I stop Kafka streaming?

To stop the application instance, call the KafkaStreams#close() method: // Stop the Kafka Streams threads streams. close(); To allow your application to gracefully shutdown in response to SIGTERM, it is recommended that you add a shutdown hook and call KafkaStreams#close .

Why is Akka streaming?

Akka Streams is a module built on top of Akka Actors to make the ingestion and processing of streams easy. It provides easy-to-use APIs to create streams that leverage the power of the Akka toolkit without explicitly defining actor behaviors and messages.

Why use Kafka streams over Kafka?

Kafka Streams greatly simplifies the stream processing from topics. Built on top of Kafka client libraries, it provides data parallelism, distributed coordination, fault tolerance, and scalability.

Where does Kafka streams run?

A Kafka Streams application is typically running on many application instances. Because Kafka Streams partitions the data for processing it, an application's entire state is spread across the local state stores of the application's running instances.


Video Answer


1 Answers

From the documentation (emphasis mine):

When using external offset storage, a call to Consumer.Control.shutdown() suffices to complete the Source, which starts the completion of the stream.

val (consumerControl, streamComplete) =
  Consumer
    .plainSource(consumerSettings,
                 Subscriptions.assignmentWithOffset(
                   new TopicPartition(topic, 0) -> offset
                 ))
    .via(businessFlow)
    .toMat(Sink.ignore)(Keep.both)
    .run()

consumerControl.shutdown()

Consumer.control.shutdown() returns a Future[Done]. From its Scaladoc description:

Shutdown the consumer Source. It will wait for outstanding offset commit requests to finish before shutting down.

Alternatively, if you're using offset storage in Kafka, use Consumer.Control.drainAndShutdown, which also returns a Future. Again from the documentation (which contains more information about what drainAndShutdown does under the covers):

val drainingControl =
  Consumer
    .committableSource(consumerSettings.withStopTimeout(Duration.Zero), Subscriptions.topics(topic))
    .mapAsync(1) { msg =>
      business(msg.record).map(_ => msg.committableOffset)
    }
    .toMat(Committer.sink(committerSettings))(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()

val streamComplete = drainingControl.drainAndShutdown()

The Scaladoc description for drainAndShutdown:

Stop producing messages from the Source, wait for stream completion and shut down the consumer Source so that all consumed messages reach the end of the stream. Failures in stream completion will be propagated, the source will be shut down anyway.

like image 123
Jeffrey Chung Avatar answered Oct 14 '22 08:10

Jeffrey Chung