Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to wait on multiple `MVar`s?

I'd like a function

takeAnyMVar :: NonEmpty (MVar a) -> IO (MVar a, a)

which waits on multiple MVars simultaneously and returns the first MVar (and its value) that becomes available.

In particular, it should only cause a single one of the MVars in the input list to be in an empty state that wasn't empty before.


I have an implementation, but it is both inefficient and incorrect:

import Data.List.NonEmpty          -- from semigroups
import Control.Concurrent.Async    -- from async
import Control.Concurrent.MVar
import Control.Applicative
import Data.Foldable

takeAnyMVar :: NonEmpty (MVar a) -> IO (MVar a, a)
takeAnyMVar = runConcurrently . foldr1 (<|>) . fmap (Concurrently . takeMVar')
    where
        takeMVar' mvar = takeMVar mvar >>= \val -> return (mvar, val)

It's inefficient because it has to start a new thread for every MVar in the list.

It is incorrect, because multiple threads might take their MVar and leave it in an empty state before they can be cancelled by the (<|>) operator (which calls race in the end). One of them will succeed and return its result, the others will discard their results but leave their MVars empty.


On Windows, there is the WaitForMultipleObjects function, which allows to wait on multiple wait handles. I suspect there is something similar in other operating systems.

Given that MVar is probably implemented in terms of OS primitives, it should be possible to write a function with the above sematics. But maybe you need access to the implementation of MVar in order to do that.

The same applies to Chan, QSem, MSem, and other concurrency primitives.

like image 292
Tobias Brandt Avatar asked Oct 26 '25 20:10

Tobias Brandt


1 Answers

If your function is the only consumer (and runs in just one thread), I guess that with base-4.8 you could use readMVar in the threads and empty only the TVar that is being returned, keeping the others untouched.

As suggested by @DanielWagner, STM would be much simpler.

import qualified Data.Foldable as F
import Data.List.NonEmpty
import Control.Concurrent.STM.TMVar
import Control.Monad
import Control.Monad.STM

takeAnyMVar :: NonEmpty (TMVar a) -> STM (TMVar a, a)
takeAnyMVar = F.foldr1 orElse . fmap (\t -> liftM ((,) t) $ takeTMVar t)

Here we simply try to takeTMVar each of them, combined with orElse. If all are empty, STM will be smart enough to wait until one of them becomes full and then restart the transaction.

like image 75
Petr Avatar answered Oct 28 '25 18:10

Petr