
Best practices for dealing with exceptions in Akka actors

Tags: java, akka

I have the following task, for which I have a Java/Executors solution that works well, but I'd like to implement the same functionality in Akka and am looking for best-practice suggestions.

Problem:

Fetch/parse data from multiple URLs in parallel, block until all data has been fetched, and return the aggregated result. It should retry on errors (IOException etc.) up to a certain number of times.

My implementation so far is pretty straightforward: create a Fetcher actor which knows which URLs should be fetched; it creates a bunch of Worker actors and sends them URLs, one per message. Once done with a particular URL, a Worker sends a message back to the Fetcher with the result. The Fetcher keeps the state of the results; the Workers are stateless. Simplified code below.

Fetcher:

class Fetcher extends UntypedActor {
  private ActorRef worker;

  @Override
  public void onReceive(Object message) throws Exception {
    if (message instanceof FetchMessage) {
      // Pool of 4 Worker routees; work units are distributed round-robin
      this.worker = context().actorOf(SpringExtension.SpringExtProvider.get(actorSystem).props("Worker")
              .withRouter(new RoundRobinPool(4)), "worker");
      for (URL u : urls) {
        this.worker.tell(new WorkUnit(u), getSelf());
      }
    } else if (message instanceof Result) {
      // accumulate results
    } else {
      unhandled(message);
    }
  }
}

Worker:

class Worker extends UntypedActor {

  @Override
  public void onReceive(Object message) throws Exception {
    if (message instanceof WorkUnit) {
      // fetch URL, parse etc
      // send result back to sender; passing getSelf() lets the Fetcher see which worker replied
      getSender().tell(new Result(...), getSelf());
    } else {
      unhandled(message);
    }
  }
}

So far so good, and in the absence of exceptions everything works as expected.

But if, say, an IOException occurs while fetching a URL in a Worker, Akka restarts the Worker actor, and the message the Worker was processing at the time is lost. Even if I use a different SupervisorStrategy the result is the same: some of the messages are effectively 'lost'. Of course I could wrap the code inside Worker.onReceive() in try/catch, but I feel that this goes against the Akka philosophy. I guess I could use persistent messaging, but I don't think the added complexity of message persistence is justified in this case.

Perhaps I need some way for the Fetcher to figure out that a Worker failed to fetch some of the URLs and resend the WorkUnit, or to detect that some Results have not come back for too long. What would be the best approach to handle this case?

Thanks,

maximdim asked May 30 '14 01:05





1 Answer

We had a similar problem in our project and found a solution that works for us: the tasks are executed regardless of exceptions, worker failures, network failures etc. I must admit, though, that the code eventually became a bit complicated.

So our setup is the following:

  1. There is a WorkerControl actor that handles the task management and communication with the workers
  2. There are a number of Worker actors that live in a different VM (potentially on different physical machines)
  3. WorkerControl receives some data to be processed and distributes the tasks among the workers

We more or less followed the guidelines described here, but we also improved the failure tolerance of the design.

In the WorkerControl we keep the following data structures:

Map<ActorPath, ActorRef> registeredWorkers // registry of workers
Deque<TaskInfo> todoList                   // tasks that have not yet been processed
Map<ActorRef, TaskInfo> assignedTasks      // tasks assigned to the workers
Map<ActorPath, ActorRef> deadWorkers       // registry of dead workers

For each task to be executed we keep a data structure

class TaskInfo {
    private final WorkerTask task;
    private int failureCount = 0;
    private int restartCount = 1;
    private Date latestResultDelivery;
}

We handle the following list of possible failures

Worker fails the task by throwing an exception (e.g. IOException in your case)

We deliver a new Failure(caughtException) message to the worker control. Upon seeing it, the worker control increments the failureCount and puts the task at the head of the todoList queue. When a given number of failures is reached, the task is considered permanently failed and is never retried. (Permanently failed tasks can then be logged, disposed of, or handled in a custom way.)
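To make the bookkeeping concrete, here is a minimal plain-Java sketch of what the worker control might do when a Failure arrives. It is not our actual code: the class name RetryBookkeeping, the MAX_FAILURES value and the permanentlyFailed list are assumptions made for illustration, and a String stands in for WorkerTask.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of failure counting; names and limits are assumptions. */
final class RetryBookkeeping {
    /** Assumed retry limit; pick whatever suits your workload. */
    static final int MAX_FAILURES = 3;

    static final class TaskInfo {
        final String task;        // stands in for WorkerTask
        int failureCount = 0;
        TaskInfo(String task) { this.task = task; }
    }

    final Deque<TaskInfo> todoList = new ArrayDeque<>();
    final List<TaskInfo> permanentlyFailed = new ArrayList<>();

    /** What the worker control might do on receiving Failure for this task. */
    void onFailure(TaskInfo info) {
        info.failureCount++;
        if (info.failureCount >= MAX_FAILURES) {
            // Give up: keep the task around for logging or custom handling
            permanentlyFailed.add(info);
        } else {
            // Retry soon: the failed task goes to the head of the queue
            todoList.addFirst(info);
        }
    }

    public static void main(String[] args) {
        RetryBookkeeping control = new RetryBookkeeping();
        control.todoList.add(new TaskInfo("http://example.com/b"));
        TaskInfo failing = new TaskInfo("http://example.com/a");
        for (int attempt = 1; attempt <= MAX_FAILURES; attempt++) {
            control.onFailure(failing);
            System.out.println("attempt " + attempt
                    + ": queued=" + control.todoList.size()
                    + ", given up=" + control.permanentlyFailed.size());
            control.todoList.remove(failing); // simulate handing it to a worker again
        }
    }
}
```

Putting the failed task at the head means it is retried before untouched work; if you would rather let other tasks go first, addLast is the obvious alternative.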

Worker does not deliver any result in a given period of time (e.g. it fell into an infinite loop, resource contention on the worker machine, the worker mysteriously disappeared, task processing is taking too long)

We do two things for this

  1. We initialize the latestResultDelivery field of the taskInfo (the latest time by which a result is expected) and store the task assignment in the assignedTasks map.
  2. We periodically run a "health check" on the worker control that determines whether a worker has been working on a certain task for too long.

    Date now = new Date();
    for (ActorRef busyWorker : assignedTasks.keySet()) {
        // getLatestResultDeliveryTime() is read as a deadline in epoch millis here,
        // so a non-negative difference means the deadline has passed
        if (now.getTime()
                - assignedTasks.get(busyWorker).getLatestResultDeliveryTime() >= 0) {
            logger.warn("{} has failed to deliver the data processing result in time", nameOf(busyWorker));
            logger.warn("{} will be marked as dead", nameOf(busyWorker));
            // Report the timeout to ourselves as a Failure, with the worker as the sender
            getSelf().tell(new Failure(new IllegalStateException("Worker did not deliver any result in time")),
                    busyWorker);
            registeredWorkers.remove(busyWorker.path());
            deadWorkers.put(busyWorker.path(), busyWorker);
        }
    }
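The deadline idea behind this check can be sketched without any Akka types. The sketch below is an illustration only: DeadlineCheck, TASK_TIMEOUT and the use of plain worker names as map keys are assumptions, not part of our code.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a deadline-based health check; names are assumptions. */
final class DeadlineCheck {
    /** Assumed per-task time budget. */
    static final Duration TASK_TIMEOUT = Duration.ofSeconds(30);

    /** Worker name -> latest time by which its result is expected. */
    final Map<String, Instant> deadlines = new HashMap<>();

    /** Record the deadline when a task is handed to a worker. */
    void assign(String worker, Instant now) {
        deadlines.put(worker, now.plus(TASK_TIMEOUT));
    }

    /** Workers whose deadline has been reached or passed at the given time. */
    List<String> overdueWorkers(Instant now) {
        List<String> overdue = new ArrayList<>();
        for (Map.Entry<String, Instant> entry : deadlines.entrySet()) {
            if (!now.isBefore(entry.getValue())) {
                overdue.add(entry.getKey());
            }
        }
        return overdue;
    }

    public static void main(String[] args) {
        DeadlineCheck check = new DeadlineCheck();
        Instant start = Instant.parse("2014-05-30T01:05:00Z");
        check.assign("worker-1", start);
        System.out.println(check.overdueWorkers(start.plusSeconds(10))); // prints []
        System.out.println(check.overdueWorkers(start.plusSeconds(30))); // prints [worker-1]
    }
}
```

Passing the current time in as a parameter, rather than reading the clock inside the check, keeps the logic easy to test. In an actor you would typically trigger such a check from a periodic message sent to yourself.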

Network disconnects, worker process dying

Again we do two things:

  1. Upon worker registration with the worker control we start watching the worker actor

    registeredWorkers.put(worker.path(), worker);
    context().watch(worker);
  2. If we receive a Terminated message in the worker control, we increment the restartCount and return the task to the todoList. Again, a task that has been restarted too many times eventually becomes permanently failed and is never retried. This covers the situation where the task itself causes the remote worker's death (e.g. remote system shutdown due to OutOfMemoryError). We keep separate counters for failures and restarts so that the retry strategies can be tuned more precisely.
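The handling of a dead worker boils down to looking up what it was working on and deciding whether to requeue it. Here is a plain-Java sketch of that step. WorkerDeathHandling, MAX_RESTARTS, the String keys in place of ActorRef and the counter starting at 0 are all assumptions for illustration; in the real worker control this logic would sit in the branch that handles Terminated.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of requeueing the task of a dead worker; names are assumptions. */
final class WorkerDeathHandling {
    /** Assumed limit on how often a task may take a worker down with it. */
    static final int MAX_RESTARTS = 2;

    static final class TaskInfo {
        final String task;        // stands in for WorkerTask
        int restartCount = 0;     // starts at 0 in this sketch
        TaskInfo(String task) { this.task = task; }
    }

    /** Worker path (as a string here) -> task currently assigned to it. */
    final Map<String, TaskInfo> assignedTasks = new HashMap<>();
    final Deque<TaskInfo> todoList = new ArrayDeque<>();

    /** Returns true if the dead worker's task was put back on the todo list. */
    boolean onWorkerTerminated(String workerPath) {
        TaskInfo info = assignedTasks.remove(workerPath);
        if (info == null) {
            return false; // the worker was idle, nothing to reschedule
        }
        info.restartCount++;
        if (info.restartCount > MAX_RESTARTS) {
            return false; // the task is suspected of killing workers: give up
        }
        todoList.addLast(info); // queue position is an assumption of this sketch
        return true;
    }

    public static void main(String[] args) {
        WorkerDeathHandling control = new WorkerDeathHandling();
        TaskInfo info = new TaskInfo("http://example.com/a");
        control.assignedTasks.put("worker-1", info);
        System.out.println(control.onWorkerTerminated("worker-1")); // prints true
        System.out.println(control.onWorkerTerminated("worker-1")); // prints false
    }
}
```

The second call returns false because the assignment was already removed by the first one, so a duplicate notification cannot requeue the task twice.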

We also make some attempts at failure tolerance in the worker itself. For example, the worker limits the execution time of its tasks and also monitors whether it has been doing anything at all recently.

Depending on the types of failures you need to handle you can implement a subset of the listed strategies.

Bottom line, as mentioned in one of the comments: to get task rescheduling you will need to keep some data structure in your Fetcher that maps workers to their assigned tasks.

Sergii Vozniuk answered Oct 31 '22 10:10
