For some reason I can't wrap my head around implementing this. I've got an application running with Play that calls out to Elastic Search. As part of my design, my service uses the Java API wrapped with scala future's as shown in this blog post. I've updated the code from that post to hint to the ExecutionContext that it will be doing some blocking I/O like so:
import scala.concurent.{blocking, Future, Promise}
import org.elasticsearch.action.{ActionRequestBuilder, ActionListener, ActionResponse }
def execute[RB <: ActionRequestBuilder[_, T, _, _]](request: RB): Future[T] = {
blocking {
request.execute(this)
promise.future
}
}
My actual service that constructs the queries to send to ES takes an executionContext as a constructor parameter that it then uses for calls to elastic search. I did this so that the global execution context that play uses won't have it's threads tied down by the blocking calls to ES. This S.O. comment mentions that only the global context is blocking aware, so that leaves me to have to create my own. In that same post/answer there's a lot of information about using a ForkJoin pool, but I'm not sure how to take what's written in those docs and combine it with the hints in the blocking documentation to create an execution context that responds to blocking hints.
I think one of the issues I have is that I'm not sure exactly how to respond to the blocking context in the first place? I was reading the best practices and the example it uses is an unbounded cache of threads:
Note that here I prefer to use an unbounded "cached thread-pool", so it doesn't have a limit. When doing blocking I/O the idea is that you've got to have enough threads that you can block. But if unbounded is too much, depending on use-case, you can later fine-tune it, the idea with this sample being that you get the ball rolling.
So does this mean that with my ForkJoin backed thread pool, that I should try to use a cached thread when dealing with non-blocking I/O and create a new thread for blocking IO? Or something else? Pretty much every resource I find online about using seperate thread pools tends to do what the Neophytes guide does, which is to say:
How to tune your various thread pools is highly dependent on your individual application and beyond the scope of this article.
I know it depends on your application, but in this case if I just want to create some type of blocking aware ExecutionContext and understand a decent strategy for managing the threads. If the Context is specifically for a single part of the application, should I just make a fixed thread pool size and not use/ignore the blocking
keyword in the first place?
I tend to ramble, so I'll try to break down what I'm looking for in an answer:
blocking
there.Sorry for the long question here, I'm just trying to give you a sense of what I'm looking at and that I have been trying to wrap my head around this for over a day and need some outside help.
Edit: Just to make this clear, the ElasticSearch Service's constructor signature is:
//Note that these are not implicit parameters!
class ElasticSearchService(otherParams ..., val executionContext: ExecutionContext)
And in my application start up code I have something like this:
object Global extends GlobalSettings {
val elasticSearchContext = //Custom Context goes here
...
val elasticSearchService = new ElasticSearchService(params, elasticSearchContext);
...
}
I am also reading through Play's recommendations for contexts, but have yet to see anything about blocking hints yet and I suspect I might have to go look into the source to see if they extend the BlockContext
trait.
So I dug into the documentation and Play's best practices for the situation I'm dealing with is to
In certain circumstances, you may wish to dispatch work to other thread pools. This may include CPU heavy work, or IO work, such as database access. To do this, you should first create a thread pool, this can be done easily in Scala:
And provides some code:
object Contexts {
implicit val myExecutionContext: ExecutionContext = Akka.system.dispatchers.lookup("my-context")
}
The context is from Akka, so I ran down there searching for the defaults and types of Contexts they offer, which eventually led me to the documentation on dispatchers. The default is a ForkJoinPool whose default method for managing a block is to call the managedBlock(blocker)
. This led me to reading the documentation that stated:
Blocks in accord with the given blocker. If the current thread is a ForkJoinWorkerThread, this method possibly arranges for a spare thread to be activated if necessary to ensure sufficient parallelism while the current thread is blocked.
So it seems like if I have a ForkJoinWorkerThread
then the behavior I think I want will take place. Looking at the source of ForkJoinPool some more I noted that the default thread factory is:
val defaultForkJoinWorkerThreadFactory: ForkJoinWorkerThreadFactory = juc.ForkJoinPool.defaultForkJoinWorkerThreadFactory
Which implies to me that if I use the defaults in Akka, that I'll get a context which handles blocking in the way I expect.
So reading the Akka documentation again it would seem that specifying my context something like this:
my-context {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 8
parallelism-factor = 3.0
parallelism-max = 64
task-peeking-mode = "FIFO"
}
throughput = 100
}
would be what I want.
While I was searching in the source code I did some looking for uses of blocking
or of calling managedBlock
and found an example of overriding the ForkJoin behavior in ThreadPoolBuilder
private[akka] class AkkaForkJoinWorkerThread(_pool: ForkJoinPool) extends ForkJoinWorkerThread(_pool) with BlockContext {
override def blockOn[T](thunk: ⇒ T)(implicit permission: CanAwait): T = {
val result = new AtomicReference[Option[T]](None)
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
def block(): Boolean = {
result.set(Some(thunk))
true
}
def isReleasable = result.get.isDefined
})
result.get.get // Exception intended if None
}
}
Which seems like what I originally asked for as an example of how to make something that implements the BlockContext. That file also has code showing how to make an ExecutorServiceFactory, which is what I believe
is reference by the executor
part of the configuration. So I think what I would do if I wanted to have
a totally custom context would be extend some type of WorkerThread and write my own ExecutorServiceFactory that uses the custom workerthread and then specify the fully qualified class name in the property like this post advises.
I'm probably going to go with using Akka's forkjoin :)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With