I am implementing a Haskell program that compares each line of a file with every other line in the file. A single-threaded version can be written as follows:
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
    fileContents <- readFile path
    return $ allDistances $ map read $ lines $ fileContents
  where
    allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
    allDistances _ = 0
This will run in O(n^2) time, and has to keep the complete list of integers in memory the whole time. In my actual program the line contains more numbers, out of which I construct a slightly more complex datatype than Int. This gave me out of memory errors on the data I have to process.
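To make the computed quantity concrete, here is a minimal sketch of the pure core of the function above, applied to a tiny in-memory list instead of a file. The sample input `[1, 2, 4]` is only an illustration and not taken from my data.

```haskell
-- Pure core of the single-threaded version, without any file IO.
distance :: Int -> Int -> Int
distance a b = (a - b) * (a - b)

-- Sum of the squared distances over all unordered pairs of the list.
allDistances :: [Int] -> Int
allDistances (x:xs) = allDistances xs + sum (map (distance x) xs)
allDistances _      = 0

main :: IO ()
main = print (allDistances [1, 2, 4])  -- pairs: 1 + 9 + 4, prints 14
```

The recursion only returns after the whole list has been traversed, which is why the complete list stays alive in memory.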
So there are two improvements to be made to the single-threaded solution above. First, speed up the actual running time. Second, find a way to not keep the whole list in memory the whole time. I know this requires parsing the complete file n times. Thus there will be O(n^2) comparisons and O(n^2) lines parsed. This is OK for me, as I'd rather have a slow successful program than a failing program. When the input file is small enough I can always fall back to the simpler version.
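As a rough count, assuming pass i re-reads the file from line i to the end (one pass per line), the totals for n lines work out as follows, with n = 50000 as the example size:

```latex
\begin{align*}
\text{comparisons}  &= \sum_{i=1}^{n} (n - i)     = \frac{n(n-1)}{2}, \\
\text{lines parsed} &= \sum_{i=1}^{n} (n - i + 1) = \frac{n(n+1)}{2}, \\
n = 50000:\quad
\frac{n(n-1)}{2} &= 1\,249\,975\,000, \qquad
\frac{n(n+1)}{2} = 1\,250\,025\,000.
\end{align*}
```

So re-parsing roughly doubles the work per pair but does not change the O(n^2) order.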
To use multiple CPU cores I took the MapReduce implementation from Real World Haskell (chapter 24, available here).
I modified the chunking function from the book to, instead of dividing the complete file in chunks, return as many chunks as lines with each chunk representing one element of
tails . lines . readFile
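As a small illustration of that chunking idea on an in-memory string (a sketch only; `lineTails` is a hypothetical name, and the real chunker works with file offsets rather than strings):

```haskell
import Data.List (tails)

-- One chunk per line: each chunk is that line together with all lines after it.
-- `init` drops the final empty tail, so there are exactly as many chunks as lines.
lineTails :: String -> [[String]]
lineTails = init . tails . lines

main :: IO ()
main = print (lineTails "1\n2\n4\n")
-- prints [["1","2","4"],["2","4"],["4"]]
```

The first element of each chunk is the value that gets compared against the rest of that chunk.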
Because I want the program also to be scalable in file-size, I initially used lazy IO. This however fails with "Too many open files", about which I asked in a previous question (the file handles were disposed too late by the GC). The full lazy IO version is posted there.
As the accepted answer explains, strict IO could solve the issue. That indeed solves the "Too many open files" problem for 2k line files, but fails with "out of memory" on a 50k file.
Note that the first single threaded implementation (without mapreduce) is capable of handling a 50k file.
The alternative solution, which also appeals most to me, is to use iteratee IO. I expected this to solve both the file handle, and memory resource exhaustion. My implementation however still fails with a "Too many open files" error on a 2k line file.
The iteratee IO version has the same mapReduce function as in the book, but has a modified chunkedFileEnum to let it work with an Enumerator.
Thus my question is: what is wrong with the following iteratee-IO-based implementation? Where is the laziness?
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans (MonadIO, liftIO)
import System.IO
import qualified Data.Enumerator.List as EL
import qualified Data.Enumerator.Text as ET
import Data.Enumerator hiding (map, filter, head, sequence)
import Data.Text(Text)
import Data.Text.Read
import Data.Maybe
import qualified Data.ByteString.Char8 as Str
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)
--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances
--My operation for one value pair
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
combineDistances :: [Int] -> Int
combineDistances = sum
--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
    where infiniteList :: Int->Int-> [Int]
          infiniteList i j = (i + j) : infiniteList j (i+j)
--Applying my operation simply on a file
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)
--But I want to use multiple cores..
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
    fileContents <- readFile path
    return $ allDistances $ map read $ lines $ fileContents
  where
    allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
    allDistances _ = 0
--Setting up an enumerator of read values from a text stream
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b
readerEnumerator reader = joinI . (EL.concatMapM transformer)
    where transformer input = case reader input of
            Right (val, remainder) -> return [val]
            Left err -> return [0]
readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b
readEnumerator = readerEnumerator (signed decimal)
--The iteratee version of my operation
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int
distancesFirstToTailIt = do
    maybeNum <- EL.head
    maybe (return 0) distancesOneToManyIt maybeNum
distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int
distancesOneToManyIt base = do
    maybeNum <- EL.head
    maybe (return 0) combineNextDistance maybeNum
    where combineNextDistance nextNum = do
            rest <- distancesOneToManyIt base
            return $ combineDistances [(distance base nextNum),rest]
--The mapreduce algorithm
mapReduce :: Strategy b    -- evaluation strategy for mapping
          -> (a -> b)      -- map function
          -> Strategy c    -- evaluation strategy for reduction
          -> ([b] -> c)    -- reduce function
          -> [a]           -- list to map over
          -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
    mapResult `pseq` reduceResult
  where mapResult    = parMap mapStrat mapFunc input
        reduceResult = reduceFunc mapResult `using` reduceStrat
--Applying the iteratee operation using mapreduce
sumOfDistancesOnFileWithIt :: FilePath -> IO Int
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path
distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc)
                                      rpar (sumValuesAsReduceFunc)
    where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int
          runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt))
          sumValuesAsReduceFunc :: [IO Int] -> IO Int
          sumValuesAsReduceFunc = liftM sum . sequence
--Working with (file)chunk enumerators:
data ChunkSpec = CS{
    chunkOffset :: !Int
    , chunkLength :: !Int
    } deriving (Eq,Show)
chunkedFileEnum :: (NFData (a)) => MonadIO m =>
                   (FilePath-> IO [ChunkSpec])
                -> ([Enumerator Text m b]->IO a)
                -> FilePath
                -> IO a
chunkedFileEnum chunkCreator funcOnChunks path = do
    (chunks, handles)<- chunkedEnum chunkCreator path
    r <- funcOnChunks chunks
    (rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles
chunkedEnum :: MonadIO m=>
               (FilePath -> IO [ChunkSpec])
            -> FilePath
            -> IO ([Enumerator Text m b], [Handle])
chunkedEnum chunkCreator path = do
    chunks <- chunkCreator path
    liftM unzip . forM chunks $ \spec -> do
        h <- openFile path ReadMode
        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
        let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF
        return (chunk,h)
-- returns set of chunks representing tails . lines . readFile
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
    bracket (openFile path ReadMode) hClose $ \h-> do
        totalSize <- fromIntegral `liftM` hFileSize h
        let chunkSize = 1
            findChunks offset = do
                let newOffset = offset + chunkSize
                hSeek h AbsoluteSeek (fromIntegral newOffset)
                let findNewline lineSeekOffset = do
                        eof <- hIsEOF h
                        if eof
                            then return [CS offset (totalSize - offset)]
                            else do
                                bytes <- Str.hGet h 256
                                case Str.elemIndex '\n' bytes of
                                    Just n -> do
                                        nextChunks <- findChunks (lineSeekOffset + n + 1)
                                        return (CS offset (totalSize-offset):nextChunks)
                                    Nothing -> findNewline (lineSeekOffset + Str.length bytes)
                findNewline newOffset
        findChunks 0
Btw, I'm running
HaskellPlatform 2011.2.0 on Mac OS X 10.6.7 (snow leopard)
with the following packages:
bytestring 0.9.1.10
parallel 3.1.0.1
enumerator 0.4.8 , with a manual here
As the error says, there are too many open files. I expected Haskell to run most of the program sequentially and only some 'sparks' in parallel. However, as sclv mentioned, Haskell always sparks the evaluations.
This is usually not a problem in a purely functional program, but it is when dealing with IO resources. I scaled up the parallelism described in the Real World Haskell book too far. So my conclusion is to use parallelism only on a limited scale when the sparks deal with IO resources. In the purely functional part, excessive parallelism may still succeed.
Thus the answer to my post is to not use MapReduce on the whole program, but only within an inner, purely functional part.
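A minimal sketch of what that could look like, assuming the parsed values fit in memory (as they do for the 50k file in the single-threaded version). The names `headToRest`, `sumOfDistancesPar` and `sumOfDistancesOnFilePar`, and the chunk size of 64, are my own choices for illustration; only `parListChunk`, `rseq` and `using` come from the parallel package.

```haskell
import Control.Parallel.Strategies (parListChunk, rseq, using)
import Data.List (tails)

distance :: Int -> Int -> Int
distance a b = (a - b) * (a - b)

-- Distances between the head of one tail and the rest of that tail.
headToRest :: [Int] -> Int
headToRest (x:xs) = sum (map (distance x) xs)
headToRest []     = 0

-- Pure parallel core: the sparks only touch values, never file handles.
-- chunkSize controls how many tails are grouped into a single spark.
sumOfDistancesPar :: Int -> [Int] -> Int
sumOfDistancesPar chunkSize values =
    sum (map headToRest (tails values) `using` parListChunk chunkSize rseq)

-- IO stays sequential and uses a single handle; only the pure part is parallel.
sumOfDistancesOnFilePar :: FilePath -> IO Int
sumOfDistancesOnFilePar path = do
    contents <- readFile path
    let values = map read (lines contents) :: [Int]
    return $! sumOfDistancesPar 64 values

main :: IO ()
main = print (sumOfDistancesPar 2 [1, 2, 4])  -- prints 14
```

To actually use several cores this would be compiled with -threaded and run with +RTS -N. It does not address the memory requirement; it only keeps the sparks away from the file handles.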
To show where the program actually failed, I configured it with --enable-executable-profiling -p, built it, and ran it using +RTS -p -hc -L30. Because the executable fails immediately, there is no memory allocation profile. The resulting time allocation profile in the .prof file starts with the following:
                                                          individual     inherited
COST CENTRE                 MODULE           no.  entries  %time %alloc  %time %alloc
MAIN                        MAIN               1        0    0.0    0.3  100.0  100.0
main                        Main            1648        2    0.0    0.0   50.0   98.9
sumOfDistancesOnFileWithIt  MapReduceTest   1649        1    0.0    0.0   50.0   98.9
chunkedFileEnum             MapReduceTest   1650        1    0.0    0.0   50.0   98.9
chunkedEnum                 MapReduceTest   1651      495    0.0   24.2   50.0   98.9
lineOffsets                 MapReduceTest   1652        1   50.0   74.6   50.0   74.6
chunkedEnum returns IO ([Enumerator Text m b], [Handle]), and apparently receives 495 entries. The input file was a 2k line file, so the single entry on lineOffsets returned a list of 2000 offsets. There is not a single entry in distancesUsingMapReduceIt, so the actual work did not even start!
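Since the failure happens while all handles are being opened up front, one way around it is to open a handle only while its chunk is being processed. The following is a sequential sketch of that idea using only System.IO, not the enumerator-based code above; `distancesFromOffset` and `sumOfDistancesSequential` are hypothetical names, and the offsets are assumed to be the byte positions of the line starts.

```haskell
import System.IO

distance :: Int -> Int -> Int
distance a b = (a - b) * (a - b)

-- Sum the distances between the first value at the given byte offset and
-- every value after it, up to EOF. withBinaryFile closes the handle before
-- returning, so at most one handle is open at any time.
distancesFromOffset :: FilePath -> Integer -> IO Int
distancesFromOffset path offset =
    withBinaryFile path ReadMode $ \h -> do
        hSeek h AbsoluteSeek offset
        eof <- hIsEOF h
        if eof
            then return 0
            else do
                base <- fmap read (hGetLine h)
                let loop acc = do
                        done <- hIsEOF h
                        if done
                            then return acc
                            else do
                                line <- hGetLine h
                                -- force the accumulator so no thunks pile up
                                loop $! acc + distance base (read line)
                loop 0

-- One pass over the file per offset, one handle at a time.
sumOfDistancesSequential :: FilePath -> [Integer] -> IO Int
sumOfDistancesSequential path offsets =
    fmap sum (mapM (distancesFromOffset path) offsets)

main :: IO ()
main = do
    let path = "distances-demo.txt"  -- demo file name, an assumption
    withBinaryFile path WriteMode $ \h -> hPutStr h "1\n2\n4\n"
    total <- sumOfDistancesSequential path [0, 2, 4]
    print total  -- prints 14
```

This keeps memory use per pass constant apart from the list of offsets, at the cost of re-parsing the file for every line.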