I have implemented a simple language for an ETL process using the free monad. When using List as input and output for both data fetching and storing, everything works fine. However, I am using async libraries and therefore work with Future[List].
Common imports and definitions:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import cats.free.Free
import cats.free.Free._

sealed trait Ops[A]
type OpsF[A] = Free[Ops, A]
```
Working with List:

```scala
case class Fetch(offset: Int, amount: Int) extends Ops[List[Record]]
case class Store(recs: List[Record]) extends Ops[List[Response]]

def fetch(offset: Int, amount: Int): OpsF[List[Record]] =
  liftF[Ops, List[Record]](Fetch(offset, amount))

def store(recs: List[Record]): OpsF[List[Response]] =
  liftF[Ops, List[Response]](Store(recs))

def simpleEtl(offset: Int, amount: Int): Free[Ops, List[Response]] =
  fetch(offset, amount).flatMap(r => store(r))
```
Not working with Future[List]:

```scala
case class Fetch(offset: Int, amount: Int) extends Ops[Future[List[Record]]]
case class Store(recs: List[Record]) extends Ops[Future[List[Response]]]

def fetch(offset: Int, amount: Int): OpsF[Future[List[Record]]] =
  liftF[Ops, Future[List[Record]]](Fetch(offset, amount))

def store(recs: List[Record]): OpsF[Future[List[Response]]] =
  liftF[Ops, Future[List[Response]]](Store(recs))

// explicit types in case I am misunderstanding more than I think
def simpleEtl(offset: Int, amount: Int): Free[Ops, Future[List[Response]]] =
  fetch(offset, amount).flatMap { rf: Future[List[Record]] =>
    val getResponses: OpsF[Future[List[Response]]] = rf map { r: List[Record] =>
      store(r)
    }
    getResponses
  }
```
As expected, the type returned from the flatMap/map is wrong: I am not getting an OpsF[Future] but a Future[OpsF]:
```
Error:(34, 60) type mismatch;
 found   : scala.concurrent.Future[OpsF[scala.concurrent.Future[List[Response]]]]
    (which expands to)  scala.concurrent.Future[cats.free.Free[Ops,scala.concurrent.Future[List[String]]]]
 required: OpsF[scala.concurrent.Future[List[Response]]]
    (which expands to)  cats.free.Free[Ops,scala.concurrent.Future[List[String]]]
    val getResponses: OpsF[Future[List[Response]]] = rf map { r: List[Record] =>
```
My current workaround is to have store accept a Future[List[Record]] and let the interpreter map over the Future, but it feels clumsy.
The issue is not specific to List; Option, for example, would be useful as well.
Am I doing it wrong? Is there some sort of a monad transformer for this?
The abstract data type Ops defines an algebra to fetch and to store multiple Records. It describes two operations, but that's also the only thing the algebra should do. How the operations are actually executed shouldn't matter at all to Fetch and Store; the only useful thing you expect is respectively a List[Record] and a List[Response].
By making the expected result types of Fetch and Store a Future[List[Record]] and a Future[List[Response]], you limit the possibilities of how to interpret this algebra. Maybe in your tests you don't want to connect asynchronously to a webservice or a database, and just want to test with a Map[Int, Record] or a Vector[Record], but now you are required to return a Future, which makes the tests more complex than they need to be.
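To make the testing point concrete, here is a minimal sketch of such a test interpreter, assuming cats 2.x on Scala 2.13. The pure algebra (Fetch and Store returning plain Lists) is restated so the block compiles on its own; `vectorInterpreter` and the `Resp(...)` response format are hypothetical test fixtures, not part of cats.

```scala
import cats.{~>, Id}
import cats.free.Free

object VectorTestInterpreter {
  type Record = String
  type Response = String

  // Pure algebra: the result types say nothing about Future
  sealed trait EtlOp[T]
  case class Fetch(offset: Int, amount: Int) extends EtlOp[List[Record]]
  case class Store(recs: List[Record]) extends EtlOp[List[Response]]

  type ETL[A] = Free[EtlOp, A]

  def fetch(offset: Int, amount: Int): ETL[List[Record]] =
    Free.liftF[EtlOp, List[Record]](Fetch(offset, amount))

  def store(recs: List[Record]): ETL[List[Response]] =
    Free.liftF[EtlOp, List[Response]](Store(recs))

  // Hypothetical fixture: Fetch reads from an immutable Vector, Store just echoes
  def vectorInterpreter(data: Vector[Record]): EtlOp ~> Id = new (EtlOp ~> Id) {
    def apply[A](op: EtlOp[A]): Id[A] = op match {
      case Fetch(offset, amount) => data.slice(offset, offset + amount).toList
      case Store(recs)           => recs.map(rec => s"Resp($rec)")
    }
  }
}
```

Running `fetch(1, 2).flatMap(store).foldMap(vectorInterpreter(Vector("a", "b", "c", "d")))` gives `List(Resp(b), Resp(c))` synchronously, with no Future or ExecutionContext involved.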
But saying that you don't need ETL[Future[List[Record]]] doesn't answer your question: you are using async libraries and you probably want to return some Future.
Starting with your first implementation:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import cats.implicits._
import cats.free.Free

type Record = String
type Response = String

sealed trait EtlOp[T]
case class Fetch(offset: Int, amount: Int) extends EtlOp[List[Record]]
case class Store(recs: List[Record]) extends EtlOp[List[Response]]

type ETL[A] = Free[EtlOp, A]

def fetch(offset: Int, amount: Int): ETL[List[Record]] =
  Free.liftF(Fetch(offset, amount))

def store(recs: List[Record]): ETL[List[Response]] =
  Free.liftF(Store(recs))

def fetchStore(offset: Int, amount: Int): ETL[List[Response]] =
  fetch(offset, amount).flatMap(store)
```
But now we still have no Futures? That's the job of our interpreter:
```scala
import cats.~>

val interpretFutureDumb: EtlOp ~> Future = new (EtlOp ~> Future) {
  def apply[A](op: EtlOp[A]): Future[A] = op match {
    case Store(records) =>
      Future.successful(records.map(rec => s"Resp($rec)"))
      // store in DB, send to webservice, ...
    case Fetch(offset, amount) =>
      Future.successful(List.fill(amount)(offset.toString))
      // get from DB, from webservice, ...
  }
}
```
With this interpreter (where of course you would replace Future.successful(...) with something more useful) we can get our Future[List[Response]]:
```scala
val responses: Future[List[Response]] =
  fetchStore(1, 5).foldMap(interpretFutureDumb)

val records: Future[List[Record]] =
  fetch(2, 4).foldMap(interpretFutureDumb)

responses.foreach(println)
// List(Resp(1), Resp(1), Resp(1), Resp(1), Resp(1))

records.foreach(println)
// List(2, 2, 2, 2)
```
But we can still create a different interpreter which doesn't return a Future:
```scala
import scala.collection.mutable.ListBuffer
import cats.Id

val interpretSync: EtlOp ~> Id = new (EtlOp ~> Id) {
  val records: ListBuffer[Record] = ListBuffer()
  def apply[A](op: EtlOp[A]): Id[A] = op match {
    case Store(recs) =>
      records ++= recs
      records.toList
    case Fetch(offset, amount) =>
      records.drop(offset).take(amount).toList
  }
}

val etlResponse: ETL[List[Response]] =
  for {
    _       <- store(List("a", "b", "c", "d"))
    records <- fetch(1, 2)
    resp    <- store(records)
  } yield resp

val responses2: List[Response] = etlResponse.foldMap(interpretSync)
// List(a, b, c, d, b, c)
```
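The same idea also covers the Option part of the question and its monad transformer question. One possible approach, sketched here under the assumption of cats 2.x on Scala 2.13, is to leave the algebra untouched and interpret it into cats' `OptionT[Future, *]`, then unwrap with `.value` to get a `Future[Option[...]]`. The names `FutureOpt`, `interpretFutureOpt` and `run`, and the rule that a negative offset means "no result", are hypothetical; `Await.result` is only there so the sketch can be run and checked from a script.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import cats.~>
import cats.data.OptionT
import cats.free.Free
import cats.implicits._

object OptionTInterpreter {
  type Record = String
  type Response = String

  // Unchanged pure algebra
  sealed trait EtlOp[T]
  case class Fetch(offset: Int, amount: Int) extends EtlOp[List[Record]]
  case class Store(recs: List[Record]) extends EtlOp[List[Response]]

  type ETL[A] = Free[EtlOp, A]

  def fetch(offset: Int, amount: Int): ETL[List[Record]] =
    Free.liftF[EtlOp, List[Record]](Fetch(offset, amount))

  def store(recs: List[Record]): ETL[List[Response]] =
    Free.liftF[EtlOp, List[Response]](Store(recs))

  // Target monad: an asynchronous computation that may also come back empty
  type FutureOpt[A] = OptionT[Future, A]

  val interpretFutureOpt: EtlOp ~> FutureOpt = new (EtlOp ~> FutureOpt) {
    def apply[A](op: EtlOp[A]): FutureOpt[A] = op match {
      case Fetch(offset, amount) =>
        // Hypothetical rule: a negative offset yields "no result"
        if (offset < 0) OptionT.none[Future, List[Record]]
        else OptionT.some[Future](List.fill(amount)(offset.toString))
      case Store(recs) =>
        // Lift an ordinary Future into OptionT (always a Some)
        OptionT.liftF(Future(recs.map(rec => s"Resp($rec)")))
    }
  }

  def run(offset: Int, amount: Int): Option[List[Response]] = {
    val program: ETL[List[Response]] = fetch(offset, amount).flatMap(store)
    val result: Future[Option[List[Response]]] = program.foldMap(interpretFutureOpt).value
    Await.result(result, 5.seconds) // blocking only for demonstration purposes
  }
}
```

`run(1, 3)` should yield `Some(List(Resp(1), Resp(1), Resp(1)))`, while `run(-1, 3)` yields `None` without ever reaching Store, because OptionT short-circuits the rest of the program. The algebra and the `fetch`/`store` program stay exactly as they were; only the interpreter decides which effect stack is used.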