 

Why is ZeroMQ PUB enqueuing messages with no connected subscribers? ( Well, "disconnected" SUB-s )

Tags:

zeromq

I am seeing a strange behavior using ZMQ_PUB.

I have a producer which .connect()-s to different processes
that .bind() on ZMQ_SUB sockets.

The subscribers all .bind(), the publisher .connect()-s.

When a producer starts, it creates a ZMQ_PUB socket and .connect()-s it to different processes. It then immediately starts sending messages at a regular period.

As expected, if there are no connected subscribers, it drops all messages, until a subscriber starts.

The flow then works as expected: when a subscriber starts, it receives messages from that moment on.

Now, the problem is:

  1. I disconnect the subscriber ( stopping the process ).
  2. There are no active subscribers at this point, as I stopped the only one. The producer continues sending messages, which should be dropped, as there are no connected subscribers anymore…
  3. I restart the original subscriber; it binds, the publisher reconnects... and the subscriber receives all messages produced in the meantime!

So what I see is that the producer enqueued all messages while the subscriber was down. As soon as the socket reconnected, because the subscriber process restarted, it sent all queued messages.

As I understood from here, a publisher should drop all sent messages when there are no connected subscribers:

ZeroMQ examples

"A publisher has no connected subscribers, then it will simply drop all messages."

Why is this happening?

By the way, I am using C++ on Linux for these tests.

I tried setting a different identity on the subscriber when it binds, but it didn't work. The publisher still enqueues messages, and delivers them all when the subscriber restarts.

Thanks in advance,

Luis


IMPORTANT UPDATE:

Before posting this question I had tried different solutions. One was to set ZMQ_LINGER to 0, which didn't work. I then added ZMQ_IMMEDIATE, and it worked, but I just found out that ZMQ_IMMEDIATE alone does not work; it also requires ZMQ_LINGER.
Luis Rojas
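For completeness, a minimal sketch of how both options might be set on the publisher socket, right after it is created and before it .connect()-s ( the exact values are an assumption of what worked here, using the same cppzmq setsockopt() call as in the listings below ):

// A minimal sketch ( an assumption, not necessarily the asker's exact fix ):
int linger    = 0;   // ZMQ_LINGER == 0 : discard, do not flush, pending messages on close()
int immediate = 1;   // ZMQ_IMMEDIATE == 1 : queue messages only on fully completed connections
pSocket->setsockopt( ZMQ_LINGER,    &linger,    sizeof( linger ) );
pSocket->setsockopt( ZMQ_IMMEDIATE, &immediate, sizeof( immediate ) );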

UPDATE: As per request, I am adding some simple test cases to show my point. One is a simple subscriber, which runs on the command line and receives the URI to bind to, for instance:

$ ./sub tcp://127.0.0.1:50001

The other is a publisher, which receives a list of URIs to connect to, for instance:

$ ./pub tcp://127.0.0.1:50001 tcp://127.0.0.1:50002

The subscriber receives up to 5 messages, then closes the socket and exits. In Wireshark we can see the exchange of FIN/ACK, both ways, and how the socket moves to the TIME_WAIT state. Then the publisher starts sending SYN, trying to reconnect (which proves the ZMQ_PUB side knows that the connection closed).

I am explicitly not unsubscribing the socket, just closing it. In my opinion, if the socket is closed, the publisher should automatically end any subscription for that connection.

So what I see is: I start one or more subscribers, then I start the publisher, which starts sending messages. Each subscriber receives 5 messages and ends. In the meantime the publisher continues sending messages, WITH NO CONNECTED SUBSCRIBER. When I restart the subscriber, it immediately receives several messages, because they were queued at the publisher's side.

I think those queued messages break the Publish/Subscribe model, where messages should be delivered only to connected subscribers. If a subscriber closes the connection, messages to that subscriber should be dropped. Even more, when the subscriber restarts, it may decide to subscribe to other messages, but it will still receive those subscribed to by a "previous incarnation" that was bound to the same port.

My proposal is that ZMQ_PUB (in connect mode), when detecting a socket disconnection, should clear all subscriptions on that socket, until it reconnects and the NEW subscriber decides to resubscribe.

I apologize for language mistakes, but English is not my native language.

Pub's code:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>   // strlen()
#include <libgen.h>
#include <unistd.h>

#include <string>
#include <zeromq/zmq.hpp>

int main( int argc, char *argv[] )
{
    if ( argc < 2 )
    {
        fprintf( stderr, "Usage : %s <remoteUri1> [remoteUri2...]\n",   
        basename( argv[0] ) );
        exit ( EXIT_FAILURE );
    }

    zmq::context_t localContext( 1 );
    zmq::socket_t *pSocket = new zmq::socket_t( localContext, ZMQ_PUB );
    if ( NULL == pSocket )
    {
        fprintf( stderr, "Couldn't create socket. Aborting...\n" );
        exit ( EXIT_FAILURE );
    }

    int i;
    try
    {
        for ( i = 1; i < argc; i++ )
        {
            printf( "Connecting to [%s]\n", argv[i] );
            {
                pSocket->connect( argv[i] );
            }
        }
    }
    catch( ... )
    {
        fprintf( stderr, "Couldn't connect socket to %s. Aborting...\n", argv[i] );
        exit ( EXIT_FAILURE );
    }

    printf( "Publisher Up and running... sending messages\n" );
    fflush(NULL);

    int msgCounter = 0;
    do
    {
        try
        {
            char msgBuffer[1024];
            sprintf( msgBuffer, "Message #%d", msgCounter++ );
            zmq::message_t outTask( msgBuffer, strlen( msgBuffer ) + 1 );
            printf("Sending message [%s]\n", msgBuffer );
            pSocket->send ( outTask );
            sleep( 1 );
        }
        catch( ... )
        {
            fprintf( stderr, "Some unknown error ocurred. Aborting...\n" );
            exit ( EXIT_FAILURE );
        }
    }
    while ( true );

    exit ( EXIT_SUCCESS );
}

Sub's code

#include <stdio.h>
#include <stdlib.h>
#include <libgen.h>
#include <unistd.h>

#include <string>
#include <zeromq/zmq.hpp>

int main( int argc, char *argv[] )
{
    if ( argc != 2 )
    {
        fprintf( stderr, "Usage : %s <localUri>\n", basename( argv[0] ) );
        exit ( EXIT_FAILURE );
    }

    std::string pLocalUri( argv[1] );
    zmq::context_t localContext( 1 );
    zmq::socket_t *pSocket = new zmq::socket_t( localContext, ZMQ_SUB );
    if ( NULL == pSocket )
    {
        fprintf( stderr, "Couldn't create socket. Aborting...\n" );
        exit ( EXIT_FAILURE );
    }
    try
    {
        pSocket->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
        pSocket->bind( pLocalUri.c_str() );
    }
    catch( ... )
    {
        fprintf( stderr, "Couldn't bind socket. Aborting...\n" );
        exit ( EXIT_FAILURE );
    }

    int msgCounter = 0;
    printf( "Subscriber Up and running... waiting for messages\n" );
    fflush( NULL );

    do
    {
        try
        {
            zmq::message_t inTask;
            pSocket->recv ( &inTask );
            printf( "Message received : [%s]\n", inTask.data() );
            fflush( NULL );
            msgCounter++;
        }
        catch( ... )
        {
            fprintf( stderr, "Some unknown error ocurred. Aborting...\n" );
            exit ( EXIT_FAILURE );
        }
    }
    while ( msgCounter < 5 );

    // pSocket->setsockopt( ZMQ_UNSUBSCRIBE, "", 0 ); NOT UNSUBSCRIBING
    pSocket->close();
    exit ( EXIT_SUCCESS );
}
asked Jan 17 '17 by Kalki70



1 Answer

Q: Why is this happening?

Because the SUB is actually still connected ( not "disconnected" enough ).

Yes, it might be surprising, but killing the SUB-process, be it on the .bind()- or the .connect()-attached side of the socket's transport-media, does not mean that the Finite-State-Machine of the I/O-pump has "moved" into a disconnected-state.

Given that, the PUB-side has no other option but to consider the SUB-side still live and connected ( even while the process was silently killed beyond the line-of-sight of the PUB-side ), and for such a "distributed"-state there is a ZeroMQ protocol-defined behaviour ( a PUB-side duty ) to collect all the interim messages for the ( yes, invisibly dead ) SUB-scriber, which the PUB-side still considers fair to be live ( but perhaps just having some temporarily intermittent issues somewhere low, on the transport I/O-levels, or some kind of remote CPU-resources starvation or concurrency-introduced transiently intermittent { local | remote } blocking states et al ).

So it buffers...

In case your assassination of the SUB-side agent had been a bit more graceful ( using a zeroised ZMQ_LINGER + an adequate .close() on the socket-resource instance ), the PUB-side would recognise the "distributed"-system's system-wide Finite-State-Automaton shift into an indeed "DISCONNECT"-ed state, and a due change-of-behaviour would happen on the PUB-side of the "distributed-FSA": it would not store any messages for this "visibly" indeed "DISCONNECT"-ed SUB, exactly what the documentation states.
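In code, such a graceful SUB-side exit might look like this minimal sketch ( assumed values, using the same cppzmq calls as in the question's listings ):

// A sketch of a graceful SUB-side termination, before the process exits:
int linger = 0;                                   // a zeroised ZMQ_LINGER
pSocket->setsockopt( ZMQ_LINGER, &linger, sizeof( linger ) );
pSocket->close();                                 // an adequate .close()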

"Distributed-FSA" has but quite a weak means to recognise state-change events "beyond it's horizon of localhost contols. KILL-ing a remote process, which implements some remarkable part of the "distributed-FSA" is a devastating event, not a method how to keep the system work. A good option for such external risks might be


Sounds complex?

Oh, yes, it is complex, indeed. That's exactly why ZeroMQ solved this for us, so that we are free to enjoy designing our application architectures on top of these ( already solved ) low-level complexities.


Distributed-system FSA ( a system-wide FSA of layered composition of sub-FSA-s )

To picture what is silently going on under the hood, imagine having just a simple, tandem pair of FSA-s, exactly what the pair of .Context() instances try to handle for us in the simplest-ever 1:1 PUB/SUB scenario, where the use-case KILL-s all the sub-FSA-s on the SUB-side without giving it a chance to acknowledge the intention to the PUB-side. Even the TCP-protocol ( living on both the PUB-side and the SUB-side ) has several state-transitions from the [ESTABLISHED] to the [CLOSED] state.


A quick X-ray view on a distributed-systems' FSA-of-FSA-s

( just the TCP-protocol FSA was depicted for clarity )

PUB-side:

[ figure: TCP-protocol FSA state diagram, PUB-side ]


.socket( .. ) instance's behaviour FSA:

[ figure: .socket() instance's behaviour FSA ]


SUB-side:

[ figure: TCP-protocol FSA state diagram, SUB-side ]

(Courtesy nanomsg).

answered Sep 18 '22 by user3666197