
Retrieve an Akka actor or create it if it does not exist

I am developing an application that creates some Akka actors to manage and process messages coming from a Kafka topic. Messages with the same key are processed by the same actor. I use the message key also to name the corresponding actor.

When a new message is read from the topic, I don't know whether the actor system has already created the actor whose id equals the message key. Therefore, I try to resolve the actor by name, and if it does not exist yet, I create it. I also need to handle concurrency during actor resolution, since more than one client may ask the actor system at the same time whether an actor exists.

The code I am using right now is the following:

private CompletableFuture<ActorRef> getActor(String uuid) {
    return system.actorSelection(String.format("/user/%s", uuid))
                 .resolveOne(Duration.ofMillis(1000))
                 .toCompletableFuture()
                 .exceptionally(ex -> 
                     system.actorOf(Props.create(MyActor.class, uuid), uuid))
                 .exceptionally(ex -> {
                     try {
                         return system.actorSelection(String.format("/user/%s",uuid)).resolveOne(Duration.ofMillis(1000)).toCompletableFuture().get();
                     } catch (InterruptedException | ExecutionException e) {
                         throw new RuntimeException(e);
                     }
                 });
}

The above code is not optimised, and the exception handling can be made better.

However, is there in Akka a more idiomatic way to resolve an actor, or to create it if it does not exist? Am I missing something?

riccardo.cardin asked Mar 18 '19 20:03



2 Answers

Consider creating an actor that maintains as its state a map of message IDs to ActorRefs. This "receptionist" actor would handle all requests to obtain a message processing actor. When the receptionist receives a request for an actor (the request would include the message ID), it tries to look up an associated actor in its map: if such an actor is found, it returns the ActorRef to the sender; otherwise it creates a new processing actor, adds that actor to its map, and returns that actor reference to the sender.
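The get-or-create behaviour described above can be sketched without Akka to show why a single owner of the map removes the race. This is a plain-Java model under stated assumptions, not Akka code: the hypothetical KeyedRegistry class stands in for the receptionist, its single-threaded executor stands in for the actor's mailbox, and the factory function stands in for the call that would create the processing actor.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Plain-Java stand-in for the "receptionist": one thread owns the map. */
final class KeyedRegistry<V> {
    // Only ever touched from the single mailbox thread, so a plain HashMap is safe.
    private final Map<String, V> entries = new HashMap<>();

    // One daemon thread processes requests strictly one at a time,
    // the way an actor processes the messages in its mailbox.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "registry-mailbox");
        t.setDaemon(true);
        return t;
    });

    // Hypothetical factory: in the Akka version this would create the child actor.
    private final Function<String, V> factory;

    KeyedRegistry(Function<String, V> factory) {
        this.factory = factory;
    }

    /** Returns the entry for the key, creating it on first request. */
    CompletableFuture<V> getOrCreate(String key) {
        return CompletableFuture.supplyAsync(
                () -> entries.computeIfAbsent(key, factory), mailbox);
    }

    void shutdown() {
        mailbox.shutdown();
    }
}
```

Because every request goes through the same queue, two callers asking for the same key at the same moment cannot both trigger a creation. In an Akka receptionist the map values would be ActorRefs and the lookup would live in the receptionist's message handler, with the result sent back to the sender.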

Jeffrey Chung answered Oct 25 '22 10:10


I would consider using akka-cluster and akka-cluster-sharding. This gives you throughput as well as reliability, and it also makes the system manage the creation of the 'entity' actors for you.

But you have to change the way you talk to those actors. You create a ShardRegion actor which handles all the messages:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
import akka.cluster.sharding.ShardRegion;
import akka.event.Logging;
import akka.event.LoggingAdapter;


public class MyEventReceiver extends AbstractActor {

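    private final ActorRef shardRegion;
    // logger used in createReceive()
    private final LoggingAdapter log =
        Logging.getLogger(getContext().getSystem(), this);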

    public static Props props() {
        return Props.create(MyEventReceiver.class, MyEventReceiver::new);
    }

    static ShardRegion.MessageExtractor messageExtractor 
      = new ShardRegion.HashCodeMessageExtractor(100) {
            // using the supplied hash code extractor to shard
            // the actors based on the hashcode of the entityid

        @Override
        public String entityId(Object message) {
            if (message instanceof EventInput) {
                return ((EventInput) message).uuid().toString();
            }
            return null;
        }

        @Override
        public Object entityMessage(Object message) {
            // Pass the message through unchanged. If messages were wrapped in an
            // envelope carrying the id, this is where the payload would be unwrapped.
            return message;
        }
    };


    public MyEventReceiver() {
        ActorSystem system = getContext().getSystem();
        ClusterShardingSettings settings =
           ClusterShardingSettings.create(system);
        // start the shard region for this entity type and keep its ActorRef
        shardRegion = ClusterSharding.get(system)
                .start("EventShardingSytem",
                        Props.create(EventActor.class),
                        settings,
                        messageExtractor);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(
                EventInput.class,
                e -> {
                    log.info("Got an event with UUID {} forwarding ... ",
                            e.uuid());
                    // forward to the shard region, which routes the message
                    // to the matching EventActor (creating it if needed)
                    shardRegion.tell(e, getSender());
                }
        ).build();
    }
}

So this actor, MyEventReceiver, runs on all nodes of your cluster and encapsulates the shardRegion actor. You no longer message your EventActors directly; instead you go through MyEventReceiver and the shardRegion actor, and the sharding system keeps track of which node in the cluster a particular EventActor lives on. It will create the EventActor if none has been created before, or route messages to it if it has. Every EventActor must have a unique id, which is extracted from the message (so a UUID is pretty good for that, but it could be some other id, like a customerID or an orderID, as long as it's unique for the Actor instance you want to process it with).

(I'm omitting the EventActor code, it's otherwise a pretty normal Actor, depending what you are doing with it, the 'magic' is in the code above).

The sharding system automatically knows to create the EventActor and allocate it to a shard, based on the algorithm you've chosen (in this particular case, it's based on the hashCode of the unique ID, which is all I've ever used). Furthermore, you're guaranteed only one Actor for any given unique ID. The message is transparently routed to the correct Node and Shard wherever it is; from whichever Node and Shard it's being sent.
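To make the hash-based allocation concrete, here is a minimal sketch of how an entity id can be mapped deterministically to one of a fixed number of shards. The class and method names (ShardIdSketch, shardId) are hypothetical, and the formula is an illustration of the idea rather than a copy of Akka's own implementation; the value 100 mirrors the shard count passed to HashCodeMessageExtractor in the code above.

```java
import java.util.UUID;

final class ShardIdSketch {

    /**
     * Maps an entity id to a shard in the range [0, numberOfShards).
     * floorMod keeps the result non-negative even for negative hash codes.
     */
    static String shardId(String entityId, int numberOfShards) {
        return String.valueOf(Math.floorMod(entityId.hashCode(), numberOfShards));
    }

    public static void main(String[] args) {
        String id = UUID.randomUUID().toString();
        // The same id always lands on the same shard, whichever node computes it.
        System.out.println(id + " -> shard " + shardId(id, 100));
        System.out.println(id + " -> shard " + shardId(id, 100));
    }
}
```

Since the shard is a pure function of the id, every node computes the same answer, which is what lets the cluster route a message to the one place where that entity lives.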

There's more info and sample code on the Akka site and in its documentation.

This is a pretty rad way to make sure that the same Entity/Actor always processes messages meant for it. The cluster and sharding take automatic care of distributing the Actors properly, and of failover and the like. If the Actor has a bunch of strict state associated with it that must be restored, you would have to add akka-persistence to get passivation, rehydration, and failover.

scot answered Oct 25 '22 10:10