I have an Akka Stream and I want the stream to send messages down stream approximately every second.
I tried two ways to solve this problem, the first way was to make the producer at the start of the stream only send messages once every second when a Continue messages comes into this actor.
// When receive a Continue message in a ActorPublisher
// do work then...
if (totalDemand > 0) {
import scala.concurrent.duration._
context.system.scheduler.scheduleOnce(1 second, self, Continue)
}
This works for a short while then a flood of Continue messages appear in the ActorPublisher actor, I assume (guess but not sure) from downstream via back-pressure requesting messages as the downstream can consume fast but the upstream is not producing at a fast rate. So this method failed.
The other way I tried was via backpressure control, I used a MaxInFlightRequestStrategy
on the ActorSubscriber
at the end of the stream to limit the number of messages to 1 per second. This works but messages coming in come in at approximately three or so at a time, not just one at a time. It seems the backpressure control doesn't immediately change the rate of messages coming in OR messages were already queued in the stream and waiting to be processed.
So the problem is, how can I have an Akka Stream which can process one message only per second?
I discovered that MaxInFlightRequestStrategy
is a valid way to do it but I should set the batch size to 1, its batch size is default 5, which was causing the problem I found. Also its an over-complicated way to solve the problem now that I am looking at the submitted answer here.
You can either put your elements through the throttling flow, which will back pressure a fast source, or you can use combination of tick
and zip
.
The first solution would be like this:
val veryFastSource =
Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000))
val throttlingFlow = Flow[Long].throttle(
// how many elements do you allow
elements = 1,
// in what unit of time
per = 1.second,
maximumBurst = 0,
// you can also set this to Enforcing, but then your
// stream will collapse if exceeding the number of elements / s
mode = ThrottleMode.Shaping
)
veryFastSource.via(throttlingFlow).runWith(Sink.foreach(println))
The second solution would be like this:
val veryFastSource =
Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000))
val tickingSource = Source.tick(1.second, 1.second, 0)
veryFastSource.zip(tickingSource).map(_._1).runWith(Sink.foreach(println))
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With