Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using TChan with Timeout

I have a TChan as input for a thread which should behave like this:

If sombody writes to the TChan within a specific time, the content should be retrieved. If there is nothing written within the specified time, it should unblock and continue with Nothing.

My attempt on this was to use the timeout function from System.Timeout like this:

timeout 1000000 $ atomically $ readTChan pktChannel

This seemed to work but now I discovered, that I am sometimes loosing packets (they are written to the channel, but not read on the other side. In the log I get this:

2014.063.11.53.43.588365 Pushing Recorded Packet: 2 1439
2014.063.11.53.43.592319 Run into timeout
2014.063.11.53.44.593396 Run into timeout
2014.063.11.53.44.593553 Pushing Recorded Packet: 3 1439
2014.063.11.53.44.597177 Sending Recorded Packet: 3 1439

Where "Pushing Recorded Packet" is the writing from the one thread and "Sending Recorded Packet" is the reading from the TChan in the sender thread. The line with Sending Recorded Packet 2 1439 is missing, which would indicate a successful read from the TChan.

It seems that if the timeout is received at the wrong point in time, the channel looses the packet. I suspect that the threadKill function used inside timeout and STM don't play well together.

Is this correct? Does somebody have another solution that does not loose the packet?

like image 626
MichaelO Avatar asked Mar 04 '14 12:03

MichaelO


1 Answers

Use registerDelay, an STM function, to signal a TVar when the timeout is reached. You can then use the orElse function or the Alternative operator <|> to select between the next TChan value or the timeout.

import Control.Applicative
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
import System.Random

-- write random values after a random delay
packetWriter :: Int -> TChan Int -> IO ()
packetWriter maxDelay chan = do
  let xs = randomRs (10000 :: Int, maxDelay + 50000) (mkStdGen 24036583)
  forM_ xs $ \ x -> do
    threadDelay x
    atomically $ writeTChan chan x

-- block (retry) until the delay TVar is set to True
fini :: TVar Bool -> STM ()
fini = check <=< readTVar

-- Read the next value from a TChan or timeout
readTChanTimeout :: Int -> TChan a -> IO (Maybe a)
readTChanTimeout timeoutAfter pktChannel = do
  delay <- registerDelay timeoutAfter
  atomically $
        Just <$> readTChan pktChannel
    <|> Nothing <$ fini delay

-- | Print packets until a timeout is reached
readLoop :: Show a => Int -> TChan a -> IO ()
readLoop timeoutAfter pktChannel = do
  res <- readTChanTimeout timeoutAfter pktChannel
  case res of
    Nothing -> putStrLn "timeout"
    Just val -> do
      putStrLn $ "packet: " ++ show val
      readLoop timeoutAfter pktChannel

main :: IO ()
main = do
  let timeoutAfter = 1000000

  -- spin up a packet writer simulation
  pktChannel <- newTChanIO
  tid <- forkIO $ packetWriter timeoutAfter pktChannel

  readLoop timeoutAfter pktChannel

  killThread tid
like image 75
Nathan Howell Avatar answered Oct 11 '22 03:10

Nathan Howell