Scenario:
We were evaluating ZeroMQ (specifically JeroMQ) for an event-driven mechanism.
The application is distributed: multiple services (both publishers and subscribers are services) can run either in the same JVM or on distinct nodes, depending on the deployment architecture.
Observation
To experiment, I created a pub/sub pattern with inproc: as the transport, using JeroMQ (version 0.3.5).
Question
Is using inproc: along with pub/sub feasible?
I tried googling but couldn't find anything specific. Any insights?
Code sample for pub/sub with inproc:
Here is the working code sample for inproc pub/sub using JeroMQ (version 0.3.5), which may be useful for someone visiting this post later. One publisher publishes topics A and B, and two subscribers receive A and B separately.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.LockSupport;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

public class InprocPubSub {

    /**
     * @param args
     */
    public static void main(String[] args) {
        // The single ZMQ context; inproc only works between sockets that share it
        final Context context = ZMQ.context(1);
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        // Publisher. Note: older ZeroMQ versions expect an inproc endpoint to be
        // bound before anyone connects to it, and submission order alone does
        // not guarantee that the publisher thread binds first.
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                startPublishing(context);
            }
        });
        // Subscriber for topic "B"
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                startFirstSubscriber(context);
            }
        });
        // Subscriber for topic "A"
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                startSecondSubscriber(context);
            }
        });
    }

    /**
     * Prepare the publisher and publish
     *
     * @param context
     */
    private static void startPublishing(Context context) {
        Socket publisher = context.socket(ZMQ.PUB);
        publisher.bind("inproc://test");
        while (!Thread.currentThread().isInterrupted()) {
            // Write two messages, each with an envelope (topic) and content
            try {
                publisher.sendMore("A");
                publisher.send("We don't want to see this");
                LockSupport.parkNanos(1000);
                publisher.sendMore("B");
                publisher.send("We would like to see this");
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
        publisher.close();
        context.term();
    }

    /**
     * Prepare and receive through the subscriber (topic "B")
     *
     * @param context
     */
    private static void startFirstSubscriber(Context context) {
        Socket subscriber = context.socket(ZMQ.SUB);
        subscriber.connect("inproc://test");
        subscriber.subscribe("B".getBytes());
        while (!Thread.currentThread().isInterrupted()) {
            // Read envelope with address
            String address = subscriber.recvStr();
            // Read message contents
            String contents = subscriber.recvStr();
            System.out.println("Subscriber1 " + address + " : " + contents);
        }
        subscriber.close();
        context.term();
    }

    /**
     * Prepare and receive through the subscriber (topic "A")
     *
     * @param context
     */
    private static void startSecondSubscriber(Context context) {
        // Prepare our subscriber on the shared context
        Socket subscriber = context.socket(ZMQ.SUB);
        subscriber.connect("inproc://test");
        subscriber.subscribe("A".getBytes());
        while (!Thread.currentThread().isInterrupted()) {
            // Read envelope with address
            String address = subscriber.recvStr();
            // Read message contents
            String contents = subscriber.recvStr();
            System.out.println("Subscriber2 " + address + " : " + contents);
        }
        subscriber.close();
        context.term();
    }
}
The ZMQ inproc transport is intended for use within a single process, between different threads. When you say "can exist either in the same jvm or in distinct nodes" (emphasis mine), I assume you mean that you're spinning up multiple processes as distributed services rather than multiple threads within a single process.
If that's the case, then no, what you're trying to do won't work with inproc. PUB-SUB over inproc would work fine within a single process between multiple threads.
Edit to address further questions in the comments:
The reason to use a transport like inproc or ipc is that it's a little more efficient (faster) than the tcp transport when you're in the right context to use it. You could conceivably use a mixture of transports, but you'll always have to bind and connect on the same transport to make it work.
This means that each node would need up to three PUB or SUB sockets: a tcp publisher to talk to nodes on remote hosts, an ipc publisher to talk to nodes in different processes on the same host, and an inproc publisher to talk to nodes in different threads in the same process.
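To make the mixed-transport idea concrete, here is a minimal sketch of how a node might pick an endpoint string per peer. Everything in it is an assumption for illustration: the `EndpointChooser` class, the `PeerLocation` enum, the `/tmp/<name>.ipc` path, and the host and port values are all hypothetical, and the code only builds endpoint strings rather than opening any sockets.

```java
/** Hypothetical helper: maps where a peer runs to a ZeroMQ endpoint string. */
final class EndpointChooser {

    /** Where the peer lives relative to this node (illustrative names). */
    enum PeerLocation { SAME_PROCESS, SAME_HOST, REMOTE_HOST }

    /**
     * Builds the endpoint for a peer. Both sides must use the same string:
     * one side binds to it and the other connects to it.
     */
    static String endpointFor(PeerLocation location, String name, String host, int port) {
        switch (location) {
            case SAME_PROCESS:
                // Threads sharing one ZMQ context
                return "inproc://" + name;
            case SAME_HOST:
                // Separate processes on one machine (the path is an assumption)
                return "ipc:///tmp/" + name + ".ipc";
            case REMOTE_HOST:
                // Anything reachable over the network
                return "tcp://" + host + ":" + port;
            default:
                throw new IllegalArgumentException("Unknown location: " + location);
        }
    }

    public static void main(String[] args) {
        for (PeerLocation location : PeerLocation.values()) {
            System.out.println(location + " -> "
                    + endpointFor(location, "events", "10.0.0.5", 5556));
        }
    }
}
```

A helper like this also shows the cost of mixing transports: every publisher has to be reachable on each endpoint its subscribers might pick, which is the extra bookkeeping described above.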
Practically, in most cases you'd just use the tcp transport and only spin up one socket for everything, since tcp works everywhere. It could make sense to spin up multiple sockets if each socket is responsible for a particular kind of information.
If there's a reason that you'll always be sending one message type to other threads and a different message type to other hosts, then multiple sockets make sense, but in your case it sounds like, from the perspective of one node, all other nodes are equal. In that case I would use tcp everywhere and be done with it.
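If the deployment layout may change later, one option is to keep the endpoint out of the code entirely. The sketch below assumes a hypothetical property name (`events.endpoint`) and a hypothetical default address. It defaults to tcp but lets a single-JVM deployment override it with an inproc endpoint without touching the publisher or subscriber code.

```java
import java.util.Properties;

/** Hypothetical configuration lookup for the pub/sub endpoint. */
final class EndpointConfig {

    /** Property name and default value are assumptions for illustration. */
    static final String PROPERTY = "events.endpoint";
    static final String DEFAULT_ENDPOINT = "tcp://127.0.0.1:5556";

    /** Returns the configured endpoint, falling back to tcp when none is set. */
    static String resolve(Properties props) {
        return props.getProperty(PROPERTY, DEFAULT_ENDPOINT);
    }

    public static void main(String[] args) {
        // Reads -Devents.endpoint=... if it was passed on the command line
        System.out.println("Using endpoint: " + resolve(System.getProperties()));
    }
}
```

The publisher would bind to the resolved string and the subscribers would connect to the same one, so the choice of transport becomes a deployment decision rather than a code change.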