
How would I pipe with a timeout that resets with each incoming event?

The withTimeout function is supposed to pass ConsoleEvent values through and emit a CeTimeout every s :: Int seconds in which nothing has been received. Instead, it fails to send the CeTimeout events at the appropriate times. If more than s seconds have passed, a single CeTimeout is emitted in place of the next incoming event, and the original event is lost. Also, instead of a single CeTimeout, there should be n CeTimeout events, where n is the number of s-second periods that have elapsed. Where is the mistake, and what would be the correction? Thanks!

withTimeout :: (MonadIO t) => Int -> Pipe ConsoleEvent ConsoleEvent t ()
withTimeout ((* 1000000) -> s) = join . liftIO $ work
  where
    work :: (MonadIO t) => IO (Pipe ConsoleEvent ConsoleEvent t ()) 
    work =
      do
        (oSent, iKept) <- spawn $ bounded 1
        (oKept, iSent) <- spawn $ unbounded
        (oTimeout, iTimeout) <- spawn $ bounded 1

        tid <- launchTimeout oTimeout >>= newMVar

        forkIO $ do
          runEffect . forever $ fromInput iKept >-> factorTimeout tid oTimeout >-> toOutput oKept

        forkIO $ do
          runEffect . forever $ fromInput iTimeout >-> toOutput oKept

        return $ do
          await >>= (liftIO . guardedSend oSent)
          (liftIO . guardedRecv $ iSent) >>= yield

    guardedSend :: Output ConsoleEvent -> ConsoleEvent -> IO ()
    guardedSend o ce =
      atomically (send o ce) >>= \case
        True -> return ()
        False -> die "withTimeout cannot send"

    guardedRecv :: Input ConsoleEvent -> IO ConsoleEvent
    guardedRecv i =
      atomically (recv i) >>= \case
        Just a -> return a
        Nothing -> die "withTimeout cannot recv"

    launchTimeout :: Output ConsoleEvent -> IO ThreadId
    launchTimeout o =
      forkIO . forever $ do
        threadDelay s
        atomically (send o CeTimeout) >>= \case
          True -> return ()
          False -> die "withTimeout cannot send timeout"

    relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
    relaunchTimeout o oldTid = 
      do
        tid <- launchTimeout o
        killThread oldTid
        return tid

    factorTimeout :: MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent IO ()
    factorTimeout v o =
      do
        ce <- await
        liftIO . modifyMVar_ v $ relaunchTimeout o
        yield ce

Here is a fully executable script.

Vanson Samuel asked Sep 27 '18 15:09


1 Answer

A Pipe cannot emit a value on its own schedule. It is a single thread of control: once it calls await, it blocks until upstream yields, so a CeTimeout cannot be sent down the pipe while nothing is coming in to drive the flow. A Pipe may yield more than once per await, but the pipe in the question performs exactly one send and one receive per await, so a CeTimeout that is already queued gets yielded in place of the event that just arrived. The function has therefore been refactored to return a Pipe and a Producer instead of just a Pipe; the calling function can then join the Producer back in. The initial plan was to return just a Pipe, so that the calling function would not have to do any additional work to make timeouts work. That would have been a more self-contained solution. The alternative has the advantage of being more explicit: the timeouts won't look like they are appearing out of thin air to someone who is not familiar with the pipeline.
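
As a quick check of how yields relate to awaits, here is a minimal sketch. The helper name twice is hypothetical and not part of the original script. It yields two values for every value it awaits, which suggests the constraint is not the number of yields but the fact that the Pipe only runs again once upstream delivers something.

```haskell
module Main (main) where

import Control.Monad (forever)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical helper (not from the original script):
-- yields each incoming value twice, i.e. two yields per await.
twice :: Monad m => Pipe a a m r
twice = forever $ do
  x <- await
  yield x
  yield x

main :: IO ()
main = print (P.toList (each [1, 2, 3 :: Int] >-> twice))
-- prints [1,1,2,2,3,3]
```

What twice cannot do is produce a value while it is blocked in await, and that is the situation a timeout has to cover.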

withTimeout :: (MonadIO t) => Int -> IO (Pipe ConsoleEvent ConsoleEvent t (), Producer ConsoleEvent t ())
withTimeout ((* 1000000) -> s) =
  do
    (oTimeout, iTimeout) <- spawn $ bounded 1
    vTid <- launchTimeout oTimeout >>= newMVar

    return (factorTimeout vTid oTimeout, fromInput iTimeout)
  where
    launchTimeout :: Output ConsoleEvent -> IO ThreadId
    launchTimeout o =
      -- Send a CeTimeout every s microseconds until this thread is killed.
      forkIO . forever $ do
        threadDelay s
        atomically (send o CeTimeout) >>= \case
          True -> return ()
          False -> die "withTimeout cannot send timeout"

    relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
    relaunchTimeout o oldTid = 
      do
        tid <- launchTimeout o
        killThread oldTid
        return tid

    factorTimeout :: (MonadIO t) => MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent t ()
    factorTimeout v o =
      do
        ce <- await
        liftIO . modifyMVar_ v $ relaunchTimeout o
        yield ce

main :: IO ()
main =
  do
    hSetBuffering stdin NoBuffering
    hSetEcho stdin False

    exitSemaphore <- newEmptyMVar
    (o1, i1) <- spawn $ bounded 1
    (o2, i2) <- spawn $ bounded 1

    (timeoutTrap, timeoutRender) <- withTimeout 2

    runEffect $ yield CeBegan >-> toOutput o1

    forkIO $ do
      runEffect . forever $ chars >-> toOutput o1
      putMVar exitSemaphore ()

    -- other inputs would be piped to o1 here

    forkIO $ do
      runEffect . forever $ fromInput i1 >-> timeoutTrap >-> toOutput o2
      putMVar exitSemaphore ()

    forkIO $ do
      runEffect . forever $ timeoutRender >-> toOutput o2
      putMVar exitSemaphore ()

    forkIO $ do
      -- logic would be done before dumpPipe
      runEffect . forever $ fromInput i2 >-> dumpPipe >-> (await >> return ())
      putMVar exitSemaphore ()

    takeMVar exitSemaphore

Here is a fully executable script.
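
A different technique, sketched here only as a possible alternative and not taken from the script above, is to put the timeout at the point where events are read from the mailbox. The sketch assumes the events arrive through a pipes-concurrency Input and uses timeout from System.Timeout around recv. The ConsoleEvent definition and the name fromInputWithTimeout are hypothetical stand-ins. Because each read starts a fresh timeout, the timer resets on every incoming event, and every further s seconds of silence produces one more CeTimeout.

```haskell
{-# LANGUAGE LambdaCase #-}
module Main (main) where

import Control.Concurrent (threadDelay)
import Control.Monad (void)
import Pipes
import Pipes.Concurrent
import qualified Pipes.Prelude as P
import System.Timeout (timeout)

-- Hypothetical stand-in for the ConsoleEvent type of the original script.
data ConsoleEvent = CeBegan | CeChar Char | CeTimeout
  deriving (Eq, Show)

-- Hypothetical replacement for fromInput: reads events from the mailbox
-- and yields a CeTimeout whenever `secs` seconds pass without an event.
fromInputWithTimeout :: Int -> Input ConsoleEvent -> Producer ConsoleEvent IO ()
fromInputWithTimeout secs input = loop
  where
    micros = secs * 1000000
    loop =
      liftIO (timeout micros (atomically (recv input))) >>= \case
        Nothing       -> yield CeTimeout >> loop -- silence: report it, wait again
        Just Nothing  -> return ()               -- mailbox sealed: stop
        Just (Just e) -> yield e >> loop         -- event: next read restarts the timer

main :: IO ()
main = do
  (output, input, seal) <- spawn' unbounded
  void . forkIO $ do
    void . atomically $ send output CeBegan
    threadDelay 2500000 -- 2.5 s of silence
    void . atomically $ send output (CeChar 'x')
    atomically seal
  runEffect $ fromInputWithTimeout 1 input >-> P.print
```

With a one-second timeout and 2.5 seconds of silence, this should print CeBegan, then two CeTimeout lines, then CeChar 'x'. Since the read is a single STM transaction, it either completes and returns the event or is interrupted before taking anything, so an event should not be dropped when the timeout fires. The trade-off is that the timeout lives in the Producer that feeds the pipeline, so it is no longer a drop-in Pipe either.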

Vanson Samuel answered Nov 15 '22 06:11