
How to use Scala ARM with Futures?

I want to implement the ARM (automatic resource management) pattern, where the resource is used asynchronously.

The problem

Suppose my resource looks like:

class MyResource { 
  def foo() : Future[MyResource] = ???
  // Other methods returning various futures
  def close() : Unit = ???
}
object MyResource { 
  def open(name: String): Future[MyResource] = ???
} 

The desired usage pattern is:

val r : Future[MyResource] = MyResource.open("name")
r flatMap (r => {
  r.foo() /* map ... */ andThen {
    case _ => r.close()
  }
})

The elided mapped functions can be complex, involving branching and chaining futures that make repeated calls to methods of r that return Futures.

I want to make sure r.close() is called after all future continuations have completed (or failed). Doing this manually at every call site is error-prone. This calls for an ARM solution.

Attempted solutions

The scala-arm library is normally synchronous. This code wouldn't do the right thing, because close() would be called before the futures inside the block had completed:

for (r <- managed(MyResource.open("name"))) {
  r map (_.foo()) // map ...
}

I thought of using this wrapper:

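// flatMap and andThen need an ExecutionContext; it is taken implicitly here.
def usingAsync[T](opener: => Future[MyResource])(body: MyResource => Future[T])
                 (implicit ec: ExecutionContext): Future[T] =
  opener flatMap { myr =>
    body(myr) andThen { case _ => myr.close() } // close whether body succeeded or failed
  }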

Then the call site would look like:

usingAsync(MyResource.open("name")) ( myr => {
  myr.foo() // map ...
})

But then the code inside the block would be responsible for returning a Future that completes only when all the other futures created by that block have completed. If it accidentally didn't, the resource would again be closed before all futures using it were complete, and no static verification would catch the error. For example, the following compiles but misbehaves at runtime, because the future returned by foo() is discarded and only the one returned by bar() is tracked:

usingAsync(MyResource.open("name")) ( myr => {
  myr.foo() // Do one thing
  myr.bar() // Do another
})
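
The failure mode can be reproduced with a small, self-contained sketch. Everything below is illustrative rather than part of any library: StubResource is a hypothetical stand-in for MyResource (its gate, release() and result types exist only to make the timing deterministic), and allSettled is a hypothetical helper that lets a block return one future covering everything it started, assuming the block's author remembers to collect those futures.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object UsingAsyncDemo {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for MyResource; the gate lets the caller decide when foo() finishes.
  final class StubResource {
    @volatile private var closed = false
    private val gate = Promise[Unit]()

    // Completes after release(); yields true if close() had already run by then.
    def foo(): Future[Boolean] = gate.future.map(_ => closed)
    // Completes immediately.
    def bar(): Future[Unit] = Future.successful(())
    def release(): Unit = { gate.trySuccess(()); () }
    def close(): Unit = { closed = true }
  }

  // Same shape as the wrapper above, specialised to the stub.
  def usingAsync[T](opener: => Future[StubResource])(body: StubResource => Future[T]): Future[T] =
    opener.flatMap(r => body(r).andThen { case _ => r.close() })

  // Hypothetical helper: completes once every future has completed, successfully or not.
  def allSettled[T](fs: Seq[Future[T]]): Future[Seq[Try[T]]] =
    Future.sequence(fs.map(f => f.map[Try[T]](Success(_)).recover { case e => Failure(e) }))

  // The block drops foo()'s future, so usingAsync only waits for bar().
  def droppedFutureSawClosed(): Boolean = {
    val res  = new StubResource
    val seen = Promise[Boolean]()
    val done = usingAsync(Future.successful(res)) { r =>
      r.foo().foreach(v => seen.success(v)) // fire-and-forget: not tracked
      r.bar()
    }
    Await.result(done, 5.seconds) // close() has already run at this point
    res.release()                 // only now does foo() get to finish
    Await.result(seen.future, 5.seconds)
  }

  // The block returns a future that covers both calls, so close() waits for foo().
  def trackedFutureSawClosed(): Boolean = {
    val res  = new StubResource
    val seen = Promise[Boolean]()
    val done = usingAsync(Future.successful(res)) { r =>
      val f = r.foo()
      f.foreach(v => seen.success(v))
      allSettled[Any](Seq(f, r.bar()))
    }
    res.release()
    Await.result(done, 5.seconds)
    Await.result(seen.future, 5.seconds)
  }
}
```

In this sketch, droppedFutureSawClosed() reports that foo() ran against an already-closed resource, while trackedFutureSawClosed() does not. Collecting futures this way narrows the problem but still depends on discipline inside the block, so it is not the static guarantee I am looking for.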

How to solve this?

Apparently, I could use scala-arm's delimited continuation support (CPS). It looks a bit complex and I'm afraid of getting it wrong. It also requires enabling a compiler plugin. And my team is very new to Scala, so I don't want to require them to use CPS.

Is CPS the only way forward? Is there a library or design pattern that does this more simply with Futures, or an example of doing this with scala-arm?

danarmak asked Dec 21 '13 15:12





1 Answer

Reactive Extensions (Rx) could be an alternative solution. There is increasing momentum around this programming paradigm, which is now available in many languages, including Scala.

The basis of Rx is to create an Observable, which is a source of asynchronous events. Observables can be chained in sophisticated ways; this is their power. You subscribe to an Observable to listen for onNext, onError and onCompleted events. You also receive a Subscription back that allows you to cancel.

I think you would probably add a resource.close() call in an onCompleted and/or onError handler.

See RxScala docs for:

Observable.subscribe(
    onNext: (T) ⇒ Unit, 
    onError: (Throwable) ⇒ Unit, 
    onCompleted: () ⇒ Unit): Subscription

More info:

  • RxScala site: http://rxscala.github.io/
  • RxScala Observable: http://rxscala.github.io/scaladoc/index.html#rx.lang.scala.Observable
  • Good intro by Ben Christensen at Netflix: http://www.infoq.com/presentations/netflix-functional-rx
  • Erik Meijer gives code examples of chained Observables in the Coursera course Principles of Reactive Programming by Martin Odersky, Erik Meijer and Roland Kuhn.
reggoodwin answered Oct 01 '22 18:10
