I am trying to use a custom executor for a dispatcher in Akka.
Specifically, I want to wrap an existing executor with my own (logging, debugging, etc.).
I've looked at relevant parts of documentation:
The default dispatcher can be configured, and is by default a Dispatcher with a “fork-join-executor”, which gives excellent performance in most cases.
and
specify using “executor” using “fork-join-executor”, “thread-pool-executor” or the FQCN of an akka.dispatcher.ExecutorServiceConfigurator
(I don't understand what FQCN is.)
How do I do this?
The thread pool executor dispatcher is implemented using a java.util.concurrent.ThreadPoolExecutor.
Akka is built around the ActorSystem, and dispatchers are its main engine; hence the saying that dispatchers are what makes Akka “tick”. They are responsible for selecting an actor and its messages and scheduling them onto threads for execution.
FQCN stands for Fully Qualified Class Name, i.e. the package name(s) plus the class name. For example:
java.lang.String <- FQCN for String
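To make this concrete, here is a minimal sketch showing that the JVM itself reports this name for a class. The object name `FqcnExample` is made up for illustration; the string it produces has the same shape as the value you would put in the `executor` setting.

```scala
// Hypothetical helper object, only here to show what an FQCN looks like at runtime.
object FqcnExample {
  // Class#getName returns the package name(s) plus the class name.
  val stringFqcn: String = classOf[String].getName

  def main(args: Array[String]): Unit =
    println(stringFqcn) // prints "java.lang.String"
}
```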
Here's an example of an ExecutorServiceConfigurator for the built in ThreadPoolExecutor:
class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) {

  val threadPoolConfig: ThreadPoolConfig = createThreadPoolConfigBuilder(config, prerequisites).config

  protected def createThreadPoolConfigBuilder(config: Config, prerequisites: DispatcherPrerequisites): ThreadPoolConfigBuilder = {
    import akka.util.Helpers.ConfigOps
    ThreadPoolConfigBuilder(ThreadPoolConfig())
      .setKeepAliveTime(config.getMillisDuration("keep-alive-time"))
      .setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout")
      .setCorePoolSizeFromFactor(config getInt "core-pool-size-min", config getDouble "core-pool-size-factor", config getInt "core-pool-size-max")
      .setMaxPoolSizeFromFactor(config getInt "max-pool-size-min", config getDouble "max-pool-size-factor", config getInt "max-pool-size-max")
      .configure(
        Some(config getInt "task-queue-size") flatMap {
          case size if size > 0 ⇒
            Some(config getString "task-queue-type") map {
              case "array"       ⇒ ThreadPoolConfig.arrayBlockingQueue(size, false) //TODO config fairness?
              case "" | "linked" ⇒ ThreadPoolConfig.linkedBlockingQueue(size)
              case x             ⇒ throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x)
            } map { qf ⇒ (q: ThreadPoolConfigBuilder) ⇒ q.setQueueFactory(qf) }
          case _ ⇒ None
        })
  }

  def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
    threadPoolConfig.createExecutorServiceFactory(id, threadFactory)
}
Source: https://github.com/akka/akka/blob/v2.3.4/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala#L344
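To wrap an existing executor, as the question asks, one option is to write your own ExecutorServiceConfigurator that delegates to the built-in one and decorates the ExecutorService it produces. The following is only a sketch under a few assumptions: the names `LoggingExecutorService` and `LoggingExecutorConfigurator` are made up for illustration, logging goes to `println`, and the `Config` passed to the constructor is assumed to be the whole dispatcher section (so the pool settings are read from its `thread-pool-executor` subsection). It targets the 2.3.x API quoted above and needs akka-actor on the classpath.

```scala
import java.util.{List => JList}
import java.util.concurrent.{AbstractExecutorService, ExecutorService, ThreadFactory, TimeUnit}

import akka.dispatch.{DispatcherPrerequisites, ExecutorServiceConfigurator, ExecutorServiceFactory, ThreadPoolExecutorConfigurator}
import com.typesafe.config.Config

// Hypothetical wrapper: forwards every call to `underlying` and logs each task's
// submission and run time.
class LoggingExecutorService(underlying: ExecutorService, log: String => Unit)
    extends AbstractExecutorService {

  override def execute(command: Runnable): Unit = {
    log(s"submitting ${command.getClass.getName}")
    underlying.execute(new Runnable {
      override def run(): Unit = {
        val start = System.nanoTime()
        try command.run()
        finally log(s"finished in ${(System.nanoTime() - start) / 1000} us")
      }
    })
  }

  // Lifecycle methods are delegated unchanged.
  override def shutdown(): Unit = underlying.shutdown()
  override def shutdownNow(): JList[Runnable] = underlying.shutdownNow()
  override def isShutdown(): Boolean = underlying.isShutdown
  override def isTerminated(): Boolean = underlying.isTerminated
  override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean =
    underlying.awaitTermination(timeout, unit)
}

// Hypothetical configurator. Akka creates it reflectively from the FQCN, so it
// must have this (Config, DispatcherPrerequisites) constructor.
class LoggingExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
    extends ExecutorServiceConfigurator(config, prerequisites) {

  // Assumption: `config` is the dispatcher's own section, so the settings for the
  // wrapped pool live under "thread-pool-executor".
  private val delegate =
    new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)

  override def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
    val inner = delegate.createExecutorServiceFactory(id, threadFactory)
    new ExecutorServiceFactory {
      // Build the real pool, then hand Akka the logging wrapper around it.
      override def createExecutorService: ExecutorService =
        new LoggingExecutorService(inner.createExecutorService, msg => println(s"[$id] $msg"))
    }
  }
}

// Example dispatcher config (dispatcher name and package are placeholders):
//   my-dispatcher {
//     type = Dispatcher
//     executor = "com.example.LoggingExecutorConfigurator"
//   }
```

An actor can then be assigned to it with `Props[MyActor].withDispatcher("my-dispatcher")`. The wrapper only decorates `execute`, which should be enough for logging, because the dispatcher hands mailboxes to the executor as Runnables. Wrapping the fork-join executor the same way should also work, but it is worth measuring, since wrapping each task in a new Runnable may bypass optimizations that pool applies to its own task type.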