Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

STM with partial atomicity for certain TVars

I am doing things with STM and have among other things used the TBQueue data structure with great success. An useful feature I've been using it for involves reading from it based on a precondition in a TVar, basically like so:

shouldRead <- readTVar shouldReadVar
if shouldRead
  then do
    a <- readTBQueue queue
    doSomethingWith a
  else doSomethingElse

If we assume that queue is empty and shouldReadVar contains True before executing this block, it will result in readTBQueue calling retry, and the block will be re-executed when shouldReadVar contains False or queue contains an element, whatever happens first.


I am now in need of a synchronous channel data structure, similar to the structure described in this article (Please read it if you want to understand this question), except it needs to be readable with a pre-condition like in the previous example, and possibly compose with other stuff as well.

Let's call this data structure SyncChan with writeSyncChan and readSyncChan operations defined on it.

And here's a possible use case: This (pseudo) code (which will not work because I mix STM/IO concepts):

shouldRead <- readTVar shouldReadVar
if shouldRead
  then do
    a <- readSyncChan syncChan
    doSomethingWith a
  else doSomethingElse

Assuming that no other thread is currently blocking on a writeSyncChan call, and shouldReadChan contains True, I want the block to "retry" until either shouldReadChan contains False, or a different thread blocks on a writeSyncChan. In other words: when one thread retrys on writeSyncChan and another thread blocks reaches a readSyncChan, or vice versa, I want the value to be transferred along the channel. In all other cases, both sides should be in a retry state and thus react to a change in shouldReadVar, so that the read or write can be cancelled.

The naïve approach described in the article linked above using two (T)MVars is of course not possible. Because the data structure is synchronous, it is impossible to use it within two atomically blocks, because you cannot change one TMVar and wait for another TMVar to be changed in an atomic context.

Instead, I am looking for a kind of partial atomicity, where I can "commit" a certain part of a transaction and only roll it back when certain variables change, but not others. If I have "msg" and "ack" variables like the first example in the article above, I want to be able to write to the "msg" variable, then wait for either a value to arrive on "ack", or for my other transactional variables to change. If other transactional variables change, the whole atomic block should be retried, and if an "ack" value arrives, the transaction should continue as it was in the previous state. For the reading side, something similar should happen, except I would of course be reading from "msg" and writing to "ack."

Is this possible to do using GHC STM, or do I need to do manual MVar/rollback handling?

like image 507
dflemstr Avatar asked Jun 13 '13 21:06

dflemstr


1 Answers

This is what you want:

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad

data SyncChan a = SyncChan (TMVar a) (TMVar ())

newSyncChan :: IO (SyncChan a)
newSyncChan = do
    msg <- newEmptyTMVarIO
    ack <- newEmptyTMVarIO
    return (SyncChan msg ack)

readIf :: SyncChan a -> TVar Bool -> STM (Maybe a)
readIf (SyncChan msg ack) shouldReadVar = do
    b <- readTVar shouldReadVar
    if b
        then do
            a <- takeTMVar msg
            putTMVar ack ()
            return (Just a)
        else return Nothing

write :: SyncChan a -> a -> IO ()
write (SyncChan msg ack) a = do
    atomically $ putTMVar msg a
    atomically $ takeTMVar ack

main = do
    sc <- newSyncChan
    tv <- newTVarIO True
    forkIO $ forever $ forM_ [False, True] $ \b -> do
        threadDelay 2000000
        atomically $ writeTVar tv b
    forkIO $ forM_ [0..] $ \i -> do
        putStrLn "Writing..."
        write sc i
        putStrLn "Write Complete"
        threadDelay 300000
    forever $ do
        putStrLn "Reading..."
        a <- atomically $ readIf sc tv
        print a
        putStrLn "Read Complete"

This gives the behavior you had in mind. While the TVar is True the input and output ends will be synchronized with each other. When the TVar switches to False then the read end freely aborts and returns Nothing.

like image 86
Gabriella Gonzalez Avatar answered Oct 23 '22 22:10

Gabriella Gonzalez