
Efficient binary I/O over a network

I'm trying to write a small Haskell program that talks a binary network protocol, and I'm having a surprising amount of difficulty.

It seems clear that the binary data should be stored as ByteString.

Question: Should I just hGet / hPut individual multi-byte integers, or is it more performant to build a big ByteString of the whole thing and use that?

It seems like the binary package should be useful here. However, binary deals only with lazy ByteString values.

Question: Does hGet on a lazy ByteString actually read the specified number of bytes strictly? Or does it try to do some kind of lazy I/O? (I do not want lazy I/O!)

Question: Why does the documentation not specify this?

The code looks like it's going to contain a lot of "get the next integer, compare it to this value, throw an error if it doesn't match, otherwise continue to the next step..." I'm not sure how to structure that cleanly without writing spaghetti code.

In summary, what I'm trying to do is quite simple, but I seem to be struggling for a way to make the code look simple. Maybe I'm just over-thinking this and missing something obvious...

MathematicalOrchid asked Feb 09 '15 21:02


2 Answers

TCP requires that the application provide its own message boundary markers. A simple protocol to mark message boundaries is to send the length of a chunk of data, the chunk of data, and whether there are remaining chunks that are part of the same message. The optimal size for the header that holds the message boundary information depends on the distribution of message sizes.

Developing our own little message protocol, we'll use two bytes for our headers. The most significant bit of the two bytes (treated as a big-endian Word16) records whether there are remaining chunks in the message. The remaining 15 bits hold the length of the chunk in bytes. This allows chunk sizes up to 32767 bytes, larger than typical TCP packets. A two-byte header is less than optimal if messages are typically very small, especially if they are no larger than 127 bytes, where a one-byte header with the same layout would suffice.
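
As a rough illustration of that header layout, here is a minimal sketch of packing and unpacking the flag and length. The names mkHeader and splitHeader are hypothetical and are not used by the code later in this answer.

```haskell
import Data.Bits (testBit, (.&.), (.|.))
import Data.Word (Word16)

-- | Pack a "more chunks follow" flag and a chunk length into a header.
-- Returns Nothing if the length does not fit in 15 bits.
-- (Hypothetical helper, for illustration only.)
mkHeader :: Bool -> Int -> Maybe Word16
mkHeader more len
    | len < 0 || len > 0x7FFF = Nothing
    | more                    = Just (fromIntegral len .|. 0x8000)
    | otherwise               = Just (fromIntegral len)

-- | Split a header back into (more chunks follow?, chunk length).
splitHeader :: Word16 -> (Bool, Int)
splitHeader h = (testBit h 15, fromIntegral (h .&. 0x7FFF))
```

Rejecting oversized lengths up front keeps a too-large length from silently overwriting the continuation bit.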

We're going to use network-simple for the networking portion of our code. We'll serialize or deserialize messages with the binary package which encodes and decodes to and from lazy ByteStrings.

import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString as B

import Network.Simple.TCP 
import Data.Bits
import Data.Binary
import Data.Functor
import Control.Monad.IO.Class

The first utility we will need is the ability to write Word16 headers into strict ByteStrings and read them back out again. We'll write them in big-endian order. Alternatively, these could be written in terms of the Binary instance for Word16.

writeBE :: Word16 -> B.ByteString
writeBE x = B.pack . map fromIntegral $ [(x .&. 0xFF00) `shiftR` 8, x .&. 0xFF]

readBE :: B.ByteString -> Maybe Word16
readBE s =
    case map fromIntegral . B.unpack $ s of
        [w1, w0] -> Just $ w1 `shiftL` 8 .|. w0
        _        -> Nothing
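
The alternative mentioned above, going through the Binary instance for Word16, might look something like the sketch below. It relies on that instance using big-endian byte order; the primed names are hypothetical, chosen only to avoid clashing with writeBE and readBE.

```haskell
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as L
import Data.Binary (decodeOrFail, encode)
import Data.Word (Word16)

-- | Encode a Word16 as two big-endian bytes via its Binary instance.
writeBE' :: Word16 -> B.ByteString
writeBE' = L.toStrict . encode

-- | Decode exactly two bytes; any other input length yields Nothing.
readBE' :: B.ByteString -> Maybe Word16
readBE' s =
    case decodeOrFail (L.fromStrict s) of
        Right (rest, _, w) | L.null rest -> Just w
        _                                -> Nothing
```

The check on the leftover input keeps the behaviour in line with readBE, which only accepts a two-byte string.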

The main challenge will be to send and receive the lazy ByteStrings forced on us by the binary package. Since we can send at most 32767 bytes per chunk, we need to be able to rechunk a lazy ByteString into groups of strict chunks, each group with a known total length no greater than our maximum. A single strict chunk might already exceed the maximum; any chunk that doesn't fit into the current group is split across groups.

rechunk :: Int -> [B.ByteString] -> [(Int, [B.ByteString])]
rechunk n = go [] 0 . filter (not . B.null)
    where
        go acc l []     = [(l, reverse acc)]
        go acc l (x:xs) =
            let
                lx = B.length x
                l' = lx + l
            in
                if l' <= n
                then go (x:acc) l' xs
                else
                    let (x0, x1) = B.splitAt (n-l) x
                    in (n, reverse (x0:acc)) : go [] 0 (x1:xs)
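
For comparison, a simpler (though not identical) way to get pieces of bounded size is to split the lazy ByteString directly with L.splitAt. This is a different technique from rechunk above: it returns lazy pieces rather than lists of strict chunks paired with their lengths. The name splitEvery is hypothetical.

```haskell
import qualified Data.ByteString.Lazy as L
import Data.Int (Int64)

-- | Split a lazy ByteString into pieces of at most n bytes.
-- An empty input yields a single empty piece, so that an empty
-- message can still be sent as one zero-length chunk.
splitEvery :: Int64 -> L.ByteString -> [L.ByteString]
splitEvery n bs
    | n <= 0    = error "splitEvery: piece size must be positive"
    | otherwise =
        case L.splitAt n bs of
            (piece, rest)
                | L.null rest -> [piece]
                | otherwise   -> piece : splitEvery n rest
```

Each piece's length would then have to be computed with L.length before writing the header, which rechunk avoids by tracking the running total as it goes.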

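recvExactly will loop until all of the bytes we requested have been received, since a single recv may return fewer bytes than we asked for. It returns Nothing if the connection is closed first.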

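recvExactly :: MonadIO m => Socket -> Int -> m (Maybe [B.ByteString])
recvExactly s = go []
    where
        -- Nothing left to read: return the pieces in arrival order.
        -- This also covers a request for zero bytes without calling recv.
        go acc 0      = return . Just . reverse $ acc
        go acc toRead = do
            body <- recv s toRead
            maybe (return Nothing) (go' acc toRead) body
        -- Keep every piece received, including the last one, and
        -- continue with however many bytes are still outstanding.
        go' acc toRead body = go (body:acc) (toRead - B.length body)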
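
The same loop can be exercised without a socket by abstracting over the receive action. The sketch below is only an illustration: readExactly and fakeRecv are hypothetical names, and fakeRecv is a stand-in that deliberately hands out at most three bytes per call to mimic short reads. It assumes the receive action returns between 1 and n bytes, or Nothing at end of stream.

```haskell
import qualified Data.ByteString as B
import Data.IORef (IORef, readIORef, writeIORef)

-- | Read exactly n bytes using the supplied receive action.
-- Returns Nothing if the stream ends before n bytes arrive.
readExactly :: Monad m
            => (Int -> m (Maybe B.ByteString)) -> Int -> m (Maybe B.ByteString)
readExactly recvSome = go []
  where
    go acc 0 = return (Just (B.concat (reverse acc)))
    go acc n = do
        piece <- recvSome n
        case piece of
            Nothing -> return Nothing
            Just bs -> go (bs : acc) (n - B.length bs)

-- | Hypothetical stand-in for a socket: serves bytes from a buffer,
-- at most 3 per call, and reports Nothing once the buffer is empty.
fakeRecv :: IORef B.ByteString -> Int -> IO (Maybe B.ByteString)
fakeRecv ref n = do
    buf <- readIORef ref
    if B.null buf
        then return Nothing
        else do
            let (piece, rest) = B.splitAt (min 3 n) buf
            writeIORef ref rest
            return (Just piece)
```

With a buffer of ten bytes, asking for seven takes three calls to fakeRecv, and a following request for five fails because only three bytes remain.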

Sending a lazy ByteString consists of breaking it into chunks of a size we know we can send and sending each chunk along with the header holding the size and whether there are any more chunks.

sendLazyBS :: (MonadIO m) => Socket -> L.ByteString -> m ()
sendLazyBS s = go . rechunk maxChunk . L.toChunks
    where
        maxChunk = 0x7FFF
        go [] = return ()
        go ((li, ss):xs) = do
            let l = fromIntegral li
            let h = writeBE $ if null xs then l else l .|. 0x8000
            sendMany s (h:ss)
            go xs

Receiving a lazy ByteString consists of reading the two byte header, reading a chunk of the size indicated by the header, and continuing to read as long as the header indicated there are more chunks.

recvLazyBS :: (MonadIO m, Functor m) => Socket -> m (Maybe L.ByteString)
recvLazyBS s = fmap L.fromChunks <$> go [] 
    where
        go acc = do
            header <- recvExactly s 2
            maybe (return Nothing) (go' acc) (header >>= readBE . B.concat)
        go' acc h = do
            body <- recvExactly s . fromIntegral $ h .&. 0x7FFF
            let next = if h .&. 0x8000 /= 0
                       then go
                       else return . Just . concat . reverse
            maybe (return Nothing) (next . (:acc) ) body     
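
To check the wire format independently of any socket, the framing can also be expressed purely with the Put and Get monads from the binary package. This is a sketch under the same assumptions as above (two-byte big-endian header, top bit as the continuation flag); putFramed and getFramed are hypothetical names and are not part of the networking code in this answer.

```haskell
import qualified Data.ByteString.Lazy as L
import Control.Monad (when)
import Data.Binary.Get (Get, getLazyByteString, getWord16be, runGetOrFail)
import Data.Binary.Put (Put, putLazyByteString, putWord16be, runPut)
import Data.Bits ((.&.), (.|.))
import Data.Int (Int64)

maxChunk :: Int64
maxChunk = 0x7FFF

-- | Write a message as a sequence of header-prefixed chunks.
putFramed :: L.ByteString -> Put
putFramed bs = do
    let (piece, rest) = L.splitAt maxChunk bs
        len           = fromIntegral (L.length piece)
        more          = not (L.null rest)
    putWord16be (if more then len .|. 0x8000 else len)
    putLazyByteString piece
    when more (putFramed rest)

-- | Read chunks until one arrives with the continuation bit clear.
getFramed :: Get L.ByteString
getFramed = L.concat <$> go
  where
    go = do
        h     <- getWord16be
        piece <- getLazyByteString (fromIntegral (h .&. 0x7FFF))
        if h .&. 0x8000 /= 0
            then (piece :) <$> go
            else return [piece]
```

Running runGetOrFail getFramed on the output of runPut (putFramed msg) should give back msg, which makes the framing easy to test before any networking is involved.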

Sending or receiving a message that has a Binary instance is just sending an encoded lazy ByteString, or receiving the lazy ByteString and decoding it.

sendBinary :: (MonadIO m, Binary a) => Socket -> a -> m ()
sendBinary s = sendLazyBS s . encode

recvBinary :: (MonadIO m, Binary a, Functor m) => Socket -> m (Maybe a)
recvBinary s = d . fmap decodeOrFail <$> recvLazyBS s
    where
        d (Just (Right (_, _, x))) = Just x
        d _                        = Nothing
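
On the "read a value, compare it, fail or continue" pattern from the question: inside a Binary instance the Get monad handles this fairly cleanly, because fail aborts the whole decode and decodeOrFail reports it as a Left. The sketch below uses an invented message type; Hello, its fields, and the magic value 0xCAFE are all hypothetical.

```haskell
import qualified Data.ByteString.Lazy as L
import Control.Monad (when)
import Data.Binary (Binary (..), decodeOrFail)
import Data.Binary.Get (getWord16be, getWord32be)
import Data.Binary.Put (putWord16be, putWord32be)
import Data.Word (Word16, Word32)

-- | Hypothetical message: a magic number, a version, and a session id.
data Hello = Hello { helloVersion :: Word16, helloSession :: Word32 }
    deriving (Eq, Show)

magic :: Word16
magic = 0xCAFE

instance Binary Hello where
    put (Hello v s) = putWord16be magic >> putWord16be v >> putWord32be s
    get = do
        m <- getWord16be
        -- Validation step: abort decoding if the magic number is wrong.
        when (m /= magic) $ fail "bad magic number"
        Hello <$> getWord16be <*> getWord32be

-- | Decode a Hello, turning any decode failure into a Left.
decodeHello :: L.ByteString -> Either String Hello
decodeHello bs =
    case decodeOrFail bs of
        Left (_, _, err) -> Left err
        Right (_, _, x)  -> Right x
```

Each check is a single line in the parser, and the error handling lives in one place at the call to decodeOrFail.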

Cirdec answered Oct 09 '22 10:10


Re question 1...

If the handle is configured with NoBuffering, each hPutStr call will generate a write system call. This incurs a huge performance penalty when there are many small writes. See, e.g., this SO answer for some benchmarking: https://stackoverflow.com/a/28146677/866915

On the other hand, if the handle has buffering enabled you will need to explicitly flush the handle to ensure that the buffered data is sent.

I'm assuming you're using a streaming protocol like TCP. With UDP you obviously have to form and send each message as an atomic unit.
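
One way to avoid many small writes is to assemble the whole message first and write it in one go. The sketch below uses Data.ByteString.Builder for that and flushes explicitly afterwards; the message layout (a 16-bit tag followed by a 32-bit value) and the names message and sendMessage are hypothetical.

```haskell
import qualified Data.ByteString.Builder as BB
import Data.Word (Word16, Word32)
import System.IO (Handle, hFlush)

-- | Hypothetical message layout: a 16-bit tag, then a 32-bit value,
-- both big-endian.
message :: Word16 -> Word32 -> BB.Builder
message tag val = mconcat [BB.word16BE tag, BB.word32BE val]

-- | Write the whole message through the handle's buffer, then flush
-- so the data is actually sent rather than left sitting in the buffer.
sendMessage :: Handle -> Word16 -> Word32 -> IO ()
sendMessage h tag val = do
    BB.hPutBuilder h (message tag val)
    hFlush h
```

This assumes the handle has buffering enabled (for example BlockBuffering), so the individual fields are combined in memory before anything reaches the network.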

Re question 2...

Reading the code, it appears that hGet for lazy ByteStrings reads from the handle in chunks of defaultChunkSize, which is about 32 KiB.

Update: It appears that hGet does not perform lazy I/O in this case. Here is some code to test this.

feed:

#!/usr/bin/env perl
$| = 1;
my $c = 0;
my $k = "1" x 1024;
while (1) {
  syswrite(STDOUT, $k);
  $c++;
  print STDERR "wrote 1k count = $c\n";
}

Test.hs:

import qualified Data.ByteString.Lazy as LBS
import System.IO

main = do
  s <- LBS.hGet stdin 320000
  let s2 = LBS.take 10 s
  print ("s2 = ", s2)

Running perl feed | runhaskell Test.hs, it is clear that the Haskell program demands all 320000 bytes from the perl program even though it only uses the first 10 bytes.

ErikR answered Oct 09 '22 11:10