Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to use the free monad with Future[M[_]]

I have implemented a simple language for an ETL process using the free monad. When using List as input and output for both data fetching and storing, everything works fine. However I am using async libraries and work with Future[List]

common imports and definitions

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import cats.free.Free
import cats.free.Free._

sealed trait Ops[A]
type OpsF[A] = Free[Ops, A]

working with List

case class Fetch(offset: Int, amount: Int) extends Ops[List[Record]]
case class Store(recs: List[Record]) extends Ops[List[Response]]

def fetch(offset: Int, amount: Int): OpsF[List[Record]] = 
    liftF[Ops, List[Record]](Fetch(offset, amount))
def store(recs: List[Record]): OpsF[List[Response]] = 
    liftF[Ops, List[Response]](Store(recs))

def simpleEtl(offset: Int, amount: Int): Free[Ops, List[Response]]  = 
    fetch(offset, amount).flatMap(r => store(r))

not working with Future[List]

case class Fetch(offset: Int, amount: Int) extends Ops[Future[List[Record]]]
case class Store(recs: List[Record]) extends Ops[Future[List[Response]]]

def fetch(offset: Int, amount: Int): OpsF[Future[List[Record]]] = 
    liftF[Ops, Future[List[Record]]](Fetch(offset, amount))
def store(recs: List[Record]): OpsF[Future[List[Response]]] = 
    liftF[Ops, Future[List[Response]]](Store(recs))

// explicit types in case I am misunderstanding more than I think
def simpleEtl(offset: Int, amount: Int): Free[Ops, Future[List[Response]]] = 
fetch(offset, amount).flatMap { rf: Future[List[Record]] =>
  val getResponses: OpsF[Future[List[Response]]] = rf map { r: List[Record] =>
    store(r)
  }
  getResponses
}

as expected, the type returned from the flatMap/map is wrong - I am not getting OpsF[Future] but a Future[OpsF]

Error:(34, 60) type mismatch;
 found   :  scala.concurrent.Future[OpsF[scala.concurrent.Future[List[Response]]]]
(which expands to)  scala.concurrent.Future[cats.free.Free[Ops,scala.concurrent.Future[List[String]]]]
 required: OpsF[scala.concurrent.Future[List[Response]]]
(which expands to)  cats.free.Free[Ops,scala.concurrent.Future[List[String]]]
    val getResponses: OpsF[Future[List[Response]]] = rf map { r: List[Record] =>

my current workaround is to have store accept Future[List[Record]] and letting the interpreter map over the Future, but it feels clumsy.

The issue is not specific to List - e.g. Option would be useful as well.

Am I doing it wrong? Is there some sort of a monad transformer for this?

like image 978
kostja Avatar asked Jun 13 '16 07:06

kostja


1 Answers

The abstract data type Ops defines an algebra to fetch and to store multiple Records. It describes two operations, but that's also the only thing the algebra should do. How the operations are actually executed, shouldn't matter at all to Fetch and Store, the only useful thing you expect is respectively a List[Record] and a List[Response].

By making the expected result type of Fetch and Store a Future[List[Record]]], you limit the possibilities how to interpret this algebra. Maybe in your tests, you don't want to connect asynchronously to a webservice or a database and just want to test with a Map[Int, Result] or Vector[Result], but now you are required to return a Future which makes the tests more complex than they could be.

But saying that you don't need ETL[Future[List[Record]]] doesn't solve your question: you are using async libraries and you probably want to return some Future.

Starting with your first implementation :

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import cats.implicits._
import cats.free.Free

type Record = String
type Response = String

sealed trait EtlOp[T]
case class Fetch(offset: Int, amount: Int) extends EtlOp[List[Record]]
case class Store(recs: List[Record]) extends EtlOp[List[Response]]

type ETL[A] = Free[EtlOp, A]

def fetch(offset: Int, amount: Int): ETL[List[Record]] = 
  Free.liftF(Fetch(offset, amount))
def store(recs: List[Record]): ETL[List[Response]] = 
  Free.liftF(Store(recs))

def fetchStore(offset: Int, amount: Int): ETL[List[Response]] =
  fetch(offset, amount).flatMap(store)

But now we still have no Futures ? That's the job of our interpreter :

import cats.~>

val interpretFutureDumb: EtlOp ~> Future = new (EtlOp ~> Future) {
  def apply[A](op: EtlOp[A]): Future[A] = op match {
    case Store(records) => 
      Future.successful(records.map(rec => s"Resp($rec)"))
      // store in DB, send to webservice, ...
    case Fetch(offset, amount) =>
      Future.successful(List.fill(amount)(offset.toString))
      // get from DB, from webservice, ...
  }
}

With this interpreter (where of course you would replace Future.successful(...) with something more useful) we can get our Future[List[Response]] :

val responses: Future[List[Response]] = 
  fetchStore(1, 5).foldMap(interpretFutureDumb)

val records: Future[List[Record]] = 
  fetch(2, 4).foldMap(interpretFutureDumb)

responses.foreach(println)
// List(Resp(1), Resp(1), Resp(1), Resp(1), Resp(1))
records.foreach(println)
// List(2, 2, 2, 2)

But we can still create a different interpreter which doesn't return a Future :

import scala.collection.mutable.ListBuffer
import cats.Id

val interpretSync: EtlOp ~> Id = new (EtlOp ~> Id) {
  val records: ListBuffer[Record] = ListBuffer()
  def apply[A](op: EtlOp[A]): Id[A] = op match {
    case Store(recs) => 
      records ++= recs
      records.toList
    case Fetch(offset, amount) =>
      records.drop(offset).take(amount).toList
  }
}

val etlResponse: ETL[List[Response]] =
  for { 
    _       <- store(List("a", "b", "c", "d"))
    records <- fetch(1, 2)
    resp    <- store(records)
  } yield resp

val responses2: List[Response] = etlResponse.foldMap(interpretSync)
// List(a, b, c, d, b, c)
like image 80
Peter Neyens Avatar answered Sep 28 '22 15:09

Peter Neyens