
How to use a custom executor in an Akka dispatcher

I am trying to use a custom executor for a dispatcher in Akka.

Specifically, I want to wrap an existing executor with my own (logging, debugging, etc.).

I've looked at the relevant parts of the documentation:

The default dispatcher can be configured, and is by default a Dispatcher with a “fork-join-executor”, which gives excellent performance in most cases.

and

specify using “executor” using “fork-join-executor”, “thread-pool-executor” or the FQCN of an akka.dispatcher.ExecutorServiceConfigurator

(I don't understand what FQCN is.)

How do I do this?

Paul Draper asked Jul 30 '14 04:07



1 Answer

FQCN stands for Fully Qualified Class Name, i.e. the package name(s) plus the class name. For example:

java.lang.String <- FQCN for String
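
To see an FQCN from code, you can ask the JVM for a class's name. Loading a class from such a string is, roughly, what a framework does when a class name is given in configuration. A small illustrative sketch (the object name FqcnDemo is made up for this example):

```scala
object FqcnDemo {
  // The fully qualified name of String, as reported by the JVM.
  val stringFqcn: String = classOf[String].getName

  // Looking a class up by its FQCN yields the same Class object.
  val loaded: Class[_] = Class.forName(stringFqcn)
}
```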

Here's an example of an ExecutorServiceConfigurator for the built-in ThreadPoolExecutor:

class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) {

  val threadPoolConfig: ThreadPoolConfig = createThreadPoolConfigBuilder(config, prerequisites).config

  protected def createThreadPoolConfigBuilder(config: Config, prerequisites: DispatcherPrerequisites): ThreadPoolConfigBuilder = {
    import akka.util.Helpers.ConfigOps
    ThreadPoolConfigBuilder(ThreadPoolConfig())
      .setKeepAliveTime(config.getMillisDuration("keep-alive-time"))
      .setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout")
      .setCorePoolSizeFromFactor(config getInt "core-pool-size-min", config getDouble "core-pool-size-factor", config getInt "core-pool-size-max")
      .setMaxPoolSizeFromFactor(config getInt "max-pool-size-min", config getDouble "max-pool-size-factor", config getInt "max-pool-size-max")
      .configure(
        Some(config getInt "task-queue-size") flatMap {
          case size if size > 0 ⇒
            Some(config getString "task-queue-type") map {
              case "array"       ⇒ ThreadPoolConfig.arrayBlockingQueue(size, false) //TODO config fairness?
              case "" | "linked" ⇒ ThreadPoolConfig.linkedBlockingQueue(size)
              case x             ⇒ throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x)
            } map { qf ⇒ (q: ThreadPoolConfigBuilder) ⇒ q.setQueueFactory(qf) }
          case _ ⇒ None
        })
  }

  def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
    threadPoolConfig.createExecutorServiceFactory(id, threadFactory)
}

Source: https://github.com/akka/akka/blob/v2.3.4/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala#L344
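
To wrap an existing executor, as the question asks, one possible approach is to write your own ExecutorServiceConfigurator that delegates to a built-in one and decorates the ExecutorService it creates. The following is a minimal sketch, not a definitive implementation. It assumes the Akka 2.3.x API shown in the excerpt above. LoggingExecutorService, LoggingExecutorConfigurator, LoggingDispatcherDemo and the dispatcher name "logging-dispatcher" are hypothetical names. It also assumes that a custom configurator receives the whole dispatcher config section (so that its "thread-pool-executor" sub-section can be passed on), which is worth verifying against your Akka version.

```scala
import java.util.concurrent.{AbstractExecutorService, ExecutorService, ThreadFactory, TimeUnit}

import akka.dispatch.{DispatcherPrerequisites, ExecutorServiceConfigurator, ExecutorServiceFactory, ThreadPoolExecutorConfigurator}
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical wrapper: logs every task, then hands it to the wrapped executor.
class LoggingExecutorService(delegate: ExecutorService) extends AbstractExecutorService {
  override def execute(command: Runnable): Unit = {
    println(s"Submitting task: $command")
    delegate.execute(command)
  }

  // Lifecycle methods are forwarded unchanged.
  override def shutdown(): Unit = delegate.shutdown()
  override def shutdownNow(): java.util.List[Runnable] = delegate.shutdownNow()
  override def isShutdown: Boolean = delegate.isShutdown
  override def isTerminated: Boolean = delegate.isTerminated
  override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean =
    delegate.awaitTermination(timeout, unit)
}

// Hypothetical configurator: builds the standard thread pool, then wraps it.
class LoggingExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
    extends ExecutorServiceConfigurator(config, prerequisites) {

  // Assumption: `config` is the dispatcher section, so this sub-section exists.
  private val underlying =
    new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)

  def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
    val inner = underlying.createExecutorServiceFactory(id, threadFactory)
    new ExecutorServiceFactory {
      def createExecutorService: ExecutorService =
        new LoggingExecutorService(inner.createExecutorService)
    }
  }
}

object LoggingDispatcherDemo {
  // The `executor` setting takes the FQCN of the configurator class.
  val config: Config = ConfigFactory.parseString(
    s"""
       |logging-dispatcher {
       |  type = Dispatcher
       |  executor = "${classOf[LoggingExecutorConfigurator].getName}"
       |}
     """.stripMargin).withFallback(ConfigFactory.load())
}
```

With a config like this passed to the ActorSystem, an actor can opt in via Props.withDispatcher("logging-dispatcher"). Extending AbstractExecutorService keeps the wrapper short, because submit and invokeAll are then routed through the single execute method.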

Viktor Klang answered Oct 27 '22 11:10