Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why Play 2.5 Akka chunk response getting loaded all at once

I'm trying to implement chunk response in webapp using PLay 2 with Akka. However, instead of load the response by chunk by chunk all the response is coming as once. Below is the code by which I'm creating chunk in the controller:

/**
 * 
 */    
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import org.pmw.tinylog.Logger;
import play.cache.CacheApi;
import play.cache.Cached;
import play.filters.csrf.AddCSRFToken;
import play.filters.csrf.CSRF;
import play.libs.Json;
import play.libs.concurrent.HttpExecutionContext;
import play.mvc.Controller;
import play.mvc.Http;
import play.mvc.Http.Cookie;
import play.mvc.Result;

import akka.NotUsed;
import akka.actor.Status;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

/**
 * @author Abhinabyte
 *
 */
@Singleton
@AddCSRFToken
public class GetHandler extends Controller {

    @Inject
    private CacheApi cache;

    @Inject
    private HttpExecutionContext httpExecutionContext;

    public CompletionStage<Result> index() {

return CompletableFuture.supplyAsync( () ->
            Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
                    .mapMaterializedValue(sourceActor -> {

                        CompletableFuture.runAsync(() -> {
                            sourceActor.tell(ByteString.fromString("1"), null);
                            sourceActor.tell(ByteString.fromString("2"), null);
                            sourceActor.tell(ByteString.fromString("3"), null);
                            try {
                                Thread.sleep(3000);//intentional delay
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            sourceActor.tell(ByteString.fromString("444444444444444444444444444444444444444444444444444444444444444444444444"), null);
                            sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
                        });

                        return sourceActor;
                    })
    ).thenApplyAsync( chunks -> ok().chunked(chunks).as("text/html"));  

    }

}


And below is the Akka thread pool configuration at application.conf :

akka {
  jvm-exit-on-fatal-error = on
  actor {
    default-dispatcher {
      fork-join-executor {
        parallelism-factor = 1.0
        parallelism-max = 64
        task-peeking-mode = LIFO
      }
    }
  }
}

play.server.netty {
  eventLoopThreads = 0
  maxInitialLineLength = 4096
  log.wire = false
  transport = "native"
}

As you can see before sending last to last chunk I'm intentionally delaying the response time. So logically, all chunked data before it should be delivered before it.
However, in my case whole bunch of data is getting loaded. I've tested in all browser(even have tried to CURL).
What I'm missing in here?

like image 840
Abhinab Kanrar Avatar asked Aug 09 '16 05:08

Abhinab Kanrar


2 Answers

Blocking in mapMaterializedValue will do that because it runs in the Akka default-dispatcher thread, thus preventing message routing for the duration (see this answer for details). You want to dispatch your slow, blocking code asynchronously, with the actor reference for it to post messages to. Your example will do what you expect if you run it in a future:

public CompletionStage<Result> test() {
    return CompletableFuture.supplyAsync( () ->
            Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
                    .mapMaterializedValue(sourceActor -> {

                        CompletableFuture.runAsync(() -> {

                            for (int i = 0; i < 20; i++) {
                                sourceActor.tell(ByteString.fromString(String.valueOf(i) + "<br/>\n"), null);
                                try {
                                    Thread.sleep(500);//intentional delay
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }
                            sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
                        });

                        return sourceActor;
                    })
    ).thenApplyAsync( chunks -> ok().chunked(chunks).as("text/html"));
}
like image 195
Mikesname Avatar answered Nov 01 '22 09:11

Mikesname


If you check the Source code, you can see that the first parameter is bufferSize

public static <T> Source<T,ActorRef> actorRef(int bufferSize,
                                               OverflowStrategy overflowStrategy)

all your elements that you generate in the stream probably have less then 256 bytes, hence only one http chunk is generated. Try to add more elements like in @Mikesname example.

like image 30
Ion Cojocaru Avatar answered Nov 01 '22 08:11

Ion Cojocaru