I have set up two small scripts imitating a publish and subscribe procedure with pyzmq. However, I am unable to send messages over to my subscriber client using the inproc
transport. I am able to use tcp://127.0.0.1:8080
fine, just not inproc.
pub_server.py
import zmq
import random
import sys
import time
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("inproc://stream")
while True:
socket.send_string("Hello")
time.sleep(1)
sub_client.py
import sys
import zmq
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt_string(zmq.SUBSCRIBE, '')
socket.connect("inproc://stream")
for x in range (5):
string = socket.recv()
print(string)
How can I successfully alter my code so that I'm able to use the inproc transport method between my two scripts?
EDIT:
I have updated my code to further reflect @larsks comment. I am still not receiving my published string - what is it that I am doing wrong?
import threading
import zmq
def pub():
context = zmq.Context()
sender = context.socket(zmq.PUB)
sender.connect("inproc://hello")
lock = threading.RLock()
with lock:
sender.send(b"")
def sub():
context = zmq.Context()
receiver = context.socket(zmq.SUB)
receiver.bind("inproc://hello")
pub()
# Wait for signal
string = receiver.recv()
print(string)
print("Test successful!")
receiver.close()
if __name__ == "__main__":
sub()
As the name implies, inproc
sockets can only be used within the same process. If you were to rewrite your client and server such that there were two threads in the same process you could use inproc
, but otherwise this socket type simply isn't suitable for what you're doing.
The documentation is very clear on this point:
The in-process transport passes messages via memory directly between threads sharing a single ØMQ context.
Update
Taking a look at the updated code, the problem that stands out first is that while the documentation quoted above says "...between threads sharing a single ØMQ context", you are creating two contexts in your code. Typically, you will only call zmq.Context()
once in your program.
Next, you are never subscribing your subscriber to any messages, so even in the event that everything else was working correctly you would not actually receive any messages.
Lastly, your code is going to experience the slow joiner problem:
There is one more important thing to know about PUB-SUB sockets: you do not know precisely when a subscriber starts to get messages. Even if you start a subscriber, wait a while, and then start the publisher, the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.
The pub/sub model isn't meant for single messages, nor is it meant to be a reliable transport.
So, to sum up:
inproc
sockets you're going to need to put your two functions into separate threads.There is an example using PAIR
sockets in the ZMQ documentation that might provide a useful starting point. PAIR
sockets are designed for coordinating threads over inproc
sockets, and unlike pub/sub sockets they are bidirectional and are not impacted by the "slow joiner" issue.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With