I'm writing a simple script to run a bunch of tasks in parallel using the Shelly library, but I want to limit the maximum number of tasks running at any one time. The script takes a file with an input on each line and runs a task for that input. There are a few hundred inputs in the file and I want to limit it to around 16 processes at a time.
The current script actually limits concurrency to 1 (or tries to), using a QSem with an initial count of 1. I seem to be missing something, though, because when I run it on a test file with 4 inputs I see this:
Starting
Starting
Starting
Starting
Done
Done
Done
Done
So the threads are not blocking on the QSem as I would expect; they're all running simultaneously. I've even gone so far as to implement my own semaphores, on both MVar and TVar, and neither worked the way I expected. I'm obviously missing something fundamental, but what? I've also tried compiling the code and running it as a binary.
#!/usr/bin/env runhaskell
{-# LANGUAGE TemplateHaskell, QuasiQuotes, DeriveDataTypeable, OverloadedStrings #-}
import Shelly
import Prelude hiding (FilePath)
import Text.Shakespeare.Text (lt)
import qualified Data.Text.Lazy as LT
import Control.Monad (forM)
import System.Environment (getArgs)

import qualified Control.Concurrent.QSem as QSem
import Control.Concurrent (forkIO, MVar, putMVar, newEmptyMVar, takeMVar)

-- Define max number of simultaneous processes
maxProcesses :: IO QSem.QSem
maxProcesses = QSem.newQSem 1

bkGrnd :: ShIO a -> ShIO (MVar a)
bkGrnd proc = do
  mvar <- liftIO newEmptyMVar
  _ <- liftIO $ forkIO $ do
    -- Block until there are free processes
    sem <- maxProcesses
    QSem.waitQSem sem
    putStrLn "Starting"
    -- Run the shell command
    result <- shelly $ silently proc
    liftIO $ putMVar mvar result
    putStrLn "Done"
    -- Signal that this process is done and another can run.
    QSem.signalQSem sem
  return mvar

main :: IO ()
main = shelly $ silently $ do
  [img, file] <- liftIO $ getArgs
  contents <- readfile $ fromText $ LT.pack file
  -- Run a backgrounded process for each line of input.
  results <- forM (LT.lines contents) $ \line -> bkGrnd $ do
    runStdin <command> <arguments>
  liftIO $ mapM_ takeMVar results
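The MVar-based semaphore mentioned above isn't shown. As a hypothetical stand-in (the names Sem, newSem, waitSem, signalSem and the runLimited driver are illustrative, not from any library), here is a minimal counting semaphore built only from MVars, with the shell command replaced by threadDelay. Like QSem, it only limits concurrency when every thread shares the single value returned by newSem.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar
import Control.Exception (bracket_, finally)
import Control.Monad (forM)
import Data.IORef

-- Hypothetical MVar-only counting semaphore: the number of free units plus a
-- FIFO queue of blocked waiters, each woken through its own empty MVar.
newtype Sem = Sem (MVar (Int, [MVar ()]))

newSem :: Int -> IO Sem
newSem n = Sem <$> newMVar (n, [])

-- Take a unit if one is free; otherwise enqueue ourselves and block.
waitSem :: Sem -> IO ()
waitSem (Sem st) = do
  mb <- modifyMVar st $ \(n, ws) ->
    if n > 0
      then return ((n - 1, ws), Nothing)
      else do
        b <- newEmptyMVar
        return ((n, ws ++ [b]), Just b)
  maybe (return ()) takeMVar mb    -- block outside the lock

-- Hand the unit straight to the oldest waiter, or return it to the pool.
signalSem :: Sem -> IO ()
signalSem (Sem st) = modifyMVar_ st $ \(n, ws) ->
  case ws of
    []       -> return (n + 1, [])
    (b : bs) -> putMVar b () >> return (n, bs)

-- Run one simulated task per input, at most `limit` at a time, logging events.
runLimited :: Int -> [String] -> IO [String]
runLimited limit inputs = do
  sem    <- newSem limit             -- created once, shared by every thread
  logRef <- newIORef []
  let say s = atomicModifyIORef' logRef (\l -> (s : l, ()))
  dones <- forM inputs $ \inp -> do
    done <- newEmptyMVar
    let task = do
          say ("Starting " ++ inp)
          threadDelay 10000          -- stand-in for the real shell command
          say ("Done " ++ inp)
    _ <- forkIO (bracket_ (waitSem sem) (signalSem sem) task `finally` putMVar done ())
    return done
  mapM_ takeMVar dones
  reverse <$> readIORef logRef

main :: IO ()
main = runLimited 1 ["a", "b", "c", "d"] >>= mapM_ putStrLn
```

With a limit of 1, each "Starting x" is immediately followed by its matching "Done x", although the order in which inputs acquire the semaphore is not fixed. This simple version also has the weakness the last answer below describes for QSem: if a thread blocked in waitSem is killed, its queued MVar later swallows a unit.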
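As I said in my comment, each call to bkGrnd creates its own semaphore, so every thread continues without waiting. maxProcesses is not a semaphore; it is an IO action that creates a new one each time it is run, and bkGrnd runs it once per task. I would try something like this instead, where the semaphore is created once in main and passed to each call of bkGrnd. With that in place, changing maxProcesses to QSem.newQSem 16 gives the limit of 16 you want.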
bkGrnd :: QSem.QSem -> ShIO a -> ShIO (MVar a)
bkGrnd sem proc = do
  mvar <- liftIO newEmptyMVar
  _ <- liftIO $ forkIO $ do
    -- Block until there are free processes
    QSem.waitQSem sem
    --
    -- code continues as before
    --

main :: IO ()
main = shelly $ silently $ do
  [img, file] <- liftIO $ getArgs
  contents <- readfile $ fromText $ LT.pack file
  -- Create the semaphore once so every thread shares it.
  -- maxProcesses is an IO action, so it has to be lifted into ShIO.
  sem <- liftIO maxProcesses
  -- Run a backgrounded process for each line of input.
  results <- forM (LT.lines contents) $ \line -> bkGrnd sem $ do
    runStdin <command> <arguments>
  liftIO $ mapM_ takeMVar results
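To check the fix without Shelly, here is a base-only sketch. It assumes a simulated task (threadDelay) in place of runStdin and a limit of 2 instead of 16; peakConcurrency is a hypothetical helper that records how many tasks were ever running at once. It also swaps in bracket_ so a slot is released even if a task throws, which the code above does not do.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar
import Control.Concurrent.QSem
import Control.Exception (bracket_, finally)
import Control.Monad (forM)
import Data.IORef

-- Same shape as the question's maxProcesses: an action that makes a NEW QSem.
maxProcesses :: IO QSem
maxProcesses = newQSem 2

-- Fork n simulated tasks, running getSem once per task (as bkGrnd did),
-- and return the largest number of tasks that were running at the same time.
peakConcurrency :: IO QSem -> Int -> IO Int
peakConcurrency getSem n = do
  running <- newIORef (0 :: Int)
  peak    <- newIORef 0
  dones <- forM [1 .. n] $ \_ -> do
    done <- newEmptyMVar
    sem  <- getSem
    let task = do
          now <- atomicModifyIORef' running (\r -> (r + 1, r + 1))
          atomicModifyIORef' peak (\p -> (max p now, ()))
          threadDelay 50000          -- stand-in for the real shell command
          atomicModifyIORef' running (\r -> (r - 1, ()))
    _ <- forkIO (bracket_ (waitQSem sem) (signalQSem sem) task `finally` putMVar done ())
    return done
  mapM_ takeMVar dones
  readIORef peak

main :: IO ()
main = do
  -- Buggy: every task gets its own fresh QSem, so none of them ever waits.
  peakConcurrency maxProcesses 8 >>= print
  -- Fixed: run maxProcesses once and reuse the resulting QSem for every task.
  shared <- maxProcesses
  peakConcurrency (return shared) 8 >>= print
```

The first number typically equals the number of tasks, reproducing the "Starting Starting ..." behaviour from the question; the second never exceeds 2. In the real script the shared value would come from QSem.newQSem 16.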
You already have an answer, but I need to add that QSem and QSemN are not thread safe if killThread or asynchronous thread death is possible.
My bug report and patch are in GHC Trac ticket #3160. The fixed code is available as a new library called SafeSemaphore, with the modules Control.Concurrent.MSem, MSemN and MSampleVar, plus a bonus FairRWLock.