I have the following task, for which I have a Java/Executors solution working well, but I'd like to implement the same functionality in Akka and am looking for best-practice suggestions.
Problem:
Fetch/parse data from multiple URLs in parallel, block until all data has been fetched, and return the aggregated result. It should retry on errors (IOException etc.) up to a certain number of times.
My implementation so far is pretty straightforward: create a Fetcher actor which knows which URLs should be fetched; it creates a bunch of Worker actors and sends them URLs, one per message. Once done with a particular URL, a Worker sends a message back to the Fetcher with the result. The Fetcher keeps the state of the results; Workers are stateless. Simplified code below.
Fetcher:
class Fetcher extends UntypedActor {
    private ActorRef worker;
    private List<URL> urls; // URLs to fetch, initialized elsewhere

    public void onReceive(Object message) throws Exception {
        if (message instanceof FetchMessage) {
            this.worker = context().actorOf(
                    SpringExtension.SpringExtProvider.get(context().system())
                            .props("Worker").withRouter(new RoundRobinPool(4)),
                    "worker");
            for (URL u : urls) {
                this.worker.tell(new WorkUnit(u), getSelf());
            }
        } else if (message instanceof Result) {
            // accumulate results
        }
    }
}
Worker:
class Worker extends UntypedActor {
    public void onReceive(Object message) throws Exception {
        if (message instanceof WorkUnit) {
            // fetch URL, parse etc.
            // send result back to sender
            getSender().tell(new Result(...), getSelf());
        }
    }
}
So far so good, and in the absence of exceptions everything works as expected.
But if there is, say, an IOException while fetching a URL in a Worker, then Akka restarts the Worker actor, but the message the Worker was processing at the time is lost. Even if I use a different SupervisorStrategy, the result is the same: some of the messages are effectively 'lost'. Of course I could wrap the code inside Worker.onReceive() with try/catch, but I feel that goes against the Akka philosophy. I guess I could use persistent messaging, but I don't think the added complexity of message persistence is justified in this case.
I need perhaps some way for the Fetcher to figure out that a Worker failed on fetching some of the URLs and resend the WorkUnit, or to detect that some Results have not come back for too long. What would be the best approach to handle this case?
Thanks,
We had a similar problem in our project, and we found a solution that works for us: the tasks are executed regardless of exceptions, worker failures, network failures, etc. Although I must admit that the code eventually became a bit complicated.
So our setup is the following:
WorkerControl: an actor that handles the task management and communication with the workers. WorkerControl receives some data to be processed and dispatches the tasks between the workers.
More or less we tried to follow the guidelines described here, but we also improved the failure tolerance of the design.
In the WorkerControl we keep the following data structures:
Map<ActorPath, ActorRef> registeredWorkers // registry of workers
Deque<TaskInfo> todoList // tasks that have not been yet processed
Map<ActorRef, TaskInfo> assignedTasks // tasks assigned to the workers
Map<ActorPath, ActorRef> deadWorkers // registry of dead workers
For each task to be executed we keep a data structure:
class TaskInfo {
    private final WorkerTask task;
    private int failureCount = 0;
    private int restartCount = 1;
    private Date latestResultDelivery;
}
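To make the permanent-failure rule concrete, here is a runnable sketch of the TaskInfo bookkeeping in plain Java. The helper methods and the MAX_FAILURES/MAX_RESTARTS limits are my assumptions for illustration, not the answer's original code, and a plain String stands in for WorkerTask:

```java
import java.util.Date;

// Hypothetical sketch of TaskInfo with retry bookkeeping; limits are assumed values.
class TaskInfo {
    static final int MAX_FAILURES = 3;   // assumed limit on task failures
    static final int MAX_RESTARTS = 2;   // assumed limit on worker restarts caused by the task

    private final String task;           // stands in for WorkerTask
    private int failureCount = 0;
    private int restartCount = 1;
    private Date latestResultDelivery = new Date();

    TaskInfo(String task) { this.task = task; }

    void recordFailure() { failureCount++; }
    void recordRestart() { restartCount++; }

    // Called whenever the worker delivers a (partial) result.
    void touch() { latestResultDelivery = new Date(); }

    // Permanently failed tasks are logged/disposed of instead of being retried.
    boolean isPermanentlyFailed() {
        return failureCount >= MAX_FAILURES || restartCount > MAX_RESTARTS;
    }

    String getTask() { return task; }
    Date getLatestResultDelivery() { return latestResultDelivery; }
}
```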
We handle the following list of possible failures:
Worker fails the task by throwing an exception (e.g. an IOException in your case)
We deliver a new Failure(caughtException) message to the worker control. Upon seeing it, the worker control increments the failureCount and puts the task at the head of the todoList queue. When a given number of failures is reached, the task is considered permanently failed and is never retried. (After that the permanently failed tasks can be logged, disposed of, or handled in a custom way.)
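The requeue-or-drop decision on a Failure message can be sketched without Akka. This is a minimal illustration, not the project's code: handleFailure and MAX_FAILURES are assumed names, and plain strings stand in for tasks:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: on a Failure message, put the task back at the head of
// todoList unless it has already failed too many times.
class FailureBookkeeping {
    static final int MAX_FAILURES = 3;                  // assumed limit

    final Deque<String> todoList = new ArrayDeque<>();  // tasks not yet processed
    final Map<String, Integer> failureCounts = new HashMap<>();

    // Returns true if the task was requeued, false if it is now permanently failed.
    boolean handleFailure(String task) {
        int failures = failureCounts.merge(task, 1, Integer::sum);
        if (failures >= MAX_FAILURES) {
            return false; // permanently failed: log/dispose of instead of retrying
        }
        todoList.addFirst(task); // head of the queue, so it is retried before fresh tasks
        return true;
    }
}
```

Using addFirst rather than addLast means a transient failure is retried promptly instead of waiting behind the whole backlog.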
Worker does not deliver any result in a given period of time (e.g. it fell into an infinite loop, resource contention on the worker machine, the worker mysteriously disappeared, task processing taking too long)
We do two things for this: on every task assignment we update the latestResultDelivery field of the taskInfo and store the assignment in the assignedTasks map, and we periodically check whether any worker has been silent for too long:
for (ActorRef busyWorker : assignedTasks.keySet()) {
    Date now = new Date();
    // MAX_SILENCE_MILLIS is an assumed threshold name: the allowed silence window
    // between result deliveries before a worker is declared dead.
    if (now.getTime() - assignedTasks.get(busyWorker).getLatestResultDeliveryTime() >= MAX_SILENCE_MILLIS) {
        logger.warn("{} has failed to deliver the data processing result in time", nameOf(busyWorker));
        logger.warn("{} will be marked as dead", nameOf(busyWorker));
        getSelf().tell(new Failure(new IllegalStateException("Worker did not deliver any result in time")), busyWorker);
        registeredWorkers.remove(busyWorker.path());
        deadWorkers.put(busyWorker.path(), busyWorker);
    }
}
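The staleness test itself can be exercised without Akka. A plain-Java sketch (class and method names are assumed, worker ids stand in for ActorRefs):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the staleness check: find workers whose last result
// delivery is older than the allowed silence window.
class StalenessCheck {
    static final long MAX_SILENCE_MILLIS = 30_000;  // assumed threshold

    // Maps worker id -> last result delivery time (epoch millis).
    static List<String> findDeadWorkers(Map<String, Long> lastDelivery, long now) {
        List<String> dead = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastDelivery.entrySet()) {
            if (now - e.getValue() >= MAX_SILENCE_MILLIS) {
                dead.add(e.getKey()); // would be moved to deadWorkers and its task requeued
            }
        }
        return dead;
    }
}
```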
Network disconnects, worker process dying
Again we do two things:
Upon worker registration with the worker control we start watching the worker actor:
registeredWorkers.put(worker.path(), worker);
context().watch(worker);
If we receive a Terminated message in the worker control, we increment the restartCount and return the task to the todoList. Again, a task that has been restarted too many times eventually becomes permanently failed and is never retried. This covers the situation where the task itself is the cause of the remote worker's death (e.g. a remote system shutdown due to an OutOfMemoryError). We keep separate counters for failures and restarts to allow more precise retry strategies.
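The restart accounting on Terminated can be sketched the same way. Again this is an illustration with assumed names (handleTerminated, MAX_RESTARTS) and plain strings standing in for ActorRefs and tasks:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: on a Terminated message, requeue the dead worker's task
// unless it has already caused too many worker restarts.
class RestartBookkeeping {
    static final int MAX_RESTARTS = 2;                  // assumed limit

    final Deque<String> todoList = new ArrayDeque<>();
    final Map<String, String> assignedTasks = new HashMap<>(); // worker -> task
    final Map<String, Integer> restartCounts = new HashMap<>();

    // Returns the requeued task, or null if the worker had no assigned task
    // or the task is now considered permanently failed.
    String handleTerminated(String worker) {
        String task = assignedTasks.remove(worker);
        if (task == null) {
            return null;
        }
        int restarts = restartCounts.merge(task, 1, Integer::sum);
        if (restarts > MAX_RESTARTS) {
            return null; // likely the task itself kills workers (e.g. OutOfMemoryError)
        }
        todoList.addLast(task);
        return task;
    }
}
```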
We also make some attempts to be failure tolerant in the worker itself. E.g. the worker controls the execution time of its tasks and also monitors whether it has been doing anything at all recently.
Depending on the types of failures you need to handle you can implement a subset of the listed strategies.
Bottom line: as mentioned in one of the comments, in order to get task rescheduling you will need to keep a data structure in your Fetcher that maps the workers to their assigned tasks.