I'm very new to ZeroMQ. I've read the guide and am currently going through the examples as well as looking at other relevant info around the web. I'm undecided about which messaging pattern(s) to use, or whether I should combine two patterns.
I have an existing software application that has a home grown messaging system that is in need of replacement. I have a fairly simple architecture:
|Client|<----->|driver1|
           |
           |---|driverN|
Only one "client" connects to a driver at a time currently, and there may be many drivers.
(in actuality, the client, in this case, isn't truly my client application, but a middleman of sorts. For this discussion, it can be treated as a client)
Messaging:
Drivers may exist on the same system or remotely on a LAN. This is not a public network.
I'm currently thinking that I would have a pub and a sub socket on each driver and a matching sub and pub socket on the client. Messages shouldn't be dropped once a connection is made. I assume that the client would subscribe to the different driver data types and each driver would then subscribe to the client's command messages.
Important considerations: low latency, lowest possible bandwidth overhead.
I would appreciate any suggestions or recommendations! Thanks in advance!
You picked a great learning exercise, that's for sure!
Read up on these, they provide the basic implementation for request/reply using a custom router-to-router proxy with polling and should address your client-to-device problem.
The solution is synchronous, so any request sent from the client blocks until it gets a response. Personally I would use async for both request and reply for total flexibility, but that solution is way more complex. There are, however, examples in the book called Freelance and Dealer/Router that illustrate async request/reply.
Here's an example for synchronous many-to-many request/reply. You MUST know how ZeroMQ enveloping works to fully understand the mechanics of this approach; see example lbbroker1.
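To make the enveloping concrete, here is a small sketch that models the frames each side sees, using plain Java lists instead of sockets. The class and method names (EnvelopeModel, routerReceives, reqReceives) are made up for illustration and are not part of any ZeroMQ binding, and the model only covers the REQ-to-ROUTER case used in this answer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of REQ/ROUTER envelope framing. No sockets are involved;
// EnvelopeModel and its methods are hypothetical names for illustration.
final class EnvelopeModel {

    // Frames the application reads from a ROUTER socket when a REQ peer
    // with the given identity sends `body`: the REQ socket prepends an
    // empty delimiter frame and the ROUTER socket prepends the identity.
    static List<String> routerReceives(String senderIdentity, List<String> body) {
        List<String> frames = new ArrayList<>();
        frames.add(senderIdentity);
        frames.add("");
        frames.addAll(body);
        return frames;
    }

    // Frames the application reads from a REQ socket when a ROUTER sends
    // `frames`: the ROUTER consumes the first frame as the destination
    // identity and the REQ socket strips the empty delimiter.
    static List<String> reqReceives(List<String> frames) {
        if (frames.size() < 2 || !frames.get(1).isEmpty()) {
            throw new IllegalArgumentException("expected [identity, \"\", ...]");
        }
        return new ArrayList<>(frames.subList(2, frames.size()));
    }

    public static void main(String[] args) {
        // client1 asks for device1; the frontend ROUTER reads three frames.
        System.out.println(routerReceives("client1", Arrays.asList("device1")));
        // The broker forwards to device1; the device reads three frames.
        System.out.println(reqReceives(
            Arrays.asList("device1", "", "client1", "", "hello from client1")));
    }
}
```

This is why the proxy reads three frames per client request and why the device calls recvStr() three times: the sockets add and remove the identity and delimiter frames, and the application only sees what is left.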
Set the client identity with setIdentity(); this is important for response routing.
The client sends requests for device1, device2, etc., in a loop; if the device exists, a status message is returned from that specific device, otherwise "no device" is returned to the client.
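Socket client = context.socket(ZMQ.REQ);
//identity must be set before connect so the proxy can route replies
client.setIdentity("client1".getBytes());
client.connect("tcp://localhost:5550");
for (int i = 0; i < 5; i++) {
    //request body is the id of the device we want to talk to
    client.send("device" + i);
    //REQ blocks here until the reply arrives
    String reply = client.recvStr();
    log("Received message: " + reply);
    Thread.sleep(1000);
}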
Device sets id just like client for unique routing.
Device sends device.send("DEVICEREADY") to server to signal online availability.
Device does recvStr() three times to read full envelope from server.
String deviceId = "device1";
Socket device = context.socket(ZMQ.REQ);
device.setIdentity(deviceId.getBytes());
device.connect("tcp://localhost:5560");
//tell the proxy this device is online
device.send("DEVICEREADY");
while (!Thread.currentThread().isInterrupted()) {
    //envelope from the proxy: client address, empty delimiter, request
    String clientAddress = device.recvStr();
    String empty = device.recvStr();
    String clientRequest = device.recvStr();
    //create envelope to send reply to same client who made request
    device.sendMore(clientAddress);
    device.sendMore("");
    device.send("status on " + deviceId + " is ok");
}
A custom proxy using ROUTER sockets; clients connect to frontend ROUTER socket while devices connect to backend router. Server polls on both sockets for messages.
Context context = ZMQ.context(1);
//registry of devices that have announced themselves with DEVICEREADY
Map<String, String> deviceMap = new HashMap<String, String>();
Socket frontend = context.socket(ZMQ.ROUTER);
Socket backend = context.socket(ZMQ.ROUTER);
frontend.bind("tcp://localhost:5550");
backend.bind("tcp://localhost:5560");
Poller poller = new Poller(2);
poller.register(frontend, Poller.POLLIN);
poller.register(backend, Poller.POLLIN);
while (!Thread.currentThread().isInterrupted()) {
    poller.poll();
    //frontend poller
    if (poller.pollin(0)) {
        String clientId = frontend.recvStr();
        String empty = frontend.recvStr(); //empty frame
        String deviceId = frontend.recvStr();
        //if client is requesting to talk to nonexistent deviceId,
        //return message "no device", otherwise, create envelope and send
        //request on backend router to device.
        if (deviceMap.get(deviceId) == null) {
            frontend.sendMore(clientId);
            frontend.sendMore("");
            frontend.send("no deviceId: " + deviceId);
        } else {
            //request envelope addressed to specific device
            backend.sendMore(deviceId);
            backend.sendMore("");
            backend.sendMore(clientId);
            backend.sendMore("");
            backend.send("hello from " + clientId);
        }
    }
    //backend poller
    if (poller.pollin(1)) {
        String deviceId = backend.recvStr();
        String empty = backend.recvStr();
        //third frame is either "DEVICEREADY" or the id of the client being answered
        String clientId = backend.recvStr();
        if (clientId.equals("DEVICEREADY")) {
            //device signaling it's ready
            //store deviceId in map, don't send a response
            deviceMap.put(deviceId, deviceId);
        } else {
            //the device is sending a response to a client
            //create envelope addressed to client, send on frontend socket
            empty = backend.recvStr();
            String reply = backend.recvStr();
            frontend.sendMore(clientId);
            frontend.sendMore("");
            frontend.send(reply);
        }
    }
}
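The routing decisions in that loop can also be looked at separately from the sockets. The following is only a sketch under that assumption: it takes the frames already read from one side and returns the frames the proxy would send and on which side. BrokerRouting, Outgoing, onFrontend and onBackend are hypothetical names introduced here, not part of the code above or of any ZeroMQ API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Socket-free sketch of the proxy's routing rules (hypothetical names).
final class BrokerRouting {

    // What the proxy would do next: the side to send on and the frames to send.
    static final class Outgoing {
        final String side;          // "frontend", "backend", or "none"
        final List<String> frames;

        Outgoing(String side, List<String> frames) {
            this.side = side;
            this.frames = frames;
        }
    }

    // Devices that have announced themselves with DEVICEREADY.
    private final Set<String> devices = new HashSet<>();

    // Frames read from the frontend ROUTER: [clientId, "", deviceId].
    Outgoing onFrontend(List<String> frames) {
        String clientId = frames.get(0);
        String deviceId = frames.get(2);
        if (!devices.contains(deviceId)) {
            // Unknown device: answer the client directly.
            return new Outgoing("frontend",
                Arrays.asList(clientId, "", "no deviceId: " + deviceId));
        }
        // Known device: wrap the client address inside the device envelope.
        return new Outgoing("backend",
            Arrays.asList(deviceId, "", clientId, "", "hello from " + clientId));
    }

    // Frames read from the backend ROUTER: either
    // [deviceId, "", "DEVICEREADY"] or [deviceId, "", clientId, "", reply].
    Outgoing onBackend(List<String> frames) {
        String deviceId = frames.get(0);
        String third = frames.get(2);
        if ("DEVICEREADY".equals(third)) {
            devices.add(deviceId);
            return new Outgoing("none", Collections.<String>emptyList());
        }
        // Device reply: unwrap and address it to the original client.
        return new Outgoing("frontend", Arrays.asList(third, "", frames.get(4)));
    }
}
```

Keeping the decisions in plain methods like this makes the envelope handling easy to check without starting any sockets; the real proxy would still do the recvStr()/sendMore() calls around it.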