
How to create Async[Future] from Async[IO]

I am trying to provide implicit Sync and Async instances in my code for a doobie repository. Sync[F] and Async[F] work fine with IO. I want to use Future instead, and I am running into problems.

I have tried to create my own Async[Future] from IO:

def futureAsync(implicit F: MonadError[Future, Throwable]): Async[Future] = new Async[Future] {
    override def async[A](k: (Either[Throwable, A] => Unit) => Unit): Future[A] = IO.async(k).unsafeToFuture()

    override def asyncF[A](k: (Either[Throwable, A] => Unit) => Future[Unit]): Future[A] =
      throw new Exception("Not implemented Future.asyncF")

    override def suspend[A](thunk: => Future[A]): Future[A] = thunk

    override def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])(release: (A, ExitCase[Throwable]) => Future[Unit]): Future[B] =
      throw new Exception("Not implemented Future.bracketCase")

    override def raiseError[A](e: Throwable): Future[A] = F.raiseError(e)

    override def handleErrorWith[A](fa: Future[A])(f: Throwable => Future[A]): Future[A] = F.handleErrorWith(fa)(_ => f(new Exception("")))

    override def pure[A](x: A): Future[A] = F.pure(x)

    override def flatMap[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = F.flatMap(fa)(f)

    override def tailRecM[A, B](a: A)(f: A => Future[Either[A, B]]): Future[B] = F.tailRecM(a)(f)
  }

I am stuck on the implementation of two functions there, asyncF and bracketCase. Can someone help?

user11034858 asked May 03 '19 10:05



1 Answer

As Reactormonk says in a comment above, it's not possible to write an instance of Async for Future that has the right semantics, because Async extends Sync, and Sync requires a representation of a computation that can be run repeatedly, while Scala's futures begin running when they're defined and can't be re-run.
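The "begin running when they're defined" part can be seen with nothing but the standard library. The following is a minimal sketch, assuming the global ExecutionContext; the names EagerFutureDemo, eager and deferred are made up for illustration and are not part of cats-effect or doobie.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object EagerFutureDemo {
  // Returns (did the eager body run?, how often did the deferred body run?).
  def run(): (Boolean, Int) = {
    val started = new CountDownLatch(1)
    val deferredRuns = new AtomicInteger(0)

    // Defining the value is enough to schedule the body on the ExecutionContext.
    val eager: Future[Int] = Future { started.countDown(); 42 }

    // Wrapping the Future in a function defers it until the function is applied.
    val deferred: () => Future[Int] = () => Future(deferredRuns.incrementAndGet())

    // Neither value is ever sequenced or awaited; only the latch is observed.
    val eagerRan = started.await(5, TimeUnit.SECONDS)
    (eagerRan, deferredRuns.get())
  }
}
```

Calling EagerFutureDemo.run() should give (true, 0): the plain Future ran without anyone asking for its result, while the function-wrapped one never ran. A Sync instance needs the second behaviour, a description that does nothing until it is run.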

An unlawful instance

It's instructive to see this for yourself, though, and I'd encourage you to try to write your own compile-able but (necessarily) unlawful Async[Future] instance without looking at the next block of code. For the sake of the example, though, here's a quick sketch off the top of my head:

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
import cats.effect.{Async, ExitCase, IO}

def futureAsync(implicit c: ExecutionContext): Async[Future] = new Async[Future] {
  def async[A](k: (Either[Throwable, A] => Unit) => Unit): Future[A] =
    IO.async(k).unsafeToFuture()

  def asyncF[A](k: (Either[Throwable, A] => Unit) => Future[Unit]): Future[A] = {
    val p = Promise[A]()
    val f = k {
      case Right(a) => p.success(a)
      case Left(e) => p.failure(e)
    }
    f.flatMap(_ => p.future)
  }

  def suspend[A](thunk: => Future[A]): Future[A] = Future(thunk).flatten

  def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])(
    release: (A, ExitCase[Throwable]) => Future[Unit]
  ): Future[B] = acquire.flatMap { a =>
    use(a).transformWith {
      case Success(b) => release(a, ExitCase.Completed).map(_ => b)
      case Failure(e) => release(a, ExitCase.Error(e)).flatMap(_ => Future.failed(e))
    }
  }

  def raiseError[A](e: Throwable): Future[A] = Future.failed(e)
  def handleErrorWith[A](fa: Future[A])(f: Throwable => Future[A]): Future[A] =
    fa.recoverWith { case t => f(t) }

  def pure[A](x: A): Future[A] = Future.successful(x)
  def flatMap[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = fa.flatMap(f)
  def tailRecM[A, B](a: A)(f: A => Future[Either[A, B]]): Future[B] = f(a).flatMap {
    case Right(b) => Future.successful(b)
    case Left(a) => tailRecM(a)(f)
  }
}

This will compile just fine, and would probably work for some situations (but please don't actually use it!). We've said it can't have the right semantics, though, and we can show that by using cats-effect's laws module.

Checking the laws

First we need some boilerplate-y stuff you don't really need to worry about:

import cats.kernel.Eq, cats.implicits._
import org.scalacheck.Arbitrary

implicit val throwableEq: Eq[Throwable] =  Eq.by[Throwable, String](_.toString)
implicit val nonFatalArbitrary: Arbitrary[Throwable] =
  Arbitrary(Arbitrary.arbitrary[Exception].map(identity))

implicit def futureEq[A](implicit A: Eq[A], ec: ExecutionContext): Eq[Future[A]] =
  new Eq[Future[A]] {
    private def liftToEither(f: Future[A]): Future[Either[Throwable, A]] =
      f.map(Right(_)).recover { case e => Left(e) }

    def eqv(fx: Future[A], fy: Future[A]): Boolean =
      scala.concurrent.Await.result(
        liftToEither(fx).zip(liftToEither(fy)).map {
          case (rx, ry) => rx === ry
        },
        scala.concurrent.duration.Duration(1, "second")
      )
  }

Then we can define a test that checks the Async laws for our instance:

import cats.effect.laws.discipline.{AsyncTests, Parameters}
import org.scalatest.FunSuite
import org.typelevel.discipline.scalatest.Discipline

object FutureAsyncSuite extends FunSuite with Discipline {
  implicit val ec: ExecutionContext = ExecutionContext.global

  implicit val params: Parameters =
    Parameters.default.copy(allowNonTerminationLaws = false)

  checkAll(
    "Async",
    AsyncTests[Future](futureAsync).async[String, String, String]
  )
}

And then we can run the law tests:

scala> FutureAsyncSuite.execute()
FutureAsyncSuite:
- Async.async.acquire and release of bracket are uncancelable
- Async.async.ap consistent with product + map
- Async.async.applicative homomorphism
...

You'll see that most of the tests are green; this instance gets a lot of things right.

Where it breaks the law

It does show three failed tests, though, including the following:

- Async.async.repeated sync evaluation not memoized *** FAILED ***
  GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
   (Discipline.scala:14)
    Falsified after 1 successful property evaluations.
    Location: (Discipline.scala:14)
    Occurred when passed generated values (
      arg0 = "淳칇멀",
      arg1 = org.scalacheck.GenArities$$Lambda$7154/1834868832@1624ea25
    )
    Label of failing property:
      Expected: Future(Success(驅ṇ숆㽝珅뢈矉))
      Received: Future(Success(淳칇멀))

If you look at the laws definitions, you'll see that this is a test that defines a Future value with delay and then sequences it multiple times, like this:

val change = F.delay { /* observable side effect here */ }
val read = F.delay(cur)

change *> change *> read

The other two failures are similar "not memoized" violations. These tests should see the side effect happen twice, but in our case it's not possible to write delay or suspend for Future in a way that makes that happen, because a Future value that has already been created never runs its body again (it's worth trying, though, to convince yourself that this is the case).
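The shape of that law can be reproduced against a plain Future without the laws module. This is a rough sketch, not the actual law code: delay here is a hypothetical stand-in for F.delay, an AtomicInteger replaces the law's mutable variable, and the read step is built inside flatMap (the real law defines it up front, which makes the result racy) so that the outcome is deterministic.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MemoizedDelayDemo {
  // Hypothetical stand-in for F.delay, written directly against Future.
  def delay[A](thunk: => A): Future[A] = Future(thunk)

  def run(): Int = {
    val cur = new AtomicInteger(0)

    // The increment is scheduled here, once, when the value is defined.
    val change: Future[Unit] = delay { cur.incrementAndGet(); () }

    // Same shape as `change *> change *> read`, spelled out with flatMap.
    // The second `change` is the same completed Future, so nothing re-runs.
    val program: Future[Int] =
      change.flatMap(_ => change).flatMap(_ => delay(cur.get()))

    Await.result(program, 5.seconds)
  }
}
```

MemoizedDelayDemo.run() should return 1, where a lawful Sync would have to produce 2 because change appears twice in the program.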

What you should do instead

To sum up: you can write an Async[Future] instance that will pass something like 75 of the 78 Async law tests, but it's not possible to write one that passes all of them. Using an unlawful instance is a really bad idea: both potential users of your code and libraries like Doobie will assume that your instances are lawful, and if you don't live up to that assumption you're opening the door to complex and annoying bugs.

It's worth noting that it's not too hard to write a minimal wrapper for Future that has a lawful Async instance (for example I've got a wrapper for Twitter's future called Rerunnable in my catbird library). You really should just stick with cats.effect.IO, though, and use the provided conversions to convert to and from futures in any parts of your code where you're working with traditional Future-based APIs.
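To give a feel for what such a wrapper looks like, here is a minimal sketch using only the standard library. LazyFuture and its delay are hypothetical names invented for this example; this is not catbird's Rerunnable, and it is not a complete Async instance, only the deferral idea that makes re-running possible. A real wrapper would still need a full instance checked against the laws module.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical wrapper: holds a recipe for a Future instead of a running one.
final class LazyFuture[A](val run: () => Future[A]) {
  def map[B](f: A => B)(implicit ec: ExecutionContext): LazyFuture[B] =
    new LazyFuture[B](() => run().map(f))

  def flatMap[B](f: A => LazyFuture[B])(implicit ec: ExecutionContext): LazyFuture[B] =
    new LazyFuture[B](() => run().flatMap(a => f(a).run()))
}

object LazyFuture {
  // Nothing is scheduled until `run()` is called on the result.
  def delay[A](thunk: => A)(implicit ec: ExecutionContext): LazyFuture[A] =
    new LazyFuture[A](() => Future(thunk))
}

object LazyFutureDemo {
  import ExecutionContext.Implicits.global

  def run(): Int = {
    val cur = new AtomicInteger(0)
    val change = LazyFuture.delay { cur.incrementAndGet(); () }
    val read = LazyFuture.delay(cur.get())

    // Each use of `change` creates a fresh Future when the program is run.
    val program = change.flatMap(_ => change).flatMap(_ => read)
    Await.result(program.run(), 5.seconds)
  }
}
```

LazyFutureDemo.run() should return 2, because each occurrence of change starts a new Future when the program is finally run. This is the behaviour the "not memoized" laws ask for, and it is essentially what IO already gives you.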

Travis Brown answered Oct 08 '22 08:10