Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

ZeroMQ PUSH/PULL and lost message

I'm making use of ZeroMQ from .NET and got stuck trying to fix a weird issue. I've got a socket of type PUSH and one of type PULL over TCP. When the client disconnects, the server is still able to to send a message (note that no flags are passed to the Socket.Send method) which gets completely lots before starting to block and waiting for the client to reconnect and delivering messages I try to send afterwards.

How can I avoid losing the message (or in the worst case test if a client is connected and if not send a dummy message which I can afford losing)?

Thanks in advance!

Edit: further testing reveals that if I wait 1 second after sending the first message after disconnection by the client, the second one will block, but if I don't wait at all I can send as many messages as I want and they'll all get lost. That's quite confusing...

like image 834
em70 Avatar asked Feb 23 '11 19:02

em70


1 Answers

The ZeroMQ documentation notes that this is a problem with PUSH/PULL setups and suggests the following pattern: an addition of a REP/REQ setup to provide node coordination when you're expecting a fixed number of subscribers. However, if you are not able to know the number of subscribers in advance, you should consider changing your protocol to be more resilient to these conditions.

Synchronized publisher in C (from ZGuide)

//
//  Synchronized publisher
//
#include "zhelpers.h"

//  We wait for 10 subscribers
#define SUBSCRIBERS_EXPECTED  10

int main (void) 
{
    s_version_assert (2, 1);
    void *context = zmq_init (1);

    //  Socket to talk to clients
    void *publisher = zmq_socket (context, ZMQ_PUB);
    zmq_bind (publisher, "tcp://*:5561");

    //  Socket to receive signals
    void *syncservice = zmq_socket (context, ZMQ_REP);
    zmq_bind (syncservice, "tcp://*:5562");

    //  Get synchronization from subscribers
    int subscribers = 0;
    while (subscribers < SUBSCRIBERS_EXPECTED) {
        //  - wait for synchronization request
        char *string = s_recv (syncservice);
        free (string);
        //  - send synchronization reply
        s_send (syncservice, "");
        subscribers++;
    }
    //  Now broadcast exactly 1M updates followed by END
    int update_nbr;
    for (update_nbr = 0; update_nbr < 1000000; update_nbr++)
        s_send (publisher, "Rhubarb");

    s_send (publisher, "END");

    zmq_close (publisher);
    zmq_close (syncservice);
    zmq_term (context);
    return 0;
}
like image 175
user7116 Avatar answered Nov 02 '22 05:11

user7116