I am developing an application that creates some Akka actors to manage and process messages coming from a Kafka topic. Messages with the same key are processed by the same actor. I also use the message key to name the corresponding actor.
When a new message is read from the topic, I don't know whether the actor whose name equals the message key has already been created by the actor system. Therefore, I try to resolve the actor by its name, and if it does not exist yet, I create it. Since more than one client may ask the actor system whether an actor exists at the same time, I need to handle concurrency during actor resolution.
The code I am using right now is the following:
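```java
import akka.actor.ActorRef;
import akka.actor.Props;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// 'system' is the application's ActorSystem and MyActor the per-key actor.
private CompletableFuture<ActorRef> getActor(String uuid) {
    // Try to resolve an existing actor named after the message key.
    return system.actorSelection(String.format("/user/%s", uuid))
        .resolveOne(Duration.ofMillis(1000))
        .toCompletableFuture()
        // Not found: create it, using the key as its name.
        .exceptionally(ex ->
            system.actorOf(Props.create(MyActor.class, uuid), uuid))
        // Name already taken (another caller won the race): resolve again, blocking.
        .exceptionally(ex -> {
            try {
                return system.actorSelection(String.format("/user/%s", uuid))
                    .resolveOne(Duration.ofMillis(1000)).toCompletableFuture().get();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
}
```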
The above code is not optimized, and the exception handling could be improved.
However, is there a more idiomatic way in Akka to resolve an actor, or to create it if it does not exist? Am I missing something?
When an actor throws an unexpected exception, a failure, while processing a message or during initialization, the actor will by default be stopped.
In Akka you can't create an instance of an actor using the new keyword. Instead, you create actors using the spawn factory methods. Spawn does not return the actor instance but a reference to it, an ActorRef.
In Akka, you can stop actors by invoking the stop() method of either the ActorContext or the ActorSystem class: ActorContext is used to stop child actors, and ActorSystem is used to stop top-level actors. The actual termination of the actor is performed asynchronously.
An actor can stop itself by returning Behaviors.stopped as its next behavior. A parent can force a child actor to stop, after the child finishes processing its current message, by calling the stop method of the parent's ActorContext.
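The spawn and stop behavior described above can be sketched with the Akka Typed Java API (akka-actor-typed). This is a minimal illustration, not part of the original posts: TypedLifecycleSketch, Command, Process, Stop, worker and guardian are all hypothetical names. The guardian spawns a worker (getting back an ActorRef, not the instance), watches it, and the worker stops itself by returning Behaviors.stopped().

```java
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.Terminated;
import akka.actor.typed.javadsl.Behaviors;

public class TypedLifecycleSketch {

    // Hypothetical protocol for the worker actor.
    public interface Command {}

    public static final class Process implements Command {
        public final String payload;

        public Process(String payload) {
            this.payload = payload;
        }
    }

    public enum Stop implements Command { INSTANCE }

    // Worker: handles Process messages and stops itself on Stop by
    // returning Behaviors.stopped() as its next behavior.
    public static Behavior<Command> worker() {
        return Behaviors.receive(Command.class)
            .onMessage(Process.class, msg -> {
                System.out.println("processing " + msg.payload);
                return Behaviors.same();
            })
            .onMessage(Stop.class, msg -> Behaviors.stopped())
            .build();
    }

    // Guardian: spawn returns an ActorRef to the child, never the actor instance.
    // It watches the worker and stops itself (ending the system) once the worker stops.
    public static Behavior<Void> guardian() {
        return Behaviors.setup(context -> {
            ActorRef<Command> workerRef = context.spawn(worker(), "worker-1");
            context.watch(workerRef);
            workerRef.tell(new Process("hello"));
            workerRef.tell(Stop.INSTANCE);
            return Behaviors.receive(Void.class)
                .onSignal(Terminated.class, signal -> Behaviors.stopped())
                .build();
        });
    }

    public static void main(String[] args) {
        ActorSystem<Void> system = ActorSystem.create(guardian(), "lifecycle-sketch");
        // Block until the guardian has stopped and the system has shut down.
        system.getWhenTerminated().toCompletableFuture().join();
    }
}
```

Because the user guardian stops when its watched child terminates, the whole ActorSystem shuts down on its own here; in a long-running application the guardian would normally keep running.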
Consider creating an actor that maintains as its state a map of message IDs to ActorRefs. This "receptionist" actor would handle all requests to obtain a message processing actor. When the receptionist receives a request for an actor (the request would include the message ID), it tries to look up an associated actor in its map: if such an actor is found, it returns the ActorRef to the sender; otherwise it creates a new processing actor, adds that actor to its map, and returns that actor reference to the sender.
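A minimal sketch of that receptionist using the Akka Classic Java API, under a few assumptions: the names Receptionist, GetProcessor and Processor are hypothetical (Processor stands in for the question's MyActor), and the message ID is assumed to be a valid actor name. Because only this actor touches the map and it processes one message at a time, two concurrent requests for the same ID cannot create two processors.

```java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import java.util.HashMap;
import java.util.Map;

public class Receptionist extends AbstractActor {

    // Request for the processing actor that owns a given message ID (key).
    public static final class GetProcessor {
        public final String messageId;

        public GetProcessor(String messageId) {
            this.messageId = messageId;
        }
    }

    // Hypothetical processing actor; stands in for MyActor from the question.
    public static class Processor extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                .matchAny(msg -> { /* process the message here */ })
                .build();
        }
    }

    // Message ID -> processing actor. Only this actor reads or writes the map and it
    // handles one message at a time, so concurrent requests cannot create duplicates.
    private final Map<String, ActorRef> processors = new HashMap<>();

    public static Props props() {
        return Props.create(Receptionist.class);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(GetProcessor.class, req -> {
                ActorRef ref = processors.computeIfAbsent(
                    req.messageId,
                    // Create the child on first request; the ID doubles as its name,
                    // so it must be a valid actor name (a UUID string is).
                    id -> getContext().actorOf(Props.create(Processor.class), id));
                getSender().tell(ref, getSelf());
            })
            .build();
    }
}
```

Callers would typically use Patterns.ask(receptionist, new Receptionist.GetProcessor(key), timeout) to get the ActorRef back, or simply have the receptionist forward the message itself. A fuller version would also call getContext().watch(...) on each child and remove it from the map when a Terminated message arrives, so a stopped processor does not leave a dead reference behind.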
I would consider using akka-cluster and akka-cluster-sharding. First, this gives you throughput as well as reliability. It also makes the system manage the creation of the 'entity' actors.
But you have to change the way you talk to those actors: you create a ShardRegion actor, which handles all the messages:
```java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
import akka.cluster.sharding.ShardRegion;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class MyEventReceiver extends AbstractActor {

    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
    private final ActorRef shardRegion;

    public static Props props() {
        return Props.create(MyEventReceiver.class, MyEventReceiver::new);
    }

    // Uses the supplied hash-code extractor: the shard for an entity is derived
    // from the hashCode of its entity ID (spread over 100 shards here).
    static ShardRegion.MessageExtractor messageExtractor =
        new ShardRegion.HashCodeMessageExtractor(100) {
            @Override
            public String entityId(Object message) {
                if (message instanceof EventInput) {
                    return ((EventInput) message).uuid().toString();
                }
                return null; // unknown messages are not routed to any entity
            }

            @Override
            public Object entityMessage(Object message) {
                // The message the entity actually receives. There is no envelope
                // to unwrap here, so it is passed through unchanged.
                return message;
            }
        };

    public MyEventReceiver() {
        ActorSystem system = getContext().getSystem();
        ClusterShardingSettings settings = ClusterShardingSettings.create(system);
        // Start the shard region that creates and hosts the EventActor entities.
        shardRegion = ClusterSharding.get(system)
            .start("EventShardingSystem",
                   Props.create(EventActor.class),
                   settings,
                   messageExtractor);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(EventInput.class, e -> {
                log.info("Got an event with UUID {} forwarding ... ", e.uuid());
                // The shard region routes the event to the EventActor for this UUID,
                // creating it first if needed; the original sender is preserved.
                shardRegion.tell(e, getSender());
            })
            .build();
    }
}
```
So this MyEventReceiver actor runs on all nodes of your cluster and encapsulates the shardRegion actor. You no longer message your EventActors directly; instead, through the MyEventReceiver and shardRegion actors, you let the sharding system keep track of which node in the cluster a particular EventActor lives on. It creates the EventActor if none has been created before, or routes the message to it if it has. Every EventActor must have a unique ID, which is extracted from the message (a UUID is pretty good for that, but it could be some other ID, like a customerID or an orderID, as long as it's unique for the actor instance you want to process it with).
(I'm omitting the EventActor code; it's otherwise a pretty normal actor, depending on what you are doing with it. The 'magic' is in the code above.)
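For readers who want something to compile the sharding code against, a hypothetical EventInput/EventActor pair might look like the sketch below. Only the class names and the uuid() accessor come from the code above; the payload field, the event counter and the reply are assumptions. Both classes are assumed to live in the same package as MyEventReceiver. In a real multi-node cluster, EventInput would also need a serializer binding (Akka 2.6 disables Java serialization by default), which is left out here.

```java
import akka.actor.AbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import java.util.UUID;

// Hypothetical message type: the sharding code only requires a uuid() accessor.
final class EventInput {
    private final UUID uuid;
    private final String payload;

    EventInput(UUID uuid, String payload) {
        this.uuid = uuid;
        this.payload = payload;
    }

    UUID uuid() { return uuid; }

    String payload() { return payload; }
}

// Hypothetical entity actor: one instance per UUID, created by the shard region.
public class EventActor extends AbstractActor {
    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

    // In-memory per-entity state; it is lost when the entity is stopped or moved
    // to another node unless it is persisted (for example with akka-persistence).
    private int eventsSeen = 0;

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(EventInput.class, e -> {
                eventsSeen++;
                log.info("Entity {} processed event #{}", e.uuid(), eventsSeen);
                // Reply with the running count; the shard region preserved the sender.
                getSender().tell(eventsSeen, getSelf());
            })
            .build();
    }
}
```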
The sharding system automatically knows to create the EventActor and allocate it to a shard based on the algorithm you've chosen (in this particular case, the hashCode of the unique ID, which is all I've ever used). Furthermore, you're guaranteed only one actor for any given unique ID (assuming the cluster does not suffer a split brain). The message is transparently routed to the correct node and shard wherever it is, from whichever node and shard it's being sent.
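To make the hashCode-based allocation concrete, here is a plain-Java sketch of the mapping. As I understand it, Akka's HashCodeMessageExtractor derives the shard ID as the absolute value of the entity ID's hashCode modulo the number of shards; treat this as an approximation and check the Akka source for your version. ShardIdSketch is a hypothetical name, and nothing here calls Akka.

```java
import java.util.UUID;

public class ShardIdSketch {

    // Assumed to mirror the hash-based allocation: the same entity ID always
    // yields the same shard ID, in the range [0, maxNumberOfShards).
    public static String shardId(String entityId, int maxNumberOfShards) {
        // Take the remainder first, then the absolute value, so a negative
        // hashCode still maps into range (and Integer.MIN_VALUE is safe).
        return String.valueOf(Math.abs(entityId.hashCode() % maxNumberOfShards));
    }

    public static void main(String[] args) {
        String key = UUID.randomUUID().toString();
        // Repeated calls agree, which is what pins a key to one shard.
        System.out.println(key + " -> shard " + shardId(key, 100));
        System.out.println("abc -> shard " + shardId("abc", 100)); // prints "abc -> shard 54"
    }
}
```

The shard count (100 in the answer's extractor) should stay fixed for the lifetime of the cluster, since changing it would move existing IDs to different shards.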
There's more info and sample code on the Akka site and in the documentation.
This is a pretty rad way to make sure that the same entity/actor always processes the messages meant for it. The cluster and sharding automatically take care of distributing the actors properly, of failover, and the like. If the actor has a bunch of strict state associated with it that must be restored after passivation, rehydration, or failover, you would also have to add akka-persistence.
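Passivation itself is provided by cluster sharding; persistence is only what brings the state back. A sketch of idle passivation, based on the pattern shown in the Akka Classic sharding documentation as I recall it: the entity sets a receive timeout and, when idle, asks its parent shard to passivate it with ShardRegion.Passivate. PassivatingEventActor, the two-minute timeout and the "processed" reply are hypothetical.

```java
import akka.actor.AbstractActor;
import akka.actor.PoisonPill;
import akka.actor.ReceiveTimeout;
import akka.cluster.sharding.ShardRegion;
import java.time.Duration;

public class PassivatingEventActor extends AbstractActor {

    @Override
    public void preStart() {
        // Deliver a ReceiveTimeout message after 2 minutes without any traffic.
        getContext().setReceiveTimeout(Duration.ofMinutes(2));
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(ReceiveTimeout.class, timeout ->
                // Ask the parent shard to passivate this entity: the shard buffers new
                // messages for it, sends it the PoisonPill, and starts a fresh entity
                // when the next message for this ID arrives.
                getContext().getParent().tell(
                    new ShardRegion.Passivate(PoisonPill.getInstance()), getSelf()))
            // Placeholder for normal event processing; replies so it can be exercised.
            .matchAny(msg -> getSender().tell("processed", getSelf()))
            .build();
    }
}
```

Any in-memory state is gone once the entity is passivated, which is why the recreated entity needs akka-persistence (or another store) to recover it.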