Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to create a Publish / Subscribe architecture using the CZMQ-4.0.2 new zsock API?

Tags:

c++

c

zeromq

I want to create a Publish / Subscribe architecture, using CZMQ-4.0.2, but I am not able to understand how to use the new zsock APIs.

Can anyone point me to some examples using the new APIs?

like image 697
user226869 Avatar asked Jul 10 '17 07:07

user226869


1 Answers

tldr;

Examples are on the bottom of the site

Little explanation

I'm assuming that you're asking for CZMQ specific usage, not how to use ZeroMQ sockets, and what are the quirks of PUB/SUB pattern.

When using CZMQ you don't need to worry about context, it is done internally. zsock_new functions family returns pointer to zsock_t, an opaque identifier for socket. You need to remember to call zsock_destroy(&socket) when you're done with it, to avoid memory leaks.

In most common usage you don't need to worry about connecting and binding because zsock_new_XXX takes care of that. To know what action was taken you can look to manual.

//  Create a PUB socket. Default action is bind.
CZMQ_EXPORT zsock_t *
    zsock_new_pub (const char *endpoint);
//  Create a SUB socket, and optionally subscribe to some prefix string. Default
//  action is connect.
CZMQ_EXPORT zsock_t *
    zsock_new_sub (const char *endpoint, const char *subscribe);

If you're planning to do some unusual binding/connecting, you can add a prefix to endpoint. @ indicates bind, > connect.

zsock_t *sock = zsock_new_push("@ipc://test");

Now, to send message you can use plenty of methods (zsock_send, zmsg_send, zstr_send, zstr_sendx, zstr_sendf, zframe_send), most generic is zsock_send. It has printf like prototype, where you need to pass a picture of a message. Each char in this string represents single frame in message (or more frames because you can also pass another message). It is described in here:

//  Send a 'picture' message to the socket (or actor). The picture is a
//  string that defines the type of each frame. This makes it easy to send
//  a complex multiframe message in one call. The picture can contain any
//  of these characters, each corresponding to one or two arguments:
//
//      i = int (signed)
//      1 = uint8_t
//      2 = uint16_t
//      4 = uint32_t
//      8 = uint64_t
//      s = char *
//      b = byte *, size_t (2 arguments)
//      c = zchunk_t *
//      f = zframe_t *
//      h = zhashx_t *
//      U = zuuid_t *
//      p = void * (sends the pointer value, only meaningful over inproc)
//      m = zmsg_t * (sends all frames in the zmsg)
//      z = sends zero-sized frame (0 arguments)
//      u = uint (deprecated)
//
//  Note that s, b, c, and f are encoded the same way and the choice is
//  offered as a convenience to the sender, which may or may not already
//  have data in a zchunk or zframe. Does not change or take ownership of
//  any arguments. Returns 0 if successful, -1 if sending failed for any
//  reason.
CZMQ_EXPORT int
zsock_send (void *self, const char *picture, ...);

One what may be unclear is this void *self, it is actually our zsock_t * returned from zsock_new. In prototype, it is declared as void * because this function also accepts zactor_t *.

Important: Does not change or take ownership of any arguments.. You need to free/destroy data after sending.

Receiving looks very similar. It is like sscanf, and zsock_recv creates objects, so again, you need to take care of memory.

Big difference in behavior between ZeroMQ and CZMQ is LINGER socket option. For ZeroMQ it was infinite (-1), where CZMQ has a default value of 0 (no blocking). So anytime when you'll have zsock_send followed by zsock_destroy, your message may not be delivered. Linger value can be set individually for the socket by using zsock_set_linger, or globally zsys_set_linger.

Example of Publisher

#include <czmq.h>

int main(int argc, char ** argv) {
  zsock_t *socket = zsock_new_pub("ipc://example.sock");
  assert(socket);

  while(!zsys_interrupted) {
    zsys_info("Publishing");
    zsock_send(socket, "sss", "TOPIC", "MESSAGE PART", "ANOTHER");
    zclock_sleep(1000);
  }

  zsock_destroy(&socket);
  return 0;
}

Example of Subscriber

#include <czmq.h>

int main(int argc, char ** argv) {
  zsock_t *socket = zsock_new_sub("ipc://example.sock", "TOPIC");
  assert(socket);

  char *topic;
  char *frame;
  zmsg_t *msg;
  int rc = zsock_recv(socket, "sm", &topic, &msg);
  assert(rc == 0);

  zsys_info("Recv on %s", topic);
  while(frame = zmsg_popstr(msg)) {
    zsys_info("> %s", frame);
    free(frame);
  }
  free(topic);
  zmsg_destroy(&msg);

  zsock_destroy(&socket);
  return 0;
}
like image 134
Jakub Piskorz Avatar answered Oct 20 '22 20:10

Jakub Piskorz