I am trying to get a Python program to communicate with another Python program via ZeroMQ using the request-reply pattern. The client program should send a request to the server program, which replies.
I have two servers, so that when one server fails the other takes over. Communication works perfectly while the first server is up. However, when the first server fails and I make a request to the second server, I see the error:
zmq.error.ZMQError: Operation cannot be accomplished in current state
Code of the server 1:
# Run the server
while True:
    # Define the socket using the "Context"
    sock = context.socket(zmq.REP)
    sock.bind("tcp://127.0.0.1:5677")
    data = sock.recv().decode("utf-8")
    res = "Recvd"
    sock.send(res.encode('utf-8'))
Code of the server 2:
# Run the server
while True:
    # Define the socket using the "Context"
    sock = context.socket(zmq.REP)
    sock.bind("tcp://127.0.0.1:5877")
    data = sock.recv().decode("utf-8")
    res = "Recvd"
    sock.send(res.encode('utf-8'))
Code of client:
# ZeroMQ context for distributed messaging amongst processes
context = zmq.Context()
sock_1 = context.socket(zmq.REQ)
sock_2 = context.socket(zmq.REQ)
sock_1.connect("tcp://127.0.0.1:5677")
sock_2.connect("tcp://127.0.0.1:5877")
try:
    sock_1.send(data.encode('utf-8'), zmq.NOBLOCK)
    sock_1.setsockopt(zmq.RCVTIMEO, 1000)
    sock_1.setsockopt(zmq.LINGER, 0)
    data = sock_1.recv().decode('utf-8')  # receive data from the main node
except:
    try:
        # when server one fails
        sock_2.send(data.encode('utf-8'), zmq.NOBLOCK)
        sock_2.setsockopt(zmq.RCVTIMEO, 1000)
        sock_2.setsockopt(zmq.LINGER, 0)
        data = sock_2.recv().decode('utf-8')
    except Exception as e:
        print(str(e))
What is the problem with this approach? How can I resolve this?
ZeroMQ is a library that allows you to perform low-level message passing, but unlike message-oriented middleware, a ZeroMQ system can run without a dedicated message broker. To understand ZeroMQ, you need to think in terms of network sockets that carry atomic messages across various transports.
The most important thing is to call socket.close() to close the client connection. Secondly, the LINGER option can be set to a low value or zero. Pending unsent messages are then discarded once that timeout has elapsed after the socket is closed; with zero they are discarded immediately.
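One way to apply this advice is sketched below, assuming pyzmq. The helper names `request_once` and `request_with_failover` are hypothetical and not part of ZeroMQ. Each attempt uses a fresh REQ socket with LINGER set to zero and closes it afterwards, so a socket that timed out is never reused for the next request.

```python
import zmq


def request_once(context, endpoint, payload, timeout_ms=1000):
    """Send one request on a fresh REQ socket.

    Returns the reply bytes, or None if nothing arrived within timeout_ms.
    """
    sock = context.socket(zmq.REQ)
    sock.setsockopt(zmq.LINGER, 0)  # discard unsent messages on close
    sock.connect(endpoint)
    try:
        sock.send(payload)
        # poll() returns 0 when the timeout expires without a reply
        if sock.poll(timeout_ms, zmq.POLLIN):
            return sock.recv()
        return None
    finally:
        # Always close, so a timed-out socket is never used again
        sock.close()


def request_with_failover(context, endpoints, payload, timeout_ms=1000):
    """Try each endpoint in order and return the first reply received."""
    for endpoint in endpoints:
        reply = request_once(context, endpoint, payload, timeout_ms)
        if reply is not None:
            return reply
    raise TimeoutError("no server replied within the timeout")
```

With the endpoints from the question, a call might look like `request_with_failover(zmq.Context(), ["tcp://127.0.0.1:5677", "tcp://127.0.0.1:5877"], b"hello")`. Creating a socket per request costs something, so this trades efficiency for a client that cannot get stuck in a half-finished request.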
ZMQ.Context: a Context is an object serving as a container for all the sockets of a single process. By creating a new context, you start one or more input/output threads. DEFINE context ZMQ.Context. Associated methods: socket()
REQ/REP deadlocking!
While ZeroMQ is a powerful framework, understanding its internal composition is necessary for robust and reliable distributed-systems design and prototyping.
After a closer look, using a common REQ/REP Formal Communication Pattern may leave ( and does leave ) counter-parties in a mutual dead-lock, where one is expecting the other to do a step that will never be accomplished, and there is no way to escape from the deadlocked state.
For more illustrated details and FSA-schematic diagram, see this post
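The error from the question can be reproduced in isolation. The following is a minimal sketch, assuming pyzmq; the function name `second_send_errno` and the inproc endpoint name are made up for illustration. A REP socket that never replies stands in for the failed server. Once the REQ socket has sent a request, its state machine expects a recv, so a second send is rejected even after the recv has timed out.

```python
import zmq


def second_send_errno(timeout_ms=100):
    """Return the errno raised by a second send() on a REQ socket
    whose first request never received a reply."""
    context = zmq.Context()
    rep = context.socket(zmq.REP)  # bound, but never replies
    req = context.socket(zmq.REQ)
    rep.bind("inproc://efsm-demo")
    req.connect("inproc://efsm-demo")
    req.setsockopt(zmq.RCVTIMEO, timeout_ms)
    try:
        req.send(b"first")
        try:
            req.recv()  # times out: no reply is ever sent
        except zmq.Again:
            pass
        try:
            # The socket is still waiting for the first reply
            req.send(b"second")
        except zmq.ZMQError as exc:
            return exc.errno
        return None
    finally:
        req.close(linger=0)
        rep.close(linger=0)
        context.term()
```

The returned value should equal `zmq.EFSM`, whose message text is "Operation cannot be accomplished in current state". A timeout on recv does not reset the REQ socket, which is why the client in the question fails on its next request.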
Next, a fail-over system has to survive any collisions of its own components. Thus, one has to design the distributed-system state signalling well and avoid as many dependencies on element-FSA-design/stepping/blocking as possible; otherwise, the fail-safe behaviour remains just an illusion.
Always handle resources with care. Do not treat the components of ZeroMQ smart-signalling/messaging as any kind of "expendable disposables"; doing so might be tolerated in school examples, not in production system environments. You still have to pay the costs ( time, resource allocations / de-allocations / garbage collection ). As noted in the comments, never let resource creation/allocation go without due control. Calling .socket(), .bind() and .send() inside while True: is brutally wrong in principle and deteriorates the rest of the design.
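A server following this advice might look like the sketch below, assuming pyzmq. The socket is created and bound once, and only the recv/send pair runs inside the loop. The `serve` and `main` helpers and the `max_requests` parameter are hypothetical, added so the loop can be stopped in a test.

```python
import zmq


def serve(sock, max_requests=None):
    """Answer requests on an already-bound REP socket.

    Runs forever when max_requests is None; returns the number handled.
    """
    handled = 0
    while max_requests is None or handled < max_requests:
        data = sock.recv().decode("utf-8")  # request payload (unused here)
        sock.send("Recvd".encode("utf-8"))  # every recv is followed by a send
        handled += 1
    return handled


def main():
    context = zmq.Context()
    sock = context.socket(zmq.REP)     # created once, outside the loop
    sock.bind("tcp://127.0.0.1:5677")  # bound once, outside the loop
    try:
        serve(sock)
    finally:
        # Release the resources explicitly when the server stops
        sock.close(linger=0)
        context.term()
```

Calling `main()` starts the first server from the question; the second one would differ only in its port.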
On the server side, the "receive" and "send" pair is critical. I was facing a similar issue when socket.send was missing.
def zmq_listen():
    global counter
    message = socket_.recv().decode("utf-8")
    logger.info(f"[{counter}] Message: {message}")
    request = json.loads(message)
    request["msg_id"] = f"m{counter}"
    ack = {"msg_id": request["msg_id"]}
    socket_.send(json.dumps(ack).encode("utf-8"))
    return request
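One caveat with a handler like this is that an exception between recv and send (for example, malformed JSON) skips the send and leaves the REP socket unable to receive again. A possible variation, assuming pyzmq, is sketched below; the function name `handle_one` and the "error" reply field are illustrative assumptions. It sends a reply on every path.

```python
import json

import zmq


def handle_one(sock, counter):
    """Receive one request on a REP socket and always send a reply.

    Returns the parsed request, or None if it could not be processed.
    """
    raw = sock.recv()
    try:
        request = json.loads(raw.decode("utf-8"))
        request["msg_id"] = f"m{counter}"
        reply = {"msg_id": request["msg_id"]}
    except (ValueError, TypeError) as exc:
        # Malformed or non-object JSON: report it instead of skipping send()
        request = None
        reply = {"error": str(exc)}
    sock.send(json.dumps(reply).encode("utf-8"))
    return request
```

Replying with an error keeps both state machines in step: the server can receive again, and the client gets an answer instead of waiting for a timeout.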