I'm trying to implement a chunked response in a web app using Play 2 with Akka. However, instead of the response loading chunk by chunk, the whole response arrives at once. Below is the code that creates the chunks in the controller:
/**
*
*/
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import org.pmw.tinylog.Logger;
import play.cache.CacheApi;
import play.cache.Cached;
import play.filters.csrf.AddCSRFToken;
import play.filters.csrf.CSRF;
import play.libs.Json;
import play.libs.concurrent.HttpExecutionContext;
import play.mvc.Controller;
import play.mvc.Http;
import play.mvc.Http.Cookie;
import play.mvc.Result;
import akka.NotUsed;
import akka.actor.Status;
import akka.stream.OverflowStrategy;
/**
* @author Abhinabyte
*
*/
@Singleton
@AddCSRFToken
public class GetHandler extends Controller {

    @Inject
    private CacheApi cache;

    @Inject
    private HttpExecutionContext httpExecutionContext;

    public CompletionStage<Result> index() {
        return CompletableFuture.supplyAsync(() ->
            Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
                .mapMaterializedValue(sourceActor -> {
                    CompletableFuture.runAsync(() -> {
                        sourceActor.tell(ByteString.fromString("1"), null);
                        sourceActor.tell(ByteString.fromString("2"), null);
                        sourceActor.tell(ByteString.fromString("3"), null);
                        try {
                            Thread.sleep(3000); // intentional delay
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        sourceActor.tell(ByteString.fromString("444444444444444444444444444444444444444444444444444444444444444444444444"), null);
                        sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
                    });
                    return sourceActor;
                })
        ).thenApplyAsync(chunks -> ok().chunked(chunks).as("text/html"));
    }
}
And below is the Akka thread pool configuration in application.conf:
akka {
  jvm-exit-on-fatal-error = on
  actor {
    default-dispatcher {
      fork-join-executor {
        parallelism-factor = 1.0
        parallelism-max = 64
        task-peeking-mode = LIFO
      }
    }
  }
}
play.server.netty {
  eventLoopThreads = 0
  maxInitialLineLength = 4096
  log.wire = false
  transport = "native"
}
As you can see, I intentionally delay the response before sending the second-to-last message. So logically, all the chunks sent before the delay should be delivered before it.
However, in my case the whole batch of data is loaded at once. I've tested in all browsers (and have even tried curl).
What am I missing here?
Blocking in mapMaterializedValue will do that because it runs in the Akka default-dispatcher thread, thus preventing message routing for the duration (see this answer for details). You want to dispatch your slow, blocking code asynchronously, with the actor reference for it to post messages to. Your example will do what you expect if you run it in a future:
public CompletionStage<Result> test() {
    return CompletableFuture.supplyAsync(() ->
        Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
            .mapMaterializedValue(sourceActor -> {
                CompletableFuture.runAsync(() -> {
                    for (int i = 0; i < 20; i++) {
                        sourceActor.tell(ByteString.fromString(String.valueOf(i) + "<br/>\n"), null);
                        try {
                            Thread.sleep(500); // intentional delay
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
                });
                return sourceActor;
            })
    ).thenApplyAsync(chunks -> ok().chunked(chunks).as("text/html"));
}
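The same idea can be shown without Play or Akka. What follows is a minimal plain-Java sketch, not the Play or Akka API: a BlockingQueue stands in for the source actor, a sentinel string stands in for Status.Success, and the names AsyncProducerDemo, startProducer and drain are hypothetical. The slow producer is handed to CompletableFuture.runAsync, so the method that wires it up returns immediately and the consumer can pick up each element as it is produced instead of receiving everything at the end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

public class AsyncProducerDemo {
    // Sentinel marking the end of the stream (an assumption of this sketch).
    static final String DONE = "<done>";

    // Starts a slow producer on another thread and returns right away.
    static BlockingQueue<String> startProducer(int count, long delayMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        CompletableFuture.runAsync(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(String.valueOf(i));
                    Thread.sleep(delayMillis); // simulated slow, blocking work
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            } finally {
                queue.add(DONE); // always signal completion
            }
        });
        return queue;
    }

    // Takes elements until the sentinel and returns them in arrival order.
    static List<String> drain(BlockingQueue<String> queue) throws InterruptedException {
        List<String> received = new ArrayList<>();
        for (String item = queue.take(); !item.equals(DONE); item = queue.take()) {
            received.add(item);
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        BlockingQueue<String> queue = startProducer(5, 200);
        System.out.println("wired up after " + (System.nanoTime() - start) / 1_000_000 + " ms");
        // Print each element with its arrival time to show incremental delivery.
        for (String item = queue.take(); !item.equals(DONE); item = queue.take()) {
            System.out.println((System.nanoTime() - start) / 1_000_000 + " ms: " + item);
        }
    }
}
```

If the loop inside startProducer ran directly on the calling thread instead, the method would not return until all elements had been produced, which is the plain-Java analogue of blocking inside mapMaterializedValue.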
If you check the Source code, you can see that the first parameter is bufferSize:
public static <T> Source<T,ActorRef> actorRef(int bufferSize,
                                              OverflowStrategy overflowStrategy)
All the elements you generate in the stream probably add up to less than 256 bytes, hence only one HTTP chunk is generated. Try adding more elements, as in @Mikesname's example.
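To check whether the data really arrives incrementally, it can help to timestamp each read on the client side. The following is a rough sketch using only the JDK; the class name ChunkTimer, the helper readSizes, and the default URL http://localhost:9000/ (Play's usual dev port, with a route assumed to point at the action) are all assumptions. Note that one read call does not necessarily correspond to exactly one HTTP chunk, so the output is only indicative.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

public class ChunkTimer {
    // Reads until EOF, printing the elapsed time and size of each read,
    // and returns the number of bytes delivered by each read call.
    static List<Integer> readSizes(InputStream in) throws IOException {
        List<Integer> sizes = new ArrayList<>();
        byte[] buffer = new byte[1024];
        long start = System.nanoTime();
        int n;
        while ((n = in.read(buffer)) != -1) {
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            System.out.println(elapsedMs + " ms: " + n + " bytes");
            sizes.add(n);
        }
        return sizes;
    }

    public static void main(String[] args) throws IOException {
        // Assumed URL; pass the real one as the first argument.
        String url = args.length > 0 ? args[0] : "http://localhost:9000/";
        HttpURLConnection conn =
            (HttpURLConnection) URI.create(url).toURL().openConnection();
        try (InputStream in = conn.getInputStream()) {
            readSizes(in);
        } finally {
            conn.disconnect();
        }
    }
}
```

If every line is printed at roughly the same elapsed time, the server is sending everything at once; reads spread out over time suggest the chunks are being streamed.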