MapReduce implementation with Akka

I'm trying to implement MapReduce on top of Akka and was lucky to find the code from the book Akka Essentials. However, I have found two major issues with this example implementation, and both look like fundamental concurrency design flaws, which, by the way, is quite shocking to find in a book about Akka:

  1. Upon completion, the Client calls shutdown(), but at that point there is no guarantee that its messages have actually reached the WCMapReduceServer. I see that the WCMapReduceServer only ever receives part of the Client's messages, after which it outputs [INFO] [06/25/2013 09:30:01.594] [WCMapReduceApp-5] [ActorSystem(WCMapReduceApp)] REMOTE: RemoteClientShutdown@akka://[email protected]:2552, meaning the Client's shutdown() happens before the Client has managed to flush all pending messages. In the Client code, line 41, we see that shutdown() is called without flushing first. Is there a way in Akka to force outbound messages to be flushed before shutting down the system?

  2. The other, actually bigger, flaw (which I have already fixed) is the way the example signals EOF to the MapReduce server, i.e. that the main task (the file of words) is done because all its subtasks (the lines of the file) are done. The author sends a special String message, DISPLAY_LIST, which is queued with the lowest priority (see the code). The big flaw here is that the lowest priority only controls the order in which messages are dequeued, not whether the work already handed off to the Map and Reduce actors has finished: if any Map (or Reduce) task takes arbitrarily long, DISPLAY_LIST goes through before all the MapReduce subtasks have completed. The outcome of this MapReduce example is therefore non-deterministic, i.e. you can get a different dictionary on each run. The issue can be revealed by replacing the MapActor#onReceive implementation with the following, which makes one Map step arbitrarily long:

    public void onReceive(Object message) {
        System.out.println("MapActor -> onReceive(" + message + ")");
        if (message instanceof String) {
            String work = (String) message;
            // ******** BEGIN SLOW DOWN ONE MAP REQUEST
            if ("Thieves! thieves!".equals(work)) {
                try {
                    System.out.println("*** sleeping!");
                    Thread.sleep(5000);
                    System.out.println("*** back!");
                } catch (InterruptedException e) {
                    // restore the interrupt flag instead of swallowing it
                    Thread.currentThread().interrupt();
                }
            }
            // ******** END SLOW DOWN ONE MAP REQUEST

            // perform the work
            List<Result> list = evaluateExpression(work);

            // reply with the result, passing this actor as the sender
            actor.tell(list, getSelf());
        } else {
            throw new IllegalArgumentException("Unknown message [" + message + "]");
        }
    }
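The effect can be reproduced without Akka at all. The following is a minimal plain-Java sketch, not the book's code: a hypothetical `displayAfterDispatch` method plays the master, hands each line to a thread pool standing in for the Map actors, and then handles the end-of-input marker by taking a snapshot of the results. The priority mailbox is not modelled; the marker is simply handled after every line has been dispatched, which is the best a lowest-priority message can achieve. To make the slow step deterministic, the "Thieves! thieves!" task waits on a `CountDownLatch` (an assumption of this sketch, replacing the `Thread.sleep(5000)` above) that is released only after the snapshot.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EofRace {
    /**
     * Hypothetical model of the book's flow (plain Java, not Akka): dispatch every line to a
     * worker pool, then handle the end-of-input marker by snapshotting the results.
     * Returns how many results the snapshot contained.
     */
    static int displayAfterDispatch(List<String> lines, String slowLine) throws InterruptedException {
        ExecutorService mapPool = Executors.newFixedThreadPool(4);
        ConcurrentLinkedQueue<String> results = new ConcurrentLinkedQueue<>();
        CountDownLatch displayDone = new CountDownLatch(1);

        for (String line : lines) {
            mapPool.submit(() -> {
                if (line.equals(slowLine)) {
                    // Stand-in for an arbitrarily slow map step: it cannot finish before the display.
                    try {
                        displayDone.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                results.add(line.toLowerCase());
            });
        }

        // The marker is handled after every line was dispatched, but nothing waits for
        // the subtasks to finish, so the slow line's result is still missing here.
        int seen = results.size();
        displayDone.countDown();

        mapPool.shutdown();
        mapPool.awaitTermination(10, TimeUnit.SECONDS);
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> lines = Arrays.asList("To be or not to be", "Thieves! thieves!", "All is well");
        int seen = displayAfterDispatch(lines, "Thieves! thieves!");
        System.out.println("display saw " + seen + " of " + lines.size() + " results");
    }
}
```

Running `main` always reports fewer than 3 results: being handled last means the marker was dequeued after the lines, not that every subtask had finished.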
    

Reading the book a bit further one finds:

We have Thread.sleep() because there is no guarantee in which order the messages are processed. The first Thread.sleep() method ensures that all the string sentence messages are processed completely before we send the Result message.

I'm sorry, but Thread.sleep() has never been a means of ensuring anything in concurrency. So it is no wonder that books like this end up full of fundamental concurrency flaws in their examples.

Asked by SkyWalker, Nov 13 '22 at 02:11


1 Answer

I have solved both problems, and also migrated the code to the latest Akka version 2.2-M3.

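The solution to the first issue is to have the remote MapReduce MasterActor send back a ShutdownInfo notification as soon as it receives the TaskInfo notification, which the Client sends once all other messages have been sent. The TaskInfo contains the number of subtasks in a MapReduce task, in this case the number of lines in the text file. Because Akka delivers messages sent by one actor to another in the order they were sent (delivery itself is at-most-once), by the time the MasterActor sees the TaskInfo it has already received every earlier message from the Client, so the Client can safely shut down once ShutdownInfo arrives.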
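A minimal plain-Java model of that handshake, assuming FIFO delivery from the Client to the MasterActor (the per sender/receiver ordering mentioned above). Two `LinkedBlockingQueue`s stand in for the actors' mailboxes; the message classes reuse the names from the answer, but their fields and the `runClient` helper are hypothetical, and this is not the repository's code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ShutdownHandshake {
    // Hypothetical message types modelled on the answer's TaskInfo / ShutdownInfo.
    static final class TaskInfo {
        final int subtasks;
        TaskInfo(int subtasks) { this.subtasks = subtasks; }
    }

    static final class ShutdownInfo {
        final int receivedBeforeTaskInfo;
        ShutdownInfo(int receivedBeforeTaskInfo) { this.receivedBeforeTaskInfo = receivedBeforeTaskInfo; }
    }

    /** Sends the lines plus a trailing TaskInfo, then blocks until the master acknowledges it. */
    static ShutdownInfo runClient(String[] lines) throws InterruptedException {
        // FIFO queues stand in for the master's and the client's mailboxes.
        BlockingQueue<Object> masterMailbox = new LinkedBlockingQueue<>();
        BlockingQueue<Object> clientMailbox = new LinkedBlockingQueue<>();

        Thread master = new Thread(() -> {
            int received = 0;
            try {
                while (true) {
                    Object msg = masterMailbox.take();
                    if (msg instanceof String) {
                        received++; // the real master would forward this line to a MapActor
                    } else if (msg instanceof TaskInfo) {
                        // Everything sent before TaskInfo has already been dequeued,
                        // so the client may now shut down safely.
                        clientMailbox.put(new ShutdownInfo(received));
                        return; // the real master keeps running; the model stops here
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        master.start();

        for (String line : lines) {
            masterMailbox.put(line);
        }
        masterMailbox.put(new TaskInfo(lines.length));

        // Instead of shutting down immediately, wait for the acknowledgement.
        ShutdownInfo ack = (ShutdownInfo) clientMailbox.take();
        master.join();
        return ack; // only at this point would the client call system.shutdown()
    }

    public static void main(String[] args) throws InterruptedException {
        String[] lines = {"To be or not to be", "Thieves! thieves!", "All is well"};
        ShutdownInfo ack = runClient(lines);
        System.out.println("master had received " + ack.receivedBeforeTaskInfo + " of " + lines.length);
    }
}
```

`main` prints `master had received 3 of 3`: the acknowledgement cannot overtake the lines, because the master only answers after dequeuing the TaskInfo, which was enqueued last.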

The solution to the second problem also relies on the TaskInfo and its total number of subtasks: the AggregatorActor counts the subtasks it has processed, compares that count with the TaskInfo, and signals that the job is done when they match (currently it just prints a message).
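The counting logic can be sketched as a small single-threaded Java class. It is a hypothetical model of the AggregatorActor's state, not the code in the repository: each reduced line arrives as a word-count map, the TaskInfo carries only the total, and completion is declared only once both are known and match, whichever arrives first. In an actor these methods would be driven from onReceive one message at a time, so no locking is needed.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, single-threaded model of the AggregatorActor's state (not the repository code). */
public class CountingAggregator {
    private final Map<String, Integer> wordCounts = new HashMap<>();
    private int processedSubtasks = 0;
    private int expectedSubtasks = -1; // unknown until the TaskInfo arrives
    private boolean completed = false;

    /** One reduced subtask (one line of the input), already turned into word counts. */
    public void onResult(Map<String, Integer> partial) {
        partial.forEach((word, n) -> wordCounts.merge(word, n, Integer::sum));
        processedSubtasks++;
        checkDone();
    }

    /** The TaskInfo only carries the total; it may arrive before the last result does. */
    public void onTaskInfo(int totalSubtasks) {
        expectedSubtasks = totalSubtasks;
        checkDone();
    }

    private void checkDone() {
        if (!completed && expectedSubtasks >= 0 && processedSubtasks == expectedSubtasks) {
            completed = true;
            System.out.println("task complete: " + wordCounts);
        }
    }

    public boolean isDone() { return completed; }

    public Map<String, Integer> result() { return wordCounts; }

    public static void main(String[] args) {
        CountingAggregator agg = new CountingAggregator();
        Map<String, Integer> line1 = new HashMap<>();
        line1.put("thieves", 2);
        Map<String, Integer> line2 = new HashMap<>();
        line2.put("thieves", 1);
        line2.put("well", 1);

        agg.onResult(line1);
        agg.onTaskInfo(2);                 // TaskInfo overtakes the slow subtask
        System.out.println(agg.isDone());  // false: one subtask is still missing
        agg.onResult(line2);               // the slow map result finally arrives
        System.out.println(agg.isDone());  // true
    }
}
```

Unlike a lowest-priority DISPLAY_LIST, this check does not depend on when the TaskInfo is dequeued, only on how many results have actually been merged.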

The interesting and correct behavior is shown in the output:

  • ClientActor sends a batch of messages, which are the "subtasks". Note that the Identify request pattern is used to obtain the ActorRef of the remote MapReduce MasterActor.
  • Last, ClientActor sends the TaskInfo message stating how many subtasks were sent before it.
  • MasterActor forwards the String messages to MapActor, which in turn forwards to ReduceActor.
  • One MapActor step is slow, namely the one with content "Thieves! thieves!", which slows the MapReduce computation down a bit.
  • Meanwhile, MasterActor receives the final TaskInfo message and sends ShutdownInfo back to ClientActor.
  • ClientActor runs system.shutdown() and the Client terminates. Note that the MapReduce computation is still in progress, and the Client shutdown does not interfere with it.
  • The slow MapActor step returns and message processing continues.
  • AggregatorActor receives the TaskInfo and, by counting the subtasks, confirms that all subtasks have been completed and signals completion.

The code can be fetched from my repository: https://github.com/bravegag/akka-mapreduce-example

Feedback always welcome.

Answered by SkyWalker, Nov 14 '22 at 21:11