
How can I idiomatically and efficiently consume a Pipe in some non-IO monad, with an IO action?

I have a Producer that creates values that depend on randomness, using my own Random monad:

policies :: Producer (Policy s a) Random x

Random is a wrapper over mwc-random that can be run from ST or IO:

newtype Random a =
  Random (forall m. PrimMonad m => Gen (PrimState m) -> m a)

runIO :: Random a -> IO a
runIO (Random r) = MWC.withSystemRandom (r @IO)

The policies producer yields better and better policies from a simple reinforcement learning algorithm.

I can efficiently plot the policy after, say, 5,000,000 iterations by indexing into policies:

Just convergedPolicy <- Random.runIO $ Pipes.index 5000000 policies
plotPolicy convergedPolicy "policy.svg"

I now want to plot the intermediate policies every 500,000 steps to see how they converge. I wrote a couple of functions that take the policies producer and extract a list ([Policy s a]) of, say, 10 policies (one every 500,000 iterations) and then plot all of them.

However, these functions take far longer (10x) and use more memory (4x) than just plotting the final policy as above, even though the total number of learning iterations should be the same (i.e. 5,000,000). I suspect this is because extracting a list inhibits the garbage collector, and it seems to be an unidiomatic use of Pipes:

Idiomatic pipes style consumes the elements immediately as they are generated instead of loading all elements into memory.

What's the correct approach to consuming a pipe like this when the Producer is over some random monad (i.e. Random) and the effect I want to produce is in IO?

Put another way, I want to plug a Producer (Policy s a) Random x into a Consumer (Policy s a) IO x.

Tikhon Jelvis, asked Sep 09 '16 21:09



1 Answer

Random is a reader that reads a generator:

{-# LANGUAGE RankNTypes #-}

import Control.Monad.Primitive
import System.Random.MWC

newtype Random a = Random {
    runRandom :: forall m. PrimMonad m => Gen (PrimState m) -> m a
}
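
To make the "reader" reading concrete, here is a minimal sketch of the instances Random would need. The question does not show them, so they are an assumption: each one simply hands the same generator to every sub-computation, as ReaderT does. runFixedSeed and twoInts are hypothetical names used only for illustration.

```haskell
{-# LANGUAGE RankNTypes #-}

import Control.Monad.Primitive (PrimMonad, PrimState)
import Control.Monad.ST (runST)
import System.Random.MWC (Gen, create, uniform)

newtype Random a = Random
    { runRandom :: forall m. PrimMonad m => Gen (PrimState m) -> m a }

-- Assumed instances (not shown in the question): every operation passes
-- the one generator along unchanged, exactly like a reader.
instance Functor Random where
    fmap f (Random r) = Random (\g -> fmap f (r g))

instance Applicative Random where
    pure x = Random (\_ -> pure x)
    Random f <*> Random x = Random (\g -> f g <*> x g)

instance Monad Random where
    Random r >>= k = Random (\g -> r g >>= \a -> runRandom (k a) g)

-- Hypothetical runner: run in ST with mwc-random's fixed default seed
-- (create), so the result is deterministic.
runFixedSeed :: Random a -> a
runFixedSeed r = runST (create >>= runRandom r)

-- Two draws from the same generator, sequenced inside Random.
twoInts :: Random (Int, Int)
twoInts = (,) <$> Random uniform <*> Random uniform
```

Because the generator is only read, never replaced, the same Random value can be run in ST (as here) or in IO without any change to the computation itself.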

We can trivially convert a Random a into a ReaderT (Gen (PrimState m)) m a. This conversion is what you want to hoist: with m chosen as IO, it turns a Producer ... Random a into a Producer ... (ReaderT GenIO IO) a, which can be run in IO once a generator is supplied.

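import Control.Monad.Trans.Reader

toReader :: PrimMonad m => Random a -> ReaderT (Gen (PrimState m)) m a
-- Eta-expanded: the point-free ReaderT . runRandom is rejected by GHC 9.0+
-- (simplified subsumption) because runRandom returns a polymorphic function.
toReader r = ReaderT (runRandom r)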

Since toReader is trivial, hoisting it adds no overhead to random number generation. The next function is written out only to show its type signature.

import Pipes

hoistToReader :: PrimMonad m => Proxy a a' b b' Random                          r ->
                                Proxy a a' b b' (ReaderT (Gen (PrimState m)) m) r
hoistToReader = hoist toReader

There are two approaches to take here. The simple approach is to hoist your Consumer into the same monad, compose the pipes together, and run them.

import Control.Monad (forever)

-- Note: the code below assumes Random has a Monad instance.
type ReadGenIO = ReaderT GenIO IO

toReadGenIO :: MFunctor t => t Random a -> t ReadGenIO a
toReadGenIO = hoist toReader

int :: Random Int
int = Random uniform

ints :: Producer Int Random x
ints = forever $ do
    i <- lift int
    yield i

sample :: Show a => Int -> Consumer a IO ()
sample 0 = return ()
sample n = do
    x <- await
    lift $ print x
    sample (n-1)

sampleSomeInts :: Effect ReadGenIO ()
sampleSomeInts = hoist toReader ints >-> hoist lift (sample 1000)

runReadGenE :: Effect ReadGenIO a -> IO a
runReadGenE = withSystemRandom . runReaderT . runEffect

example :: IO ()
example = runReadGenE sampleSomeInts
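
For the goal stated in the question (handling only every 500,000th policy), the same composition works with one extra Pipe in the middle that drops all but every n-th element, so no list of policies is ever built. The sketch below is a guess at how that could look: everyNth is a hypothetical helper, not part of pipes, a stream of Ints stands in for policies, and printing stands in for plotPolicy.

```haskell
import Control.Monad (forever, replicateM_)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical helper (not from pipes): forward every n-th element and
-- discard the rest. Assumes n >= 1.
everyNth :: Monad m => Int -> Pipe a a m r
everyNth n = forever $ do
    replicateM_ (n - 1) await   -- skip n - 1 elements
    await >>= yield             -- pass the n-th one downstream

-- Stand-in pipeline: each surviving element is consumed in IO as soon as
-- it is produced, and the pipeline stops after 10 of them.
checkpoints :: IO ()
checkpoints =
    runEffect $ each [1 :: Int ..] >-> everyNth 500000 >-> P.take 10 >-> P.print
```

In the question's setting the producer would presumably be hoist toReader policies and the final stage a Consumer that calls plotPolicy, lifted with hoist lift as in sampleSomeInts.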

Pipes.Lift offers a second set of tools that pipes users should be aware of. They run a monad transformer layer underneath a Proxy by distributing the transformer over the Proxy, which works for Random once it has been converted to a ReaderT. There are pre-built functions for the familiar transformers from the transformers library, all built out of distribute, which turns a Proxy ... (t m) a into a t (Proxy ... m) a that you can then run once with whatever you normally use to run a t.

import Pipes.Lift

runRandomP :: PrimMonad m => Proxy a a' b b' Random r ->
                             Gen (PrimState m) -> Proxy a a' b b' m r
runRandomP = runReaderT . distribute . hoist toReader

From here you can finish composing the pipes and use runEffect to get rid of the Proxy, but you would have to thread the generator argument through by hand as you combine the resulting Proxy ... IO r values.
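
As a rough sketch of what that looks like in practice, the block below creates one generator, passes it to runRandomP, and composes the result directly with ordinary IO pipes. The Monad instances for Random are assumed (the question does not show them), printSomeInts is a hypothetical name, and createSystemRandom is used here in place of withSystemRandom.

```haskell
{-# LANGUAGE RankNTypes #-}

import Control.Monad (forever)
import Control.Monad.Primitive (PrimMonad, PrimState)
import Control.Monad.Trans.Reader (ReaderT (..))
import Pipes
import Pipes.Lift (distribute)
import qualified Pipes.Prelude as P
import System.Random.MWC (Gen, createSystemRandom, uniform)

newtype Random a = Random
    { runRandom :: forall m. PrimMonad m => Gen (PrimState m) -> m a }

-- Assumed instances (not shown in the question).
instance Functor Random where
    fmap f (Random r) = Random (\g -> fmap f (r g))

instance Applicative Random where
    pure x = Random (\_ -> pure x)
    Random f <*> Random x = Random (\g -> f g <*> x g)

instance Monad Random where
    Random r >>= k = Random (\g -> r g >>= \a -> runRandom (k a) g)

toReader :: PrimMonad m => Random a -> ReaderT (Gen (PrimState m)) m a
toReader r = ReaderT (runRandom r)

runRandomP :: PrimMonad m
           => Proxy a a' b b' Random r
           -> Gen (PrimState m) -> Proxy a a' b b' m r
runRandomP = runReaderT . distribute . hoist toReader

ints :: Producer Int Random x
ints = forever (lift (Random uniform) >>= yield)

-- Hypothetical entry point: the generator is created once and handed to
-- the producer explicitly; everything downstream is a plain IO pipe.
printSomeInts :: Int -> IO ()
printSomeInts n = do
    gen <- createSystemRandom
    runEffect (runRandomP ints gen >-> P.take n >-> P.print)
```

The consumer side needs no hoisting here, at the cost of passing gen explicitly to every Random-based Proxy that joins the pipeline.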

Cirdec, answered Sep 28 '22 17:09
