I am using Akka Distributed Pub/Sub with a single publisher and a single subscriber. The publisher is much faster than the subscriber. Is there a way to slow down the publisher after a certain point?
Publisher code:
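import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;

public class Publisher extends AbstractActor {
    private final ActorRef mediator;

    public static Props props() {
        return Props.create(Publisher.class, () -> new Publisher());
    }

    public Publisher() {
        this.mediator = DistributedPubSub.get(getContext().system()).mediator();
        // Kick off the send loop with the first counter value.
        getSelf().tell(0, ActorRef.noSender());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(Integer.class, msg -> {
                // Sending message to Subscriber
                mediator.tell(
                    new DistributedPubSubMediator.Send(
                        "/user/" + Subscriber.class.getName(),
                        msg.toString(),
                        false),
                    getSelf());
                // Queue the next value right away; nothing here waits for the subscriber.
                getSelf().tell(msg + 1, ActorRef.noSender());
            })
            .build();
    }
}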
Subscriber code:
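import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;

public class Subscriber extends AbstractActor {
    public static Props props() {
        return Props.create(Subscriber.class, () -> new Subscriber());
    }

    public Subscriber() {
        ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
        // Register this actor's path with the mediator so that Send can reach it.
        mediator.tell(new DistributedPubSubMediator.Put(getSelf()), getSelf());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(String.class, msg -> {
                System.out.println("Subscriber message received: " + msg);
                // Simulates slow processing: 10 seconds per message.
                Thread.sleep(10000);
            })
            .build();
    }
}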
Unfortunately, as currently designed, I don't think there is a way to provide "back-pressure" to the original sender. Since you are using ActorRef.tell to send the message to the mediator, there is no way to get a signal that the downstream receiver is backing up. This is because tell is fire-and-forget: it returns void, so the publisher never learns whether or when the message was processed.
Switch To Ask
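If you switch your tell to an ask, you can set an appropriate Timeout value that will at least let you know when you don't receive a response within a particular duration. For this to work, the subscriber has to reply to the sender after handling each message; the publisher can then hold back the next message until the reply (or the timeout) arrives.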
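To illustrate the idea without depending on Akka, here is a minimal plain-JDK sketch of "send, then wait for an acknowledgement or a timeout". It is not Akka code: AskSketch, ask and publish are hypothetical names, the single-thread executor stands in for the subscriber, and join() blocks for simplicity, whereas an actor would normally react to the reply asynchronously instead of blocking. It assumes Java 9 or later (for orTimeout).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AskSketch {

    // Hypothetical stand-in for an ask: the "subscriber" replies with an ack
    // after delayMillis, and the returned future fails if no reply arrives
    // within timeoutMillis.
    static CompletableFuture<String> ask(ExecutorService subscriber, String msg,
                                         long delayMillis, long timeoutMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis); // simulated processing time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "ack:" + msg;
        }, subscriber).orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Sends up to count messages, one at a time, and only moves on once the
    // previous message was acknowledged. Returns how many were acknowledged
    // before the first timeout.
    static int publish(int count, long delayMillis, long timeoutMillis) {
        ExecutorService subscriber = Executors.newSingleThreadExecutor();
        try {
            int acked = 0;
            for (int i = 0; i < count; i++) {
                try {
                    ask(subscriber, Integer.toString(i), delayMillis, timeoutMillis).join();
                    acked++;
                } catch (CompletionException e) {
                    if (e.getCause() instanceof TimeoutException) {
                        // The subscriber is falling behind: stop (or slow down) here.
                        break;
                    }
                    throw e;
                }
            }
            return acked;
        } finally {
            subscriber.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("fast subscriber acked: " + publish(3, 5, 1000));
        System.out.println("slow subscriber acked: " + publish(3, 500, 50));
    }
}
```

The point of the sketch is only that the sender's pace becomes tied to the replies: a timeout is the signal that the receiver cannot keep up.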
Switch To Streams
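"Back-pressure" is a primary feature of Akka Streams. Therefore, by switching to a stream implementation you will be able to achieve your desired goal.
If it is possible to create a stream Source from your original data, then you could use Sink.actorRef to create a Sink from the mediator and use Flow.throttle to control the rate of flow to the mediator. Note that Sink.actorRef on its own carries no back-pressure signal from the receiving actor, so in this setup the throttle is what keeps the publisher from outrunning the subscriber.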
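Akka Streams implements the Reactive Streams protocol, and the JDK ships the same interfaces in java.util.concurrent.Flow. As a rough, Akka-free sketch of what demand-driven back-pressure looks like, the following uses the JDK's SubmissionPublisher with a deliberately small buffer and a slow subscriber that requests one element at a time. BackPressureSketch, SlowSubscriber and run are hypothetical names, the buffer size of 2 and the 20 ms delay are arbitrary, and this is not a substitute for the Akka Streams API itself. It assumes Java 9 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class BackPressureSketch {

    // A slow subscriber that signals demand for exactly one element at a time.
    static final class SlowSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // initial demand
        }

        @Override
        public void onNext(Integer item) {
            try {
                Thread.sleep(20); // simulated slow processing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            received.add(item);
            subscription.request(1); // ask for the next element only when ready
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes 0..count-1 and returns what the subscriber received, in order.
    static List<Integer> run(int count) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        SlowSubscriber subscriber = new SlowSubscriber();
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(executor, 2)) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < count; i++) {
                // submit blocks while the subscriber's buffer is full,
                // so the loop cannot run arbitrarily far ahead.
                publisher.submit(i);
            }
        } // close() completes the subscriber once buffered items are delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

Because submit blocks once the small buffer is full, the publishing loop ends up paced by the subscriber rather than racing ahead, which is the behaviour the original tell-based loop lacks.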