I've written a library called amqp-worker that provides a function called worker that polls a message queue (like RabbitMQ) for messages, calling a handler when a message is found, then going back to polling. It's leaking memory. I've profiled it, and the graph says PAP (partial function application) is the culprit. Where is the leak in my code? How can I avoid leaks when looping in IO with forever?
Here are some relevant functions. The full source is here.
Example program (this leaks):

main :: IO ()
main = do
  -- connect
  conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672")
  -- initialize the queues
  Worker.initQueue conn queue
  Worker.initQueue conn results
  -- publish a message
  Worker.publish conn queue (TestMessage "hello world")
  -- create a worker; the program loops here
  Worker.worker def conn queue onError (onMessage conn)
worker
worker :: (FromJSON a, MonadBaseControl IO m, MonadCatch m)
       => WorkerOptions -> Connection -> Queue key a
       -> (WorkerException SomeException -> m ())
       -> (Message a -> m ())
       -> m ()
worker opts conn queue onError action =
  forever $ do
    eres <- consumeNext (pollDelay opts) conn queue
    case eres of
      Error (ParseError reason bd) ->
        onError (MessageParseError bd reason)
      Parsed msg ->
        catch
          (action msg)
          (onError . OtherException (body msg))
    liftBase $ threadDelay (loopDelay opts)
consumeNext
consumeNext :: (FromJSON msg, MonadBaseControl IO m)
            => Microseconds -> Connection -> Queue key msg -> m (ConsumeResult msg)
consumeNext pd conn queue =
  poll pd $ consume conn queue
poll
poll :: (MonadBaseControl IO m) => Int -> m (Maybe a) -> m a
poll us action = do
  ma <- action
  case ma of
    Just a  -> return a
    Nothing -> do
      liftBase $ threadDelay us
      poll us action
Here is a very simple example that demonstrates your problem:
main :: IO ()
main = worker

{-# NOINLINE worker #-}
worker :: (Monad m) => m ()
worker =
  let loop = poll >> loop
  in loop

poll :: (Monad m) => m a
poll = return () >> poll
If you remove the NOINLINE, or specialize m to IO (while compiling with -O), the leak goes away.
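Concretely, "specializing m to IO" just means fixing the monad in the type signatures. Here is a bounded sketch of the same loop with m pinned to IO (my variation, so that the program terminates; not the original code). Because the monad is concrete, GHC compiled with -O can use IO's real >> directly instead of going through the Monad dictionary, and never builds the chain of partially applied closures:

```haskell
module Main where

-- Bounded variant of the loop, with the monad fixed to IO.
-- The counter only exists so the program terminates.
worker :: Int -> IO ()
worker 0 = putStrLn "done"
worker n = poll >> worker (n - 1)

poll :: IO ()
poll = return ()

main :: IO ()
main = worker 1000000
```

Compiled with ghc -O, this runs in constant space; the interesting comparison is against the same code with a `Monad m => ...` signature and NOINLINE.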
I wrote a detailed blog post about why exactly this code leaks memory. The quick summary is, as Reid points out in his answer, that the code creates and remembers a chain of partial applications of >>s. I also filed a GHC ticket about this.
Maybe an easier example to understand is this one:

main :: IO ()
main = let c = count 0
       in c >> c

{-# NOINLINE count #-}
count :: Monad m => Int -> m ()
count 1000000 = return ()
count n = return () >> count (n+1)
Evaluating f >> g for IO actions yields some kind of closure that has references to both f and g (it's basically the composition of f and g as functions on state tokens). count 0 returns a thunk c that will evaluate to a big structure of closures of the form return () >> return () >> return () >> .... When we execute c we build up this structure, and since we have to execute c a second time, the whole structure is still live. So this program leaks memory (regardless of optimization flags).
When count is specialized to IO and optimizations are enabled, GHC has a variety of tricks available to avoid building up this data structure; but they all rely on knowing that the monad is IO.
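One way to hand GHC that knowledge while keeping the polymorphic signature is a SPECIALIZE pragma. A sketch (the pragma is the point, the code around it is the count example from above): with -O, GHC also compiles an IO-specific copy of count and uses it at IO call sites, where the >> chain can be optimized away. Note the pragma is a hint that only helps at call sites GHC can see, not a guarantee:

```haskell
module Main where

-- count stays polymorphic, but GHC is asked to also emit an
-- IO-specialized copy, compiled with knowledge of IO's >>.
{-# SPECIALIZE count :: Int -> IO () #-}
count :: Monad m => Int -> m ()
count 1000000 = return ()
count n = return () >> count (n + 1)

main :: IO ()
main = let c = count 0 in c >> c >> putStrLn "finished"
```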
Returning to the original count :: Monad m => Int -> m (), we can try to avoid building this big structure by changing the last line to

count n = return () >>= (\_ -> count (n+1))
Now the recursive call is hidden inside a lambda, so c is just a small structure return () >>= (\_ -> BODY). This does actually avoid the space leak when compiling without optimizations. However, when optimizations are enabled, GHC floats count (n+1) out of the body of the lambda (since it doesn't depend on the argument), producing

count n = return () >>= (let body = count (n+1) in \_ -> body)

and now c is a large structure again...
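Since it is the full-laziness pass that performs this floating, one blunt workaround is to switch that pass off for the module. A hedged sketch (my suggestion, not from the original thread; -fno-full-laziness is module-wide and can cost you sharing elsewhere, so it is a mitigation rather than a real fix):

```haskell
{-# OPTIONS_GHC -fno-full-laziness #-}
module Main where

-- With full laziness disabled, GHC leaves `count (n + 1)` inside
-- the lambda, so c stays the small closure return () >>= \_ -> BODY
-- instead of a floated-out, retained `let body = ...`.
count :: Monad m => Int -> m ()
count 1000000 = return ()
count n = return () >>= \_ -> count (n + 1)

main :: IO ()
main = let c = count 0 in c >> c >> putStrLn "ok"
```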