
How to limit an Akka Stream to execute and send down one message only once per second?

I have an Akka Stream and I want the stream to send messages downstream approximately once every second.

I tried two ways to solve this problem. The first was to make the producer at the start of the stream send a message only once every second, whenever a Continue message arrives at this actor.

// On receiving a Continue message in the ActorPublisher:
// do the work, then...
if (totalDemand > 0) {
  import scala.concurrent.duration._
  context.system.scheduler.scheduleOnce(1.second, self, Continue)
}

This works for a short while, but then a flood of Continue messages appears in the ActorPublisher actor. My guess (I am not sure) is that they come from downstream via back-pressure requests, since the downstream can consume quickly while the upstream produces slowly. So this method failed.

The other way I tried was via backpressure control: I used a MaxInFlightRequestStrategy on the ActorSubscriber at the end of the stream to limit the number of messages to 1 per second. This works, but messages arrive roughly three at a time rather than one at a time. It seems that either the backpressure control doesn't immediately change the rate of incoming messages, or messages were already queued in the stream and waiting to be processed.

So the problem is: how can I have an Akka Stream that processes only one message per second?


I discovered that MaxInFlightRequestStrategy is a valid way to do it, but I should have set the batch size to 1; it defaults to 5, which was causing the problem I saw. It is also an over-complicated way to solve the problem, now that I have seen the answer submitted here.
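A minimal sketch of that batch-size fix might look like the following. It assumes akka-stream 2.4 or 2.5, where ActorSubscriber is still available (it was later deprecated and removed). The class name OnePerSecondSubscriber, the handle callback, and the Released message are hypothetical names introduced for illustration, not part of the original code.

```scala
import akka.stream.actor.{ActorSubscriber, ActorSubscriberMessage, MaxInFlightRequestStrategy}
import scala.concurrent.duration._

// Hypothetical subscriber: accepts one element, then holds back demand
// for one second before allowing the next element to be requested.
class OnePerSecondSubscriber(handle: Any => Unit) extends ActorSubscriber {
  import context.dispatcher // execution context for the scheduler

  private case object Released
  private var inFlight = 0

  override val requestStrategy = new MaxInFlightRequestStrategy(max = 1) {
    override def inFlightInternally: Int = inFlight
    // Request one element at a time instead of the default batch of 5.
    override def batchSize: Int = 1
  }

  def receive = {
    case ActorSubscriberMessage.OnNext(element) =>
      inFlight += 1
      handle(element)
      // Keep the slot occupied for one second before releasing it.
      context.system.scheduler.scheduleOnce(1.second, self, Released)
    case Released =>
      inFlight -= 1
    case ActorSubscriberMessage.OnComplete =>
      context.stop(self)
  }
}

// Possible usage, assuming an implicit materializer is in scope:
//   Source(1 to 5).runWith(
//     Sink.actorSubscriber(Props(new OnePerSecondSubscriber(println))))
```

The request strategy is consulted again after each message the actor handles, so decrementing inFlight on Released is what lets the next element be requested.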

Phil asked Mar 25 '16 06:03


1 Answer

You can either put your elements through a throttling flow, which will back-pressure a fast source, or you can use a combination of tick and zip.

The first solution would be like this:

import akka.stream.ThrottleMode
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.duration._
import scala.util.Random

// An implicit materializer (and ActorSystem) is assumed to be in scope.
val veryFastSource =
  Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000))

val throttlingFlow = Flow[Long].throttle(
  // how many elements to allow
  elements = 1,
  // per unit of time
  per = 1.second,
  maximumBurst = 0,
  // you can also set this to Enforcing, but then your stream
  // will fail if it exceeds the allowed number of elements / s
  mode = ThrottleMode.Shaping
)

veryFastSource.via(throttlingFlow).runWith(Sink.foreach(println))

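The second solution would be like this. Since zip emits a pair only when both inputs have an element available, the one-second tick sets the pace for the fast source: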

val veryFastSource =
  Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000))

val tickingSource = Source.tick(1.second, 1.second, 0) 

veryFastSource.zip(tickingSource).map(_._1).runWith(Sink.foreach(println))
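
One way to sanity-check the pacing is to run a small finite source through a throttle and record when each element arrives. The sketch below is an illustration under stated assumptions: PacingCheck and run are hypothetical names, the maximumBurst of 1 is chosen for this example only, and the ActorMaterializer setup assumes an Akka version where it is still available (from 2.6 on it is deprecated and an implicit ActorSystem is enough).

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PacingCheck {
  // Runs three elements through a 1-element-per-second throttle and
  // returns each element paired with the milliseconds elapsed since start.
  def run(): Seq[(Int, Long)] = {
    implicit val system = ActorSystem("pacing-check")
    implicit val materializer = ActorMaterializer()
    val start = System.nanoTime()
    try {
      val result = Source(1 to 3)
        .throttle(1, 1.second, 1, ThrottleMode.Shaping)
        .map(n => (n, (System.nanoTime() - start) / 1000000L))
        .runWith(Sink.seq)
      // Block until the finite stream completes.
      Await.result(result, 10.seconds)
    } finally system.terminate()
  }
}
```

Calling PacingCheck.run().foreach(println) should show the elements arriving in order with timestamps spaced roughly one second apart.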

lpiepiora answered Nov 18 '22 13:11