I am trying to implicitly provide Async and Sync instances in the code for my doobie repository. Sync[F] and Async[F] work fine with IO. I want to use Future instead, and I am running into problems.
I have tried to create my own Async for Future on top of IO:
def futureAsync(implicit F: MonadError[Future, Throwable]): Async[Future] = new Async[Future] {
  override def async[A](k: (Either[Throwable, A] => Unit) => Unit): Future[A] = IO.async(k).unsafeToFuture()
  override def asyncF[A](k: (Either[Throwable, A] => Unit) => Future[Unit]): Future[A] =
    throw new Exception("Not implemented Future.asyncF")
  override def suspend[A](thunk: => Future[A]): Future[A] = thunk
  override def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])(release: (A, ExitCase[Throwable]) => Future[Unit]): Future[B] =
    throw new Exception("Not implemented Future.bracketCase")
  override def raiseError[A](e: Throwable): Future[A] = F.raiseError(e)
  override def handleErrorWith[A](fa: Future[A])(f: Throwable => Future[A]): Future[A] = F.handleErrorWith(fa)(_ => f(new Exception("")))
  override def pure[A](x: A): Future[A] = F.pure(x)
  override def flatMap[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = F.flatMap(fa)(f)
  override def tailRecM[A, B](a: A)(f: A => Future[Either[A, B]]): Future[B] = F.tailRecM(a)(f)
}
I am stuck on the implementation of two of these functions, asyncF and bracketCase. Can someone help?
As Reactormonk says in a comment above, it's not possible to write an instance of Async for Future that has the right semantics, because Async extends Sync, and Sync requires a representation of a computation that can be run repeatedly, while Scala's futures begin running when they're defined and can't be re-run.
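To make the "begin running when they're defined" point concrete, here is a minimal sketch that uses only the standard library. The names EagerFutureDemo, counter, and increment are made up for illustration and do not come from the question or from cats-effect.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object, not part of any library.
object EagerFutureDemo {
  def run(): (Int, Int) = {
    val counter = new AtomicInteger(0)

    // The body is scheduled as soon as this value is defined.
    val increment: Future[Int] = Future(counter.incrementAndGet())

    // Sequencing the same Future value twice does not run the body again;
    // the second step only observes the result that already exists.
    val twice: Future[Int] = increment.flatMap(_ => increment)

    val last = Await.result(twice, 5.seconds)
    (last, counter.get()) // (1, 1): the side effect happened exactly once
  }
}
```

A value of a lawful Sync type would increment the counter each time it is sequenced, which is exactly what a Future value cannot do.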
It's instructive to see this for yourself, though, and I'd encourage you to try to write your own compile-able but (necessarily) unlawful Async[Future] instance without looking at the next block of code. For the sake of the example, though, here's a quick sketch off the top of my head:
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
import cats.effect.{Async, ExitCase, IO}

def futureAsync(implicit c: ExecutionContext): Async[Future] = new Async[Future] {
  def async[A](k: (Either[Throwable, A] => Unit) => Unit): Future[A] =
    IO.async(k).unsafeToFuture()

  def asyncF[A](k: (Either[Throwable, A] => Unit) => Future[Unit]): Future[A] = {
    val p = Promise[A]()
    val f = k {
      case Right(a) => p.success(a)
      case Left(e) => p.failure(e)
    }
    f.flatMap(_ => p.future)
  }

  def suspend[A](thunk: => Future[A]): Future[A] = Future(thunk).flatten

  def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])(
    release: (A, ExitCase[Throwable]) => Future[Unit]
  ): Future[B] = acquire.flatMap { a =>
    use(a).transformWith {
      case Success(b) => release(a, ExitCase.Completed).map(_ => b)
      case Failure(e) => release(a, ExitCase.Error(e)).flatMap(_ => Future.failed(e))
    }
  }

  def raiseError[A](e: Throwable): Future[A] = Future.failed(e)
  def handleErrorWith[A](fa: Future[A])(f: Throwable => Future[A]): Future[A] =
    fa.recoverWith { case t => f(t) }

  def pure[A](x: A): Future[A] = Future.successful(x)
  def flatMap[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = fa.flatMap(f)
  def tailRecM[A, B](a: A)(f: A => Future[Either[A, B]]): Future[B] = f(a).flatMap {
    case Right(b) => Future.successful(b)
    case Left(a) => tailRecM(a)(f)
  }
}
This will compile just fine, and would probably work for some situations (but please don't actually use it!). We've said it can't have the right semantics, though, and we can show that by using cats-effect's laws module.
First we need some boilerplate-y stuff you don't really need to worry about:
import cats.kernel.Eq, cats.implicits._
import org.scalacheck.Arbitrary

implicit val throwableEq: Eq[Throwable] = Eq.by[Throwable, String](_.toString)
implicit val nonFatalArbitrary: Arbitrary[Throwable] =
  Arbitrary(Arbitrary.arbitrary[Exception].map(identity))

implicit def futureEq[A](implicit A: Eq[A], ec: ExecutionContext): Eq[Future[A]] =
  new Eq[Future[A]] {
    private def liftToEither(f: Future[A]): Future[Either[Throwable, A]] =
      f.map(Right(_)).recover { case e => Left(e) }

    def eqv(fx: Future[A], fy: Future[A]): Boolean =
      scala.concurrent.Await.result(
        liftToEither(fx).zip(liftToEither(fy)).map {
          case (rx, ry) => rx === ry
        },
        scala.concurrent.duration.Duration(1, "second")
      )
  }
Then we can define a test that checks the Async laws for our instance:
import cats.effect.laws.discipline.{AsyncTests, Parameters}
import org.scalatest.FunSuite
import org.typelevel.discipline.scalatest.Discipline

object FutureAsyncSuite extends FunSuite with Discipline {
  implicit val ec: ExecutionContext = ExecutionContext.global

  implicit val params: Parameters =
    Parameters.default.copy(allowNonTerminationLaws = false)

  checkAll(
    "Async",
    AsyncTests[Future](futureAsync).async[String, String, String]
  )
}
And then we can run the law tests:
scala> FutureAsyncSuite.execute()
FutureAsyncSuite:
- Async.async.acquire and release of bracket are uncancelable
- Async.async.ap consistent with product + map
- Async.async.applicative homomorphism
...
You'll see that most of the tests are green; this instance gets a lot of things right.
It does show three failed tests, though, including the following:
- Async.async.repeated sync evaluation not memoized *** FAILED ***
  GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
   (Discipline.scala:14)
    Falsified after 1 successful property evaluations.
    Location: (Discipline.scala:14)
    Occurred when passed generated values (
      arg0 = "淳칇멀",
      arg1 = org.scalacheck.GenArities$$Lambda$7154/1834868832@1624ea25
    )
    Label of failing property:
      Expected: Future(Success(驅ṇ숆㽝珅뢈矉))
      Received: Future(Success(淳칇멀))
If you look at the laws definitions, you'll see that this is a test that defines a Future value with delay and then sequences it multiple times, like this:
val change = F.delay { /* observable side effect here */ }
val read = F.delay(cur)
change *> change *> read
The other two failures are similar "not memoized" violations. These tests should see the side effect happen twice, but in our case it's not possible to write delay or suspend for Future in such a way that that would happen (it's worth trying, though, to convince yourself that this is the case).
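For contrast, here is a minimal sketch of the same change/read shape written against cats.effect.IO, where the side effect really does happen twice. It assumes cats-effect 2.x (matching the imports used in this answer); LazyIODemo and run are hypothetical names, and IO { ... } is used as the equivalent of F.delay for IO.

```scala
import cats.effect.IO
import cats.implicits._

// Hypothetical demo object, not part of cats-effect.
object LazyIODemo {
  def run(): Int = {
    var cur = 0

    // Defining these values runs nothing; they only describe the effects.
    val change: IO[Unit] = IO { cur += 1 }
    val read: IO[Int] = IO(cur)

    // Each occurrence of `change` is evaluated when the program is run,
    // so the increment happens twice before `read` observes the value.
    (change *> change *> read).unsafeRunSync() // 2
  }
}
```

With Future in place of IO, the body of change has already been scheduled by the time it is sequenced, so the final read can never see both increments.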
To sum up: you can write an Async[Future] instance that will pass something like 75 of the 78 Async laws tests, but it's not possible to write an instance that will pass all of them, and using an unlawful instance is a really bad idea: both potential users of your code and libraries like Doobie will assume that your instances are lawful, and if you don't live up to this assumption you're opening the door to complex and annoying bugs.
It's worth noting that it's not too hard to write a minimal wrapper for Future that has a lawful Async instance (for example I've got a wrapper for Twitter's future called Rerunnable in my catbird library). You really should just stick with cats.effect.IO, though, and use the provided conversions to convert to and from futures in any parts of your code where you're working with traditional Future-based APIs.
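As a rough sketch of what those conversions can look like, assuming cats-effect 2.x (where IO.fromFuture takes an implicit ContextShift[IO]): FutureInterop, legacyLookup, lookup, and lookupAsFuture are hypothetical names invented for this example, not part of Doobie or cats-effect.

```scala
import scala.concurrent.{ExecutionContext, Future}
import cats.effect.{ContextShift, IO}

// Hypothetical interop layer, for illustration only.
object FutureInterop {
  implicit val ec: ExecutionContext = ExecutionContext.global
  implicit val cs: ContextShift[IO] = IO.contextShift(ec)

  // Stand-in for an existing Future-based API.
  def legacyLookup(id: Int): Future[String] = Future(s"user-$id")

  // Future => IO: wrapping the call in IO(...) defers creating the Future,
  // so a fresh Future is started every time the resulting IO is run.
  def lookup(id: Int): IO[String] = IO.fromFuture(IO(legacyLookup(id)))

  // IO => Future: run the IO at the boundary where callers expect a Future.
  def lookupAsFuture(id: Int): Future[String] = lookup(id).unsafeToFuture()
}
```

The idea is to keep the repository code written against IO (or an abstract F[_]: Async) and to convert only at the edges, so the instance Doobie sees is a lawful one.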