
Correct usage of scalaz Future for async execution

I guess I don't fully understand how scalaz Futures work. I'm trying to port a project from Scala futures to the scalaz implementation, but performance with scalaz Future is lower. The simplest example is loading a profile on an authentication request with Spray.

The function itself:

def loadProfile[A: CollectionProvider: JsonFormat](id: String) = future {
  remote.findOne[A]("id" :> id) match {
    case Some(profile) ⇒ \/-(profile)
    case None          ⇒ -\/(ProfileNotFoundRejection(id))
  }
}

The scalaz version differs by just one symbol: I'm calling Future.apply from scalaz.concurrent. And here is the Spray route that loads an HTML page:

get {
  path("profile" / "id" ~ Segment) { id ⇒
    onSuccess(loadProfile[User](id)) {
      case \/-(profile) ⇒ complete(html.page(profile))
      case -\/(pnfr)    ⇒ reject(pnfr)
    }
  }
}

As with loadProfile, the scalaz version differs in just one method call:

get {
  path("profile" / "id" ~ Segment) { id ⇒
    ctx => loadProfile[User](id).runAsync {
      case \/-(profile) ⇒ ctx.complete(html.page(profile))
      case -\/(pnfr)    ⇒ ctx.reject(pnfr)
    }
  }
}

But the request with the Scala Future version completes in around 143 ms, while the scalaz version takes 260 ms. I'm not so much concerned about this particular request as about asynchronous execution and the scalability of the service in general. As I understand it, with scalaz Future I have to fork execution to a separate thread manually; does that mean it executes sequentially otherwise? Is there a good intro or tutorial on scalaz Future usage?

4lex1v asked Nov 11 '22 21:11


1 Answer

Scala and scalaz futures are very different:

Scala

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits._

// creating two slow futures:
val f: Future[Unit] = Future { println("f " + Thread.currentThread().getName()); Thread.sleep(10000);  }
val g: Future[Unit] = Future { println("g " + Thread.currentThread().getName()); Thread.sleep(10000);  }

// a moment later, register success callbacks
f onSuccess { case _ => println("f s1 " + Thread.currentThread().getName()) }
g onSuccess { case _ => println("g s1 " + Thread.currentThread().getName()) }
f onSuccess { case _ => println("f s2 " + Thread.currentThread().getName()) }
g onSuccess { case _ => println("g s2 " + Thread.currentThread().getName()) }

We get this output immediately after creating f and g:

f ForkJoinPool-1-worker-5
g ForkJoinPool-1-worker-3

And the rest of the output after ~10 seconds:

f s1 ForkJoinPool-1-worker-5
g s1 ForkJoinPool-1-worker-5
f s2 ForkJoinPool-1-worker-5
g s2 ForkJoinPool-1-worker-5
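The eager, memoising behaviour can also be checked without reading thread names. The following is only a minimal sketch using the standard library; the object name `EagerMemoSketch` and the counter are made up for illustration. It counts how often the body of a Scala Future runs when the result is read twice.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object EagerMemoSketch {
  // Returns how many times the Future body ran after its result was read twice.
  def bodyRuns(): Int = {
    val runs = new AtomicInteger(0)
    // A Scala Future starts running as soon as it is created.
    val f = Future { runs.incrementAndGet() }
    Await.result(f, 5.seconds) // waits for the single evaluation
    Await.result(f, 5.seconds) // reads the cached result, no second evaluation
    runs.get()
  }
}
```

Calling `EagerMemoSketch.bodyRuns()` should give 1: the body is evaluated once and every later read or callback sees the stored value.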

Scalaz

import scalaz.concurrent._ // z!
import scala.concurrent.ExecutionContext.Implicits._

// creating two slow futures:
val f: Future[Unit] = Future { println("f " + Thread.currentThread().getName()); Thread.sleep(10000);  }
val g: Future[Unit] = Future { println("g " + Thread.currentThread().getName()); Thread.sleep(10000);  }

After creating f and g, nothing happens. We have:

f: scalaz.concurrent.Future[Unit] = Async(<function1>)
g: scalaz.concurrent.Future[Unit] = Async(<function1>)

But after running them we see the difference:

f runAsync { _ => println("f s1 " + Thread.currentThread().getName()) }
g runAsync { _ => println("g s1 " + Thread.currentThread().getName()) }
f runAsync { _ => println("f s2 " + Thread.currentThread().getName()) }
g runAsync { _ => println("g s2 " + Thread.currentThread().getName()) }

We get the result:

f pool-4-thread-2
g pool-4-thread-1
f pool-4-thread-4
g pool-4-thread-3

f s2 pool-4-thread-4
g s2 pool-4-thread-3
g s1 pool-4-thread-1
f s1 pool-4-thread-2

There are two points worth mentioning:

  • The Futures f and g are executed again on every runAsync. There is no value memoisation.
  • The runAsync callback is executed on the same thread as the computation it belongs to. This is because we don't explicitly fork.
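The first point can be made visible with a counter. This is a rough sketch that assumes a scalaz 7.1-era API, where `Future.apply` takes a default executor and `run` blocks for the result (later versions rename `run`); the object name `ScalazRerunSketch` and the counter are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scalaz.concurrent.Future

object ScalazRerunSketch {
  private val runs = new AtomicInteger(0)

  // Future.apply only describes the computation; nothing runs here.
  val f: Future[Int] = Future { runs.incrementAndGet() }

  // Returns the counter before any run and after two runs.
  def demo(): (Int, Int) = {
    val before = runs.get() // the body has not been evaluated yet
    f.run                   // first evaluation, blocks for the result
    f.run                   // the body is evaluated again
    (before, runs.get())
  }
}
```

On a first call, `ScalazRerunSketch.demo()` should return `(0, 2)`. If you need the result computed only once, one option is to run the scalaz Future a single time and keep the value yourself, for example by completing a `scala.concurrent.Promise` from the `runAsync` callback.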

It's hard to say why they perform differently in your example. Most of the time should be spent in remote.findOne anyway. You want to wrap blocking calls in scala.concurrent.blocking to help the ExecutorService avoid thread starvation (in both cases).
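Wrapping a blocking call could look roughly like the sketch below. `slowLookup` is a hypothetical stand-in for remote.findOne, and `Either` stands in for scalaz's `\/` so the example needs only the standard library. As far as I know, `blocking` is a hint that only pools aware of `BlockContext` (such as the global `ExecutionContext`) act on, so the sketch uses Scala futures.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingSketch {
  // Hypothetical stand-in for a slow, blocking call such as remote.findOne.
  def slowLookup(id: String): Option[String] = {
    Thread.sleep(100)
    if (id == "42") Some("profile-" + id) else None
  }

  def loadProfile(id: String): Future[Either[String, String]] = Future {
    // Tell the pool this section blocks, so it may add a compensating thread.
    blocking {
      slowLookup(id) match {
        case Some(profile) => Right(profile)
        case None          => Left("profile not found: " + id)
      }
    }
  }
}
```

For instance, `Await.result(BlockingSketch.loadProfile("42"), 5.seconds)` gives `Right("profile-42")`, and an unknown id gives a `Left` with the message.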

phadej answered Nov 15 '22 06:11