Suppose I have a function f which takes an integer argument. f may not terminate on some arguments, but any result it does return is equally valuable. (For concreteness, the argument could be the seed to a random number generator, which is passed to a SAT solver.)
I want to use concurrency to invoke f 1, f 2, f 3, etc., and return as soon as the first one finishes. So, each thread should be running code that looks like
comp <- start_proc (f 1)
wait(comp || anyDone) -- wait for _either_ of these signals to be true
if comp then
  set anyDone = True
What's the easiest way to do this? The AMB operator comes to mind, but I'd need to run all processes simultaneously (e.g. on a 24- or 80-core machine). (Distributed computing solutions would be even better.) A superficial look at the AMB wiki page suggests it may not support non-terminating processes?
So far, I haven't been able to get the answers to work for my use case. I think this is probably more an issue with how I'm creating processes than anything else.
Define
runProc (x:xs) =
  createProcess (proc x xs) >>= \(_, _, _, h) -> waitForProcess h
Then, I want to race runProc ["zsh", "-c", "sleep 3"] and runProc ["ls"]. I modified Thomas' answer a little, but it didn't work.
raceL :: [IO α] -> IO α
raceL ops = do
  mv <- newEmptyMVar
  tids <- forM ops (\op -> forkIO (op >>= putMVar mv))
  answer <- takeMVar mv
  mapM_ killThread tids
  return answer
Compiling with -threaded and running with +RTS -N (I have a 4-core machine) doesn't seem to help.
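One thing that may be relevant when racing external processes: killThread only stops the Haskell thread, so the child OS process keeps running unless it is terminated explicitly. Below is a minimal sketch of a variant of runProc that terminates the child if the waiting thread receives an exception. The name runProcKill is hypothetical, and whether killThread can interrupt a thread that is already blocked inside waitForProcess depends on the process library version and the runtime, so treat this as a sketch under those assumptions rather than a confirmed fix.

```haskell
import Control.Exception (bracketOnError)
import System.Exit (ExitCode (..))
import System.Process (createProcess, proc, terminateProcess, waitForProcess)

-- | Like the question's runProc, but if the waiting thread receives an
-- exception (for example from killThread), the child process is terminated
-- as well. 'runProcKill' is a hypothetical name used for illustration.
runProcKill :: [String] -> IO ExitCode
runProcKill []       = ioError (userError "runProcKill: empty command line")
runProcKill (x : xs) =
  bracketOnError
    (createProcess (proc x xs))            -- start the child process
    (\(_, _, _, h) -> terminateProcess h)  -- runs only if an exception occurs
    (\(_, _, _, h) -> waitForProcess h)    -- normal path: wait for the exit code
```

With this in place, raceL [runProcKill ["zsh", "-c", "sleep 3"], runProcKill ["ls"]] would at least not leave the slower child running after the faster one has finished.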
Why not just an MVar and forkIO?
import Control.Concurrent
import Control.Concurrent.MVar
import System.Environment
import Control.Monad
main = do
  mv <- newEmptyMVar
  [nrThreads] <- liftM (map read) getArgs
  tids <- replicateM nrThreads (forkIO $ operation mv)
  answer <- takeMVar mv
  mapM_ killThread tids
operation :: MVar Int -> IO ()
operation mv = putMVar mv 5
This will fork nrThreads lightweight threads. Once one thread has finished, it should place the answer in the provided MVar. All other threads will then be killed by the main thread. No explicit polling is needed, as the GHC RTS will reschedule main once the MVar becomes non-empty.
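To connect this back to the question, here is a minimal sketch of the same MVar and forkIO pattern applied to a function over seeds. The names firstResult and demoF are hypothetical: demoF is only a stand-in for the question's f, and it is assumed that f runs in IO. Note that with an empty seed list nothing ever fills the MVar, so the wait cannot succeed.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (finally)
import Control.Monad (forM, forever)

-- | Run @f seed@ for every seed concurrently and return whichever result
-- arrives first. 'firstResult' is a hypothetical helper name.
firstResult :: (Int -> IO a) -> [Int] -> IO a
firstResult f seeds = do
  mv <- newEmptyMVar
  -- One lightweight thread per seed; each one tries to fill the MVar.
  tids <- forM seeds (\s -> forkIO (f s >>= putMVar mv))
  -- Block until the first result arrives, then kill all worker threads,
  -- also when the wait itself is interrupted by an exception.
  takeMVar mv `finally` mapM_ killThread tids

-- | Hypothetical stand-in for the question's f: never returns on odd seeds.
demoF :: Int -> IO Int
demoF seed
  | odd seed  = forever (threadDelay 1000000)
  | otherwise = return (seed * 10)
```

For example, firstResult demoF [1, 3, 4, 5] returns 40, because 4 is the only seed on which demoF terminates; the threads for the other seeds are killed afterwards.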
Instead of amb, consider unamb! It provides a handful of nice primitives for racing computations, both pure and impure. For example:
Prelude Data.Unamb> unamb (last [1..]) 32
32
Prelude Data.Unamb> race (threadDelay 5000000 >> return 3) readLn
Prelude Data.Unamb Control.Concurrent> race (threadDelay 5000000 >> return 3) readLn
56
56
Prelude Data.Unamb Control.Concurrent> race (threadDelay 5000000 >> return 3) readLn
3