I have a simple algorithm to implement: compare each line of a file with every other line. Each line contains one number, the comparison function is the distance between the two numbers, and the sum of all distances is the final result.
This can be implemented as simply as follows:
sumOfDistancesOnSmallFile :: FilePath -> IO Integer
sumOfDistancesOnSmallFile path = withFile path ReadMode $ \h -> do
    distances <- liftM (map textRead) $ hListToEOF Text.hGetLine h
    return $ distancesManyToMany distances
hListToEOF :: (Handle -> IO a) -> Handle -> IO [a]
hListToEOF func h = do
    element <- func h
    atEOF   <- hIsEOF h
    rest    <- if atEOF
                   then return []
                   else hListToEOF func h
    return $ element : rest
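In hindsight it is worth noting why this reads the whole file eagerly: the recursive call runs inside IO before `return`, so the complete list is materialized before the `:` constructor is ever handed back. A lazy variant would need something like unsafeInterleaveIO to delay the recursion. This is only a sketch of that idea, not code from the post, and it carries the usual caveat that the handle must stay open while the list is being consumed, which withFile does not guarantee:

import System.IO.Unsafe (unsafeInterleaveIO)

hListToEOFLazy :: (Handle -> IO a) -> Handle -> IO [a]
hListToEOFLazy func h = do
    element <- func h
    atEOF   <- hIsEOF h
    -- delay the recursive read until the tail of the list is demanded
    rest    <- if atEOF
                   then return []
                   else unsafeInterleaveIO (hListToEOFLazy func h)
    return $ element : rest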
distancesManyToMany :: [Integer] -> Integer
distancesManyToMany (x:xs) = distancesOneToMany x xs + distancesManyToMany xs
distancesManyToMany _ = 0

distancesOneToMany :: Integer -> [Integer] -> Integer
distancesOneToMany one many = sum $ map (distance one) many

distance :: Integer -> Integer -> Integer
distance a b = a - b
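textRead is used above but never shown; a minimal sketch of what it presumably does, assuming it parses a decimal Integer from a strict Text via Data.Text.Read (the lazy-Text modules used later offer the same interface in Data.Text.Lazy.Read):

import Data.Text (Text)
import Data.Text.Read (signed, decimal)

textRead :: Text -> Integer
textRead t = case signed decimal t of
    Right (n, _) -> n         -- parsed value; any trailing text is ignored
    Left err     -> error err -- fail loudly on a malformed line in this sketch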
To get reasonably big data on each line, I've used the following file generator:
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
    where infiniteList :: Integer -> Integer -> [Integer]
          infiniteList i j = (i+j) * (i+j) : infiniteList j (i+j)
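The generator squares a Fibonacci-style running sum, so the numbers grow quickly in digit count. As an illustrative example, createTestFile 5 "test.txt" writes the lines 1, 4, 9, 25 and 64.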
A 2,000-line file of 840 KB takes 1.92 seconds and 1.5 GB of allocations, with a maximum memory usage of around 1.5 MB.
A 6k-line file of 7.5 MB takes 22 seconds and 34 GB of allocations, with a maximum memory usage of around 15 MB.
Unfortunately my data will be millions of lines. I initially attempted to improve speed (about which I asked in two previous posts, on MapReduce combined with iteratee IO), but the actual limiting problem is space.
Intermediate thought: this could be overcome by re-reading the complete file for each number to be compared. That takes a lot of additional time, because the file needs to be opened and parsed once for every line that is to be compared with the remainder of the file, and the number of memory allocations becomes quadratic. So that is not really useful as a final solution.
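For concreteness, a sketch of that rejected idea (not code from the post), reusing the helpers and imports above:

-- Re-open and re-parse the whole file once per line: quadratic in both
-- time and allocations, exactly as described above.
sumOfDistancesRereading :: FilePath -> IO Integer
sumOfDistancesRereading path = go 0 0
    where go offset acc = do
              distances <- withFile path ReadMode $ \h ->
                  liftM (map textRead) $ hListToEOF Text.hGetLine h
              case drop offset distances of
                  []       -> return acc
                  (x : xs) -> go (offset + 1) $! acc + distancesOneToMany x xs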
The final step: that brought me to my first step towards the goal, batched execution. I would like to take a few thousand lines into memory, apply the many-to-many algorithm to those, and then iterate through the remainder of the file. On each iteration step only one successive line needs to be read and parsed, which can then be compared with all items in the in-memory batch.
By choosing a big enough batch size, the file does not have to be re-read often. My implementation is as follows:
sumOfDistancesOnBigFileUsingBatches :: FilePath -> Int -> Int -> IO Integer
sumOfDistancesOnBigFileUsingBatches path batchSize offset = do
    (firstResult, maybeRecurse) <- singleResultBatch path batchSize offset
    recursiveResult <- case maybeRecurse of
        Nothing        -> return 0
        Just newOffset -> sumOfDistancesOnBigFileUsingBatches path batchSize newOffset
    return $ firstResult + recursiveResult

singleResultBatch :: FilePath -> Int -> Int -> IO (Integer, Maybe Int)
singleResultBatch path batchSize offset = withFile path ReadMode $ \h -> do
    distances <- readDistances h
    let (batch, subSet) = splitAt batchSize $ drop offset distances
    let batchInner = distancesManyToMany batch
    let recursionTerminated = null subSet
    let (batchToSubSet, newOffset) =
            if recursionTerminated
                then (0, Nothing)
                else (distancesSetToSet batch subSet, Just (offset + batchSize))
    return (batchInner + batchToSubSet, newOffset)
    where readDistances h = liftM (map textRead) $ hListToEOF Text.hGetLine h

distancesSetToSet :: [Integer] -> [Integer] -> Integer
distancesSetToSet xs ys = sum $ map (\one -> distancesOneToMany one xs) ys
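For reference, the 2k-line measurement below corresponds to a call like the following (the file name is illustrative):

total <- sumOfDistancesOnBigFileUsingBatches "distances2k.txt" 500 0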
On the 2k-line file, with a batch size of 500, it finished in 2.16 secs, with 2.2 GB of allocations and around 6 MB of required space. That is 4 times the space of the simplest version! It might be coincidence, but 4 batches are also utilized...
What surprised me is that all the required space is consumed initially; later on, the required space only decreases. This becomes a problem with a 50k-line file (500 MB), because then it runs out of memory.
My question is: why does the batched solution consume more memory? It seems to keep the whole file in memory for each batch, even though (at least that is my intention) it should only keep a single batch in memory.
EDIT: I removed the details of the 6k-line file and 500-line batches (I had taken a wrong profile file).
In addition, here is the space profile generated using the 2k-line file and 500-line batches (image not reproduced here).
EDIT 2: Profiling with retainers resulted in:
total time  =        2.24 secs   (112 ticks @ 20 ms)
total alloc = 2,126,803,896 bytes  (excludes profiling overheads)

COST CENTRE          MODULE                      %time %alloc
textRead             MapReduceTestStrictStrings   47.3   44.4
distance             MapReduceTestStrictStrings   25.9   25.3
distancesOneToMany   MapReduceTestStrictStrings   18.8   29.5
singleResultBatch    MapReduceTestStrictStrings    4.5    0.0
readTextDevice       Data.Text.IO.Internal         2.7    0.0

                                                                                        individual     inherited
COST CENTRE                           MODULE                       no.   entries  %time %alloc   %time %alloc
MAIN                                  MAIN                           1         0    0.0    0.0   100.0  100.0
 main                                 Main                        1604         2    0.0    0.0   100.0  100.0
  sumOfDistancesOnBigFileUsingBatches MapReduceTestStrictStrings  1605         4    0.0    0.0   100.0  100.0
   singleResultBatch                  MapReduceTestStrictStrings  1606        20    4.5    0.0   100.0  100.0
    distancesSetToSet                 MapReduceTestStrictStrings  1615         3    0.0    0.0    34.8   43.3
     distancesOneToMany               MapReduceTestStrictStrings  1616      3000   14.3   23.2    34.8   43.2
      distance                        MapReduceTestStrictStrings  1617   1500000   20.5   20.0    20.5   20.0
    textRead                          MapReduceTestStrictStrings  1614      5000   47.3   44.4    47.3   44.4
    distancesManyToMany               MapReduceTestStrictStrings  1611      2004    0.0    0.0     9.8   11.7
     distancesOneToMany               MapReduceTestStrictStrings  1612      2000    4.5    6.3     9.8   11.6
      distance                        MapReduceTestStrictStrings  1613    499000    5.4    5.3     5.4    5.3
    hListToEOF                        MapReduceTestStrictStrings  1609     23996    0.9    0.6     3.6    0.6
     readTextDevice                   Data.Text.IO.Internal       1610      1660    2.7    0.0     2.7    0.0
 CAF:main4                            Main                        1591         1    0.0    0.0     0.0    0.0
 CAF:main5                            Main                        1590         1    0.0    0.0     0.0    0.0
  main                                Main                        1608         0    0.0    0.0     0.0    0.0
 CAF                                  GHC.Num                     1580         1    0.0    0.0     0.0    0.0
 CAF                                  GHC.IO.Handle.FD            1526         2    0.0    0.0     0.0    0.0
 CAF                                  GHC.IO.FD                   1510         2    0.0    0.0     0.0    0.0
 CAF                                  System.Event.Thread         1508         3    0.0    0.0     0.0    0.0
 CAF                                  GHC.IO.Encoding.Iconv       1487         2    0.0    0.0     0.0    0.0
 CAF                                  System.Event.Internal       1486         2    0.0    0.0     0.0    0.0
 CAF                                  System.Event.Unique         1483         1    0.0    0.0     0.0    0.0
 CAF                                  GHC.Conc.Signal             1480         1    0.0    0.0     0.0    0.0
 CAF                                  Data.Text.Internal           813         1    0.0    0.0     0.0    0.0
 CAF                                  Data.Text.Array              811         1    0.0    0.0     0.0    0.0
Retainer sets created during profiling:
SET 2 = {<MAIN.SYSTEM>}
SET 3 = {<MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 15 = {<GHC.IO.FD.CAF>}
SET 17 = {<System.Event.Thread.CAF>}
SET 18 = {<>}
SET 44 = {<GHC.IO.Handle.FD.CAF>}
SET 47 = {<GHC.IO.Handle.FD.CAF>, <MAIN.SYSTEM>}
SET 56 = {<GHC.Conc.Signal.CAF>}
SET 57 = {<>, <MAIN.SYSTEM>}
SET 66 = {<MAIN.SYSTEM>, <MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 67 = {<System.Event.Thread.CAF>, <>, <MAIN.SYSTEM>}
SET 72 = {<GHC.Conc.Sync.CAF>, <MAIN.SYSTEM>}
SET 76 = {<MapReduceTestStrictStrings.hListToEOF,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 81 = {<GHC.IO.Handle.FD.CAF>, <MAIN.SYSTEM>, <MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 83 = {<GHC.IO.Encoding.Iconv.CAF>, <GHC.IO.Handle.FD.CAF>, <MAIN.SYSTEM>, <MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 86 = {<GHC.Conc.Signal.CAF>, <>}
SET 95 = {<MapReduceTestStrictStrings.distancesOneToMany,MapReduceTestStrictStrings.distancesManyToMany,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 96 = {<MAIN.SYSTEM>, <MapReduceTestStrictStrings.distancesOneToMany,MapReduceTestStrictStrings.distancesManyToMany,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 100 = {<MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>, <MapReduceTestStrictStrings.hListToEOF,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 102 = {<MAIN.SYSTEM>, <MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>, <MapReduceTestStrictStrings.distancesOneToMany,MapReduceTestStrictStrings.distancesManyToMany,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 103 = {<MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 136 = {<GHC.IO.Handle.FD.CAF>, <MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 143 = {<GHC.Conc.Sync.CAF>, <GHC.IO.Handle.FD.CAF>, <MAIN.SYSTEM>}
SET 144 = {<MapReduceTestStrictStrings.distancesOneToMany,MapReduceTestStrictStrings.distancesSetToSet,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 145 = {<MAIN.SYSTEM>, <MapReduceTestStrictStrings.distancesOneToMany,MapReduceTestStrictStrings.distancesSetToSet,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 146 = {<MapReduceTestStrictStrings.distancesSetToSet,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 147 = {<MAIN.SYSTEM>, <MapReduceTestStrictStrings.distancesSetToSet,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 148 = {<MapReduceTestStrictStrings.distancesSetToSet,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>, <MapReduceTestStrictStrings.distancesOneToMany,MapReduceTestStrictStrings.distancesSetToSet,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
And the following .hp heap-profile image (not reproduced here).
EDIT 3: The previous code all used the following modules:
Data.Text.IO
Data.Text
Data.Text.Read
When I use the lazy versions of these, the total time / allocations / space usage doesn't really change: 2.62 secs, 2.25 GB of allocations and 5.5 MB of space.
The accepted solution: the lazy versions did not work because hListToEOF forced a full file read (I expected the : constructor to work lazily).
The solution is to use the following imports:
import qualified Data.ByteString.Char8 as Str
import qualified Data.Text.Lazy.IO as TextIO
import qualified Data.Text.Lazy as T
import Data.Text.Lazy.Read
and, in the singleResultBatch function, the following modification:
readDistances = liftM (map textRead . T.lines) $ TextIO.readFile path
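The post only shows the changed readDistances line; for completeness, the surrounding function might then look roughly like this (a sketch: the explicit Handle and hListToEOF are gone, and textRead now parses lazy Text via Data.Text.Lazy.Read):

singleResultBatch :: FilePath -> Int -> Int -> IO (Integer, Maybe Int)
singleResultBatch path batchSize offset = do
    distances <- readDistances
    let (batch, subSet) = splitAt batchSize $ drop offset distances
    let batchInner = distancesManyToMany batch
    let (batchToSubSet, newOffset) =
            if null subSet
                then (0, Nothing)
                else (distancesSetToSet batch subSet, Just (offset + batchSize))
    return (batchInner + batchToSubSet, newOffset)
    -- the lazy readFile streams the file; only what is demanded is parsed
    where readDistances = liftM (map textRead . T.lines) $ TextIO.readFile path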
Then both the speed (2.72 s) and the memory allocations (2.3 GB) stay roughly the same, which is expected.
The heap profile (space usage) does improve: 1.8 MB instead of 5.5 MB, as visible in the resulting .hp image (not reproduced here).
You need to process the data incrementally. Currently, hListToEOF reads all the data in one go, which you then slowly process (hence the initial memory spike as everything is read in, then a slow reduction as the list is deallocated). Instead of doing your own IO via hListToEOF, read/stream the files lazily (e.g. with readFile from Data.Text.Lazy.IO) and map your processing functions over them.
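As an illustration of that advice (a sketch, not the answerer's code), the small-file version can be written without any hand-rolled Handle loop at all, assuming a textRead that parses lazy Text:

import Control.Monad (liftM)
import qualified Data.Text.Lazy as T
import qualified Data.Text.Lazy.IO as TextIO

sumOfDistancesOnSmallFileLazily :: FilePath -> IO Integer
sumOfDistancesOnSmallFileLazily path =
    -- readFile streams the file lazily; the list of numbers is produced
    -- on demand as distancesManyToMany consumes it
    liftM (distancesManyToMany . map textRead . T.lines) (TextIO.readFile path)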