
Huge memory consumption for simple multithreaded Haskell

I have a relatively simple "copy" program that merely copies all the lines of one file to another. I'm playing around with Haskell's concurrency support with TMQueue and STM so I thought I'd try it like this:

{-# LANGUAGE BangPatterns #-}

module Main where

import Control.Applicative
import Control.Concurrent.Async              -- from async
import Control.Concurrent.Chan
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMQueue        -- from stm-chans
import Control.Monad (replicateM, forM_, forever, unless)
import qualified Data.ByteString.Char8 as B
import Data.Function (fix)
import Data.Maybe (catMaybes, maybe)
import System.IO (withFile, IOMode(..), hPutStrLn, hGetLine)
import System.IO.Error (catchIOError)

input  = "data.dat"
output = "out.dat"
batch = 100 :: Int

consumer :: TMQueue B.ByteString -> IO ()
consumer q = withFile output WriteMode $ \fh -> fix $ \loop -> do
  !items <- catMaybes <$> replicateM batch readitem
  forM_ items $ B.hPutStrLn fh
  unless (length items < batch) loop
  where
    readitem = do
      !item <- atomically $ readTMQueue q
      return item

producer :: TMQueue B.ByteString -> IO ()
producer q = withFile input ReadMode $ \fh ->
  (forever (B.hGetLine fh >>= atomically . writeTMQueue q))
  `catchIOError` const (atomically (closeTMQueue q) >> putStrLn "Done")

main :: IO ()
main = do
  q <- atomically newTMQueue
  thread <- async $ consumer q
  producer q
  wait thread

I can make a little test input file like this

ghc -e 'writeFile "data.dat" (unlines (map show [1..5000000]))'

And build it like this

ghc --make QueueTest.hs -O2 -prof -auto-all -caf-all -threaded -rtsopts -o q

When I run it like so: ./q +RTS -s -prof -hc -L60 -N2, it reports "2117 MB total memory in use"! But the input file is only 38 MB!

I am new to profiling, but I have produced graph after graph and cannot pinpoint my mistake.

asked Aug 27 '14 by Elliot Cameron

1 Answer

As the OP points out, by now I may as well write a real answer. Let's start with the memory consumption.

Two useful references are Memory footprint of Haskell data types and http://blog.johantibell.com/2011/06/memory-footprints-of-some-common-data.html. We'll also need to look at the definitions of some of our structures.

-- from http://hackage.haskell.org/package/stm-chans-3.0.0.2/docs/src/Control-Concurrent-STM-TMQueue.html

data TMQueue a = TMQueue
    {-# UNPACK #-} !(TVar Bool)
    {-# UNPACK #-} !(TQueue a)
    deriving Typeable


-- from http://hackage.haskell.org/package/stm-2.4.3/docs/src/Control-Concurrent-STM-TQueue.html

-- | 'TQueue' is an abstract type representing an unbounded FIFO channel.
data TQueue a = TQueue {-# UNPACK #-} !(TVar [a])
                       {-# UNPACK #-} !(TVar [a])

The TQueue implementation uses a standard functional queue with a read end and write end.

Let's set an upper bound on memory usage and assume that we read the entire file into the TMQueue before the consumer does anything. In that case, the write end of our TQueue will contain a list with one element per input line (stored as a bytestring). Each list node will look like

(:) bytestring tail

which takes 3 words (1 per field + 1 for the constructor). Each bytestring is 9 words, so add the two together and there are 12 words of overhead per line, not including the actual data. Your test data is 5 million lines, so that's 60 million words of overhead for the whole file (plus some constants), which on a 64-bit system is about 460MB (assuming I did my math right, always questionable). Add in 40MB for the actual data, and we get values pretty close to what I see on my system.
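
To make the arithmetic concrete, here's a quick back-of-the-envelope check (just multiplying the figures above; the 3-word cons cell and 9-word ByteString numbers come from the links at the top):

-- Rough per-line overhead for 5 million lines queued as a list of
-- ByteStrings on a 64-bit system (8 bytes per word).
consCellWords, byteStringWords, wordsPerLine, numLines, bytesPerWord :: Int
consCellWords   = 3          -- (:) node: constructor header + 2 fields
byteStringWords = 9          -- per-ByteString bookkeeping
wordsPerLine    = consCellWords + byteStringWords
numLines        = 5000000
bytesPerWord    = 8

overheadMB :: Double
overheadMB = fromIntegral (wordsPerLine * numLines * bytesPerWord) / (1024 * 1024)
-- roughly 458 MB of overhead, before the ~40 MB of actual line data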

So, why is our memory usage close to this upper bound? I have a theory (investigation left as an exercise!). First, the producer is likely to run a bit faster than the consumer simply because reading is usually faster than writing (I'm using spinning disks, maybe an SSD would be different). Here's the definition of readTQueue:

-- |Read the next value from the 'TQueue'.
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
  xs <- readTVar read
  case xs of
    (x:xs') -> do writeTVar read xs'
                  return x
    [] -> do ys <- readTVar write
             case ys of
               [] -> retry
               _  -> case reverse ys of
                       [] -> error "readTQueue"
                       (z:zs) -> do writeTVar write []
                                    writeTVar read zs
                                    return z

First we try to read from the read end, and if that's empty we try to read from the write end, after reversing that list.

What I think is happening is this: when the consumer needs to read from the write end, it has to traverse (and reverse) that list inside the STM transaction. That takes time, during which the producer keeps writing to the same TVar, which invalidates the consumer's transaction and forces it to retry. As the producer gets further ahead, the list grows, so each attempted read takes even longer, giving the producer even more opportunity to invalidate it again. This process repeats until the producer finishes, and only then does the consumer get a chance to process the bulk of the data. Not only does this ruin concurrency, it also adds CPU overhead, because the consumer's transaction is continually retried and aborted.

So, what about unagi? There are a couple of key differences. First, unagi-chan uses arrays internally instead of lists. This reduces the overhead a little; most of the overhead is the per-line ByteString bookkeeping, so not by much, but a little. Second, unagi keeps chunks of arrays. Even if we pessimistically assume that the producer always wins contention, once an array chunk is filled it's pushed off the producer's side of the channel. The producer then writes into a fresh array while the consumer reads from the old one. This situation is near-ideal: there's no contention for shared resources, the consumer has good locality of reference, and because the consumer is working on a different chunk of memory there are no cache-coherence issues. Unlike in my theoretical description of TMQueue, you now actually get concurrent operation, so the consumer can drain the channel while the producer is still running and memory usage never hits the upper bound.
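
For illustration, here's a minimal sketch of the same copy program on top of unagi-chan (not necessarily what I benchmarked; Unagi channels have no close operation, so end-of-input is signalled with a Maybe):

module Main where

import           Control.Concurrent.Async      (async, wait)
import qualified Control.Concurrent.Chan.Unagi as U   -- from unagi-chan
import qualified Data.ByteString.Char8         as B
import           System.IO                     (IOMode (..), withFile)
import           System.IO.Error               (catchIOError)

main :: IO ()
main = do
  (inC, outC) <- U.newChan
  -- consumer: stop when the producer sends Nothing
  thread <- async $ withFile "out.dat" WriteMode $ \fh ->
    let loop = do
          m <- U.readChan outC
          case m of
            Nothing   -> return ()
            Just line -> B.hPutStrLn fh line >> loop
    in loop
  -- producer: push lines until hGetLine throws at EOF, then signal Nothing
  withFile "data.dat" ReadMode $ \fh ->
    let go = B.hGetLine fh >>= U.writeChan inC . Just >> go
    in go `catchIOError` const (U.writeChan inC Nothing)
  wait thread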

As an aside, I think the consumer batching is not beneficial: handles are already buffered by the IO subsystem, so I don't think it gains anything. For me, performance actually improved a little when I changed the consumer to operate line-by-line.
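
Concretely, something like this is what I mean (a sketch that assumes the question's imports and its output definition):

consumer :: TMQueue B.ByteString -> IO ()
consumer q = withFile output WriteMode $ \fh -> fix $ \loop -> do
  -- readTMQueue returns Nothing once the queue is closed and drained,
  -- so no batching or length check is needed
  mitem <- atomically $ readTMQueue q
  case mitem of
    Nothing   -> return ()
    Just item -> B.hPutStrLn fh item >> loop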

Now, what can you do about this problem? Going from my working hypothesis that TMQueue is suffering from contention, plus your specified requirements, you'll just need to use another type of queue. Obviously unagi works pretty well. I also tried TMChan: it was about 25% slower than unagi but used 45% less memory, so that could be a good option too. (This isn't too surprising; TMChan has a different structure from TMQueue, so it has different performance characteristics.)
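
The TMChan swap is mostly mechanical, since stm-chans gives it the same closeable interface; a sketch, assuming the rest of the question's code is unchanged:

import Control.Concurrent.STM.TMChan   -- from stm-chans

-- TMQueue becomes TMChan in the producer/consumer signatures, and each
-- operation is replaced by its TMChan counterpart, which has the same shape:
--   newTMQueue   -> newTMChan
--   readTMQueue  -> readTMChan    (still :: TMChan a -> STM (Maybe a))
--   writeTMQueue -> writeTMChan
--   closeTMQueue -> closeTMChan
main :: IO ()
main = do
  q <- atomically newTMChan
  thread <- async $ consumer q
  producer q
  wait thread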

You could also try to change your algorithm so that the producer sends multi-line chunks. This would lower the memory overhead from all the ByteStrings.
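
As a sketch of that idea (assuming the question's module, plus hIsEOF from System.IO): each queue element becomes a single ByteString holding up to chunkSize lines, so the cons-cell and ByteString bookkeeping is paid once per chunk rather than once per line.

chunkSize :: Int
chunkSize = 1000

producer :: TMQueue B.ByteString -> IO ()
producer q = withFile input ReadMode $ \fh -> fix $ \loop -> do
  chunk <- readChunk fh chunkSize
  unless (B.null chunk) $ atomically (writeTMQueue q chunk)
  eof <- hIsEOF fh
  if eof
    then atomically (closeTMQueue q) >> putStrLn "Done"
    else loop
  where
    -- read up to n lines and join them into one ByteString
    readChunk fh n = go n []
      where
        go 0 acc = return (B.unlines (reverse acc))
        go k acc = do
          eof <- hIsEOF fh
          if eof
            then return (B.unlines (reverse acc))
            else do
              line <- B.hGetLine fh
              go (k - 1) (line : acc)

consumer :: TMQueue B.ByteString -> IO ()
consumer q = withFile output WriteMode $ \fh -> fix $ \loop -> do
  mchunk <- atomically $ readTMQueue q
  case mchunk of
    Nothing    -> return ()
    Just chunk -> B.hPutStr fh chunk >> loop   -- chunks already end in '\n'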

So, when is it ok to use TMQueue? If the producer and consumer are about the same speed, or if the consumer is faster, it should be fine. Also, if processing times are non-uniform, or the producer runs in bursts, you'll probably get good amortized performance. Your case, where the producer stays ahead the whole time, is pretty much the worst-case situation, and perhaps it should be reported as a bug against stm? I think that if the read function were changed to

-- |Read the next value from the 'TQueue'.
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
  xs <- readTVar read
  case xs of
    (x:xs') -> do writeTVar read xs'
                  return x
    [] -> do ys <- readTVar write
             case ys of
               [] -> retry
               _  -> do writeTVar write []
                        let (z:zs) = reverse ys
                        writeTVar read zs
                        return z

it would avoid this problem. Now the z and zs bindings should both be evaluated lazily, so the list traversal would happen outside this transaction, allowing the read operation to succeed sometimes under contention. Assuming I'm correct about the issue in the first place, of course (and that this definition is lazy enough). There might be other unexpected downsides though.
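
Here's a quick ghci illustration of that laziness difference (my example, not from the stm source): scrutinising the result of reverse in a case expression forces the whole spine traversal on the spot, whereas the lazy let binding defers it until z or zs is demanded.

ghci> case reverse [1..] of (z:zs) -> "matched"
-- never returns: the case has to traverse the entire (here infinite)
-- spine before it can match

ghci> let (z:zs) = reverse [1..] in "matched"
"matched"
-- the lazy pattern binding doesn't force anything yet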

answered Oct 13 '22 by John L