I have a simple PUSH/PULL ZeroMQ setup in Python. The code looks like this:
import zmq
from multiprocessing import Process
# from time import sleep

def zmqtest(self):
    print('zmq')
    # start two PULL consumers in separate processes
    # (start_consumer is my helper, defined elsewhere)
    Process(target=start_consumer, args=('1', 9999)).start()
    Process(target=start_consumer, args=('2', 9999)).start()
    ctx = zmq.Context()
    socket = ctx.socket(zmq.PUSH)
    socket.bind('tcp://127.0.0.1:9999')
    # sleep(.5) # I have to wait here...
    for i in range(5):
        socket.send_unicode('{}'.format(i))
The problem is that I have to wait more than 0.5 seconds before sending messages, otherwise only one consumer process receives anything. If I wait more than 0.5 seconds, everything works as expected.
I guess it takes a while for the socket binding to settle, and that this happens asynchronously.
I wonder if there's a more reliable way to know when the socket is ready.
No, there's no method in the API to check whether a socket is connected. ZeroMQ abstracts the network; client and server connections are completely transparent to the peer making the connection.
Let's first straighten out the terminology a bit.
ZeroMQ is a great framework. Each distributed-system client willing to use it (except when using just the inproc:// transport class) first instantiates an async data-pumping engine: the Context() instance(s), as needed.
Each Scalable Formal Communication Pattern archetype { PUSH | PULL | ... | XSUB | SUB | PAIR } does not create a socket in the classical O/S sense, but rather instantiates an access-point that may later .connect() or .bind() to some counterparty (another access-point, of a suitable type, in some Context() instance, be it local or not; again, the local inproc://-only infrastructures being the known exception to this rule).
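As a minimal sketch of this terminology only (the endpoint and port number are illustrative, not taken from any particular deployment), a PUSH access-point that .bind()-s and a counterparty PULL access-point that .connect()-s inside one Context() could look like this:

import zmq

ctx = zmq.Context()                    # the async data-pumping engine

push = ctx.socket(zmq.PUSH)            # an access-point of the PUSH archetype
push.bind('tcp://127.0.0.1:9999')      # .bind() on a chosen transport-class endpoint

pull = ctx.socket(zmq.PULL)            # a counterparty access-point of the PULL archetype
pull.connect('tcp://127.0.0.1:9999')   # .connect() to the same endpoint

push.send_unicode('hello')             # delivery happens asynchronously, behind the scenes
print(pull.recv_unicode())             # blocks until the message has actually arrived

pull.close()
push.close()
ctx.term()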
In this sense, an answer to the question "When is the socket ready?" requires an end-to-end investigation "across" the distributed system, covering all the elements that participate in the socket-like behaviour's implementation.
For this, your agent may self-connect a receiving access-point (working as a PULL archetype), so as to "sniff" when the local-end Context() instance has reached an RTO-state and the .bind()-created O/S L3+ interface starts distributing the intended agent's PUSH-ed messages.
This part can have an indirect or an explicit test. An indirect way may use a message-embedded index containing a rising number (an ordinal), which carries weak information about ordering. Given that the PUSH-side message-routing strategy is round-robin, the local agent can be sure that as long as its local PULL access-point receives a contiguous sequence of ordinals, there is no other "remote" PULL-ing agent in an RTO-state yet. Once the "local" PULL access-point sees a "gap" in the stream of ordinals, that means (sure, only in the case all the PUSH-side .setsockopt()-s were set up properly) there is another, non-local, PULL-ing agent in an RTO-state. A sketch of this indirect test follows below.
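As an illustration only (the endpoint matches the question's code, while the 100 ms polling grace period is an assumption of this sketch), the indirect, ordinal-gap-based test could look roughly like this:

import zmq

ADDR = 'tcp://127.0.0.1:9999'          # endpoint taken from the question's code

ctx = zmq.Context()

push = ctx.socket(zmq.PUSH)
push.setsockopt(zmq.LINGER, 0)
push.bind(ADDR)

sniffer = ctx.socket(zmq.PULL)         # the self-connected "sniffing" PULL access-point
sniffer.setsockopt(zmq.LINGER, 0)
sniffer.connect(ADDR)

poller = zmq.Poller()
poller.register(sniffer, zmq.POLLIN)

# keep PUSH-ing ordinals; as long as the sniffer receives a contiguous
# sequence 0, 1, 2, ..., no other (remote) PULL peer is in an RTO-state yet
ordinal = 0
expected = 0
while True:
    push.send_unicode(str(ordinal))
    ordinal += 1
    if poller.poll(timeout=100):       # assumed 100 ms grace for the local round-trip
        received = int(sniffer.recv_unicode())
        if received != expected:
            # a "gap": some ordinal was round-robin-ed elsewhere, so a remote
            # PULL agent has reached an RTO-state and is taking its share
            print('remote PULL peer detected (ordinal %d went elsewhere)' % expected)
            break
        expected = received + 1

In the question's setup, the two start_consumer processes would play the role of the remote PULL-ing agents whose arrival this loop detects.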
Maybe yes, maybe not. The point is to better understand the new challenges that any distributed system has to cope with.
The nature of multi-stage message queueing, of the multi-layered implementation (local PUSH-agent's code, local Context() thread(s), local O/S, local kernel, LAN/WAN, remote kernel, remote O/S, remote Context() thread(s), remote PULL-agent's code, to name just a few) and of multi-agent behaviour simply introduces many places where an operation may gain latency, block, deadlock or fail in some other manner.
Yes, a walk on the wild side.
Nevertheless, one may opt to use a much richer, explicit signalling (besides the raw-data transport originally envisaged) and build a context-specific, RTO-aware signalling behaviour inside the multi-agent world, one that better reflects the actual situation and also survives the other issues that start to appear in the non-monolithic worlds of distributed systems.
Explicit signalling is one way to cope with this; a sketch follows below.
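As an assumed illustration (the side-channel endpoint tcp://127.0.0.1:9998, the READY token and the worker count are inventions of this sketch, not taken from the question), explicit signalling can be as simple as each PULL-worker announcing itself over a separate channel before the producer starts PUSH-ing any data:

import zmq

DATA_ADDR = 'tcp://127.0.0.1:9999'     # data channel, as in the question
SYNC_ADDR = 'tcp://127.0.0.1:9998'     # assumed side channel for readiness signals
N_WORKERS = 2

def worker(ident):
    ctx = zmq.Context()
    data = ctx.socket(zmq.PULL)
    data.connect(DATA_ADDR)            # connect the data channel first ...
    sync = ctx.socket(zmq.PUSH)        # ... then announce readiness on the side channel
    sync.connect(SYNC_ADDR)
    sync.send_unicode('READY %s' % ident)
    while True:
        print(ident, data.recv_unicode())

def producer():
    ctx = zmq.Context()
    data = ctx.socket(zmq.PUSH)
    data.bind(DATA_ADDR)
    sync = ctx.socket(zmq.PULL)        # collect the explicit readiness signals
    sync.bind(SYNC_ADDR)
    for _ in range(N_WORKERS):         # block here until every worker has said READY
        print(sync.recv_unicode())
    for i in range(5):                 # only now start distributing the work
        data.send_unicode(str(i))

This mirrors the node-coordination idea from the ZeroMQ guide; note that it only signals that the worker has called .connect(), not that the data connection has fully completed, so options like ZMQ_IMMEDIATE (below) remain relevant.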
Recent API versions have started to add more options to fine-tune the ZeroMQ behaviour for particular use-cases. Be sure to read carefully all the details available for setting up the Context() instance and for tweaking each socket access-point's behaviour, so that it best matches your distributed-system's signalling + transport needs:
.setsockopt( ZMQ_LINGER, 0 ) # always, indeed ALWAYS
.setsockopt( ZMQ_SNDBUF, .. ) # always, additional O/S + kernel rules apply ( read more about proper sizing )
.setsockopt( ZMQ_SNDHWM, .. ) # always, problem-specific data-engineered sizing
.setsockopt( ZMQ_TOS, .. ) # always, indeed ALWAYS for critical systems
.setsockopt( ZMQ_IMMEDIATE, .. ) # prevents "losing" messages pumped into incomplete connections
and many more. Without these, the design would remain nailed into a coffin in the real-world transactions' jungle.
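In pyzmq these map onto the socket-level option constants; as a hedged sketch only (the numeric values are placeholders, not recommendations), they are applied before .bind() / .connect():

import zmq

ctx = zmq.Context()
push = ctx.socket(zmq.PUSH)

push.setsockopt(zmq.LINGER, 0)            # never block process termination on undelivered messages
push.setsockopt(zmq.SNDBUF, 1024 * 1024)  # O/S send-buffer size, placeholder sizing
push.setsockopt(zmq.SNDHWM, 1000)         # high-water mark, problem-specific sizing
push.setsockopt(zmq.TOS, 0x28)            # IP type-of-service / DSCP marking, placeholder value
push.setsockopt(zmq.IMMEDIATE, 1)         # queue messages only on fully completed connections

push.bind('tcp://127.0.0.1:9999')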