Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can I clean up properly when recv is blocking?

Tags:

c++

zeromq

Consider the example code below (I typed it up quickly as an example, if there are errors it doesn't matter - I'm interested in the theory).

bool shutDown = false; //global

int main()
{
  CreateThread(NULL, 0, &MessengerLoop, NULL, 0, NULL);
  //do other programmy stuff...
}


DWORD WINAPI MessengerLoop( LPVOID lpParam )
{
  zmq::context_t context(1);
  zmq::socket_t socket (context, ZMQ_SUB);
  socket.connect("tcp://localhost:5556");
  socket.setsockopt(ZMQ_SUBSCRIBE, "10001 ", 6);

  while(!shutDown)
  {
    zmq_msg_t getMessage;
    zmq_msg_init(&getMessage);
    zmq_msg_recv (&getMessage, socket, 0); //This line will wait forever for a message
    processMessage(getMessage); 
  }
}

A thread is created to wait for incoming messages and to handle them appropriately. The thread is looping until shutDown is set to true.

In ZeroMQ the Guide specifically states what must be cleaned up, namely the messages, socket and context.

My issue is: Since recv will wait forever for a message, blocking the thread, how can I shut down this thread safely if a message is never received?

like image 386
Ian Avatar asked Apr 25 '13 10:04

Ian


2 Answers

The blocking call will exit in a few ways. First, and this depends on your language and binding, an interrupt (Ctrl-C, SIGINT, SIGTERM) will exit the call. You'll get back (again, depending on your binding) an error or a null message (libzmq returns an EINTR error).

Second, if you terminate the context in another thread, the blocking call will also exit (libzmq returns an ETERM error).

Thirdly, you can set timeouts on the socket so it will return in any case after some timeout, if there's no data. We don't often do this but it can be useful in some cases.

Finally, what we do in practice is never do blocking receives but use zmq_poll to find out when sockets have messages waiting, then receive from those sockets. This is how you scale out to handling more sockets.

like image 141
Pieter Hintjens Avatar answered Sep 28 '22 07:09

Pieter Hintjens


You can use non-blocking call flag ZMQ_DONTWAIT

  while(!shutDown)
  {
    zmq_msg_t getMessage;
    zmq_msg_init(&getMessage);
    while(-1 == zmq_msg_recv(&getMessage, socket, ZMQ_DONTWAIT))
    {
      if (EAGAIN != errno || shutDown)
      {
        break;
      }
      Sleep(100);
    }
    processMessage(getMessage); 
  }
like image 27
inkooboo Avatar answered Sep 28 '22 06:09

inkooboo