Here's some code that implements a small receiving server using conduit, network-conduit, and stm-conduit. It receives data on a socket and then streams it through an STM channel to the main thread.
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (newTBMChan, TBMChan())
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Trans.Class
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.Conduit
import qualified Data.Conduit.Binary as DCB
import Data.Conduit.Extra.Resumable
import Data.Conduit.Network (sourceSocket)
import Data.Conduit.TMChan (sinkTBMChan, sourceTBMChan, mergeSources)
import Network.Socket
import System.Directory (removeFile)
import System.IO

type BSChan = TBMChan ByteString

listenSocket :: Socket -> Int -> IO BSChan
listenSocket soc bufSize = do
    chan <- atomically $ newTBMChan bufSize
    forkListener chan
    return chan
  where
    forkListener chan = void . forkIO $ listen soc 2 >> loop where
        loop = do
            (conn, _) <- accept soc
            sourceSocket conn $$ sinkTBMChan chan
            close conn
            loop

main :: IO ()
main = do
    soc <- socket AF_UNIX Stream 0
    bind soc (SockAddrUnix "mysock")
    socChan <- listenSocket soc 8
    sourceTBMChan socChan $$ DCB.sinkHandle stdout
    removeFile "mysock"
(In the real application, the stream of data from the socket gets merged with some others, which is why I don't handle it directly in the listener).
The problem is that, where I had expected this to stay open until the main thread is killed, it instead exits after the first message is received on the socket. I cannot work out why it does this, unless the sink (on the second-to-last line) is exiting once it sees the end of the first stream of data. Can I persuade it not to do this? There's some stuff in Conduit about making a source resumable, but not a sink.
From the documentation of sinkTBMChan:
When the sink is closed, the channel will close too.
So when the first socket handle closes, it causes the Source from sourceSocket to close, which closes the connected sink, which in turn closes the TBMChan. In the main thread, sourceTBMChan then ends because the channel is closed, and that propagates to sinkHandle, stopping the sink.
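The reader's side of that chain can be seen without any conduit machinery. This is a minimal sketch that assumes only the stm and stm-chans packages; closeDemo is a hypothetical name used for illustration. It writes one value, closes the channel, and then reads twice:

```haskell
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan
    (closeTBMChan, newTBMChan, readTBMChan, writeTBMChan)

-- | Hypothetical helper: write one value, close the channel, then read twice.
closeDemo :: IO (Maybe Int, Maybe Int)
closeDemo = do
    chan <- atomically $ newTBMChan 8
    atomically $ writeTBMChan chan 1
    atomically $ closeTBMChan chan
    -- A value buffered before the close is still delivered.
    first <- atomically $ readTBMChan chan
    -- Once the channel is closed and empty, reads return Nothing.
    second <- atomically $ readTBMChan chan
    return (first, second)
```

A Nothing from the channel is what a reader such as sourceTBMChan treats as the end of the stream, which is why the main thread finishes after the first connection.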
The simplest way to solve this is probably to change your loop into a custom source that doesn't close between connections, and connect that source to the TBMChan.
listenSocket :: Socket -> Int -> IO BSChan
listenSocket soc bufSize = do
    chan <- atomically $ newTBMChan bufSize
    forkListener chan
    return chan
  where
    forkListener chan = void . forkIO $ do
        listen soc 2
        loop $$ sinkTBMChan chan
    loop = do
        (conn, _) <- liftIO $ accept soc
        sourceSocket conn
        liftIO $ close conn
        loop
Coordinating shutdown of writers and readers from a channel is a non-trivial problem, but you can reuse a solution from the pipes ecosystem: the pipes-concurrency library. This library provides several pipes-independent utilities that you can reuse with conduit libraries for communicating between readers and writers, so that each side automatically knows when to clean up, and you can also clean up either side manually.
The key function that you use from the pipes-concurrency library is spawn. Its type is:
spawn :: Buffer a -> IO (Output a, Input a)
The Buffer specifies what underlying STM channel abstraction to use. Judging by your example code, it sounds like you want a Bounded buffer:
spawn (Bounded 8) :: IO (Output a, Input a)
The a can be anything in this case, so it can be a ByteString, for example:
spawn (Bounded 8) :: IO (Output ByteString, Input ByteString)
The Input and Output behave like a mailbox. You add messages to the mailbox by sending data to Outputs, and you take messages out of the mailbox (in FIFO order) by recving data from Inputs:
-- Returns `False` if the mailbox is sealed
send :: Output a -> a -> STM Bool
-- Returns `Nothing` if the mailbox is sealed
recv :: Input a -> STM (Maybe a)
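As a rough illustration of the mailbox behaviour, here is a minimal sketch that sends one message and reads it back. It assumes the pipes-concurrency and stm packages; roundTrip is a hypothetical name, not part of the library:

```haskell
import Control.Concurrent.STM (atomically)
import Pipes.Concurrent (Buffer (Bounded), recv, send, spawn)

-- | Hypothetical helper: put one message in the mailbox and take it out again.
roundTrip :: IO (Bool, Maybe String)
roundTrip = do
    (output, input) <- spawn (Bounded 8)
    -- `send` reports whether the mailbox accepted the message.
    accepted <- atomically $ send output "hello"
    -- `recv` returns the oldest buffered message, if there is one.
    received <- atomically $ recv input
    return (accepted, received)
```

Both calls run inside atomically because send and recv are plain STM actions, so they can also be combined with other STM operations in a single transaction.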
The neat feature of pipes-concurrency is that it instruments the garbage collector to automatically seal the mailbox if there are either no readers or no writers to the mailbox. This avoids a common source of deadlocks.
If you were using the pipes ecosystem, you would normally use the following two higher-level utilities to read from and write to the mailbox:
-- Stream values into the mailbox until it is sealed
toOutput :: Output a -> Consumer a IO ()
-- Stream values from the mailbox until it is sealed
fromInput :: Input a -> Producer a IO ()
However, because the core machinery is pipes-independent, you can write equivalent conduit versions of these functions:
import Control.Monad.Trans.Class (lift)
import Data.Conduit
import Pipes.Concurrent

-- Forward every upstream value to the mailbox
-- (the Bool returned by `send` is ignored here)
toOutput' :: Output a -> Sink a IO ()
toOutput' o = awaitForever (\a -> lift $ atomically $ send o a)

-- Yield values from the mailbox until `recv` returns Nothing
fromInput' :: Input a -> Source IO a
fromInput' i = do
    ma <- lift $ atomically $ recv i
    case ma of
        Nothing -> return ()
        Just a  -> do
            yield a
            fromInput' i
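Note that toOutput' as written discards the Bool that send returns, so it keeps consuming input even after the mailbox has been sealed. If you want the sink to stop at that point, one possible variant is sketched below. The names toOutputUntilSealed and demo are hypothetical, and the sketch assumes a conduit version that still exports Sink and $$:

```haskell
import Control.Concurrent.STM (atomically)
import Control.Monad.Trans.Class (lift)
import Data.Conduit (Sink, await, yield, ($$))
import Pipes.Concurrent (Buffer (Bounded), Output, recv, send, spawn)

-- | Hypothetical variant of toOutput' that stops once `send` reports
-- that the mailbox is sealed.
toOutputUntilSealed :: Output a -> Sink a IO ()
toOutputUntilSealed o = loop
  where
    loop = do
        ma <- await
        case ma of
            Nothing -> return ()  -- upstream is finished
            Just a  -> do
                delivered <- lift $ atomically $ send o a
                if delivered then loop else return ()

-- | Hypothetical check: push three values through the sink, then read them back.
demo :: IO [Maybe Int]
demo = do
    (output, input) <- spawn (Bounded 8)
    mapM_ yield [1, 2, 3] $$ toOutputUntilSealed output
    mapM (const (atomically (recv input))) [1 .. 3 :: Int]
```

Whether stopping early is what you want depends on the application: a listener thread that should exit once nobody reads the mailbox benefits from it, while a sink that must drain its source regardless does not.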
Then your main function would look something like this:
main :: IO ()
main = do
    soc <- socket AF_UNIX Stream 0
    bind soc (SockAddrUnix "mysock")
    (output, input) <- spawn (Bounded 8)
    -- Writer thread: stream socket data into the mailbox
    forkIO $ readFromSocket soc $$ toOutput' output
    -- Main thread: stream mailbox contents to stdout
    fromInput' input $$ DCB.sinkHandle stdout
    removeFile "mysock"
... where readFromSocket would be some Source that reads from your Socket.
You can then freely write to the output using other sources of data, too, and not worry about having to coordinate them or dispose of the input or output properly when you are done.
To learn more about pipes-concurrency, I recommend reading the official tutorial.