Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why does my concurrent Haskell program terminate prematurely?

I have a UDP server that reflects every ping message it receives (this works well I think). I the client side I would then like to do two things:

  1. make sure that I fired off N (e.g. 10000) messages, and
  2. count the number of correctly received responses.

It seems that either because of the nature of UDP or because of the forkIO thing, my client code below ends prematurely/does not do any counting at all.

Also I am very surprised to see that the function tryOnePing returns 250 times the Int 4. Why could this be?

main = withSocketsDo $ do
        s <- socket AF_INET Datagram defaultProtocol
        hostAddr <- inet_addr host
        thread <- forkIO $ receiveMessages s
        -- is there any better way to eg to run that in parallel and make sure
        -- that sending/receiving are asynchronous? 


        -- forM_ [0 .. 10000] $ \i -> do
              -- sendTo s "ping" (SockAddrInet port hostAddr)
        -- actually this would be preferred since I can discard the Int 4 that
        -- it returns but forM or forM_ are out of scope here?

        let tryOnePing i = sendTo s "ping" (SockAddrInet port hostAddr)
        pings <- mapM tryOnePing [0 .. 1000]
        let c = length $ filter (\x -> x==4) pings

        -- killThread thread
        -- took that out to make sure the function receiveMessages does not
        -- end prematurely. still seems that it does

        sClose s
        print c
        -- return()

receiveMessages :: Socket -> IO ()
receiveMessages socket = forever $ do
        -- also tried here forM etc. instead of forever but no joy
        let recOnePing i = recv socket 1024
        msg <- mapM recOnePing [0 .. 1000]
        let r = length $ filter (\x -> x=="PING") msg
        print r
        print "END"
like image 254
J Fritsch Avatar asked Nov 25 '11 17:11

J Fritsch


1 Answers

The main problem here is that when your main thread finishes, all other threads gets killed automatically. You have to get the main thread to wait for the receiveMessages thread, or it will in all likelyhood simply finish before any responses have been received. One simple way of doing this is to use an MVar.

An MVar is a synchronized cell that can either be empty or hold exactly one value. The current thread will block if it tries to take from an empty MVar or insert into a full one. In this case, we don't care about the value itself, so we'll just store a () in it.

We'll start with the MVar empty. Then the main thread will fork off the receiver thread, send all the packets, and try to take the value from the MVar.

import Control.Concurrent.MVar

main = withSocketsDo $ do
    -- prepare socket, same as before

    done <- newEmptyMVar

    -- we need to pass the MVar to the receiver thread so that
    -- it can use it to signal us when it's done
    forkIO $ receiveMessages sock done

    -- send pings, same as before

    takeMVar done    -- blocks until receiver thread is done

In the receiver thread, we will receive all the messages and then put a () in the MVar to signal that we're done receiving.

receiveMessages socket done = do
    -- receive messages, same as before

    putMVar done ()  -- allows the main thread to be unblocked

This solves the main issue, and the program runs fine on my Ubuntu laptop, but there are a couple more things you want to take care of.

  • sendTo does not guarantee that the whole string will be sent. You'll have to check the return value to see how much was sent, and retry if not all of it was sent. This can happen even for a short message like "ping" if the send buffer is full.

  • recv requires a connected socket. You'll want to use recvFrom instead. (Although it still works on my PC for some unknown reason).

  • Printing to standard output is not synchronized, so you might want to alter this so that the MVar will be used to communicate the number of received packets instead of just (). That way, you can do all the output from the main thread. Alternatively, use another MVar as a mutex to control access to standard output.

Finally, I recommend reading the documentation of Network.Socket, Control.Concurrent and Control.Concurrent.MVar carefully. Most of my answer is stitched together from information found there.

like image 180
hammar Avatar answered Sep 28 '22 12:09

hammar