Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Akka Model for computational tasks

I have the following requirement

  • Connect to a webserver with a username and password and get an authetication token
  • Read file to get different parameters
  • Use the auth token fro step 1 and parameters from step 2 to send an http request to the web server

Right now I have a single actor that carries out all of the above tasks as follows

package akka.first.java;

import akka.actor.UntypedActor;

public class MySingleActor extends UntypedActor {

    public void onReceive(Object msg) {

        if( msg instanceof sendRequest ) {

            //Connect to a webserver with a username and password and get an authetication token
            String token = getToken();
           // Read file to get different parameters
            Param param = readFile();
           // Use the auth token fro step 1 and parameters from step 2 to send an http request to the web server
            Response response = sendRequest (server, token, param);


        }

    }

    private Param readFile() {
        // reads file 
    }

    private String getToken() {
        //gets token 
    }
}

The readFile operation contains various subtasks that I think it should be a separate actor. But since the return from the readFile() operation is required for the actor to perform its main task of sending the request, this might be blocking which according to the docs is not recommended, what is the best way to go about this? Futures?

like image 688
user_mda Avatar asked Sep 13 '16 19:09

user_mda


People also ask

What is Akka good for?

Akka is a toolkit for building highly concurrent, distributed, and resilient message-driven applications for Java and Scala. Akka Insights is intelligent monitoring and observability purpose built for Akka.

What problems does Akka solve?

Akka allows you to focus on meeting business needs instead of writing low-level code to provide reliable behavior, fault tolerance, and high performance. Many common practices and accepted programming models do not address important challenges inherent in designing systems for modern computer architectures.

Is Akka widely used?

Akka framework is widely used to build applications because the applications built by Akka implement concurrency and multi-threading. Applications that are deployed in the real world should contain features like concurrency or multi-threading because they will serve multiple clients at a time.

Who uses Akka in production?

We see Akka being adopted by many large organizations in a big range of industries all from investment and merchant banking, retail and social media, simulation, gaming and betting, automobile and traffic systems, health care, data analytics and much more.


2 Answers

Official documentation offers following solutions:

  • Do the blocking call within an actor (or a set of actors managed by a router [Java, Scala]), making sure to configure a thread pool which is either dedicated for this purpose or sufficiently sized.
  • Do the blocking call within a Future, ensuring an upper bound on the number of such calls at any point in time (submitting an unbounded number of tasks of this nature will exhaust your memory or thread limits).
  • Do the blocking call within a Future, providing a thread pool with an upper limit on the number of threads which is appropriate for the hardware on which the application runs.
  • Dedicate a single thread to manage a set of blocking resources (e.g. a NIO selector driving multiple channels) and dispatch events as they occur as actor messages.

Using the futures is among the officially suggested approaches, however with extra care.

Let's consider the first approach because IMO it is more consistent.

First of all extract all the blocking IO operations into new actors that are performing only one blocking IO operation. Assume that there is only one such operation for brevity:

public class MyBlockingIOActor extends UntypedActor {
    public void onReceive(Object msg) {
        // do blocking IO call here and send the result back to sender
    }
}

Add configuration for dispatcher, that will take care of blocking actors, in actor system configuration file (usually application.conf):

#Configuring a dispatcher with fixed thread pool size, e.g. for actors that perform blocking IO
blocking-io-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 32
  }
  throughput = 1
}

Please, make sure that you use the configuration file when creating the actor system (especially if you decided to use non-standard file name for the configuration):

ActorSystem actorSystem = ActorSystem.create("my-actor-system", ConfigFactory.load("application.conf"));

After that you want to assign the actor which performs blocking IO to dedicated dispatcher. You can do it in the configuration as described here or when creating the actor:

ActorRef blockingActor = context().actorOf(Props.create(MyBlockingIOActor.class).withDispatcher("blocking-io-dispatcher"));

In order to get more throughput, consider wrapping blocking actor into pool:

SupervisorStrategy strategy = new OneForOneStrategy(
        5,
        Duration.create(1, TimeUnit.MINUTES),
        Collections.singletonList(Exception.class)
);
ActorRef blockingActor = context().actorOf(new SmallestMailboxPool(5).withSupervisorStrategy(strategy).props(Props.create(MyBlockingIOActor.class).withDispatcher("blocking-io-dispatcher")));

You can ensure that the actor uses the right dispatcher in the following way:

public class MyBlockingIOActor extends UntypedActor {
    public void preStart() {
        LOGGER.debug("using dispatcher: {}", ((Dispatcher)context().dispatcher()).id());
    }
}
like image 147
Oleg Kurbatov Avatar answered Oct 17 '22 04:10

Oleg Kurbatov


You can use Futures,or maybe RxJava with Observables and Observers. Or different actors and forward the final response to the orginial sender

  public class MySingleActor extends UntypedActor{

private ActorRef tokenActor;
private ActorRef readFileActor;

public MySingleActor(){
    tokenActor = context().actorOf(Props.create(TokenActor.class),"tokenActor");
    readFileActor = context().actorOf(Props.create(ReadFileActor.class),"readFileActor");
}
public void onReceive(Object msg) {
    if( msg instanceof sendRequest ) {
        Future<String> f= Futures.future(new Callable<String>() {
            @Override public String call() throws Exception {
                return getToken();
            }            },context().dispatcher());Patterns.pipe(f,context().dispatcher()).to(tokenActor).pipeTo(readFileActor,self());
    }       
}}

Or instead of pipe

f.onComplete(new OnComplete<String>(){ public void onComplete(Throwable t, String result){ readFileActor.tell(result,self()); } }, context().system().dispatcher());

like image 26
gaston Avatar answered Oct 17 '22 06:10

gaston