Knowing when akka actors are finished

Tags:

java

akka

A few people working on a project with me have been trying to figure out the best way to handle this. It seems like something that should come up regularly, yet we can't find a good answer.

If I have some work to do and I send a batch of messages to a router, how can I tell when all of the work is done? For example, suppose we read the lines of a 1-million-line file and send each line to actors for processing. The next file can only be processed after the first one is complete, so how do we know when it is complete?

One further comment. I'm aware of, and have used, Await.result() and Await.ready() together with Patterns.ask(). The difference here is that each line would have its own Future, so we'd have a HUGE array of futures to wait on, not just one. Additionally, we are populating a large domain model that takes up considerable memory, and we don't want to spend additional memory holding an equal number of futures waiting to be composed. With actors, each one completes after doing its work and holds no memory waiting to be composed.

We're using Java and not Scala.

Pseudocode:

for (File file : files) {
    ...
    String line;
    while ((line = getNextLine(fileStream)) != null) {
        router.tell(line, this.getSelf());
    }
    // we need to wait for this work to finish before starting the next
    // file, because the next file depends on the previous work
}

This seems like a common need with actors: do a lot of work, and know when it's finished.

Steven Edison asked Oct 03 '22 13:10


1 Answer

I believe I have a solution for you, and it does not involve accumulating a whole bunch of Futures. First, the high-level concept. Two actors participate in this flow.

The first we'll call FilesProcessor. This actor is short-lived and stateful. Whenever you want to process a bunch of files sequentially, you spin up an instance of this actor and pass it a message containing the names (or paths) of the files you want to process. When it has finished processing all of the files, it stops itself.

The second actor we'll call LineProcessor. This actor is stateless, long-lived and pooled behind a router. It processes one line of a file and then replies to whoever requested the processing, telling them it has finished that line. Because the FilesProcessor knows how many lines it sent for the current file, it only has to count these replies and start the next file once the count reaches zero.

Now onto the code.
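Before the Akka classes, the counting idea itself can be tried without Akka. The sketch below is a plain-Java analogue, not the answer's actual code: a fixed thread pool stands in for the router, a `BlockingQueue` stands in for the FilesProcessor's mailbox, and "files" are assumed to be in-memory lists of lines. The class and method names are hypothetical. Unlike a real actor, this coordinator blocks on `take()` while waiting for replies; in the Akka version the same decrement happens inside `onReceive` without blocking.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class SequentialFiles {
    // Stand-in for Messages.LINE_PROCESSED: an acknowledgement with no payload.
    private static final Object LINE_PROCESSED = new Object();

    // Processes each "file" (a list of lines) strictly one after another.
    // Lines of one file run concurrently on the pool; the coordinator counts the
    // acknowledgements and moves on only when the count for the file reaches zero.
    public static List<String> run(List<List<String>> files, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers); // plays the router
        BlockingQueue<Object> replies = new LinkedBlockingQueue<>();  // coordinator "mailbox"
        List<String> processed = Collections.synchronizedList(new ArrayList<>());
        try {
            for (List<String> file : files) {
                int awaitingCount = 0;
                for (String line : file) {
                    pool.execute(() -> {
                        processed.add(line);          // placeholder for real line processing
                        replies.add(LINE_PROCESSED);  // reply to the coordinator
                    });
                    awaitingCount++;
                }
                while (awaitingCount > 0) {  // an empty file falls straight through
                    replies.take();
                    awaitingCount--;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for replies", e);
        } finally {
            pool.shutdown();
        }
        return processed;
    }

    public static void main(String[] args) {
        List<List<String>> files = List.of(
            List.of("a1", "a2", "a3"),
            List.of(),
            List.of("b1", "b2"));
        // Order within a file may vary; all "a" lines always precede all "b" lines.
        System.out.println(run(files, 4));
    }
}
```

Only a single `int` per file is kept for tracking completion, which is the property the question is after: memory for bookkeeping does not grow with the number of lines.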

First the messages:

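import java.util.List;

public class Messages {

  // Sent once to a new FilesProcessor: the files to process, in order.
  public static class ProcessFiles {
    public final List<String> fileNames;
    public ProcessFiles(List<String> fileNames) {
      this.fileNames = fileNames;
    }
  }

  // Sent to the router: one line to process.
  public static class ProcessLine {
    public final String line;
    public ProcessLine(String line) {
      this.line = line;
    }
  }

  // Reply from a LineProcessor meaning "I have finished one line".
  public static class LineProcessed {}

  // The reply carries no data, so one shared, final instance is enough.
  public static final LineProcessed LINE_PROCESSED = new LineProcessed();
}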
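Akka messages are meant to be immutable, but `ProcessFiles` above only stores a reference to the caller's list, so the sender could still change it after sending. A minimal sketch of a stricter variant, assuming you want the message to own a read-only snapshot (the `ImmutableMessages` wrapper class is hypothetical). An actor receiving this version has to copy the list before removing entries from it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ImmutableMessages {

    // Takes a private, read-only copy, so later changes by the sender cannot
    // leak into the message the actor is working from.
    public static final class ProcessFiles {
        public final List<String> fileNames;

        public ProcessFiles(List<String> fileNames) {
            this.fileNames = Collections.unmodifiableList(new ArrayList<>(fileNames));
        }
    }

    public static final class ProcessLine {
        public final String line;

        public ProcessLine(String line) {
            this.line = line;
        }
    }

    // Carries no data; the private constructor forces use of the shared instance.
    public static final class LineProcessed {
        private LineProcessed() {}
    }

    public static final LineProcessed LINE_PROCESSED = new LineProcessed();

    public static void main(String[] args) {
        List<String> names = new ArrayList<>(List.of("a.txt", "b.txt"));
        ProcessFiles msg = new ProcessFiles(names);
        names.add("c.txt");                 // does not affect the message
        System.out.println(msg.fileNames);  // prints [a.txt, b.txt]
    }
}
```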

And the FilesProcessor:

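import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;

public class FilesProcessor extends UntypedActor {
  private List<String> files;
  private int awaitingCount;
  private ActorRef router;

  @Override
  public void onReceive(Object msg) throws Exception {
    if (msg instanceof Messages.ProcessFiles) {
      Messages.ProcessFiles pf = (Messages.ProcessFiles) msg;
      router = ... //lookup router;
      // Copy the list: don't mutate a message's contents, and we remove from it below.
      files = new ArrayList<>(pf.fileNames);
      processNextFile();
    }
    else if (msg instanceof Messages.LineProcessed) {
      awaitingCount--;
      // Every line of the current file has been acknowledged: move on.
      if (awaitingCount <= 0) {
        processNextFile();
      }
    }
    else {
      unhandled(msg);
    }
  }

  private void processNextFile() {
    if (files.isEmpty()) {
      // All files are done, so this short-lived actor stops itself.
      getContext().stop(getSelf());
      return;
    }
    String file = files.remove(0);
    awaitingCount = 0;

    // try-with-resources closes the reader once every line has been sent.
    try (BufferedReader in = openFile(file)) {
      String input;
      while ((input = in.readLine()) != null) {
        router.tell(new Messages.ProcessLine(input), getSelf());
        awaitingCount++;
      }
    }
    catch (IOException e) {
      e.printStackTrace();
      getContext().stop(getSelf());
      return;
    }

    // An empty file produces no LineProcessed replies, so move on right away
    // instead of waiting forever.
    if (awaitingCount == 0) {
      processNextFile();
    }
  }

  private BufferedReader openFile(String name) {
    //do whatever to load file
    ...
  }

}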
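The FilesProcessor above reads the whole file inside a single `processNextFile()` call, so for a 1-million-line file every ProcessLine message is sent before the first reply is handled, and all of those lines sit in mailboxes at once. One possible variation, not part of the original answer, is to keep only a fixed number of lines in flight and send the next line each time a reply arrives. The sketch below models just that bookkeeping in plain Java; `WindowedDispatcher`, `window` and the `send` callback are hypothetical names, and `send` stands in for `router.tell(...)`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: keeps at most `window` lines of one file in flight.
public class WindowedDispatcher {
    private final Iterator<String> lines;
    private final Consumer<String> send;  // stand-in for router.tell(new ProcessLine(line), self)
    private int inFlight = 0;

    public WindowedDispatcher(Iterator<String> lines, int window, Consumer<String> send) {
        this.lines = lines;
        this.send = send;
        // Prime the window; an empty file sends nothing and is done immediately.
        for (int i = 0; i < window && lines.hasNext(); i++) {
            dispatchNext();
        }
    }

    private void dispatchNext() {
        send.accept(lines.next());
        inFlight++;
    }

    // Call on each LineProcessed reply; returns true once the whole file is done.
    public boolean onLineProcessed() {
        inFlight--;
        if (lines.hasNext()) {
            dispatchNext();  // refill the slot that just freed up
        }
        return isDone();
    }

    public boolean isDone() {
        return inFlight == 0 && !lines.hasNext();
    }

    public int inFlight() {
        return inFlight;
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        WindowedDispatcher d = new WindowedDispatcher(
            List.of("l1", "l2", "l3", "l4", "l5").iterator(), 2, sent::add);
        int maxInFlight = d.inFlight();
        while (!d.isDone()) {
            d.onLineProcessed();  // simulate one worker reply
            maxInFlight = Math.max(maxInFlight, d.inFlight());
        }
        System.out.println(sent + " max in flight: " + maxInFlight);
    }
}
```

Inside the actor, the iterator could come from `BufferedReader.lines().iterator()`, and the LineProcessed branch would call `onLineProcessed()` and start the next file once it returns true (or straight away if `isDone()` is already true after construction). The trade-off is fewer lines queued at once in exchange for a little less parallelism at the start and end of each file.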

And the LineProcessor:

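import akka.actor.UntypedActor;

public class LineProcessor extends UntypedActor {

  @Override
  public void onReceive(Object msg) throws Exception {
    if (msg instanceof Messages.ProcessLine) {
      Messages.ProcessLine pl = (Messages.ProcessLine) msg;

      //Do whatever line processing with pl.line...

      // Tell whoever sent the line (the FilesProcessor) that this line is done.
      getSender().tell(Messages.LINE_PROCESSED, getSelf());
    }
    else {
      unhandled(msg);
    }
  }

}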

Right now the LineProcessor sends back a reply with no additional content. You could certainly change this if you needed to send something back based on the processing of the line. I'm sure this code is not bulletproof; I just wanted to show you a high-level concept for how you could accomplish this flow without ask-based request/response and Futures.
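If the LineProcessor does need to send something back, the reply message can carry it. A minimal plain-Java sketch, assuming a hypothetical per-line `wordCount` result (the class, field and method names are illustrative, not from the answer): the coordinator folds each reply into a running total as it arrives, so it still keeps only a counter and an accumulator rather than one object per line.

```java
import java.util.List;

public class ResultReplies {
    // Hypothetical reply that carries a per-line result instead of an empty ack.
    public static final class LineProcessed {
        public final int wordCount;

        public LineProcessed(int wordCount) {
            this.wordCount = wordCount;
        }
    }

    // What a LineProcessor might compute before replying (placeholder logic).
    public static LineProcessed process(String line) {
        String trimmed = line.trim();
        return new LineProcessed(trimmed.isEmpty() ? 0 : trimmed.split("\\s+").length);
    }

    // Coordinator-side bookkeeping for one file.
    public static final class FileTotals {
        private int awaitingCount;
        private long totalWords;

        public FileTotals(int linesSent) {
            this.awaitingCount = linesSent;
        }

        // Returns true once every line of the file has been acknowledged.
        public boolean onReply(LineProcessed reply) {
            totalWords += reply.wordCount;
            awaitingCount--;
            return awaitingCount <= 0;
        }

        public long totalWords() {
            return totalWords;
        }
    }

    public static void main(String[] args) {
        List<String> lines = List.of("the quick brown fox", "", "jumps over");
        FileTotals totals = new FileTotals(lines.size());
        boolean done = false;
        for (String line : lines) {
            done = totals.onReply(process(line));  // in Akka these arrive as messages
        }
        System.out.println(done + " " + totals.totalWords());  // prints "true 6"
    }
}
```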

If you have any questions on this approach or want more detail, let me know and I'd be happy to provide it.

cmbaxter answered Oct 10 '22 02:10