I'm trying to implement MapReduce on top of Akka and was lucky to find the code from the book Akka Essentials. However, I have found two major issues with this example implementation. Both look like fundamental concurrency design flaws, which is quite shocking to find in a book about Akka:
Upon completion the Client side calls shutdown(), but at that point there is no guarantee that the messages went through to the WCMapReduceServer. I see that the WCMapReduceServer only ever receives part of the Client messages, and then it outputs:
[INFO] [06/25/2013 09:30:01.594] [WCMapReduceApp-5] [ActorSystem(WCMapReduceApp)] REMOTE: RemoteClientShutdown@akka://[email protected]:2552
meaning the Client shutdown() happens before the Client actually manages to flush all pending messages. In the Client code (line 41) we see that shutdown() takes place without flushing first. Is there a way in Akka to enforce flushing outbound messages before shutting down the system?
The other, actually bigger, flaw, which I already fixed, is the way the example signals EOF to the MapReduce server, i.e. that the main task (the file of words) is done because all subtasks (each line of the file) are done. The author sends a special String message DISPLAY_LIST, and this message is queued with the lowest priority (see code). The big flaw here is that even though DISPLAY_LIST has the lowest priority, if any Map (or Reduce) task takes arbitrarily long, the DISPLAY_LIST message will go through before all the MapReduce subtasks have completed. Mailbox priority only orders messages that are already queued; it says nothing about results that a slow actor has not produced yet. The outcome of this MapReduce example is therefore non-deterministic, i.e. you can get a different dictionary out of each run. The issue can be revealed by replacing the MapActor#onReceive implementation with the following, which makes one Map step arbitrarily long:
    public void onReceive(Object message) {
        System.out.println("MapActor -> onReceive(" + message + ")");
        if (message instanceof String) {
            String work = (String) message;
            // ******** BEGIN SLOW DOWN ONE MAP REQUEST
            if ("Thieves! thieves!".equals(work)) {
                try {
                    System.out.println("*** sleeping!");
                    Thread.sleep(5000);
                    System.out.println("*** back!");
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // ******** END SLOW DOWN ONE MAP REQUEST
            // perform the work
            List<Result> list = evaluateExpression(work);
            // reply with the result
            actor.tell(list);
        } else throw new IllegalArgumentException("Unknown message [" + message + "]");
    }
Reading the book a bit further one finds:
We have Thread.sleep() because there is no guarantee in which order the messages are processed. The first Thread.sleep() method ensures that all the string sentence messages are processed completely before we send the Result message.
I'm sorry, but Thread.sleep() has never been a means of ensuring anything in concurrency. No wonder books like this end up full of fundamental concurrency flaws in their examples.
I have solved both problems, and also migrated the code to the latest Akka version 2.2-M3.
The solution to the first issue is to have the remote MapReduce MasterActor send back a ShutdownInfo notification as soon as it gets the TaskInfo notification, which the Client sends once all other messages have been sent. The TaskInfo carries the number of subtasks in a MapReduce task, e.g. in this case the number of lines in the text file.
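To illustrate the handshake without pulling in Akka, here is a minimal plain-Java sketch. It is not the code from my repository: the BlockingQueue stands in for the MasterActor mailbox, the CountDownLatch stands in for the ShutdownInfo reply, and the class name ShutdownHandshakeSketch and the run method are made up for this example. It assumes messages from one sender arrive in the order they were sent, which is what Akka guarantees per sender-receiver pair.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java stand-in for the Client/MasterActor handshake (no Akka involved).
final class ShutdownHandshakeSketch {

    // Hypothetical message type: announces how many subtasks were sent.
    static final class TaskInfo {
        final int totalSubtasks;
        TaskInfo(int totalSubtasks) { this.totalSubtasks = totalSubtasks; }
    }

    // Sends all lines plus a TaskInfo, waits for the acknowledgement, and
    // returns how many lines the server had received when it acknowledged.
    static int run(String[] lines) {
        BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>(); // FIFO stand-in for the mailbox
        CountDownLatch shutdownInfo = new CountDownLatch(1);          // stand-in for the ShutdownInfo reply
        AtomicInteger received = new AtomicInteger();
        AtomicInteger receivedAtAck = new AtomicInteger(-1);

        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Object message = mailbox.take();
                    if (message instanceof String) {
                        received.incrementAndGet();   // a real server would start a Map subtask here
                    } else if (message instanceof TaskInfo) {
                        // TaskInfo was sent last, so every line is already in.
                        receivedAtAck.set(received.get());
                        shutdownInfo.countDown();     // "send" ShutdownInfo back to the client
                        return;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true); // do not keep the JVM alive if the handshake fails
        server.start();

        try {
            for (String line : lines) {
                mailbox.put(line);
            }
            mailbox.put(new TaskInfo(lines.length));
            // The client waits for the acknowledgement instead of shutting down right away.
            if (!shutdownInfo.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no ShutdownInfo received");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for ShutdownInfo", e);
        }
        return receivedAtAck.get(); // only now would the client call shutdown()
    }

    public static void main(String[] args) {
        int seen = run(new String[] {"Thieves! thieves!", "a second line", "a third line"});
        System.out.println("lines received before the acknowledgement: " + seen);
    }
}
```

The point is that the client never sleeps or guesses: it shuts down only after the server has confirmed receipt of the last message it sent.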
The solution to the second problem also relies on the TaskInfo and its total number of subtasks. The AggregatorActor counts the subtasks it has processed, compares that count to the TaskInfo total, and signals that the job is done when they match (currently it just prints a message).
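The counting idea can be sketched in plain Java as follows. Again this is an illustration under assumptions, not the AggregatorActor from my repository: the class CountingAggregatorSketch and its method names are hypothetical, and each call stands for one message handled by the actor. The sketch accepts the TaskInfo before, between, or after the results, since I would not rely on its position relative to them.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of completion-by-counting (hypothetical names, no Akka).
// Not thread-safe on purpose: an actor handles one message at a time.
final class CountingAggregatorSketch {
    private final Map<String, Integer> wordCounts = new HashMap<>();
    private int processedSubtasks = 0;
    private int expectedSubtasks = -1; // unknown until the TaskInfo arrives

    // Handles the reduced result of one subtask (one line of the file).
    // Returns true if this message completed the whole job.
    boolean onSubtaskResult(Map<String, Integer> partialCounts) {
        partialCounts.forEach((word, n) -> wordCounts.merge(word, n, Integer::sum));
        processedSubtasks++;
        return isDone();
    }

    // Handles the TaskInfo carrying the total number of subtasks.
    // Returns true if all subtasks had already been processed.
    boolean onTaskInfo(int totalSubtasks) {
        expectedSubtasks = totalSubtasks;
        return isDone();
    }

    // The job is done only when the total is known and the counts match.
    boolean isDone() {
        return expectedSubtasks >= 0 && processedSubtasks == expectedSubtasks;
    }

    Map<String, Integer> result() {
        return wordCounts;
    }

    public static void main(String[] args) {
        CountingAggregatorSketch aggregator = new CountingAggregatorSketch();
        aggregator.onSubtaskResult(Collections.singletonMap("thieves", 2));
        aggregator.onTaskInfo(2); // total announced while one subtask is still running
        boolean done = aggregator.onSubtaskResult(Collections.singletonMap("thieves", 1));
        System.out.println("done=" + done + " counts=" + aggregator.result());
    }
}
```

Unlike the low-priority DISPLAY_LIST message, completion here depends only on how many subtasks have actually finished, so a slow Map step delays the final result but cannot truncate it.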
The interesting and correct behavior shows in the output of a run: the Client calls system.shutdown() and terminates. Note that the MapReduce job is still in the middle of processing at that point, and the Client shutdown does not interfere.
The code can be fetched from my repository: https://github.com/bravegag/akka-mapreduce-example
Feedback always welcome.