
How can I optimize parallel sorting to improve runtime performance?

I have an algorithm for sorting a list of a given length in parallel:

import Control.Parallel (par, pseq)
import Data.Time.Clock (diffUTCTime, getCurrentTime)
import System.Environment (getArgs)
import System.Random (StdGen, getStdGen, randoms)


parSort :: (Ord a) => [a] -> [a]
parSort (x:xs)    = force greater `par` (force lesser `pseq`
                                         (lesser ++ x:greater))
    where lesser  = parSort [y | y <- xs, y <  x]
          greater = parSort [y | y <- xs, y >= x]
parSort _         = []

sort :: (Ord a) => [a] -> [a]
sort (x:xs) = lesser ++ x:greater
    where lesser  = sort [y | y <- xs, y <  x]
          greater = sort [y | y <- xs, y >= x]
sort _ = []

parSort2 :: (Ord a) => Int -> [a] -> [a]
parSort2 d list@(x:xs)
  | d <= 0     = sort list
  | otherwise = force greater `par` (force lesser `pseq`
                                     (lesser ++ x:greater))
      where lesser      = parSort2 d' [y | y <- xs, y <  x]
            greater     = parSort2 d' [y | y <- xs, y >= x]
            d' = d - 1
parSort2 _ _              = []

force :: [a] -> ()
force xs = go xs `pseq` ()
    where go (_:xs) = go xs
          go [] = 1


randomInts :: Int -> StdGen -> [Int]
randomInts k g = let result = take k (randoms g)
                 in force result `seq` result

testFunction = parSort

main = do
  args <- getArgs
  let count | null args = 500000
            | otherwise = read (head args)
  input <- randomInts count `fmap` getStdGen
  start <- getCurrentTime
  let sorted = testFunction input
  putStrLn $ "Sort list N = " ++ show (length sorted)
  end <- getCurrentTime
  putStrLn $ show (end `diffUTCTime` start) 

I want the parallel sort on 2, 3 and 4 processor cores to take less time than on 1 core, but so far I cannot achieve that. Here are my program runs:

1. SortList +RTS -N1 -RTS 10000000
   time = 41.2 s
2. SortList +RTS -N3 -RTS 10000000
   time = 39.55 s
3. SortList +RTS -N4 -RTS 10000000
   time = 54.2 s

What can I do?

Update 1:

testFunction = parSort2 60
asked by Vasiliy on Apr 23 '19

1 Answer

Here's one idea you can play around with, using Data.Map. For simplicity and performance, I assume the element type is substitutive (equal elements are interchangeable), so we can count occurrences rather than storing lists of elements. I'm confident that you can get better results using some fancy array algorithm, but this is simple and (essentially) functional.

When writing a parallel algorithm, we want to minimize the amount of work that must be done sequentially. When sorting a list, there's one thing that we really can't avoid doing sequentially: splitting up the list into pieces for multiple threads to work on. We'd like to get that done with as little effort as possible, and then try to work mostly in parallel from then on.

Let's start with a simple sequential algorithm.

{-# language BangPatterns, TupleSections #-}
import qualified Data.Map.Strict as M
import Data.Map (Map)
import Data.List
import Control.Parallel.Strategies

type Bag a = Map a Int

ssort :: Ord a => [a] -> [a]
ssort xs =
  let m = M.fromListWith (+) $ (,1) <$> xs
  in concat [replicate c x | (x,c) <- M.toList m]
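As a quick sanity check (my own example, not part of the answer), counting occurrences preserves duplicates:

-- Hypothetical check (not from the answer): duplicates survive the Bag round-trip.
demoSsort :: Bool
demoSsort = ssort [3,1,2,1 :: Int] == [1,1,2,3]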

How can we parallelize this? First, let's break up the list into pieces. There are various ways to do this, none of them great. Assuming a small number of capabilities, I think it's reasonable to let each of them walk the list itself. Feel free to experiment with other approaches.

-- | Every Nth element, including the first
everyNth :: Int -> [a] -> [a]
everyNth n | n <= 0 = error "What you doing?"
everyNth n = go 0 where
  go !_ [] = []
  go 0 (x : xs) = x : go (n - 1) xs
  go k (_ : xs) = go (k - 1) xs

-- | Divide up a list into N pieces fairly. Walking each list in the
-- result will walk the original list.
splatter :: Int -> [a] -> [[a]]
splatter n = map (everyNth n) . take n . tails
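For example (my own illustration, assuming the definitions above), splitting [1..10] three ways interleaves the elements, so each piece still walks the original list:

-- Hypothetical check (not from the answer):
demoSplatter :: Bool
demoSplatter = splatter 3 [1..10 :: Int] == [[1,4,7,10],[2,5,8],[3,6,9]]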

Now that we have pieces of list, we spark threads to convert them to bags.

parMakeBags :: Ord a => [[a]] -> Eval [Bag a]
parMakeBags xs = 
  traverse (rpar . M.fromListWith (+)) $ map (,1) <$> xs

Now we can repeatedly merge pairs of bags until we have just one.

parMergeBags_ :: Ord a => [Bag a] -> Eval (Bag a)
parMergeBags_ [] = pure M.empty
parMergeBags_ [t] = pure t
parMergeBags_ q = parMergeBags_ =<< go q where
  go [] = pure []
  go [t] = pure [t]
  go (t1:t2:ts) = (:) <$> rpar (M.unionWith (+) t1 t2) <*> go ts

But ... there's a problem. In each round of merges, we use only half as many capabilities as we did in the previous one, and perform the final merge with just one capability. Ouch! To fix this, we'll need to parallelize unionWith. Fortunately, this is easy!

import Data.Map.Internal (Map (..), splitLookup, link)

parUnionWith
  :: Ord k
  => (v -> v -> v)
  -> Int -- Number of threads to spark
  -> Map k v
  -> Map k v
  -> Eval (Map k v)
parUnionWith f n t1 t2 | n <= 1 = rseq $ M.unionWith f t1 t2
parUnionWith _ !_ Tip t2 = rseq t2
parUnionWith _ !_ t1 Tip = rseq t1
parUnionWith f n (Bin _ k1 x1 l1 r1) t2 = case splitLookup k1 t2 of
  (l2, mb, r2) -> do
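    -- Spark the union of the left subtrees on another capability,
    -- while this thread recurses into the right subtrees itself.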
    l1l2 <- parEval $ parUnionWith f (n `quot` 2) l1 l2
    r1r2 <- parUnionWith f (n `quot` 2) r1 r2
    case mb of
      Nothing -> rseq $ link k1 x1 l1l2 r1r2
      Just x2 -> rseq $ link k1 fx1x2 l1l2 r1r2
        where !fx1x2 = f x1 x2
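A quick way to convince yourself this is still an ordinary union (my own check, assuming the code above):

-- Hypothetical check: for any thread budget, the result should equal
-- the sequential unionWith.
checkParUnion :: Bool
checkParUnion = runEval (parUnionWith (+) 4 m1 m2) == M.unionWith (+) m1 m2
  where
    m1, m2 :: Map Int Int
    m1 = M.fromList [(1,2),(3,1)]
    m2 = M.fromList [(2,5),(3,4)]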

Now we can fully parallelize bag merging:

-- Uses the given number of capabilities per merge, initially,
-- doubling for each round.
parMergeBags :: Ord a => Int -> [Bag a] -> Eval (Bag a)
parMergeBags !_ [] = pure M.empty
parMergeBags !_ [t] = pure t
parMergeBags n q = parMergeBags (n * 2) =<< go q where
  go [] = pure []
  go [t] = pure [t]
  go (t1:t2:ts) = (:) <$> parEval (parUnionWith (+) n t1 t2) <*> go ts

We can then implement a parallel merge like this:

parMerge :: Ord a => [[a]] -> Eval [a]
parMerge xs = do
  bags <- parMakeBags xs
  -- Why 2 and not one? We only have half as many
  -- pairs as we have lists (capabilities we want to use)
  -- so we double up.
  m <- parMergeBags 2 bags
  pure $ concat [replicate c x | (x,c) <- M.toList m]

Putting the pieces together,

parSort :: Ord a => Int -> [a] -> Eval [a]
parSort n = parMerge . splatter n

pSort :: Ord a => Int -> [a] -> [a]
pSort n = runEval . parMerge . splatter n
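As a usage note (my addition, not from the answer): a natural choice for n is the number of capabilities the RTS was started with, available as GHC.Conc.numCapabilities.

import GHC.Conc (numCapabilities)  -- my addition

-- Use one piece per capability, as set by +RTS -N.
sortOnAllCores :: Ord a => [a] -> [a]
sortOnAllCores = pSort numCapabilities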

There's just one sequential piece remaining that we can parallelize: converting the final bag to a list. Is it worth parallelizing? I'm pretty sure that in practice it is not. But let's do it anyway, just for fun! To avoid considerable extra complexity, I'll assume that there aren't large numbers of equal elements; repeated elements in the result will lead to some work (thunks) remaining in the result list.

We'll need a basic partial list spine forcer:

-- | Force the first n conses of a list
walkList :: Int -> [a] -> ()
walkList n _ | n <= 0 = ()
walkList _ [] = ()
walkList n (_:xs) = walkList (n - 1) xs

And now we can convert the bag to a list in parallel chunks without paying for concatenation:

-- | Use up to the given number of threads to convert a bag
-- to a list, appending the final list argument.
parToListPlus :: Int -> Bag k -> [k] -> Eval [k]
parToListPlus n m lst | n <= 1 = do
  rseq (walkList (M.size m) res)
  pure res
  -- Note: the concat and ++ should fuse away when compiling with
  -- optimization.
  where res = concat [replicate c x | (x,c) <- M.toList m] ++ lst
parToListPlus _ Tip lst = pure lst
parToListPlus n (Bin _ x c l r) lst = do
  r' <- parEval $ parToListPlus (n `quot` 2) r lst
  res <- parToListPlus (n `quot` 2) l $ replicate c x ++ r'
  rseq r' -- make sure the right side is finished
  pure res

And then we modify the merger accordingly:

parMerge :: Ord a => Int -> [[a]] -> Eval [a]
parMerge n xs = do
  bags <- parMakeBags xs
  m <- parMergeBags 2 bags
  parToListPlus n m []
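With parMerge now taking a thread count, the top-level wrapper from before presumably becomes (my reading; the answer leaves this implicit):

pSort :: Ord a => Int -> [a] -> [a]
pSort n = runEval . parMerge n . splatter n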
answered by dfeuer on Nov 01 '22