
How do I create a thread pool?

Tags: haskell

Sometimes I want to run at most a fixed number of IO actions in parallel at once, for network activity and the like. I whipped up a small concurrent thread function (https://gist.github.com/810920) which works well, but it isn't really a pool, as all IO actions must finish before others can start.

The type of what I'm looking for would be something like:

runPool :: Int -> [IO a] -> IO [a]

and should be able to operate on finite and infinite lists.

The pipes package looks like it would be able to achieve this quite well, but I feel there is probably a solution similar to the gist I have provided that just uses MVars, etc., from the Haskell Platform.

Has anyone encountered an idiomatic solution without any heavy dependencies?

Lyndon Maydwell asked Feb 08 '12 12:02



2 Answers

You need a thread pool. If you want something short, you could take inspiration from Control.ThreadPool (from the control-engine package, which also provides more general functions). For instance, threadPoolIO is just:

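import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forM_, forever)

-- Fork nr workers that each loop forever: take an item from the input
-- channel, apply mutator to it, and put the result on the output channel.
threadPoolIO :: Int -> (a -> IO b) -> IO (Chan a, Chan b)
threadPoolIO nr mutator = do
    input <- newChan
    output <- newChan
    forM_ [1..nr] $
        \_ -> forkIO (forever $ do
            i <- readChan input
            o <- mutator i
            writeChan output o)
    return (input, output)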

It uses two Chans to communicate with the outside, but that's usually what you want; it really helps in writing code that doesn't get tangled up.

If you absolutely want to wrap it up in a function of your type, you can encapsulate the communication too:

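runPool :: Int -> [IO a] -> IO [a]
runPool n as = do
  -- Each queued item is itself an IO action, so a worker just runs it (id).
  (input, output) <- threadPoolIO n id
  forM_ as $ writeChan input
  -- replicateM (from Control.Monad) reads exactly one result per action.
  replicateM (length as) (readChan output)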

This won't preserve the order of your actions. Is that a problem? It's easy to fix by transmitting the index of each action along with it, or by using an array to store the responses.
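The index-passing fix mentioned above could look roughly like the following. This is only a sketch: runPoolOrdered and runTagged are hypothetical names, not part of control-engine, and threadPoolIO is repeated so the snippet compiles on its own. Each action is paired with its position, the worker passes the position through, and the results are sorted by it at the end.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forM_, forever, replicateM)
import Data.List (sortOn)

-- Same worker pool as in the answer.
threadPoolIO :: Int -> (a -> IO b) -> IO (Chan a, Chan b)
threadPoolIO nr mutator = do
    input <- newChan
    output <- newChan
    forM_ [1..nr] $ \_ ->
        forkIO (forever $ do
            i <- readChan input
            o <- mutator i
            writeChan output o)
    return (input, output)

-- Hypothetical name: like runPool, but returns results in input order.
runPoolOrdered :: Int -> [IO a] -> IO [a]
runPoolOrdered n as = do
    (input, output) <- threadPoolIO n runTagged
    -- Pair every action with its position in the list.
    let tagged = zip [0 :: Int ..] as
    forM_ tagged (writeChan input)
    -- Results arrive in completion order, one per queued action.
    results <- replicateM (length tagged) (readChan output)
    -- Restore the original order using the index tags.
    return (map snd (sortOn fst results))
  where
    -- Run the action and keep its index attached to the result.
    runTagged (i, act) = do
        r <- act
        return (i, r)
```

Like the original runPool, this only works on finite lists, and if an action throws, its worker dies and the final read blocks, so real code would want some exception handling around act.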

Note: the n threads will stay alive forever with this simplistic version. Adding a "killAll" action to the values returned by threadPoolIO would solve this handily if you intend to create and discard several of these pools in a long-running application (if not, given how lightweight threads are in Haskell, it's probably not worth the bother).

Note also that this function works on finite lists only. That's because IO is normally strict, so you can't start processing elements of the IO [a] result before the whole list has been produced. If you really want that, you'll either have to use lazy IO with unsafeInterleaveIO (maybe not the best idea) or completely change your model and use something like conduits to stream your results.
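For the "killAll" idea in the note above, one possible shape is sketched below. The name threadPoolIOWithKill is made up for illustration, and this is not how control-engine does it; it just keeps the ThreadIds that forkIO returns and hands back an action that calls killThread on each of them.

```haskell
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forM, forever)

-- Hypothetical variant of threadPoolIO that also returns a killAll action.
threadPoolIOWithKill :: Int -> (a -> IO b) -> IO (Chan a, Chan b, IO ())
threadPoolIOWithKill nr mutator = do
    input <- newChan
    output <- newChan
    -- forM (rather than forM_) keeps the ThreadId of every worker.
    tids <- forM [1..nr] $ \_ ->
        forkIO (forever $ do
            i <- readChan input
            o <- mutator i
            writeChan output o)
    -- killAll sends an asynchronous ThreadKilled exception to each worker.
    let killAll = mapM_ killThread tids
    return (input, output, killAll)
```

A caller would read all the results it expects from the output channel first and only then run killAll, since a worker killed in the middle of an item never writes that item's result.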

Jedai answered Sep 20 '22 12:09


Let me Hackage that for you.

http://hackage.haskell.org/package/resource-pool ?

http://hackage.haskell.org/package/threads-pool ?

sclv answered Sep 21 '22 12:09