Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Limiting pipes based on time?

Is it possible to create pipes that get all values that have been sent downstream in a certain time period? I'm implementing a server where the protocol allows me to concatenate outgoing packets and compress them together, so I'd like to effectively "empty out" the queue of downstream ByteStrings every 100ms and mappend them together to then yield on to the next pipe which does the compression.

like image 599
user3261399 Avatar asked Oct 01 '22 09:10

user3261399


1 Answers

Here's a solution using pipes-concurrency. You give it any Input and it will periodically drain the input of all values:

import Control.Applicative ((<|>))
import Control.Concurrent (threadDelay)
import Data.Foldable (forM_)
import Pipes
import Pipes.Concurrent

drainAll :: Input a -> STM (Maybe [a])
drainAll i = do
    ma <- recv i
    case ma of
        Nothing -> return Nothing
        Just a  -> loop (a:)
  where
    loop diffAs = do
        ma <- recv i <|> return Nothing
        case ma of
            Nothing -> return (Just (diffAs []))
            Just a  -> loop (diffAs . (a:))

bucketsEvery :: Int -> Input a -> Producer [a] IO ()
bucketsEvery microseconds i = loop
  where
    loop = do
        lift $ threadDelay microseconds
        ma <- lift $ atomically $ drainAll i
        forM_ ma $ \a -> do
            yield a
            loop

This gives you much greater control over how you consume elements from upstream, by selecting the type of Buffer you use to build the Input.

If you're new to pipes-concurrency, you can read the tutorial which explains how to use spawn, Buffer and Input.

like image 78
Gabriella Gonzalez Avatar answered Oct 13 '22 10:10

Gabriella Gonzalez