Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Object pool pattern with FS2

Tags:

scala

fs2

I am trying to see what's the best way to implement the Object Pool pattern in FS2.

Let's say we have the following MyPrinter definition:

class MyPrinter {
  import scala.util.Random.nextInt
  Thread.sleep(5000 + nextInt(1000))
  def doStuff(s: String): Unit = {
    println(s)
    Thread.sleep(1000 + nextInt(1000))
  }
  def releaseResources(): Unit =
    println("Releasing resources")
}

What is the best way to make a Stream[Task, MyPrinter] backed by a pool of n printers? When the stream ends, all the underlying resources should be properly released by calling releaseResources.

Bonus question: if a printer ends for some reason, is it possible to create a new one in the pool?

like image 294
betehess Avatar asked Apr 15 '26 06:04

betehess


1 Answers

Not sure that I got the question, but how about this

implicit val S = Strategy.fromFixedDaemonPool(10, "pooling")

val queue = new LinkedBlockingDeque[MyPrinter]()
queue.add(new MyPrinter)
queue.add(new MyPrinter)

Stream.repeatEval(Task.delay(queue.take()))
  .map(p => try p.doStuff("test") finally {
    p.releaseResources()
    queue.put(p)
  })
  .take(10)
  .runLog
  .unsafeRun()

Queue can be replaced with https://commons.apache.org/proper/commons-pool/

Upd:

If you want to process each "resource" concurrently:

concurrent.join(10)(
  Stream
    .repeatEval(Task.delay(queue.take()))
    .map(p => Stream.eval(Task.delay(p.doStuff("test"))
    .map(_ => p /* done with this resource */)))
).map(p => { p.releaseResources(); queue.put(p) /* release resource */})
 .take(10).runLog.unsafeRun()
like image 130
Eugene Zhulenev Avatar answered Apr 18 '26 01:04

Eugene Zhulenev



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!