I have a relatively simple "copy" program that merely copies all the lines of one file to another. I'm playing around with Haskell's concurrency support with TMQueue and STM, so I thought I'd try it like this:
{-# LANGUAGE BangPatterns #-}

module Main where

import Control.Applicative
import Control.Concurrent.Async              -- from async
import Control.Concurrent.Chan
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMQueue        -- from stm-chans
import Control.Monad (replicateM, forM_, forever, unless)
import qualified Data.ByteString.Char8 as B
import Data.Function (fix)
import Data.Maybe (catMaybes, maybe)
import System.IO (withFile, IOMode(..), hPutStrLn, hGetLine)
import System.IO.Error (catchIOError)

input  = "data.dat"
output = "out.dat"
batch  = 100 :: Int

consumer :: TMQueue B.ByteString -> IO ()
consumer q = withFile output WriteMode $ \fh -> fix $ \loop -> do
    !items <- catMaybes <$> replicateM batch readitem
    forM_ items $ B.hPutStrLn fh
    unless (length items < batch) loop
  where
    readitem = do
        !item <- atomically $ readTMQueue q
        return item

producer :: TMQueue B.ByteString -> IO ()
producer q = withFile input ReadMode $ \fh ->
    (forever (B.hGetLine fh >>= atomically . writeTMQueue q))
        `catchIOError` const (atomically (closeTMQueue q) >> putStrLn "Done")

main :: IO ()
main = do
    q <- atomically newTMQueue
    thread <- async $ consumer q
    producer q
    wait thread
I can make a little test input file like this:

ghc -e 'writeFile "data.dat" (unlines (map show [1..5000000]))'

And build it like this:

ghc --make QueueTest.hs -O2 -prof -auto-all -caf-all -threaded -rtsopts -o q
When I run it like so, ./q +RTS -s -prof -hc -L60 -N2, it reports "2117 MB total memory in use"! But the input file is only 38 MB!
I am new to profiling, but I have produced graph after graph and cannot pinpoint my mistake.
As the OP points out, by now I may as well write a real answer. Let's start with the memory consumption.
Two useful references are "Memory footprint of Haskell data types" and http://blog.johantibell.com/2011/06/memory-footprints-of-some-common-data.html. We'll also need to look at the definitions of some of our structures.
-- from http://hackage.haskell.org/package/stm-chans-3.0.0.2/docs/src/Control-Concurrent-STM-TMQueue.html
data TMQueue a = TMQueue
    {-# UNPACK #-} !(TVar Bool)
    {-# UNPACK #-} !(TQueue a)
    deriving Typeable

-- from http://hackage.haskell.org/package/stm-2.4.3/docs/src/Control-Concurrent-STM-TQueue.html
-- | 'TQueue' is an abstract type representing an unbounded FIFO channel.
data TQueue a = TQueue {-# UNPACK #-} !(TVar [a])
                       {-# UNPACK #-} !(TVar [a])
The TQueue implementation uses a standard functional queue with a read end and a write end.
Let's set an upper bound on memory usage and assume that we read the entire file into the TMQueue before the consumer does anything. In that case, the write end of our TQueue will contain a list with one element per input line (stored as a bytestring). Each list node will look like
(:) bytestring tail
which takes 3 words (1 per field + 1 for the constructor). Each bytestring is 9 words, so add the two together and there are 12 words of overhead per line, not including the actual data. Your test data is 5 million lines, so that's 60 million words of overhead for the whole file (plus some constants), which on a 64-bit system is about 460MB (assuming I did my math right, always questionable). Add in 40MB for the actual data, and we get values pretty close to what I see on my system.
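As a sanity check, here's the same arithmetic as a tiny Haskell snippet (the function name is made up, and the per-object word counts are the rough figures quoted above, not measurements):

-- Back-of-the-envelope check of the estimate above, assuming a 64-bit
-- system (8-byte words), 3 words per (:) cell and roughly 9 words per
-- ByteString header.
overheadEstimate :: IO ()
overheadEstimate = do
    let nLines       = 5000000 :: Integer   -- lines in data.dat
        wordsPerLine = 3 + 9                -- cons cell + ByteString wrapper
        bytesPerWord = 8
        overheadMiB  = fromIntegral (nLines * wordsPerLine * bytesPerWord)
                         / (1024 * 1024) :: Double
    putStrLn $ "estimated overhead: " ++ show overheadMiB ++ " MiB"
    -- prints roughly 457 MiB; plus ~40 MB of actual line data that's the
    -- ~500 MB live-data figure discussed above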
So, why is our memory usage close to this upper bound? I have a theory (investigation left as an exercise!). First, the producer is likely to run a bit faster than the consumer simply because reading is usually faster than writing (I'm using spinning disks, maybe an SSD would be different). Here's the definition of readTQueue:
-- |Read the next value from the 'TQueue'.
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
  xs <- readTVar read
  case xs of
    (x:xs') -> do writeTVar read xs'
                  return x
    [] -> do ys <- readTVar write
             case ys of
               [] -> retry
               _  -> case reverse ys of
                       [] -> error "readTQueue"
                       (z:zs) -> do writeTVar write []
                                    writeTVar read zs
                                    return z
First we try to read from the read end, and if that's empty we try to read from the write end, after reversing that list.
What I think is happening is this: when the consumer needs to read from the write end, it needs to traverse the input list within the STM transaction. This takes some time, which will cause it to contend with the producer. As the producer gets further ahead, this list gets longer, causing the read to take yet more time, during which the producer is able to write more values, causing the read to fail. This process repeats until the producer finishes, and only then does the consumer get a chance to process the bulk of the data. Not only does this ruin concurrency, it adds more CPU overhead because the consumer transaction is continually retrying and failing.
So, what about unagi? There are a couple of key differences. First, unagi-chan uses arrays internally instead of lists. This reduces the overhead a little. Most of the overhead is from the ByteString pointers, so not much, but a little. Secondly, unagi keeps chunks of arrays. Even if we pessimistically assume that the producer always wins contentions, after the array gets filled it's pushed off the producer's side of the channel. Now the producer is writing to a new array and the consumer reads from the old array. This situation is near-ideal; there's no contention for shared resources, the consumer has good locality of reference, and because the consumer is working on a different chunk of memory there aren't issues with cache coherence. Unlike my theoretical description of the TMQueue, now you're getting concurrent operations, allowing the consumer to free memory as it goes so usage never hits the upper bound.
As an aside, I think the consumer batching is not beneficial. Handles are buffered by the IO subsystem already, so I don't think this gains anything. For me performance improved a little when I changed the consumer to operate line-by-line anyway.
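For reference, a line-by-line consumer could look something like this (a minimal sketch of that variant, reusing the imports and the output binding from the question's module; illustrative rather than the exact code behind that timing):

-- A consumer that writes each line as soon as it arrives, relying on the
-- Handle's own buffering rather than batching reads from the queue.
consumer' :: TMQueue B.ByteString -> IO ()
consumer' q = withFile output WriteMode $ \fh -> fix $ \loop -> do
    mitem <- atomically $ readTMQueue q
    case mitem of
        Nothing   -> return ()              -- queue closed and drained
        Just item -> B.hPutStrLn fh item >> loop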
Now, what can you do about this problem? Going from my working hypothesis that TMQueue is suffering from contention problems, and your specified requirements, you'll just need to use another type of queue. Obviously unagi works pretty well. I also tried TMChan; it was about 25% slower than unagi but used 45% less memory, so that could be a good option too. (This isn't too surprising: TMChan has a different structure from TMQueue, so it'll have different performance characteristics.)
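To make the unagi option concrete, a sketch of the same producer/consumer pair on top of unagi-chan might look like this (illustrative only; it assumes the Control.Concurrent.Chan.Unagi API, and since that channel has no close operation it wraps elements in Maybe as an end-of-stream marker):

import qualified Control.Concurrent.Chan.Unagi as U   -- from unagi-chan

-- The producer sends Nothing as an end-of-stream marker instead of
-- closing a queue.
producerU :: U.InChan (Maybe B.ByteString) -> IO ()
producerU inC = withFile input ReadMode $ \fh ->
    (forever (B.hGetLine fh >>= U.writeChan inC . Just))
        `catchIOError` const (U.writeChan inC Nothing >> putStrLn "Done")

consumerU :: U.OutChan (Maybe B.ByteString) -> IO ()
consumerU outC = withFile output WriteMode $ \fh -> fix $ \loop -> do
    mline <- U.readChan outC
    case mline of
        Nothing   -> return ()
        Just line -> B.hPutStrLn fh line >> loop

mainU :: IO ()
mainU = do
    (inC, outC) <- U.newChan
    thread <- async $ consumerU outC
    producerU inC
    wait thread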
You could also try to change your algorithm so that the producer sends multi-line chunks. This would lower the memory overhead from all the ByteStrings.
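A sketch of what that could look like, keeping the TMQueue but sending one ByteString per chunk of lines (producerChunked, readLines and chunkSize are hypothetical names; it also needs hIsEOF from System.IO, and the matching consumer would write each chunk with B.hPutStr rather than B.hPutStrLn):

-- Read up to chunkSize lines at a time and push them as a single
-- ByteString, so each queue element (and each ByteString header) covers
-- many lines instead of one.
producerChunked :: Int -> TMQueue B.ByteString -> IO ()
producerChunked chunkSize q = withFile input ReadMode $ \fh -> fix $ \loop -> do
    ls <- readLines fh chunkSize
    unless (null ls) $ atomically (writeTMQueue q (B.unlines ls))
    if length ls < chunkSize
      then atomically (closeTMQueue q) >> putStrLn "Done"
      else loop
  where
    -- read up to n lines, stopping quietly at end of file
    readLines _  0 = return []
    readLines fh n = do
      eof <- hIsEOF fh
      if eof
        then return []
        else (:) <$> B.hGetLine fh <*> readLines fh (n - 1)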
So, when is it ok to use TMQueue? If the producer and consumer are about the same speed, or the consumer is faster, it should be ok. Also, if processing times are non-uniform, or the producer runs in bursts, you'll probably get good amortized performance. This is pretty much a worst-case situation, and perhaps it should be reported as a bug against stm? I think if the read function were changed to
-- |Read the next value from the 'TQueue'.
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
  xs <- readTVar read
  case xs of
    (x:xs') -> do writeTVar read xs'
                  return x
    [] -> do ys <- readTVar write
             case ys of
               [] -> retry
               _  -> do writeTVar write []
                        let (z:zs) = reverse ys
                        writeTVar read zs
                        return z
it would avoid this problem. Now the z and zs bindings should both be evaluated lazily, so the list traversal would happen outside this transaction, allowing the read operation to succeed sometimes under contention. Assuming I'm correct about the issue in the first place, of course (and that this definition is lazy enough). There might be other unexpected downsides though.
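If you want to convince yourself that such a lazy pattern binding really does defer the reversal, a quick GHCi check with a throwaway list (using :sprint to inspect evaluation) shows the binding stays a thunk until it's demanded:

ghci> let ys = map show [1..1000000 :: Int]
ghci> let (z:zs) = reverse ys
ghci> :sprint z
z = _
ghci> length z        -- forcing z is what finally runs the reverse
7

Once z is forced, the whole traversal runs, but by then you'd be outside the STM transaction.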